View Javadoc
1   // Copyright (c) Microsoft Corporation. All rights reserved.
2   // Licensed under the MIT License.
3   
4   package com.azure.storage.blob;
5   
6   import com.azure.core.http.rest.Response;
7   import com.azure.core.http.rest.ResponseBase;
8   import com.azure.core.http.rest.SimpleResponse;
9   import com.azure.core.http.rest.VoidResponse;
10  import com.azure.core.implementation.util.FluxUtil;
11  import com.azure.storage.blob.implementation.AzureBlobStorageBuilder;
12  import com.azure.storage.blob.models.BlobAccessConditions;
13  import com.azure.storage.blob.models.BlobHTTPHeaders;
14  import com.azure.storage.blob.models.BlobRange;
15  import com.azure.storage.blob.models.BlockBlobItem;
16  import com.azure.storage.blob.models.BlockItem;
17  import com.azure.storage.blob.models.BlockListType;
18  import com.azure.storage.blob.models.LeaseAccessConditions;
19  import com.azure.storage.blob.models.Metadata;
20  import com.azure.storage.blob.models.SourceModifiedAccessConditions;
21  import io.netty.buffer.ByteBuf;
22  import io.netty.buffer.Unpooled;
23  import reactor.core.publisher.Flux;
24  import reactor.core.publisher.Mono;
25  
26  import java.io.File;
27  import java.io.IOException;
28  import java.io.UncheckedIOException;
29  import java.net.URL;
30  import java.nio.ByteBuffer;
31  import java.nio.channels.AsynchronousFileChannel;
32  import java.nio.charset.StandardCharsets;
33  import java.nio.file.Paths;
34  import java.nio.file.StandardOpenOption;
35  import java.util.ArrayList;
36  import java.util.Base64;
37  import java.util.List;
38  import java.util.SortedMap;
39  import java.util.TreeMap;
40  import java.util.UUID;
41  
42  /**
43   * Client to a block blob. It may only be instantiated through a {@link BlockBlobClientBuilder}, via
44   * the method {@link BlobAsyncClient#asBlockBlobAsyncClient()}, or via the method
45   * {@link ContainerAsyncClient#getBlockBlobAsyncClient(String)}. This class does not hold
46   * any state about a particular blob, but is instead a convenient way of sending appropriate
47   * requests to the resource on the service.
48   *
49   * <p>
50   * This client contains operations on a blob. Operations on a container are available on {@link ContainerAsyncClient},
51   * and operations on the service are available on {@link StorageAsyncClient}.
52   *
53   * <p>
54   * Please refer
55   * to the <a href=https://docs.microsoft.com/en-us/rest/api/storageservices/understanding-block-blobs--append-blobs--and-page-blobs>Azure Docs</a>
56   * for more information.
57   *
58   * <p>
59   * Note this client is an async client that returns reactive responses from Spring Reactor Core
60   * project (https://projectreactor.io/). Calling the methods in this client will <strong>NOT</strong>
61   * start the actual network operation, until {@code .subscribe()} is called on the reactive response.
62   * You can simply convert one of these responses to a {@link java.util.concurrent.CompletableFuture}
63   * object through {@link Mono#toFuture()}.
64   */
65  public final class BlockBlobAsyncClient extends BlobAsyncClient {
66      static final int BLOB_DEFAULT_UPLOAD_BLOCK_SIZE = 4 * Constants.MB;
67      static final int BLOB_MAX_UPLOAD_BLOCK_SIZE = 100 * Constants.MB;
68  
69      final BlockBlobAsyncRawClient blockBlobAsyncRawClient;
70  
71      /**
72       * Indicates the maximum number of bytes that can be sent in a call to upload.
73       */
74      public static final int MAX_UPLOAD_BLOB_BYTES = 256 * Constants.MB;
75  
76      /**
77       * Indicates the maximum number of bytes that can be sent in a call to stageBlock.
78       */
79      public static final int MAX_STAGE_BLOCK_BYTES = 100 * Constants.MB;
80  
81      /**
82       * Indicates the maximum number of blocks allowed in a block blob.
83       */
84      public static final int MAX_BLOCKS = 50000;
85  
86      /**
87       * Package-private constructor for use by {@link BlockBlobClientBuilder}.
88       * @param azureBlobStorageBuilder the API client builder for blob storage API
89       */
90      BlockBlobAsyncClient(AzureBlobStorageBuilder azureBlobStorageBuilder, String snapshot) {
91          super(azureBlobStorageBuilder, snapshot);
92          this.blockBlobAsyncRawClient = new BlockBlobAsyncRawClient(azureBlobStorageBuilder.build(), snapshot);
93      }
94  
95      /**
96       * Creates a new block blob, or updates the content of an existing block blob.
97       * Updating an existing block blob overwrites any existing metadata on the blob. Partial updates are not
98       * supported with PutBlob; the content of the existing blob is overwritten with the new content. To
99       * perform a partial update of a block blob's, use PutBlock and PutBlockList.
100      * For more information, see the
101      * <a href="https://docs.microsoft.com/rest/api/storageservices/put-blob">Azure Docs</a>.
102      * <p>
103      * Note that the data passed must be replayable if retries are enabled (the default). In other words, the
104      * {@code Flux} must produce the same data each time it is subscribed to.
105      * <p>
106      *
107      * @param data
108      *         The data to write to the blob. Note that this {@code Flux} must be replayable if retries are enabled
109      *         (the default). In other words, the Flux must produce the same data each time it is subscribed to.
110      * @param length
111      *         The exact length of the data. It is important that this value match precisely the length of the data
112      *         emitted by the {@code Flux}.
113      *
114      * @return
115      *      A reactive response containing the information of the uploaded block blob.
116      */
117     public Mono<Response<BlockBlobItem>> upload(Flux<ByteBuffer> data, long length) {
118         return this.upload(data, length, null, null, null);
119     }
120 
121     /**
122      * Creates a new block blob, or updates the content of an existing block blob.
123      * Updating an existing block blob overwrites any existing metadata on the blob. Partial updates are not
124      * supported with PutBlob; the content of the existing blob is overwritten with the new content. To
125      * perform a partial update of a block blob's, use PutBlock and PutBlockList.
126      * For more information, see the
127      * <a href="https://docs.microsoft.com/rest/api/storageservices/put-blob">Azure Docs</a>.
128      * <p>
129      * Note that the data passed must be replayable if retries are enabled (the default). In other words, the
130      * {@code Flux} must produce the same data each time it is subscribed to.
131      * <p>
132      *
133      * @param data
134      *         The data to write to the blob. Note that this {@code Flux} must be replayable if retries are enabled
135      *         (the default). In other words, the Flux must produce the same data each time it is subscribed to.
136      * @param length
137      *         The exact length of the data. It is important that this value match precisely the length of the data
138      *         emitted by the {@code Flux}.
139      * @param headers
140      *         {@link BlobHTTPHeaders}
141      * @param metadata
142      *         {@link Metadata}
143      * @param accessConditions
144      *         {@link BlobAccessConditions}
145      *
146      * @return
147      *      A reactive response containing the information of the uploaded block blob.
148      */
149     public Mono<Response<BlockBlobItem>> upload(Flux<ByteBuffer> data, long length, BlobHTTPHeaders headers,
150             Metadata metadata, BlobAccessConditions accessConditions) {
151         return blockBlobAsyncRawClient
152             .upload(data.map(Unpooled::wrappedBuffer), length, headers, metadata, accessConditions)
153             .map(rb -> new SimpleResponse<>(rb, new BlockBlobItem(rb.deserializedHeaders())));
154     }
155 
156     /**
157      * Creates a new block blob, or updates the content of an existing block blob, with the content of the specified file.
158      *
159      * @param filePath Path to the upload file
160      * @return An empty response
161      */
162     public Mono<Void> uploadFromFile(String filePath) {
163         return this.uploadFromFile(filePath, BLOB_DEFAULT_UPLOAD_BLOCK_SIZE, null, null, null);
164     }
165 
166     /**
167      * Creates a new block blob, or updates the content of an existing block blob, with the content of the specified file.
168      *
169      * @param filePath Path to the upload file
170      * @param blockSize Size of the blocks to upload
171      * @param headers {@link BlobHTTPHeaders}
172      * @param metadata {@link Metadata}
173      * @param accessConditions {@link BlobAccessConditions}
174      * @return An empty response
175      * @throws IllegalArgumentException If {@code blockSize} is less than 0 or greater than 100MB
176      * @throws UncheckedIOException If an I/O error occurs
177      */
178     public Mono<Void> uploadFromFile(String filePath, Integer blockSize, BlobHTTPHeaders headers, Metadata metadata,
179                                      BlobAccessConditions accessConditions) {
180         if (blockSize < 0 || blockSize > BLOB_MAX_UPLOAD_BLOCK_SIZE) {
181             throw new IllegalArgumentException("Block size should not exceed 100MB");
182         }
183 
184         return Mono.using(() -> uploadFileResourceSupplier(filePath),
185             channel -> {
186                 final SortedMap<Long, String> blockIds = new TreeMap<>();
187                 return Flux.fromIterable(sliceFile(filePath, blockSize))
188                     .doOnNext(chunk -> blockIds.put(chunk.offset(), getBlockID()))
189                     .flatMap(chunk -> {
190                         String blockId = blockIds.get(chunk.offset());
191                         return stageBlock(blockId, FluxUtil.byteBufStreamFromFile(channel, chunk.offset(), chunk.count()), chunk.count(), null);
192                     })
193                     .then(Mono.defer(() -> commitBlockList(new ArrayList<>(blockIds.values()), headers, metadata, accessConditions)))
194                     .then()
195                     .doOnTerminate(() -> {
196                         try {
197                             channel.close();
198                         } catch (IOException e) {
199                             throw new UncheckedIOException(e);
200                         }
201                     });
202             }, this::uploadFileCleanup);
203     }
204 
205     private AsynchronousFileChannel uploadFileResourceSupplier(String filePath) {
206         try {
207             return AsynchronousFileChannel.open(Paths.get(filePath), StandardOpenOption.READ);
208         } catch (IOException e) {
209             throw new UncheckedIOException(e);
210         }
211     }
212 
213     private void uploadFileCleanup(AsynchronousFileChannel channel) {
214         try {
215             channel.close();
216         } catch (IOException e) {
217             throw new UncheckedIOException(e);
218         }
219     }
220 
221     private String getBlockID() {
222         return Base64.getEncoder().encodeToString(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8));
223     }
224 
225     private List<BlobRange> sliceFile(String path, Integer blockSize) {
226         if (blockSize == null) {
227             blockSize = BLOB_DEFAULT_UPLOAD_BLOCK_SIZE;
228         }
229         File file = new File(path);
230         assert file.exists();
231         List<BlobRange> ranges = new ArrayList<>();
232         for (long pos = 0; pos < file.length(); pos += blockSize) {
233             long count = blockSize;
234             if (pos + count > file.length()) {
235                 count = file.length() - pos;
236             }
237             ranges.add(new BlobRange(pos, count));
238         }
239         return ranges;
240     }
241 
242     /**
243      * Uploads the specified block to the block blob's "staging area" to be later committed by a call to
244      * commitBlockList. For more information, see the
245      * <a href="https://docs.microsoft.com/rest/api/storageservices/put-block">Azure Docs</a>.
246      * <p>
247      * Note that the data passed must be replayable if retries are enabled (the default). In other words, the
248      * {@code Flux} must produce the same data each time it is subscribed to.
249      *
250      * @param base64BlockID
251      *         A Base64 encoded {@code String} that specifies the ID for this block. Note that all block ids for a given
252      *         blob must be the same length.
253      * @param data
254      *         The data to write to the block. Note that this {@code Flux} must be replayable if retries are enabled
255      *         (the default). In other words, the Flux must produce the same data each time it is subscribed to.
256      * @param length
257      *         The exact length of the data. It is important that this value match precisely the length of the data
258      *         emitted by the {@code Flux}.
259      *
260      * @return
261      *      A reactive response signalling completion.
262      */
263     public Mono<VoidResponse> stageBlock(String base64BlockID, Flux<ByteBuf> data,
264                                                          long length) {
265         return this.stageBlock(base64BlockID, data, length, null);
266     }
267 
268     /**
269      * Uploads the specified block to the block blob's "staging area" to be later committed by a call to
270      * commitBlockList. For more information, see the
271      * <a href="https://docs.microsoft.com/rest/api/storageservices/put-block">Azure Docs</a>.
272      * <p>
273      * Note that the data passed must be replayable if retries are enabled (the default). In other words, the
274      * {@code Flux} must produce the same data each time it is subscribed to.
275      *
276      * @param base64BlockID
277      *         A Base64 encoded {@code String} that specifies the ID for this block. Note that all block ids for a given
278      *         blob must be the same length.
279      * @param data
280      *         The data to write to the block. Note that this {@code Flux} must be replayable if retries are enabled
281      *         (the default). In other words, the Flux must produce the same data each time it is subscribed to.
282      * @param length
283      *         The exact length of the data. It is important that this value match precisely the length of the data
284      *         emitted by the {@code Flux}.
285      * @param leaseAccessConditions
286      *         By setting lease access conditions, requests will fail if the provided lease does not match the active
287      *         lease on the blob.
288      *
289      * @return
290      *      A reactive response signalling completion.
291      */
292     public Mono<VoidResponse> stageBlock(String base64BlockID, Flux<ByteBuf> data, long length,
293                  LeaseAccessConditions leaseAccessConditions) {
294         return blockBlobAsyncRawClient
295             .stageBlock(base64BlockID, data, length, leaseAccessConditions)
296             .map(VoidResponse::new);
297     }
298 
299     /**
300      * Creates a new block to be committed as part of a blob where the contents are read from a URL. For more
301      * information, see the <a href="https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-from-url">Azure Docs</a>.
302      *
303      * @param base64BlockID
304      *         A Base64 encoded {@code String} that specifies the ID for this block. Note that all block ids for a given
305      *         blob must be the same length.
306      * @param sourceURL
307      *         The url to the blob that will be the source of the copy.  A source blob in the same storage account can be
308      *         authenticated via Shared Key. However, if the source is a blob in another account, the source blob must
309      *         either be public or must be authenticated via a shared access signature. If the source blob is public, no
310      *         authentication is required to perform the operation.
311      * @param sourceRange
312      *         {@link BlobRange}
313      *
314      * @return
315      *      A reactive response signalling completion.
316      */
317     public Mono<VoidResponse> stageBlockFromURL(String base64BlockID, URL sourceURL,
318             BlobRange sourceRange) {
319         return this.stageBlockFromURL(base64BlockID, sourceURL, sourceRange, null,
320                 null, null);
321     }
322 
323     /**
324      * Creates a new block to be committed as part of a blob where the contents are read from a URL. For more
325      * information, see the <a href="https://docs.microsoft.com/en-us/rest/api/storageservices/put-block-from-url">Azure Docs</a>.
326      *
327      * @param base64BlockID
328      *         A Base64 encoded {@code String} that specifies the ID for this block. Note that all block ids for a given
329      *         blob must be the same length.
330      * @param sourceURL
331      *         The url to the blob that will be the source of the copy.  A source blob in the same storage account can
332      *         be authenticated via Shared Key. However, if the source is a blob in another account, the source blob
333      *         must either be public or must be authenticated via a shared access signature. If the source blob is
334      *         public, no authentication is required to perform the operation.
335      * @param sourceRange
336      *         {@link BlobRange}
337      * @param sourceContentMD5
338      *         An MD5 hash of the block content from the source blob. If specified, the service will calculate the MD5
339      *         of the received data and fail the request if it does not match the provided MD5.
340      * @param leaseAccessConditions
341      *         By setting lease access conditions, requests will fail if the provided lease does not match the active
342      *         lease on the blob.
343      * @param sourceModifiedAccessConditions
344      *         {@link SourceModifiedAccessConditions}
345      *
346      * @return
347      *      A reactive response signalling completion.
348      */
349     public Mono<VoidResponse> stageBlockFromURL(String base64BlockID, URL sourceURL,
350             BlobRange sourceRange, byte[] sourceContentMD5, LeaseAccessConditions leaseAccessConditions,
351             SourceModifiedAccessConditions sourceModifiedAccessConditions) {
352         return blockBlobAsyncRawClient
353             .stageBlockFromURL(base64BlockID, sourceURL, sourceRange, sourceContentMD5, leaseAccessConditions, sourceModifiedAccessConditions)
354             .map(VoidResponse::new);
355     }
356 
357     /**
358      * Returns the list of blocks that have been uploaded as part of a block blob using the specified block list filter.
359      * For more information, see the
360      * <a href="https://docs.microsoft.com/rest/api/storageservices/get-block-list">Azure Docs</a>.
361      *
362      * @param listType
363      *         Specifies which type of blocks to return.
364      *
365      * @return
366      *      A reactive response containing the list of blocks.
367      */
368     public Flux<BlockItem> listBlocks(BlockListType listType) {
369         return this.listBlocks(listType, null);
370     }
371 
372     /**
373      *
374      * Returns the list of blocks that have been uploaded as part of a block blob using the specified block list filter.
375      * For more information, see the
376      * <a href="https://docs.microsoft.com/rest/api/storageservices/get-block-list">Azure Docs</a>.
377      *
378      * @param listType
379      *         Specifies which type of blocks to return.
380      * @param leaseAccessConditions
381      *         By setting lease access conditions, requests will fail if the provided lease does not match the active
382      *         lease on the blob.
383      *
384      * @return
385      *      A reactive response containing the list of blocks.
386      */
387     public Flux<BlockItem> listBlocks(BlockListType listType,
388                                       LeaseAccessConditions leaseAccessConditions) {
389         return blockBlobAsyncRawClient
390             .listBlocks(listType, leaseAccessConditions)
391             .map(ResponseBase::value)
392             .flatMapMany(bl -> {
393                 Flux<BlockItem> committed = Flux.fromIterable(bl.committedBlocks())
394                     .map(block -> new BlockItem(block, true));
395                 Flux<BlockItem> uncommitted = Flux.fromIterable(bl.uncommittedBlocks())
396                     .map(block -> new BlockItem(block, false));
397                 return Flux.concat(committed, uncommitted);
398             });
399     }
400 
401     /**
402      * Writes a blob by specifying the list of block IDs that are to make up the blob.
403      * In order to be written as part of a blob, a block must have been successfully written
404      * to the server in a prior stageBlock operation. You can call commitBlockList to update a blob
405      * by uploading only those blocks that have changed, then committing the new and existing
406      * blocks together. Any blocks not specified in the block list and permanently deleted.
407      * For more information, see the
408      * <a href="https://docs.microsoft.com/rest/api/storageservices/put-block-list">Azure Docs</a>.
409      *
410      * @param base64BlockIDs
411      *         A list of base64 encode {@code String}s that specifies the block IDs to be committed.
412      *
413      * @return
414      *      A reactive response containing the information of the block blob.
415      */
416     public Mono<Response<BlockBlobItem>> commitBlockList(List<String> base64BlockIDs) {
417         return this.commitBlockList(base64BlockIDs, null, null, null);
418     }
419 
420     /**
421      * Writes a blob by specifying the list of block IDs that are to make up the blob.
422      * In order to be written as part of a blob, a block must have been successfully written
423      * to the server in a prior stageBlock operation. You can call commitBlockList to update a blob
424      * by uploading only those blocks that have changed, then committing the new and existing
425      * blocks together. Any blocks not specified in the block list and permanently deleted.
426      * For more information, see the
427      * <a href="https://docs.microsoft.com/rest/api/storageservices/put-block-list">Azure Docs</a>.
428      *
429      * @param base64BlockIDs
430      *         A list of base64 encode {@code String}s that specifies the block IDs to be committed.
431      * @param headers
432      *         {@link BlobHTTPHeaders}
433      * @param metadata
434      *         {@link Metadata}
435      * @param accessConditions
436      *         {@link BlobAccessConditions}
437      *
438      * @return
439      *      A reactive response containing the information of the block blob.
440      */
441     public Mono<Response<BlockBlobItem>> commitBlockList(List<String> base64BlockIDs,
442                                               BlobHTTPHeaders headers, Metadata metadata, BlobAccessConditions accessConditions) {
443         return blockBlobAsyncRawClient
444             .commitBlockList(base64BlockIDs, headers, metadata, accessConditions)
445             .map(rb -> new SimpleResponse<>(rb, new BlockBlobItem(rb.deserializedHeaders())));
446     }
447 }