Class FluxUtil

java.lang.Object
com.azure.core.util.FluxUtil

public final class FluxUtil extends Object
Utility type exposing methods to deal with Flux.
  • Method Details

    • isFluxByteBuffer

      public static boolean isFluxByteBuffer(Type entityType)
      Checks if a type is Flux<ByteBuffer>.
      Parameters:
      entityType - the type to check
      Returns:
      whether the type represents a Flux that emits ByteBuffer
    • addProgressReporting

      public static Flux<ByteBuffer> addProgressReporting(Flux<ByteBuffer> flux, ProgressReporter progressReporter)
      Adds progress reporting to the provided Flux of ByteBuffer.

      Each ByteBuffer that's emitted from the Flux will report Buffer.remaining().

      When the Flux is resubscribed, the progress is reset. If the Flux is not replayable, resubscribing can yield empty or partial data, in which case progress reporting might not be accurate.

      If the ProgressReporter is not provided, i.e. is null, this method returns the Flux unmodified.

      Parameters:
      flux - A Flux to report progress on.
      progressReporter - Optional ProgressReporter.
      Returns:
      A Flux that reports progress, or original Flux if ProgressReporter is not provided.
    • collectBytesInByteBufferStream

      public static Mono<byte[]> collectBytesInByteBufferStream(Flux<ByteBuffer> stream)
      Collects ByteBuffers emitted by a Flux into a byte array.
      Parameters:
      stream - A stream which emits ByteBuffer instances.
      Returns:
      A Mono which emits the concatenation of all the ByteBuffer instances given by the source Flux.
      Throws:
      IllegalStateException - If the combined size of the emitted ByteBuffers is greater than Integer.MAX_VALUE.
    • collectBytesInByteBufferStream

      public static Mono<byte[]> collectBytesInByteBufferStream(Flux<ByteBuffer> stream, int sizeHint)
      Collects ByteBuffers emitted by a Flux into a byte array.

      Unlike collectBytesInByteBufferStream(Flux), this method accepts a second parameter sizeHint. This size hint allows for optimizations when creating the initial buffer to reduce the number of times it needs to be resized while concatenating emitted ByteBuffers.

      Parameters:
      stream - A stream which emits ByteBuffer instances.
      sizeHint - A hint about the expected stream size.
      Returns:
      A Mono which emits the concatenation of all the ByteBuffer instances given by the source Flux.
      Throws:
      IllegalArgumentException - If sizeHint is equal to or less than 0.
      IllegalStateException - If the combined size of the emitted ByteBuffers is greater than Integer.MAX_VALUE.
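      The effect of the size hint can be pictured with a plain-JDK sketch. CollectBytesSketch and its collect method are hypothetical names, a List stands in for the Flux, and this is not the library's implementation. It only shows an accumulator that is pre-sized from sizeHint, together with the two documented failure cases.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.List;

// Hypothetical illustration; a List stands in for the Flux of ByteBuffer.
final class CollectBytesSketch {
    static byte[] collect(List<ByteBuffer> buffers, int sizeHint) {
        if (sizeHint <= 0) {
            throw new IllegalArgumentException("sizeHint must be greater than 0");
        }
        // Pre-sizing the accumulator avoids repeated growth when the hint is accurate.
        ByteArrayOutputStream out = new ByteArrayOutputStream(sizeHint);
        long total = 0;
        for (ByteBuffer buffer : buffers) {
            // Read through a duplicate so the caller's position is left untouched
            // (a choice made for this sketch only).
            ByteBuffer view = buffer.duplicate();
            total += view.remaining();
            if (total > Integer.MAX_VALUE) {
                throw new IllegalStateException("combined size exceeds Integer.MAX_VALUE");
            }
            byte[] chunk = new byte[view.remaining()];
            view.get(chunk);
            out.write(chunk, 0, chunk.length);
        }
        return out.toByteArray();
    }
}
```

      An accurate hint only saves intermediate copies; the resulting byte array is the same whatever positive hint is passed.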
    • collectBytesFromNetworkResponse

      public static Mono<byte[]> collectBytesFromNetworkResponse(Flux<ByteBuffer> stream, HttpHeaders headers)
      Collects ByteBuffers returned in a network response into a byte array.

      The headers are inspected for a Content-Length value, which determines whether size-hinted collection, collectBytesInByteBufferStream(Flux, int), or default collection, collectBytesInByteBufferStream(Flux), is used.

      Parameters:
      stream - A network response ByteBuffer stream.
      headers - The HTTP headers of the response.
      Returns:
      A Mono which emits the collected network response ByteBuffers.
      Throws:
      NullPointerException - If headers is null.
      IllegalStateException - If the size of the network response is greater than Integer.MAX_VALUE.
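      The choice between the two collection paths can be sketched with the JDK alone. ContentLengthHintSketch and sizeHint are hypothetical names, and a Map stands in for HttpHeaders. How the library treats a zero, negative, or malformed Content-Length is not stated above; falling back to "no hint" in those cases is an assumption of this sketch.

```java
import java.util.Map;
import java.util.Objects;
import java.util.OptionalInt;

// Hypothetical illustration; a Map stands in for the response headers.
final class ContentLengthHintSketch {
    // Returns a size hint when a usable Content-Length is present, otherwise empty.
    static OptionalInt sizeHint(Map<String, String> headers) {
        Objects.requireNonNull(headers, "headers");
        for (Map.Entry<String, String> header : headers.entrySet()) {
            String value = header.getValue();
            // HTTP header names are case-insensitive.
            if (!"Content-Length".equalsIgnoreCase(header.getKey()) || value == null) {
                continue;
            }
            try {
                long length = Long.parseLong(value.trim());
                if (length > 0 && length <= Integer.MAX_VALUE) {
                    return OptionalInt.of((int) length);
                }
            } catch (NumberFormatException e) {
                // Malformed value: assumed here to mean "collect without a hint".
            }
            return OptionalInt.empty();
        }
        return OptionalInt.empty();
    }
}
```

      A present hint would correspond to the size-hinted overload, an empty result to the default one.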
    • byteBufferToArray

      public static byte[] byteBufferToArray(ByteBuffer byteBuffer)
      Gets the content of the provided ByteBuffer as a byte array. This method creates a new byte array even if the ByteBuffer has a backing array.
      Parameters:
      byteBuffer - the byte buffer
      Returns:
      the byte array
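      The difference between copying the content and exposing a backing array can be shown with plain java.nio. ByteBufferCopySketch and copyRemaining are hypothetical names and this is not the library's code. Whether the library advances the buffer's position is not stated above; this sketch reads through a duplicate so the caller's buffer is left unchanged.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration of copying a buffer's remaining bytes into a fresh array.
final class ByteBufferCopySketch {
    static byte[] copyRemaining(ByteBuffer buffer) {
        // duplicate() shares the content but has its own position and limit.
        ByteBuffer view = buffer.duplicate();
        byte[] copy = new byte[view.remaining()];
        view.get(copy);
        return copy;
    }
}
```

      Because the result is a new array, later writes to it do not affect the buffer, which is not true of the array returned by ByteBuffer.array().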
    • createRetriableDownloadFlux

      public static Flux<ByteBuffer> createRetriableDownloadFlux(Supplier<Flux<ByteBuffer>> downloadSupplier, BiFunction<Throwable,Long,Flux<ByteBuffer>> onDownloadErrorResume, int maxRetries)
      Creates a Flux that is capable of resuming a download by applying retry logic when an error occurs.
      Parameters:
      downloadSupplier - Supplier of the initial download.
      onDownloadErrorResume - BiFunction of Throwable and Long which is used to resume downloading when an error occurs.
      maxRetries - The maximum number of times a download can be resumed when an error occurs.
      Returns:
      A Flux that downloads reliably.
    • createRetriableDownloadFlux

      public static Flux<ByteBuffer> createRetriableDownloadFlux(Supplier<Flux<ByteBuffer>> downloadSupplier, BiFunction<Throwable,Long,Flux<ByteBuffer>> onDownloadErrorResume, int maxRetries, long position)
      Creates a Flux that is capable of resuming a download by applying retry logic when an error occurs.
      Parameters:
      downloadSupplier - Supplier of the initial download.
      onDownloadErrorResume - BiFunction of Throwable and Long which is used to resume downloading when an error occurs.
      maxRetries - The maximum number of times a download can be resumed when an error occurs.
      position - The initial offset for the download.
      Returns:
      A Flux that downloads reliably.
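      The resume-on-error idea can be sketched without Reactor. ResumableDownloadSketch and download are hypothetical names, an Iterator of byte arrays stands in for the Flux, and the sketch blocks instead of being reactive. It assumes that the Long passed to the resume function is the offset reached so far (the initial position plus the bytes already received), which the description above does not spell out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;
import java.util.function.Supplier;

// Hypothetical illustration; an Iterator of chunks stands in for the download Flux.
final class ResumableDownloadSketch {
    static List<byte[]> download(Supplier<Iterator<byte[]>> initial,
                                 BiFunction<Throwable, Long, Iterator<byte[]>> resume,
                                 int maxRetries,
                                 long position) {
        List<byte[]> received = new ArrayList<>();
        Iterator<byte[]> source = initial.get();
        long offset = position; // assumed meaning: next byte still to be downloaded
        int retries = 0;
        while (true) {
            try {
                while (source.hasNext()) {
                    byte[] chunk = source.next();
                    received.add(chunk);
                    offset += chunk.length;
                }
                return received;
            } catch (RuntimeException error) {
                if (retries++ >= maxRetries) {
                    throw error; // retry budget exhausted
                }
                // Ask for a new source that continues from the offset reached so far.
                source = resume.apply(error, offset);
            }
        }
    }
}
```

      Tracking the offset is what lets the resumed source skip data that has already been delivered instead of restarting from the beginning.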
    • toFluxByteBuffer

      public static Flux<ByteBuffer> toFluxByteBuffer(InputStream inputStream)
      Converts an InputStream into a Flux of ByteBuffer using a chunk size of 4096.

      Because an InputStream is not guaranteed to be replayable, the returned Flux should be considered non-replayable as well.

      If the passed InputStream is null, Flux.empty() will be returned.

      Parameters:
      inputStream - The InputStream to convert into a Flux.
      Returns:
      A Flux of ByteBuffers that contains the contents of the stream.
    • toFluxByteBuffer

      public static Flux<ByteBuffer> toFluxByteBuffer(InputStream inputStream, int chunkSize)
      Converts an InputStream into a Flux of ByteBuffer.

      Because an InputStream is not guaranteed to be replayable, the returned Flux should be considered non-replayable as well.

      If the passed InputStream is null, Flux.empty() will be returned.

      Parameters:
      inputStream - The InputStream to convert into a Flux.
      chunkSize - The requested size for each ByteBuffer.
      Returns:
      A Flux of ByteBuffers that contains the contents of the stream.
      Throws:
      IllegalArgumentException - If chunkSize is less than or equal to 0.
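      Chunking a stream can be pictured with a blocking, plain-JDK sketch. ChunkedReadSketch and readChunks are hypothetical names and a List stands in for the Flux. The sketch fills each chunk completely before emitting it; whether the library does the same or emits shorter buffers is not stated above.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; a List stands in for the Flux of ByteBuffer.
final class ChunkedReadSketch {
    static List<ByteBuffer> readChunks(InputStream inputStream, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be greater than 0");
        }
        List<ByteBuffer> chunks = new ArrayList<>();
        if (inputStream == null) {
            return chunks; // mirrors the documented empty result for a null stream
        }
        try {
            byte[] buffer = new byte[chunkSize];
            int filled = 0;
            int read;
            while ((read = inputStream.read(buffer, filled, chunkSize - filled)) != -1) {
                filled += read;
                if (filled == chunkSize) {
                    chunks.add(ByteBuffer.wrap(buffer));
                    buffer = new byte[chunkSize]; // fresh array so chunks never share storage
                    filled = 0;
                }
            }
            if (filled > 0) {
                chunks.add(ByteBuffer.wrap(buffer, 0, filled)); // final, shorter chunk
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return chunks;
    }
}
```

      The stream is consumed as it is read, which is why a second pass over the same InputStream would see no data.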
    • withContext

      public static <T> Mono<T> withContext(Function<Context,Mono<T>> serviceCall)
      Converts the incoming Reactor Context (obtained via deferContextual) to an Azure Context, calls the given lambda function with that context, and returns a single entity of type T.

      If the Reactor context is empty, Context.NONE is used to call the lambda function.

      Code samples

       String prefix = "Hello, ";
       Mono<String> response = FluxUtil
           .withContext(context -> serviceCallReturnsSingle(prefix, context));
       
      Type Parameters:
      T - The type of response returned from the service call
      Parameters:
      serviceCall - The lambda function that makes the service call into which the Azure Context will be passed
      Returns:
      The response from service call
    • withContext

      public static <T> Mono<T> withContext(Function<Context,Mono<T>> serviceCall, Map<String,String> contextAttributes)
      Converts the incoming Reactor Context (obtained via deferContextual) to an Azure Context, adds the specified context attributes, calls the given lambda function with that context, and returns a single entity of type T.

      If the Reactor context is empty, Context.NONE is used to call the lambda function.

      Type Parameters:
      T - The type of response returned from the service call
      Parameters:
      serviceCall - The lambda function that makes the service call into which the Azure Context will be passed
      contextAttributes - The map of attributes sent by the calling method to be set on Context.
      Returns:
      The response from service call
    • toMono

      public static <T> Mono<T> toMono(Response<T> response)
      Converts the value of the given Response to a Mono.
      Type Parameters:
      T - The type of the Response value, which will be emitted by the Mono.
      Parameters:
      response - The Response whose value is to be converted.
      Returns:
      The converted Mono
    • monoError

      public static <T> Mono<T> monoError(ClientLogger logger, RuntimeException ex)
      Propagates a RuntimeException through the error channel of Mono.
      Type Parameters:
      T - The return type.
      Parameters:
      logger - The ClientLogger to log the exception.
      ex - The RuntimeException.
      Returns:
      A Mono that terminates with error wrapping the RuntimeException.
    • monoError

      public static <T> Mono<T> monoError(LoggingEventBuilder logBuilder, RuntimeException ex)
      Propagates a RuntimeException through the error channel of Mono.
      Type Parameters:
      T - The return type.
      Parameters:
      logBuilder - The LoggingEventBuilder with context to log the exception.
      ex - The RuntimeException.
      Returns:
      A Mono that terminates with error wrapping the RuntimeException.
    • fluxError

      public static <T> Flux<T> fluxError(ClientLogger logger, RuntimeException ex)
      Propagates a RuntimeException through the error channel of Flux.
      Type Parameters:
      T - The return type.
      Parameters:
      logger - The ClientLogger to log the exception.
      ex - The RuntimeException.
      Returns:
      A Flux that terminates with error wrapping the RuntimeException.
    • pagedFluxError

      public static <T> PagedFlux<T> pagedFluxError(ClientLogger logger, RuntimeException ex)
      Propagates a RuntimeException through the error channel of PagedFlux.
      Type Parameters:
      T - The return type.
      Parameters:
      logger - The ClientLogger to log the exception.
      ex - The RuntimeException.
      Returns:
      A PagedFlux that terminates with error wrapping the RuntimeException.
    • fluxContext

      public static <T> Flux<T> fluxContext(Function<Context,Flux<T>> serviceCall)
      Converts the incoming Reactor Context (obtained via deferContextual) to an Azure Context, calls the given lambda function with that context, and returns a Flux of entities of type T.

      If the Reactor context is empty, Context.NONE is used to call the lambda function.

      Code samples

       String prefix = "Hello, ";
       Flux<String> response = FluxUtil
           .fluxContext(context -> serviceCallReturnsCollection(prefix, context));
       
      Type Parameters:
      T - The type of response returned from the service call
      Parameters:
      serviceCall - The lambda function that makes the service call into which the context will be passed
      Returns:
      The response from service call
    • toReactorContext

      public static Context toReactorContext(Context context)
      Converts an Azure context to Reactor context. If the Azure context is null or empty, Context.empty() will be returned.
      Parameters:
      context - The Azure context.
      Returns:
      The Reactor context.
    • writeToOutputStream

      public static Mono<Void> writeToOutputStream(Flux<ByteBuffer> content, OutputStream stream)
      Writes the ByteBuffers emitted by a Flux of ByteBuffer to an OutputStream.

      The stream is not closed by this call; closing the stream is the caller's responsibility.

      The response Mono will emit an error if content or stream are null. Additionally, an error will be emitted if an exception occurs while writing the content to the stream.

      Parameters:
      content - The Flux of ByteBuffer content.
      stream - The OutputStream being written into.
      Returns:
      A Mono which emits a completion status once the Flux has been written to the OutputStream, or an error status if writing fails.
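      Writing buffers to a stream without taking ownership of it can be sketched with the JDK alone. OutputStreamWriteSketch and writeAll are hypothetical names, a List stands in for the Flux, and errors are thrown instead of being emitted through a Mono, so this is an illustration rather than the library's implementation.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.util.List;
import java.util.Objects;

// Hypothetical illustration; a List stands in for the Flux of ByteBuffer.
final class OutputStreamWriteSketch {
    static void writeAll(List<ByteBuffer> content, OutputStream stream) {
        Objects.requireNonNull(content, "content");
        Objects.requireNonNull(stream, "stream");
        // The channel wrapper is deliberately never closed: closing it would close the
        // caller's stream, and the stream's lifecycle belongs to the caller.
        WritableByteChannel channel = Channels.newChannel(stream);
        try {
            for (ByteBuffer buffer : content) {
                ByteBuffer view = buffer.duplicate(); // leave the caller's position untouched
                while (view.hasRemaining()) {
                    channel.write(view);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

      Going through a channel lets the same loop handle heap and direct buffers, since direct buffers have no accessible backing array.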
    • writeFile

      public static Mono<Void> writeFile(Flux<ByteBuffer> content, AsynchronousFileChannel outFile)
      Writes the ByteBuffers emitted by a Flux of ByteBuffer to an AsynchronousFileChannel.

      The outFile is not closed by this call; closing the outFile is the caller's responsibility.

      The response Mono will emit an error if content or outFile are null. Additionally, an error will be emitted if the outFile wasn't opened with the proper open options, such as StandardOpenOption.WRITE.

      Parameters:
      content - The Flux of ByteBuffer content.
      outFile - The AsynchronousFileChannel.
      Returns:
      A Mono which emits a completion status once the Flux has been written to the AsynchronousFileChannel.
      Throws:
      NullPointerException - When content is null.
      NullPointerException - When outFile is null.
    • writeFile

      public static Mono<Void> writeFile(Flux<ByteBuffer> content, AsynchronousFileChannel outFile, long position)
      Writes the ByteBuffers emitted by a Flux of ByteBuffer to an AsynchronousFileChannel starting at the given position in the file.

      The outFile is not closed by this call; closing the outFile is the caller's responsibility.

      The response Mono will emit an error if content or outFile are null or position is less than 0. Additionally, an error will be emitted if the outFile wasn't opened with the proper open options, such as StandardOpenOption.WRITE.

      Parameters:
      content - The Flux of ByteBuffer content.
      outFile - The AsynchronousFileChannel.
      position - The position in the file to begin writing the content.
      Returns:
      A Mono which emits a completion status once the Flux has been written to the AsynchronousFileChannel.
      Throws:
      NullPointerException - When content is null.
      NullPointerException - When outFile is null.
      IllegalArgumentException - When position is negative.
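      Positioned writing can be illustrated with a blocking sketch built directly on AsynchronousFileChannel. PositionedFileWriteSketch and writeAt are hypothetical names and a List stands in for the Flux. Waiting on each Future is a simplification; the method documented above returns a Mono instead of blocking.

```java
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;

// Hypothetical illustration; a List stands in for the Flux of ByteBuffer.
final class PositionedFileWriteSketch {
    // Writes the buffers back to back starting at position; returns the offset after the last byte.
    static long writeAt(List<ByteBuffer> content, AsynchronousFileChannel outFile, long position) {
        Objects.requireNonNull(content, "content");
        Objects.requireNonNull(outFile, "outFile");
        if (position < 0) {
            throw new IllegalArgumentException("position must not be negative");
        }
        long offset = position;
        try {
            for (ByteBuffer buffer : content) {
                ByteBuffer view = buffer.duplicate();
                while (view.hasRemaining()) {
                    // A single write may be partial, so advance by the bytes actually written.
                    offset += outFile.write(view, offset).get();
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while writing", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("write failed", e.getCause());
        }
        return offset; // the channel is left open for the caller to close
    }
}
```

      The channel has to be opened with StandardOpenOption.WRITE by the caller; a channel opened only for reading would reject the first write.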
    • writeToAsynchronousByteChannel

      public static Mono<Void> writeToAsynchronousByteChannel(Flux<ByteBuffer> content, AsynchronousByteChannel channel)
      Writes the ByteBuffers emitted by a Flux of ByteBuffer to an AsynchronousByteChannel.

      The channel is not closed by this call; closing the channel is the caller's responsibility.

      The response Mono will emit an error if content or channel are null.

      Parameters:
      content - The Flux of ByteBuffer content.
      channel - The AsynchronousByteChannel.
      Returns:
      A Mono which emits a completion status once the Flux has been written to the AsynchronousByteChannel.
      Throws:
      NullPointerException - When content is null.
      NullPointerException - When channel is null.
    • writeToWritableByteChannel

      public static Mono<Void> writeToWritableByteChannel(Flux<ByteBuffer> content, WritableByteChannel channel)
      Writes the ByteBuffers emitted by a Flux of ByteBuffer to a WritableByteChannel.

      The channel is not closed by this call; closing the channel is the caller's responsibility.

      The response Mono will emit an error if content or channel are null.

      Parameters:
      content - The Flux of ByteBuffer content.
      channel - The WritableByteChannel.
      Returns:
      A Mono which emits a completion status once the Flux has been written to the WritableByteChannel.
      Throws:
      NullPointerException - When content is null.
      NullPointerException - When channel is null.
    • readFile

      public static Flux<ByteBuffer> readFile(AsynchronousFileChannel fileChannel, int chunkSize, long offset, long length)
      Creates a Flux from an AsynchronousFileChannel which reads part of a file into chunks of the given size.
      Parameters:
      fileChannel - The file channel.
      chunkSize - the size of file chunks to read.
      offset - The offset in the file to begin reading.
      length - The number of bytes to read from the file.
      Returns:
      the Flux.
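      How chunkSize, offset, and length interact can be shown with a blocking sketch on AsynchronousFileChannel. ChunkedFileReadSketch and readRange are hypothetical names and a List stands in for the Flux. Stopping quietly when the file ends before length bytes have been read is an assumption of this sketch, not documented behavior.

```java
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousFileChannel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;

// Hypothetical illustration; a List stands in for the Flux of ByteBuffer.
final class ChunkedFileReadSketch {
    static List<ByteBuffer> readRange(AsynchronousFileChannel fileChannel,
                                      int chunkSize, long offset, long length) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be greater than 0");
        }
        List<ByteBuffer> chunks = new ArrayList<>();
        long position = offset;
        long end = offset + length;
        try {
            while (position < end) {
                // The last chunk is shortened so that nothing past the range is read.
                ByteBuffer chunk = ByteBuffer.allocate((int) Math.min(chunkSize, end - position));
                int read = fileChannel.read(chunk, position).get();
                if (read <= 0) {
                    break; // end of file reached before the requested length
                }
                position += read;
                chunk.flip(); // switch the chunk from filling to reading
                chunks.add(chunk);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while reading", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("read failed", e.getCause());
        }
        return chunks;
    }
}
```

      Every emitted buffer holds at most chunkSize bytes, and the buffers taken in order cover the requested range of the file.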
    • readFile

      public static Flux<ByteBuffer> readFile(AsynchronousFileChannel fileChannel, long offset, long length)
      Creates a Flux from an AsynchronousFileChannel which reads part of a file.
      Parameters:
      fileChannel - The file channel.
      offset - The offset in the file to begin reading.
      length - The number of bytes to read from the file.
      Returns:
      the Flux.
    • readFile

      public static Flux<ByteBuffer> readFile(AsynchronousFileChannel fileChannel)
      Creates a Flux from an AsynchronousFileChannel which reads the entire file.
      Parameters:
      fileChannel - The file channel.
      Returns:
      the Flux.