AbstractInboundChannelAdapter.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.spring.integration.core;

import com.azure.spring.integration.core.api.SubscribeByGroupOperation;
import com.azure.spring.integration.core.api.SubscribeOperation;
import org.springframework.integration.endpoint.MessageProducerSupport;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;

import java.util.HashMap;
import java.util.Map;

/**
 * The abstract inbound channel adapter. The subscriber will start to subscribe on start and stop subscribing on stop.
 */
public abstract class AbstractInboundChannelAdapter extends MessageProducerSupport {
    private final String destination;
    protected String consumerGroup = null;
    protected SubscribeOperation subscribeOperation = null;
    protected SubscribeByGroupOperation subscribeByGroupOperation = null;

    public String getConsumerGroup() {
        return consumerGroup;
    }

    public void setConsumerGroup(String consumerGroup) {
        this.consumerGroup = consumerGroup;
    }

    public SubscribeOperation getSubscribeOperation() {
        return subscribeOperation;
    }

    public void setSubscribeOperation(SubscribeOperation subscribeOperation) {
        this.subscribeOperation = subscribeOperation;
    }

    public SubscribeByGroupOperation getSubscribeByGroupOperation() {
        return subscribeByGroupOperation;
    }

    public void setSubscribeByGroupOperation(SubscribeByGroupOperation subscribeByGroupOperation) {
        this.subscribeByGroupOperation = subscribeByGroupOperation;
    }

    protected AbstractInboundChannelAdapter(String destination) {
        Assert.hasText(destination, "destination can't be null or empty");
        this.destination = destination;
    }

    @Override
    public void doStart() {
        super.doStart();
        if (useGroupOperation()) {
            this.subscribeByGroupOperation.subscribe(this.destination, this.consumerGroup, this::receiveMessage);
        } else {
            this.subscribeOperation.subscribe(this.destination, this::receiveMessage);
        }
    }

    public void receiveMessage(Message<?> message) {
        sendMessage(message);
    }

    @Override
    protected void doStop() {
        if (useGroupOperation()) {
            this.subscribeByGroupOperation.unsubscribe(destination, this.consumerGroup);
        } else {
            this.subscribeOperation.unsubscribe(destination);
        }

        super.doStop();
    }

    private boolean useGroupOperation() {
        return this.subscribeByGroupOperation != null && StringUtils.hasText(consumerGroup);
    }

    protected Map<String, Object> buildPropertiesMap() {
        Map<String, Object> properties = new HashMap<>();
        properties.put("consumerGroup", consumerGroup);
        properties.put("destination", destination);

        return properties;
    }

}