// AddressSelector.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation.directconnectivity;
import com.azure.cosmos.implementation.GoneException;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.Strings;
import reactor.core.publisher.Mono;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
/**
 * Picks replica addresses for a request out of the addresses returned by an
 * {@link IAddressResolver}, restricted to the protocol this selector was
 * created with.
 */
public class AddressSelector {
    private final IAddressResolver addressResolver;
    private final Protocol protocol;

    /**
     * Creates a selector.
     *
     * @param addressResolver resolver asked for the addresses of a request
     * @param protocol        protocol whose scheme resolved addresses must match
     */
    public AddressSelector(IAddressResolver addressResolver, Protocol protocol) {
        this.addressResolver = addressResolver;
        this.protocol = protocol;
    }

    /**
     * Resolves the physical URIs of the replica addresses selected for the request.
     *
     * @param request        request whose addresses are resolved
     * @param includePrimary when {@code false}, addresses flagged as primary are left out
     * @param forceRefresh   forwarded to {@link #resolveAddressesAsync}
     * @return a {@link Mono} emitting the physical URIs of the retained addresses
     */
    public Mono<List<Uri>> resolveAllUriAsync(
        RxDocumentServiceRequest request,
        boolean includePrimary,
        boolean forceRefresh) {

        Mono<List<AddressInformation>> resolved = this.resolveAddressesAsync(request, forceRefresh);
        return resolved.map(replicas -> replicas.stream()
            .filter(replica -> includePrimary || !replica.isPrimary())
            .map(AddressInformation::getPhysicalUri)
            .collect(Collectors.toList()));
    }

    /**
     * Resolves the addresses for the request and emits the primary URI chosen by
     * {@link #getPrimaryUri}.
     *
     * @param request             request whose primary address is wanted
     * @param forceAddressRefresh forwarded to {@link #resolveAddressesAsync}
     * @return a {@link Mono} emitting the primary URI, or an error signal carrying
     *         whatever exception {@link #getPrimaryUri} raised
     */
    public Mono<Uri> resolvePrimaryUriAsync(RxDocumentServiceRequest request, boolean forceAddressRefresh) {
        Mono<List<AddressInformation>> resolved = this.resolveAddressesAsync(request, forceAddressRefresh);
        return resolved.flatMap(replicas -> primaryUriOrError(request, replicas));
    }

    /**
     * Selects the primary URI from already-resolved replica addresses.
     *
     * <p>When the request carries a default replica index, the address at that
     * index is used if the index is within bounds. Otherwise any address flagged
     * as primary whose URI string does not contain {@code "["} is used.
     *
     * @param request          request that may carry a default replica index
     * @param replicaAddresses candidate addresses
     * @return the physical URI of the selected address
     * @throws GoneException when no address could be selected
     */
    public static Uri getPrimaryUri(RxDocumentServiceRequest request, List<AddressInformation> replicaAddresses) throws GoneException {
        final AddressInformation selected;

        if (request.getDefaultReplicaIndex() == null) {
            // NOTE(review): the "[" exclusion presumably skips bracketed (IPv6-literal) hosts - confirm.
            selected = replicaAddresses.stream()
                .filter(AddressInformation::isPrimary)
                .filter(candidate -> !candidate.getPhysicalUri().getURIAsString().contains("["))
                .findAny()
                .orElse(null);
        } else {
            final int replicaIndex = request.getDefaultReplicaIndex();
            final boolean withinBounds = replicaIndex >= 0 && replicaIndex < replicaAddresses.size();
            selected = withinBounds ? replicaAddresses.get(replicaIndex) : null;
        }

        if (selected != null) {
            return selected.getPhysicalUri();
        }

        // Primary endpoint (of the desired protocol) was not found.
        throw new GoneException(String.format("The requested resource is no longer available at the server. Returned addresses are {%s}",
            joinUris(replicaAddresses)), null);
    }

    /**
     * Resolves the addresses for the request and narrows them in two steps:
     * first to addresses with a non-empty URI string and a protocol scheme equal
     * (ignoring case) to this selector's protocol, then to the non-public ones if
     * there are any, or else to the public ones.
     *
     * @param request             request whose addresses are resolved
     * @param forceAddressRefresh forwarded to the underlying resolver
     * @return a {@link Mono} emitting the narrowed address list
     */
    public Mono<List<AddressInformation>> resolveAddressesAsync(RxDocumentServiceRequest request, boolean forceAddressRefresh) {
        Mono<List<AddressInformation>> protocolMatches =
            this.addressResolver.resolveAsync(request, forceAddressRefresh)
                .map(addresses -> Arrays.stream(addresses)
                    .filter(this::isUsableForProtocol)
                    .collect(Collectors.toList()));

        return protocolMatches.map(AddressSelector::preferNonPublic);
    }

    /** Wraps {@link #getPrimaryUri} so that any exception it raises becomes an error signal. */
    private static Mono<Uri> primaryUriOrError(RxDocumentServiceRequest request, List<AddressInformation> replicas) {
        try {
            return Mono.just(AddressSelector.getPrimaryUri(request, replicas));
        } catch (Exception e) {
            return Mono.error(e);
        }
    }

    /** True when the address has a non-empty URI string and its scheme matches this selector's protocol. */
    private boolean isUsableForProtocol(AddressInformation address) {
        return !Strings.isNullOrEmpty(address.getPhysicalUri().getURIAsString())
            && Strings.areEqualIgnoreCase(address.getProtocolScheme(), this.protocol.scheme());
    }

    /** Returns the non-public addresses when at least one exists, otherwise the public ones. */
    private static List<AddressInformation> preferNonPublic(List<AddressInformation> candidates) {
        List<AddressInformation> nonPublic = candidates.stream()
            .filter(candidate -> !candidate.isPublic())
            .collect(Collectors.toList());
        if (!nonPublic.isEmpty()) {
            return nonPublic;
        }
        return candidates.stream()
            .filter(AddressInformation::isPublic)
            .collect(Collectors.toList());
    }

    /** Comma-joins the URI strings of the given addresses for use in error messages. */
    private static String joinUris(List<AddressInformation> addresses) {
        return addresses.stream()
            .map(address -> address.getPhysicalUri().getURIAsString())
            .collect(Collectors.joining(","));
    }
}