View Javadoc
1   // Copyright (c) Microsoft Corporation. All rights reserved.
2   // Licensed under the MIT License.
3   
4   package com.azure.common.implementation.util;
5   
6   import io.netty.buffer.ByteBuf;
7   import io.netty.buffer.ByteBufAllocator;
8   import io.netty.buffer.CompositeByteBuf;
9   import io.netty.buffer.Unpooled;
10  import io.netty.util.ReferenceCountUtil;
11  import org.reactivestreams.Subscriber;
12  import org.reactivestreams.Subscription;
13  import reactor.core.CoreSubscriber;
14  import reactor.core.publisher.Flux;
15  import reactor.core.publisher.Mono;
16  import reactor.core.publisher.Operators;
17  
18  import java.io.IOException;
19  import java.lang.reflect.Type;
20  import java.nio.channels.AsynchronousFileChannel;
21  import java.nio.channels.CompletionHandler;
22  import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
23  import java.util.concurrent.atomic.AtomicLongFieldUpdater;
24  
25  /**
26   * Utility type exposing methods to deal with {@link Flux}.
27   */
28  public final class FluxUtil {
29      /**
30       * Checks if a type is Flux<ByteBuf>.
31       *
32       * @param entityType the type to check
33       * @return whether the type represents a Flux that emits ByteBuf
34       */
35      public static boolean isFluxByteBuf(Type entityType) {
36          if (TypeUtil.isTypeOrSubTypeOf(entityType, Flux.class)) {
37              final Type innerType = TypeUtil.getTypeArguments(entityType)[0];
38              if (TypeUtil.isTypeOrSubTypeOf(innerType, ByteBuf.class)) {
39                  return true;
40              }
41          }
42          return false;
43      }
44  
    /**
     * Collects the ByteBuf instances emitted by a Flux into a single byte array.
     *
     * @param stream a stream which emits ByteBuf instances
     * @param autoReleaseEnabled whether ByteBuf instances in the stream are automatically
     *     released as they are consumed; when true, each chunk is retained so the bytes
     *     stay valid inside the aggregate, and the aggregate itself is released once the
     *     returned Mono terminates
     * @return a Mono which emits the concatenation of all the ByteBuf instances given by
     *     the source Flux; completes empty when no readable bytes were collected
     */
    public static Mono<byte[]> collectBytesInByteBufStream(Flux<ByteBuf> stream, boolean autoReleaseEnabled) {
        if (autoReleaseEnabled) {
            // A stream is auto-release enabled means - the ByteBuf chunks in the stream get
            // released as consumer consumes each chunk.
            //
            // Mono.using ties the composite buffer's lifetime to the subscription: the
            // composite is freed (ReferenceCountUtil::release) when the Mono terminates.
            // Each chunk is wrapped and retained so the stream's auto-release does not
            // free the memory while the composite still references it. The byte copy
            // (byteBufToArray) happens before the cleanup runs, so the returned array
            // is safe after release.
            return Mono.using(Unpooled::compositeBuffer,
                cbb -> stream.collect(() -> cbb,
                    (cbb1, buffer) -> cbb1.addComponent(true, Unpooled.wrappedBuffer(buffer).retain())),
                    ReferenceCountUtil::release)
                    .filter((CompositeByteBuf cbb) -> cbb.isReadable())
                    .map(FluxUtil::byteBufToArray);
        } else {
            // Without auto-release the chunks remain valid, so no extra retain is taken;
            // wrappedBuffer shares the chunk's memory rather than copying it.
            return stream.collect(Unpooled::compositeBuffer,
                (cbb1, buffer) -> cbb1.addComponent(true, Unpooled.wrappedBuffer(buffer)))
                    .filter((CompositeByteBuf cbb) -> cbb.isReadable())
                    .map(FluxUtil::byteBufToArray);
        }
    }
68  
69      /**
70       * Splits a ByteBuf into ByteBuf chunks.
71       *
72       * @param whole the ByteBuf to split
73       * @param chunkSize the maximum size of each ByteBuf chunk
74       * @return A stream that emits chunks of the original whole ByteBuf
75       */
76      public static Flux<ByteBuf> split(final ByteBuf whole, final int chunkSize) {
77          return Flux.generate(whole::readerIndex, (readFromIndex, synchronousSync) -> {
78              final int writerIndex = whole.writerIndex();
79              //
80              if (readFromIndex >= writerIndex) {
81                  synchronousSync.complete();
82                  return writerIndex;
83              } else {
84                  int readSize = Math.min(writerIndex - readFromIndex, chunkSize);
85                  // Netty slice operation will not increment the ref count.
86                  //
87                  // Here we invoke 'retain' on each slice, since
88                  // consumer of the returned Flux stream is responsible for
89                  // releasing each chunk as it gets consumed.
90                  //
91                  synchronousSync.next(whole.slice(readFromIndex, readSize).retain());
92                  return readFromIndex + readSize;
93              }
94          });
95      }
96  
97      /**
98       * Gets the content of the provided ByteBuf as a byte array.
99       * This method will create a new byte array even if the ByteBuf can
100      * have optionally backing array.
101      *
102      *
103      * @param byteBuf the byte buffer
104      * @return the byte array
105      */
106     public static byte[] byteBufToArray(ByteBuf byteBuf) {
107         int length = byteBuf.readableBytes();
108         byte[] byteArray = new byte[length];
109         byteBuf.getBytes(byteBuf.readerIndex(), byteArray);
110         return byteArray;
111     }
112 
    /**
     * Collects byte buffers emitted by a Flux into a single ByteBuf.
     *
     * @param stream a stream which emits ByteBuf instances
     * @param autoReleaseEnabled whether ByteBuf instances in the stream are automatically
     *     released as they are consumed
     * @return a Mono which emits the concatenation of all the byte buffers given by the
     *     source Flux; completes empty when no readable bytes were collected
     */
    public static Mono<ByteBuf> collectByteBufStream(Flux<ByteBuf> stream, boolean autoReleaseEnabled) {
        if (autoReleaseEnabled) {
            // NOTE(review): the resource cleaner below releases the composite buffer when
            // the Mono terminates - and that composite is the very value emitted downstream.
            // This appears to assume the consumer uses (or retains) the buffer synchronously
            // inside onNext; an asynchronous consumer would observe a released buffer.
            // Confirm against callers.
            Mono<ByteBuf> mergedCbb = Mono.using(
                    // Resource supplier
                () -> {
                    CompositeByteBuf initialCbb = Unpooled.compositeBuffer();
                    return initialCbb;
                },
                    // source Mono creator
                (CompositeByteBuf initialCbb) -> {
                    // Each chunk is retained because the auto-releasing stream will
                    // release its own reference once the chunk has been consumed.
                    Mono<CompositeByteBuf> reducedCbb = stream.reduce(initialCbb, (CompositeByteBuf currentCbb, ByteBuf nextBb) -> {
                        CompositeByteBuf updatedCbb = currentCbb.addComponent(nextBb.retain());
                        return updatedCbb;
                    });
                    //
                    // addComponent without the increaseWriterIndex flag leaves the
                    // composite's writerIndex at 0, so it is advanced manually here
                    // before the readability check.
                    return reducedCbb
                            .doOnNext((CompositeByteBuf cbb) -> cbb.writerIndex(cbb.capacity()))
                            .filter((CompositeByteBuf cbb) -> cbb.isReadable());
                },
                    // Resource cleaner
                (CompositeByteBuf finalCbb) -> finalCbb.release());
            return mergedCbb;
        } else {
            // Without auto-release, wrappedBuffer shares each chunk's memory; the
            // trailing identity map widens Mono<CompositeByteBuf> to Mono<ByteBuf>.
            return stream.collect(Unpooled::compositeBuffer,
                (cbb1, buffer) -> cbb1.addComponent(true, Unpooled.wrappedBuffer(buffer)))
                    .filter((CompositeByteBuf cbb) -> cbb.isReadable())
                    .map(bb -> bb);
        }
    }
149 
    // Default chunk size (64 KiB) used when reading a file without an explicit chunk size.
    private static final int DEFAULT_CHUNK_SIZE = 1024 * 64;

    //region Utility methods to create Flux<ByteBuf> that read and emits chunks from AsynchronousFileChannel.
153 
154     /**
155      * Creates a {@link Flux} from an {@link AsynchronousFileChannel}
156      * which reads part of a file into chunks of the given size.
157      *
158      * @param fileChannel The file channel.
159      * @param chunkSize the size of file chunks to read.
160      * @param offset The offset in the file to begin reading.
161      * @param length The number of bytes to read from the file.
162      * @return the Flowable.
163      */
164     public static Flux<ByteBuf> byteBufStreamFromFile(AsynchronousFileChannel fileChannel, int chunkSize, long offset, long length) {
165         return new ByteBufStreamFromFile(fileChannel, chunkSize, offset, length);
166     }
167 
168     /**
169      * Creates a {@link Flux} from an {@link AsynchronousFileChannel}
170      * which reads part of a file.
171      *
172      * @param fileChannel The file channel.
173      * @param offset The offset in the file to begin reading.
174      * @param length The number of bytes to read from the file.
175      * @return the Flowable.
176      */
177     public static Flux<ByteBuf> byteBufStreamFromFile(AsynchronousFileChannel fileChannel, long offset, long length) {
178         return byteBufStreamFromFile(fileChannel, DEFAULT_CHUNK_SIZE, offset, length);
179     }
180 
181     /**
182      * Creates a {@link Flux} from an {@link AsynchronousFileChannel}
183      * which reads the entire file.
184      *
185      * @param fileChannel The file channel.
186      * @return The AsyncInputStream.
187      */
188     public static Flux<ByteBuf> byteBufStreamFromFile(AsynchronousFileChannel fileChannel) {
189         try {
190             long size = fileChannel.size();
191             return byteBufStreamFromFile(fileChannel, DEFAULT_CHUNK_SIZE, 0, size);
192         } catch (IOException e) {
193             return Flux.error(e);
194         }
195     }
196     //endregion
197 
    //region ByteBufStreamFromFile implementation

    /**
     * A Flux that reads a region of an {@link AsynchronousFileChannel} and emits it
     * as ByteBuf chunks, honoring Reactive Streams backpressure: a read is issued
     * only when downstream demand exists.
     */
    private static final class ByteBufStreamFromFile extends Flux<ByteBuf> {
        // Allocator used to create the buffer for each chunk read from the file.
        private final ByteBufAllocator alloc;
        private final AsynchronousFileChannel fileChannel;
        // Maximum number of bytes per emitted chunk.
        private final int chunkSize;
        // Absolute file position where reading starts.
        private final long offset;
        // Total number of bytes to read starting at 'offset'.
        private final long length;

        ByteBufStreamFromFile(AsynchronousFileChannel fileChannel, int chunkSize, long offset, long length) {
            this.alloc = ByteBufAllocator.DEFAULT;
            this.fileChannel = fileChannel;
            this.chunkSize = chunkSize;
            this.offset = offset;
            this.length = length;
        }

        @Override
        public void subscribe(CoreSubscriber<? super ByteBuf> actual) {
            // Each subscriber gets its own subscription and therefore its own read cursor.
            FileReadSubscription subscription = new FileReadSubscription(actual, fileChannel, alloc, chunkSize, offset, length);
            actual.onSubscribe(subscription);
        }

        /**
         * Subscription that drives the chunked reads. It doubles as the NIO
         * {@link CompletionHandler} so each finished read re-enters the drain loop.
         * The wip/requested fields implement the standard Reactor queue-drain
         * pattern to serialize emission between the request() caller and the
         * asynchronous completion callbacks.
         */
        static final class FileReadSubscription implements Subscription, CompletionHandler<Integer, ByteBuf> {
            // Sentinel value for 'position' meaning "first read not yet initiated".
            private static final int NOT_SET = -1;
            // NOTE(review): this class is not Serializable, so serialVersionUID is unused.
            private static final long serialVersionUID = -6831808726875304256L;
            //
            private final Subscriber<? super ByteBuf> subscriber;
            // Absolute file position of the next read; NOT_SET until the first request arrives.
            private volatile long position;
            //
            private final AsynchronousFileChannel fileChannel;
            private final ByteBufAllocator alloc;
            private final int chunkSize;
            private final long offset;
            private final long length;
            //
            // Terminal-state flags; 'error' is written before 'done' so the drain
            // loop observes a consistent pair (see failed()).
            private volatile boolean done;
            private Throwable error;
            // The chunk produced by the last completed read, awaiting emission.
            private volatile ByteBuf next;
            private volatile boolean cancelled;
            //
            // Work-in-progress counter guarding the drain loop (queue-drain pattern).
            volatile int wip;
            @SuppressWarnings("rawtypes")
            static final AtomicIntegerFieldUpdater<FileReadSubscription> WIP = AtomicIntegerFieldUpdater.newUpdater(FileReadSubscription.class, "wip");
            // Outstanding downstream demand, maintained via Operators.addCap/produced.
            volatile long requested;
            @SuppressWarnings("rawtypes")
            static final AtomicLongFieldUpdater<FileReadSubscription> REQUESTED = AtomicLongFieldUpdater.newUpdater(FileReadSubscription.class, "requested");
            //

            FileReadSubscription(Subscriber<? super ByteBuf> subscriber, AsynchronousFileChannel fileChannel, ByteBufAllocator alloc, int chunkSize, long offset, long length) {
                this.subscriber = subscriber;
                //
                this.fileChannel = fileChannel;
                this.alloc = alloc;
                this.chunkSize = chunkSize;
                this.offset = offset;
                this.length = length;
                //
                this.position = NOT_SET;
            }

            //region Subscription implementation

            @Override
            public void request(long n) {
                if (Operators.validate(n)) {
                    // Accumulate demand with overflow capping, then try to emit.
                    Operators.addCap(REQUESTED, this, n);
                    drain();
                }
            }

            @Override
            public void cancel() {
                // Best-effort cancellation: checked at each drain iteration and
                // in the completion callbacks.
                this.cancelled = true;
            }

            //endregion

            //region CompletionHandler implementation

            @Override
            public void completed(Integer bytesRead, ByteBuf buffer) {
                if (!cancelled) {
                    if (bytesRead == -1) {
                        // End of file reached.
                        done = true;
                    } else {
                        // use local variable to perform fewer volatile reads
                        long pos = position;
                        //
                        // Clamp to the requested region so a read past 'offset + length'
                        // never leaks extra bytes downstream.
                        int bytesWanted = (int) Math.min(bytesRead, maxRequired(pos));
                        buffer.writerIndex(bytesWanted);
                        long position2 = pos + bytesWanted;
                        //noinspection NonAtomicOperationOnVolatileField
                        position = position2;
                        // 'next' must be set before 'done' so the drain loop never
                        // observes done without the final chunk.
                        next = buffer;
                        if (position2 >= offset + length) {
                            done = true;
                        }
                    }
                    drain();
                }
            }

            @Override
            public void failed(Throwable exc, ByteBuf attachment) {
                if (!cancelled) {
                    // must set error before setting done to true
                    // so that is visible in drain loop
                    error = exc;
                    done = true;
                    drain();
                }
            }

            //endregion

            // Serialized emission loop: only one thread at a time runs the body
            // (guarded by WIP); other entrants just bump the counter.
            private void drain() {
                if (WIP.getAndIncrement(this) != 0) {
                    return;
                }
                // on first drain (first request) we initiate the first read
                if (position == NOT_SET) {
                    position = offset;
                    doRead();
                }
                int missed = 1;
                for (;;) {
                    if (cancelled) {
                        return;
                    }
                    if (REQUESTED.get(this) > 0) {
                        boolean emitted = false;
                        // read d before next to avoid race
                        boolean d = done;
                        ByteBuf bb = next;
                        if (bb != null) {
                            next = null;
                            //
                            // try {
                            subscriber.onNext(bb);
                            // } finally {
                                // Note: Don't release here, we follow netty disposal pattern
                                // it's consumer's responsibility to release chunks after consumption.
                                //
                                // ReferenceCountUtil.release(bb);
                            // }
                            //
                            emitted = true;
                        } else {
                            emitted = false;
                        }
                        if (d) {
                            if (error != null) {
                                subscriber.onError(error);
                                // exit without reducing wip so that further drains will be NOOP
                                return;
                            } else {
                                subscriber.onComplete();
                                // exit without reducing wip so that further drains will be NOOP
                                return;
                            }
                        }
                        if (emitted) {
                            // do this after checking d to avoid calling read
                            // when done
                            Operators.produced(REQUESTED, this, 1);
                            //
                            doRead();
                        }
                    }
                    missed = WIP.addAndGet(this, -missed);
                    if (missed == 0) {
                        return;
                    }
                }
            }

            // Issues one asynchronous read of up to chunkSize bytes at the current
            // position; completion re-enters via completed()/failed().
            private void doRead() {
                // use local variable to limit volatile reads
                long pos = position;
                int readSize = Math.min(chunkSize, maxRequired(pos));
                ByteBuf innerBuf = alloc.buffer(readSize, readSize);
                // The read fills the ByteBuf's backing NIO view; completed() later
                // sets the writerIndex, since NIO writes don't advance it.
                fileChannel.read(innerBuf.nioBuffer(0, readSize), pos, innerBuf, this);
            }

            // Number of bytes still required to finish the [offset, offset + length)
            // region from 'pos', clamped to int range for very large files.
            private int maxRequired(long pos) {
                long maxRequired = offset + length - pos;
                if (maxRequired <= 0) {
                    return 0;
                } else {
                    int m = (int) (maxRequired);
                    // support really large files by checking for overflow
                    if (m < 0) {
                        return Integer.MAX_VALUE;
                    } else {
                        return m;
                    }
                }
            }
        }
    }
398 
399     //endregion
400 
    // Private constructor: this class is a static utility holder and must not be instantiated.
    private FluxUtil() {
    }
404 }