1
2
3
4 package com.azure.common.implementation.util;
5
6 import io.netty.buffer.ByteBuf;
7 import io.netty.buffer.ByteBufAllocator;
8 import io.netty.buffer.CompositeByteBuf;
9 import io.netty.buffer.Unpooled;
10 import io.netty.util.ReferenceCountUtil;
11 import org.reactivestreams.Subscriber;
12 import org.reactivestreams.Subscription;
13 import reactor.core.CoreSubscriber;
14 import reactor.core.publisher.Flux;
15 import reactor.core.publisher.Mono;
16 import reactor.core.publisher.Operators;
17
18 import java.io.IOException;
19 import java.lang.reflect.Type;
20 import java.nio.channels.AsynchronousFileChannel;
21 import java.nio.channels.CompletionHandler;
22 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
23 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
24
25
26
27
28 public final class FluxUtil {
29
30
31
32
33
34
35 public static boolean isFluxByteBuf(Type entityType) {
36 if (TypeUtil.isTypeOrSubTypeOf(entityType, Flux.class)) {
37 final Type innerType = TypeUtil.getTypeArguments(entityType)[0];
38 if (TypeUtil.isTypeOrSubTypeOf(innerType, ByteBuf.class)) {
39 return true;
40 }
41 }
42 return false;
43 }
44
45
46
47
48
49
50
51 public static Mono<byte[]> collectBytesInByteBufStream(Flux<ByteBuf> stream, boolean autoReleaseEnabled) {
52 if (autoReleaseEnabled) {
53
54
55 return Mono.using(Unpooled::compositeBuffer,
56 cbb -> stream.collect(() -> cbb,
57 (cbb1, buffer) -> cbb1.addComponent(true, Unpooled.wrappedBuffer(buffer).retain())),
58 ReferenceCountUtil::release)
59 .filter((CompositeByteBuf cbb) -> cbb.isReadable())
60 .map(FluxUtil::byteBufToArray);
61 } else {
62 return stream.collect(Unpooled::compositeBuffer,
63 (cbb1, buffer) -> cbb1.addComponent(true, Unpooled.wrappedBuffer(buffer)))
64 .filter((CompositeByteBuf cbb) -> cbb.isReadable())
65 .map(FluxUtil::byteBufToArray);
66 }
67 }
68
69
70
71
72
73
74
75
76 public static Flux<ByteBuf> split(final ByteBuf whole, final int chunkSize) {
77 return Flux.generate(whole::readerIndex, (readFromIndex, synchronousSync) -> {
78 final int writerIndex = whole.writerIndex();
79
80 if (readFromIndex >= writerIndex) {
81 synchronousSync.complete();
82 return writerIndex;
83 } else {
84 int readSize = Math.min(writerIndex - readFromIndex, chunkSize);
85
86
87
88
89
90
91 synchronousSync.next(whole.slice(readFromIndex, readSize).retain());
92 return readFromIndex + readSize;
93 }
94 });
95 }
96
97
98
99
100
101
102
103
104
105
106 public static byte[] byteBufToArray(ByteBuf byteBuf) {
107 int length = byteBuf.readableBytes();
108 byte[] byteArray = new byte[length];
109 byteBuf.getBytes(byteBuf.readerIndex(), byteArray);
110 return byteArray;
111 }
112
113
114
115
116
117
118
119
120 public static Mono<ByteBuf> collectByteBufStream(Flux<ByteBuf> stream, boolean autoReleaseEnabled) {
121 if (autoReleaseEnabled) {
122 Mono<ByteBuf> mergedCbb = Mono.using(
123
124 () -> {
125 CompositeByteBuf initialCbb = Unpooled.compositeBuffer();
126 return initialCbb;
127 },
128
129 (CompositeByteBuf initialCbb) -> {
130 Mono<CompositeByteBuf> reducedCbb = stream.reduce(initialCbb, (CompositeByteBuf currentCbb, ByteBuf nextBb) -> {
131 CompositeByteBuf updatedCbb = currentCbb.addComponent(nextBb.retain());
132 return updatedCbb;
133 });
134
135 return reducedCbb
136 .doOnNext((CompositeByteBuf cbb) -> cbb.writerIndex(cbb.capacity()))
137 .filter((CompositeByteBuf cbb) -> cbb.isReadable());
138 },
139
140 (CompositeByteBuf finalCbb) -> finalCbb.release());
141 return mergedCbb;
142 } else {
143 return stream.collect(Unpooled::compositeBuffer,
144 (cbb1, buffer) -> cbb1.addComponent(true, Unpooled.wrappedBuffer(buffer)))
145 .filter((CompositeByteBuf cbb) -> cbb.isReadable())
146 .map(bb -> bb);
147 }
148 }
149
150 private static final int DEFAULT_CHUNK_SIZE = 1024 * 64;
151
152
153
154
155
156
157
158
159
160
161
162
163
164 public static Flux<ByteBuf> byteBufStreamFromFile(AsynchronousFileChannel fileChannel, int chunkSize, long offset, long length) {
165 return new ByteBufStreamFromFile(fileChannel, chunkSize, offset, length);
166 }
167
168
169
170
171
172
173
174
175
176
177 public static Flux<ByteBuf> byteBufStreamFromFile(AsynchronousFileChannel fileChannel, long offset, long length) {
178 return byteBufStreamFromFile(fileChannel, DEFAULT_CHUNK_SIZE, offset, length);
179 }
180
181
182
183
184
185
186
187
188 public static Flux<ByteBuf> byteBufStreamFromFile(AsynchronousFileChannel fileChannel) {
189 try {
190 long size = fileChannel.size();
191 return byteBufStreamFromFile(fileChannel, DEFAULT_CHUNK_SIZE, 0, size);
192 } catch (IOException e) {
193 return Flux.error(e);
194 }
195 }
196
197
198
199 private static final class ByteBufStreamFromFile extends Flux<ByteBuf> {
200 private final ByteBufAllocator alloc;
201 private final AsynchronousFileChannel fileChannel;
202 private final int chunkSize;
203 private final long offset;
204 private final long length;
205
206 ByteBufStreamFromFile(AsynchronousFileChannel fileChannel, int chunkSize, long offset, long length) {
207 this.alloc = ByteBufAllocator.DEFAULT;
208 this.fileChannel = fileChannel;
209 this.chunkSize = chunkSize;
210 this.offset = offset;
211 this.length = length;
212 }
213
214 @Override
215 public void subscribe(CoreSubscriber<? super ByteBuf> actual) {
216 FileReadSubscription subscription = new FileReadSubscription(actual, fileChannel, alloc, chunkSize, offset, length);
217 actual.onSubscribe(subscription);
218 }
219
220 static final class FileReadSubscription implements Subscription, CompletionHandler<Integer, ByteBuf> {
221 private static final int NOT_SET = -1;
222 private static final long serialVersionUID = -6831808726875304256L;
223
224 private final Subscriber<? super ByteBuf> subscriber;
225 private volatile long position;
226
227 private final AsynchronousFileChannel fileChannel;
228 private final ByteBufAllocator alloc;
229 private final int chunkSize;
230 private final long offset;
231 private final long length;
232
233 private volatile boolean done;
234 private Throwable error;
235 private volatile ByteBuf next;
236 private volatile boolean cancelled;
237
238 volatile int wip;
239 @SuppressWarnings("rawtypes")
240 static final AtomicIntegerFieldUpdater<FileReadSubscription> WIP = AtomicIntegerFieldUpdater.newUpdater(FileReadSubscription.class, "wip");
241 volatile long requested;
242 @SuppressWarnings("rawtypes")
243 static final AtomicLongFieldUpdater<FileReadSubscription> REQUESTED = AtomicLongFieldUpdater.newUpdater(FileReadSubscription.class, "requested");
244
245
246 FileReadSubscription(Subscriber<? super ByteBuf> subscriber, AsynchronousFileChannel fileChannel, ByteBufAllocator alloc, int chunkSize, long offset, long length) {
247 this.subscriber = subscriber;
248
249 this.fileChannel = fileChannel;
250 this.alloc = alloc;
251 this.chunkSize = chunkSize;
252 this.offset = offset;
253 this.length = length;
254
255 this.position = NOT_SET;
256 }
257
258
259
260 @Override
261 public void request(long n) {
262 if (Operators.validate(n)) {
263 Operators.addCap(REQUESTED, this, n);
264 drain();
265 }
266 }
267
268 @Override
269 public void cancel() {
270 this.cancelled = true;
271 }
272
273
274
275
276
277 @Override
278 public void completed(Integer bytesRead, ByteBuf buffer) {
279 if (!cancelled) {
280 if (bytesRead == -1) {
281 done = true;
282 } else {
283
284 long pos = position;
285
286 int bytesWanted = (int) Math.min(bytesRead, maxRequired(pos));
287 buffer.writerIndex(bytesWanted);
288 long position2 = pos + bytesWanted;
289
290 position = position2;
291 next = buffer;
292 if (position2 >= offset + length) {
293 done = true;
294 }
295 }
296 drain();
297 }
298 }
299
300 @Override
301 public void failed(Throwable exc, ByteBuf attachment) {
302 if (!cancelled) {
303
304
305 error = exc;
306 done = true;
307 drain();
308 }
309 }
310
311
312
313 private void drain() {
314 if (WIP.getAndIncrement(this) != 0) {
315 return;
316 }
317
318 if (position == NOT_SET) {
319 position = offset;
320 doRead();
321 }
322 int missed = 1;
323 for (;;) {
324 if (cancelled) {
325 return;
326 }
327 if (REQUESTED.get(this) > 0) {
328 boolean emitted = false;
329
330 boolean d = done;
331 ByteBuf bb = next;
332 if (bb != null) {
333 next = null;
334
335
336 subscriber.onNext(bb);
337
338
339
340
341
342
343
344 emitted = true;
345 } else {
346 emitted = false;
347 }
348 if (d) {
349 if (error != null) {
350 subscriber.onError(error);
351
352 return;
353 } else {
354 subscriber.onComplete();
355
356 return;
357 }
358 }
359 if (emitted) {
360
361
362 Operators.produced(REQUESTED, this, 1);
363
364 doRead();
365 }
366 }
367 missed = WIP.addAndGet(this, -missed);
368 if (missed == 0) {
369 return;
370 }
371 }
372 }
373
374 private void doRead() {
375
376 long pos = position;
377 int readSize = Math.min(chunkSize, maxRequired(pos));
378 ByteBuf innerBuf = alloc.buffer(readSize, readSize);
379 fileChannel.read(innerBuf.nioBuffer(0, readSize), pos, innerBuf, this);
380 }
381
382 private int maxRequired(long pos) {
383 long maxRequired = offset + length - pos;
384 if (maxRequired <= 0) {
385 return 0;
386 } else {
387 int m = (int) (maxRequired);
388
389 if (m < 0) {
390 return Integer.MAX_VALUE;
391 } else {
392 return m;
393 }
394 }
395 }
396 }
397 }
398
399
400
401
/** Not instantiable: this class only exposes static helpers. */
private FluxUtil() {
    // Intentionally empty.
}
404 }