RenameCollectionAwareClientRetryPolicy.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.caches.RxClientCollectionCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

import java.time.Duration;

public class RenameCollectionAwareClientRetryPolicy extends DocumentClientRetryPolicy {

    private final static Logger logger = LoggerFactory.getLogger(RenameCollectionAwareClientRetryPolicy.class);

    private final DocumentClientRetryPolicy retryPolicy;
    private final ISessionContainer sessionContainer;
    private final RxClientCollectionCache collectionCache;
    private RxDocumentServiceRequest request;
    private boolean hasTriggered = false;

    public RenameCollectionAwareClientRetryPolicy(ISessionContainer sessionContainer, RxClientCollectionCache collectionCache, DocumentClientRetryPolicy retryPolicy) {
        this.retryPolicy = retryPolicy;
        this.sessionContainer = sessionContainer;
        this.collectionCache = collectionCache;
        this.request = null;
    }

    @Override
    public void onBeforeSendRequest(RxDocumentServiceRequest request) {
        this.request = request;
        this.retryPolicy.onBeforeSendRequest(request);
    }

    @Override
    public RetryContext getRetryContext() {
        if (this.retryPolicy != null) {
            return this.retryPolicy.getRetryContext();
        } else {
            return null;
        }
    }

    @Override
    public Mono<ShouldRetryResult> shouldRetry(Exception e) {
        return this.retryPolicy.shouldRetry(e).flatMap(shouldRetryResult -> {
            if (!shouldRetryResult.shouldRetry && !this.hasTriggered) {
                CosmosException clientException = Utils.as(e, CosmosException.class);

                if (this.request == null) {
                    // someone didn't call OnBeforeSendRequest - nothing we can do
                    logger.error("onBeforeSendRequest is not invoked, encountered failure due to request being null", e);
                    return Mono.just(ShouldRetryResult.error(e));
                }

                if (clientException != null && this.request.getIsNameBased() &&
                        Exceptions.isStatusCode(clientException, HttpConstants.StatusCodes.NOTFOUND) &&
                        Exceptions.isSubStatusCode(clientException, HttpConstants.SubStatusCodes.READ_SESSION_NOT_AVAILABLE)) {
                    // Clear the session token, because the collection name might be reused.
                    logger.warn("Clear the token for named base request {}", request.getResourceAddress());

                    this.sessionContainer.clearTokenByCollectionFullName(request.getResourceAddress());

                    this.hasTriggered = true;

                    String oldCollectionRid = request.requestContext.resolvedCollectionRid;

                    request.forceNameCacheRefresh = true;
                    request.requestContext.resolvedCollectionRid = null;

                    Mono<Utils.ValueHolder<DocumentCollection>> collectionObs = this.collectionCache.resolveCollectionAsync(BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics), request);

                    return collectionObs.flatMap(collectionValueHolder -> {
                        if (collectionValueHolder.v == null) {
                            logger.warn("Can't recover from session unavailable exception because resolving collection name {} returned null", request.getResourceAddress());
                        } else if (!StringUtils.isEmpty(oldCollectionRid) && !StringUtils.isEmpty(collectionValueHolder.v.getResourceId())) {
                            return Mono.just(ShouldRetryResult.retryAfter(Duration.ZERO));
                        }
                        return Mono.just(shouldRetryResult);
                    }).onErrorResume(throwable -> {
                        // When resolveCollectionAsync throws an exception ignore it because it's an attempt to recover an existing
                        // error. When the recovery fails we return ShouldRetryResult.noRetry and propagate the original exception to the client

                        logger.warn("Can't recover from session unavailable exception because resolving collection name {} failed with {}", request.getResourceAddress(), throwable.getMessage());
                        if (throwable instanceof Exception) {
                            return Mono.just(ShouldRetryResult.error((Exception) throwable));
                        }
                        return Mono.error(throwable);
                    });
                }
            }
            return Mono.just(shouldRetryResult);
        });
    }
}