// SendOperationTest.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.spring.integration.test.support;

import com.google.common.collect.ImmutableMap;
import com.azure.spring.integration.core.api.PartitionSupplier;
import com.azure.spring.integration.core.api.SendOperation;
import org.junit.Test;
import org.springframework.core.NestedRuntimeException;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;

/**
 * Abstract base for tests that exercise a {@link SendOperation} implementation.
 *
 * <p>The test methods in this class call {@code sendAsync} on {@link #sendOperation} and then
 * delegate to the abstract {@code verify*} hooks, which concrete subclasses implement against
 * their own collaborators. The subclass is responsible for assigning {@link #sendOperation}
 * before the tests run.
 *
 * <p>NOTE(review): the tests complete {@link #future} and then inspect the future returned by
 * {@code sendAsync}, so subclasses are presumably expected to wire their stubbed sender to
 * return {@link #future}; confirm against the concrete subclasses.
 *
 * @param <O> the concrete {@link SendOperation} type under test
 */
public abstract class SendOperationTest<O extends SendOperation> {

    /** The operation under test; it is not assigned here and must be set by the subclass. */
    protected O sendOperation;

    /** Sample payload value made available to subclasses. */
    protected String payload = "payload";

    /** Fixture future that the tests complete normally or exceptionally. */
    protected CompletableFuture<Void> future = new CompletableFuture<>();

    /** Partition key used by {@link #testSendWithPartitionKey()}. */
    protected String partitionKey = "key";

    /** Destination name passed to every {@code sendAsync} call. */
    protected String destination = "event-hub";

    /** Message passed to every {@code sendAsync} call: a fixed payload plus a two-entry header map. */
    protected Message<?> message =
        new GenericMessage<>("testPayload", ImmutableMap.of("key1", "value1", "key2", "value2"));

    /** Partition id used by {@link #testSendWithPartitionId()}. */
    private String partitionId = "1";

    /**
     * @return the operation under test
     */
    public O getSendOperation() {
        return sendOperation;
    }

    /**
     * @param sendOperation the operation under test
     */
    public void setSendOperation(O sendOperation) {
        this.sendOperation = sendOperation;
    }

    /**
     * @return the sample payload
     */
    public String getPayload() {
        return payload;
    }

    /**
     * @param payload the sample payload
     */
    public void setPayload(String payload) {
        this.payload = payload;
    }

    /**
     * @return the fixture future completed by the tests
     */
    public CompletableFuture<Void> getFuture() {
        return future;
    }

    /**
     * @param future the fixture future completed by the tests
     */
    public void setFuture(CompletableFuture<Void> future) {
        this.future = future;
    }

    /**
     * @return the partition key used by the partition-key test
     */
    public String getPartitionKey() {
        return partitionKey;
    }

    /**
     * @param partitionKey the partition key used by the partition-key test
     */
    public void setPartitionKey(String partitionKey) {
        this.partitionKey = partitionKey;
    }

    /**
     * @return the destination name passed to {@code sendAsync}
     */
    public String getDestination() {
        return destination;
    }

    /**
     * @param destination the destination name passed to {@code sendAsync}
     */
    public void setDestination(String destination) {
        this.destination = destination;
    }

    /**
     * @return the message passed to {@code sendAsync}
     */
    public Message<?> getMessage() {
        return message;
    }

    /**
     * @param message the message passed to {@code sendAsync}
     */
    public void setMessage(Message<?> message) {
        this.message = message;
    }

    /**
     * Sends with a {@code null} partition supplier and expects the returned future to yield
     * {@code null}, then checks {@link #verifySendCalled(int)} with a count of one.
     */
    @Test
    public void testSendWithoutPartitionSupplier() throws ExecutionException, InterruptedException {
        this.future.complete(null);
        CompletableFuture<Void> sendResult = this.sendOperation.sendAsync(destination, message, null);

        assertNull(sendResult.get());
        verifySendCalled(1);
    }

    /**
     * Sends with a {@link PartitionSupplier} on which neither id nor key has been set and expects
     * the same outcome as sending without a supplier.
     */
    @Test
    public void testSendWithoutPartition() throws ExecutionException, InterruptedException {
        this.future.complete(null);
        CompletableFuture<Void> sendResult =
            this.sendOperation.sendAsync(destination, message, new PartitionSupplier());

        assertNull(sendResult.get());
        verifySendCalled(1);
    }

    /**
     * Sends with a partition id set on the supplier, then checks
     * {@link #verifySendWithPartitionId(int)} and {@link #verifyPartitionSenderCalled(int)},
     * each with a count of one.
     */
    @Test
    public void testSendWithPartitionId() throws ExecutionException, InterruptedException {
        this.future.complete(null);
        PartitionSupplier partitionSupplier = new PartitionSupplier();
        partitionSupplier.setPartitionId(partitionId);
        CompletableFuture<Void> sendResult =
            this.sendOperation.sendAsync(destination, message, partitionSupplier);

        assertNull(sendResult.get());
        verifySendWithPartitionId(1);
        verifyPartitionSenderCalled(1);
    }

    /**
     * Sends with a partition key set on the supplier, then checks
     * {@link #verifySendWithPartitionKey(int)} and {@link #verifyGetClientCreator(int)},
     * each with a count of one.
     */
    @Test
    public void testSendWithPartitionKey() throws ExecutionException, InterruptedException {
        this.future.complete(null);
        PartitionSupplier partitionSupplier = new PartitionSupplier();
        partitionSupplier.setPartitionKey(partitionKey);
        CompletableFuture<Void> sendResult =
            this.sendOperation.sendAsync(destination, message, partitionSupplier);

        assertNull(sendResult.get());
        verifySendWithPartitionKey(1);
        verifyGetClientCreator(1);
    }

    /**
     * After {@link #whenSendWithException()} has prepared the failure, expects a
     * {@link NestedRuntimeException} either thrown directly by the send call or carried as the
     * cause of the {@link ExecutionException} raised by {@code get()}.
     */
    @Test(expected = NestedRuntimeException.class)
    public void testSendCreateSenderFailure() throws Throwable {
        whenSendWithException();

        try {
            this.sendOperation.sendAsync(destination, this.message, null).get();
        } catch (ExecutionException e) {
            // Unwrap so the expected-exception check sees the underlying failure.
            throw e.getCause();
        }
    }

    /**
     * Completes {@link #future} exceptionally after the send call and expects {@code get()} on
     * the returned future to fail with an {@link ExecutionException} whose cause carries the
     * same message.
     */
    @Test
    public void testSendFailure() {
        CompletableFuture<Void> sendResult = this.sendOperation.sendAsync(destination, this.message, null);
        this.future.completeExceptionally(new Exception("future failed."));

        try {
            sendResult.get();
            fail("Test should fail.");
        } catch (InterruptedException ie) {
            // Restore the interrupt flag so the caller can still observe the interruption.
            Thread.currentThread().interrupt();
            fail("get() should fail with an ExecutionException.");
        } catch (ExecutionException ee) {
            assertEquals("future failed.", ee.getCause().getMessage());
        }
    }

    /**
     * Subclass hook invoked after a send without partition information.
     *
     * @param times the invocation count the tests expect (always one in this class)
     */
    protected abstract void verifySendCalled(int times);

    /**
     * Subclass hook invoked after a send with a partition id.
     *
     * @param times the invocation count the tests expect (always one in this class)
     */
    protected abstract void verifyPartitionSenderCalled(int times);

    /**
     * Subclass hook invoked at the start of {@link #testSendCreateSenderFailure()} to arrange
     * the failure that test expects.
     */
    protected abstract void whenSendWithException();

    /**
     * Subclass hook invoked after a send with a partition key.
     *
     * @param times the invocation count the tests expect (always one in this class)
     */
    protected abstract void verifyGetClientCreator(int times);

    /**
     * Subclass hook invoked after a send with a partition key.
     *
     * @param times the invocation count the tests expect (always one in this class)
     */
    protected abstract void verifySendWithPartitionKey(int times);

    /**
     * Subclass hook invoked after a send with a partition id.
     *
     * @param times the invocation count the tests expect (always one in this class)
     */
    protected abstract void verifySendWithPartitionId(int times);
}