StorageFileInputStream.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.storage.file;
import com.azure.core.implementation.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.storage.common.Constants;
import com.azure.storage.common.StorageInputStream;
import com.azure.storage.file.models.FileRange;
import com.azure.storage.file.models.StorageException;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
* Provides an input stream to read a given storage file resource.
*/
public class StorageFileInputStream extends StorageInputStream {
final ClientLogger logger = new ClientLogger(StorageFileInputStream.class);
private final FileAsyncClient fileAsyncClient;
/**
* Initializes a new instance of the StorageFileInputStream class.
*
* @param fileAsyncClient A {@link FileClient} object which represents the blob that this stream is associated with.
* @throws StorageException An exception representing any error which occurred during the operation.
*/
StorageFileInputStream(final FileAsyncClient fileAsyncClient)
throws StorageException {
this(fileAsyncClient, 0, null);
}
/**
* Initializes a new instance of the StorageFileInputStream class. Note that if {@code fileRangeOffset} is not
* {@code 0} or {@code fileRangeLength} is not {@code null}, there will be no content MD5 verification.
*
* @param fileAsyncClient A {@link FileAsyncClient} object which represents the blob
* that this stream is associated with.
* @param fileRangeOffset The offset of file range data to begin stream.
* @param fileRangeLength How much data the stream should return after fileRangeOffset.
* @throws StorageException An exception representing any error which occurred during the operation.
*/
StorageFileInputStream(final FileAsyncClient fileAsyncClient, long fileRangeOffset, Long fileRangeLength)
throws StorageException {
super(fileRangeOffset, fileRangeLength, 4 * Constants.MB,
fileAsyncClient.getProperties().block().getContentLength());
this.fileAsyncClient = fileAsyncClient;
}
/**
* Dispatches a read operation of N bytes.
*
* @param readLength An <code>int</code> which represents the number of bytes to read.
*/
@Override
protected synchronized ByteBuffer dispatchRead(final int readLength, final long offset) {
try {
ByteBuffer currentBuffer = this.fileAsyncClient.downloadWithPropertiesWithResponse(new FileRange(offset,
offset + readLength - 1), false)
.flatMap(response -> FluxUtil.collectBytesInByteBufferStream(response.getValue().getBody())
.map(ByteBuffer::wrap))
.block();
this.bufferSize = readLength;
this.bufferStartOffset = offset;
return currentBuffer;
} catch (final StorageException e) {
this.streamFaulted = true;
this.lastError = new IOException(e);
throw logger.logExceptionAsError(new RuntimeException(this.lastError.getMessage()));
}
}
}