1 // Copyright (c) Microsoft Corporation. All rights reserved.
2 // Licensed under the MIT License.
3
4 package com.azure.storage.blob;
5
6 import com.azure.core.http.rest.Response;
7 import com.azure.core.http.rest.ResponseBase;
8 import com.azure.core.http.rest.SimpleResponse;
9 import com.azure.core.http.rest.VoidResponse;
10 import com.azure.core.implementation.util.FluxUtil;
11 import com.azure.storage.blob.implementation.AzureBlobStorageBuilder;
12 import com.azure.storage.blob.models.BlobAccessConditions;
13 import com.azure.storage.blob.models.BlobHTTPHeaders;
14 import com.azure.storage.blob.models.BlobRange;
15 import com.azure.storage.blob.models.BlockBlobItem;
16 import com.azure.storage.blob.models.BlockItem;
17 import com.azure.storage.blob.models.BlockListType;
18 import com.azure.storage.blob.models.LeaseAccessConditions;
19 import com.azure.storage.blob.models.Metadata;
20 import com.azure.storage.blob.models.SourceModifiedAccessConditions;
21 import io.netty.buffer.ByteBuf;
22 import io.netty.buffer.Unpooled;
23 import reactor.core.publisher.Flux;
24 import reactor.core.publisher.Mono;
25
26 import java.io.File;
27 import java.io.IOException;
28 import java.io.UncheckedIOException;
29 import java.net.URL;
30 import java.nio.ByteBuffer;
31 import java.nio.channels.AsynchronousFileChannel;
32 import java.nio.charset.StandardCharsets;
33 import java.nio.file.Paths;
34 import java.nio.file.StandardOpenOption;
35 import java.util.ArrayList;
36 import java.util.Base64;
37 import java.util.List;
38 import java.util.SortedMap;
39 import java.util.TreeMap;
40 import java.util.UUID;
41
42 /**
43 * Client to a block blob. It may only be instantiated through a {@link BlockBlobClientBuilder}, via
44 * the method {@link BlobAsyncClient#asBlockBlobAsyncClient()}, or via the method
45 * {@link ContainerAsyncClient#getBlockBlobAsyncClient(String)}. This class does not hold
46 * any state about a particular blob, but is instead a convenient way of sending appropriate
47 * requests to the resource on the service.
48 *
49 * <p>
50 * This client contains operations on a blob. Operations on a container are available on {@link ContainerAsyncClient},
51 * and operations on the service are available on {@link StorageAsyncClient}.
52 *
53 * <p>
54 * Please refer
55 * to the <a href=https://docs.microsoft.com/en-us/rest/api/storageservices/understanding-block-blobs--append-blobs--and-page-blobs>Azure Docs</a>
56 * for more information.
57 *
58 * <p>
59 * Note this client is an async client that returns reactive responses from Spring Reactor Core
60 * project (https://projectreactor.io/). Calling the methods in this client will <strong>NOT</strong>
61 * start the actual network operation, until {@code .subscribe()} is called on the reactive response.
62 * You can simply convert one of these responses to a {@link java.util.concurrent.CompletableFuture}
63 * object through {@link Mono#toFuture()}.
64 */
65 public final class BlockBlobAsyncClient extends BlobAsyncClient {
66 static final int BLOB_DEFAULT_UPLOAD_BLOCK_SIZE = 4 * Constants.MB;
67 static final int BLOB_MAX_UPLOAD_BLOCK_SIZE = 100 * Constants.MB;
68
69 final BlockBlobAsyncRawClient blockBlobAsyncRawClient;
70
71 /**
72 * Indicates the maximum number of bytes that can be sent in a call to upload.
73 */
74 public static final int MAX_UPLOAD_BLOB_BYTES = 256 * Constants.MB;
75
76 /**
77 * Indicates the maximum number of bytes that can be sent in a call to stageBlock.
78 */
79 public static final int MAX_STAGE_BLOCK_BYTES = 100 * Constants.MB;
80
81 /**
82 * Indicates the maximum number of blocks allowed in a block blob.
83 */
84 public static final int MAX_BLOCKS = 50000;
85
86 /**
87 * Package-private constructor for use by {@link BlockBlobClientBuilder}.
88 * @param azureBlobStorageBuilder the API client builder for blob storage API
89 */
90 BlockBlobAsyncClient(AzureBlobStorageBuilder azureBlobStorageBuilder, String snapshot) {
91 super(azureBlobStorageBuilder, snapshot);
92 this.blockBlobAsyncRawClient = new BlockBlobAsyncRawClient(azureBlobStorageBuilder.build(), snapshot);
93 }
94
95 /**
96 * Creates a new block blob, or updates the content of an existing block blob.
97 * Updating an existing block blob overwrites any existing metadata on the blob. Partial updates are not
98 * supported with PutBlob; the content of the existing blob is overwritten with the new content. To
99 * perform a partial update of a block blob's, use PutBlock and PutBlockList.
100 * For more information, see the
101 * <a href="https://docs.microsoft.com/rest/api/storageservices/put-blob">Azure Docs</a>.
102 * <p>
103 * Note that the data passed must be replayable if retries are enabled (the default). In other words, the
104 * {@code Flux} must produce the same data each time it is subscribed to.
105 * <p>
106 *
107 * @param data
108 * The data to write to the blob. Note that this {@code Flux} must be replayable if retries are enabled
109 * (the default). In other words, the Flux must produce the same data each time it is subscribed to.
110 * @param length
111 * The exact length of the data. It is important that this value match precisely the length of the data
112 * emitted by the {@code Flux}.
113 *
114 * @return
115 * A reactive response containing the information of the uploaded block blob.
116 */
117 public Mono<Response<BlockBlobItem>> upload(Flux<ByteBuffer> data, long length) {
118 return this.upload(data, length, null, null, null);
119 }
120
121 /**
122 * Creates a new block blob, or updates the content of an existing block blob.
123 * Updating an existing block blob overwrites any existing metadata on the blob. Partial updates are not
124 * supported with PutBlob; the content of the existing blob is overwritten with the new content. To
125 * perform a partial update of a block blob's, use PutBlock and PutBlockList.
126 * For more information, see the
127 * <a href="https://docs.microsoft.com/rest/api/storageservices/put-blob">Azure Docs</a>.
128 * <p>
129 * Note that the data passed must be replayable if retries are enabled (the default). In other words, the
130 * {@code Flux} must produce the same data each time it is subscribed to.
131 * <p>
132 *
133 * @param data
134 * The data to write to the blob. Note that this {@code Flux} must be replayable if retries are enabled
135 * (the default). In other words, the Flux must produce the same data each time it is subscribed to.
136 * @param length
137 * The exact length of the data. It is important that this value match precisely the length of the data
138 * emitted by the {@code Flux}.
139 * @param headers
140 * {@link BlobHTTPHeaders}
141 * @param metadata
142 * {@link Metadata}
143 * @param accessConditions
144 * {@link BlobAccessConditions}
145 *
146 * @return
147 * A reactive response containing the information of the uploaded block blob.
148 */
149 public Mono<Response<BlockBlobItem>> upload(Flux<ByteBuffer> data, long length, BlobHTTPHeaders headers,
150 Metadata metadata, BlobAccessConditions accessConditions) {
151 return blockBlobAsyncRawClient
152 .upload(data.map(Unpooled::wrappedBuffer), length, headers, metadata, accessConditions)
153 .map(rb -> new SimpleResponse<>(rb, new BlockBlobItem(rb.deserializedHeaders())));
154 }
155
156 /**
157 * Creates a new block blob, or updates the content of an existing block blob, with the content of the specified file.
158 *
159 * @param filePath Path to the upload file
160 * @return An empty response
161 */
162 public Mono<Void> uploadFromFile(String filePath) {
163 return this.uploadFromFile(filePath, BLOB_DEFAULT_UPLOAD_BLOCK_SIZE, null, null, null);
164 }
165
166 /**
167 * Creates a new block blob, or updates the content of an existing block blob, with the content of the specified file.
168 *
169 * @param filePath Path to the upload file
170 * @param blockSize Size of the blocks to upload
171 * @param headers {@link BlobHTTPHeaders}
172 * @param metadata {@link Metadata}
173 * @param accessConditions {@link BlobAccessConditions}
174 * @return An empty response
175 * @throws IllegalArgumentException If {@code blockSize} is less than 0 or greater than 100MB
176 * @throws UncheckedIOException If an I/O error occurs
177 */
178 public Mono<Void> uploadFromFile(String filePath, Integer blockSize, BlobHTTPHeaders headers, Metadata metadata,
179 BlobAccessConditions accessConditions) {
180 if (blockSize < 0 || blockSize > BLOB_MAX_UPLOAD_BLOCK_SIZE) {
181 throw new IllegalArgumentException("Block size should not exceed 100MB");
182 }
183
184 return Mono.using(() -> uploadFileResourceSupplier(filePath),
185 channel -> {
186 final SortedMap<Long, String> blockIds = new TreeMap<>();
187 return Flux.fromIterable(sliceFile(filePath, blockSize))
188 .doOnNext(chunk -> blockIds.put(chunk.offset(), getBlockID()))
189 .flatMap(chunk -> {
190 String blockId = blockIds.get(chunk.offset());
191 return stageBlock(blockId, FluxUtil.byteBufStreamFromFile(channel, chunk.offset(), chunk.count()), chunk.count(), null);
192 })
193 .then(Mono.defer(() -> commitBlockList(new ArrayList<>(blockIds.values()), headers, metadata, accessConditions)))
194 .then()
195 .doOnTerminate(() -> {
196 try {
197 channel.close();
198 } catch (IOException e) {
199 throw new UncheckedIOException(e);
200 }
201 });
202 }, this::uploadFileCleanup);
203 }
204
205 private AsynchronousFileChannel uploadFileResourceSupplier(String filePath) {
206 try {
207 return AsynchronousFileChannel.open(Paths.get(filePath), StandardOpenOption.READ);
208 } catch (IOException e) {
209 throw new UncheckedIOException(e);
210 }
211 }
212
213 private void uploadFileCleanup(AsynchronousFileChannel channel) {
214 try {
215 channel.close();
216 } catch (IOException e) {
217 throw new UncheckedIOException(e);
218 }
219 }
220
221 private String getBlockID() {
222 return Base64.getEncoder().encodeToString(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8));
223 }
224
225 private List<BlobRange> sliceFile(String path, Integer blockSize) {
226 if (blockSize == null) {
227 blockSize = BLOB_DEFAULT_UPLOAD_BLOCK_SIZE;
228 }
229 File file = new File(path);
230 assert file.exists();
231 List<BlobRange> ranges = new ArrayList<>();
232 for (long pos = 0; pos < file.length(); pos += blockSize) {
233 long count = blockSize;
234 if (pos + count > file.length()) {
235 count = file.length() - pos;
236 }
237 ranges.add(new BlobRange(pos, count));
238 }
239 return ranges;
240 }
241
242 /**
243 * Uploads the specified block to the block blob's "staging area" to be later committed by a call to
244 * commitBlockList. For more information, see the
245 * <a href="https://docs.microsoft.com/rest/api/storageservices/put-block">Azure Docs</a>.
246 * <p>
247 * Note that the data passed must be replayable if retries are enabled (the default). In other words, the
248 * {@code Flux} must produce the same data each time it is subscribed to.
249 *
250 * @param base64BlockID
251 * A Base64 encoded {@code String} that specifies the ID for this block. Note that all block ids for a given
252 * blob must be the same length.
253 * @param data
254 * The data to write to the block. Note that this {@code Flux} must be replayable if retries are enabled
255 * (the default). In other words, the Flux must produce the same data each time it is subscribed to.
256 * @param length
257 * The exact length of the data. It is important that this value match precisely the length of the data
258 * emitted by the {@code Flux}.
259 *
260 * @return
261 * A reactive response signalling completion.
262 */
263 public Mono<VoidResponse> stageBlock(String base64BlockID, Flux<ByteBuf> data,
264 long length) {
265 return this.stageBlock(base64BlockID, data, length, null);
266 }
267
268 /**
269 * Uploads the specified block to the block blob's "staging area" to be later committed by a call to
270 * commitBlockList. For more information, see the
271 * <a href="https://docs.microsoft.com/rest/api/storageservices/put-block">Azure Docs</a>.
272 * <p>
273 * Note that the data passed must be replayable if retries are enabled (the default). In other words, the
274 * {@code Flux} must produce the same data each time it is subscribed to.
275 *
276 * @param base64BlockID
277 * A Base64 encoded {@code String} that specifies the ID for this block. Note that all block ids for a given
278 * blob must be the same length.
279 * @param data
280 * The data to write to the block. Note that this {@code Flux} must be replayable if retries are enabled
281 * (the default). In other words, the Flux must produce the same data each time it is subscribed to.
282 * @param length
283 * The exact length of the data. It is important that this value match precisely the length of the data
284 * emitted by the {@code Flux}.
285 * @param leaseAccessConditions
286 * By setting lease access conditions, requests will fail if the provided lease does not match the active
287 * lease on the blob.
288 *
289 * @return
290 * A reactive response signalling completion.
291 */
292 public Mono<VoidResponse> stageBlock(String base64BlockID, Flux<ByteBuf> data, long length,
293 LeaseAccessConditions leaseAccessConditions) {
294 return blockBlobAsyncRawClient
295 .stageBlock(base64BlockID, data, length, leaseAccessConditions)
296 .map(VoidResponse::new);
297 }
298
299 /**
300 * Creates a new block to be committed as part of a blob where the contents are read from a URL. For more
301 * information, see the <a href="https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-from-url">Azure Docs</a>.
302 *
303 * @param base64BlockID
304 * A Base64 encoded {@code String} that specifies the ID for this block. Note that all block ids for a given
305 * blob must be the same length.
306 * @param sourceURL
307 * The url to the blob that will be the source of the copy. A source blob in the same storage account can be
308 * authenticated via Shared Key. However, if the source is a blob in another account, the source blob must
309 * either be public or must be authenticated via a shared access signature. If the source blob is public, no
310 * authentication is required to perform the operation.
311 * @param sourceRange
312 * {@link BlobRange}
313 *
314 * @return
315 * A reactive response signalling completion.
316 */
317 public Mono<VoidResponse> stageBlockFromURL(String base64BlockID, URL sourceURL,
318 BlobRange sourceRange) {
319 return this.stageBlockFromURL(base64BlockID, sourceURL, sourceRange, null,
320 null, null);
321 }
322
323 /**
324 * Creates a new block to be committed as part of a blob where the contents are read from a URL. For more
325 * information, see the <a href="https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-from-url">Azure Docs</a>.
326 *
327 * @param base64BlockID
328 * A Base64 encoded {@code String} that specifies the ID for this block. Note that all block ids for a given
329 * blob must be the same length.
330 * @param sourceURL
331 * The url to the blob that will be the source of the copy. A source blob in the same storage account can
332 * be authenticated via Shared Key. However, if the source is a blob in another account, the source blob
333 * must either be public or must be authenticated via a shared access signature. If the source blob is
334 * public, no authentication is required to perform the operation.
335 * @param sourceRange
336 * {@link BlobRange}
337 * @param sourceContentMD5
338 * An MD5 hash of the block content from the source blob. If specified, the service will calculate the MD5
339 * of the received data and fail the request if it does not match the provided MD5.
340 * @param leaseAccessConditions
341 * By setting lease access conditions, requests will fail if the provided lease does not match the active
342 * lease on the blob.
343 * @param sourceModifiedAccessConditions
344 * {@link SourceModifiedAccessConditions}
345 *
346 * @return
347 * A reactive response signalling completion.
348 */
349 public Mono<VoidResponse> stageBlockFromURL(String base64BlockID, URL sourceURL,
350 BlobRange sourceRange, byte[] sourceContentMD5, LeaseAccessConditions leaseAccessConditions,
351 SourceModifiedAccessConditions sourceModifiedAccessConditions) {
352 return blockBlobAsyncRawClient
353 .stageBlockFromURL(base64BlockID, sourceURL, sourceRange, sourceContentMD5, leaseAccessConditions, sourceModifiedAccessConditions)
354 .map(VoidResponse::new);
355 }
356
357 /**
358 * Returns the list of blocks that have been uploaded as part of a block blob using the specified block list filter.
359 * For more information, see the
360 * <a href="https://docs.microsoft.com/rest/api/storageservices/get-block-list">Azure Docs</a>.
361 *
362 * @param listType
363 * Specifies which type of blocks to return.
364 *
365 * @return
366 * A reactive response containing the list of blocks.
367 */
368 public Flux<BlockItem> listBlocks(BlockListType listType) {
369 return this.listBlocks(listType, null);
370 }
371
372 /**
373 *
374 * Returns the list of blocks that have been uploaded as part of a block blob using the specified block list filter.
375 * For more information, see the
376 * <a href="https://docs.microsoft.com/rest/api/storageservices/get-block-list">Azure Docs</a>.
377 *
378 * @param listType
379 * Specifies which type of blocks to return.
380 * @param leaseAccessConditions
381 * By setting lease access conditions, requests will fail if the provided lease does not match the active
382 * lease on the blob.
383 *
384 * @return
385 * A reactive response containing the list of blocks.
386 */
387 public Flux<BlockItem> listBlocks(BlockListType listType,
388 LeaseAccessConditions leaseAccessConditions) {
389 return blockBlobAsyncRawClient
390 .listBlocks(listType, leaseAccessConditions)
391 .map(ResponseBase::value)
392 .flatMapMany(bl -> {
393 Flux<BlockItem> committed = Flux.fromIterable(bl.committedBlocks())
394 .map(block -> new BlockItem(block, true));
395 Flux<BlockItem> uncommitted = Flux.fromIterable(bl.uncommittedBlocks())
396 .map(block -> new BlockItem(block, false));
397 return Flux.concat(committed, uncommitted);
398 });
399 }
400
401 /**
402 * Writes a blob by specifying the list of block IDs that are to make up the blob.
403 * In order to be written as part of a blob, a block must have been successfully written
404 * to the server in a prior stageBlock operation. You can call commitBlockList to update a blob
405 * by uploading only those blocks that have changed, then committing the new and existing
406 * blocks together. Any blocks not specified in the block list and permanently deleted.
407 * For more information, see the
408 * <a href="https://docs.microsoft.com/rest/api/storageservices/put-block-list">Azure Docs</a>.
409 *
410 * @param base64BlockIDs
411 * A list of base64 encode {@code String}s that specifies the block IDs to be committed.
412 *
413 * @return
414 * A reactive response containing the information of the block blob.
415 */
416 public Mono<Response<BlockBlobItem>> commitBlockList(List<String> base64BlockIDs) {
417 return this.commitBlockList(base64BlockIDs, null, null, null);
418 }
419
420 /**
421 * Writes a blob by specifying the list of block IDs that are to make up the blob.
422 * In order to be written as part of a blob, a block must have been successfully written
423 * to the server in a prior stageBlock operation. You can call commitBlockList to update a blob
424 * by uploading only those blocks that have changed, then committing the new and existing
425 * blocks together. Any blocks not specified in the block list and permanently deleted.
426 * For more information, see the
427 * <a href="https://docs.microsoft.com/rest/api/storageservices/put-block-list">Azure Docs</a>.
428 *
429 * @param base64BlockIDs
430 * A list of base64 encode {@code String}s that specifies the block IDs to be committed.
431 * @param headers
432 * {@link BlobHTTPHeaders}
433 * @param metadata
434 * {@link Metadata}
435 * @param accessConditions
436 * {@link BlobAccessConditions}
437 *
438 * @return
439 * A reactive response containing the information of the block blob.
440 */
441 public Mono<Response<BlockBlobItem>> commitBlockList(List<String> base64BlockIDs,
442 BlobHTTPHeaders headers, Metadata metadata, BlobAccessConditions accessConditions) {
443 return blockBlobAsyncRawClient
444 .commitBlockList(base64BlockIDs, headers, metadata, accessConditions)
445 .map(rb -> new SimpleResponse<>(rb, new BlockBlobItem(rb.deserializedHeaders())));
446 }
447 }