GlobalEndpointManager.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.implementation;

import com.azure.cosmos.implementation.apachecommons.collections.list.UnmodifiableList;
import com.azure.cosmos.implementation.routing.LocationCache;
import com.azure.cosmos.implementation.routing.LocationHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.net.URI;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

/**
 * Endpoint region cache manager implementation. Supports cross region address routing based on
 * availability and preference list.
 */
public class GlobalEndpointManager implements AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(GlobalEndpointManager.class);

    private final int backgroundRefreshLocationTimeIntervalInMS;
    private final LocationCache locationCache;
    private final URI defaultEndpoint;
    private final ConnectionPolicy connectionPolicy;
    private final Duration maxInitializationTime;
    private final DatabaseAccountManagerInternal owner;
    private final AtomicBoolean isRefreshing;
    private final AtomicBoolean refreshInBackground;
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final Scheduler scheduler = Schedulers.fromExecutor(executor);
    private volatile boolean isClosed;
    private AtomicBoolean firstTimeDatabaseAccountInitialization = new AtomicBoolean(true);
    private volatile DatabaseAccount latestDatabaseAccount;

    public GlobalEndpointManager(DatabaseAccountManagerInternal owner, ConnectionPolicy connectionPolicy, Configs configs)  {
        this.backgroundRefreshLocationTimeIntervalInMS = configs.getUnavailableLocationsExpirationTimeInSeconds() * 1000;
        this.maxInitializationTime = Duration.ofSeconds(configs.getGlobalEndpointManagerMaxInitializationTimeInSeconds());
        try {
            this.locationCache = new LocationCache(
                    new ArrayList<>(connectionPolicy.getPreferredRegions() != null ?
                            connectionPolicy.getPreferredRegions():
                            Collections.emptyList()
                    ),
                    owner.getServiceEndpoint(),
                    connectionPolicy.isEndpointDiscoveryEnabled(),
                    connectionPolicy.isMultipleWriteRegionsEnabled(),
                    configs);

            this.owner = owner;
            this.defaultEndpoint = owner.getServiceEndpoint();
            this.connectionPolicy = connectionPolicy;

            this.isRefreshing = new AtomicBoolean(false);
            this.refreshInBackground = new AtomicBoolean(false);
            this.isClosed = false;
        } catch (Exception e) {
            throw new IllegalArgumentException(e);
        }
    }

    public void init() {
        // TODO: add support for openAsync
        // https://msdata.visualstudio.com/CosmosDB/_workitems/edit/332589
        startRefreshLocationTimerAsync(true).block(maxInitializationTime);
    }

    public UnmodifiableList<URI> getReadEndpoints() {
        // readonly
        return this.locationCache.getReadEndpoints();
    }

    public UnmodifiableList<URI> getWriteEndpoints() {
        //readonly
        return this.locationCache.getWriteEndpoints();
    }

    public static Mono<DatabaseAccount> getDatabaseAccountFromAnyLocationsAsync(
            URI defaultEndpoint, List<String> locations, Function<URI, Mono<DatabaseAccount>> getDatabaseAccountFn) {

        return getDatabaseAccountFn.apply(defaultEndpoint).onErrorResume(
                e -> {
                    logger.error("Fail to reach global gateway [{}], [{}]", defaultEndpoint, e.getMessage());
                    if (locations.isEmpty()) {
                        return Mono.error(e);
                    }

                    Flux<Flux<DatabaseAccount>> obs = Flux.range(0, locations.size())
                            .map(index -> getDatabaseAccountFn.apply(LocationHelper.getLocationEndpoint(defaultEndpoint, locations.get(index))).flux());

                    // iterate and get the database account from the first non failure, otherwise get the last error.
                    Mono<DatabaseAccount> res = Flux.concatDelayError(obs).take(1).single();
                    return res.doOnError(
                            innerE -> logger.error("Fail to reach location any of locations {} {}", String.join(",", locations), innerE.getMessage()));
                });
    }

    public URI resolveServiceEndpoint(RxDocumentServiceRequest request) {
        return this.locationCache.resolveServiceEndpoint(request);
    }

    public void markEndpointUnavailableForRead(URI endpoint) {
        logger.debug("Marking endpoint {} unavailable for read",endpoint);
        this.locationCache.markEndpointUnavailableForRead(endpoint);;
    }

    public void markEndpointUnavailableForWrite(URI endpoint) {
        logger.debug("Marking  endpoint {} unavailable for Write",endpoint);
        this.locationCache.markEndpointUnavailableForWrite(endpoint);
    }

    public boolean canUseMultipleWriteLocations(RxDocumentServiceRequest request) {
        return this.locationCache.canUseMultipleWriteLocations(request);
    }

    public void close() {
        this.isClosed = true;
        this.executor.shutdown();
        logger.debug("GlobalEndpointManager closed.");
    }

    public Mono<Void> refreshLocationAsync(DatabaseAccount databaseAccount, boolean forceRefresh) {
        return Mono.defer(() -> {
            logger.debug("refreshLocationAsync() invoked");

            if (forceRefresh) {
                Mono<DatabaseAccount> databaseAccountObs = getDatabaseAccountFromAnyLocationsAsync(
                    this.defaultEndpoint,
                    new ArrayList<>(this.connectionPolicy.getPreferredRegions()),
                    this::getDatabaseAccountAsync);

                return databaseAccountObs.map(dbAccount -> {
                    this.locationCache.onDatabaseAccountRead(dbAccount);
                    return dbAccount;
                }).flatMap(dbAccount -> {
                    return Mono.empty();
                });
            }

            if (!isRefreshing.compareAndSet(false, true)) {
                logger.debug("in the middle of another refresh. Not invoking a new refresh.");
                return Mono.empty();
            }

            logger.debug("will refresh");
            return this.refreshLocationPrivateAsync(databaseAccount).doOnError(e -> this.isRefreshing.set(false));
        });
    }

    /**
     * This will provide the latest databaseAccount.
     * If due to some reason last databaseAccount update was null,
     * this method will return previous valid value
     * @return DatabaseAccount
     */
    public DatabaseAccount getLatestDatabaseAccount() {
        return this.latestDatabaseAccount;
    }

    public int getPreferredLocationCount() {
        return this.connectionPolicy.getPreferredRegions() != null ? this.connectionPolicy.getPreferredRegions().size() : 0;
    }

    private Mono<Void> refreshLocationPrivateAsync(DatabaseAccount databaseAccount) {
        return Mono.defer(() -> {
            logger.debug("refreshLocationPrivateAsync() refreshing locations");

            if (databaseAccount != null) {
                this.locationCache.onDatabaseAccountRead(databaseAccount);
            }

            Utils.ValueHolder<Boolean> canRefreshInBackground = new Utils.ValueHolder<>();
            if (this.locationCache.shouldRefreshEndpoints(canRefreshInBackground)) {
                logger.debug("shouldRefreshEndpoints: true");

                if (databaseAccount == null && !canRefreshInBackground.v) {
                    logger.debug("shouldRefreshEndpoints: can't be done in background");

                    Mono<DatabaseAccount> databaseAccountObs = getDatabaseAccountFromAnyLocationsAsync(
                            this.defaultEndpoint,
                            new ArrayList<>(this.connectionPolicy.getPreferredRegions()),
                            this::getDatabaseAccountAsync);

                    return databaseAccountObs.map(dbAccount -> {
                        this.locationCache.onDatabaseAccountRead(dbAccount);
                        this.isRefreshing.set(false);
                        return dbAccount;
                    }).flatMap(dbAccount -> {
                        // trigger a startRefreshLocationTimerAsync don't wait on it.
                        if (!this.refreshInBackground.get()) {
                            this.startRefreshLocationTimerAsync();
                        }
                        return Mono.empty();
                    });
                }

                // trigger a startRefreshLocationTimerAsync don't wait on it.
                if (!this.refreshInBackground.get()) {
                    this.startRefreshLocationTimerAsync();
                }

                this.isRefreshing.set(false);
                return Mono.empty();
            } else {
                logger.debug("shouldRefreshEndpoints: false, nothing to do.");
                this.isRefreshing.set(false);
                return Mono.empty();
            }
        });
    }

    private void startRefreshLocationTimerAsync() {
        startRefreshLocationTimerAsync(false).subscribe();
    }

    private Mono<Void> startRefreshLocationTimerAsync(boolean initialization) {

        if (this.isClosed) {
            logger.debug("startRefreshLocationTimerAsync: nothing to do, it is closed");
            // if client is already closed, nothing to be done, just return.
            return Mono.empty();
        }

        logger.debug("registering a refresh in [{}] ms", this.backgroundRefreshLocationTimeIntervalInMS);
        LocalDateTime now = LocalDateTime.now();

        int delayInMillis = initialization ? 0: this.backgroundRefreshLocationTimeIntervalInMS;

        this.refreshInBackground.set(true);

        return Mono.delay(Duration.ofMillis(delayInMillis), CosmosSchedulers.COSMOS_PARALLEL)
                .flatMap(
                        t -> {
                            if (this.isClosed) {
                                logger.warn("client already closed");
                                // if client is already closed, nothing to be done, just return.
                                return Mono.empty();
                            }

                            logger.debug("startRefreshLocationTimerAsync() - Invoking refresh, I was registered on [{}]", now);
                            Mono<DatabaseAccount> databaseAccountObs = GlobalEndpointManager.getDatabaseAccountFromAnyLocationsAsync(this.defaultEndpoint, new ArrayList<>(this.connectionPolicy.getPreferredRegions()),
                                    this::getDatabaseAccountAsync);

                            return databaseAccountObs.flatMap(dbAccount -> {
                                logger.debug("db account retrieved");
                                this.refreshInBackground.set(false);
                                return this.refreshLocationPrivateAsync(dbAccount);
                            });
                        }).onErrorResume(ex -> {
                    logger.error("startRefreshLocationTimerAsync() - Unable to refresh database account from any location. Exception: {}", ex.toString(), ex);

                    this.startRefreshLocationTimerAsync();
                    return Mono.empty();
                }).subscribeOn(scheduler);
    }

    private Mono<DatabaseAccount> getDatabaseAccountAsync(URI serviceEndpoint) {
        return this.owner.getDatabaseAccountFromEndpoint(serviceEndpoint)
            .doOnNext(databaseAccount -> {
                if(databaseAccount != null) {
                    this.latestDatabaseAccount = databaseAccount;
                }

                logger.debug("account retrieved: {}", databaseAccount);
            }).single();
    }

    public boolean isClosed() {
        return this.isClosed;
    }

    public String getRegionName(URI locationEndpoint, OperationType operationType) {
        return this.locationCache.getRegionName(locationEndpoint, operationType);
    }
}