// PagedConverter.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.resourcemanager.resources.fluentcore.utils;
import com.azure.core.http.HttpRequest;
import com.azure.core.http.rest.PagedFlux;
import com.azure.core.http.rest.PagedResponse;
import com.azure.core.http.rest.PagedResponseBase;
import com.azure.core.http.rest.Response;
import com.azure.core.util.paging.PageRetriever;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
/**
 * Utility class for conversion of PagedResponse.
 *
 * <p>All methods are static; the class cannot be instantiated.</p>
 */
public final class PagedConverter {

    private PagedConverter() {
    }

    /**
     * Applies flatMap transform to elements of PagedFlux.
     *
     * <p>Every page of the input is converted to a page of mapped elements. The request, status code, headers and
     * continuation token of each source page are carried over to the converted page.</p>
     *
     * @param pagedFlux the input of PagedFlux.
     * @param mapper the flatMap transform of element T to Publisher of S.
     * @param <T> input type of PagedFlux.
     * @param <S> return type of PagedFlux.
     * @return the PagedFlux with elements in PagedResponse transformed.
     */
    public static <T, S> PagedFlux<S> flatMapPage(PagedFlux<T> pagedFlux,
        Function<? super T, ? extends Publisher<? extends S>> mapper) {
        // The pageSize argument handed to the retriever is not forwarded to the source PagedFlux.
        Supplier<PageRetriever<String, PagedResponse<S>>> provider = () -> (continuationToken, pageSize) ->
            pagesFrom(pagedFlux, continuationToken)
                .concatMap(PagedConverter.<T, S>flatMapPagedResponse(mapper));
        return PagedFlux.create(provider);
    }

    /**
     * Merge collection of all PagedFlux transformed from elements of PagedFlux to a single PagedFlux.
     *
     * <p>Pages of the inner PagedFlux instances are emitted unchanged, in the order of the elements that produced
     * them; inner pages without any item are dropped.</p>
     *
     * @param pagedFlux the input of PagedFlux.
     * @param transformer the transform of element T to PagedFlux of S.
     * @param <T> input type of PagedFlux.
     * @param <S> return type of PagedFlux.
     * @return the merged PagedFlux.
     */
    public static <T, S> PagedFlux<S> mergePagedFlux(PagedFlux<T> pagedFlux,
        Function<? super T, PagedFlux<S>> transformer) {
        // One possible issue: the emitted pages keep the continuation token of the inner PagedFlux they came from,
        // so when an inner PagedFlux ends, that PagedResponse will have continuationToken == null.
        // The pageSize argument handed to the retriever is not forwarded to the source PagedFlux.
        Supplier<PageRetriever<String, PagedResponse<S>>> provider = () -> (continuationToken, pageSize) ->
            pagesFrom(pagedFlux, continuationToken)
                .concatMap(PagedConverter.<T, S>mergePagedFluxPagedResponse(transformer));
        return PagedFlux.create(provider);
    }

    /**
     * Selects the page stream of a PagedFlux, starting from the first page when no continuation token is given.
     *
     * @param pagedFlux the PagedFlux to read pages from.
     * @param continuationToken the token to resume from, or null to start at the first page.
     * @param <T> element type of the PagedFlux.
     * @return the Flux of pages.
     */
    private static <T> Flux<PagedResponse<T>> pagesFrom(PagedFlux<T> pagedFlux, String continuationToken) {
        return (continuationToken == null)
            ? pagedFlux.byPage()
            : pagedFlux.byPage(continuationToken);
    }

    /**
     * Applies flatMap transform to elements of PagedResponse.
     *
     * <p>A page whose item list is null is treated as a page without items.</p>
     *
     * @param mapper the flatMap transform of element T to Publisher of S.
     * @param <T> input type of PagedResponse.
     * @param <S> return type of PagedResponse.
     * @return the lifted transform on PagedResponse.
     */
    private static <T, S> Function<PagedResponse<T>, Mono<PagedResponse<S>>> flatMapPagedResponse(
        Function<? super T, ? extends Publisher<? extends S>> mapper) {
        return pagedResponse -> {
            // Guard against a page that carries no item list at all; Flux.fromIterable rejects null.
            List<T> items = pagedResponse.getValue();
            Flux<T> source = (items == null) ? Flux.<T>empty() : Flux.fromIterable(items);
            return source
                .flatMapSequential(mapper)
                .collectList()
                .map(values -> new PagedResponseBase<HttpRequest, S>(pagedResponse.getRequest(),
                    pagedResponse.getStatusCode(),
                    pagedResponse.getHeaders(),
                    values,
                    pagedResponse.getContinuationToken(),
                    null));
        };
    }

    /**
     * Applies transform of element to PagedFlux, to elements of PagedResponse. Then merge all these PagedFlux.
     *
     * <p>A source page whose item list is null contributes no pages. Inner pages whose item list is null or empty
     * are dropped from the result.</p>
     *
     * @param transformer the transform of element T to PagedFlux of S.
     * @param <T> input type of PagedResponse.
     * @param <S> return type of PagedResponse.
     * @return the merged PagedFlux.
     */
    private static <T, S> Function<PagedResponse<T>, Flux<PagedResponse<S>>> mergePagedFluxPagedResponse(
        Function<? super T, PagedFlux<S>> transformer) {
        return pagedResponse -> {
            List<T> items = pagedResponse.getValue();
            if (items == null) {
                // Nothing to transform on this page.
                return Flux.empty();
            }
            List<Flux<PagedResponse<S>>> fluxList = items.stream()
                .map(item -> transformer.apply(item).byPage())
                .collect(Collectors.toList());
            return Flux.concat(fluxList)
                .filter(PagedConverter::hasItems);
        };
    }

    /**
     * Tells whether a page carries at least one item.
     *
     * @param page the page to inspect.
     * @return true if the item list of the page is neither null nor empty.
     */
    private static boolean hasItems(PagedResponse<?> page) {
        List<?> items = page.getValue();
        return items != null && !items.isEmpty();
    }

    /**
     * Converts Response of List to PagedFlux.
     *
     * <p>The resulting PagedFlux is built from a single page that has no continuation token.</p>
     *
     * @param <T> type of element.
     * @param responseMono the Response of List to convert.
     * @return the PagedFlux.
     */
    public static <T> PagedFlux<T> convertListToPagedFlux(Mono<Response<List<T>>> responseMono) {
        return new PagedFlux<>(() -> responseMono.map(response -> new PagedResponseBase<Void, T>(
            response.getRequest(),
            response.getStatusCode(),
            response.getHeaders(),
            response.getValue(),
            null,
            null
        )));
    }
}