RxClientCollectionCache.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation.caches;

import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.implementation.DiagnosticsClientContext;
import com.azure.cosmos.implementation.RequestVerb;
import com.azure.cosmos.implementation.DocumentClientRetryPolicy;
import com.azure.cosmos.implementation.DocumentCollection;
import com.azure.cosmos.implementation.ISessionContainer;
import com.azure.cosmos.implementation.AuthorizationTokenType;
import com.azure.cosmos.implementation.ClearingSessionContainerClientRetryPolicy;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.IAuthorizationTokenProvider;
import com.azure.cosmos.implementation.IRetryPolicyFactory;
import com.azure.cosmos.implementation.MetadataDiagnosticsContext;
import com.azure.cosmos.implementation.ObservableHelper;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.PathsHelper;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.RxDocumentServiceResponse;
import com.azure.cosmos.implementation.RxStoreModel;
import com.azure.cosmos.implementation.Utils;
import reactor.core.publisher.Mono;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.Map;

/**
 * Caches collection information.
 * Although this class is public, it is not part of our published public APIs.
 * It is meant to be used internally by our SDK only.
 */
public class RxClientCollectionCache extends RxCollectionCache {

    private final DiagnosticsClientContext diagnosticsClientContext;
    private final RxStoreModel storeModel;
    private final IAuthorizationTokenProvider tokenProvider;
    private final IRetryPolicyFactory retryPolicy;
    private final ISessionContainer sessionContainer;

    /**
     * Creates a cache that uses the supplied by-name and by-id caches.
     *
     * @param diagnosticsClientContext context passed to every collection read request that is created
     * @param sessionContainer session container handed to the per-call retry policy
     * @param storeModel store model used to process the collection read request
     * @param tokenProvider provider of the authorization token / authorization header
     * @param retryPolicy factory whose request policy is wrapped for every lookup
     * @param collectionInfoByNameCache cache forwarded to the superclass constructor (first argument)
     * @param collectionInfoByIdCache cache forwarded to the superclass constructor (second argument)
     */
    public RxClientCollectionCache(DiagnosticsClientContext diagnosticsClientContext,
                                   ISessionContainer sessionContainer,
                                   RxStoreModel storeModel,
                                   IAuthorizationTokenProvider tokenProvider,
                                   IRetryPolicyFactory retryPolicy,
                                   AsyncCache<String, DocumentCollection> collectionInfoByNameCache,
                                   AsyncCache<String, DocumentCollection> collectionInfoByIdCache) {
        super(collectionInfoByNameCache, collectionInfoByIdCache);
        this.diagnosticsClientContext = diagnosticsClientContext;
        this.sessionContainer = sessionContainer;
        this.storeModel = storeModel;
        this.tokenProvider = tokenProvider;
        this.retryPolicy = retryPolicy;
    }

    /**
     * Creates a cache that relies on the superclass' no-argument constructor for its caches.
     *
     * @param diagnosticsClientContext context passed to every collection read request that is created
     * @param sessionContainer session container handed to the per-call retry policy
     * @param storeModel store model used to process the collection read request
     * @param tokenProvider provider of the authorization token / authorization header
     * @param retryPolicy factory whose request policy is wrapped for every lookup
     */
    public RxClientCollectionCache(DiagnosticsClientContext diagnosticsClientContext,
                                   ISessionContainer sessionContainer,
                                   RxStoreModel storeModel,
                                   IAuthorizationTokenProvider tokenProvider,
                                   IRetryPolicyFactory retryPolicy) {
        this.diagnosticsClientContext = diagnosticsClientContext;
        this.sessionContainer = sessionContainer;
        this.storeModel = storeModel;
        this.tokenProvider = tokenProvider;
        this.retryPolicy = retryPolicy;
    }

    /**
     * Reads a collection identified by its resource id.
     *
     * @param metaDataDiagnosticsContext diagnostics sink for the lookup timing; may be {@code null}
     * @param collectionRid resource id from which the collection path is generated
     * @param properties properties forwarded to the token provider
     * @return a {@link Mono} producing the collection that was read
     */
    protected Mono<DocumentCollection> getByRidAsync(MetadataDiagnosticsContext metaDataDiagnosticsContext, String collectionRid, Map<String, Object> properties) {
        DocumentClientRetryPolicy lookupPolicy = newSessionClearingRetryPolicy();
        return ObservableHelper.inlineIfPossible(
            () -> {
                // The path is generated inside the callable, i.e. every time the callable is invoked.
                String collectionPath = PathsHelper.generatePath(ResourceType.DocumentCollection, collectionRid, false);
                return this.readCollectionAsync(metaDataDiagnosticsContext, collectionPath, lookupPolicy, properties);
            },
            lookupPolicy);
    }

    /**
     * Reads a collection identified by its resource address.
     *
     * @param metaDataDiagnosticsContext diagnostics sink for the lookup timing; may be {@code null}
     * @param resourceAddress address used as the collection link of the read request
     * @param properties properties forwarded to the token provider
     * @return a {@link Mono} producing the collection that was read
     */
    protected Mono<DocumentCollection> getByNameAsync(MetadataDiagnosticsContext metaDataDiagnosticsContext, String resourceAddress, Map<String, Object> properties) {
        DocumentClientRetryPolicy lookupPolicy = newSessionClearingRetryPolicy();
        return ObservableHelper.inlineIfPossible(
            () -> this.readCollectionAsync(metaDataDiagnosticsContext, resourceAddress, lookupPolicy, properties),
            lookupPolicy);
    }

    /**
     * Builds the retry policy used for a single lookup: the factory's request policy wrapped,
     * together with the session container, in a {@link ClearingSessionContainerClientRetryPolicy}.
     */
    private DocumentClientRetryPolicy newSessionClearingRetryPolicy() {
        return new ClearingSessionContainerClientRetryPolicy(this.sessionContainer, this.retryPolicy.getRequestPolicy());
    }

    /**
     * Tells whether the token provider currently reports the AAD token type.
     */
    private boolean usesAadTokenAuthorization() {
        return tokenProvider.getAuthorizationTokenType() == AuthorizationTokenType.AadToken;
    }

    /**
     * Issues a {@code Read} request for a {@code DocumentCollection} and maps the response to the
     * collection resource.
     *
     * @param metaDataDiagnosticsContext diagnostics sink for the lookup timing; may be {@code null}
     * @param collectionLink link of the collection to read
     * @param retryPolicyInstance policy notified before the request is sent; may be {@code null}
     * @param properties properties forwarded to the token provider
     * @return a {@link Mono} producing the collection, or an error {@link Mono} carrying an
     *         {@link IllegalStateException} when the authorization token cannot be URL-encoded
     */
    private Mono<DocumentCollection> readCollectionAsync(MetadataDiagnosticsContext metaDataDiagnosticsContext,
                                                         String collectionLink,
                                                         DocumentClientRetryPolicy retryPolicyInstance,
                                                         Map<String, Object> properties) {

        String requestPath = Utils.joinPath(collectionLink, null);
        RxDocumentServiceRequest readRequest = RxDocumentServiceRequest.create(
            this.diagnosticsClientContext,
            OperationType.Read,
            ResourceType.DocumentCollection,
            requestPath,
            new HashMap<>());

        readRequest.getHeaders().put(HttpConstants.HttpHeaders.X_DATE, Utils.nowAsRFC1123());

        // For non-AAD providers the authorization header is computed and set up front.
        if (!usesAadTokenAuthorization()) {
            String rawToken = tokenProvider.getUserAuthorizationToken(
                readRequest.getResourceAddress(),
                readRequest.getResourceType(),
                RequestVerb.GET,
                readRequest.getHeaders(),
                AuthorizationTokenType.PrimaryMasterKey,
                properties);

            String encodedToken;
            try {
                encodedToken = URLEncoder.encode(rawToken, "UTF-8");
            } catch (UnsupportedEncodingException e) {
                return Mono.error(new IllegalStateException("Failed to encode authtoken.", e));
            }
            readRequest.getHeaders().put(HttpConstants.HttpHeaders.AUTHORIZATION, encodedToken);
        }

        if (retryPolicyInstance != null) {
            retryPolicyInstance.onBeforeSendRequest(readRequest);
        }

        // Start of the measured window; taken here, when the pipeline is assembled.
        Instant lookUpStart = Instant.now();

        Mono<RxDocumentServiceResponse> pendingResponse;
        if (usesAadTokenAuthorization()) {
            // For AAD the token provider populates the authorization header before the request is processed.
            pendingResponse = tokenProvider
                .populateAuthorizationHeader(readRequest)
                .flatMap(authorizedRequest -> this.storeModel.processMessage(authorizedRequest));
        } else {
            pendingResponse = this.storeModel.processMessage(readRequest);
        }

        return pendingResponse
            .map(serviceResponse -> {
                recordContainerLookUp(metaDataDiagnosticsContext, lookUpStart);
                return BridgeInternal.toResourceResponse(serviceResponse, DocumentCollection.class)
                    .getResource();
            })
            .single();
    }

    /**
     * Adds a {@code CONTAINER_LOOK_UP} entry spanning {@code lookUpStart} to "now" to the given
     * diagnostics context; does nothing when the context is {@code null}.
     */
    private static void recordContainerLookUp(MetadataDiagnosticsContext diagnostics, Instant lookUpStart) {
        if (diagnostics == null) {
            return;
        }
        Instant lookUpEnd = Instant.now();
        MetadataDiagnosticsContext.MetadataDiagnostics lookUpEntry =
            new MetadataDiagnosticsContext.MetadataDiagnostics(
                lookUpStart,
                lookUpEnd,
                MetadataDiagnosticsContext.MetadataType.CONTAINER_LOOK_UP);
        diagnostics.addMetaDataDiagnostic(lookUpEntry);
    }
}