| | 1 | | // Copyright (c) Microsoft Corporation. All rights reserved. |
| | 2 | | // Licensed under the MIT License. |
| | 3 | |
|
| | 4 | | using System.IO; |
| | 5 | | using Azure.Core; |
| | 6 | | using System; |
| | 7 | | using System.Net; |
| | 8 | | using System.Text; |
| | 9 | | using System.Threading; |
| | 10 | | using System.Threading.Tasks; |
| | 11 | | using Azure.Messaging.ServiceBus.Authorization; |
| | 12 | | using Azure.Messaging.ServiceBus.Primitives; |
| | 13 | | using Azure.Core.Pipeline; |
| | 14 | | using System.Collections.Generic; |
| | 15 | | using System.Globalization; |
| | 16 | |
|
| | 17 | | namespace Azure.Messaging.ServiceBus.Management |
| | 18 | | { |
| | 19 | | internal class HttpRequestAndResponse |
| | 20 | | { |
| | 21 | | private readonly HttpPipeline _pipeline; |
| | 22 | | private readonly string _fullyQualifiedNamespace; |
| | 23 | | private readonly TokenCredential _tokenCredential; |
| | 24 | | private readonly int _port; |
| | 25 | | private readonly ClientDiagnostics _diagnostics; |
| | 26 | |
|
/// <summary>
/// Creates an <see cref="HttpRequestAndResponse"/> that builds, sends and validates HTTP requests
/// against the given namespace.
/// </summary>
/// <param name="pipeline">Pipeline used to create and send requests.</param>
/// <param name="diagnostics">Used to build the exception for a failed response.</param>
/// <param name="tokenCredential">Credential from which authorization tokens are requested.</param>
/// <param name="fullyQualifiedNamespace">Namespace that request URIs are built against; also passed to <c>GetPort</c> to choose the port.</param>
public HttpRequestAndResponse(
    HttpPipeline pipeline,
    ClientDiagnostics diagnostics,
    TokenCredential tokenCredential,
    string fullyQualifiedNamespace)
{
    _fullyQualifiedNamespace = fullyQualifiedNamespace;
    _port = GetPort(fullyQualifiedNamespace);
    _tokenCredential = tokenCredential;
    _diagnostics = diagnostics;
    _pipeline = pipeline;
}
| | 42 | |
|
| | 43 | |
|
/// <summary>
/// Translates an unsuccessful HTTP response into the exception surfaced to callers.
/// Returns without throwing when the status code is at least 200 and below 400.
/// </summary>
/// <param name="request">The request that produced <paramref name="response"/>; its method and headers decide how a 409 is reported.</param>
/// <param name="response">The response to inspect.</param>
/// <exception cref="ServiceBusException">Thrown for 401, 404, 409, 403 (without the invalid-operation sub code), 503 and any other failing status.</exception>
/// <exception cref="InvalidOperationException">Thrown for a 403 whose message contains <c>ManagementClientConstants.ForbiddenInvalidOperationSubCode</c>.</exception>
/// <exception cref="ArgumentException">Thrown for a 400.</exception>
internal async Task ThrowIfRequestFailedAsync(Request request, Response response)
{
    int status = response.Status;
    if (status >= 200 && status < 400)
    {
        return;
    }

    // NOTE(review): the end of this statement was cut off in the reviewed source;
    // reconstructed as ConfigureAwait(false) to match every other await in this class - confirm.
    RequestFailedException ex = await _diagnostics.CreateRequestFailedExceptionAsync(response).ConfigureAwait(false);

    switch (status)
    {
        case (int)HttpStatusCode.Unauthorized:
            throw new ServiceBusException(
                ex.Message,
                ServiceBusFailureReason.Unauthorized,
                innerException: ex);

        case (int)HttpStatusCode.NotFound:
            throw new ServiceBusException(
                ex.Message,
                ServiceBusFailureReason.MessagingEntityNotFound,
                innerException: ex);

        case (int)HttpStatusCode.Conflict:
            // A conflict on DELETE, or on a PUT that carries an If-Match header (the header
            // PutEntityAsync adds for updates), is not an "already exists" condition. Both are
            // reported through the constructor overload whose first argument is 'true'
            // (presumably the transient flag - confirm against ServiceBusException).
            if (request.Method.Equals(RequestMethod.Delete)
                || (request.Method.Equals(RequestMethod.Put) && request.Headers.TryGetValue("If-Match", out _)))
            {
                throw new ServiceBusException(true, ex.Message, innerException: ex);
            }

            throw new ServiceBusException(
                ex.Message,
                ServiceBusFailureReason.MessagingEntityAlreadyExists,
                innerException: ex);

        case (int)HttpStatusCode.Forbidden:
            if (ex.Message.Contains(ManagementClientConstants.ForbiddenInvalidOperationSubCode))
            {
                throw new InvalidOperationException(ex.Message, ex);
            }

            throw new ServiceBusException(
                ex.Message,
                ServiceBusFailureReason.QuotaExceeded,
                innerException: ex);

        case (int)HttpStatusCode.BadRequest:
            throw new ArgumentException(ex.Message, ex);

        case (int)HttpStatusCode.ServiceUnavailable:
            throw new ServiceBusException(
                ex.Message,
                ServiceBusFailureReason.ServiceBusy,
                innerException: ex);

        default:
            // Any other failing status: include the numeric code in the message.
            throw new ServiceBusException(
                true,
                $"{ex.Message}; response status code: {status}",
                innerException: ex);
    }
}
| | 120 | |
|
/// <summary>
/// Requests a token for <paramref name="requestUri"/>, passing only the part of the URI up to
/// and including the path (<see cref="UriPartial.Path"/>) to <see cref="GetTokenAsync(string)"/>.
/// </summary>
/// <param name="requestUri">The URI of the request being authorized.</param>
/// <returns>The token string produced by <see cref="GetTokenAsync(string)"/>.</returns>
private Task<string> GetToken(Uri requestUri)
{
    string resource = requestUri.GetLeftPart(UriPartial.Path);
    return GetTokenAsync(resource);
}
| | 123 | |
|
/// <summary>
/// Requests an access token from the configured credential.
/// </summary>
/// <param name="requestUri">
/// Used as the token scope when the credential is a shared access signature credential;
/// otherwise <c>Constants.DefaultScope</c> is used instead.
/// </param>
/// <returns>The token text of the acquired <see cref="AccessToken"/>.</returns>
/// <exception cref="InvalidCastException">The configured credential is not a <c>ServiceBusTokenCredential</c>.</exception>
public async Task<string> GetTokenAsync(string requestUri)
{
    var credential = (ServiceBusTokenCredential)_tokenCredential;
    string scope = credential.IsSharedAccessSignatureCredential
        ? requestUri
        : Constants.DefaultScope;

    var context = new TokenRequestContext(new[] { scope });

    // NOTE(review): the end of this statement was cut off in the reviewed source;
    // reconstructed as CancellationToken.None + ConfigureAwait(false) - confirm.
    AccessToken token = await _tokenCredential.GetTokenAsync(context, CancellationToken.None).ConfigureAwait(false);
    return token.Token;
}
| | 135 | |
|
/// <summary>
/// Fetches one page of up to 100 entities from <paramref name="path"/> and parses it.
/// </summary>
/// <typeparam name="T">The entity type produced by <paramref name="parseFunction"/>.</typeparam>
/// <param name="path">Entity path passed to <c>GetEntityAsync</c>.</param>
/// <param name="nextSkip">Continuation token: the number of entities to skip, as an invariant-culture integer string, or <c>null</c> to start from the beginning.</param>
/// <param name="parseFunction">Converts the response body text into the list of entities.</param>
/// <param name="cancellationToken">Token forwarded to the underlying request.</param>
/// <returns>
/// A page holding the parsed entities. Its continuation token is <c>null</c> when fewer than
/// 100 entities were returned; otherwise it is the next skip offset.
/// </returns>
/// <exception cref="FormatException"><paramref name="nextSkip"/> is not a valid integer.</exception>
public async Task<Page<T>> GetEntitiesPageAsync<T>(
    string path,
    string nextSkip,
    Func<string, IReadOnlyList<T>> parseFunction,
    CancellationToken cancellationToken)
{
    const int PageSize = 100;

    int skip = nextSkip == null
        ? 0
        : int.Parse(nextSkip, CultureInfo.InvariantCulture);

    // Format the numbers explicitly so the query text never depends on the current culture,
    // matching the invariant parse above and the invariant ToString below.
    string skipText = skip.ToString(CultureInfo.InvariantCulture);
    string topText = PageSize.ToString(CultureInfo.InvariantCulture);

    // NOTE(review): the end of this statement was cut off in the reviewed source;
    // reconstructed as ConfigureAwait(false) - confirm.
    Response response = await GetEntityAsync(path, $"$skip={skipText}&$top={topText}", false, cancellationToken).ConfigureAwait(false);
    string body = await ReadAsString(response).ConfigureAwait(false);

    IReadOnlyList<T> entities = parseFunction.Invoke(body);

    // A short page (which includes an empty one) means there is nothing further to fetch.
    string continuationToken = entities.Count < PageSize
        ? null
        : (skip + PageSize).ToString(CultureInfo.InvariantCulture);

    return Page<T>.FromValues(entities, continuationToken, response);
}
| | 161 | |
|
/// <summary>
/// Sends a GET for <paramref name="entityPath"/> over HTTPS and returns the response.
/// </summary>
/// <param name="entityPath">Path component of the request URI.</param>
/// <param name="query">Extra query text appended after <c>&amp;</c>, or <c>null</c> for none.</param>
/// <param name="enrich">Value written into the <c>enrich</c> query parameter.</param>
/// <param name="cancellationToken">Token forwarded to the send operation.</param>
/// <returns>The response, once <c>SendHttpRequestAsync</c> has sent the request and checked it for failure.</returns>
public async Task<Response> GetEntityAsync(
    string entityPath,
    string query,
    bool enrich,
    CancellationToken cancellationToken)
{
    string baseQuery = $"{ManagementClientConstants.apiVersionQuery}&enrich={enrich}";
    string fullQuery = query == null
        ? baseQuery
        : baseQuery + "&" + query;

    var endpoint = new UriBuilder(_fullyQualifiedNamespace)
    {
        Path = entityPath,
        Scheme = Uri.UriSchemeHttps,
        Port = _port,
        Query = fullQuery
    };

    var target = new RequestUriBuilder();
    target.Reset(endpoint.Uri);

    Request getRequest = _pipeline.CreateRequest();
    getRequest.Method = RequestMethod.Get;
    getRequest.Uri = target;

    return await SendHttpRequestAsync(getRequest, cancellationToken).ConfigureAwait(false);
}
| | 191 | |
|
/// <summary>
/// Sends a PUT with <paramref name="requestBody"/> (UTF-8 encoded, Atom content type) to
/// <paramref name="entityPath"/> over HTTPS and returns the response.
/// </summary>
/// <param name="entityPath">Path component of the request URI.</param>
/// <param name="requestBody">Text sent as the request content.</param>
/// <param name="isUpdate">When <c>true</c>, an <c>If-Match: *</c> header is added.</param>
/// <param name="forwardTo">When not blank, a token is requested for this value and sent in the supplementary authorization header.</param>
/// <param name="fwdDeadLetterTo">When not blank, a token is requested for this value and sent in the dead-letter supplementary authorization header.</param>
/// <param name="cancellationToken">Token forwarded to the send operation.</param>
/// <returns>The response, once <c>SendHttpRequestAsync</c> has sent the request and checked it for failure.</returns>
public async Task<Response> PutEntityAsync(
    string entityPath,
    string requestBody,
    bool isUpdate,
    string forwardTo,
    string fwdDeadLetterTo,
    CancellationToken cancellationToken)
{
    var endpoint = new UriBuilder(_fullyQualifiedNamespace)
    {
        Path = entityPath,
        Port = _port,
        Scheme = Uri.UriSchemeHttps,
        Query = $"{ManagementClientConstants.apiVersionQuery}"
    };
    var target = new RequestUriBuilder();
    target.Reset(endpoint.Uri);

    Request putRequest = _pipeline.CreateRequest();
    putRequest.Method = RequestMethod.Put;
    putRequest.Uri = target;
    putRequest.Content = RequestContent.Create(Encoding.UTF8.GetBytes(requestBody));
    putRequest.Headers.Add("Content-Type", ManagementClientConstants.AtomContentType);

    if (isUpdate)
    {
        putRequest.Headers.Add("If-Match", "*");
    }

    var credential = (ServiceBusTokenCredential)_tokenCredential;

    // Adds one supplementary authorization header when a target entity is supplied.
    // Shared access signature tokens are sent as-is; any other token gets a "Bearer " prefix.
    async Task AddSupplementaryAuthorizationAsync(string headerName, string targetEntity)
    {
        if (string.IsNullOrWhiteSpace(targetEntity))
        {
            return;
        }

        string token = await GetTokenAsync(targetEntity).ConfigureAwait(false);
        string headerValue = credential.IsSharedAccessSignatureCredential
            ? token
            : $"Bearer {token}";
        putRequest.Headers.Add(headerName, headerValue);
    }

    await AddSupplementaryAuthorizationAsync(
        ManagementClientConstants.ServiceBusSupplementartyAuthorizationHeaderName,
        forwardTo).ConfigureAwait(false);

    await AddSupplementaryAuthorizationAsync(
        ManagementClientConstants.ServiceBusDlqSupplementaryAuthorizationHeaderName,
        fwdDeadLetterTo).ConfigureAwait(false);

    return await SendHttpRequestAsync(putRequest, cancellationToken).ConfigureAwait(false);
}
| | 242 | |
|
/// <summary>
/// Sends a DELETE for <paramref name="entityPath"/> over HTTPS and returns the response.
/// </summary>
/// <param name="entityPath">Path component of the request URI.</param>
/// <param name="cancellationToken">Token forwarded to the send operation.</param>
/// <returns>The response, once <c>SendHttpRequestAsync</c> has sent the request and checked it for failure.</returns>
public async Task<Response> DeleteEntityAsync(
    string entityPath,
    CancellationToken cancellationToken)
{
    var endpoint = new UriBuilder(_fullyQualifiedNamespace)
    {
        Path = entityPath,
        Scheme = Uri.UriSchemeHttps,
        Port = _port,
        Query = ManagementClientConstants.apiVersionQuery
    };
    var target = new RequestUriBuilder();
    target.Reset(endpoint.Uri);

    Request deleteRequest = _pipeline.CreateRequest();
    deleteRequest.Uri = target;
    deleteRequest.Method = RequestMethod.Delete;

    return await SendHttpRequestAsync(deleteRequest, cancellationToken).ConfigureAwait(false);
}
| | 265 | |
|
/// <summary>
/// Sends <paramref name="request"/> through the pipeline and throws if the response indicates failure.
/// </summary>
/// <remarks>
/// When the credential is a shared access signature credential, a token obtained for the
/// request URI is added as the <c>Authorization</c> header before sending. No header is added
/// here for other credentials.
/// </remarks>
/// <param name="request">The fully populated request to send.</param>
/// <param name="cancellationToken">Token forwarded to the pipeline.</param>
/// <returns>The response, after <c>ThrowIfRequestFailedAsync</c> has accepted it.</returns>
private async Task<Response> SendHttpRequestAsync(
    Request request,
    CancellationToken cancellationToken)
{
    bool usesSharedAccessSignature =
        ((ServiceBusTokenCredential)_tokenCredential).IsSharedAccessSignatureCredential;

    if (usesSharedAccessSignature)
    {
        Uri target = request.Uri.ToUri();
        string sasToken = await GetToken(target).ConfigureAwait(false);
        request.Headers.Add("Authorization", sasToken);
    }

    Response result = await _pipeline.SendRequestAsync(request, cancellationToken).ConfigureAwait(false);
    await ThrowIfRequestFailedAsync(request, result).ConfigureAwait(false);

    return result;
}
| | 282 | |
|
/// <summary>
/// Reads the whole content stream of <paramref name="response"/> as text.
/// </summary>
/// <param name="response">The response whose <c>ContentStream</c> is read to the end.</param>
/// <returns>The full text of the response body.</returns>
private static async Task<string> ReadAsString(Response response)
{
    // The reader is disposed as soon as the body has been read.
    using (var bodyReader = new StreamReader(response.ContentStream))
    {
        return await bodyReader.ReadToEndAsync().ConfigureAwait(false);
    }
}
| | 290 | |
|
/// <summary>
/// Chooses the port for request URIs built against <paramref name="endpoint"/>.
/// </summary>
/// <param name="endpoint">The namespace the client targets.</param>
/// <returns>
/// 4446 when <paramref name="endpoint"/> ends with <c>onebox.windows-int.net</c> (compared
/// ordinally, ignoring case); otherwise -1, which <see cref="UriBuilder.Port"/> interprets as
/// "use the default port for the scheme".
/// </returns>
private static int GetPort(string endpoint)
{
    // used for internal testing
    // Host names are identifiers, not linguistic text, so compare ordinally: a culture-aware
    // comparison can ignore certain characters and match hosts that merely look similar.
    if (endpoint.EndsWith("onebox.windows-int.net", StringComparison.OrdinalIgnoreCase))
    {
        return 4446;
    }

    return -1;
}
| | 301 | | } |
| | 302 | | } |