| | 1 | | // Copyright (c) Microsoft Corporation. All rights reserved. |
| | 2 | | // Licensed under the MIT License. |
| | 3 | |
|
| | 4 | | using System; |
| | 5 | | using System.Threading; |
| | 6 | | using System.Threading.Tasks; |
| | 7 | | using Azure.Messaging.ServiceBus.Core; |
| | 8 | | using Azure.Core; |
| | 9 | | using Azure.Messaging.ServiceBus.Diagnostics; |
| | 10 | | using Azure.Core.Pipeline; |
| | 11 | | using Azure.Messaging.ServiceBus.Plugins; |
| | 12 | | using System.Collections.Generic; |
| | 13 | |
|
| | 14 | | namespace Azure.Messaging.ServiceBus |
| | 15 | | { |
| | 16 | | /// <summary> |
| | 17 | | /// The <see cref="ServiceBusSessionReceiver" /> is responsible for receiving <see cref="ServiceBusReceivedMessage" |
| | 18 | | /// and settling messages from session-enabled Queues and Subscriptions. It is constructed by calling |
| | 19 | | /// <see cref="ServiceBusClient.CreateSessionReceiverAsync(string, string, ServiceBusSessionReceiverOptions, Cancel |
| | 20 | | /// </summary> |
| | 21 | | public class ServiceBusSessionReceiver : ServiceBusReceiver |
| | 22 | | { |
| | 23 | | /// <summary> |
| | 24 | | /// The Session Id associated with the receiver. |
| | 25 | | /// </summary> |
| 24 | 26 | | public string SessionId => InnerReceiver.SessionId; |
| | 27 | |
|
| | 28 | | /// <summary> |
| | 29 | | /// Gets the <see cref="DateTimeOffset"/> that the receiver's session is locked until. |
| | 30 | | /// </summary> |
| 0 | 31 | | public DateTimeOffset SessionLockedUntil => InnerReceiver.SessionLockedUntil; |
| | 32 | |
|
| | 33 | | /// <summary> |
| | 34 | | /// Creates a session receiver which can be used to interact with all messages with the same sessionId. |
| | 35 | | /// </summary> |
| | 36 | | /// |
| | 37 | | /// <param name="entityPath">The name of the specific queue to associate the receiver with.</param> |
| | 38 | | /// <param name="connection">The <see cref="ServiceBusConnection" /> connection to use for communication with th |
| | 39 | | /// <param name="plugins">The set of plugins to apply to incoming messages.</param> |
| | 40 | | /// <param name="options">A set of options to apply when configuring the receiver.</param> |
| | 41 | | /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t |
| | 42 | | /// |
| | 43 | | ///<returns>Returns a new instance of the <see cref="ServiceBusSessionReceiver"/> class.</returns> |
| | 44 | | internal static async Task<ServiceBusSessionReceiver> CreateSessionReceiverAsync( |
| | 45 | | string entityPath, |
| | 46 | | ServiceBusConnection connection, |
| | 47 | | IList<ServiceBusPlugin> plugins, |
| | 48 | | ServiceBusSessionReceiverOptions options = default, |
| | 49 | | CancellationToken cancellationToken = default) |
| | 50 | | { |
| 0 | 51 | | var receiver = new ServiceBusSessionReceiver( |
| 0 | 52 | | connection: connection, |
| 0 | 53 | | entityPath: entityPath, |
| 0 | 54 | | plugins: plugins, |
| 0 | 55 | | options: options); |
| | 56 | | try |
| | 57 | | { |
| 0 | 58 | | await receiver.OpenLinkAsync(cancellationToken).ConfigureAwait(false); |
| 0 | 59 | | } |
| 0 | 60 | | catch (Exception ex) |
| | 61 | | { |
| 0 | 62 | | receiver.Logger.ClientCreateException(typeof(ServiceBusSessionReceiver), receiver.FullyQualifiedNamespac |
| 0 | 63 | | throw; |
| | 64 | | } |
| 0 | 65 | | receiver.Logger.ClientCreateComplete(typeof(ServiceBusSessionReceiver), receiver.Identifier); |
| 0 | 66 | | return receiver; |
| 0 | 67 | | } |
| | 68 | |
|
| | 69 | | /// <summary> |
| | 70 | | /// Initializes a new instance of the <see cref="ServiceBusReceiver"/> class. |
| | 71 | | /// </summary> |
| | 72 | | /// |
| | 73 | | /// <param name="connection">The <see cref="ServiceBusConnection" /> connection to use for communication with th |
| | 74 | | /// <param name="entityPath"></param> |
| | 75 | | /// <param name="plugins">The set of plugins to apply to incoming messages.</param> |
| | 76 | | /// <param name="options">A set of options to apply when configuring the consumer.</param> |
| | 77 | | internal ServiceBusSessionReceiver( |
| | 78 | | ServiceBusConnection connection, |
| | 79 | | string entityPath, |
| | 80 | | IList<ServiceBusPlugin> plugins, |
| | 81 | | ServiceBusSessionReceiverOptions options) : |
| 14 | 82 | | base(connection, entityPath, true, plugins, options?.ToReceiverOptions(), options?.SessionId) |
| | 83 | | { |
| 14 | 84 | | } |
| | 85 | |
|
| | 86 | | /// <summary> |
| | 87 | | /// Initializes a new instance of the <see cref="ServiceBusReceiver"/> class for mocking. |
| | 88 | | /// </summary> |
| | 89 | | /// |
| 8 | 90 | | protected ServiceBusSessionReceiver() : base() { } |
| | 91 | |
|
| | 92 | | /// <summary> |
| | 93 | | /// Gets the session state. |
| | 94 | | /// </summary> |
| | 95 | | /// |
| | 96 | | /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t |
| | 97 | | /// |
| | 98 | | /// <returns>The session state as byte array.</returns> |
| | 99 | | public virtual async Task<byte[]> GetSessionStateAsync(CancellationToken cancellationToken = default) |
| | 100 | | { |
| 4 | 101 | | Argument.AssertNotDisposed(IsDisposed, nameof(ServiceBusSessionReceiver)); |
| 4 | 102 | | cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>(); |
| 4 | 103 | | Logger.GetSessionStateStart(Identifier, SessionId); |
| 4 | 104 | | using DiagnosticScope scope = ScopeFactory.CreateScope( |
| 4 | 105 | | DiagnosticProperty.GetSessionStateActivityName, |
| 4 | 106 | | sessionId: SessionId); |
| 4 | 107 | | scope.Start(); |
| | 108 | |
|
| 4 | 109 | | byte[] sessionState = null; |
| | 110 | |
|
| | 111 | | try |
| | 112 | | { |
| 4 | 113 | | sessionState = await InnerReceiver.GetStateAsync(cancellationToken).ConfigureAwait(false); |
| 2 | 114 | | } |
| 2 | 115 | | catch (Exception exception) |
| | 116 | | { |
| 2 | 117 | | Logger.GetSessionStateException(Identifier, exception.ToString()); |
| 2 | 118 | | scope.Failed(exception); |
| 2 | 119 | | throw; |
| | 120 | | } |
| | 121 | |
|
| 2 | 122 | | Logger.GetSessionStateComplete(Identifier); |
| 2 | 123 | | return sessionState; |
| 2 | 124 | | } |
| | 125 | |
|
| | 126 | | /// <summary> |
| | 127 | | /// Set a custom state on the session which can be later retrieved using <see cref="GetSessionStateAsync"/> |
| | 128 | | /// </summary> |
| | 129 | | /// |
| | 130 | | /// <param name="sessionState">A byte array of session state</param> |
| | 131 | | /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t |
| | 132 | | /// |
| | 133 | | /// <remarks>This state is stored on Service Bus forever unless you set an empty state on it.</remarks> |
| | 134 | | /// |
| | 135 | | /// <returns>A task to be resolved on when the operation has completed.</returns> |
| | 136 | | public virtual async Task SetSessionStateAsync( |
| | 137 | | byte[] sessionState, |
| | 138 | | CancellationToken cancellationToken = default) |
| | 139 | | { |
| 4 | 140 | | Argument.AssertNotDisposed(IsDisposed, nameof(ServiceBusSessionReceiver)); |
| 4 | 141 | | cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>(); |
| 4 | 142 | | Logger.SetSessionStateStart(Identifier, SessionId); |
| 4 | 143 | | using DiagnosticScope scope = ScopeFactory.CreateScope( |
| 4 | 144 | | DiagnosticProperty.SetSessionStateActivityName, |
| 4 | 145 | | sessionId: SessionId); |
| 4 | 146 | | scope.Start(); |
| | 147 | |
|
| | 148 | | try |
| | 149 | | { |
| 4 | 150 | | await InnerReceiver.SetStateAsync(sessionState, cancellationToken).ConfigureAwait(false); |
| 2 | 151 | | } |
| 2 | 152 | | catch (Exception exception) |
| | 153 | | { |
| 2 | 154 | | Logger.SetSessionStateException(Identifier, exception.ToString()); |
| 2 | 155 | | scope.Failed(exception); |
| 2 | 156 | | throw; |
| | 157 | | } |
| | 158 | |
|
| 2 | 159 | | Logger.SetSessionStateComplete(Identifier); |
| 2 | 160 | | } |
| | 161 | |
|
| | 162 | | /// <summary> |
| | 163 | | /// Renews the lock on the session specified by the <see cref="SessionId"/>. The lock will be renewed based on t |
| | 164 | | /// </summary> |
| | 165 | | /// |
| | 166 | | /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t |
| | 167 | | /// |
| | 168 | | /// <remarks> |
| | 169 | | /// <para> |
| | 170 | | /// When you get session receiver, the session is locked for this receiver by the service for a duration as spec |
| | 171 | | /// If processing of the session requires longer than this duration, the session-lock needs to be renewed. |
| | 172 | | /// For each renewal, it resets the time the session is locked by the LockDuration set on the Entity. |
| | 173 | | /// </para> |
| | 174 | | /// <para> |
| | 175 | | /// Renewal of session renews all the messages in the session as well. Each individual message need not be renew |
| | 176 | | /// </para> |
| | 177 | | /// </remarks> |
| | 178 | | public virtual async Task RenewSessionLockAsync(CancellationToken cancellationToken = default) |
| | 179 | | { |
| 4 | 180 | | Argument.AssertNotDisposed(IsDisposed, nameof(ServiceBusSessionReceiver)); |
| 4 | 181 | | cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>(); |
| 4 | 182 | | Logger.RenewSessionLockStart(Identifier, SessionId); |
| 4 | 183 | | using DiagnosticScope scope = ScopeFactory.CreateScope( |
| 4 | 184 | | DiagnosticProperty.RenewSessionLockActivityName, |
| 4 | 185 | | sessionId: SessionId); |
| 4 | 186 | | scope.Start(); |
| | 187 | |
|
| | 188 | | try |
| | 189 | | { |
| 4 | 190 | | await InnerReceiver.RenewSessionLockAsync(cancellationToken).ConfigureAwait(false); |
| 2 | 191 | | } |
| 2 | 192 | | catch (Exception exception) |
| | 193 | | { |
| 2 | 194 | | Logger.RenewSessionLockException(Identifier, exception.ToString()); |
| 2 | 195 | | scope.Failed(exception); |
| 2 | 196 | | throw; |
| | 197 | | } |
| | 198 | |
|
| 2 | 199 | | Logger.RenewSessionLockComplete(Identifier); |
| 2 | 200 | | } |
| | 201 | | } |
| | 202 | | } |