Class ContinuablePagedFluxCore<C,T,P extends ContinuablePage<C,T>>

java.lang.Object
reactor.core.publisher.Flux<T>
com.azure.core.util.paging.ContinuablePagedFlux<C,T,P>
com.azure.core.util.paging.ContinuablePagedFluxCore<C,T,P>
Type Parameters:
C - the type of the continuation token
T - The type of elements in a ContinuablePage
P - The ContinuablePage holding items of type T.
All Implemented Interfaces:
org.reactivestreams.Publisher<T>, CorePublisher<T>
Direct Known Subclasses:
AnalyzeActionsResultPagedFlux, AnalyzeHealthcareEntitiesPagedFlux, PagedFluxBase

public abstract class ContinuablePagedFluxCore<C,T,P extends ContinuablePage<C,T>> extends ContinuablePagedFlux<C,T,P>
The default implementation of ContinuablePagedFlux. This type is a Flux that provides the ability to operate on pages of type ContinuablePage and individual items in such pages. This type supports user-provided continuation tokens, allowing for restarting from a previously-retrieved continuation token. The type is backed by the Page Retriever provider provided in it's constructor. The provider is expected to return PageRetriever when called. The provider is invoked for each Subscription to this Flux. Given provider is called per Subscription, the provider implementation can create one or more objects to store any state and Page Retriever can capture and use those objects. This indirectly associate the state objects to the Subscription. The Page Retriever can get called multiple times in serial fashion, each time after the completion of the Flux returned by the previous invocation. The final completion signal will be send to the Subscriber when the last Page emitted by the Flux returned by the Page Retriever has null continuation token.

Extending PagedFluxCore for Custom Continuation Token support

 class ContinuationState<C> {
     private C lastContinuationToken;
     private boolean isDone;

     ContinuationState(C token) {
         this.lastContinuationToken = token;
     }

     void setLastContinuationToken(C token) {
         this.isDone = token == null;
         this.lastContinuationToken = token;
     }

     C getLastContinuationToken() {
         return this.lastContinuationToken;
     }

     boolean isDone() {
         return this.isDone;
     }
 }

 class FileContinuationToken {
     private final int nextLinkId;

     FileContinuationToken(int nextLinkId) {
         this.nextLinkId = nextLinkId;
     }

     public int getNextLinkId() {
         return nextLinkId;
     }
 }

 class File {
     private final String guid;

     File(String guid) {
         this.guid = guid;
     }

     public String getGuid() {
         return guid;
     }
 }

 class FilePage implements ContinuablePage<FileContinuationToken, File> {
     private final IterableStream<File> elements;
     private final FileContinuationToken fileContinuationToken;

     FilePage(List<File> elements, FileContinuationToken fileContinuationToken) {
         this.elements = IterableStream.of(elements);
         this.fileContinuationToken = fileContinuationToken;
     }

     @Override
     public IterableStream<File> getElements() {
         return elements;
     }

     @Override
     public FileContinuationToken getContinuationToken() {
         return fileContinuationToken;
     }
 }

 class FileShareServiceClient {
     Flux<FilePage> getFilePages(FileContinuationToken token) {
         List<File> files = Collections.singletonList(new File(UUID.randomUUID().toString()));
         if (token.getNextLinkId() < 10) {
             return Flux.just(new FilePage(files, null));
         } else {
             return Flux.just(new FilePage(files,
                 new FileContinuationToken((int) Math.floor(Math.random() * 20))));
         }
     }
 }

 FileShareServiceClient client = new FileShareServiceClient();

 Supplier<PageRetriever<FileContinuationToken, FilePage>> pageRetrieverProvider = () ->
     (continuationToken, pageSize) -> client.getFilePages(continuationToken);

 class FilePagedFlux extends ContinuablePagedFluxCore<FileContinuationToken, File, FilePage> {
     FilePagedFlux(Supplier<PageRetriever<FileContinuationToken, FilePage>>
         pageRetrieverProvider) {
         super(pageRetrieverProvider);
     }
 }

 FilePagedFlux filePagedFlux = new FilePagedFlux(pageRetrieverProvider);

 
See Also: