ContinuablePagedByIteratorBase.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.core.util.paging;
import com.azure.core.util.logging.ClientLogger;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Internal class that is a blocking iterator base class.
* <p>
* This class manages retrieving and maintaining previously retrieved page/pages in a synchronous fashion. It will ensure
* the minimum number of pages are retrieved from a service by checking if any additional items/pages could be emitted
* before requesting additional ones from the service.
*
* @param <C> The continuation token type.
* @param <T> The item type.
* @param <P> The page type.
* @param <E> The type that the {@link ContinuablePagedIterable} will emit.
*/
abstract class ContinuablePagedByIteratorBase<C, T, P extends ContinuablePage<C, T>, E> implements Iterator<E> {
    private final PageRetriever<C, P> pageRetriever;
    private final ContinuationState<C> continuationState;
    private final Integer defaultPageSize;
    private final ClientLogger logger;

    /*
     * Set once no further page requests should be issued. Declared volatile because hasNext() reads it without
     * holding the monitor that requestPage() acquires.
     */
    private volatile boolean done;

    /*
     * Stores the collaborators used to fetch pages, track the continuation token and log errors.
     */
    ContinuablePagedByIteratorBase(PageRetriever<C, P> pageRetriever, ContinuationState<C> continuationState,
        Integer defaultPageSize, ClientLogger logger) {
        this.pageRetriever = pageRetriever;
        this.continuationState = continuationState;
        this.defaultPageSize = defaultPageSize;
        this.logger = logger;
    }

    /*
     * Returns the next element, requesting further pages if required; throws a logged NoSuchElementException when
     * the iterator is exhausted.
     */
    @Override
    public E next() {
        if (hasNext()) {
            return getNext();
        }

        throw logger.logExceptionAsError(new NoSuchElementException("Iterator contains no more elements."));
    }

    /*
     * Reports whether another element can be emitted, requesting pages until one is available or paging is done.
     */
    @Override
    public boolean hasNext() {
        // Keep requesting: the by item implementation may be handed empty pages that yield nothing to emit.
        for (;;) {
            if (done || !needToRequestPage()) {
                return isNextAvailable();
            }

            requestPage();
        }
    }

    /*
     * Tells whether a page request is required before another element can be produced.
     */
    abstract boolean needToRequestPage();

    /*
     * Tells whether an element is ready to be emitted right now.
     */
    abstract boolean isNextAvailable();

    /*
     * Produces the element that next() hands back to the caller.
     */
    abstract E getNext();

    /*
     * Requests pages starting at the last known continuation token and blocks until the retrieval completes.
     * Every received page is passed to addPage, after which the continuation token and the done flag are updated.
     */
    synchronized void requestPage() {
        /*
         * Several threads may have queued on this monitor. If one that ran earlier already made an element
         * available, or already drained the paged responses, this call has nothing left to do.
         */
        boolean nothingToDo = isNextAvailable() || done;
        if (nothingToDo) {
            return;
        }

        final AtomicBoolean pageReceived = new AtomicBoolean(false);

        pageRetriever.get(continuationState.getLastContinuationToken(), defaultPageSize)
            .map(nextPage -> {
                pageReceived.set(true);
                addPage(nextPage);

                continuationState.setLastContinuationToken(nextPage.getContinuationToken());
                this.done = continuationState.isDone();

                return nextPage;
            })
            .blockLast();

        /*
         * The subscription may complete without emitting a single page. When that happens and there is nothing
         * buffered to return, mark iteration as finished so no further requests are made.
         */
        if (!done && !pageReceived.get() && !isNextAvailable()) {
            this.done = true;
        }
    }

    /*
     * Receives a page returned by the service. requestPage() updates the continuation state right after this call.
     */
    abstract void addPage(P page);
}