BulkOperationRetryPolicy.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.cosmos.implementation.batch;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.CosmosException;
import com.azure.cosmos.implementation.HttpConstants.StatusCodes;
import com.azure.cosmos.implementation.HttpConstants.SubStatusCodes;
import com.azure.cosmos.implementation.IRetryPolicy;
import com.azure.cosmos.implementation.ResourceThrottleRetryPolicy;
import com.azure.cosmos.implementation.RetryContext;
import com.azure.cosmos.implementation.ShouldRetryResult;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.caches.RxCollectionCache;
import com.azure.cosmos.implementation.caches.RxPartitionKeyRangeCache;
import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl;
import com.azure.cosmos.models.CosmosBatchOperationResult;
import reactor.core.publisher.Mono;

import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;

/**
 * A container to keep retry policies and functions for bulk.
 */
final class BulkOperationRetryPolicy implements IRetryPolicy {

    private static final int MAX_RETRIES = 1;

    private final RxCollectionCache collectionCache;
    private final RxPartitionKeyRangeCache partitionKeyRangeCache;
    private final String collectionLink;
    private final ResourceThrottleRetryPolicy resourceThrottleRetryPolicy;
    private int attemptedRetries;

    BulkOperationRetryPolicy(
        RxCollectionCache collectionCache,
        RxPartitionKeyRangeCache partitionKeyRangeCache,
        String resourceFullName,
        ResourceThrottleRetryPolicy resourceThrottleRetryPolicy) {

        this.collectionCache = collectionCache;
        this.partitionKeyRangeCache = partitionKeyRangeCache;

        // Similar to PartitionKeyMismatchRetryPolicy constructor.
        collectionLink = Utils.getCollectionName(resourceFullName);
        this.resourceThrottleRetryPolicy = resourceThrottleRetryPolicy;
    }

    final Mono<ShouldRetryResult> shouldRetry(final CosmosBatchOperationResult result) {

        checkNotNull(result, "expected non-null result");

        // Create CosmosException for the next retry policy to understand:
        CosmosException exception = BridgeInternal.createCosmosException(
            null,
            result.getStatusCode(),
            null,
            BulkExecutorUtil.getResponseHeadersFromBatchOperationResult(result));

        if (this.resourceThrottleRetryPolicy == null) {
            return Mono.just(ShouldRetryResult.noRetry());
        }

        return this.resourceThrottleRetryPolicy.shouldRetry(exception);
    }

    @Override
    public Mono<ShouldRetryResult> shouldRetry(Exception exception) {

        if (this.resourceThrottleRetryPolicy == null) {
            return Mono.just(ShouldRetryResult.noRetry());
        }

        return this.resourceThrottleRetryPolicy.shouldRetry(exception);
    }

    @Override
    public RetryContext getRetryContext() {
        return this.resourceThrottleRetryPolicy.getRetryContext();
    }

    Mono<Boolean> shouldRetryForGone(int statusCode, int subStatusCode) {
        if (statusCode == StatusCodes.GONE) {
            if (this.attemptedRetries++ > MAX_RETRIES) {
                return Mono.just(false);
            }

            if ((subStatusCode == SubStatusCodes.PARTITION_KEY_RANGE_GONE ||
                     subStatusCode == SubStatusCodes.COMPLETING_SPLIT ||
                     subStatusCode == SubStatusCodes.COMPLETING_PARTITION_MIGRATION)) {
                return collectionCache
                       .resolveByNameAsync(null, collectionLink, null)
                       .flatMap(collection -> this.partitionKeyRangeCache
                                                  .tryGetOverlappingRangesAsync(null /*metaDataDiagnosticsContext*/,
                                                                                collection.getResourceId(),
                                                                                FeedRangeEpkImpl.forFullRange()
                                                                                    .getRange(),
                                                                                true,
                                                                                null /*properties*/)
                                                  .then(Mono.just(true)));
            }

            if (subStatusCode == SubStatusCodes.NAME_CACHE_IS_STALE) {
                refreshCollectionCache();
            }

            return Mono.just(true);
        }

        return Mono.just(false);
    }

    /**
     * TODO(rakkuma): metaDataDiagnosticContext is passed null in collectionCache.refresh function. Fix it while adding
     *  support for an operation wise Diagnostic. The value here should be merged in the individual diagnostic.
     * Issue: https://github.com/Azure/azure-sdk-for-java/issues/17647
     */
    private void refreshCollectionCache() {
        this.collectionCache.refresh(
            null,
            this.collectionLink,
            null);
    }
}