PartitionKeyRangeGoneRetryPolicy.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.implementation.caches.IPartitionKeyRangeCache;
import com.azure.cosmos.implementation.caches.RxCollectionCache;
import com.azure.cosmos.implementation.routing.CollectionRoutingMap;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.Map;

// TODO: this need testing
/**
 * While this class is public, but it is not part of our published public APIs.
 * This is meant to be internally used only by our sdk.
 */
public class PartitionKeyRangeGoneRetryPolicy extends DocumentClientRetryPolicy {

    private final RxCollectionCache collectionCache;
    private final DiagnosticsClientContext diagnosticsClientContext;
    private final DocumentClientRetryPolicy nextRetryPolicy;
    private final IPartitionKeyRangeCache partitionKeyRangeCache;
    private final String collectionLink;
    private final Map<String, Object> requestOptionProperties;
    private volatile boolean retried;
    private RxDocumentServiceRequest request;

    public PartitionKeyRangeGoneRetryPolicy(DiagnosticsClientContext diagnosticsClientContext,
            RxCollectionCache collectionCache,
            IPartitionKeyRangeCache partitionKeyRangeCache,
            String collectionLink,
            DocumentClientRetryPolicy nextRetryPolicy,
            Map<String, Object> requestOptionProperties) {
        this.diagnosticsClientContext = diagnosticsClientContext;
        this.collectionCache = collectionCache;
        this.partitionKeyRangeCache = partitionKeyRangeCache;
        this.collectionLink = collectionLink;
        this.nextRetryPolicy = nextRetryPolicy;
        this.requestOptionProperties = requestOptionProperties;
        this.request = null;
    }

    /// <summary>
    /// Should the caller retry the operation.
    /// </summary>
    /// <param name="exception">Exception that occured when the operation was tried</param>
    /// <param name="cancellationToken"></param>
    /// <returns>True indicates caller should retry, False otherwise</returns>
    public Mono<ShouldRetryResult> shouldRetry(Exception exception) {
        CosmosException clientException = Utils.as(exception, CosmosException.class);
        if (clientException != null &&
                Exceptions.isStatusCode(clientException, HttpConstants.StatusCodes.GONE) &&
                Exceptions.isSubStatusCode(clientException, HttpConstants.SubStatusCodes.PARTITION_KEY_RANGE_GONE)) {

            if (this.retried){
                return Mono.just(ShouldRetryResult.error(clientException));
            }

            RxDocumentServiceRequest request = RxDocumentServiceRequest.create(
                    this.diagnosticsClientContext,
                    OperationType.Read,
                    ResourceType.DocumentCollection,
                    this.collectionLink,
                    null
                    // AuthorizationTokenType.PrimaryMasterKey)
                    );
            if (this.requestOptionProperties != null) {
                request.properties = this.requestOptionProperties;
            }
            Mono<Utils.ValueHolder<DocumentCollection>> collectionObs = this.collectionCache.resolveCollectionAsync(
                BridgeInternal.getMetaDataDiagnosticContext(this.request.requestContext.cosmosDiagnostics),
                request);

            return collectionObs.flatMap(collectionValueHolder -> {

                Mono<Utils.ValueHolder<CollectionRoutingMap>> routingMapObs = this.partitionKeyRangeCache.tryLookupAsync(
                    BridgeInternal.getMetaDataDiagnosticContext(this.request.requestContext.cosmosDiagnostics),
                    collectionValueHolder.v.getResourceId(),
                    null,
                    request.properties);

                Mono<Utils.ValueHolder<CollectionRoutingMap>> refreshedRoutingMapObs = routingMapObs.flatMap(routingMapValueHolder -> {
                    if (routingMapValueHolder.v != null) {
                        // Force refresh.
                        return this.partitionKeyRangeCache.tryLookupAsync(
                            null,
                            collectionValueHolder.v.getResourceId(),
                            routingMapValueHolder.v,
                            request.properties);
                    } else {
                        return Mono.just(new Utils.ValueHolder<>(null));
                    }
                });

                //  TODO: Check if this behavior can be replaced by doOnSubscribe
                return refreshedRoutingMapObs.flatMap(rm -> {
                    this.retried = true;
                    return Mono.just(ShouldRetryResult.retryAfter(Duration.ZERO));
                });

            });

        } else {
            return this.nextRetryPolicy.shouldRetry(exception);
        }
    }

    @Override
    public void onBeforeSendRequest(RxDocumentServiceRequest request) {
        this.request = request;
        this.nextRetryPolicy.onBeforeSendRequest(request);
    }

    @Override
    public RetryContext getRetryContext() {
        if (this.nextRetryPolicy != null) {
            return this.nextRetryPolicy.getRetryContext();
        } else {
            return null;
        }
    }
}