Package com.azure.core.util.paging
Class ContinuablePagedFluxCore<C,T,P extends ContinuablePage<C,T>>
java.lang.Object
reactor.core.publisher.Flux<T>
com.azure.core.util.paging.ContinuablePagedFlux<C,T,P>
com.azure.core.util.paging.ContinuablePagedFluxCore<C,T,P>
- Type Parameters:
C
- the type of the continuation tokenT
- The type of elements in aContinuablePage
P
- TheContinuablePage
holding items of typeT
.
- All Implemented Interfaces:
org.reactivestreams.Publisher<T>
,CorePublisher<T>
- Direct Known Subclasses:
AnalyzeActionsResultPagedFlux
,AnalyzeHealthcareEntitiesPagedFlux
,PagedFluxBase
public abstract class ContinuablePagedFluxCore<C,T,P extends ContinuablePage<C,T>>
extends ContinuablePagedFlux<C,T,P>
The default implementation of
ContinuablePagedFlux
.
This type is a Flux that provides the ability to operate on pages of type ContinuablePage
and individual
items in such pages. This type supports user-provided continuation tokens, allowing for restarting from a
previously-retrieved continuation token.
The type is backed by the Page Retriever provider provided in it's constructor. The provider is expected to return
PageRetriever
when called. The provider is invoked for each Subscription to this Flux. Given provider is
called per Subscription, the provider implementation can create one or more objects to store any state and Page
Retriever can capture and use those objects. This indirectly associate the state objects to the Subscription. The
Page Retriever can get called multiple times in serial fashion, each time after the completion of the Flux returned
by the previous invocation. The final completion signal will be send to the Subscriber when the last Page emitted by
the Flux returned by the Page Retriever has null
continuation token.
Extending PagedFluxCore for Custom Continuation Token support
class ContinuationState<C> { private C lastContinuationToken; private boolean isDone; ContinuationState(C token) { this.lastContinuationToken = token; } void setLastContinuationToken(C token) { this.isDone = token == null; this.lastContinuationToken = token; } C getLastContinuationToken() { return this.lastContinuationToken; } boolean isDone() { return this.isDone; } } class FileContinuationToken { private final int nextLinkId; FileContinuationToken(int nextLinkId) { this.nextLinkId = nextLinkId; } public int getNextLinkId() { return nextLinkId; } } class File { private final String guid; File(String guid) { this.guid = guid; } public String getGuid() { return guid; } } class FilePage implements ContinuablePage<FileContinuationToken, File> { private final IterableStream<File> elements; private final FileContinuationToken fileContinuationToken; FilePage(List<File> elements, FileContinuationToken fileContinuationToken) { this.elements = IterableStream.of(elements); this.fileContinuationToken = fileContinuationToken; } @Override public IterableStream<File> getElements() { return elements; } @Override public FileContinuationToken getContinuationToken() { return fileContinuationToken; } } class FileShareServiceClient { Flux<FilePage> getFilePages(FileContinuationToken token) { List<File> files = Collections.singletonList(new File(UUID.randomUUID().toString())); if (token.getNextLinkId() < 10) { return Flux.just(new FilePage(files, null)); } else { return Flux.just(new FilePage(files, new FileContinuationToken((int) Math.floor(Math.random() * 20)))); } } } FileShareServiceClient client = new FileShareServiceClient(); Supplier<PageRetriever<FileContinuationToken, FilePage>> pageRetrieverProvider = () -> (continuationToken, pageSize) -> client.getFilePages(continuationToken); class FilePagedFlux extends ContinuablePagedFluxCore<FileContinuationToken, File, FilePage> { FilePagedFlux(Supplier<PageRetriever<FileContinuationToken, FilePage>> pageRetrieverProvider) { super(pageRetrieverProvider); } } FilePagedFlux filePagedFlux = new FilePagedFlux(pageRetrieverProvider);
- See Also:
-
Constructor Summary
ModifierConstructorDescriptionprotected
ContinuablePagedFluxCore
(Supplier<PageRetriever<C, P>> pageRetrieverProvider) Creates an instance ofContinuablePagedFluxCore
.protected
ContinuablePagedFluxCore
(Supplier<PageRetriever<C, P>> pageRetrieverProvider, int pageSize) Creates an instance ofContinuablePagedFluxCore
.protected
ContinuablePagedFluxCore
(Supplier<PageRetriever<C, P>> pageRetrieverProvider, Integer pageSize, Predicate<C> continuationPredicate) Creates an instance ofContinuablePagedFluxCore
. -
Method Summary
Modifier and TypeMethodDescriptionbyPage()
Gets aFlux
ofContinuablePage
starting at the first page.byPage
(int preferredPageSize) Gets aFlux
ofContinuablePage
starting at the first page requesting each page to contain a number of elements equal to the preferred page size.Gets aFlux
ofContinuablePage
beginning at the page identified by the given continuation token.Gets aFlux
ofContinuablePage
beginning at the page identified by the given continuation token requesting each page to contain the number of elements equal to the preferred page size.Get the page size configured thisContinuablePagedFluxCore
.void
subscribe
(CoreSubscriber<? super T> coreSubscriber) Subscribe to consume all items of typeT
in the sequence respectively.Methods inherited from class com.azure.core.util.paging.ContinuablePagedFlux
getContinuationPredicate
Methods inherited from class reactor.core.publisher.Flux
all, any, as, blockFirst, blockFirst, blockLast, blockLast, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, buffer, bufferTimeout, bufferTimeout, bufferTimeout, bufferTimeout, bufferUntil, bufferUntil, bufferUntilChanged, bufferUntilChanged, bufferUntilChanged, bufferWhen, bufferWhen, bufferWhile, cache, cache, cache, cache, cache, cache, cancelOn, cast, checkpoint, checkpoint, checkpoint, collect, collect, collectList, collectMap, collectMap, collectMap, collectMultimap, collectMultimap, collectMultimap, collectSortedList, collectSortedList, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, combineLatest, concat, concat, concat, concat, concatDelayError, concatDelayError, concatDelayError, concatDelayError, concatMap, concatMap, concatMapDelayError, concatMapDelayError, concatMapDelayError, concatMapIterable, concatMapIterable, concatWith, concatWithValues, contextWrite, contextWrite, count, create, create, defaultIfEmpty, defer, deferContextual, deferWithContext, delayElements, delayElements, delaySequence, delaySequence, delaySubscription, delaySubscription, delaySubscription, delayUntil, dematerialize, distinct, distinct, distinct, distinct, distinctUntilChanged, distinctUntilChanged, distinctUntilChanged, doAfterTerminate, doFinally, doFirst, doOnCancel, doOnComplete, doOnDiscard, doOnEach, doOnError, doOnError, doOnError, doOnNext, doOnRequest, doOnSubscribe, doOnTerminate, elapsed, elapsed, elementAt, elementAt, empty, error, error, error, expand, expand, expandDeep, expandDeep, filter, filterWhen, filterWhen, first, first, firstWithSignal, firstWithSignal, firstWithValue, firstWithValue, flatMap, flatMap, flatMap, flatMap, flatMapDelayError, flatMapIterable, flatMapIterable, flatMapSequential, flatMapSequential, flatMapSequential, flatMapSequentialDelayError, from, fromArray, fromIterable, fromStream, fromStream, generate, generate, generate, getPrefetch, groupBy, groupBy, groupBy, groupBy, groupJoin, handle, hasElement, hasElements, hide, ignoreElements, index, index, interval, interval, interval, interval, join, just, just, last, last, limitRate, limitRate, limitRequest, log, log, log, log, log, log, map, mapNotNull, materialize, merge, merge, merge, merge, merge, merge, mergeComparing, mergeComparing, mergeComparing, mergeComparingDelayError, mergeComparingWith, mergeDelayError, mergeOrdered, mergeOrdered, mergeOrdered, mergeOrderedWith, mergeSequential, mergeSequential, mergeSequential, mergeSequential, mergeSequential, mergeSequential, mergeSequentialDelayError, mergeSequentialDelayError, mergeSequentialDelayError, mergeWith, metrics, name, never, next, ofType, onAssembly, onAssembly, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureBuffer, onBackpressureDrop, onBackpressureDrop, onBackpressureError, onBackpressureLatest, onErrorContinue, onErrorContinue, onErrorContinue, onErrorMap, onErrorMap, onErrorMap, onErrorResume, onErrorResume, onErrorResume, onErrorReturn, onErrorReturn, onErrorReturn, onErrorStop, onTerminateDetach, or, parallel, parallel, parallel, publish, publish, publish, publish, publishNext, publishOn, publishOn, publishOn, push, push, range, reduce, reduce, reduceWith, repeat, repeat, repeat, repeat, repeatWhen, replay, replay, replay, replay, replay, replay, retry, retry, retryWhen, sample, sample, sampleFirst, sampleFirst, sampleTimeout, sampleTimeout, scan, scan, scanWith, share, shareNext, single, single, singleOrEmpty, skip, skip, skip, skipLast, skipUntil, skipUntilOther, skipWhile, sort, sort, startWith, startWith, startWith, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribe, subscribeOn, subscribeOn, subscriberContext, subscriberContext, subscribeWith, switchIfEmpty, switchMap, switchMap, switchOnFirst, switchOnFirst, switchOnNext, switchOnNext, tag, take, take, take, take, takeLast, takeUntil, takeUntilOther, takeWhile, then, then, thenEmpty, thenMany, timed, timed, timeout, timeout, timeout, timeout, timeout, timeout, timeout, timestamp, timestamp, toIterable, toIterable, toIterable, toStream, toStream, toString, transform, transformDeferred, transformDeferredContextual, using, using, usingWhen, usingWhen, window, window, window, window, window, window, window, windowTimeout, windowTimeout, windowTimeout, windowTimeout, windowUntil, windowUntil, windowUntil, windowUntilChanged, windowUntilChanged, windowUntilChanged, windowWhen, windowWhile, windowWhile, withLatestFrom, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zip, zipWith, zipWith, zipWith, zipWith, zipWithIterable, zipWithIterable
-
Constructor Details
-
ContinuablePagedFluxCore
Creates an instance ofContinuablePagedFluxCore
.- Parameters:
pageRetrieverProvider
- a provider that returnsPageRetriever
.- Throws:
NullPointerException
- IfpageRetrieverProvider
is null.
-
ContinuablePagedFluxCore
protected ContinuablePagedFluxCore(Supplier<PageRetriever<C, P>> pageRetrieverProvider, int pageSize) Creates an instance ofContinuablePagedFluxCore
.- Parameters:
pageRetrieverProvider
- a provider that returnsPageRetriever
.pageSize
- the preferred page size- Throws:
NullPointerException
- IfpageRetrieverProvider
is null.IllegalArgumentException
- IfpageSize
is less than or equal to zero.
-
ContinuablePagedFluxCore
protected ContinuablePagedFluxCore(Supplier<PageRetriever<C, P>> pageRetrieverProvider, Integer pageSize, Predicate<C> continuationPredicate) Creates an instance ofContinuablePagedFluxCore
.- Parameters:
pageRetrieverProvider
- A provider that returnsPageRetriever
.pageSize
- The preferred page size.continuationPredicate
- A predicate which determines if paging should continue.- Throws:
NullPointerException
- IfpageRetrieverProvider
is null.IllegalArgumentException
- IfpageSize
is not null and is less than or equal to zero.
-
-
Method Details
-
getPageSize
Get the page size configured thisContinuablePagedFluxCore
.- Returns:
- the page size configured,
null
if unspecified.
-
byPage
Description copied from class:ContinuablePagedFlux
Gets aFlux
ofContinuablePage
starting at the first page.- Specified by:
byPage
in classContinuablePagedFlux<C,
T, P extends ContinuablePage<C, T>> - Returns:
- A
Flux
ofContinuablePage
.
-
byPage
Description copied from class:ContinuablePagedFlux
Gets aFlux
ofContinuablePage
beginning at the page identified by the given continuation token.- Specified by:
byPage
in classContinuablePagedFlux<C,
T, P extends ContinuablePage<C, T>> - Parameters:
continuationToken
- A continuation token identifying the page to select.- Returns:
- A
Flux
ofContinuablePage
.
-
byPage
Description copied from class:ContinuablePagedFlux
Gets aFlux
ofContinuablePage
starting at the first page requesting each page to contain a number of elements equal to the preferred page size.The service may or may not honor the preferred page size therefore the client MUST be prepared to handle pages with different page sizes.
- Specified by:
byPage
in classContinuablePagedFlux<C,
T, P extends ContinuablePage<C, T>> - Parameters:
preferredPageSize
- The preferred page size.- Returns:
- A
Flux
ofContinuablePage
.
-
byPage
Description copied from class:ContinuablePagedFlux
Gets aFlux
ofContinuablePage
beginning at the page identified by the given continuation token requesting each page to contain the number of elements equal to the preferred page size.The service may or may not honor the preferred page size therefore the client MUST be prepared to handle pages with different page sizes.
- Specified by:
byPage
in classContinuablePagedFlux<C,
T, P extends ContinuablePage<C, T>> - Parameters:
continuationToken
- A continuation token identifying the page to select.preferredPageSize
- The preferred page size.- Returns:
- A
Flux
ofContinuablePage
.
-
subscribe
Subscribe to consume all items of typeT
in the sequence respectively. This is recommended for most common scenarios. This will seamlessly fetch next page when required and provide with aFlux
of items.- Specified by:
subscribe
in interfaceCorePublisher<C>
- Specified by:
subscribe
in classFlux<T>
- Parameters:
coreSubscriber
- The subscriber for thisContinuablePagedFluxCore
-