ContinuablePagedIterable.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.core.util.paging;
import com.azure.core.util.IterableStream;
import java.util.Iterator;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
/**
* This class provides utility to iterate over {@link ContinuablePage} using {@link Stream} {@link Iterable}
* interfaces.
*
* @param <C> the type of the continuation token
* @param <T> The type of elements in a {@link ContinuablePage}
* @param <P> The {@link ContinuablePage} holding items of type {@code T}.
* @see IterableStream
* @see ContinuablePagedFlux
*/
public abstract class ContinuablePagedIterable<C, T, P extends ContinuablePage<C, T>> extends IterableStream<T> {
private final ContinuablePagedFlux<C, T, P> pagedFlux;
private final int batchSize;
/**
* Creates instance with the given {@link ContinuablePagedFlux}.
*
* @param pagedFlux the paged flux use as iterable
*/
public ContinuablePagedIterable(ContinuablePagedFlux<C, T, P> pagedFlux) {
this(pagedFlux, 1);
}
/**
* Creates instance with the given {@link ContinuablePagedFlux}.
*
* @param pagedFlux the paged flux use as iterable
* @param batchSize the bounded capacity to prefetch from the {@link ContinuablePagedFlux}
*/
public ContinuablePagedIterable(ContinuablePagedFlux<C, T, P> pagedFlux, int batchSize) {
super(pagedFlux);
this.pagedFlux = pagedFlux;
this.batchSize = batchSize;
}
@Override
public Stream<T> stream() {
return StreamSupport.stream(iterableByItemInternal().spliterator(), false);
}
/**
* Retrieve the {@link Stream}, one page at a time. It will provide same {@link Stream} of T values from starting if
* called multiple times.
*
* @return {@link Stream} of a pages
*/
public Stream<P> streamByPage() {
return streamByPageInternal(null, null, () -> this.pagedFlux.byPage().toStream(batchSize));
}
/**
* Retrieve the {@link Stream}, one page at a time, starting from the next page associated with the given
* continuation token. To start from first page, use {@link #streamByPage()} instead.
*
* @param continuationToken The continuation token used to fetch the next page
* @return {@link Stream} of a pages
*/
public Stream<P> streamByPage(C continuationToken) {
return streamByPageInternal(continuationToken, null,
() -> this.pagedFlux.byPage(continuationToken).toStream(batchSize));
}
/**
* Retrieve the {@link Stream}, one page at a time, with each page containing {@code preferredPageSize} items.
*
* It will provide same {@link Stream} of T values from starting if called multiple times.
*
* @param preferredPageSize the preferred page size, service may or may not honor the page size preference hence
* client MUST be prepared to handle pages with different page size.
* @return {@link Stream} of a pages
*/
public Stream<P> streamByPage(int preferredPageSize) {
return streamByPageInternal(null, preferredPageSize,
() -> this.pagedFlux.byPage(preferredPageSize).toStream(batchSize));
}
/**
* Retrieve the {@link Stream}, one page at a time, with each page containing {@code preferredPageSize} items,
* starting from the next page associated with the given continuation token. To start from first page, use {@link
* #streamByPage()} or {@link #streamByPage(int)} instead.
*
* @param preferredPageSize the preferred page size, service may or may not honor the page size preference hence
* client MUST be prepared to handle pages with different page size.
* @param continuationToken The continuation token used to fetch the next page
* @return {@link Stream} of a pages
*/
public Stream<P> streamByPage(C continuationToken, int preferredPageSize) {
return streamByPageInternal(continuationToken, preferredPageSize,
() -> this.pagedFlux.byPage(continuationToken, preferredPageSize).toStream(batchSize));
}
@Override
public Iterator<T> iterator() {
return iterableByItemInternal().iterator();
}
/**
* Retrieve the {@link Iterable}, one page at a time. It will provide same {@link Iterable} of T values from
* starting if called multiple times.
*
* @return {@link Stream} of a pages
*/
public Iterable<P> iterableByPage() {
return iterableByPageInternal(null, null, () -> this.pagedFlux.byPage().toIterable(batchSize));
}
/**
* Retrieve the {@link Iterable}, one page at a time, starting from the next page associated with the given
* continuation token. To start from first page, use {@link #iterableByPage()} instead.
*
* @param continuationToken The continuation token used to fetch the next page
* @return {@link Iterable} of a pages
*/
public Iterable<P> iterableByPage(C continuationToken) {
return iterableByPageInternal(continuationToken, null,
() -> this.pagedFlux.byPage(continuationToken).toIterable(batchSize));
}
/**
* Retrieve the {@link Iterable}, one page at a time, with each page containing {@code preferredPageSize} items.
*
* It will provide same {@link Iterable} of T values from starting if called multiple times.
*
* @param preferredPageSize the preferred page size, service may or may not honor the page size preference hence
* client MUST be prepared to handle pages with different page size.
* @return {@link Iterable} of a pages
*/
public Iterable<P> iterableByPage(int preferredPageSize) {
return iterableByPageInternal(null, preferredPageSize,
() -> this.pagedFlux.byPage(preferredPageSize).toIterable(batchSize));
}
/**
* Retrieve the {@link Iterable}, one page at a time, with each page containing {@code preferredPageSize} items,
* starting from the next page associated with the given continuation token. To start from first page, use {@link
* #iterableByPage()} or {@link #iterableByPage(int)} instead.
*
* @param preferredPageSize the preferred page size, service may or may not honor the page size preference hence
* client MUST be prepared to handle pages with different page size.
* @param continuationToken The continuation token used to fetch the next page
* @return {@link Iterable} of a pages
*/
public Iterable<P> iterableByPage(C continuationToken, int preferredPageSize) {
return iterableByPageInternal(continuationToken, preferredPageSize,
() -> this.pagedFlux.byPage(continuationToken, preferredPageSize).toIterable(batchSize));
}
private Stream<P> streamByPageInternal(C continuationToken, Integer preferredPageSize,
Supplier<Stream<P>> nonPagedFluxCoreIterableSupplier) {
if (pagedFlux instanceof ContinuablePagedFluxCore) {
return StreamSupport.stream(iterableByPageInternal(continuationToken, preferredPageSize, null)
.spliterator(), false);
} else {
return nonPagedFluxCoreIterableSupplier.get();
}
}
private Iterable<P> iterableByPageInternal(C continuationToken, Integer preferredPageSize,
Supplier<Iterable<P>> nonPagedFluxCoreIterableSupplier) {
if (pagedFlux instanceof ContinuablePagedFluxCore) {
ContinuablePagedFluxCore<C, T, P> pagedFluxCore = (ContinuablePagedFluxCore<C, T, P>) pagedFlux;
return new ContinuablePagedByPageIterable<>(pagedFluxCore.pageRetrieverProvider.get(), continuationToken,
preferredPageSize);
} else {
return nonPagedFluxCoreIterableSupplier.get();
}
}
private Iterable<T> iterableByItemInternal() {
if (pagedFlux instanceof ContinuablePagedFluxCore) {
ContinuablePagedFluxCore<C, T, P> pagedFluxCore = (ContinuablePagedFluxCore<C, T, P>) pagedFlux;
return new ContinuablePagedByItemIterable<>(pagedFluxCore.pageRetrieverProvider.get(), null, null);
} else {
return this.pagedFlux.toIterable(this.batchSize);
}
}
}