SessionHandler.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.core.amqp.implementation.handler;

import com.azure.core.amqp.exception.AmqpErrorContext;
import com.azure.core.amqp.exception.AmqpException;
import com.azure.core.amqp.exception.SessionErrorContext;
import com.azure.core.amqp.implementation.ClientConstants;
import com.azure.core.amqp.implementation.ExceptionUtil;
import com.azure.core.amqp.implementation.ReactorDispatcher;
import com.azure.core.util.logging.ClientLogger;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Session;

import java.io.IOException;
import java.time.Duration;
import java.util.Locale;

public class SessionHandler extends Handler {
    private final ClientLogger logger = new ClientLogger(SessionHandler.class);

    private final String entityName;
    private final Duration openTimeout;
    private final ReactorDispatcher reactorDispatcher;

    public SessionHandler(String connectionId, String hostname, String entityName, ReactorDispatcher reactorDispatcher,
                          Duration openTimeout) {
        super(connectionId, hostname);
        this.entityName = entityName;
        this.openTimeout = openTimeout;
        this.reactorDispatcher = reactorDispatcher;
    }

    public AmqpErrorContext getErrorContext() {
        return new SessionErrorContext(getHostname(), entityName);
    }

    @Override
    public void onSessionLocalOpen(Event e) {
        logger.verbose("onSessionLocalOpen connectionId[{}], entityName[{}], condition[{}]",
            getConnectionId(), this.entityName,
            e.getSession().getCondition() == null
                ? ClientConstants.NOT_APPLICABLE
                : e.getSession().getCondition().toString());

        final Session session = e.getSession();

        try {
            reactorDispatcher.invoke(this::onSessionTimeout, this.openTimeout);
        } catch (IOException ioException) {
            logger.warning("onSessionLocalOpen connectionId[{}], entityName[{}], reactorDispatcherError[{}]",
                getConnectionId(), this.entityName,
                ioException.getMessage());

            session.close();

            final String message =
                String.format(Locale.US, "onSessionLocalOpen connectionId[%s], entityName[%s], underlying IO of"
                        + " reactorDispatcher faulted with error: %s",
                    getConnectionId(), this.entityName, ioException.getMessage());
            final Throwable exception = new AmqpException(false, message, ioException, getErrorContext());

            onError(exception);
        }
    }

    @Override
    public void onSessionRemoteOpen(Event e) {
        final Session session = e.getSession();
        if (session.getLocalState() == EndpointState.UNINITIALIZED) {
            logger.warning("onSessionRemoteOpen connectionId[{}], entityName[{}], sessionIncCapacity[{}],"
                    + " sessionOutgoingWindow[{}] endpoint was uninitialised.",
                getConnectionId(), entityName, session.getIncomingCapacity(), session.getOutgoingWindow());

            session.open();
        } else {
            logger.info("onSessionRemoteOpen connectionId[{}], entityName[{}], sessionIncCapacity[{}],"
                    + " sessionOutgoingWindow[{}]",
                getConnectionId(), entityName, session.getIncomingCapacity(), session.getOutgoingWindow());
        }

        onNext(EndpointState.ACTIVE);
    }

    @Override
    public void onSessionLocalClose(Event e) {
        final ErrorCondition condition = (e != null && e.getSession() != null)
            ? e.getSession().getCondition()
            : null;

        logger.verbose("onSessionLocalClose connectionId[{}], entityName[{}], condition[{}]",
            entityName, getConnectionId(),
            condition == null ? ClientConstants.NOT_APPLICABLE : condition.toString());
    }

    @Override
    public void onSessionRemoteClose(Event e) {
        final Session session = e.getSession();

        logger.info("onSessionRemoteClose connectionId[{}], entityName[{}], condition[{}]",
            entityName,
            getConnectionId(),
            session == null || session.getRemoteCondition() == null
                ? ClientConstants.NOT_APPLICABLE
                : session.getRemoteCondition().toString());

        ErrorCondition condition = session != null ? session.getRemoteCondition() : null;

        if (session != null && session.getLocalState() != EndpointState.CLOSED) {
            logger.info("onSessionRemoteClose closing a local session for connectionId[{}], entityName[{}], "
                    + "condition[{}], description[{}]",
                getConnectionId(), entityName,
                condition != null ? condition.getCondition() : ClientConstants.NOT_APPLICABLE,
                condition != null ? condition.getDescription() : ClientConstants.NOT_APPLICABLE);

            session.setCondition(session.getRemoteCondition());
            session.close();
        }

        if (condition == null) {
            onNext(EndpointState.CLOSED);
        } else {
            final String id = getConnectionId();
            final AmqpErrorContext context = getErrorContext();
            final Exception exception;
            if (condition.getCondition() == null) {
                exception = new AmqpException(false, String.format(Locale.US,
                        "onSessionRemoteClose connectionId[%s], entityName[%s], condition[%s]", id, entityName,
                        condition),
                    context);
            } else {
                exception = ExceptionUtil.toException(condition.getCondition().toString(), String.format(Locale.US,
                        "onSessionRemoteClose connectionId[%s], entityName[%s]", id,
                        entityName),
                    context);
            }

            onError(exception);
        }
    }

    @Override
    public void onSessionFinal(Event e) {
        final Session session = e.getSession();
        final ErrorCondition condition = session != null ? session.getCondition() : null;

        logger.info("onSessionFinal connectionId[{}], entityName[{}], condition[{}], description[{}]",
            getConnectionId(), entityName,
            condition != null ? condition.getCondition() : ClientConstants.NOT_APPLICABLE,
            condition != null ? condition.getDescription() : ClientConstants.NOT_APPLICABLE);

        close();
    }

    private void onSessionTimeout() {
        // It is supposed to close a local session to handle timeout exception.
        // However, closing the session can result in NPE because of proton-j bug (https://issues.apache
        // .org/jira/browse/PROTON-1939).
        // And the bug will cause the reactor thread to stop processing pending tasks scheduled on the reactor and
        // as a result task won't be completed at all.

        // TODO: handle timeout error once the proton-j bug is fixed.
        // if (!sessionCreated && !sessionOpenErrorDispatched) {
        //     logger.warning(
        //     "SessionTimeoutHandler.onEvent - connectionId[{}], entityName[{}], session open timed out.",
        //     this.connectionId, this.entityName);
        // }
    }
}