StorageFileOutputStream.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.storage.file;
import com.azure.storage.common.Constants;
import com.azure.storage.common.StorageOutputStream;
import com.azure.storage.file.models.StorageException;
import java.io.IOException;
import java.nio.ByteBuffer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
public class StorageFileOutputStream extends StorageOutputStream {
private long offsetPos;
private final FileAsyncClient client;
StorageFileOutputStream(final FileAsyncClient client, long offsetPos) {
super(4 * Constants.MB);
this.client = client;
this.offsetPos = offsetPos;
}
private Mono<Void> uploadData(Flux<ByteBuffer> inputData, long writeLength, long offset) {
return client.uploadWithResponse(inputData, writeLength, offset)
.then()
.onErrorResume(t -> t instanceof IOException || t instanceof StorageException, e -> {
this.lastError = new IOException(e);
return null;
});
}
@Override
protected Mono<Void> dispatchWrite(byte[] data, int writeLength, long offset) {
if (writeLength == 0) {
return Mono.empty();
}
Flux<ByteBuffer> fbb = Flux.range(0, 1)
.concatMap(pos -> Mono.fromCallable(() -> ByteBuffer.wrap(data, (int) offset, writeLength)));
long fileOffset = this.offsetPos;
this.offsetPos = this.offsetPos + writeLength;
return this.uploadData(fbb.subscribeOn(Schedulers.elastic()), writeLength, fileOffset);
}
}