| | 1 | | // Copyright (c) Microsoft Corporation. All rights reserved. |
| | 2 | | // Licensed under the MIT License. |
| | 3 | |
|
| | 4 | | using System; |
| | 5 | | using System.Collections.Generic; |
| | 6 | | using System.ComponentModel; |
| | 7 | | using System.Diagnostics.CodeAnalysis; |
| | 8 | | using System.Threading; |
| | 9 | | using System.Threading.Tasks; |
| | 10 | | using Azure.Messaging.ServiceBus.Plugins; |
| | 11 | |
|
| | 12 | | namespace Azure.Messaging.ServiceBus |
| | 13 | | { |
| | 14 | | /// <summary> |
| | 15 | | /// The <see cref="ServiceBusSessionProcessor"/> provides an abstraction around a set of <see cref="ServiceBusSessio |
| | 16 | | /// allows using an event based model for processing received <see cref="ServiceBusReceivedMessage" />. |
| | 17 | | /// It is constructed by calling <see cref="ServiceBusClient.CreateSessionProcessor(string, ServiceBusSessionProcess |
| | 18 | | /// The event handler is specified with the <see cref="ProcessMessageAsync"/> |
| | 19 | | /// property. The error handler is specified with the <see cref="ProcessErrorAsync"/> property. |
| | 20 | | /// To start processing after the handlers have been specified, call <see cref="StartProcessingAsync"/>. |
| | 21 | | /// </summary> |
| | 22 | | public class ServiceBusSessionProcessor |
| | 23 | | { |
| | 24 | | private readonly ServiceBusProcessor _innerProcessor; |
| | 25 | |
|
/// <summary>
///   Gets the path of the Service Bus entity that the processor is connected to,
///   relative to the Service Bus namespace that contains it.
/// </summary>
/// <value>The entity path reported by the wrapped processor.</value>
public string EntityPath
{
    get { return _innerProcessor.EntityPath; }
}
| | 31 | |
|
/// <summary>
///   Gets the ID that identifies this client. It can be used to correlate logs and exceptions.
/// </summary>
/// <remarks>Every new client has a unique ID.</remarks>
internal string Identifier
{
    get { return _innerProcessor.Identifier; }
}
| | 37 | |
|
/// <summary>
///   Gets the <see cref="ReceiveMode"/> that specifies how messages are received.
///   Defaults to PeekLock mode.
/// </summary>
public ReceiveMode ReceiveMode
{
    get { return _innerProcessor.ReceiveMode; }
}
| | 42 | |
|
/// <summary>
///   Gets the number of messages that are eagerly requested from Queues or Subscriptions and
///   queued locally, regardless of whether processing is currently active. Prefetching is
///   intended to help maximize throughput by allowing messages to be served from a local
///   cache rather than waiting on a service request.
/// </summary>
public int PrefetchCount
{
    get { return _innerProcessor.PrefetchCount; }
}
| | 49 | |
|
/// <summary>
///   Gets a value indicating whether this <see cref="ServiceBusSessionProcessor"/> is
///   currently processing messages.
/// </summary>
///
/// <value>
///   <c>true</c> if the client is processing messages; otherwise, <c>false</c>.
/// </value>
public bool IsProcessing
{
    get { return _innerProcessor.IsProcessing; }
}
| | 58 | |
|
/// <summary>
///   Gets a value that indicates whether the <see cref="ServiceBusSessionProcessor"/> should
///   automatically complete messages after the event handler has completed processing.
///   If the event handler triggers an exception, the message will not be automatically completed.
/// </summary>
///
/// <value>
///   <c>true</c> if the message will be completed automatically on successful execution of
///   the operation; otherwise, <c>false</c>.
/// </value>
public bool AutoComplete
{
    get { return _innerProcessor.AutoComplete; }
}
| | 65 | |
|
/// <summary>
///   Gets the maximum duration within which the lock will be renewed automatically. This
///   value should be greater than the longest message lock duration; for example, the
///   LockDuration property.
/// </summary>
///
/// <value>The maximum duration during which locks are automatically renewed.</value>
///
/// <remarks>
///   The message renewal can continue for some time in the background after completion of
///   the message and may temporarily result in a few false MessageLockLostExceptions.
/// </remarks>
public TimeSpan MaxAutoLockRenewalDuration
{
    get { return _innerProcessor.MaxAutoLockRenewalDuration; }
}
| | 76 | |
|
/// <summary>
///   Gets the maximum number of sessions that will be processed concurrently by the processor.
///   The default value is 8.
/// </summary>
public int MaxConcurrentSessions
{
    get { return _innerProcessor.MaxConcurrentSessions; }
}
| | 80 | |
|
/// <summary>
///   Gets the maximum number of calls to the callback that the processor will initiate per session.
///   The total number of callbacks is therefore MaxConcurrentSessions * MaxConcurrentCallsPerSession.
///   The default value is 1.
/// </summary>
public int MaxConcurrentCallsPerSession
{
    get { return _innerProcessor.MaxConcurrentCallsPerSession; }
}
| | 85 | |
|
/// <summary>
///   Gets the fully qualified Service Bus namespace that the receiver is associated with.
///   This is likely to be similar to <c>{yournamespace}.servicebus.windows.net</c>.
/// </summary>
public string FullyQualifiedNamespace
{
    get { return _innerProcessor.FullyQualifiedNamespace; }
}
| | 91 | |
|
| | 92 | | /// <summary> |
| | 93 | | /// The maximum amount of time to wait for each Receive call using the processor's underlying receiver. If not s |
| | 94 | | /// </summary> |
public TimeSpan? MaxReceiveWaitTime
{
    // Read straight from the wrapped processor; may be null.
    get { return _innerProcessor.MaxReceiveWaitTime; }
}
| | 96 | |
|
/// <summary>
///   Initializes a new instance of the <see cref="ServiceBusSessionProcessor"/> class by
///   creating the inner <see cref="ServiceBusProcessor"/> to which every member of this type forwards.
/// </summary>
///
/// <param name="connection">The connection handed to the inner processor.</param>
/// <param name="entityPath">The entity path handed to the inner processor.</param>
/// <param name="plugins">The plugins handed to the inner processor.</param>
/// <param name="options">
///   The session processor options. They are converted with <c>ToProcessorOptions()</c>, and their
///   <c>SessionIds</c>, <c>MaxConcurrentSessions</c> and <c>MaxConcurrentCallsPerSession</c> values
///   are passed to the inner processor.
/// </param>
///
/// <exception cref="ArgumentNullException"><paramref name="options"/> is <c>null</c>.</exception>
internal ServiceBusSessionProcessor(
    ServiceBusConnection connection,
    string entityPath,
    IList<ServiceBusPlugin> plugins,
    ServiceBusSessionProcessorOptions options)
{
    // options is dereferenced several times below; fail with a descriptive exception
    // instead of a NullReferenceException in the middle of the argument list.
    if (options is null)
    {
        throw new ArgumentNullException(nameof(options));
    }

    _innerProcessor = new ServiceBusProcessor(
        connection,
        entityPath,
        true, // NOTE(review): presumably flags the inner processor as session-based — confirm against the ServiceBusProcessor constructor
        plugins,
        options.ToProcessorOptions(),
        options.SessionIds,
        options.MaxConcurrentSessions,
        options.MaxConcurrentCallsPerSession);
}
| | 113 | |
|
/// <summary>
///   Initializes a new instance of the <see cref="ServiceBusSessionProcessor"/> class for mocking.
/// </summary>
///
/// <remarks>
///   This constructor does not assign the inner processor, so members that forward to it
///   cannot be used on an instance created this way unless a derived type overrides them.
/// </remarks>
protected ServiceBusSessionProcessor()
    : base()
{
}
| | 120 | |
|
| | 121 | | /// <summary> |
| | 122 | | /// The event responsible for processing messages received from the Queue or Subscription. Implementation |
| | 123 | | /// is mandatory. |
| | 124 | | /// </summary> |
| | 125 | | /// |
| | 126 | | [SuppressMessage("Usage", "AZC0002:Ensure all service methods take an optional CancellationToken parameter.", Ju |
| | 127 | | [SuppressMessage("Usage", "AZC0003:DO make service methods virtual.", Justification = "This member follows the s |
public event Func<ProcessSessionMessageEventArgs, Task> ProcessMessageAsync
{
    // Handlers are not stored here; both accessors forward to the inner
    // processor's ProcessSessionMessageAsync event.
    add => _innerProcessor.ProcessSessionMessageAsync += value;
    remove => _innerProcessor.ProcessSessionMessageAsync -= value;
}
| | 141 | |
|
| | 142 | | /// <summary> |
| | 143 | | /// The event responsible for processing unhandled exceptions thrown while this processor is running. |
| | 144 | | /// Implementation is mandatory. |
| | 145 | | /// </summary> |
| | 146 | | /// |
| | 147 | | [SuppressMessage("Usage", "AZC0002:Ensure all service methods take an optional CancellationToken parameter.", Ju |
| | 148 | | [SuppressMessage("Usage", "AZC0003:DO make service methods virtual.", Justification = "This member follows the s |
public event Func<ProcessErrorEventArgs, Task> ProcessErrorAsync
{
    // Handlers are not stored here; both accessors forward to the inner
    // processor's ProcessErrorAsync event.
    add => _innerProcessor.ProcessErrorAsync += value;
    remove => _innerProcessor.ProcessErrorAsync -= value;
}
| | 161 | |
|
| | 162 | | /// <summary> |
| | 163 | | /// Optional event that can be set to be notified when a new session is about to be processed. |
| | 164 | | /// </summary> |
| | 165 | | [SuppressMessage("Usage", "AZC0002:Ensure all service methods take an optional CancellationToken parameter.", Ju |
| | 166 | | [SuppressMessage("Usage", "AZC0003:DO make service methods virtual.", Justification = "This member follows the s |
public event Func<ProcessSessionEventArgs, Task> SessionInitializingAsync
{
    // Handlers are not stored here; both accessors forward to the inner
    // processor's SessionInitializingAsync event.
    add => _innerProcessor.SessionInitializingAsync += value;
    remove => _innerProcessor.SessionInitializingAsync -= value;
}
| | 180 | |
|
| | 181 | | /// <summary> |
| | 182 | | /// Optional event that can be set to be notified when a session is about to be closed for processing. |
| | 183 | | /// This means that the most recent <see cref="ServiceBusReceiver.ReceiveMessageAsync"/> call timed out, |
| | 184 | | /// so there are currently no messages available to be received for the session. |
| | 185 | | /// </summary> |
| | 186 | | [SuppressMessage("Usage", "AZC0002:Ensure all service methods take an optional CancellationToken parameter.", Ju |
| | 187 | | [SuppressMessage("Usage", "AZC0003:DO make service methods virtual.", Justification = "This member follows the s |
public event Func<ProcessSessionEventArgs, Task> SessionClosingAsync
{
    // Handlers are not stored here; both accessors forward to the inner
    // processor's SessionClosingAsync event.
    add => _innerProcessor.SessionClosingAsync += value;
    remove => _innerProcessor.SessionClosingAsync -= value;
}
| | 200 | |
|
/// <summary>
///   Signals the <see cref="ServiceBusSessionProcessor" /> to begin processing messages.
///   Should this method be called while the processor is running, no action is taken.
/// </summary>
///
/// <param name="cancellationToken">
///   A <see cref="CancellationToken"/> instance to signal the request to cancel the operation.
///   It is passed through to the inner processor.
/// </param>
///
/// <returns>A task that completes when the inner processor's start operation completes.</returns>
public virtual async Task StartProcessingAsync(CancellationToken cancellationToken = default)
{
    await _innerProcessor.StartProcessingAsync(cancellationToken).ConfigureAwait(false);
}
| | 209 | |
|
/// <summary>
///   Signals the <see cref="ServiceBusSessionProcessor" /> to stop processing events.
///   Should this method be called while the processor is not running, no action is taken.
/// </summary>
///
/// <param name="cancellationToken">
///   A <see cref="CancellationToken"/> instance to signal the request to cancel the operation.
///   It is passed through to the inner processor.
/// </param>
///
/// <returns>A task that completes when the inner processor's stop operation completes.</returns>
public virtual async Task StopProcessingAsync(CancellationToken cancellationToken = default)
{
    await _innerProcessor.StopProcessingAsync(cancellationToken).ConfigureAwait(false);
}
| | 218 | |
|
/// <summary>
///   Determines whether the specified <see cref="System.Object" /> is equal to this instance.
/// </summary>
///
/// <param name="obj">The <see cref="System.Object" /> to compare with this instance.</param>
///
/// <returns>
///   <c>true</c> if the specified <see cref="System.Object" /> is equal to this instance;
///   otherwise, <c>false</c>. The result comes from the base implementation.
/// </returns>
[EditorBrowsable(EditorBrowsableState.Never)]
public override bool Equals(object obj)
{
    return base.Equals(obj);
}
| | 228 | |
|
/// <summary>
///   Returns a hash code for this instance.
/// </summary>
///
/// <returns>
///   A hash code for this instance, suitable for use in hashing algorithms and data structures
///   like a hash table. The result comes from the base implementation.
/// </returns>
[EditorBrowsable(EditorBrowsableState.Never)]
public override int GetHashCode()
{
    return base.GetHashCode();
}
| | 237 | |
|
/// <summary>
///   Converts the instance to its string representation.
/// </summary>
///
/// <returns>
///   A <see cref="System.String" /> that represents this instance, as produced by the base implementation.
/// </returns>
[EditorBrowsable(EditorBrowsableState.Never)]
public override string ToString()
{
    return base.ToString();
}
| | 246 | | } |
| | 247 | | } |