| | 1 | | // Copyright (c) Microsoft Corporation. All rights reserved. |
| | 2 | | // Licensed under the MIT License. |
| | 3 | |
|
| | 4 | | using System; |
| | 5 | | using System.Globalization; |
| | 6 | | using System.Net.Sockets; |
| | 7 | | using System.Threading; |
| | 8 | | using System.Threading.Tasks; |
| | 9 | | using Azure.Core; |
| | 10 | |
|
| | 11 | | namespace Azure.Messaging.EventHubs.Core |
| | 12 | | { |
| | 13 | | /// <summary> |
| | 14 | | /// The default retry policy for the Event Hubs client library, respecting the |
| | 15 | | /// configuration specified as a set of <see cref="EventHubsRetryOptions" />. |
| | 16 | | /// </summary> |
| | 17 | | /// |
| | 18 | | /// <seealso cref="EventHubsRetryOptions"/> |
| | 19 | | /// |
| | 20 | | internal class BasicRetryPolicy : EventHubsRetryPolicy |
| | 21 | | { |
| | 22 | | /// <summary>The seed to use for initializing the random number generator for a given thread-specific instance.</summary> |
| 2 | 23 | | private static int s_randomSeed = Environment.TickCount; |
| | 24 | |
|
| | 25 | | /// <summary>The random number generator to use for a specific thread.</summary> |
| 30 | 26 | | private static readonly ThreadLocal<Random> RandomNumberGenerator = new ThreadLocal<Random>(() => new Random(Int |
| | 27 | |
|
/// <summary>
///   The retry options that this policy was constructed with; they drive
///   the timeout and delay calculations performed by the policy.
/// </summary>
///
public EventHubsRetryOptions Options { get; }
| | 34 | |
|
/// <summary>
///   The multiplier applied to the configured base delay to produce the
///   base jitter value.
/// </summary>
///
/// <value>The base jitter derived from this factor bounds the random jitter added to a calculated retry duration.</value>
///
public double JitterFactor { get; } = 0.08;
| | 42 | |
|
/// <summary>
///   The lower bound, in seconds, of the additional time added to the
///   calculated retry duration when the service signals a request to throttle.
/// </summary>
///
public int MinimumThrottleSeconds { get; } = 4;
| | 49 | |
|
/// <summary>
///   The upper bound, in seconds, of the additional time added to the
///   calculated retry duration when the service signals a request to throttle.
/// </summary>
///
public int MaximumThrottleSeconds { get; } = 8;
| | 56 | |
|
/// <summary>
///   Creates a <see cref="BasicRetryPolicy"/> that is governed by the
///   specified set of retry options.
/// </summary>
///
/// <param name="retryOptions">The options that configure the retry behavior; validated as non-null before being stored.</param>
///
public BasicRetryPolicy(EventHubsRetryOptions retryOptions)
{
    // Validate first so that an invalid argument never becomes the policy's configuration.
    Argument.AssertNotNull(retryOptions, nameof(retryOptions));

    Options = retryOptions;
}
| | 68 | |
|
/// <summary>
///   Determines how long the current attempt of an operation may run
///   before it is considered to have timed out.
/// </summary>
///
/// <param name="attemptCount">The number of attempts made so far; this policy does not vary the timeout by attempt, so the value is not consulted.</param>
///
/// <returns>The <c>TryTimeout</c> value from the configured <see cref="Options" />.</returns>
///
public override TimeSpan CalculateTryTimeout(int attemptCount)
{
    // The same configured timeout applies to every attempt.
    return Options.TryTimeout;
}
| | 79 | |
|
| | 80 | | /// <summary> |
| | 81 | | /// Calculates the amount of time to wait before another attempt should be made. |
| | 82 | | /// </summary> |
| | 83 | | /// |
| | 84 | | /// <param name="lastException">The last exception that was observed for the operation to be retried.</param> |
| | 85 | | /// <param name="attemptCount">The number of total attempts that have been made, including the initial attempt b |
| | 86 | | /// |
| | 87 | | /// <returns>The amount of time to delay before retrying the associated operation; if <c>null</c>, then the oper |
| | 88 | | /// |
| | 89 | | public override TimeSpan? CalculateRetryDelay(Exception lastException, |
| | 90 | | int attemptCount) |
| | 91 | | { |
| 272 | 92 | | if ((Options.MaximumRetries <= 0) |
| 272 | 93 | | || (Options.Delay == TimeSpan.Zero) |
| 272 | 94 | | || (Options.MaximumDelay == TimeSpan.Zero) |
| 272 | 95 | | || (attemptCount > Options.MaximumRetries) |
| 272 | 96 | | || (!ShouldRetryException(lastException))) |
| | 97 | | { |
| 154 | 98 | | return null; |
| | 99 | | } |
| | 100 | |
|
| 118 | 101 | | var baseJitterSeconds = (Options.Delay.TotalSeconds * JitterFactor); |
| | 102 | |
|
| 118 | 103 | | TimeSpan retryDelay = Options.Mode switch |
| 118 | 104 | | { |
| 226 | 105 | | EventHubsRetryMode.Fixed => CalculateFixedDelay(Options.Delay.TotalSeconds, baseJitterSeconds, RandomNum |
| 128 | 106 | | EventHubsRetryMode.Exponential => CalculateExponentialDelay(attemptCount, Options.Delay.TotalSeconds, ba |
| 0 | 107 | | _ => throw new NotSupportedException(string.Format(CultureInfo.CurrentCulture, Resources.UnknownRetryMod |
| 118 | 108 | | }; |
| | 109 | |
|
| | 110 | | // If the exception indicates a service throttle, adjust the delay for an |
| | 111 | | // additional throttle factor. |
| | 112 | |
|
| 118 | 113 | | if (IsThrottleException(lastException)) |
| | 114 | | { |
| 36 | 115 | | retryDelay = retryDelay.Add(TimeSpan.FromSeconds(RandomNumberGenerator.Value.Next(MinimumThrottleSeconds |
| | 116 | | } |
| | 117 | |
|
| | 118 | | // Adjust the delay, if needed, to keep within the maximum |
| | 119 | | // duration. |
| | 120 | |
|
| 118 | 121 | | if (Options.MaximumDelay < retryDelay) |
| | 122 | | { |
| 40 | 123 | | return Options.MaximumDelay; |
| | 124 | | } |
| | 125 | |
|
| 78 | 126 | | return retryDelay; |
| | 127 | | } |
| | 128 | |
|
/// <summary>
///   Decides whether the failure represented by an exception is eligible for retry.
/// </summary>
///
/// <param name="exception">The exception to evaluate; cancellation and aggregate exceptions are unwrapped one level before evaluation.</param>
///
/// <returns><c>true</c> if the exception should be retried; otherwise, <c>false</c>.</returns>
///
private static bool ShouldRetryException(Exception exception)
{
    // Cancellation and aggregate wrappers are judged by the exception they carry.
    // TaskCanceledException derives from OperationCanceledException, so a single
    // type test covers both.

    if (exception is OperationCanceledException)
    {
        exception = exception.InnerException;
    }
    else if (exception is AggregateException aggregateEx)
    {
        exception = aggregateEx.Flatten().InnerException;
    }

    // A null exception matches none of the type patterns and falls through
    // to the discard arm, which reports it as not retriable.

    return exception switch
    {
        EventHubsException ex => ex.IsTransient,
        TimeoutException _ => true,
        SocketException _ => true,
        _ => false
    };
}
| | 164 | |
|
/// <summary>
///   Decides whether an exception signals that the service has asked the caller to throttle.
/// </summary>
///
/// <param name="exception">The exception to evaluate.</param>
///
/// <returns><c>true</c> when the exception is an <see cref="EventHubsException" /> whose reason is <c>ServiceBusy</c>; otherwise, <c>false</c>.</returns>
///
private static bool IsThrottleException(Exception exception)
{
    // Only an Event Hubs exception carries a failure reason; a null or
    // any other exception type is never treated as a throttle request.

    return (exception is EventHubsException ex)
        && (ex.Reason == EventHubsException.FailureReason.ServiceBusy);
}
| | 179 | |
|
| | 180 | | /// <summary> |
| | 181 | | /// Calculates the delay for an exponential back-off. |
| | 182 | | /// </summary> |
| | 183 | | /// |
| | 184 | | /// <param name="attemptCount">The number of total attempts that have been made, including the initial attempt b |
| | 185 | | /// <param name="baseDelaySeconds">The delay to use as a basis for the exponential back-off, in seconds.</param> |
| | 186 | | /// <param name="baseJitterSeconds">The delay to use as the basis for a random jitter value, in seconds.</param> |
| | 187 | | /// <param name="random">The random number generator to use for the calculation.</param> |
| | 188 | | /// |
| | 189 | | /// <returns>The recommended duration to delay before retrying; this value does not take the maximum delay or el |
| | 190 | | /// |
| | 191 | | private static TimeSpan CalculateExponentialDelay(int attemptCount, |
| | 192 | | double baseDelaySeconds, |
| | 193 | | double baseJitterSeconds, |
| | 194 | | Random random) => |
| 10 | 195 | | TimeSpan.FromSeconds((Math.Pow(2, attemptCount) * baseDelaySeconds) + (random.NextDouble() * baseJitterSecon |
| | 196 | |
|
/// <summary>
///   Computes the delay to use for a fixed back-off.
/// </summary>
///
/// <param name="baseDelaySeconds">The fixed delay, in seconds, that forms the basis of the result.</param>
/// <param name="baseJitterSeconds">The upper bound, in seconds, of the random jitter added to the base delay.</param>
/// <param name="random">The random number generator used to select the jitter.</param>
///
/// <returns>The base delay plus a random jitter between zero and <paramref name="baseJitterSeconds" />; no other limits are applied here.</returns>
///
private static TimeSpan CalculateFixedDelay(double baseDelaySeconds,
                                            double baseJitterSeconds,
                                            Random random)
{
    // NextDouble yields a value in [0.0, 1.0), scaling the jitter to a fraction of its base.
    var jitterSeconds = random.NextDouble() * baseJitterSeconds;

    return TimeSpan.FromSeconds(baseDelaySeconds + jitterSeconds);
}
| | 211 | | } |
| | 212 | | } |