// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.storage.blob;

import com.azure.core.http.HttpPipeline;
import com.azure.core.http.rest.Response;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.storage.blob.implementation.models.EncryptionScope;
import com.azure.storage.blob.implementation.util.ModelHelper;
import com.azure.storage.blob.models.AccessTier;
import com.azure.storage.blob.models.BlobHttpHeaders;
import com.azure.storage.blob.models.BlobRange;
import com.azure.storage.blob.models.BlobRequestConditions;
import com.azure.storage.blob.models.BlockBlobItem;
import com.azure.storage.blob.models.CpkInfo;
import com.azure.storage.blob.models.CustomerProvidedKey;
import com.azure.storage.blob.models.ParallelTransferOptions;
import com.azure.storage.blob.options.BlobParallelUploadOptions;
import com.azure.storage.blob.options.BlobUploadFromFileOptions;
import com.azure.storage.blob.options.BlockBlobCommitBlockListOptions;
import com.azure.storage.blob.options.BlockBlobSimpleUploadOptions;
import com.azure.storage.blob.specialized.AppendBlobAsyncClient;
import com.azure.storage.blob.specialized.BlobAsyncClientBase;
import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
import com.azure.storage.blob.specialized.BlockBlobClient;
import com.azure.storage.blob.specialized.PageBlobAsyncClient;
import com.azure.storage.blob.specialized.SpecializedBlobClientBuilder;
import com.azure.storage.common.Utility;
import com.azure.storage.common.implementation.Constants;
import com.azure.storage.common.implementation.StorageImplUtils;
import com.azure.storage.common.implementation.UploadBufferPool;
import com.azure.storage.common.implementation.UploadUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;

import static com.azure.core.util.FluxUtil.monoError;
import static java.nio.charset.StandardCharsets.UTF_8;

/**
 * This class provides a client that contains generic blob operations for Azure Storage Blobs. Operations allowed by the
 * client are uploading and downloading, copying a blob, retrieving and setting metadata, retrieving and setting HTTP
 * headers, and deleting and un-deleting a blob.
 *
 * <p>
 * This client is instantiated through {@link BlobClientBuilder} or retrieved via {@link
 * BlobContainerAsyncClient#getBlobAsyncClient(String) getBlobAsyncClient}.
 *
 * <p>
 * For operations on a specific blob type (i.e. append, block, or page) use {@link #getAppendBlobAsyncClient()
 * getAppendBlobAsyncClient}, {@link #getBlockBlobAsyncClient() getBlockBlobAsyncClient}, or {@link
 * #getPageBlobAsyncClient() getPageBlobAsyncClient} to construct a client that allows blob specific operations.
 *
 * <p>
 * Please refer to the
 * <a href=https://docs.microsoft.com/en-us/rest/api/storageservices/understanding-block-blobs--append-blobs--and-page-blobs>Azure
 * Docs</a> for more information.
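 *
 * <p>
 * A minimal construction sketch (illustrative only; the connection string and names below are placeholders):
 *
 * <pre>{@code
 * BlobAsyncClient client = new BlobClientBuilder()
 *     .connectionString("<storage-connection-string>")
 *     .containerName("my-container")
 *     .blobName("my-blob")
 *     .buildAsyncClient();
 * }</pre>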
 */
public class BlobAsyncClient extends BlobAsyncClientBase {
    /**
     * The block size to use if none is specified in parallel operations.
     */
    public static final int BLOB_DEFAULT_UPLOAD_BLOCK_SIZE = 4 * Constants.MB;

    /**
     * The number of buffers to use if none is specified for the buffered upload method.
     */
    public static final int BLOB_DEFAULT_NUMBER_OF_BUFFERS = 8;

    /**
     * If a blob is known to be greater than 100MB, using a larger block size will trigger some server-side
     * optimizations. If the block size is not set and the size of the blob is known to be greater than 100MB, this
     * value will be used.
     */
    public static final int BLOB_DEFAULT_HTBB_UPLOAD_BLOCK_SIZE = 8 * Constants.MB;

    static final long BLOB_MAX_UPLOAD_BLOCK_SIZE = 4000L * Constants.MB;
    private final ClientLogger logger = new ClientLogger(BlobAsyncClient.class);

    /**
     * Protected constructor for use by {@link BlobClientBuilder}.
     *
     * @param pipeline The pipeline used to send and receive service requests.
     * @param url The endpoint where to send service requests.
     * @param serviceVersion The version of the service to be used when making requests.
     * @param accountName The storage account name.
     * @param containerName The container name.
     * @param blobName The blob name.
     * @param snapshot The snapshot identifier for the blob, pass {@code null} to interact with the blob directly.
     * @param customerProvidedKey Customer provided key used during encryption of the blob's data on the server, pass
     * {@code null} to allow the service to use its own encryption.
     */
    protected BlobAsyncClient(HttpPipeline pipeline, String url, BlobServiceVersion serviceVersion, String accountName,
        String containerName, String blobName, String snapshot, CpkInfo customerProvidedKey) {
        super(pipeline, url, serviceVersion, accountName, containerName, blobName, snapshot, customerProvidedKey);
    }

    /**
     * Protected constructor for use by {@link BlobClientBuilder}.
     *
     * @param pipeline The pipeline used to send and receive service requests.
     * @param url The endpoint where to send service requests.
     * @param serviceVersion The version of the service to be used when making requests.
     * @param accountName The storage account name.
     * @param containerName The container name.
     * @param blobName The blob name.
     * @param snapshot The snapshot identifier for the blob, pass {@code null} to interact with the blob directly.
     * @param customerProvidedKey Customer provided key used during encryption of the blob's data on the server, pass
     * {@code null} to allow the service to use its own encryption.
     * @param encryptionScope Encryption scope used during encryption of the blob's data on the server, pass
     * {@code null} to allow the service to use its own encryption.
     */
    protected BlobAsyncClient(HttpPipeline pipeline, String url, BlobServiceVersion serviceVersion, String accountName,
        String containerName, String blobName, String snapshot, CpkInfo customerProvidedKey,
        EncryptionScope encryptionScope) {
        super(pipeline, url, serviceVersion, accountName, containerName, blobName, snapshot, customerProvidedKey,
            encryptionScope);
    }

    /**
     * Protected constructor for use by {@link BlobClientBuilder}.
     *
     * @param pipeline The pipeline used to send and receive service requests.
     * @param url The endpoint where to send service requests.
     * @param serviceVersion The version of the service to be used when making requests.
     * @param accountName The storage account name.
     * @param containerName The container name.
     * @param blobName The blob name.
     * @param snapshot The snapshot identifier for the blob, pass {@code null} to interact with the blob directly.
     * @param customerProvidedKey Customer provided key used during encryption of the blob's data on the server, pass
     * {@code null} to allow the service to use its own encryption.
     * @param encryptionScope Encryption scope used during encryption of the blob's data on the server, pass
     * {@code null} to allow the service to use its own encryption.
     * @param versionId The version identifier for the blob, pass {@code null} to interact with the latest blob version.
     */
    protected BlobAsyncClient(HttpPipeline pipeline, String url, BlobServiceVersion serviceVersion, String accountName,
        String containerName, String blobName, String snapshot, CpkInfo customerProvidedKey,
        EncryptionScope encryptionScope, String versionId) {
        super(pipeline, url, serviceVersion, accountName, containerName, blobName, snapshot, customerProvidedKey,
            encryptionScope, versionId);
    }

    /**
     * Creates a new {@link BlobAsyncClient} linked to the {@code snapshot} of this blob resource.
     *
     * @param snapshot the identifier for a specific snapshot of this blob
     * @return A {@link BlobAsyncClient} used to interact with the specific snapshot.
     */
    @Override
    public BlobAsyncClient getSnapshotClient(String snapshot) {
        return new BlobAsyncClient(getHttpPipeline(), getBlobUrl(), getServiceVersion(), getAccountName(),
            getContainerName(), getBlobName(), snapshot, getCustomerProvidedKey(), encryptionScope, getVersionId());
    }

    /**
     * Creates a new {@link BlobAsyncClient} linked to the {@code versionId} of this blob resource.
     *
     * @param versionId the identifier for a specific version of this blob,
     * pass {@code null} to interact with the latest blob version.
     * @return A {@link BlobAsyncClient} used to interact with the specific version.
     */
    @Override
    public BlobAsyncClient getVersionClient(String versionId) {
        return new BlobAsyncClient(getHttpPipeline(), getBlobUrl(), getServiceVersion(), getAccountName(),
            getContainerName(), getBlobName(), getSnapshotId(), getCustomerProvidedKey(), encryptionScope, versionId);
    }

    /**
     * Creates a new {@link AppendBlobAsyncClient} associated with this blob.
     *
     * @return A {@link AppendBlobAsyncClient} associated with this blob.
     */
    public AppendBlobAsyncClient getAppendBlobAsyncClient() {
        return prepareBuilder().buildAppendBlobAsyncClient();
    }

    /**
     * Creates a new {@link BlockBlobAsyncClient} associated with this blob.
     *
     * @return A {@link BlockBlobAsyncClient} associated with this blob.
     */
    public BlockBlobAsyncClient getBlockBlobAsyncClient() {
        return prepareBuilder().buildBlockBlobAsyncClient();
    }

    /**
     * Creates a new {@link PageBlobAsyncClient} associated with this blob.
     *
     * @return A {@link PageBlobAsyncClient} associated with this blob.
     */
    public PageBlobAsyncClient getPageBlobAsyncClient() {
        return prepareBuilder().buildPageBlobAsyncClient();
    }

    private SpecializedBlobClientBuilder prepareBuilder() {
        SpecializedBlobClientBuilder builder = new SpecializedBlobClientBuilder()
            .pipeline(getHttpPipeline())
            .endpoint(getBlobUrl())
            .snapshot(getSnapshotId())
            .serviceVersion(getServiceVersion());

        CpkInfo cpk = getCustomerProvidedKey();
        if (cpk != null) {
            builder.customerProvidedKey(new CustomerProvidedKey(cpk.getEncryptionKey()));
        }

        if (encryptionScope != null) {
            builder.encryptionScope(encryptionScope.getEncryptionScope());
        }

        return builder;
    }

    /**
     * Creates a new block blob. By default this method will not overwrite an existing blob.
     * <p>
     * Updating an existing block blob overwrites any existing metadata on the blob. Partial updates are not supported
     * with this method; the content of the existing blob is overwritten with the new content. To perform a partial
     * update of a block blob's contents, use {@link BlockBlobAsyncClient#stageBlock(String, Flux, long) stageBlock} and {@link
     * BlockBlobAsyncClient#commitBlockList(List) commitBlockList}. For more information, see the
     * <a href="https://docs.microsoft.com/rest/api/storageservices/put-block">Azure Docs for Put Block</a> and the
     * <a href="https://docs.microsoft.com/rest/api/storageservices/put-block-list">Azure Docs for Put Block List</a>.
     * <p>
     * The data passed need not be replayable (i.e. support multiple subscriptions), as other upload methods require
     * when retries are enabled, and the length of the data need not be known in advance. This method therefore
     * supports uploading any arbitrary data source, including network streams. This behavior is possible because the
     * method performs some internal buffering as configured by the block size and maximum concurrency options, so
     * while it may offer additional convenience, it will not be as performant as the other options, which should be
     * preferred when possible.
     * <p>
     * Typically, the greater the number of buffers used, the greater the possible parallelism when transferring the
     * data. Larger buffers mean we will have to stage fewer blocks and therefore require fewer IO operations. The
     * trade-offs between these values are context-dependent, so some experimentation may be required to optimize inputs
     * for a given scenario.
     *
     * <p><strong>Code Samples</strong></p>
     *
     * {@codesnippet com.azure.storage.blob.BlobAsyncClient.upload#Flux-ParallelTransferOptions}
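     *
     * <p>
     * A minimal usage sketch (illustrative; assumes an existing {@code client} and a {@code Flux<ByteBuffer>} named
     * {@code data}):
     *
     * <pre>{@code
     * ParallelTransferOptions parallelOptions = new ParallelTransferOptions()
     *     .setBlockSizeLong(4L * 1024 * 1024) // 4 MB blocks
     *     .setMaxConcurrency(8);              // up to 8 blocks staged in parallel
     * client.upload(data, parallelOptions)
     *     .subscribe(item -> System.out.println("Uploaded. ETag: " + item.getETag()));
     * }</pre>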
     *
     * @param data The data to write to the blob. Unlike other upload methods, this method does not require that the
     * {@code Flux} be replayable. In other words, it does not have to support multiple subscribers and is not expected
     * to produce the same values across subscriptions.
     * @param parallelTransferOptions {@link ParallelTransferOptions} used to configure buffered uploading.
     * @return A reactive response containing the information of the uploaded block blob.
     */
    public Mono<BlockBlobItem> upload(Flux<ByteBuffer> data, ParallelTransferOptions parallelTransferOptions) {
        try {
            return upload(data, parallelTransferOptions, false);
        } catch (RuntimeException ex) {
            return monoError(logger, ex);
        }
    }

    /**
     * Creates a new block blob, or updates the content of an existing block blob.
     * <p>
     * Updating an existing block blob overwrites any existing metadata on the blob. Partial updates are not supported
     * with this method; the content of the existing blob is overwritten with the new content. To perform a partial
     * update of a block blob's contents, use {@link BlockBlobAsyncClient#stageBlock(String, Flux, long) stageBlock} and {@link
     * BlockBlobAsyncClient#commitBlockList(List) commitBlockList}. For more information, see the
     * <a href="https://docs.microsoft.com/rest/api/storageservices/put-block">Azure Docs for Put Block</a> and the
     * <a href="https://docs.microsoft.com/rest/api/storageservices/put-block-list">Azure Docs for Put Block List</a>.
     * <p>
     * The data passed need not be replayable (i.e. support multiple subscriptions), as other upload methods require
     * when retries are enabled, and the length of the data need not be known in advance. This method therefore
     * supports uploading any arbitrary data source, including network streams. This behavior is possible because the
     * method performs some internal buffering as configured by the block size and maximum concurrency options, so
     * while it may offer additional convenience, it will not be as performant as the other options, which should be
     * preferred when possible.
     * <p>
     * Typically, the greater the number of buffers used, the greater the possible parallelism when transferring the
     * data. Larger buffers mean we will have to stage fewer blocks and therefore require fewer IO operations. The
     * trade-offs between these values are context-dependent, so some experimentation may be required to optimize inputs
     * for a given scenario.
     *
     * <p><strong>Code Samples</strong></p>
     *
     * {@codesnippet com.azure.storage.blob.BlobAsyncClient.upload#Flux-ParallelTransferOptions-boolean}
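     *
     * <p>
     * A minimal usage sketch (illustrative; assumes an existing {@code client} and {@code data}):
     *
     * <pre>{@code
     * client.upload(data, new ParallelTransferOptions().setMaxConcurrency(4), true) // overwrite if present
     *     .subscribe(item -> System.out.println("Uploaded. ETag: " + item.getETag()));
     * }</pre>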
     *
     * @param data The data to write to the blob. Unlike other upload methods, this method does not require that the
     * {@code Flux} be replayable. In other words, it does not have to support multiple subscribers and is not expected
     * to produce the same values across subscriptions.
     * @param parallelTransferOptions {@link ParallelTransferOptions} used to configure buffered uploading.
     * @param overwrite Whether to overwrite if the blob already exists.
     * @return A reactive response containing the information of the uploaded block blob.
     */
    public Mono<BlockBlobItem> upload(Flux<ByteBuffer> data, ParallelTransferOptions parallelTransferOptions,
        boolean overwrite) {
        try {
            Mono<Void> overwriteCheck;
            BlobRequestConditions requestConditions;

            if (overwrite) {
                overwriteCheck = Mono.empty();
                requestConditions = null;
            } else {
                overwriteCheck = exists().flatMap(exists -> exists
                    ? monoError(logger, new IllegalArgumentException(Constants.BLOB_ALREADY_EXISTS))
                    : Mono.empty());
                requestConditions = new BlobRequestConditions().setIfNoneMatch(Constants.HeaderConstants.ETAG_WILDCARD);
            }

            return overwriteCheck
                .then(uploadWithResponse(data, parallelTransferOptions, null, null, null,
                    requestConditions)).flatMap(FluxUtil::toMono);
        } catch (RuntimeException ex) {
            return monoError(logger, ex);
        }
    }

    /**
     * Creates a new block blob, or updates the content of an existing block blob.
     * <p>
     * Updating an existing block blob overwrites any existing metadata on the blob. Partial updates are not supported
     * with this method; the content of the existing blob is overwritten with the new content. To perform a partial
     * update of a block blob's contents, use {@link BlockBlobAsyncClient#stageBlock(String, Flux, long) stageBlock} and {@link
     * BlockBlobAsyncClient#commitBlockList(List) commitBlockList}. For more information, see the
     * <a href="https://docs.microsoft.com/rest/api/storageservices/put-block">Azure Docs for Put Block</a> and the
     * <a href="https://docs.microsoft.com/rest/api/storageservices/put-block-list">Azure Docs for Put Block List</a>.
     * <p>
     * The data passed need not be replayable (i.e. support multiple subscriptions), as other upload methods require
     * when retries are enabled, and the length of the data need not be known in advance. This method therefore
     * supports uploading any arbitrary data source, including network streams. This behavior is possible because the
     * method performs some internal buffering as configured by the block size and maximum concurrency options, so
     * while it may offer additional convenience, it will not be as performant as the other options, which should be
     * preferred when possible.
     * <p>
     * Typically, the greater the number of buffers used, the greater the possible parallelism when transferring the
     * data. Larger buffers mean we will have to stage fewer blocks and therefore require fewer IO operations. The
     * trade-offs between these values are context-dependent, so some experimentation may be required to optimize inputs
     * for a given scenario.
     * <p>
     * To avoid overwriting, pass "*" to {@link BlobRequestConditions#setIfNoneMatch(String)}.
     *
     * <p><strong>Code Samples</strong></p>
     *
     * {@codesnippet com.azure.storage.blob.BlobAsyncClient.uploadWithResponse#Flux-ParallelTransferOptions-BlobHttpHeaders-Map-AccessTier-BlobRequestConditions}
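     *
     * <p>
     * A minimal usage sketch (illustrative; assumes an existing {@code client}, {@code data}, and
     * {@code parallelOptions}):
     *
     * <pre>{@code
     * BlobHttpHeaders headers = new BlobHttpHeaders().setContentType("application/octet-stream");
     * Map<String, String> metadata = Collections.singletonMap("origin", "sample");
     * client.uploadWithResponse(data, parallelOptions, headers, metadata, AccessTier.HOT, null)
     *     .subscribe(response -> System.out.println("Status: " + response.getStatusCode()));
     * }</pre>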
     *
     * <p><strong>Using Progress Reporting</strong></p>
     *
     * {@codesnippet com.azure.storage.blob.BlobAsyncClient.uploadWithResponse#Flux-ParallelTransferOptions-BlobHttpHeaders-Map-AccessTier-BlobRequestConditions.ProgressReporter}
     *
     * @param data The data to write to the blob. Unlike other upload methods, this method does not require that the
     * {@code Flux} be replayable. In other words, it does not have to support multiple subscribers and is not expected
     * to produce the same values across subscriptions.
     * @param parallelTransferOptions {@link ParallelTransferOptions} used to configure buffered uploading.
     * @param headers {@link BlobHttpHeaders}
     * @param metadata Metadata to associate with the blob. If there is leading or trailing whitespace in any
     * metadata key or value, it must be removed or encoded.
     * @param tier {@link AccessTier} for the destination blob.
     * @param requestConditions {@link BlobRequestConditions}
     * @return A reactive response containing the information of the uploaded block blob.
     */
    public Mono<Response<BlockBlobItem>> uploadWithResponse(Flux<ByteBuffer> data,
        ParallelTransferOptions parallelTransferOptions, BlobHttpHeaders headers, Map<String, String> metadata,
        AccessTier tier, BlobRequestConditions requestConditions) {
        return this.uploadWithResponse(new BlobParallelUploadOptions(data)
            .setParallelTransferOptions(parallelTransferOptions).setHeaders(headers).setMetadata(metadata).setTier(tier)
            .setRequestConditions(requestConditions));
    }

    /**
     * Creates a new block blob, or updates the content of an existing block blob.
     * <p>
     * Updating an existing block blob overwrites any existing metadata on the blob. Partial updates are not supported
     * with this method; the content of the existing blob is overwritten with the new content. To perform a partial
     * update of a block blob's contents, use {@link BlockBlobAsyncClient#stageBlock(String, Flux, long) stageBlock} and {@link
     * BlockBlobAsyncClient#commitBlockList(List) commitBlockList}. For more information, see the
     * <a href="https://docs.microsoft.com/rest/api/storageservices/put-block">Azure Docs for Put Block</a> and the
     * <a href="https://docs.microsoft.com/rest/api/storageservices/put-block-list">Azure Docs for Put Block List</a>.
     * <p>
     * The data passed need not be replayable (i.e. support multiple subscriptions), as other upload methods require
     * when retries are enabled, and the length of the data need not be known in advance. This method therefore
     * supports uploading any arbitrary data source, including network streams. This behavior is possible because the
     * method performs some internal buffering as configured by the block size and maximum concurrency options, so
     * while it may offer additional convenience, it will not be as performant as the other options, which should be
     * preferred when possible.
     * <p>
     * Typically, the greater the number of buffers used, the greater the possible parallelism when transferring the
     * data. Larger buffers mean we will have to stage fewer blocks and therefore require fewer IO operations. The
     * trade-offs between these values are context-dependent, so some experimentation may be required to optimize inputs
     * for a given scenario.
     * <p>
     * To avoid overwriting, pass "*" to {@link BlobRequestConditions#setIfNoneMatch(String)}.
     *
     * <p><strong>Code Samples</strong></p>
     *
     * {@codesnippet com.azure.storage.blob.BlobAsyncClient.uploadWithResponse#BlobParallelUploadOptions}
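     *
     * <p>
     * A minimal usage sketch (illustrative; assumes an existing {@code client} and {@code data}):
     *
     * <pre>{@code
     * client.uploadWithResponse(new BlobParallelUploadOptions(data)
     *         .setTags(Collections.singletonMap("category", "sample"))
     *         .setTier(AccessTier.COOL))
     *     .subscribe(response -> System.out.println("ETag: " + response.getValue().getETag()));
     * }</pre>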
     *
     * <p><strong>Using Progress Reporting</strong></p>
     *
     * {@codesnippet com.azure.storage.blob.BlobAsyncClient.uploadWithResponse#BlobParallelUploadOptions}
     *
     * @param options {@link BlobParallelUploadOptions}. Unlike other upload methods, this method does not require that
     * the {@code Flux} be replayable. In other words, it does not have to support multiple subscribers and is not
     * expected to produce the same values across subscriptions.
     * @return A reactive response containing the information of the uploaded block blob.
     */
    public Mono<Response<BlockBlobItem>> uploadWithResponse(BlobParallelUploadOptions options) {
        try {
            StorageImplUtils.assertNotNull("options", options);

            final ParallelTransferOptions parallelTransferOptions =
                ModelHelper.populateAndApplyDefaults(options.getParallelTransferOptions());
            final BlobHttpHeaders headers = options.getHeaders();
            final Map<String, String> metadata = options.getMetadata();
            final Map<String, String> tags = options.getTags();
            final AccessTier tier = options.getTier();
            final BlobRequestConditions requestConditions = options.getRequestConditions() == null
                ? new BlobRequestConditions() : options.getRequestConditions();
            final boolean computeMd5 = options.isComputeMd5();

            BlockBlobAsyncClient blockBlobAsyncClient = getBlockBlobAsyncClient();

            Function<Flux<ByteBuffer>, Mono<Response<BlockBlobItem>>> uploadInChunksFunction = (stream) ->
                uploadInChunks(blockBlobAsyncClient, stream, parallelTransferOptions, headers, metadata, tags,
                    tier, requestConditions, computeMd5);

            BiFunction<Flux<ByteBuffer>, Long, Mono<Response<BlockBlobItem>>> uploadFullBlobFunction =
                (stream, length) -> uploadFullBlob(blockBlobAsyncClient, stream, length, parallelTransferOptions,
                    headers, metadata, tags, tier, requestConditions, computeMd5);

            Flux<ByteBuffer> data = options.getDataFlux() == null ? Utility.convertStreamToByteBuffer(
                options.getDataStream(), options.getLength(),
                // We can only buffer up to max int due to restrictions in ByteBuffer.
                (int) Math.min(Integer.MAX_VALUE, parallelTransferOptions.getBlockSizeLong()), false)
                : options.getDataFlux();
            return UploadUtils.uploadFullOrChunked(data, ModelHelper.wrapBlobOptions(parallelTransferOptions),
                uploadInChunksFunction, uploadFullBlobFunction);
        } catch (RuntimeException ex) {
            return monoError(logger, ex);
        }
    }

    private Mono<Response<BlockBlobItem>> uploadFullBlob(BlockBlobAsyncClient blockBlobAsyncClient,
        Flux<ByteBuffer> data, long length, ParallelTransferOptions parallelTransferOptions, BlobHttpHeaders headers,
        Map<String, String> metadata, Map<String, String> tags, AccessTier tier,
        BlobRequestConditions requestConditions, boolean computeMd5) {
        // Report progress as necessary.
        Flux<ByteBuffer> progressData = ProgressReporter.addProgressReporting(data,
            parallelTransferOptions.getProgressReceiver());

        return UploadUtils.computeMd5(progressData, computeMd5, logger)
            .map(fluxMd5Wrapper -> new BlockBlobSimpleUploadOptions(fluxMd5Wrapper.getData(), length)
                .setHeaders(headers)
                .setMetadata(metadata)
                .setTags(tags)
                .setTier(tier)
                .setRequestConditions(requestConditions)
                .setContentMd5(fluxMd5Wrapper.getMd5()))
            .flatMap(blockBlobAsyncClient::uploadWithResponse);
    }

    private Mono<Response<BlockBlobItem>> uploadInChunks(BlockBlobAsyncClient blockBlobAsyncClient,
        Flux<ByteBuffer> data, ParallelTransferOptions parallelTransferOptions, BlobHttpHeaders headers,
        Map<String, String> metadata, Map<String, String> tags, AccessTier tier,
        BlobRequestConditions requestConditions, boolean computeMd5) {
        // TODO: Sample/api reference
        // See ProgressReporter for an explanation on why this lock is necessary and why we use AtomicLong.
        AtomicLong totalProgress = new AtomicLong();
        Lock progressLock = new ReentrantLock();

        // Validation done in the constructor.
        /*
        We use maxConcurrency + 1 for the number of buffers because one buffer is typically being filled while the
        others are being sent.
         */
        UploadBufferPool pool = new UploadBufferPool(parallelTransferOptions.getMaxConcurrency() + 1,
            parallelTransferOptions.getBlockSizeLong(), BlockBlobClient.MAX_STAGE_BLOCK_BYTES_LONG);

        Flux<ByteBuffer> chunkedSource = UploadUtils.chunkSource(data,
            ModelHelper.wrapBlobOptions(parallelTransferOptions));

        /*
         Write to the pool and upload the output.
         */
        return chunkedSource.concatMap(pool::write)
            .concatWith(Flux.defer(pool::flush))
            .flatMapSequential(bufferAggregator -> {
                // Report progress as necessary.
                Flux<ByteBuffer> progressData = ProgressReporter.addParallelProgressReporting(
                    bufferAggregator.asFlux(),
                    parallelTransferOptions.getProgressReceiver(),
                    progressLock,
                    totalProgress);

                final String blockId = Base64.getEncoder().encodeToString(UUID.randomUUID().toString().getBytes(UTF_8));
                return UploadUtils.computeMd5(progressData, computeMd5, logger)
                    .flatMap(fluxMd5Wrapper -> blockBlobAsyncClient.stageBlockWithResponse(blockId,
                        fluxMd5Wrapper.getData(), bufferAggregator.length(), fluxMd5Wrapper.getMd5(),
                        requestConditions.getLeaseId()))
                    // We only care about the stageBlock insofar as it was successful,
                    // but we need to collect the ids.
                    .map(x -> blockId)
                    .doFinally(x -> pool.returnBuffer(bufferAggregator))
                    .flux();
            }, parallelTransferOptions.getMaxConcurrency())
            .collect(Collectors.toList())
            .flatMap(ids ->
                blockBlobAsyncClient.commitBlockListWithResponse(new BlockBlobCommitBlockListOptions(ids)
                    .setHeaders(headers).setMetadata(metadata).setTags(tags).setTier(tier)
                    .setRequestConditions(requestConditions)));
    }

    /**
     * Creates a new block blob with the content of the specified file. By default this method will not overwrite an
     * existing blob.
     *
     * <p><strong>Code Samples</strong></p>
     *
     * {@codesnippet com.azure.storage.blob.BlobAsyncClient.uploadFromFile#String}
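     *
     * <p>
     * A minimal usage sketch (illustrative; the file path below is a placeholder):
     *
     * <pre>{@code
     * client.uploadFromFile("/path/to/local/file")
     *     .subscribe(unused -> { },
     *         error -> System.err.println("Upload failed: " + error),
     *         () -> System.out.println("Upload complete."));
     * }</pre>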
     *
     * @param filePath Path to the upload file
     * @return An empty response
     * @throws UncheckedIOException If an I/O error occurs
     */
    public Mono<Void> uploadFromFile(String filePath) {
        try {
            return uploadFromFile(filePath, false);
        } catch (RuntimeException ex) {
            return monoError(logger, ex);
        }
    }

    /**
     * Creates a new block blob, or updates the content of an existing block blob, with the content of the specified
     * file.
     *
     * <p><strong>Code Samples</strong></p>
     *
     * {@codesnippet com.azure.storage.blob.BlobAsyncClient.uploadFromFile#String-boolean}
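     *
     * <p>
     * A minimal usage sketch (illustrative; the file path below is a placeholder):
     *
     * <pre>{@code
     * client.uploadFromFile("/path/to/local/file", true) // overwrite if the blob already exists
     *     .subscribe(unused -> { },
     *         error -> System.err.println("Upload failed: " + error),
     *         () -> System.out.println("Upload complete."));
     * }</pre>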
     *
     * @param filePath Path to the upload file
     * @param overwrite Whether to overwrite if the blob already exists.
     * @return An empty response
     * @throws UncheckedIOException If an I/O error occurs
     */
    public Mono<Void> uploadFromFile(String filePath, boolean overwrite) {
        try {
            Mono<Void> overwriteCheck = Mono.empty();
            BlobRequestConditions requestConditions = null;

            // Note that if the file will be uploaded using a putBlob, we can also skip the exists check.
            if (!overwrite) {
                if (UploadUtils.shouldUploadInChunks(filePath,
                    BlockBlobAsyncClient.MAX_UPLOAD_BLOB_BYTES_LONG, logger)) {
                    overwriteCheck = exists().flatMap(exists -> exists
                        ? monoError(logger, new IllegalArgumentException(Constants.BLOB_ALREADY_EXISTS))
                        : Mono.empty());
                }

                requestConditions = new BlobRequestConditions().setIfNoneMatch(Constants.HeaderConstants.ETAG_WILDCARD);
            }

            return overwriteCheck.then(uploadFromFile(filePath, null, null, null, null, requestConditions));
        } catch (RuntimeException ex) {
            return monoError(logger, ex);
        }
    }

    /**
     * Creates a new block blob, or updates the content of an existing block blob, with the content of the specified
     * file.
     * <p>
     * To avoid overwriting, pass "*" to {@link BlobRequestConditions#setIfNoneMatch(String)}.
     *
     * <p><strong>Code Samples</strong></p>
     *
     * {@codesnippet com.azure.storage.blob.BlobAsyncClient.uploadFromFile#String-ParallelTransferOptions-BlobHttpHeaders-Map-AccessTier-BlobRequestConditions}
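     *
     * <p>
     * A minimal usage sketch (illustrative; the file path is a placeholder and the arguments mirror the parameters
     * documented below):
     *
     * <pre>{@code
     * ParallelTransferOptions parallelOptions = new ParallelTransferOptions()
     *     .setBlockSizeLong(8L * 1024 * 1024); // 8 MB blocks
     * client.uploadFromFile("/path/to/local/file", parallelOptions, null,
     *         Collections.singletonMap("origin", "sample"), AccessTier.HOT, null)
     *     .subscribe(unused -> { }, error -> System.err.println("Upload failed: " + error));
     * }</pre>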
     *
     * @param filePath Path to the upload file
     * @param parallelTransferOptions {@link ParallelTransferOptions} to use to upload from file. The number of
     * parallel transfers parameter is ignored.
     * @param headers {@link BlobHttpHeaders}
     * @param metadata Metadata to associate with the blob. If there is leading or trailing whitespace in any
     * metadata key or value, it must be removed or encoded.
     * @param tier {@link AccessTier} for the destination blob.
     * @param requestConditions {@link BlobRequestConditions}
     * @return An empty response
     * @throws UncheckedIOException If an I/O error occurs
     */
    public Mono<Void> uploadFromFile(String filePath, ParallelTransferOptions parallelTransferOptions,
        BlobHttpHeaders headers, Map<String, String> metadata, AccessTier tier,
        BlobRequestConditions requestConditions) {
        return this.uploadFromFileWithResponse(new BlobUploadFromFileOptions(filePath)
            .setParallelTransferOptions(parallelTransferOptions).setHeaders(headers).setMetadata(metadata)
            .setTier(tier).setRequestConditions(requestConditions))
            .then();
    }

    /**
     * Creates a new block blob, or updates the content of an existing block blob, with the content of the specified
     * file.
     * <p>
     * To avoid overwriting, pass "*" to {@link BlobRequestConditions#setIfNoneMatch(String)}.
     *
     * <p><strong>Code Samples</strong></p>
     *
     * {@codesnippet com.azure.storage.blob.BlobAsyncClient.uploadFromFileWithResponse#BlobUploadFromFileOptions}
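     *
     * <p>
     * A minimal usage sketch (illustrative; the file path below is a placeholder):
     *
     * <pre>{@code
     * client.uploadFromFileWithResponse(new BlobUploadFromFileOptions("/path/to/local/file")
     *         .setTier(AccessTier.COOL)
     *         .setMetadata(Collections.singletonMap("source", "local")))
     *     .subscribe(response -> System.out.println("ETag: " + response.getValue().getETag()));
     * }</pre>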
     *
     * @param options {@link BlobUploadFromFileOptions}
     * @return A reactive response containing the information of the uploaded block blob.
     * @throws UncheckedIOException If an I/O error occurs
     */
    public Mono<Response<BlockBlobItem>> uploadFromFileWithResponse(BlobUploadFromFileOptions options) {
        StorageImplUtils.assertNotNull("options", options);
        Long originalBlockSize = (options.getParallelTransferOptions() == null)
            ? null
            : options.getParallelTransferOptions().getBlockSizeLong();
        final ParallelTransferOptions finalParallelTransferOptions =
            ModelHelper.populateAndApplyDefaults(options.getParallelTransferOptions());
        try {
            return Mono.using(() -> UploadUtils.uploadFileResourceSupplier(options.getFilePath(), logger),
                channel -> {
                    try {
                        BlockBlobAsyncClient blockBlobAsyncClient = getBlockBlobAsyncClient();
                        long fileSize = channel.size();

                        // If the file is larger than the max single-upload size, chunk it and stage it as blocks.
                        if (UploadUtils.shouldUploadInChunks(options.getFilePath(),
                            finalParallelTransferOptions.getMaxSingleUploadSizeLong(), logger)) {
                            return uploadFileChunks(fileSize, finalParallelTransferOptions, originalBlockSize,
                                options.getHeaders(), options.getMetadata(), options.getTags(),
                                options.getTier(), options.getRequestConditions(), channel,
                                blockBlobAsyncClient);
                        } else {
                            // Otherwise, we know it can be sent in a single request, reducing network overhead.
                            Flux<ByteBuffer> data = FluxUtil.readFile(channel);
                            if (finalParallelTransferOptions.getProgressReceiver() != null) {
                                data = ProgressReporter.addProgressReporting(data,
                                    finalParallelTransferOptions.getProgressReceiver());
                            }
                            return blockBlobAsyncClient.uploadWithResponse(
                                new BlockBlobSimpleUploadOptions(data, fileSize).setHeaders(options.getHeaders())
                                    .setMetadata(options.getMetadata()).setTags(options.getTags())
                                    .setTier(options.getTier())
                                    .setRequestConditions(options.getRequestConditions()));
                        }
                    } catch (IOException ex) {
                        return Mono.error(ex);
                    }
                },
                channel -> UploadUtils.uploadFileCleanup(channel, logger));
        } catch (RuntimeException ex) {
            return monoError(logger, ex);
        }
    }

    private Mono<Response<BlockBlobItem>> uploadFileChunks(
        long fileSize, ParallelTransferOptions parallelTransferOptions,
        Long originalBlockSize, BlobHttpHeaders headers, Map<String, String> metadata, Map<String, String> tags,
        AccessTier tier, BlobRequestConditions requestConditions, AsynchronousFileChannel channel,
        BlockBlobAsyncClient client) {
        final BlobRequestConditions finalRequestConditions = (requestConditions == null)
            ? new BlobRequestConditions() : requestConditions;
        // parallelTransferOptions are finalized in the calling method.

        // See ProgressReporter for an explanation on why this lock is necessary and why we use AtomicLong.
        AtomicLong totalProgress = new AtomicLong();
        Lock progressLock = new ReentrantLock();

        final SortedMap<Long, String> blockIds = new TreeMap<>();
        return Flux.fromIterable(sliceFile(fileSize, originalBlockSize, parallelTransferOptions.getBlockSizeLong()))
            .flatMap(chunk -> {
                String blockId = getBlockID();
                blockIds.put(chunk.getOffset(), blockId);

                Flux<ByteBuffer> progressData = ProgressReporter.addParallelProgressReporting(
                    FluxUtil.readFile(channel, chunk.getOffset(), chunk.getCount()),
                    parallelTransferOptions.getProgressReceiver(), progressLock, totalProgress);

                return client.stageBlockWithResponse(blockId, progressData, chunk.getCount(), null,
                    finalRequestConditions.getLeaseId());
            }, parallelTransferOptions.getMaxConcurrency())
            .then(Mono.defer(() -> client.commitBlockListWithResponse(
                new BlockBlobCommitBlockListOptions(new ArrayList<>(blockIds.values()))
                    .setHeaders(headers).setMetadata(metadata).setTags(tags).setTier(tier)
                    .setRequestConditions(finalRequestConditions))));
    }

    /**
     * RESERVED FOR INTERNAL USE.
     *
     * Resource Supplier for UploadFile.
     *
     * @param filePath The path for the file
     * @return {@code AsynchronousFileChannel}
     * @throws UncheckedIOException If an I/O error occurs.
     * @deprecated The implementation was refactored into the common storage library.
     */
    @Deprecated
    protected AsynchronousFileChannel uploadFileResourceSupplier(String filePath) {
        return UploadUtils.uploadFileResourceSupplier(filePath, logger);
    }

    private String getBlockID() {
        return Base64.getEncoder().encodeToString(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8));
    }

    private List<BlobRange> sliceFile(long fileSize, Long originalBlockSize, long blockSize) {
        List<BlobRange> ranges = new ArrayList<>();
        if (fileSize > 100 * Constants.MB && originalBlockSize == null) {
            blockSize = BLOB_DEFAULT_HTBB_UPLOAD_BLOCK_SIZE;
        }
        for (long pos = 0; pos < fileSize; pos += blockSize) {
            long count = blockSize;
            if (pos + count > fileSize) {
                count = fileSize - pos;
            }
            ranges.add(new BlobRange(pos, count));
        }
        return ranges;
    }
}