// StorageOutputStream.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.storage.common;
import com.azure.core.util.logging.ClientLogger;
import com.azure.storage.common.implementation.Constants;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.annotation.NonNull;
import java.io.IOException;
import java.io.OutputStream;
/**
 * StorageOutputStream allows for uploading data to an Azure Storage service using stream concepts.
 * <p>
 * Subclasses supply the actual service call through {@link #dispatchWrite(byte[], int, long)}. This class splits
 * every write into chunks of at most the configured write threshold, dispatches the chunks one after another and
 * blocks the calling thread until all chunks of that write have completed.
 */
public abstract class StorageOutputStream extends OutputStream {
    final ClientLogger logger = new ClientLogger(StorageOutputStream.class);

    /*
     * Holds the write threshold of number of bytes to buffer prior to dispatching a write. For block blob this is the
     * block size, for page blob this is the Page commit size.
     */
    private final int writeThreshold;

    /*
     * Holds the last exception this stream encountered. Once set it is never cleared, which marks the stream as
     * faulted (or closed) for every later call to checkStreamState().
     */
    protected volatile IOException lastError;

    /**
     * Dispatches a single chunk of data to the service.
     *
     * @param data The full <code>byte</code> array handed to the write call; the chunk is a slice of this array.
     * @param writeLength An <code>int</code> which represents the number of bytes in the chunk.
     * @param offset A <code>long</code> which, as passed by this class, is the index in <code>data</code> at which
     * the chunk starts.
     * @return A {@link Mono} that completes when the chunk has been written, or signals the error that occurred.
     */
    protected abstract Mono<Void> dispatchWrite(byte[] data, int writeLength, long offset);

    /**
     * Creates a new stream that dispatches writes in chunks of at most <code>writeThreshold</code> bytes.
     *
     * @param writeThreshold An <code>int</code> which represents the maximum number of bytes per dispatched chunk.
     */
    protected StorageOutputStream(final int writeThreshold) {
        this.writeThreshold = writeThreshold;
    }

    /**
     * Splits the requested range into chunks of at most the write threshold and dispatches them to the service
     * sequentially, blocking until the last chunk has completed.
     *
     * @param data A <code>byte</code> array which represents the data to write.
     * @param offset An <code>int</code> which represents the start offset in the data.
     * @param length An <code>int</code> which represents the number of bytes to write.
     */
    protected void writeInternal(final byte[] data, int offset, int length) {
        // Number of chunks needed to cover 'length' bytes, rounding the last partial chunk up.
        int chunks = (int) (Math.ceil((double) length / (double) this.writeThreshold));

        Flux.range(0, chunks)
            // Start index of each chunk inside 'data'.
            .map(c -> offset + c * this.writeThreshold)
            // concatMap keeps the chunks in order and dispatches one at a time.
            .concatMap(pos -> processChunk(data, pos, offset, length))
            .then()
            .block();
    }

    /*
     * Dispatches the chunk starting at 'position', clamping its size so it never runs past offset + length, and
     * records any failure in lastError.
     */
    private Mono<Void> processChunk(byte[] data, int position, int offset, int length) {
        // Clamp via the remaining byte count instead of computing position + writeThreshold: that sum can overflow
        // int for very large arrays, which would skip the clamp and dispatch a chunk past the requested range.
        final int remaining = offset + length - position;
        final int chunkLength = Math.min(this.writeThreshold, remaining);

        return dispatchWrite(data, chunkLength, position)
            .doOnError(t -> {
                if (t instanceof IOException) {
                    lastError = (IOException) t;
                } else {
                    lastError = new IOException(t);
                }
            });
    }

    /**
     * Helper function to check if the stream is faulted, if it is it surfaces the exception.
     *
     * @throws RuntimeException If the stream has recorded an error or has been closed. The recorded
     * {@link IOException} is attached as the cause.
     */
    protected void checkStreamState() {
        // Read the volatile field once so the null check and the throw observe the same value.
        final IOException error = this.lastError;
        if (error != null) {
            // Keep the original message but chain the IOException so its type and stack trace are not lost.
            throw logger.logExceptionAsError(new RuntimeException(error.getMessage(), error));
        }
    }

    /**
     * Verifies that this output stream is not faulted or closed. This class dispatches every write before the
     * write call returns, so there is no pending data to push out here.
     *
     * @throws RuntimeException If the stream has recorded an error or has been closed.
     */
    @Override
    public void flush() {
        this.checkStreamState();
    }

    /**
     * Writes <code>b.length</code> bytes from the specified byte array to this output stream.
     *
     * @param data A <code>byte</code> array which represents the data to write.
     */
    @Override
    public void write(@NonNull final byte[] data) {
        this.write(data, 0, data.length);
    }

    /**
     * Writes length bytes from the specified byte array starting at offset to this output stream.
     *
     * @param data A <code>byte</code> array which represents the data to write.
     * @param offset An <code>int</code> which represents the start offset in the data.
     * @param length An <code>int</code> which represents the number of bytes to write.
     * @throws IndexOutOfBoundsException If <code>offset</code> or <code>length</code> is negative, or the range
     * extends past the end of <code>data</code>.
     */
    @Override
    public void write(@NonNull final byte[] data, final int offset, final int length) {
        // 'length > data.length - offset' is the overflow-safe form of 'offset + length > data.length'.
        if (offset < 0 || length < 0 || length > data.length - offset) {
            throw logger.logExceptionAsError(new IndexOutOfBoundsException());
        }

        this.writeInternal(data, offset, length);
    }

    /**
     * Writes the specified byte to this output stream. The general contract for write is that one byte is written to
     * the output stream. The byte to be written is the eight low-order bits of the argument b. The 24 high-order bits
     * of b are ignored.
     *
     * @param byteVal An <code>int</code> which represents the byte value to write.
     */
    @Override
    public void write(final int byteVal) {
        this.write(new byte[]{(byte) (byteVal & 0xFF)});
    }

    /**
     * Closes this output stream. The stream state is checked and {@link #flush()} is invoked; afterwards the stream
     * is marked as closed so that later state checks fail.
     *
     * @throws IOException If an I/O error occurs.
     * @throws RuntimeException If the stream has recorded an error or was already closed.
     */
    @Override
    public synchronized void close() throws IOException {
        try {
            // Surfaces a previously recorded write failure, or STREAM_CLOSED if close() was already called.
            this.checkStreamState();

            // flush any remaining data
            this.flush();
        } finally {
            // Mark the stream as closed: any later checkStreamState() call, including a second close(), throws.
            this.lastError = new IOException(Constants.STREAM_CLOSED);
        }
    }
}