// ChatThreadAsyncClient.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.communication.chat;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import com.azure.communication.chat.implementation.AzureCommunicationChatServiceImpl;
import com.azure.communication.chat.implementation.converters.AddChatThreadMembersOptionsConverter;
import com.azure.communication.chat.implementation.converters.ChatMessageConverter;
import com.azure.communication.chat.implementation.converters.ChatThreadMemberConverter;
import com.azure.communication.chat.implementation.converters.ReadReceiptConverter;
import com.azure.communication.chat.implementation.models.SendReadReceiptRequest;
import com.azure.communication.chat.models.AddChatThreadMembersOptions;
import com.azure.communication.chat.models.ChatMessage;
import com.azure.communication.chat.models.ListChatMessagesOptions;
import com.azure.communication.chat.models.ReadReceipt;
import com.azure.communication.chat.models.SendChatMessageOptions;
import com.azure.communication.chat.models.SendChatMessageResult;
import com.azure.communication.chat.models.ChatThreadMember;
import com.azure.communication.chat.models.UpdateChatMessageOptions;
import com.azure.communication.chat.models.UpdateChatThreadOptions;
import com.azure.communication.common.CommunicationUser;
import com.azure.core.annotation.ReturnType;
import com.azure.core.annotation.ServiceClient;
import com.azure.core.annotation.ServiceMethod;
import com.azure.core.http.rest.PagedFlux;
import com.azure.core.http.rest.PagedResponse;
import com.azure.core.http.rest.PagedResponseBase;
import com.azure.core.http.rest.Response;
import com.azure.core.http.rest.SimpleResponse;
import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.paging.PageRetriever;

import java.util.function.Function;
import java.util.function.Supplier;
import java.util.Objects;
import java.util.stream.Collectors;

import static com.azure.core.util.FluxUtil.monoError;
import static com.azure.core.util.FluxUtil.withContext;

/**
 * Async Client that supports chat thread operations.
 */
@ServiceClient(builder = ChatClientBuilder.class, isAsync = true)
public final class ChatThreadAsyncClient {
    // Logger handed to monoError(...) whenever an exception is turned into an error Mono/PagedFlux.
    private final ClientLogger logger = new ClientLogger(ChatThreadAsyncClient.class);

    // Service implementation that the service operations of this client delegate to.
    private final AzureCommunicationChatServiceImpl chatServiceClient;

    // Id of the chat thread supplied to the service calls made by this client.
    private final String chatThreadId;

    /**
     * Creates a client bound to a single chat thread. The arguments are stored as given.
     *
     * @param chatServiceClient The service implementation used to issue requests.
     * @param chatThreadId The id of the thread this client operates on.
     */
    ChatThreadAsyncClient(AzureCommunicationChatServiceImpl chatServiceClient, String chatThreadId) {
        this.chatServiceClient = chatServiceClient;
        this.chatThreadId = chatThreadId;
    }

    /**
     * Returns the id of the chat thread this client operates on.
     *
     * @return the chat thread id supplied when the client was created.
     */
    public String getChatThreadId() {
        return this.chatThreadId;
    }

    /**
     * Updates the properties of this chat thread.
     *
     * <p>A {@code null} {@code options} is reported through the returned {@link Mono} rather than
     * thrown to the caller.</p>
     *
     * @param options Options for updating a chat thread.
     * @return a {@link Mono} that completes once the update request has finished.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Void> updateChatThread(UpdateChatThreadOptions options) {
        try {
            Objects.requireNonNull(options, "'options' cannot be null.");
            // Only completion matters here, so the response envelope is discarded.
            return withContext(ctx -> updateChatThread(options, ctx).then());
        } catch (RuntimeException e) {
            return monoError(logger, e);
        }
    }

    /**
     * Updates the properties of this chat thread and exposes the service response.
     *
     * <p>A {@code null} {@code options} is reported through the returned {@link Mono} rather than
     * thrown to the caller.</p>
     *
     * @param options Options for updating a chat thread.
     * @return a {@link Mono} emitting the {@link Response} of the update request.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<Void>> updateChatThreadWithResponse(UpdateChatThreadOptions options) {
        try {
            Objects.requireNonNull(options, "'options' cannot be null.");
            return withContext(ctx -> updateChatThread(options, ctx));
        } catch (RuntimeException e) {
            return monoError(logger, e);
        }
    }

    /**
     * Issues the thread update request with an explicit context.
     *
     * @param options Options for updating a chat thread.
     * @param context The context to associate with this operation; {@code null} is replaced by
     *     {@link Context#NONE}.
     * @return a {@link Mono} emitting the {@link Response} of the update request.
     */
    Mono<Response<Void>> updateChatThread(UpdateChatThreadOptions options, Context context) {
        final Context requestContext = (context != null) ? context : Context.NONE;
        return this.chatServiceClient.updateChatThreadWithResponseAsync(chatThreadId, options, requestContext);
    }

    /**
     * Adds members to this chat thread. Members that already exist are left unchanged.
     *
     * <p>A {@code null} {@code options} is reported through the returned {@link Mono} rather than
     * thrown to the caller.</p>
     *
     * @param options Options for adding thread members.
     * @return a {@link Mono} that completes once the add request has finished.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Void> addMembers(AddChatThreadMembersOptions options) {
        try {
            Objects.requireNonNull(options, "'options' cannot be null.");
            // Only completion matters here, so the response envelope is discarded.
            return withContext(ctx -> addMembers(options, ctx).then());
        } catch (RuntimeException e) {
            return monoError(logger, e);
        }
    }

    /**
     * Adds members to this chat thread and exposes the service response. Members that already exist
     * are left unchanged.
     *
     * <p>A {@code null} {@code options} is reported through the returned {@link Mono} rather than
     * thrown to the caller.</p>
     *
     * @param options Options for adding thread members.
     * @return a {@link Mono} emitting the {@link Response} of the add request.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<Void>> addMembersWithResponse(AddChatThreadMembersOptions options) {
        try {
            Objects.requireNonNull(options, "'options' cannot be null.");
            return withContext(ctx -> addMembers(options, ctx));
        } catch (RuntimeException e) {
            return monoError(logger, e);
        }
    }

    /**
     * Issues the add-members request with an explicit context. The public options are converted with
     * {@link AddChatThreadMembersOptionsConverter} before being sent.
     *
     * @param options Options for adding thread members.
     * @param context The context to associate with this operation; {@code null} is replaced by
     *     {@link Context#NONE}.
     * @return a {@link Mono} emitting the {@link Response} of the add request.
     */
    Mono<Response<Void>> addMembers(AddChatThreadMembersOptions options, Context context) {
        final Context requestContext = (context != null) ? context : Context.NONE;
        return this.chatServiceClient.addChatThreadMembersWithResponseAsync(
            chatThreadId,
            AddChatThreadMembersOptionsConverter.convert(options),
            requestContext);
    }

    /**
     * Removes a member from this chat thread.
     *
     * <p>A {@code null} {@code user}, or a user whose id is {@code null}, is reported through the
     * returned {@link Mono} rather than thrown to the caller.</p>
     *
     * @param user User identity of the thread member to remove from the thread.
     * @return a {@link Mono} that completes once the remove request has finished.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Void> removeMember(CommunicationUser user) {
        try {
            Objects.requireNonNull(user, "'user' cannot be null.");
            Objects.requireNonNull(user.getId(), "'user.getId()' cannot be null.");
            // Only completion matters here, so the response envelope is discarded.
            return withContext(ctx -> removeMember(user, ctx).then());
        } catch (RuntimeException e) {
            return monoError(logger, e);
        }
    }

    /**
     * Removes a member from this chat thread and exposes the service response.
     *
     * <p>A {@code null} {@code user}, or a user whose id is {@code null}, is reported through the
     * returned {@link Mono} rather than thrown to the caller.</p>
     *
     * @param user User identity of the thread member to remove from the thread.
     * @return a {@link Mono} emitting the {@link Response} of the remove request.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<Void>> removeMemberWithResponse(CommunicationUser user) {
        try {
            Objects.requireNonNull(user, "'user' cannot be null.");
            Objects.requireNonNull(user.getId(), "'user.getId()' cannot be null.");
            return withContext(ctx -> removeMember(user, ctx));
        } catch (RuntimeException e) {
            return monoError(logger, e);
        }
    }

    /**
     * Issues the remove-member request with an explicit context.
     *
     * <p>{@code user} is dereferenced here without a null check; the public overloads in this class
     * validate it before delegating.</p>
     *
     * @param user User identity of the thread member to remove from the thread.
     * @param context The context to associate with this operation; {@code null} is replaced by
     *     {@link Context#NONE}.
     * @return a {@link Mono} emitting the {@link Response} of the remove request.
     */
    Mono<Response<Void>> removeMember(CommunicationUser user, Context context) {
        final Context requestContext = (context != null) ? context : Context.NONE;
        return this.chatServiceClient.removeChatThreadMemberWithResponseAsync(
            chatThreadId, user.getId(), requestContext);
    }

    /**
     * Lists the members of this chat thread.
     *
     * <p>Each service item is converted with {@link ChatThreadMemberConverter} before it is emitted.</p>
     *
     * @return a {@link PagedFlux} of the members of the thread.
     */
    @ServiceMethod(returns = ReturnType.COLLECTION)
    public PagedFlux<ChatThreadMember> listMembers() {
        try {
            return pagedFluxConvert(
                new PagedFlux<>(
                    // First page.
                    () -> withContext(ctx ->
                        this.chatServiceClient.listChatThreadMembersSinglePageAsync(chatThreadId, ctx)),
                    // Every following page, addressed by the link of the previous one.
                    nextPageLink -> withContext(ctx ->
                        this.chatServiceClient.listChatThreadMembersNextSinglePageAsync(nextPageLink, ctx))),
                member -> ChatThreadMemberConverter.convert(member));
        } catch (RuntimeException e) {
            return new PagedFlux<>(() -> monoError(logger, e));
        }
    }

    /**
     * Lists the members of this chat thread using an explicit context.
     *
     * <p>Each service item is converted with {@link ChatThreadMemberConverter} before it is emitted.</p>
     *
     * @param context The context to associate with this operation; {@code null} is replaced by
     *     {@link Context#NONE}.
     * @return a {@link PagedFlux} of the members of the thread.
     */
    PagedFlux<ChatThreadMember> listMembers(Context context) {
        final Context requestContext = (context != null) ? context : Context.NONE;

        try {
            return pagedFluxConvert(
                new PagedFlux<>(
                    // First page.
                    () -> this.chatServiceClient.listChatThreadMembersSinglePageAsync(
                        chatThreadId, requestContext),
                    // Every following page, addressed by the link of the previous one.
                    nextPageLink -> this.chatServiceClient.listChatThreadMembersNextSinglePageAsync(
                        nextPageLink, requestContext)),
                member -> ChatThreadMemberConverter.convert(member));
        } catch (RuntimeException e) {
            return new PagedFlux<>(() -> monoError(logger, e));
        }
    }

    /**
     * Sends a message to this chat thread.
     *
     * <p>A {@code null} {@code options} is reported through the returned {@link Mono} rather than
     * thrown to the caller.</p>
     *
     * @param options Options for sending the message.
     * @return a {@link Mono} emitting the send result, or completing empty when the response carries
     *     no value.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<SendChatMessageResult> sendMessage(SendChatMessageOptions options) {
        try {
            Objects.requireNonNull(options, "'options' cannot be null.");
            // Unwrap the response: emit its value when present, otherwise complete empty.
            return withContext(ctx -> sendMessage(options, ctx)
                .flatMap(response -> Mono.justOrEmpty(response.getValue())));
        } catch (RuntimeException e) {
            return monoError(logger, e);
        }
    }

    /**
     * Sends a message to this chat thread and exposes the service response.
     *
     * <p>A {@code null} {@code options} is reported through the returned {@link Mono} rather than
     * thrown to the caller.</p>
     *
     * @param options Options for sending the message.
     * @return a {@link Mono} emitting the {@link Response} that carries the send result.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<SendChatMessageResult>> sendMessageWithResponse(SendChatMessageOptions options) {
        try {
            Objects.requireNonNull(options, "'options' cannot be null.");
            return withContext(ctx -> sendMessage(options, ctx));
        } catch (RuntimeException e) {
            return monoError(logger, e);
        }
    }

    /**
     * Issues the send-message request with an explicit context.
     *
     * @param options Options for sending the message.
     * @param context The context to associate with this operation; {@code null} is replaced by
     *     {@link Context#NONE}.
     * @return a {@link Mono} emitting the {@link Response} that carries the send result.
     */
    Mono<Response<SendChatMessageResult>> sendMessage(SendChatMessageOptions options, Context context) {
        final Context requestContext = (context != null) ? context : Context.NONE;
        return this.chatServiceClient.sendChatMessageWithResponseAsync(chatThreadId, options, requestContext);
    }

    /**
     * Gets a message of this chat thread by its id.
     *
     * <p>A {@code null} {@code chatMessageId} is reported through the returned {@link Mono} rather
     * than thrown to the caller.</p>
     *
     * @param chatMessageId The message id.
     * @return a {@link Mono} emitting the message, or completing empty when the response carries no
     *     value.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<ChatMessage> getMessage(String chatMessageId) {
        try {
            Objects.requireNonNull(chatMessageId, "'chatMessageId' cannot be null.");
            // Unwrap the response: emit its value when present, otherwise complete empty.
            return withContext(ctx -> getMessage(chatMessageId, ctx)
                .flatMap(response -> Mono.justOrEmpty(response.getValue())));
        } catch (RuntimeException e) {
            return monoError(logger, e);
        }
    }

    /**
     * Gets a message of this chat thread by its id and exposes the service response.
     *
     * <p>A {@code null} {@code chatMessageId} is reported through the returned {@link Mono} rather
     * than thrown to the caller.</p>
     *
     * @param chatMessageId The message id.
     * @return a {@link Mono} emitting the {@link Response} that carries the message.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<ChatMessage>> getMessageWithResponse(String chatMessageId) {
        try {
            Objects.requireNonNull(chatMessageId, "'chatMessageId' cannot be null.");
            return withContext(ctx -> getMessage(chatMessageId, ctx));
        } catch (RuntimeException e) {
            return monoError(logger, e);
        }
    }

    /**
     * Issues the get-message request with an explicit context and converts the returned value with
     * {@link ChatMessageConverter}.
     *
     * @param chatMessageId The message id.
     * @param context The context to associate with this operation; {@code null} is replaced by
     *     {@link Context#NONE}.
     * @return a {@link Mono} emitting a {@link Response} built from the service response, with the
     *     converted message as its value.
     */
    Mono<Response<ChatMessage>> getMessage(String chatMessageId, Context context) {
        final Context requestContext = (context != null) ? context : Context.NONE;

        return this.chatServiceClient
            .getChatMessageWithResponseAsync(chatThreadId, chatMessageId, requestContext)
            .map(serviceResponse -> {
                final ChatMessage converted = ChatMessageConverter.convert(serviceResponse.getValue());
                return new SimpleResponse<ChatMessage>(serviceResponse, converted);
            });
    }

    /**
     * Lists the messages of this chat thread using a freshly created {@link ListChatMessagesOptions}.
     *
     * <p>Each service item is converted with {@link ChatMessageConverter} before it is emitted.</p>
     *
     * @return a {@link PagedFlux} of the messages of the thread.
     */
    @ServiceMethod(returns = ReturnType.COLLECTION)
    public PagedFlux<ChatMessage> listMessages() {
        final ListChatMessagesOptions defaultOptions = new ListChatMessagesOptions();
        try {
            return pagedFluxConvert(
                new PagedFlux<>(
                    // First page: page size and start time come from the options.
                    () -> withContext(ctx -> this.chatServiceClient.listChatMessagesSinglePageAsync(
                        chatThreadId,
                        defaultOptions.getMaxPageSize(),
                        defaultOptions.getStartTime(),
                        ctx)),
                    // Every following page, addressed by the link of the previous one.
                    nextPageLink -> withContext(ctx ->
                        this.chatServiceClient.listChatMessagesNextSinglePageAsync(nextPageLink, ctx))),
                message -> ChatMessageConverter.convert(message));
        } catch (RuntimeException e) {
            return new PagedFlux<>(() -> monoError(logger, e));
        }
    }

    /**
     * Lists the messages of this chat thread.
     *
     * <p>Each service item is converted with {@link ChatMessageConverter} before it is emitted.</p>
     *
     * @param listMessagesOptions The request options; {@code null} is replaced by a freshly created
     *     {@link ListChatMessagesOptions}.
     * @return a {@link PagedFlux} of the messages of the thread.
     */
    @ServiceMethod(returns = ReturnType.COLLECTION)
    public PagedFlux<ChatMessage> listMessages(ListChatMessagesOptions listMessagesOptions) {
        final ListChatMessagesOptions effectiveOptions =
            (listMessagesOptions != null) ? listMessagesOptions : new ListChatMessagesOptions();

        try {
            return pagedFluxConvert(
                new PagedFlux<>(
                    // First page: page size and start time come from the options.
                    () -> withContext(ctx -> this.chatServiceClient.listChatMessagesSinglePageAsync(
                        chatThreadId,
                        effectiveOptions.getMaxPageSize(),
                        effectiveOptions.getStartTime(),
                        ctx)),
                    // Every following page, addressed by the link of the previous one.
                    nextPageLink -> withContext(ctx ->
                        this.chatServiceClient.listChatMessagesNextSinglePageAsync(nextPageLink, ctx))),
                message -> ChatMessageConverter.convert(message));
        } catch (RuntimeException e) {
            return new PagedFlux<>(() -> monoError(logger, e));
        }
    }

    /**
     * Lists the messages of this chat thread using an explicit context.
     *
     * <p>Each service item is converted with {@link ChatMessageConverter} before it is emitted.</p>
     *
     * @param listMessagesOptions The request options; {@code null} is replaced by a freshly created
     *     {@link ListChatMessagesOptions}.
     * @param context The context to associate with this operation; {@code null} is replaced by
     *     {@link Context#NONE}.
     * @return a {@link PagedFlux} of the messages of the thread.
     */
    PagedFlux<ChatMessage> listMessages(ListChatMessagesOptions listMessagesOptions, Context context) {
        final ListChatMessagesOptions effectiveOptions =
            (listMessagesOptions != null) ? listMessagesOptions : new ListChatMessagesOptions();
        final Context requestContext = (context != null) ? context : Context.NONE;

        try {
            return pagedFluxConvert(
                new PagedFlux<>(
                    // First page: page size and start time come from the options.
                    () -> this.chatServiceClient.listChatMessagesSinglePageAsync(
                        chatThreadId,
                        effectiveOptions.getMaxPageSize(),
                        effectiveOptions.getStartTime(),
                        requestContext),
                    // Every following page, addressed by the link of the previous one.
                    nextPageLink -> this.chatServiceClient.listChatMessagesNextSinglePageAsync(
                        nextPageLink, requestContext)),
                message -> ChatMessageConverter.convert(message));
        } catch (RuntimeException e) {
            return new PagedFlux<>(() -> monoError(logger, e));
        }
    }

    /**
     * Updates a message of this chat thread.
     *
     * <p>A {@code null} {@code chatMessageId} or {@code options} is reported through the returned
     * {@link Mono} rather than thrown to the caller.</p>
     *
     * @param chatMessageId The message id.
     * @param options Options for updating the message.
     * @return a {@link Mono} that completes once the update request has finished.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Void> updateMessage(String chatMessageId, UpdateChatMessageOptions options) {
        try {
            Objects.requireNonNull(chatMessageId, "'chatMessageId' cannot be null.");
            Objects.requireNonNull(options, "'options' cannot be null.");
            // Only completion matters here, so the response envelope is discarded.
            return withContext(ctx -> updateMessage(chatMessageId, options, ctx).then());
        } catch (RuntimeException e) {
            return monoError(logger, e);
        }
    }

    /**
     * Updates a message of this chat thread and exposes the service response.
     *
     * <p>A {@code null} {@code chatMessageId} or {@code options} is reported through the returned
     * {@link Mono} rather than thrown to the caller.</p>
     *
     * @param chatMessageId The message id.
     * @param options Options for updating the message.
     * @return a {@link Mono} emitting the {@link Response} of the update request.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<Void>> updateMessageWithResponse(String chatMessageId, UpdateChatMessageOptions options) {
        try {
            Objects.requireNonNull(chatMessageId, "'chatMessageId' cannot be null.");
            Objects.requireNonNull(options, "'options' cannot be null.");
            return withContext(ctx -> updateMessage(chatMessageId, options, ctx));
        } catch (RuntimeException e) {
            return monoError(logger, e);
        }
    }

    /**
     * Issues the update-message request with an explicit context.
     *
     * @param chatMessageId The message id.
     * @param options Options for updating the message.
     * @param context The context to associate with this operation; {@code null} is replaced by
     *     {@link Context#NONE}.
     * @return a {@link Mono} emitting the {@link Response} of the update request.
     */
    Mono<Response<Void>> updateMessage(String chatMessageId, UpdateChatMessageOptions options, Context context) {
        final Context requestContext = (context != null) ? context : Context.NONE;
        return this.chatServiceClient.updateChatMessageWithResponseAsync(
            chatThreadId, chatMessageId, options, requestContext);
    }

    /**
     * Deletes a message of this chat thread.
     *
     * <p>A {@code null} {@code chatMessageId} is reported through the returned {@link Mono} rather
     * than thrown to the caller.</p>
     *
     * @param chatMessageId The message id.
     * @return a {@link Mono} that completes once the delete request has finished.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Void> deleteMessage(String chatMessageId) {
        try {
            Objects.requireNonNull(chatMessageId, "'chatMessageId' cannot be null.");
            // Only completion matters here, so the response envelope is discarded.
            return withContext(ctx -> deleteMessage(chatMessageId, ctx).then());
        } catch (RuntimeException e) {
            return monoError(logger, e);
        }
    }

    /**
     * Deletes a message of this chat thread and exposes the service response.
     *
     * <p>A {@code null} {@code chatMessageId} is reported through the returned {@link Mono} rather
     * than thrown to the caller.</p>
     *
     * @param chatMessageId The message id.
     * @return a {@link Mono} emitting the {@link Response} of the delete request.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<Void>> deleteMessageWithResponse(String chatMessageId) {
        try {
            Objects.requireNonNull(chatMessageId, "'chatMessageId' cannot be null.");
            return withContext(ctx -> deleteMessage(chatMessageId, ctx));
        } catch (RuntimeException e) {
            return monoError(logger, e);
        }
    }

    /**
     * Issues the delete-message request with an explicit context.
     *
     * @param chatMessageId The message id.
     * @param context The context to associate with this operation; {@code null} is replaced by
     *     {@link Context#NONE}.
     * @return a {@link Mono} emitting the {@link Response} of the delete request.
     */
    Mono<Response<Void>> deleteMessage(String chatMessageId, Context context) {
        final Context requestContext = (context != null) ? context : Context.NONE;
        return this.chatServiceClient.deleteChatMessageWithResponseAsync(
            chatThreadId, chatMessageId, requestContext);
    }

    /**
     * Posts a typing event to this chat thread, on behalf of a user.
     *
     * @return a {@link Mono} that completes once the typing notification request has finished.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Void> sendTypingNotification() {
        try {
            // Only completion matters here, so the response envelope is discarded.
            return withContext(ctx -> sendTypingNotification(ctx).then());
        } catch (RuntimeException e) {
            return monoError(logger, e);
        }
    }

    /**
     * Posts a typing event to this chat thread, on behalf of a user, and exposes the service response.
     *
     * @return a {@link Mono} emitting the {@link Response} of the typing notification request.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<Void>> sendTypingNotificationWithResponse() {
        try {
            return withContext(ctx -> sendTypingNotification(ctx));
        } catch (RuntimeException e) {
            return monoError(logger, e);
        }
    }

    /**
     * Issues the typing notification request with an explicit context.
     *
     * @param context The context to associate with this operation; {@code null} is replaced by
     *     {@link Context#NONE}.
     * @return a {@link Mono} emitting the {@link Response} of the typing notification request.
     */
    Mono<Response<Void>> sendTypingNotification(Context context) {
        final Context requestContext = (context != null) ? context : Context.NONE;
        return this.chatServiceClient.sendTypingNotificationWithResponseAsync(chatThreadId, requestContext);
    }

    /**
     * Posts a read receipt event to this chat thread, on behalf of a user.
     *
     * <p>A {@code null} {@code chatMessageId} is reported through the returned {@link Mono} rather
     * than thrown to the caller.</p>
     *
     * @param chatMessageId The id of the chat message that was read.
     * @return a {@link Mono} that completes once the read receipt request has finished.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Void> sendReadReceipt(String chatMessageId) {
        try {
            Objects.requireNonNull(chatMessageId, "'chatMessageId' cannot be null.");
            // Only completion matters here, so the response envelope is discarded.
            return withContext(ctx -> sendReadReceipt(chatMessageId, ctx).then());
        } catch (RuntimeException e) {
            return monoError(logger, e);
        }
    }

    /**
     * Posts a read receipt event to this chat thread, on behalf of a user, and exposes the service
     * response.
     *
     * <p>A {@code null} {@code chatMessageId} is reported through the returned {@link Mono} rather
     * than thrown to the caller.</p>
     *
     * @param chatMessageId The id of the chat message that was read.
     * @return a {@link Mono} emitting the {@link Response} of the read receipt request.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<Void>> sendReadReceiptWithResponse(String chatMessageId) {
        try {
            Objects.requireNonNull(chatMessageId, "'chatMessageId' cannot be null.");
            return withContext(ctx -> sendReadReceipt(chatMessageId, ctx));
        } catch (RuntimeException e) {
            return monoError(logger, e);
        }
    }

    /**
     * Issues the read receipt request with an explicit context. The message id is wrapped in a
     * {@link SendReadReceiptRequest} before being sent.
     *
     * @param chatMessageId The id of the chat message that was read.
     * @param context The context to associate with this operation; {@code null} is replaced by
     *     {@link Context#NONE}.
     * @return a {@link Mono} emitting the {@link Response} of the read receipt request.
     */
    Mono<Response<Void>> sendReadReceipt(String chatMessageId, Context context) {
        final Context requestContext = (context != null) ? context : Context.NONE;
        return this.chatServiceClient.sendChatReadReceiptWithResponseAsync(
            chatThreadId,
            new SendReadReceiptRequest().setChatMessageId(chatMessageId),
            requestContext);
    }

    /**
     * Lists the read receipts of this chat thread.
     *
     * <p>Each service item is converted with {@link ReadReceiptConverter} before it is emitted.</p>
     *
     * @return a {@link PagedFlux} of the read receipts of the thread.
     */
    @ServiceMethod(returns = ReturnType.COLLECTION)
    public PagedFlux<ReadReceipt> listReadReceipts() {
        try {
            return pagedFluxConvert(
                new PagedFlux<>(
                    // First page.
                    () -> withContext(ctx ->
                        this.chatServiceClient.listChatReadReceiptsSinglePageAsync(chatThreadId, ctx)),
                    // Every following page, addressed by the link of the previous one.
                    nextPageLink -> withContext(ctx ->
                        this.chatServiceClient.listChatReadReceiptsNextSinglePageAsync(nextPageLink, ctx))),
                receipt -> ReadReceiptConverter.convert(receipt));
        } catch (RuntimeException e) {
            return new PagedFlux<>(() -> monoError(logger, e));
        }
    }

    /**
     * Lists the read receipts of this chat thread using an explicit context.
     *
     * <p>Each service item is converted with {@link ReadReceiptConverter} before it is emitted.</p>
     *
     * @param context The context to associate with this operation; {@code null} is replaced by
     *     {@link Context#NONE}.
     * @return a {@link PagedFlux} of the read receipts of the thread.
     */
    PagedFlux<ReadReceipt> listReadReceipts(Context context) {
        final Context requestContext = (context != null) ? context : Context.NONE;
        try {
            return pagedFluxConvert(
                new PagedFlux<>(
                    // First page.
                    () -> this.chatServiceClient.listChatReadReceiptsSinglePageAsync(
                        chatThreadId, requestContext),
                    // Every following page, addressed by the link of the previous one.
                    nextPageLink -> this.chatServiceClient.listChatReadReceiptsNextSinglePageAsync(
                        nextPageLink, requestContext)),
                receipt -> ReadReceiptConverter.convert(receipt));
        } catch (RuntimeException e) {
            return new PagedFlux<>(() -> monoError(logger, e));
        }
    }

    /**
     * Adapts a {@link PagedFlux} of one element type into a {@link PagedFlux} of another by converting
     * the items of every page.
     *
     * <p>Each converted page keeps the request, status code, headers and continuation token of the page
     * it was built from. The page size handed to the retriever is not forwarded to the source.</p>
     *
     * @param originalPagedFlux The source of the pages to convert.
     * @param func The conversion applied to each item of a page.
     * @param <T1> The element type of the returned {@link PagedFlux}.
     * @param <T2> The element type of {@code originalPagedFlux}.
     * @return a {@link PagedFlux} whose pages contain the converted items.
     */
    private <T1, T2> PagedFlux<T1> pagedFluxConvert(PagedFlux<T2> originalPagedFlux, Function<T2, T1> func) {
        // Rebuilds a single page around its converted items.
        final Function<PagedResponse<T2>, PagedResponse<T1>> convertPage = page ->
            new PagedResponseBase<Void, T1>(
                page.getRequest(),
                page.getStatusCode(),
                page.getHeaders(),
                page.getValue().stream().map(func).collect(Collectors.toList()),
                page.getContinuationToken(),
                null);

        // A null token reads the source from its first page; any other token resumes from that token.
        final PageRetriever<String, PagedResponse<T1>> retriever = (continuationToken, pageSize) -> {
            final Flux<PagedResponse<T2>> sourcePages;
            if (continuationToken != null) {
                sourcePages = originalPagedFlux.byPage(continuationToken);
            } else {
                sourcePages = originalPagedFlux.byPage();
            }
            return sourcePages.map(convertPage);
        };

        final Supplier<PageRetriever<String, PagedResponse<T1>>> retrieverProvider = () -> retriever;
        return PagedFlux.create(retrieverProvider);
    }
}