ContinuablePagedByItemIterable.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.core.util.paging;
import com.azure.core.util.logging.ClientLogger;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* Internal class that is a blocking iterable for {@link ContinuablePagedIterable}.
* <p>
* This class retrieves pages from the service in a blocking manner while also respecting the number of items to be
* retrieved. This functions differently than just wrapping a {@link ContinuablePagedFlux} as this will track the exact
* number of items emitted and whether the previously retrieve page/pages contain any additional items that could be
* emitted.
*
* @param <C> The continuation token type.
* @param <T> The item type.
* @param <P> The page type.
*/
final class ContinuablePagedByItemIterable<C, T, P extends ContinuablePage<C, T>> implements Iterable<T> {
private final PageRetriever<C, P> pageRetriever;
private final C continuationToken;
private final Integer preferredPageSize;
ContinuablePagedByItemIterable(PageRetriever<C, P> pageRetriever, C continuationToken, Integer preferredPageSize) {
this.pageRetriever = pageRetriever;
this.continuationToken = continuationToken;
this.preferredPageSize = preferredPageSize;
}
@Override
public Iterator<T> iterator() {
return new ContinuablePagedByItemIterator<>(pageRetriever, continuationToken, preferredPageSize);
}
private static final class ContinuablePagedByItemIterator<C, T, P extends ContinuablePage<C, T>>
extends ContinuablePagedByIteratorBase<C, T, P, T> {
private volatile Queue<Iterator<T>> pages = new ConcurrentLinkedQueue<>();
private volatile Iterator<T> currentPage;
ContinuablePagedByItemIterator(PageRetriever<C, P> pageRetriever, C continuationToken,
Integer preferredPageSize) {
super(pageRetriever, new ContinuationState<>(continuationToken), preferredPageSize,
new ClientLogger(ContinuablePagedByItemIterator.class));
requestPage();
}
@Override
boolean needToRequestPage() {
return (currentPage == null || !currentPage.hasNext()) && pages.peek() == null;
}
@Override
public boolean isNextAvailable() {
return (currentPage != null && currentPage.hasNext()) || pages.peek() != null;
}
@Override
T getNext() {
if ((currentPage == null || !currentPage.hasNext()) && pages.peek() != null) {
currentPage = pages.poll();
}
return currentPage.next();
}
@Override
void addPage(P page) {
Iterator<T> pageValues = page.getElements().iterator();
if (pageValues.hasNext()) {
this.pages.add(pageValues);
}
}
}
}