ManagementChannel.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.eventhubs.implementation;

import com.azure.core.amqp.AmqpEndpointState;
import com.azure.core.amqp.exception.AmqpResponseCode;
import com.azure.core.amqp.implementation.AmqpConstants;
import com.azure.core.amqp.implementation.ExceptionUtil;
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.RequestResponseChannel;
import com.azure.core.amqp.implementation.RequestResponseUtils;
import com.azure.core.amqp.implementation.TokenManagerProvider;
import com.azure.core.credential.TokenCredential;
import com.azure.core.credential.TokenRequestContext;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.EventHubProperties;
import com.azure.messaging.eventhubs.PartitionProperties;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.message.Message;
import reactor.core.Disposable;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ReplayProcessor;
import reactor.core.scheduler.Scheduler;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * Channel responsible for Event Hubs related metadata and management plane operations. Management plane operations
 * include another partition, increasing quotas, etc.
 */
public class ManagementChannel implements EventHubManagementNode {
    // Well-known keys from the management service responses and requests.
    public static final String MANAGEMENT_ENTITY_NAME_KEY = "name";
    public static final String MANAGEMENT_PARTITION_NAME_KEY = "partition";
    public static final String MANAGEMENT_RESULT_PARTITION_IDS = "partition_ids";
    public static final String MANAGEMENT_RESULT_CREATED_AT = "created_at";
    public static final String MANAGEMENT_RESULT_BEGIN_SEQUENCE_NUMBER = "begin_sequence_number";
    public static final String MANAGEMENT_RESULT_LAST_ENQUEUED_SEQUENCE_NUMBER = "last_enqueued_sequence_number";
    public static final String MANAGEMENT_RESULT_LAST_ENQUEUED_OFFSET = "last_enqueued_offset";
    public static final String MANAGEMENT_RESULT_LAST_ENQUEUED_TIME_UTC = "last_enqueued_time_utc";
    public static final String MANAGEMENT_RESULT_RUNTIME_INFO_RETRIEVAL_TIME_UTC = "runtime_info_retrieval_time_utc";
    public static final String MANAGEMENT_RESULT_PARTITION_IS_EMPTY = "is_partition_empty";

    // Well-known keys for management plane service requests.
    private static final String MANAGEMENT_ENTITY_TYPE_KEY = "type";
    private static final String MANAGEMENT_OPERATION_KEY = "operation";
    private static final String MANAGEMENT_SECURITY_TOKEN_KEY = "security_token";

    // Well-known values for the service request.
    private static final String READ_OPERATION_VALUE = "READ";
    private static final String MANAGEMENT_EVENTHUB_ENTITY_TYPE = AmqpConstants.VENDOR + ":eventhub";
    private static final String MANAGEMENT_PARTITION_ENTITY_TYPE = AmqpConstants.VENDOR + ":partition";

    private final ClientLogger logger = new ClientLogger(ManagementChannel.class);
    private final TokenCredential tokenProvider;
    private final Mono<RequestResponseChannel> channelMono;
    private final Scheduler scheduler;
    private final String eventHubName;
    private final MessageSerializer messageSerializer;
    private final TokenManagerProvider tokenManagerProvider;
    private final ReplayProcessor<AmqpEndpointState> endpointStateProcessor = ReplayProcessor.cacheLast();
    private final FluxSink<AmqpEndpointState> endpointStateSink =
        endpointStateProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
    private final Disposable subscription;

    private volatile boolean isDisposed;

    /**
     * Creates an instance that is connected to the {@code eventHubName}'s management node.
     *
     * @param responseChannelMono Mono that completes with a new {@link RequestResponseChannel}.
     * @param eventHubName The name of the Event Hub.
     * @param credential Credential to authorize user for access to the Event Hub.
     * @param tokenManagerProvider Provides a token manager that will keep track and maintain tokens.
     * @param messageSerializer Maps responses from the management channel.
     */
    ManagementChannel(Mono<RequestResponseChannel> responseChannelMono, String eventHubName, TokenCredential credential,
        TokenManagerProvider tokenManagerProvider, MessageSerializer messageSerializer,
        Scheduler scheduler) {

        this.tokenManagerProvider = Objects.requireNonNull(tokenManagerProvider,
            "'tokenManagerProvider' cannot be null.");
        this.tokenProvider = Objects.requireNonNull(credential, "'credential' cannot be null.");
        this.eventHubName = Objects.requireNonNull(eventHubName, "'eventHubName' cannot be null.");
        this.messageSerializer = Objects.requireNonNull(messageSerializer, "'messageSerializer' cannot be null.");
        this.channelMono = Objects.requireNonNull(responseChannelMono, "'responseChannelMono' cannot be null.");
        this.scheduler = Objects.requireNonNull(scheduler, "'scheduler' cannot be null.");

        //@formatter:off
        this.subscription = responseChannelMono
            .flatMapMany(e -> e.getEndpointStates().distinctUntilChanged())
            .subscribe(e -> {
                logger.info("Management endpoint state: {}", e);
                endpointStateSink.next(e);
            }, error -> {
                    logger.error("Exception occurred:", error);
                    endpointStateSink.error(error);
                    close();
                }, () -> {
                    logger.info("Complete.");
                    endpointStateSink.complete();
                    close();
                });
        //@formatter:on
    }

    /**
     * Gets the endpoint states for the management channel.
     *
     * @return The endpoint states for the management channel.
     */
    @Override
    public Flux<AmqpEndpointState> getEndpointStates() {
        return endpointStateProcessor;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public Mono<EventHubProperties> getEventHubProperties() {
        final Map<String, Object> properties = new HashMap<>();
        properties.put(MANAGEMENT_ENTITY_TYPE_KEY, MANAGEMENT_EVENTHUB_ENTITY_TYPE);
        properties.put(MANAGEMENT_ENTITY_NAME_KEY, eventHubName);
        properties.put(MANAGEMENT_OPERATION_KEY, READ_OPERATION_VALUE);

        return getProperties(properties, EventHubProperties.class).publishOn(scheduler);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public Mono<PartitionProperties> getPartitionProperties(String partitionId) {
        final Map<String, Object> properties = new HashMap<>();
        properties.put(MANAGEMENT_ENTITY_TYPE_KEY, MANAGEMENT_PARTITION_ENTITY_TYPE);
        properties.put(MANAGEMENT_ENTITY_NAME_KEY, eventHubName);
        properties.put(MANAGEMENT_PARTITION_NAME_KEY, partitionId);
        properties.put(MANAGEMENT_OPERATION_KEY, READ_OPERATION_VALUE);

        return getProperties(properties, PartitionProperties.class).publishOn(scheduler);
    }

    private <T> Mono<T> getProperties(Map<String, Object> properties, Class<T> responseType) {
        final String tokenAudience = tokenManagerProvider.getScopesFromResource(eventHubName);

        return tokenProvider.getToken(new TokenRequestContext().addScopes(tokenAudience)).flatMap(accessToken -> {
            properties.put(MANAGEMENT_SECURITY_TOKEN_KEY, accessToken.getToken());

            final Message request = Proton.message();
            final ApplicationProperties applicationProperties = new ApplicationProperties(properties);
            request.setApplicationProperties(applicationProperties);

            return channelMono.flatMap(channel -> channel.sendWithAck(request)
                .handle((message, sink) -> {
                    if (RequestResponseUtils.isSuccessful(message)) {
                        sink.next(messageSerializer.deserialize(message, responseType));
                    } else {
                        final AmqpResponseCode statusCode = RequestResponseUtils.getStatusCode(message);
                        final String statusDescription = RequestResponseUtils.getStatusDescription(message);
                        final Throwable error = ExceptionUtil.amqpResponseCodeToException(statusCode.getValue(),
                            statusDescription, channel.getErrorContext());

                        sink.error(logger.logExceptionAsWarning(Exceptions.propagate(error)));
                    }
                }));
        });
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void close() {
        if (isDisposed) {
            return;
        }

        isDisposed = true;
        subscription.dispose();

        if (channelMono instanceof Disposable) {
            ((Disposable) channelMono).dispose();
        }
    }
}