BarrierRequestHelper.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation.directconnectivity;
import com.azure.cosmos.implementation.AadTokenAuthorizationHelper;
import com.azure.cosmos.implementation.AuthorizationTokenType;
import com.azure.cosmos.implementation.DiagnosticsClientContext;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.IAuthorizationTokenProvider;
import com.azure.cosmos.implementation.InternalServerErrorException;
import com.azure.cosmos.implementation.OperationType;
import com.azure.cosmos.implementation.PathsHelper;
import com.azure.cosmos.implementation.RMResources;
import com.azure.cosmos.implementation.ResourceId;
import com.azure.cosmos.implementation.ResourceType;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.Strings;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.RequestVerb;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;
public class BarrierRequestHelper {
private final static Logger logger = LoggerFactory.getLogger(BarrierRequestHelper.class);
public static Mono<RxDocumentServiceRequest> createAsync(
DiagnosticsClientContext clientContext,
RxDocumentServiceRequest request,
IAuthorizationTokenProvider authorizationTokenProvider,
Long targetLsn,
Long targetGlobalCommittedLsn) {
boolean isCollectionHeadRequest = BarrierRequestHelper.isCollectionHeadBarrierRequest(
request.getResourceType(),
request.getOperationType());
AuthorizationTokenType originalRequestTokenType = request.authorizationTokenType;
if (authorizationTokenProvider != null && authorizationTokenProvider.getAuthorizationTokenType() != null) {
originalRequestTokenType = authorizationTokenProvider.getAuthorizationTokenType();
}
if (originalRequestTokenType == AuthorizationTokenType.Invalid) {
String message = "AuthorizationTokenType not set for the read request";
assert false : message;
logger.error(message);
}
String authorizationToken = Strings.Emtpy;
RxDocumentServiceRequest barrierLsnRequest = null;
if (!isCollectionHeadRequest) {
// DB Feed
barrierLsnRequest = RxDocumentServiceRequest.create(clientContext,
OperationType.HeadFeed,
null,
ResourceType.Database,
null,
originalRequestTokenType);
} else if (request.getIsNameBased()) {
// Name based server request
// get the collection full name
// dbs/{id}/colls/{collid}/
String collectionLink = PathsHelper.getCollectionPath(request.getResourceAddress());
barrierLsnRequest = RxDocumentServiceRequest.createFromName(clientContext,
OperationType.Head,
collectionLink,
ResourceType.DocumentCollection,
originalRequestTokenType);
} else {
// RID based Server request
barrierLsnRequest = RxDocumentServiceRequest.create(clientContext,
OperationType.Head,
ResourceId.parse(request.getResourceId()).getDocumentCollectionId().toString(),
ResourceType.DocumentCollection,
null,
originalRequestTokenType);
}
barrierLsnRequest.getHeaders().put(HttpConstants.HttpHeaders.X_DATE, Utils.nowAsRFC1123());
if (targetLsn != null && targetLsn > 0) {
barrierLsnRequest.getHeaders().put(HttpConstants.HttpHeaders.TARGET_LSN, targetLsn.toString());
}
if (targetGlobalCommittedLsn != null && targetGlobalCommittedLsn > 0) {
barrierLsnRequest.getHeaders().put(HttpConstants.HttpHeaders.TARGET_GLOBAL_COMMITTED_LSN, targetGlobalCommittedLsn.toString());
}
boolean hasAadToken = false;
switch (originalRequestTokenType) {
case PrimaryMasterKey:
case PrimaryReadonlyMasterKey:
case SecondaryMasterKey:
case SecondaryReadonlyMasterKey:
authorizationToken = authorizationTokenProvider.getUserAuthorizationToken(
barrierLsnRequest.getResourceAddress(),
isCollectionHeadRequest ? ResourceType.DocumentCollection : ResourceType.Database,
RequestVerb.HEAD,
barrierLsnRequest.getHeaders(),
originalRequestTokenType,
request.properties);
break;
case ResourceToken:
authorizationToken = request.getHeaders().get(HttpConstants.HttpHeaders.AUTHORIZATION);
break;
case AadToken:
hasAadToken = true;
break;
default:
String unknownAuthToken =
"Unknown authorization token kind '" + originalRequestTokenType + "' for read request";
assert false : unknownAuthToken;
logger.error(unknownAuthToken);
throw Exceptions.propagate(
new InternalServerErrorException(unknownAuthToken + " - " + RMResources.InternalServerError));
}
if (!hasAadToken) {
barrierLsnRequest.getHeaders().put(HttpConstants.HttpHeaders.AUTHORIZATION, authorizationToken);
}
barrierLsnRequest.requestContext = request.requestContext.clone();
if (request.getPartitionKeyRangeIdentity() != null) {
barrierLsnRequest.routeTo(request.getPartitionKeyRangeIdentity());
}
if (request.getHeaders().get(HttpConstants.HttpHeaders.PARTITION_KEY) != null) {
barrierLsnRequest.getHeaders().put(HttpConstants.HttpHeaders.PARTITION_KEY, request.getHeaders().get(HttpConstants.HttpHeaders.PARTITION_KEY));
barrierLsnRequest.setPartitionKeyInternal(request.getPartitionKeyInternal());
}
if (request.getHeaders().get(WFConstants.BackendHeaders.COLLECTION_RID) != null) {
barrierLsnRequest.getHeaders().put(WFConstants.BackendHeaders.COLLECTION_RID, request.getHeaders().get(WFConstants.BackendHeaders.COLLECTION_RID));
}
if (hasAadToken) {
return authorizationTokenProvider.populateAuthorizationHeader(barrierLsnRequest);
} else {
return Mono.just(barrierLsnRequest);
}
}
static boolean isCollectionHeadBarrierRequest(ResourceType resourceType, OperationType operationType) {
switch (resourceType) {
case Attachment:
case Document:
case Conflict:
case StoredProcedure:
case UserDefinedFunction:
case Trigger:
return true;
case DocumentCollection:
if (operationType != OperationType.ReadFeed && operationType != OperationType.Query && operationType != OperationType.SqlQuery) {
return true;
} else {
return false;
}
case PartitionKeyRange:
// no logic for OperationType.GetSplitPoint and OperationType.AbortSplit
// as they are not applicable to SDK
return false;
default:
return false;
}
}
}