| | 1 | | // Copyright (c) Microsoft Corporation. All rights reserved. |
| | 2 | | // Licensed under the MIT License. |
| | 3 | |
|
| | 4 | | using System; |
| | 5 | | using System.Collections.Generic; |
| | 6 | | using System.ComponentModel; |
| | 7 | | using System.Diagnostics.CodeAnalysis; |
| | 8 | | using System.Linq; |
| | 9 | | using System.Runtime.CompilerServices; |
| | 10 | | using System.Threading; |
| | 11 | | using System.Threading.Tasks; |
| | 12 | | using Azure.Core; |
| | 13 | | using Azure.Core.Pipeline; |
| | 14 | | using Azure.Messaging.ServiceBus.Core; |
| | 15 | | using Azure.Messaging.ServiceBus.Diagnostics; |
| | 16 | | using Azure.Messaging.ServiceBus.Plugins; |
| | 17 | |
|
| | 18 | | namespace Azure.Messaging.ServiceBus |
| | 19 | | { |
| | 20 | | /// <summary> |
| | 21 | | /// The <see cref="ServiceBusReceiver" /> is responsible for receiving |
| | 22 | | /// <see cref="ServiceBusReceivedMessage" /> and settling messages from Queues and Subscriptions. |
| | 23 | | /// It is constructed by calling <see cref="ServiceBusClient.CreateReceiver(string, ServiceBusReceiverOptions)"/>. |
| | 24 | | /// </summary> |
| | 25 | | public class ServiceBusReceiver : IAsyncDisposable |
| | 26 | | { |
/// <summary>
/// The fully qualified Service Bus namespace that the receiver is associated with, as reported
/// by the underlying connection. This is likely to be similar to
/// <c>{yournamespace}.servicebus.windows.net</c>.
/// </summary>
public string FullyQualifiedNamespace
{
    get { return _connection.FullyQualifiedNamespace; }
}

/// <summary>
/// The path of the Service Bus entity that the receiver is connected to, specific to the
/// Service Bus namespace that contains it.
/// </summary>
public string EntityPath { get; }

/// <summary>
/// The <see cref="ReceiveMode"/> used to specify how messages are received. Defaults to PeekLock mode.
/// </summary>
public ReceiveMode ReceiveMode { get; }

/// <summary>
/// Indicates whether the receiver entity is session enabled.
/// </summary>
internal bool IsSessionReceiver { get; }

/// <summary>
/// The number of messages that will be eagerly requested from Queues or Subscriptions and queued
/// locally without regard to whether processing is currently active, intended to help maximize
/// throughput by allowing the receiver to receive from a local cache rather than waiting on a
/// service request.
/// </summary>
public int PrefetchCount { get; }

/// <summary>
/// Gets the ID to identify this client. This can be used to correlate logs and exceptions.
/// </summary>
/// <remarks>Every new client has a unique ID.</remarks>
internal string Identifier { get; }

/// <summary>
/// Indicates whether or not this <see cref="ServiceBusReceiver"/> has been disposed.
/// </summary>
///
/// <value>
/// <c>true</c> if the client is disposed; otherwise, <c>false</c>.
/// </value>
public bool IsDisposed { get; private set; }

/// <summary>
/// The policy to use for determining retry behavior for when an operation fails.
/// </summary>
private readonly ServiceBusRetryPolicy _retryPolicy;

/// <summary>
/// The active connection to the Azure Service Bus service, enabling client communications for metadata
/// about the associated Service Bus entity and access to transport-aware receivers.
/// </summary>
private readonly ServiceBusConnection _connection;

/// <summary>
/// The transport-specific receiver that operations on this type are delegated to.
/// </summary>
private readonly TransportReceiver _innerReceiver;

/// <summary>
/// An abstracted Service Bus transport-specific receiver that is associated with the
/// Service Bus entity gateway; intended to perform delegated operations.
/// </summary>
internal TransportReceiver InnerReceiver
{
    get { return _innerReceiver; }
}

/// <summary>
/// Backing field for <see cref="ScopeFactory"/>.
/// </summary>
private readonly EntityScopeFactory _scopeFactory;

/// <summary>
/// Responsible for creating entity scopes.
/// </summary>
internal EntityScopeFactory ScopeFactory
{
    get { return _scopeFactory; }
}

/// <summary>
/// The list of plugins to apply to incoming messages.
/// </summary>
private readonly IList<ServiceBusPlugin> _plugins;

/// <summary>
/// The instance of <see cref="ServiceBusEventSource" /> which can be mocked for testing.
/// Initialized to <see cref="ServiceBusEventSource.Log"/>.
/// </summary>
internal ServiceBusEventSource Logger { get; set; } = ServiceBusEventSource.Log;
| | 107 | |
|
/// <summary>
/// Initializes a new instance of the <see cref="ServiceBusReceiver"/> class and creates the
/// transport receiver that it delegates to.
/// </summary>
///
/// <param name="connection">The <see cref="ServiceBusConnection" /> connection to use for communication with the service. Must be non-null, not closed, and expose non-null retry options.</param>
/// <param name="entityPath">The path of the entity to receive from. Must not be null or whitespace.</param>
/// <param name="isSessionEntity">Whether the receiver targets a session-enabled entity.</param>
/// <param name="plugins">The plugins to apply to incoming messages.</param>
/// <param name="options">A set of options to apply when configuring the receiver. The instance is cloned; when null, a default <see cref="ServiceBusReceiverOptions"/> is used.</param>
/// <param name="sessionId">An optional session Id to scope the receiver to. If not specified,
/// the next available session returned from the service will be used.</param>
///
internal ServiceBusReceiver(
    ServiceBusConnection connection,
    string entityPath,
    bool isSessionEntity,
    IList<ServiceBusPlugin> plugins,
    ServiceBusReceiverOptions options,
    string sessionId = default)
{
    Type receiverType = GetType();
    Logger.ClientCreateStart(receiverType, connection?.FullyQualifiedNamespace, entityPath);

    try
    {
        // Validate everything before any state is assigned.
        Argument.AssertNotNull(connection, nameof(connection));
        Argument.AssertNotNull(connection.RetryOptions, nameof(connection.RetryOptions));
        Argument.AssertNotNullOrWhiteSpace(entityPath, nameof(entityPath));
        connection.ThrowIfClosed();

        // Work on a private copy so later changes by the caller do not affect this receiver.
        ServiceBusReceiverOptions effectiveOptions = options?.Clone() ?? new ServiceBusReceiverOptions();

        Identifier = DiagnosticUtilities.GenerateIdentifier(entityPath);
        _connection = connection;
        _retryPolicy = connection.RetryOptions.ToRetryPolicy();
        ReceiveMode = effectiveOptions.ReceiveMode;
        PrefetchCount = effectiveOptions.PrefetchCount;
        EntityPath = entityPath;
        IsSessionReceiver = isSessionEntity;

        _innerReceiver = _connection.CreateTransportReceiver(
            entityPath: EntityPath,
            retryPolicy: _retryPolicy,
            receiveMode: ReceiveMode,
            prefetchCount: (uint)PrefetchCount,
            identifier: Identifier,
            sessionId: sessionId,
            isSessionReceiver: IsSessionReceiver);

        // FullyQualifiedNamespace reads from _connection, which is assigned above.
        _scopeFactory = new EntityScopeFactory(EntityPath, FullyQualifiedNamespace);
        _plugins = plugins;

        // Creation of a session receiver is not complete until the link is opened,
        // so completion is only logged here for non-session receivers.
        if (!isSessionEntity)
        {
            Logger.ClientCreateComplete(receiverType, Identifier);
        }
    }
    catch (Exception exception)
    {
        Logger.ClientCreateException(receiverType, connection?.FullyQualifiedNamespace, entityPath, exception);
        throw;
    }
}
| | 167 | |
|
/// <summary>
/// Parameterless constructor intended for mocking the <see cref="ServiceBusReceiver"/> class.
/// It performs no initialization.
/// </summary>
///
protected ServiceBusReceiver()
{
}
| | 173 | |
|
/// <summary>
/// Receives a list of <see cref="ServiceBusReceivedMessage" /> from the entity using <see cref="ReceiveMode"/> mode.
/// <see cref="ReceiveMode"/> defaults to PeekLock mode.
/// This method doesn't guarantee to return exact `maxMessages` messages,
/// even if there are `maxMessages` messages available in the queue or topic.
/// </summary>
///
/// <param name="maxMessages">The maximum number of messages that will be received. Must be at least 1.</param>
/// <param name="maxWaitTime">An optional <see cref="TimeSpan"/> specifying the maximum time to wait; must be positive when supplied.
/// If not specified, the <see cref="ServiceBusRetryOptions.TryTimeout"/> will be used.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
/// <returns>List of messages received. Returns an empty list if no message is found.</returns>
public virtual async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveMessagesAsync(
    int maxMessages,
    TimeSpan? maxWaitTime = default,
    CancellationToken cancellationToken = default)
{
    Argument.AssertAtLeast(maxMessages, 1, nameof(maxMessages));
    Argument.AssertNotDisposed(IsDisposed, nameof(ServiceBusReceiver));

    if (maxWaitTime is TimeSpan requestedWait)
    {
        Argument.AssertPositive(requestedWait, nameof(maxWaitTime));
    }

    // Only logged, never enforced: asking for more than the prefetch count is allowed.
    bool exceedsPrefetch = PrefetchCount > 0 && maxMessages > PrefetchCount;
    if (exceedsPrefetch)
    {
        Logger.MaxMessagesExceedsPrefetch(Identifier, PrefetchCount, maxMessages);
    }

    cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
    Logger.ReceiveMessageStart(Identifier, maxMessages);

    using DiagnosticScope scope = ScopeFactory.CreateScope(
        DiagnosticProperty.ReceiveActivityName,
        requestedMessageCount: maxMessages);
    scope.Start();

    IReadOnlyList<ServiceBusReceivedMessage> received;
    try
    {
        received = await InnerReceiver
            .ReceiveMessagesAsync(maxMessages, maxWaitTime, cancellationToken)
            .ConfigureAwait(false);

        await ApplyPlugins(received).ConfigureAwait(false);
    }
    catch (Exception ex)
    {
        // Record the failure on both the event source and the diagnostic scope, then rethrow.
        Logger.ReceiveMessageException(Identifier, ex.ToString());
        scope.Failed(ex);
        throw;
    }

    Logger.ReceiveMessageComplete(Identifier, received.Count);
    scope.SetMessageData(received);

    return received;
}
| | 232 | |
|
/// <summary>
/// Receives messages as an asynchronous enumerable from the entity using <see cref="ReceiveMode"/> mode.
/// <see cref="ReceiveMode"/> defaults to PeekLock mode. Messages will be received from the entity as
/// the IAsyncEnumerable is iterated. If no messages are available, this method will continue polling
/// until messages are available, i.e. it will never return null.
/// </summary>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the
/// request to cancel the operation.</param>
/// <returns>The message received.</returns>
public virtual async IAsyncEnumerable<ServiceBusReceivedMessage> ReceiveMessagesAsync(
    [EnumeratorCancellation]
    CancellationToken cancellationToken = default)
{
    while (true)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            break;
        }

        ServiceBusReceivedMessage next =
            await ReceiveMessageAsync(cancellationToken: cancellationToken).ConfigureAwait(false);

        // A null result means nothing arrived within the wait; poll again rather than yield null.
        if (next != null)
        {
            yield return next;
        }
    }

    // Surface the TCE to ensure deterministic behavior when cancelling.
    cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
}
| | 259 | |
|
/// <summary>
/// Receives a <see cref="ServiceBusReceivedMessage" /> from the entity using <see cref="ReceiveMode"/> mode.
/// <see cref="ReceiveMode"/> defaults to PeekLock mode.
/// </summary>
/// <param name="maxWaitTime">An optional <see cref="TimeSpan"/> specifying the maximum time to wait for a message
/// before returning null if no messages are available.
/// If not specified, the <see cref="ServiceBusRetryOptions.TryTimeout"/> will be used.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the
/// operation.</param>
///
/// <returns>The message received. Returns null if no message is found.</returns>
public virtual async Task<ServiceBusReceivedMessage> ReceiveMessageAsync(
    TimeSpan? maxWaitTime = default,
    CancellationToken cancellationToken = default)
{
    // Delegates to the batch overload with a batch size of one.
    IEnumerable<ServiceBusReceivedMessage> batch = await ReceiveMessagesAsync(
        maxMessages: 1,
        maxWaitTime: maxWaitTime,
        cancellationToken: cancellationToken)
        .ConfigureAwait(false);

    // Take the first element if there is one; otherwise report "no message" as null.
    using IEnumerator<ServiceBusReceivedMessage> cursor = batch.GetEnumerator();
    if (cursor.MoveNext())
    {
        return cursor.Current;
    }

    return null;
}
| | 287 | |
|
/// <summary>
/// Runs every registered plugin's <c>AfterMessageReceiveAsync</c> hook against each received message,
/// one plugin at a time and, within a plugin, one message at a time.
/// </summary>
///
/// <param name="messages">The messages that were just received.</param>
///
/// <remarks>
/// A plugin failure is logged and rethrown, which stops processing of the remaining plugins and messages.
/// When no plugin list was supplied to the receiver, this method is a no-op.
/// </remarks>
///
/// <returns>A task to be resolved on when all plugins have been applied.</returns>
private async Task ApplyPlugins(IReadOnlyList<ServiceBusReceivedMessage> messages)
{
    // The constructor stores the plugin list without validation, so it may be null;
    // treat that the same as "no plugins" instead of failing with a NullReferenceException.
    if (_plugins == null)
    {
        return;
    }

    foreach (ServiceBusPlugin plugin in _plugins)
    {
        string pluginType = plugin.GetType().Name;
        foreach (ServiceBusReceivedMessage message in messages)
        {
            try
            {
                Logger.PluginCallStarted(pluginType, message.MessageId);
                await plugin.AfterMessageReceiveAsync(message).ConfigureAwait(false);
                Logger.PluginCallCompleted(pluginType, message.MessageId);
            }
            catch (Exception ex)
            {
                Logger.PluginCallException(pluginType, message.MessageId, ex.ToString());
                throw;
            }
        }
    }
}
| | 309 | |
|
/// <summary>
/// Fetches the next active <see cref="ServiceBusReceivedMessage"/> without changing the state of the receiver.
/// </summary>
/// <param name="fromSequenceNumber">An optional sequence number from where to peek the
/// message. This corresponds to the <see cref="ServiceBusReceivedMessage.SequenceNumber"/>.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the
/// operation.</param>
///
/// <remarks>
/// The first call to <see cref="PeekMessageAsync(long?, CancellationToken)"/> fetches the first active message.
/// Unlike a received message, a peeked message will not have a lock token associated with it.
/// </remarks>
///
/// <returns>The <see cref="ServiceBusReceivedMessage" /> that represents the next message to be read. Returns null when no message is found.</returns>
public virtual async Task<ServiceBusReceivedMessage> PeekMessageAsync(
    long? fromSequenceNumber = default,
    CancellationToken cancellationToken = default)
{
    // Peek a batch of one through the shared internal implementation.
    IEnumerable<ServiceBusReceivedMessage> peeked = await PeekMessagesInternalAsync(
        sequenceNumber: fromSequenceNumber,
        maxMessages: 1,
        cancellationToken: cancellationToken)
        .ConfigureAwait(false);

    // Return the first element if present; an empty result is reported as null.
    using IEnumerator<ServiceBusReceivedMessage> cursor = peeked.GetEnumerator();
    if (cursor.MoveNext())
    {
        return cursor.Current;
    }

    return null;
}
| | 341 | |
|
/// <summary>
/// Fetches a list of active messages without changing the state of the receiver or the message source.
/// </summary>
///
/// <param name="maxMessages">The maximum number of messages that will be fetched.</param>
/// <param name="fromSequenceNumber">An optional sequence number from where to peek the
/// message. This corresponds to the <see cref="ServiceBusReceivedMessage.SequenceNumber"/>.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel
/// the operation.</param>
///
/// <remarks>
/// Unlike a received message, a peeked message will not have a lock token associated with it, and hence it cannot be
/// Completed/Abandoned/Deferred/Deadlettered/Renewed.
/// </remarks>
///
/// <returns>An <see cref="IReadOnlyList{ServiceBusReceivedMessage}" /> of messages that were peeked.</returns>
public virtual async Task<IReadOnlyList<ServiceBusReceivedMessage>> PeekMessagesAsync(
    int maxMessages,
    long? fromSequenceNumber = default,
    CancellationToken cancellationToken = default)
{
    // Thin public wrapper over the shared internal peek implementation.
    return await PeekMessagesInternalAsync(
        sequenceNumber: fromSequenceNumber,
        maxMessages: maxMessages,
        cancellationToken: cancellationToken).ConfigureAwait(false);
}
| | 365 | |
|
/// <summary>
/// Fetches a list of active messages without changing the state of the receiver or the message source.
/// Shared implementation behind the public peek methods.
/// </summary>
/// <param name="sequenceNumber">The sequence number from where to peek the message.</param>
/// <param name="maxMessages">The maximum number of messages that will be fetched.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
/// <returns>An <see cref="IReadOnlyList{ServiceBusReceivedMessage}" /> of messages that were peeked.</returns>
private async Task<IReadOnlyList<ServiceBusReceivedMessage>> PeekMessagesInternalAsync(
    long? sequenceNumber,
    int maxMessages,
    CancellationToken cancellationToken)
{
    Argument.AssertNotDisposed(IsDisposed, nameof(ServiceBusReceiver));
    cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
    Logger.PeekMessageStart(Identifier, sequenceNumber, maxMessages);
    using DiagnosticScope scope = ScopeFactory.CreateScope(
        DiagnosticProperty.PeekActivityName,
        requestedMessageCount: maxMessages);
    scope.Start();

    // No placeholder list is allocated: the variable is either assigned by the
    // transport call below or never read because the catch block rethrows.
    IReadOnlyList<ServiceBusReceivedMessage> messages;
    try
    {
        messages = await InnerReceiver.PeekMessagesAsync(
            sequenceNumber,
            maxMessages,
            cancellationToken)
            .ConfigureAwait(false);
    }
    catch (Exception exception)
    {
        // Record the failure on both the event source and the diagnostic scope, then rethrow.
        Logger.PeekMessageException(Identifier, exception.ToString());
        scope.Failed(exception);
        throw;
    }

    Logger.PeekMessageComplete(Identifier, messages.Count);
    scope.SetMessageData(messages);
    return messages;
}
| | 406 | |
|
/// <summary>
/// Opens an AMQP link for use with receiver operations by delegating to the inner transport receiver.
/// </summary>
///
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
/// <returns>A task to be resolved on when the operation has completed.</returns>
internal async Task OpenLinkAsync(CancellationToken cancellationToken)
{
    await InnerReceiver.OpenLinkAsync(cancellationToken).ConfigureAwait(false);
}
| | 416 | |
|
/// <summary>
/// Completes a <see cref="ServiceBusReceivedMessage"/>. This will delete the message from the service.
/// </summary>
/// <param name="message">The message to complete. Must not be null.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
/// <remarks>
/// This operation can only be performed on a message that was received by this receiver
/// when <see cref="ReceiveMode"/> is set to <see cref="ReceiveMode.PeekLock"/>.
/// </remarks>
///
/// <returns>A task to be resolved on when the operation has completed.</returns>
public virtual async Task CompleteMessageAsync(
    ServiceBusReceivedMessage message,
    CancellationToken cancellationToken = default)
{
    Argument.AssertNotNull(message, nameof(message));

    // Settlement is keyed by lock token; hand off to the lock-token overload.
    string lockToken = message.LockToken;
    await CompleteMessageAsync(lockToken, cancellationToken).ConfigureAwait(false);
}
| | 436 | |
|
/// <summary>
/// Completes a <see cref="ServiceBusReceivedMessage"/>. This will delete the message from the service.
/// </summary>
///
/// <param name="lockToken">The lock token of the <see cref="ServiceBusReceivedMessage"/> message to complete.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
/// <remarks>
/// This operation can only be performed on a message that was received by this receiver
/// when <see cref="ReceiveMode"/> is set to <see cref="ReceiveMode.PeekLock"/>.
/// </remarks>
///
/// <returns>A task to be resolved on when the operation has completed.</returns>
public virtual async Task CompleteMessageAsync(
    string lockToken,
    CancellationToken cancellationToken = default)
{
    // A single lock token is being settled, so the logged message count is always one.
    const int settledMessageCount = 1;

    ThrowIfLockTokenIsEmpty(lockToken);
    Argument.AssertNotDisposed(IsDisposed, nameof(ServiceBusReceiver));
    Argument.AssertNotNullOrEmpty(lockToken, nameof(lockToken));
    ThrowIfNotPeekLockMode();
    cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

    Logger.CompleteMessageStart(Identifier, settledMessageCount, lockToken);

    using DiagnosticScope scope = ScopeFactory.CreateScope(
        DiagnosticProperty.CompleteActivityName,
        lockToken: lockToken);
    scope.Start();

    try
    {
        await InnerReceiver.CompleteAsync(lockToken, cancellationToken).ConfigureAwait(false);
    }
    catch (Exception ex)
    {
        // Record the failure on both the event source and the diagnostic scope, then rethrow.
        Logger.CompleteMessageException(Identifier, ex.ToString());
        scope.Failed(ex);
        throw;
    }

    Logger.CompleteMessageComplete(Identifier);
}
| | 483 | |
|
/// <summary>
/// Abandons a <see cref="ServiceBusReceivedMessage"/>. This will make the message available again.
/// </summary>
///
/// <param name="message">The <see cref="ServiceBusReceivedMessage"/> to abandon. Must not be null.</param>
/// <param name="propertiesToModify">The properties of the message to modify while abandoning the message.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
/// <remarks>
/// Abandoning a message will increase the delivery count on the message.
/// This operation can only be performed on messages that were received by this receiver
/// when <see cref="ReceiveMode"/> is set to <see cref="ReceiveMode.PeekLock"/>.
/// </remarks>
///
/// <returns>A task to be resolved on when the operation has completed.</returns>
public virtual async Task AbandonMessageAsync(
    ServiceBusReceivedMessage message,
    IDictionary<string, object> propertiesToModify = null,
    CancellationToken cancellationToken = default)
{
    Argument.AssertNotNull(message, nameof(message));

    // Settlement is keyed by lock token; hand off to the lock-token overload.
    string lockToken = message.LockToken;
    await AbandonMessageAsync(lockToken, propertiesToModify, cancellationToken).ConfigureAwait(false);
}
| | 510 | |
|
/// <summary>
/// Abandons a <see cref="ServiceBusReceivedMessage"/>. This will make the message available again for processing.
/// </summary>
///
/// <param name="lockToken">The lock token of the <see cref="ServiceBusReceivedMessage"/> to abandon. Must not be null or empty.</param>
/// <param name="propertiesToModify">The properties of the message to modify while abandoning the message.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
/// <remarks>
/// Abandoning a message will increase the delivery count on the message.
/// This operation can only be performed on messages that were received by this receiver
/// when <see cref="ReceiveMode"/> is set to <see cref="ReceiveMode.PeekLock"/>.
/// </remarks>
///
/// <returns>A task to be resolved on when the operation has completed.</returns>
public virtual async Task AbandonMessageAsync(
    string lockToken,
    IDictionary<string, object> propertiesToModify = null,
    CancellationToken cancellationToken = default)
{
    ThrowIfLockTokenIsEmpty(lockToken);
    Argument.AssertNotDisposed(IsDisposed, nameof(ServiceBusReceiver));
    // Same lock-token validation, in the same position, as CompleteMessageAsync(string, ...),
    // so a null or empty token is rejected here instead of being sent to the transport.
    Argument.AssertNotNullOrEmpty(lockToken, nameof(lockToken));
    ThrowIfNotPeekLockMode();
    cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
    Logger.AbandonMessageStart(Identifier, 1, lockToken);
    using DiagnosticScope scope = ScopeFactory.CreateScope(
        DiagnosticProperty.AbandonActivityName,
        lockToken: lockToken);
    scope.Start();

    try
    {
        await InnerReceiver.AbandonAsync(
            lockToken,
            propertiesToModify,
            cancellationToken).ConfigureAwait(false);
    }
    catch (Exception exception)
    {
        // Record the failure on both the event source and the diagnostic scope, then rethrow.
        Logger.AbandonMessageException(Identifier, exception.ToString());
        scope.Failed(exception);
        throw;
    }

    Logger.AbandonMessageComplete(Identifier);
}
| | 557 | |
|
/// <summary>
/// Moves a message to the deadletter sub-queue.
/// </summary>
///
/// <param name="message">The <see cref="ServiceBusReceivedMessage"/> to deadletter. Must not be null.</param>
/// <param name="propertiesToModify">The properties of the message to modify while moving to sub-queue.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
/// <remarks>
/// In order to receive a message from the deadletter queue, you can call
/// <see cref="ServiceBusClient.CreateDeadLetterReceiver(string, ServiceBusReceiverOptions)"/>
/// or <see cref="ServiceBusClient.CreateDeadLetterReceiver(string, string, ServiceBusReceiverOptions)"/>
/// to create a receiver for the queue or subscription.
/// This operation can only be performed when <see cref="ReceiveMode"/> is set to <see cref="ReceiveMode.PeekLock"/>.
/// </remarks>
public virtual async Task DeadLetterMessageAsync(
    ServiceBusReceivedMessage message,
    IDictionary<string, object> propertiesToModify = null,
    CancellationToken cancellationToken = default)
{
    Argument.AssertNotNull(message, nameof(message));

    // Settlement is keyed by lock token; hand off to the lock-token overload.
    string token = message.LockToken;
    await DeadLetterMessageAsync(
        lockToken: token,
        propertiesToModify: propertiesToModify,
        cancellationToken: cancellationToken).ConfigureAwait(false);
}
| | 584 | |
|
/// <summary>
/// Moves a message to the deadletter sub-queue.
/// </summary>
///
/// <param name="lockToken">The lock token of the <see cref="ServiceBusReceivedMessage"/> to deadletter.</param>
/// <param name="propertiesToModify">The properties of the message to modify while moving to sub-queue.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
/// <remarks>
/// In order to receive a message from the deadletter queue, you can call
/// <see cref="ServiceBusClient.CreateDeadLetterReceiver(string, ServiceBusReceiverOptions)"/>
/// or <see cref="ServiceBusClient.CreateDeadLetterReceiver(string, string, ServiceBusReceiverOptions)"/>
/// to create a receiver for the queue or subscription.
/// This operation can only be performed when <see cref="ReceiveMode"/> is set to <see cref="ReceiveMode.PeekLock"/>.
/// </remarks>
public virtual async Task DeadLetterMessageAsync(
    string lockToken,
    IDictionary<string, object> propertiesToModify = null,
    CancellationToken cancellationToken = default)
{
    // Thin public wrapper over the shared internal dead-letter implementation.
    await DeadLetterInternalAsync(
        lockToken: lockToken,
        propertiesToModify: propertiesToModify,
        cancellationToken: cancellationToken).ConfigureAwait(false);
}
| | 608 | |
|
| | 609 | | /// <summary> |
| | 610 | | /// Moves a message to the deadletter sub-queue, recording a reason and an optional error description. |
| | 611 | | /// </summary> |
| | 612 | | /// |
| | 613 | | /// <param name="message">The <see cref="ServiceBusReceivedMessage"/> to deadletter.</param> |
| | 614 | | /// <param name="deadLetterReason">The reason for deadlettering the message.</param> |
| | 615 | | /// <param name="deadLetterErrorDescription">The error description for deadlettering the message.</param> |
| | 616 | | /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param> |
| | 617 | | /// |
| | 618 | | /// <remarks> |
| | 619 | | /// A lock token can be found in <see cref="ServiceBusReceivedMessage.LockToken"/>, |
| | 620 | | /// only when <see cref="ReceiveMode"/> is set to <see cref="ReceiveMode.PeekLock"/>. |
| | 621 | | /// In order to receive a message from the deadletter queue, you can create a receiver by calling <see cref="ServiceBusClient.CreateDeadLetterReceiver(string, ServiceBusReceiverOptions)"/> |
| | 622 | | /// or <see cref="ServiceBusClient.CreateDeadLetterReceiver(string, string, ServiceBusReceiverOptions)"/>. |
| | 623 | | /// This operation can only be performed on messages that were received by this receiver. |
| | 624 | | /// </remarks> |
| | 625 | | public virtual async Task DeadLetterMessageAsync( |
| | 626 | | ServiceBusReceivedMessage message, |
| | 627 | | string deadLetterReason, |
| | 628 | | string deadLetterErrorDescription = default, |
| | 629 | | CancellationToken cancellationToken = default) |
| | 630 | | { |
| 0 | 631 | | Argument.AssertNotNull(message, nameof(message)); |
| 0 | 632 | | await DeadLetterMessageAsync( |
| 0 | 633 | | lockToken: message.LockToken, |
| 0 | 634 | | deadLetterReason: deadLetterReason, |
| 0 | 635 | | deadLetterErrorDescription: deadLetterErrorDescription, |
| 0 | 636 | | cancellationToken: cancellationToken).ConfigureAwait(false); |
| 0 | 637 | | } |
| | 638 | |
|
| | 639 | | /// <summary> |
| | 640 | | /// Moves a message to the deadletter sub-queue, recording a reason and an optional error description. |
| | 641 | | /// </summary> |
| | 642 | | /// |
| | 643 | | /// <param name="lockToken">The lock token of the <see cref="ServiceBusReceivedMessage"/> to deadletter.</param> |
| | 644 | | /// <param name="deadLetterReason">The reason for deadlettering the message.</param> |
| | 645 | | /// <param name="deadLetterErrorDescription">The error description for deadlettering the message.</param> |
| | 646 | | /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param> |
| | 647 | | /// |
| | 648 | | /// <remarks> |
| | 649 | | /// A lock token can be found in <see cref="ServiceBusReceivedMessage.LockToken"/>, |
| | 650 | | /// only when <see cref="ReceiveMode"/> is set to <see cref="ReceiveMode.PeekLock"/>. |
| | 651 | | /// In order to receive a message from the deadletter queue, you can create a receiver by calling <see cref="ServiceBusClient.CreateDeadLetterReceiver(string, ServiceBusReceiverOptions)"/> |
| | 652 | | /// or <see cref="ServiceBusClient.CreateDeadLetterReceiver(string, string, ServiceBusReceiverOptions)"/>. |
| | 653 | | /// This operation can only be performed on messages that were received by this receiver. |
| | 654 | | /// </remarks> |
| | 655 | | public virtual async Task DeadLetterMessageAsync( |
| | 656 | | string lockToken, |
| | 657 | | string deadLetterReason, |
| | 658 | | string deadLetterErrorDescription = null, |
| | 659 | | CancellationToken cancellationToken = default) => |
| 0 | 660 | | await DeadLetterInternalAsync( |
| 0 | 661 | | lockToken: lockToken, |
| 0 | 662 | | deadLetterReason: deadLetterReason, |
| 0 | 663 | | deadLetterErrorDescription: deadLetterErrorDescription, |
| 0 | 664 | | cancellationToken: cancellationToken).ConfigureAwait(false); |
| | 665 | |
|
| | 666 | | /// <summary> |
| | 667 | | /// Moves a message to the deadletter sub-queue; shared implementation used by the lock-token <c>DeadLetterMessageAsync</c> overloads. |
| | 668 | | /// </summary> |
| | 669 | | /// |
| | 670 | | /// <param name="lockToken">The lock token of the <see cref="ServiceBusReceivedMessage"/> to deadletter.</param> |
| | 671 | | /// <param name="deadLetterReason">The reason for deadlettering the message.</param> |
| | 672 | | /// <param name="deadLetterErrorDescription">The error description for deadlettering the message.</param> |
| | 673 | | /// <param name="propertiesToModify">The properties of the message to modify while moving it to the deadletter sub-queue.</param> |
| | 674 | | /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param> |
| | 675 | | /// |
| | 676 | | /// <remarks> |
| | 677 | | /// A lock token can be found in <see cref="ServiceBusReceivedMessage.LockToken"/>, |
| | 678 | | /// only when <see cref="ReceiveMode"/> is set to <see cref="ReceiveMode.PeekLock"/>. |
| | 679 | | /// In order to receive a message from the deadletter queue, you can create a receiver by calling <see cref="ServiceBusClient.CreateDeadLetterReceiver(string, ServiceBusReceiverOptions)"/> |
| | 680 | | /// or <see cref="ServiceBusClient.CreateDeadLetterReceiver(string, string, ServiceBusReceiverOptions)"/>. |
| | 681 | | /// This operation can only be performed on messages that were received by this receiver. |
| | 682 | | /// </remarks> |
| | 683 | | private async Task DeadLetterInternalAsync( |
| | 684 | | string lockToken, |
| | 685 | | string deadLetterReason = default, |
| | 686 | | string deadLetterErrorDescription = default, |
| | 687 | | IDictionary<string, object> propertiesToModify = default, |
| | 688 | | CancellationToken cancellationToken = default) |
| | 689 | | { |
| 4 | 690 | | ThrowIfLockTokenIsEmpty(lockToken); |
| 4 | 691 | | Argument.AssertNotDisposed(IsDisposed, nameof(ServiceBusReceiver)); |
| 4 | 692 | | cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>(); |
| 4 | 693 | | ThrowIfNotPeekLockMode(); |
| 4 | 694 | | Logger.DeadLetterMessageStart(Identifier, 1, lockToken); |
| 4 | 695 | | using DiagnosticScope scope = ScopeFactory.CreateScope( |
| 4 | 696 | | DiagnosticProperty.DeadLetterActivityName, |
| 4 | 697 | | lockToken: lockToken); |
| 4 | 698 | | scope.Start(); |
| | 699 | |
|
| | 700 | | try |
| | 701 | | { |
| 4 | 702 | | await InnerReceiver.DeadLetterAsync( |
| 4 | 703 | | lockToken: lockToken, |
| 4 | 704 | | deadLetterReason: deadLetterReason, |
| 4 | 705 | | deadLetterErrorDescription: deadLetterErrorDescription, |
| 4 | 706 | | propertiesToModify: propertiesToModify, |
| 4 | 707 | | cancellationToken: cancellationToken).ConfigureAwait(false); |
| 2 | 708 | | } |
| 2 | 709 | | catch (Exception exception) |
| | 710 | | { |
| 2 | 711 | | Logger.DeadLetterMessageException(Identifier, exception.ToString()); |
| 2 | 712 | | scope.Failed(exception); |
| 2 | 713 | | throw; |
| | 714 | | } |
| | 715 | |
|
| 2 | 716 | | Logger.DeadLetterMessageComplete(Identifier); |
| 2 | 717 | | } |
| | 718 | |
|
| | 719 | | /// <summary> Indicates that the receiver wants to defer the processing for the message.</summary> |
| | 720 | | /// |
| | 721 | | /// <param name="message">The <see cref="ServiceBusReceivedMessage"/> to defer.</param> |
| | 722 | | /// <param name="propertiesToModify">The properties of the message to modify while deferring the message.</param> |
| | 723 | | /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param> |
| | 724 | | /// |
| | 725 | | /// <remarks> |
| | 726 | | /// A lock token can be found in <see cref="ServiceBusReceivedMessage.LockToken"/>, |
| | 727 | | /// only when <see cref="ReceiveMode"/> is set to <see cref="ReceiveMode.PeekLock"/>. |
| | 728 | | /// In order to receive this message again in the future, you will need to save the |
| | 729 | | /// <see cref="ServiceBusReceivedMessage.SequenceNumber"/> |
| | 730 | | /// and receive it using <see cref="ReceiveDeferredMessageAsync(long, CancellationToken)"/>. |
| | 731 | | /// Deferring messages does not impact message's expiration, meaning that deferred messages can still expire. |
| | 732 | | /// This operation can only be performed on messages that were received by this receiver. |
| | 733 | | /// </remarks> |
| | 734 | | /// |
| | 735 | | /// <returns>A task to be resolved on when the operation has completed.</returns> |
| | 736 | | public virtual async Task DeferMessageAsync( |
| | 737 | | ServiceBusReceivedMessage message, |
| | 738 | | IDictionary<string, object> propertiesToModify = null, |
| | 739 | | CancellationToken cancellationToken = default) |
| | 740 | | { |
| 4 | 741 | | Argument.AssertNotNull(message, nameof(message)); |
| 4 | 742 | | await DeferMessageAsync( |
| 4 | 743 | | message.LockToken, |
| 4 | 744 | | propertiesToModify, |
| 4 | 745 | | cancellationToken).ConfigureAwait(false); |
| 2 | 746 | | } |
| | 747 | |
|
| | 748 | | /// <summary> Indicates that the receiver wants to defer the processing for the message.</summary> |
| | 749 | | /// |
| | 750 | | /// <param name="lockToken">The lock token of the <see cref="ServiceBusReceivedMessage"/> to defer.</param> |
| | 751 | | /// <param name="propertiesToModify">The properties of the message to modify while deferring the message.</param> |
| | 752 | | /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param> |
| | 753 | | /// |
| | 754 | | /// <remarks> |
| | 755 | | /// A lock token can be found in <see cref="ServiceBusReceivedMessage.LockToken"/>, |
| | 756 | | /// only when <see cref="ReceiveMode"/> is set to <see cref="ReceiveMode.PeekLock"/>. |
| | 757 | | /// In order to receive this message again in the future, you will need to save the |
| | 758 | | /// <see cref="ServiceBusReceivedMessage.SequenceNumber"/> |
| | 759 | | /// and receive it using <see cref="ReceiveDeferredMessageAsync(long, CancellationToken)"/>. |
| | 760 | | /// Deferring messages does not impact message's expiration, meaning that deferred messages can still expire. |
| | 761 | | /// This operation can only be performed on messages that were received by this receiver. |
| | 762 | | /// </remarks> |
| | 763 | | /// |
| | 764 | | /// <returns>A task to be resolved on when the operation has completed.</returns> |
| | 765 | | public virtual async Task DeferMessageAsync( |
| | 766 | | string lockToken, |
| | 767 | | IDictionary<string, object> propertiesToModify = null, |
| | 768 | | CancellationToken cancellationToken = default) |
| | 769 | | { |
| 4 | 770 | | ThrowIfLockTokenIsEmpty(lockToken); |
| 4 | 771 | | Argument.AssertNotDisposed(IsDisposed, nameof(ServiceBusReceiver)); |
| 4 | 772 | | ThrowIfNotPeekLockMode(); |
| 4 | 773 | | cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>(); |
| 4 | 774 | | Logger.DeferMessageStart(Identifier, 1, lockToken); |
| 4 | 775 | | using DiagnosticScope scope = ScopeFactory.CreateScope( |
| 4 | 776 | | DiagnosticProperty.DeferActivityName, |
| 4 | 777 | | lockToken: lockToken); |
| 4 | 778 | | scope.Start(); |
| | 779 | |
|
| | 780 | | try |
| | 781 | | { |
| 4 | 782 | | await InnerReceiver.DeferAsync( |
| 4 | 783 | | lockToken, |
| 4 | 784 | | propertiesToModify, |
| 4 | 785 | | cancellationToken).ConfigureAwait(false); |
| 2 | 786 | | } |
| 2 | 787 | | catch (Exception exception) |
| | 788 | | { |
| 2 | 789 | | Logger.DeferMessageException(Identifier, exception.ToString()); |
| 2 | 790 | | scope.Failed(exception); |
| 2 | 791 | | throw; |
| | 792 | | } |
| | 793 | |
|
| 2 | 794 | | Logger.DeferMessageComplete(Identifier); |
| 2 | 795 | | } |
| | 796 | |
|
| | 797 | | /// <summary> |
| | 798 | | /// Throws an <see cref="InvalidOperationException"/> when <see cref="ReceiveMode"/> is not <see cref="ReceiveMode.PeekLock"/>. |
| | 799 | | /// </summary> |
| | 800 | | private void ThrowIfNotPeekLockMode() |
| | 801 | | { |
| 20 | 802 | | if (ReceiveMode != ReceiveMode.PeekLock) |
| | 803 | | { |
| 0 | 804 | | throw new InvalidOperationException(Resources.OperationNotSupported); |
| | 805 | | } |
| 20 | 806 | | } |
| | 807 | |
|
| | 808 | | /// <summary> |
| | 809 | | /// Parses the lock token as a <see cref="Guid"/> and throws an <see cref="InvalidOperationException"/> when it equals <see cref="Guid.Empty"/>. |
| | 810 | | /// </summary> |
| | 811 | | private static void ThrowIfLockTokenIsEmpty(string lockToken) |
| | 812 | | { |
| 22 | 813 | | if (Guid.Parse(lockToken) == Guid.Empty) |
| | 814 | | { |
| 2 | 815 | | throw new InvalidOperationException(Resources.SettlementOperationNotSupported); |
| | 816 | | } |
| 20 | 817 | | } |
| | 818 | |
|
| | 819 | | /// <summary> |
| | 820 | | /// Receives a deferred message identified by <paramref name="sequenceNumber"/>. |
| | 821 | | /// </summary> |
| | 822 | | /// |
| | 823 | | /// <param name="sequenceNumber">The sequence number of the message to receive. This corresponds to |
| | 824 | | /// the <see cref="ServiceBusReceivedMessage.SequenceNumber"/>.</param> |
| | 825 | | /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param> |
| | 826 | | /// |
| | 827 | | /// <returns>The deferred message identified by the specified sequence number. Returns null if no message is found. |
| | 828 | | /// Throws if the message has not been deferred.</returns> |
| | 829 | | /// <seealso cref="DeferMessageAsync(ServiceBusReceivedMessage, IDictionary{string, object}, CancellationToken)"/> |
| | 830 | | /// <seealso cref="DeferMessageAsync(string, IDictionary{string, object}, CancellationToken)"/> |
| | 831 | | public virtual async Task<ServiceBusReceivedMessage> ReceiveDeferredMessageAsync( |
| | 832 | | long sequenceNumber, |
| | 833 | | CancellationToken cancellationToken = default) => |
| 0 | 834 | | (await ReceiveDeferredMessagesAsync(new long[] { sequenceNumber }, cancellationToken).ConfigureAwait(false)) |
| | 835 | |
|
| | 836 | | /// <summary> |
| | 837 | | /// Receives an <see cref="IReadOnlyList{T}"/> of deferred messages identified by <paramref name="sequenceNumbers"/>. |
| | 838 | | /// </summary> |
| | 839 | | /// |
| | 840 | | /// <param name="sequenceNumbers">An <see cref="IEnumerable{T}"/> containing the sequence numbers to receive. Must not be null or empty.</param> |
| | 841 | | /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param> |
| | 842 | | /// |
| | 843 | | /// <returns>The messages returned by the underlying receiver for the requested sequence numbers. |
| | 844 | | /// Throws if the messages have not been deferred.</returns> |
| | 845 | | /// <seealso cref="DeferMessageAsync(ServiceBusReceivedMessage, IDictionary{string, object}, CancellationToken)"/> |
| | 846 | | /// <seealso cref="DeferMessageAsync(string, IDictionary{string, object}, CancellationToken)"/> |
| | 847 | | public virtual async Task<IReadOnlyList<ServiceBusReceivedMessage>> ReceiveDeferredMessagesAsync( |
| | 848 | | IEnumerable<long> sequenceNumbers, |
| | 849 | | CancellationToken cancellationToken = default) |
| | 850 | | { |
| 0 | 851 | | Argument.AssertNotDisposed(IsDisposed, nameof(ServiceBusReceiver)); |
| 0 | 852 | | Argument.AssertNotNullOrEmpty(sequenceNumbers, nameof(sequenceNumbers)); |
| 0 | 853 | | cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>(); |
| 0 | 854 | | var sequenceNumbersList = sequenceNumbers.ToList(); |
| | 855 | |
|
| 0 | 856 | | Logger.ReceiveDeferredMessageStart(Identifier, sequenceNumbersList); |
| 0 | 857 | | using DiagnosticScope scope = ScopeFactory.CreateScope(DiagnosticProperty.ReceiveDeferredActivityName); |
| 0 | 858 | | scope.AddAttribute( |
| 0 | 859 | | DiagnosticProperty.SequenceNumbersAttribute, |
| 0 | 860 | | string.Join(",", sequenceNumbersList)); // use the materialized list so the caller's sequence is not enumerated again |
| 0 | 861 | | scope.Start(); |
| | 862 | |
|
| 0 | 863 | | IReadOnlyList<ServiceBusReceivedMessage> deferredMessages = null; |
| | 864 | | try |
| | 865 | | { |
| 0 | 866 | | deferredMessages = await InnerReceiver.ReceiveDeferredMessagesAsync( |
| 0 | 867 | | sequenceNumbersList, |
| 0 | 868 | | cancellationToken).ConfigureAwait(false); |
| 0 | 869 | | } |
| 0 | 870 | | catch (Exception exception) |
| | 871 | | { |
| 0 | 872 | | Logger.ReceiveDeferredMessageException(Identifier, exception.ToString()); |
| 0 | 873 | | scope.Failed(exception); |
| 0 | 874 | | throw; |
| | 875 | | } |
| | 876 | |
|
| 0 | 877 | | Logger.ReceiveDeferredMessageComplete(Identifier, deferredMessages.Count); |
| 0 | 878 | | scope.SetMessageData(deferredMessages); |
| 0 | 879 | | return deferredMessages; |
| 0 | 880 | | } |
| | 881 | |
|
| | 882 | | /// <summary> |
| | 883 | | /// Renews the lock on the message. The lock will be renewed based on the setting specified on the queue. |
| | 884 | | /// </summary> |
| | 885 | | /// |
| | 886 | | /// <remarks> |
| | 887 | | /// When a message is received in <see cref="ReceiveMode.PeekLock"/> mode, the message is locked on the server for this |
| | 888 | | /// receiver instance for a duration as specified during the Queue/Subscription creation (LockDuration). |
| | 889 | | /// If processing of the message requires longer than this duration, the lock needs to be renewed. |
| | 890 | | /// For each renewal, it resets the time the message is locked by the LockDuration set on the Entity. On success, <see cref="ServiceBusReceivedMessage.LockedUntil"/> of <paramref name="message"/> is updated with the renewed value. |
| | 891 | | /// </remarks> |
| | 892 | | /// |
| | 893 | | /// <param name="message">The <see cref="ServiceBusReceivedMessage"/> to renew the lock for.</param> |
| | 894 | | /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param> |
| | 895 | | public virtual async Task RenewMessageLockAsync( |
| | 896 | | ServiceBusReceivedMessage message, |
| | 897 | | CancellationToken cancellationToken = default) |
| | 898 | | { |
| 6 | 899 | | Argument.AssertNotNull(message, nameof(message)); |
| 6 | 900 | | DateTimeOffset lockedUntil = await RenewMessageLockAsync( |
| 6 | 901 | | message.LockToken, |
| 6 | 902 | | cancellationToken).ConfigureAwait(false); |
| 2 | 903 | | message.LockedUntil = lockedUntil; |
| 2 | 904 | | } |
| | 905 | |
|
| | 906 | | /// <summary> |
| | 907 | | /// Renews the lock on the message. The lock will be renewed based on the setting specified on the queue. |
| | 908 | | /// </summary> |
| | 909 | | /// |
| | 910 | | /// <remarks> |
| | 911 | | /// When a message is received in <see cref="ReceiveMode.PeekLock"/> mode, the message is locked on the server for this |
| | 912 | | /// receiver instance for a duration as specified during the Queue/Subscription creation (LockDuration). |
| | 913 | | /// If processing of the message requires longer than this duration, the lock needs to be renewed. |
| | 914 | | /// For each renewal, it resets the time the message is locked by the LockDuration set on the Entity. The returned <see cref="DateTimeOffset"/> is the locked-until value produced by the renewal. |
| | 915 | | /// </remarks> |
| | 916 | | /// |
| | 917 | | /// <param name="lockToken">The lockToken of the <see cref="ServiceBusReceivedMessage"/> to renew the lock for.</param> |
| | 918 | | /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param> |
| | 919 | | public virtual async Task<DateTimeOffset> RenewMessageLockAsync( |
| | 920 | | string lockToken, |
| | 921 | | CancellationToken cancellationToken = default) |
| | 922 | | { |
| 6 | 923 | | ThrowIfLockTokenIsEmpty(lockToken); |
| 4 | 924 | | Argument.AssertNotDisposed(IsDisposed, nameof(ServiceBusReceiver)); |
| 4 | 925 | | ThrowIfNotPeekLockMode(); |
| 4 | 926 | | ThrowIfSessionReceiver(); |
| 4 | 927 | | cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>(); |
| 4 | 928 | | Logger.RenewMessageLockStart(Identifier, 1, lockToken); |
| 4 | 929 | | using DiagnosticScope scope = ScopeFactory.CreateScope( |
| 4 | 930 | | DiagnosticProperty.RenewMessageLockActivityName, |
| 4 | 931 | | lockToken: lockToken); |
| 4 | 932 | | scope.Start(); |
| | 933 | |
|
| | 934 | | DateTimeOffset lockedUntil; |
| | 935 | | try |
| | 936 | | { |
| 4 | 937 | | lockedUntil = await InnerReceiver.RenewMessageLockAsync( |
| 4 | 938 | | lockToken, |
| 4 | 939 | | cancellationToken).ConfigureAwait(false); |
| 2 | 940 | | } |
| 2 | 941 | | catch (Exception exception) |
| | 942 | | { |
| 2 | 943 | | Logger.RenewMessageLockException(Identifier, exception.ToString()); |
| 2 | 944 | | scope.Failed(exception); |
| 2 | 945 | | throw; |
| | 946 | | } |
| | 947 | |
|
| 2 | 948 | | Logger.RenewMessageLockComplete(Identifier); |
| 2 | 949 | | scope.AddAttribute(DiagnosticProperty.LockedUntilAttribute, lockedUntil); |
| 2 | 950 | | return lockedUntil; |
| 2 | 951 | | } |
| | 952 | |
|
| | 953 | | /// <summary> |
| | 954 | | /// Throws an <see cref="InvalidOperationException"/> if the receiver instance is a session receiver. |
| | 955 | | /// </summary> |
| | 956 | | private void ThrowIfSessionReceiver() |
| | 957 | | { |
| 4 | 958 | | if (IsSessionReceiver) |
| | 959 | | { |
| 0 | 960 | | throw new InvalidOperationException(Resources.CannotLockMessageOnSessionEntity); |
| | 961 | | } |
| 4 | 962 | | } |
| | 963 | |
|
| | 964 | | /// <summary> |
| | 965 | | /// Performs the task needed to clean up resources used by the <see cref="ServiceBusReceiver" />: marks the receiver as disposed and closes the inner receiver. |
| | 966 | | /// </summary> |
| | 967 | | /// |
| | 968 | | /// <returns>A task to be resolved on when the operation has completed.</returns> |
| | 969 | | [SuppressMessage("Usage", "AZC0002:Ensure all service methods take an optional CancellationToken parameter.", Ju |
| | 970 | | public virtual async ValueTask DisposeAsync() |
| | 971 | | { |
| 4 | 972 | | IsDisposed = true; |
| 4 | 973 | | Type clientType = GetType(); |
| | 974 | |
|
| 4 | 975 | | Logger.ClientDisposeStart(clientType, Identifier); |
| | 976 | | try |
| | 977 | | { |
| 4 | 978 | | await InnerReceiver.CloseAsync(CancellationToken.None).ConfigureAwait(false); |
| 4 | 979 | | } |
| 0 | 980 | | catch (Exception ex) |
| | 981 | | { |
| 0 | 982 | | Logger.ClientDisposeException(clientType, Identifier, ex); |
| 0 | 983 | | throw; |
| | 984 | | } |
| | 985 | |
|
| 4 | 986 | | Logger.ClientDisposeComplete(clientType, Identifier); |
| 4 | 987 | | } |
| | 988 | |
|
| | 989 | | /// <summary> |
| | 990 | | /// Determines whether the specified <see cref="System.Object" /> is equal to this instance, using the base implementation. |
| | 991 | | /// </summary> |
| | 992 | | /// |
| | 993 | | /// <param name="obj">The <see cref="System.Object" /> to compare with this instance.</param> |
| | 994 | | /// |
| | 995 | | /// <returns><c>true</c> if the specified <see cref="System.Object" /> is equal to this instance; otherwise, <c>false</c>.</returns> |
| | 996 | | [EditorBrowsable(EditorBrowsableState.Never)] |
| 0 | 997 | | public override bool Equals(object obj) => base.Equals(obj); |
| | 998 | |
|
| | 999 | | /// <summary> |
| | 1000 | | /// Returns a hash code for this instance, using the base implementation. |
| | 1001 | | /// </summary> |
| | 1002 | | /// |
| | 1003 | | /// <returns>A hash code for this instance, suitable for use in hashing algorithms and data structures like a hash table.</returns> |
| | 1004 | | [EditorBrowsable(EditorBrowsableState.Never)] |
| 0 | 1005 | | public override int GetHashCode() => base.GetHashCode(); |
| | 1006 | |
|
| | 1007 | | /// <summary> |
| | 1008 | | /// Converts the instance to its string representation, using the base implementation. |
| | 1009 | | /// </summary> |
| | 1010 | | /// |
| | 1011 | | /// <returns>A <see cref="System.String" /> that represents this instance.</returns> |
| | 1012 | | [EditorBrowsable(EditorBrowsableState.Never)] |
| 0 | 1013 | | public override string ToString() => base.ToString(); |
| | 1014 | | } |
| | 1015 | | } |