// SyncTokenPolicy.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.data.appconfiguration.implementation;

import com.azure.core.http.HttpPipelineCallContext;
import com.azure.core.http.HttpPipelineNextPolicy;
import com.azure.core.http.HttpResponse;
import com.azure.core.http.policy.HttpPipelinePolicy;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.logging.ClientLogger;
import reactor.core.publisher.Mono;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
 * An HTTP pipeline policy that tracks the latest sync-tokens in a concurrent map keyed by sync-token id.
 *
 * <p>Before a request is sent, every cached token is written to the {@code Sync-Token} request header as
 * {@code id=value} (the sequence number segment is not sent). After the response arrives, the tokens found in the
 * response's {@code Sync-Token} header are merged back into the map, keeping for each id the token with the highest
 * sequence number.</p>
 */
public final class SyncTokenPolicy implements HttpPipelinePolicy {
    private static final String COMMA = ",";
    private static final String EQUAL = "=";
    private static final String SYNC_TOKEN = "Sync-Token";
    private static final String SKIP_INVALID_TOKEN = "Skipping invalid sync token '{}'.";
    private final Map<String, SyncToken> syncTokenMap = new ConcurrentHashMap<>(); // key is sync-token id
    private final ClientLogger logger = new ClientLogger(SyncTokenPolicy.class);

    /**
     * Attaches the cached sync-tokens to the outgoing request and records the sync-tokens returned in the response.
     *
     * <p>The {@code Sync-Token} request header is only set when at least one token is cached; otherwise the request
     * is left untouched so that no blank header is sent.</p>
     *
     * @param context request context
     * @param next The next policy to invoke.
     * @return A {@link Mono} representing the HTTP response that will arrive asynchronously.
     */
    @Override
    public Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineNextPolicy next) {

        // TODO: https://github.com/Azure/azure-sdk-for-java/issues/20355
        // Add all of sync-tokens to HTTP request header, but only when there is something to send.
        final String syncTokenHeader = getSyncTokenHeader();
        if (!syncTokenHeader.isEmpty()) {
            context.getHttpRequest().setHeader(SYNC_TOKEN, syncTokenHeader);
        }

        // The response is passed through unchanged; reading the header is a pure side effect.
        // updateSyncToken ignores a null value, which is what getValue returns when the header is absent.
        return next.process()
            .doOnNext(httpResponse -> updateSyncToken(httpResponse.getHeaders().getValue(SYNC_TOKEN)));
    }

    /**
     * Get all latest sync-tokens from the concurrent map and convert to one sync-token string.
     * Each token is rendered as {@code id=value} and the tokens are joined by a comma delimiter.
     *
     * @return the sync-token header value, or an empty string when no token is cached
     */
    private String getSyncTokenHeader() {
        return syncTokenMap.values()
            .stream()
            .map(syncToken -> syncToken.getId() + EQUAL + syncToken.getValue())
            .collect(Collectors.joining(COMMA));
    }

    /**
     * Update the existing synchronization tokens.
     *
     * <p>The given string may contain several comma-separated tokens. Each one is parsed with
     * {@link SyncToken#createSyncToken(String)}; a segment that is empty or fails to parse is skipped (parse
     * failures are logged at info level). A parsed token replaces the cached token with the same id only when its
     * sequence number is greater. A {@code null} or empty {@code token} is ignored.</p>
     *
     * @param token an external synchronization token to ensure service requests receive up-to-date values.
     */
    public void updateSyncToken(String token) {
        // Nothing to merge; also avoids a NullPointerException from split() on null input.
        if (CoreUtils.isNullOrEmpty(token)) {
            return;
        }

        // Sync-Token header could have more than one value
        final String[] syncTokens = token.split(COMMA);
        for (final String syncTokenString : syncTokens) {
            if (CoreUtils.isNullOrEmpty(syncTokenString)) {
                continue;
            }

            final SyncToken syncToken;
            try {
                syncToken = SyncToken.createSyncToken(syncTokenString);
            } catch (Exception ex) {
                // Best effort: one malformed token must not prevent the remaining tokens from being recorded.
                logger.info(SKIP_INVALID_TOKEN, syncTokenString);
                continue;
            }

            final String tokenId = syncToken.getId();
            // The "compare sequence numbers, then replace" step is a read-modify-write. Doing it inside the
            // remapping function of compute() makes the whole operation atomic for this key, so a concurrent
            // update cannot overwrite a newer token with an older one.
            syncTokenMap.compute(tokenId, (key, existingSyncToken) -> {
                if (existingSyncToken == null
                        || syncToken.getSequenceNumber() > existingSyncToken.getSequenceNumber()) {
                    return syncToken;
                }
                return existingSyncToken;
            });
        }
    }
}