RxCollectionCache.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation.caches;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.implementation.CosmosClientMetadataCachesSnapshot;
import com.azure.cosmos.implementation.MetadataDiagnosticsContext;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity;
import com.azure.cosmos.implementation.DocumentCollection;
import com.azure.cosmos.implementation.InvalidPartitionException;
import com.azure.cosmos.implementation.NotFoundException;
import com.azure.cosmos.implementation.PathsHelper;
import com.azure.cosmos.implementation.RMResources;
import com.azure.cosmos.implementation.ResourceId;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.models.ModelBridgeInternal;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;

import java.util.Map;

/**
 * While this class is public, but it is not part of our published public APIs.
 * This is meant to be internally used only by our sdk.
 */
public abstract class RxCollectionCache {

    private final AsyncCache<String, DocumentCollection> collectionInfoByNameCache;
    private final AsyncCache<String, DocumentCollection> collectionInfoByIdCache;

    public static void serialize(CosmosClientMetadataCachesSnapshot clientMetadataCachesSnapshot, RxCollectionCache cache) {
        clientMetadataCachesSnapshot.serializeCollectionInfoByIdCache(cache.collectionInfoByIdCache);
        clientMetadataCachesSnapshot.serializeCollectionInfoByNameCache(cache.collectionInfoByNameCache);
    }

    protected RxCollectionCache(AsyncCache<String, DocumentCollection> collectionInfoByNameCache, AsyncCache<String, DocumentCollection> collectionInfoByIdCache) {
        this.collectionInfoByNameCache = collectionInfoByNameCache;
        this.collectionInfoByIdCache = collectionInfoByIdCache;
    }

    protected RxCollectionCache() {
        this(new AsyncCache<>(new CollectionRidComparer()), new AsyncCache<>(new CollectionRidComparer()));
    }

    /**
     * Resolves a request to a collection in a sticky manner.
     * Unless request.ForceNameCacheRefresh is equal to true, it will return the same collection.
     * @param request Request to resolve.
     * @return an instance of Single&lt;DocumentCollection&gt;
     */
    public Mono<Utils.ValueHolder<DocumentCollection>> resolveCollectionAsync(
        MetadataDiagnosticsContext metaDataDiagnosticsContext, RxDocumentServiceRequest request) {
        //  Mono Void to represent only terminal events specifically complete and error
        Mono<Void> init = null;
        if (request.getIsNameBased()) {
            if (request.isForceNameCacheRefresh()) {
                Mono<Void> mono = this.refreshAsync(metaDataDiagnosticsContext, request);
                init = mono.then(Mono.fromRunnable(() -> request.setForceNameCacheRefresh(false)));
            }

            Mono<Utils.ValueHolder<DocumentCollection>> collectionInfoObs = this.resolveByPartitionKeyRangeIdentityAsync(
                BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics),request.getPartitionKeyRangeIdentity(), request.properties);

            if (init != null) {
                collectionInfoObs = init.then(collectionInfoObs);
            }

            return collectionInfoObs.flatMap(collectionValueHolder -> {
                if (collectionValueHolder.v != null) {
                    return Mono.just(collectionValueHolder);
                }
                if (request.requestContext.resolvedCollectionRid == null) {

                    Mono<DocumentCollection> collectionInfoRes = this.resolveByNameAsync(metaDataDiagnosticsContext, request.getResourceAddress(), request.properties);

                    return collectionInfoRes.flatMap(collection -> {
                        // TODO: how to async log this?
                        //                      logger.debug(
                        //                          "Mapped resourceName {} to getResourceId {}.",
                        //                          request.getResourceAddress(),
                        //                          collectionInfo.getResourceId());

                        request.setResourceId(collection.getResourceId());
                        request.requestContext.resolvedCollectionRid = collection.getResourceId();
                        return Mono.just(new Utils.ValueHolder<>(collection));

                    });
                } else {
                    return this.resolveByRidAsync(metaDataDiagnosticsContext, request.requestContext.resolvedCollectionRid, request.properties);
                }
            });
        } else {
            return resolveByPartitionKeyRangeIdentityAsync(metaDataDiagnosticsContext, request.getPartitionKeyRangeIdentity(),request.properties)
                .flatMap(collectionValueHolder -> {

                    if (collectionValueHolder.v != null) {
                        return Mono.just(collectionValueHolder);
                    }

                    return this.resolveByRidAsync(metaDataDiagnosticsContext, request.getResourceAddress(), request.properties);
                });
        }
    }

    /**
     * This method is only used in retry policy as it doesn't have request handy.
     * @param resourceAddress
     */
    public void refresh(MetadataDiagnosticsContext metaDataDiagnosticsContext, String resourceAddress, Map<String, Object> properties) {
        if (PathsHelper.isNameBased(resourceAddress)) {
            String resourceFullName = PathsHelper.getCollectionPath(resourceAddress);

            this.collectionInfoByNameCache.refresh(
                    resourceFullName,
                    () -> {
                        Mono<DocumentCollection> collectionObs = this.getByNameAsync(metaDataDiagnosticsContext, resourceFullName, properties);
                        return collectionObs.doOnSuccess(collection -> this.collectionInfoByIdCache.set(collection.getResourceId(), collection));
                    });
        }
    }

    protected abstract Mono<DocumentCollection> getByRidAsync(MetadataDiagnosticsContext metaDataDiagnosticsContext, String collectionRid, Map<String, Object> properties);

    protected abstract Mono<DocumentCollection> getByNameAsync(MetadataDiagnosticsContext metaDataDiagnosticsContext, String resourceAddress, Map<String, Object> properties);

    private Mono<Utils.ValueHolder<DocumentCollection>> resolveByPartitionKeyRangeIdentityAsync(MetadataDiagnosticsContext metaDataDiagnosticsContext,
                                                                                                PartitionKeyRangeIdentity partitionKeyRangeIdentity,
                                                                                                Map<String, Object> properties) {
        // if request is targeted at specific partition using x-ms-documentd-partitionkeyrangeid header,
        // which contains value "<collectionrid>,<partitionkeyrangeid>", then resolve to collection rid in this header.
        if (partitionKeyRangeIdentity != null && partitionKeyRangeIdentity.getCollectionRid() != null) {
            return this.resolveByRidAsync(metaDataDiagnosticsContext, partitionKeyRangeIdentity.getCollectionRid(), properties)
                    .onErrorResume(e -> {
                        Throwable unwrappedException = Exceptions.unwrap(e);
                        if (unwrappedException instanceof NotFoundException) {
                            // This is signal to the upper logic either to refresh
                            // collection cache and retry.
                            return Mono.error(new InvalidPartitionException(RMResources.InvalidDocumentCollection));
                        }
                        return Mono.error(unwrappedException);

                    });
        }
        return Mono.just(new Utils.ValueHolder<>(null));
    }

    public Mono<Utils.ValueHolder<DocumentCollection>> resolveByRidAsync(
            MetadataDiagnosticsContext metaDataDiagnosticsContext,
            String resourceId,
            Map<String, Object> properties) {

        ResourceId resourceIdParsed = ResourceId.parse(resourceId);
        String collectionResourceId = resourceIdParsed.getDocumentCollectionId().toString();

        Mono<DocumentCollection> async = this.collectionInfoByIdCache.getAsync(
            collectionResourceId,
            null,
            () -> this.getByRidAsync(metaDataDiagnosticsContext, collectionResourceId, properties));
        return async.map(Utils.ValueHolder::new);
    }

    public Mono<DocumentCollection> resolveByNameAsync(
        MetadataDiagnosticsContext metaDataDiagnosticsContext, String resourceAddress, Map<String, Object> properties) {

        return this.resolveByNameAsync(metaDataDiagnosticsContext, resourceAddress, properties, null);
    }

    public Mono<DocumentCollection> resolveByNameAsync(
        MetadataDiagnosticsContext metaDataDiagnosticsContext,
        String resourceAddress,
        Map<String, Object> properties,
        DocumentCollection obsoleteValue) {

        String resourceFullName = PathsHelper.getCollectionPath(resourceAddress);

        return this.collectionInfoByNameCache.getAsync(
            resourceFullName,
            obsoleteValue,
            () -> {
                Mono<DocumentCollection> collectionObs = this.getByNameAsync(
                    metaDataDiagnosticsContext, resourceFullName, properties);
                return collectionObs.doOnSuccess(collection -> this.collectionInfoByIdCache.set(
                    collection.getResourceId(),
                    collection));
            });
    }

    public Mono<Void> refreshAsync(MetadataDiagnosticsContext metaDataDiagnosticsContext, RxDocumentServiceRequest request) {
        // TODO System.Diagnostics.Debug.Assert(request.IsNameBased);

        String resourceFullName = PathsHelper.getCollectionPath(request.getResourceAddress());
        Mono<Void> mono;

        if (request.requestContext.resolvedCollectionRid != null) {
            // Here we will issue backend call only if cache wasn't already refreshed (if whatever is there corresponds to previously resolved collection rid).
            DocumentCollection obsoleteValue = new DocumentCollection();
            ModelBridgeInternal.setResourceId(obsoleteValue, request.requestContext.resolvedCollectionRid);

            mono = this.collectionInfoByNameCache.getAsync(
                    resourceFullName,
                    obsoleteValue,
                    () -> {
                        Mono<DocumentCollection> collectionObs = this.getByNameAsync(metaDataDiagnosticsContext, resourceFullName, request.properties);
                        return collectionObs.doOnSuccess(collection -> {
                            this.collectionInfoByIdCache.set(collection.getResourceId(), collection);
                        });
                    }).then();
        } else {
            // In case of ForceRefresh directive coming from client, there will be no ResolvedCollectionRid, so we
            // need to refresh unconditionally.
            mono = Mono.fromRunnable(() -> this.refresh(metaDataDiagnosticsContext, request.getResourceAddress(), request.properties));
        }

        return mono.doOnSuccess(aVoid -> request.requestContext.resolvedCollectionRid = null);
    }

    private static class CollectionRidComparer implements IEqualityComparer<DocumentCollection> {
        private static final long serialVersionUID = 1l;
        public boolean areEqual(DocumentCollection left, DocumentCollection right) {
            if (left == null && right == null) {
                return true;
            }

            if ((left == null) ^ (right == null)) {
                return false;
            }

            return StringUtils.equals(left.getResourceId(), right.getResourceId());
        }
    }
}