| | 1 | | // Copyright (c) Microsoft Corporation. All rights reserved. |
| | 2 | | // Licensed under the MIT License. |
| | 3 | |
|
| | 4 | | using System; |
| | 5 | | using System.Collections.Generic; |
| | 6 | | using System.Globalization; |
| | 7 | | using System.IO; |
| | 8 | | using System.Text; |
| | 9 | | using System.Threading; |
| | 10 | | using System.Threading.Tasks; |
| | 11 | | using Azure.Core; |
| | 12 | | using Azure.Core.Http.Multipart; |
| | 13 | |
|
| | 14 | | namespace Azure.Storage.Blobs.Specialized |
| | 15 | | { |
| | 16 | | /// <summary> |
| | 17 | | /// Provides support for creating and parsing multipart/mixed content. |
| | 18 | | /// This is implementing a couple of layered standards as mentioned at |
| | 19 | | /// https://docs.microsoft.com/en-us/rest/api/storageservices/blob-batch |
| | 20 | | /// including https://www.odata.org/documentation/odata-version-3-0/batch-processing/ |
| | 21 | | /// and https://www.ietf.org/rfc/rfc2046.txt. |
| | 22 | | /// </summary> |
| | 23 | | internal static class Multipart |
| | 24 | | { |
| | 25 | | /// <summary> |
| | 26 | | /// Get a random GUID to use as our multipart boundary. This is a hack |
| | 27 | | /// to allow for repeatable boundaries in our recorded unit tests. |
| | 28 | | /// </summary> |
| 0 | 29 | | internal static Func<Guid> GetRandomGuid { get; set; } = () => Guid.NewGuid(); |
| | 30 | |
|
| | 31 | | /// <summary> |
| | 32 | | /// Create a multipart/mixed request body that combines several |
| | 33 | | /// messages. |
| | 34 | | /// </summary> |
| | 35 | | /// <param name="messages"> |
| | 36 | | /// The batch sub-operation messages to submit together. |
| | 37 | | /// </param> |
| | 38 | | /// <param name="prefix"> |
| | 39 | | /// A prefix used for the multipart/mixed boundary. |
| | 40 | | /// </param> |
| | 41 | | /// <param name="async"> |
| | 42 | | /// Whether to invoke the operation asynchronously. |
| | 43 | | /// </param> |
| | 44 | | /// <param name="cancellationToken"> |
| | 45 | | /// Optional <see cref="CancellationToken"/> to propagate notifications |
| | 46 | | /// that the operation should be cancelled. |
| | 47 | | /// </param> |
| | 48 | | /// <returns> |
| | 49 | | /// A tuple containing the batch sub-operation messages merged into a |
| | 50 | | /// single multipart/mixed content stream and content type. |
| | 51 | | /// </returns> |
| | 52 | | public static Task<(Stream, string)> CreateAsync( |
| | 53 | | IEnumerable<HttpMessage> messages, |
| | 54 | | string prefix, |
| | 55 | | #pragma warning disable CA1801 // Remove unused parameter (leaving for future changes) |
| | 56 | | #pragma warning disable IDE0060 // Remove unused parameter |
| | 57 | | bool async, |
| | 58 | | CancellationToken cancellationToken) |
| | 59 | | #pragma warning restore IDE0060 // Remove unused parameter |
| | 60 | | #pragma warning restore CA1801 // Remove unused parameter |
| | 61 | | { |
| | 62 | | // Set the content-type |
| 92 | 63 | | var boundary = $"{prefix}_{GetRandomGuid().ToString()}"; |
| 92 | 64 | | var contentType = BatchConstants.MultipartContentTypePrefix + boundary; |
| | 65 | |
|
| | 66 | | // TODO: Investigate whether to use a StreamWriter instead of a |
| | 67 | | // StringBuilder we turn into a MemoryStream (this will have to |
| | 68 | | // happen once we support binary content anyway). |
| 92 | 69 | | var content = new StringBuilder(1024); |
| | 70 | |
|
| | 71 | | // We're implementing the limited subset of |
| | 72 | | // https://www.ietf.org/rfc/rfc2046.txt required for Storage |
| | 73 | | // batching. The format needs to be followed precisely for the |
| | 74 | | // service to correctly parse the request. |
| | 75 | |
|
| | 76 | | const string newline = "\r\n"; |
| 92 | 77 | | var operationId = 0; |
| 4792 | 78 | | foreach (HttpMessage message in messages) |
| | 79 | | { |
| | 80 | | // Write the boundary |
| 2304 | 81 | | content.Append(BatchConstants.BatchSeparator).Append(boundary).Append(newline); |
| 2304 | 82 | | content.Append(BatchConstants.RequestContentType).Append(newline); |
| 2304 | 83 | | content.Append(BatchConstants.RequestContentTransferEncoding).Append(newline); |
| 2304 | 84 | | content.Append(BatchConstants.ContentIdName).Append(": ").Append(operationId++.ToString(CultureInfo.Inva |
| 2304 | 85 | | content.Append(newline); |
| | 86 | |
|
| | 87 | | // Write the request URI |
| 2304 | 88 | | content |
| 2304 | 89 | | .Append(message.Request.Method.Method) |
| 2304 | 90 | | .Append(" ") |
| 2304 | 91 | | .Append(message.Request.Uri.PathAndQuery) |
| 2304 | 92 | | .Append(" ") |
| 2304 | 93 | | .Append(BatchConstants.HttpVersion) |
| 2304 | 94 | | .Append(newline); |
| | 95 | |
|
| | 96 | | // Write the request headers |
| 13984 | 97 | | foreach (HttpHeader header in message.Request.Headers) |
| | 98 | | { |
| 4688 | 99 | | content.Append(header.Name).Append(": ").Append(header.Value).Append(newline); |
| | 100 | | } |
| | 101 | |
|
| | 102 | | // Write the request content (or lack thereof since none of |
| | 103 | | // the current possible operations includes a request body) |
| 2304 | 104 | | content.Append(BatchConstants.ContentLengthName).Append(": 0").Append(newline); |
| | 105 | |
|
| | 106 | | // Add an extra line |
| 2304 | 107 | | content.Append(newline); |
| | 108 | | } |
| | 109 | |
|
| | 110 | | // Write the final boundary |
| 92 | 111 | | content.Append(BatchConstants.BatchSeparator).Append(boundary).Append(BatchConstants.BatchSeparator).Append( |
| | 112 | |
|
| | 113 | | // Convert the content into a request stream |
| 92 | 114 | | Stream stream = new MemoryStream(System.Text.Encoding.UTF8.GetBytes(content.ToString())); |
| | 115 | |
|
| | 116 | | // Return the content and type |
| 92 | 117 | | return Task.FromResult((stream, contentType)); |
| | 118 | | } |
| | 119 | |
|
| | 120 | | /// <summary> |
| | 121 | | /// Parse a multipart/mixed response body into several responses. |
| | 122 | | /// </summary> |
| | 123 | | /// <param name="batchContent">The response content.</param> |
| | 124 | | /// <param name="batchContentType">The response content type.</param> |
| | 125 | | /// <param name="async"> |
| | 126 | | /// Whether to invoke the operation asynchronously. |
| | 127 | | /// </param> |
| | 128 | | /// <param name="cancellationToken"> |
| | 129 | | /// Optional <see cref="CancellationToken"/> to propagate notifications |
| | 130 | | /// that the operation should be cancelled. |
| | 131 | | /// </param> |
| | 132 | | /// <returns>The parsed <see cref="Response"/>s.</returns> |
| | 133 | | public static async Task<Response[]> ParseAsync( |
| | 134 | | Stream batchContent, |
| | 135 | | string batchContentType, |
| | 136 | | bool async, |
| | 137 | | CancellationToken cancellationToken) |
| | 138 | | { |
| | 139 | | // Get the batch boundary |
| 88 | 140 | | if (batchContentType == null || |
| 88 | 141 | | !batchContentType.StartsWith(BatchConstants.MultipartContentTypePrefix, StringComparison.Ordinal)) |
| | 142 | | { |
| 0 | 143 | | throw BatchErrors.InvalidBatchContentType(batchContentType); |
| | 144 | | } |
| 88 | 145 | | string batchBoundary = batchContentType.Substring(BatchConstants.MultipartContentTypePrefix.Length); |
| | 146 | |
|
| | 147 | | // Collect the responses in a dictionary (in case the Content-ID |
| | 148 | | // values come back out of order) |
| 88 | 149 | | Dictionary<int, Response> responses = new Dictionary<int, Response>(); |
| | 150 | |
|
| | 151 | | // Read through the batch body one section at a time until the |
| | 152 | | // reader returns null |
| 88 | 153 | | MultipartReader reader = new MultipartReader(batchBoundary, batchContent); |
| 88 | 154 | | for (MultipartSection section = await reader.GetNextSectionAsync(async, cancellationToken).ConfigureAwait(fa |
| 1360 | 155 | | section != null; |
| 1272 | 156 | | section = await reader.GetNextSectionAsync(async, cancellationToken).ConfigureAwait(false)) |
| | 157 | | { |
| | 158 | | // Get the Content-ID header |
| 1272 | 159 | | if (!section.Headers.TryGetValue(BatchConstants.ContentIdName, out StringValues contentIdValues) || |
| 1272 | 160 | | contentIdValues.Count != 1 || |
| 1272 | 161 | | !int.TryParse(contentIdValues[0], out int contentId)) |
| | 162 | | { |
| | 163 | | // If the header wasn't found, this is a failed request |
| | 164 | | // with the details being sent as the first sub-operation |
| | 165 | | // so we default the Content-ID to 0 |
| 4 | 166 | | contentId = 0; |
| | 167 | | } |
| | 168 | |
|
| | 169 | | // Build a response |
| 1272 | 170 | | MemoryResponse response = new MemoryResponse(); |
| 1272 | 171 | | responses[contentId] = response; |
| | 172 | |
|
| | 173 | | // We're going to read the section's response body line by line |
| 1272 | 174 | | using var body = new BufferedReadStream(section.Body, BatchConstants.ResponseLineSize); |
| | 175 | |
|
| | 176 | | // The first line is the status like "HTTP/1.1 202 Accepted" |
| 1272 | 177 | | string line = await body.ReadLineAsync(async, cancellationToken).ConfigureAwait(false); |
| 1272 | 178 | | string[] status = line.Split(new char[] { ' ' }, 3, StringSplitOptions.RemoveEmptyEntries); |
| 1272 | 179 | | if (status.Length != 3) |
| | 180 | | { |
| 0 | 181 | | throw BatchErrors.InvalidHttpStatusLine(line); |
| | 182 | | } |
| 1272 | 183 | | response.SetStatus(int.Parse(status[1], CultureInfo.InvariantCulture)); |
| 1272 | 184 | | response.SetReasonPhrase(status[2]); |
| | 185 | |
|
| | 186 | | // Continue reading headers until we reach a blank line |
| 1272 | 187 | | line = await body.ReadLineAsync(async, cancellationToken).ConfigureAwait(false); |
| 6464 | 188 | | while (!string.IsNullOrEmpty(line)) |
| | 189 | | { |
| | 190 | | // Split the header into the name and value |
| 5192 | 191 | | int splitIndex = line.IndexOf(':'); |
| 5192 | 192 | | if (splitIndex <= 0) |
| | 193 | | { |
| 0 | 194 | | throw BatchErrors.InvalidHttpHeaderLine(line); |
| | 195 | | } |
| 5192 | 196 | | var name = line.Substring(0, splitIndex); |
| 5192 | 197 | | var value = line.Substring(splitIndex + 1, line.Length - splitIndex - 1).Trim(); |
| 5192 | 198 | | response.AddHeader(name, value); |
| | 199 | |
|
| 5192 | 200 | | line = await body.ReadLineAsync(async, cancellationToken).ConfigureAwait(false); |
| | 201 | | } |
| | 202 | |
|
| | 203 | | // Copy the rest of the body as the response content |
| 1272 | 204 | | var responseContent = new MemoryStream(); |
| 1272 | 205 | | if (async) |
| | 206 | | { |
| 636 | 207 | | await body.CopyToAsync(responseContent).ConfigureAwait(false); |
| | 208 | | } |
| | 209 | | else |
| | 210 | | { |
| 636 | 211 | | body.CopyTo(responseContent); |
| | 212 | | } |
| 1272 | 213 | | responseContent.Seek(0, SeekOrigin.Begin); |
| 1272 | 214 | | response.ContentStream = responseContent; |
| 1272 | 215 | | } |
| | 216 | |
|
| | 217 | | // Collect the responses and order by Content-ID |
| 88 | 218 | | Response[] ordered = new Response[responses.Count]; |
| 2720 | 219 | | for (int i = 0; i < ordered.Length; i++) |
| | 220 | | { |
| 1272 | 221 | | ordered[i] = responses[i]; |
| | 222 | | } |
| 88 | 223 | | return ordered; |
| 88 | 224 | | } |
| | 225 | |
|
| | 226 | | /// <summary> |
| | 227 | | /// Read the next line of text. |
| | 228 | | /// </summary> |
| | 229 | | /// <param name="stream">The stream to read from.</param> |
| | 230 | | /// <param name="async"> |
| | 231 | | /// Whether to invoke the operation asynchronously. |
| | 232 | | /// </param> |
| | 233 | | /// <param name="cancellationToken"> |
| | 234 | | /// Optional <see cref="CancellationToken"/> to propagate notifications |
| | 235 | | /// that the operation should be cancelled. |
| | 236 | | /// </param> |
| | 237 | | /// <returns>The next line of text.</returns> |
| | 238 | | internal static async Task<string> ReadLineAsync( |
| | 239 | | this BufferedReadStream stream, |
| | 240 | | bool async, |
| | 241 | | CancellationToken cancellationToken) => |
| 7736 | 242 | | async ? |
| 7736 | 243 | | await stream.ReadLineAsync(BatchConstants.ResponseLineSize, cancellationToken).ConfigureAwait(false) : |
| 7736 | 244 | | stream.ReadLine(BatchConstants.ResponseLineSize); |
| | 245 | |
|
| | 246 | | /// <summary> |
| | 247 | | /// Read the next multipart section |
| | 248 | | /// </summary> |
| | 249 | | /// <param name="reader">The reader to parse with.</param> |
| | 250 | | /// <param name="async"> |
| | 251 | | /// Whether to invoke the operation asynchronously. |
| | 252 | | /// </param> |
| | 253 | | /// <param name="cancellationToken"> |
| | 254 | | /// Optional <see cref="CancellationToken"/> to propagate notifications |
| | 255 | | /// that the operation should be cancelled. |
| | 256 | | /// </param> |
| | 257 | | /// <returns>The next multipart section.</returns> |
| | 258 | | internal static async Task<MultipartSection> GetNextSectionAsync( |
| | 259 | | this MultipartReader reader, |
| | 260 | | bool async, |
| | 261 | | CancellationToken cancellationToken) => |
| 1360 | 262 | | async ? |
| 1360 | 263 | | await reader.ReadNextSectionAsync(cancellationToken).ConfigureAwait(false) : |
| 1360 | 264 | | #pragma warning disable AZC0102 // Do not use GetAwaiter().GetResult(). Use the TaskExtensions.EnsureCompleted() extensi |
| 1360 | 265 | | reader.ReadNextSectionAsync(cancellationToken).GetAwaiter().GetResult(); // #7972: Decide if we need a p |
| | 266 | | #pragma warning restore AZC0102 // Do not use GetAwaiter().GetResult(). Use the TaskExtensions.EnsureCompleted() extensi |
| | 267 | | } |
| | 268 | | } |