ContinuablePagedFluxCore.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.util.paging;

import com.azure.core.util.IterableStream;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.Objects;
import java.util.function.Supplier;

/**
 * The default implementation of {@link ContinuablePagedFlux}.
 *
 * This type is a Flux that provides the ability to operate on pages of type {@link ContinuablePage} and individual
 * items in such pages. This type supports user-provided continuation tokens, allowing for restarting from a
 * previously-retrieved continuation token.
 *
 * The type is backed by the Page Retriever provider provided in it's constructor. The provider is expected to return
 * {@link PageRetriever} when called. The provider is invoked for each Subscription to this Flux. Given provider is
 * called per Subscription, the provider implementation can create one or more objects to store any state and Page
 * Retriever can capture and use those objects. This indirectly associate the state objects to the Subscription. The
 * Page Retriever can get called multiple times in serial fashion, each time after the completion of the Flux returned
 * by the previous invocation. The final completion signal will be send to the Subscriber when the last Page emitted by
 * the Flux returned by the Page Retriever has {@code null} continuation token.
 *
 * <p><strong>Extending PagedFluxCore for Custom Continuation Token support</strong></p>
 * {@codesnippet com.azure.core.util.paging.pagedfluxcore.continuationtoken}
 *
 * @param <C> the type of the continuation token
 * @param <T> The type of elements in a {@link ContinuablePage}
 * @param <P> The {@link ContinuablePage} holding items of type {@code T}.
 * @see ContinuablePagedFlux
 * @see ContinuablePage
 */
public abstract class ContinuablePagedFluxCore<C, T, P extends ContinuablePage<C, T>>
    extends ContinuablePagedFlux<C, T, P> {
    final Supplier<PageRetriever<C, P>> pageRetrieverProvider;
    final Integer defaultPageSize;

    /**
     * Creates an instance of {@link ContinuablePagedFluxCore}.
     *
     * @param pageRetrieverProvider a provider that returns {@link PageRetriever}.
     */
    protected ContinuablePagedFluxCore(Supplier<PageRetriever<C, P>> pageRetrieverProvider) {
        this.pageRetrieverProvider = Objects.requireNonNull(pageRetrieverProvider,
            "'pageRetrieverProvider' function cannot be null.");
        this.defaultPageSize = null;
    }

    /**
     * Creates an instance of {@link ContinuablePagedFluxCore}.
     *
     * @param pageRetrieverProvider a provider that returns {@link PageRetriever}.
     * @param pageSize the preferred page size
     * @throws IllegalArgumentException if defaultPageSize is not greater than zero
     */
    protected ContinuablePagedFluxCore(Supplier<PageRetriever<C, P>> pageRetrieverProvider, int pageSize) {
        this.pageRetrieverProvider = Objects.requireNonNull(pageRetrieverProvider,
            "'pageRetrieverProvider' function cannot be null.");
        if (pageSize <= 0) {
            throw new IllegalArgumentException("pageSize > 0 required but provided: " + pageSize);
        }
        this.defaultPageSize = pageSize;
    }

    /**
     * Get the page size configured this {@link ContinuablePagedFluxCore}.
     *
     * @return the page size configured, {@code null} if unspecified.
     */
    public Integer getPageSize() {
        return this.defaultPageSize;
    }

    @Override
    public Flux<P> byPage() {
        return byPage(this.pageRetrieverProvider, null, this.defaultPageSize);
    }

    @Override
    public Flux<P> byPage(C continuationToken) {
        if (continuationToken == null) {
            return Flux.empty();
        }
        return byPage(this.pageRetrieverProvider, continuationToken, this.defaultPageSize);
    }

    @Override
    public Flux<P> byPage(int preferredPageSize) {
        if (preferredPageSize <= 0) {
            return Flux.error(new IllegalArgumentException("preferredPageSize > 0 required but provided: "
                + preferredPageSize));
        }
        return byPage(this.pageRetrieverProvider, null, preferredPageSize);
    }

    @Override
    public Flux<P> byPage(C continuationToken, int preferredPageSize) {
        if (preferredPageSize <= 0) {
            return Flux.error(new IllegalArgumentException("preferredPageSize > 0 required but provided: "
                + preferredPageSize));
        }
        if (continuationToken == null) {
            return Flux.empty();
        }
        return byPage(this.pageRetrieverProvider, continuationToken, preferredPageSize);
    }

    /**
     * Subscribe to consume all items of type {@code T} in the sequence respectively. This is recommended for most
     * common scenarios. This will seamlessly fetch next page when required and provide with a {@link Flux} of items.
     *
     * @param coreSubscriber The subscriber for this {@link ContinuablePagedFluxCore}
     */
    @Override
    public void subscribe(CoreSubscriber<? super T> coreSubscriber) {
        byPage(this.pageRetrieverProvider, null, this.defaultPageSize)
            .flatMap(page -> {
                IterableStream<T> iterableStream = page.getElements();
                return iterableStream == null
                    ? Flux.empty()
                    : Flux.fromIterable(page.getElements());
            })
            .subscribe(coreSubscriber);
    }

    /**
     * Get a Flux of {@link ContinuablePage} created by concat-ing Flux instances returned Page Retriever Function
     * calls.
     *
     * @param provider the provider that when called returns Page Retriever Function
     * @param continuationToken the token to identify the pages to be retrieved
     * @param pageSize the preferred page size
     * @return a Flux of {@link ContinuablePage} identified by the given continuation token
     */
    private Flux<P> byPage(Supplier<PageRetriever<C, P>> provider, C continuationToken, Integer pageSize) {
        return Flux.defer(() -> {
            final PageRetriever<C, P> pageRetriever = provider.get();
            final ContinuationState<C> state = new ContinuationState<>(continuationToken);
            return retrievePages(state, pageRetriever, pageSize);
        });
    }

    /**
     * Get a Flux of {@link ContinuablePage} created by concat-ing child Flux instances returned Page Retriever Function
     * calls. The first child Flux of {@link ContinuablePage} is identified by the continuation-token in the state.
     *
     * @param state the state to be used across multiple Page Retriever Function calls
     * @param pageRetriever the Page Retriever Function
     * @param pageSize the preferred page size
     * @return a Flux of {@link ContinuablePage}
     */
    private Flux<P> retrievePages(ContinuationState<C> state, PageRetriever<C, P> pageRetriever, Integer pageSize) {
        /*
         * The second argument for 'expand' is an initial capacity hint to the expand subscriber to indicate what size
         * buffer it should instantiate. 4 is used as PageRetriever's 'get' returns a Flux so an implementation may
         * return multiple pages, but in the case only one page is retrieved the buffer won't need to be resized or
         * request additional pages from the service.
         */
        return retrievePage(state, pageRetriever, pageSize)
            .expand(page -> {
                state.setLastContinuationToken(page.getContinuationToken());
                return Flux.defer(() -> retrievePage(state, pageRetriever, pageSize));
            }, 4);
    }

    private Flux<P> retrievePage(ContinuationState<C> state, PageRetriever<C, P> pageRetriever, Integer pageSize) {
        if (state.isDone()) {
            return Flux.empty();
        } else {
            return pageRetriever.get(state.getLastContinuationToken(), pageSize)
                .switchIfEmpty(Flux.defer(() -> {
                    state.setLastContinuationToken(null);
                    return Mono.empty();
                }));
        }
    }
}