ServiceBusAdministrationAsyncClient.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.servicebus.administration;

import com.azure.core.annotation.ReturnType;
import com.azure.core.annotation.ServiceClient;
import com.azure.core.annotation.ServiceMethod;
import com.azure.core.exception.AzureException;
import com.azure.core.exception.ClientAuthenticationException;
import com.azure.core.exception.HttpResponseException;
import com.azure.core.exception.ResourceExistsException;
import com.azure.core.exception.ResourceModifiedException;
import com.azure.core.exception.ResourceNotFoundException;
import com.azure.core.http.HttpHeaders;
import com.azure.core.http.HttpResponse;
import com.azure.core.http.HttpRequest;
import com.azure.core.http.rest.PagedFlux;
import com.azure.core.http.rest.PagedResponse;
import com.azure.core.http.rest.Response;
import com.azure.core.http.rest.SimpleResponse;
import com.azure.core.util.Context;
import com.azure.core.util.CoreUtils;
import com.azure.core.util.IterableStream;
import com.azure.core.util.UrlBuilder;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.servicebus.administration.models.CreateQueueOptions;
import com.azure.messaging.servicebus.administration.models.CreateRuleOptions;
import com.azure.messaging.servicebus.administration.models.CreateSubscriptionOptions;
import com.azure.messaging.servicebus.administration.models.CreateTopicOptions;
import com.azure.messaging.servicebus.administration.models.NamespaceProperties;
import com.azure.messaging.servicebus.administration.models.QueueProperties;
import com.azure.messaging.servicebus.administration.models.QueueRuntimeProperties;
import com.azure.messaging.servicebus.administration.models.RuleProperties;
import com.azure.messaging.servicebus.administration.models.SubscriptionProperties;
import com.azure.messaging.servicebus.administration.models.SubscriptionRuntimeProperties;
import com.azure.messaging.servicebus.administration.models.TopicProperties;
import com.azure.messaging.servicebus.administration.models.TopicRuntimeProperties;
import com.azure.messaging.servicebus.implementation.EntityHelper;
import com.azure.messaging.servicebus.implementation.EntitiesImpl;
import com.azure.messaging.servicebus.implementation.RulesImpl;
import com.azure.messaging.servicebus.implementation.ServiceBusManagementClientImpl;
import com.azure.messaging.servicebus.implementation.ServiceBusManagementSerializer;
import com.azure.messaging.servicebus.implementation.models.CreateQueueBody;
import com.azure.messaging.servicebus.implementation.models.CreateQueueBodyContent;
import com.azure.messaging.servicebus.implementation.models.CreateRuleBody;
import com.azure.messaging.servicebus.implementation.models.CreateRuleBodyContent;
import com.azure.messaging.servicebus.implementation.models.CreateSubscriptionBody;
import com.azure.messaging.servicebus.implementation.models.CreateSubscriptionBodyContent;
import com.azure.messaging.servicebus.implementation.models.CreateTopicBody;
import com.azure.messaging.servicebus.implementation.models.CreateTopicBodyContent;
import com.azure.messaging.servicebus.implementation.models.NamespacePropertiesEntry;
import com.azure.messaging.servicebus.implementation.models.QueueDescription;
import com.azure.messaging.servicebus.implementation.models.QueueDescriptionEntry;
import com.azure.messaging.servicebus.implementation.models.QueueDescriptionFeed;
import com.azure.messaging.servicebus.implementation.models.ResponseLink;
import com.azure.messaging.servicebus.implementation.models.RuleActionImpl;
import com.azure.messaging.servicebus.implementation.models.RuleDescription;
import com.azure.messaging.servicebus.implementation.models.RuleDescriptionEntry;
import com.azure.messaging.servicebus.implementation.models.RuleDescriptionFeed;
import com.azure.messaging.servicebus.implementation.models.RuleFilterImpl;
import com.azure.messaging.servicebus.implementation.models.ServiceBusManagementError;
import com.azure.messaging.servicebus.implementation.models.ServiceBusManagementErrorException;
import com.azure.messaging.servicebus.implementation.models.SubscriptionDescription;
import com.azure.messaging.servicebus.implementation.models.SubscriptionDescriptionEntry;
import com.azure.messaging.servicebus.implementation.models.SubscriptionDescriptionFeed;
import com.azure.messaging.servicebus.implementation.models.TopicDescription;
import com.azure.messaging.servicebus.implementation.models.TopicDescriptionEntry;
import com.azure.messaging.servicebus.implementation.models.TopicDescriptionFeed;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLDecoder;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

import static com.azure.core.http.policy.AddHeadersFromContextPolicy.AZURE_REQUEST_HTTP_HEADERS_KEY;
import static com.azure.core.util.FluxUtil.monoError;
import static com.azure.core.util.FluxUtil.pagedFluxError;
import static com.azure.core.util.FluxUtil.withContext;
import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY;
import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.AZ_TRACING_NAMESPACE_VALUE;
import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.SERVICE_BUS_DLQ_SUPPLEMENTARY_AUTHORIZATION_HEADER_NAME;
import static com.azure.messaging.servicebus.implementation.ServiceBusConstants.SERVICE_BUS_SUPPLEMENTARY_AUTHORIZATION_HEADER_NAME;

/**
 * An <b>asynchronous</b> client for managing a Service Bus namespace. Instantiated via
 * {@link ServiceBusAdministrationClientBuilder}.
 *
 * <p><strong>Create a queue</strong></p>
 * <!-- src_embed com.azure.messaging.servicebus.administration.servicebusadministrationasyncclient.createqueue#string -->
 * <pre>
 * &#47;&#47; `.subscribe&#40;&#41;` is a non-blocking call. It'll move onto the next
 * &#47;&#47; instruction after setting up the `consumer` and `errorConsumer` callbacks.
 * client.createQueue&#40;&quot;my-new-queue&quot;&#41;.subscribe&#40;queue -&gt; &#123;
 *     System.out.printf&#40;&quot;Queue created. Name: %s. Lock Duration: %s.%n&quot;,
 *         queue.getName&#40;&#41;, queue.getLockDuration&#40;&#41;&#41;;
 * &#125;, error -&gt; &#123;
 *         System.err.println&#40;&quot;Error creating queue: &quot; + error&#41;;
 *     &#125;&#41;;
 * </pre>
 * <!-- end com.azure.messaging.servicebus.administration.servicebusadministrationasyncclient.createqueue#string -->
 *
 * <p><strong>Edit an existing subscription</strong></p>
 * <!-- src_embed com.azure.messaging.servicebus.administration.servicebusadministrationasyncclient.updatesubscription#subscriptionproperties -->
 * <pre>
 * &#47;&#47; To update the subscription we have to:
 * &#47;&#47; 1. Get the subscription info from the service.
 * &#47;&#47; 2. Update the SubscriptionProperties we want to change.
 * &#47;&#47; 3. Call the updateSubscription&#40;&#41; with the updated object.
 *
 * &#47;&#47; `.subscribe&#40;&#41;` is a non-blocking call. It'll move onto the next
 * &#47;&#47; instruction after setting up the `consumer` and `errorConsumer` callbacks.
 * client.getSubscription&#40;&quot;my-topic&quot;, &quot;my-subscription&quot;&#41;
 *     .flatMap&#40;subscription -&gt; &#123;
 *         System.out.println&#40;&quot;Original delivery count: &quot; + subscription.getMaxDeliveryCount&#40;&#41;&#41;;
 *
 *         &#47;&#47; Updating it to a new value.
 *         subscription.setMaxDeliveryCount&#40;5&#41;;
 *
 *         &#47;&#47; Persisting the updates to the subscription object.
 *         return client.updateSubscription&#40;subscription&#41;;
 *     &#125;&#41;
 *     .subscribe&#40;subscription -&gt; &#123;
 *         System.out.printf&#40;&quot;Subscription updated. Name: %s. Delivery count: %s.%n&quot;,
 *             subscription.getSubscriptionName&#40;&#41;, subscription.getMaxDeliveryCount&#40;&#41;&#41;;
 *     &#125;, error -&gt; &#123;
 *             System.err.println&#40;&quot;Error updating subscription: &quot; + error&#41;;
 *         &#125;&#41;;
 * </pre>
 * <!-- end com.azure.messaging.servicebus.administration.servicebusadministrationasyncclient.updatesubscription#subscriptionproperties -->
 *
 * <p><strong>List all queues</strong></p>
 * <!-- src_embed com.azure.messaging.servicebus.administration.servicebusadministrationasyncclient.listQueues -->
 * <pre>
 * &#47;&#47; `.subscribe&#40;&#41;` is a non-blocking call. It'll move onto the next
 * &#47;&#47; instruction after setting up the `consumer` and `errorConsumer` callbacks.
 * client.listQueues&#40;&#41;.subscribe&#40;queue -&gt; &#123;
 *     System.out.printf&#40;&quot;Queue [%s]. Lock Duration: %s.%n&quot;,
 *         queue.getName&#40;&#41;, queue.getLockDuration&#40;&#41;&#41;;
 * &#125;, error -&gt; &#123;
 *         System.err.println&#40;&quot;Error fetching queues: &quot; + error&#41;;
 *     &#125;&#41;;
 * </pre>
 * <!-- end com.azure.messaging.servicebus.administration.servicebusadministrationasyncclient.listQueues -->
 *
 * @see ServiceBusAdministrationClientBuilder
 * @see ServiceBusAdministrationClient ServiceBusAdministrationClient for a synchronous client.
 */
@ServiceClient(builder = ServiceBusAdministrationClientBuilder.class, isAsync = true)
public final class ServiceBusAdministrationAsyncClient {
    private static final String CONTENT_TYPE = "application/xml";

    // Name of the entity type when listing queues and topics.
    private static final String QUEUES_ENTITY_TYPE = "queues";
    private static final String TOPICS_ENTITY_TYPE = "topics";

    private static final int NUMBER_OF_ELEMENTS = 100;

    private final ServiceBusManagementClientImpl managementClient;
    private final EntitiesImpl entityClient;
    private final ClientLogger logger = new ClientLogger(ServiceBusAdministrationAsyncClient.class);
    private final ServiceBusManagementSerializer serializer;
    private final RulesImpl rulesClient;

    /**
     * Creates a new instance with the given management client and serializer.
     *
     * @param managementClient Client to make management calls.
     * @param serializer Serializer to deserialize ATOM XML responses.
     *
     * @throws NullPointerException if any one of {@code managementClient, serializer, credential} is null.
     */
    ServiceBusAdministrationAsyncClient(ServiceBusManagementClientImpl managementClient,
        ServiceBusManagementSerializer serializer) {
        this.serializer = Objects.requireNonNull(serializer, "'serializer' cannot be null.");
        this.managementClient = Objects.requireNonNull(managementClient, "'managementClient' cannot be null.");
        this.entityClient = managementClient.getEntities();
        this.rulesClient = managementClient.getRules();
    }

    /**
     * Creates a queue with the given name.
     *
     * @param queueName Name of the queue to create.
     *
     * @return A Mono that completes with information about the created queue.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If the request body was invalid, the queue quota is exceeded, or an error
     *     occurred processing the request.
     * @throws NullPointerException if {@code queueName} is null.
     * @throws IllegalArgumentException if {@code queueName} is an empty string.
     * @throws ResourceExistsException if a queue exists with the same {@code queueName}.
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/update-entity">Create or Update Entity</a>
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<QueueProperties> createQueue(String queueName) {
        try {
            return createQueue(queueName, new CreateQueueOptions());
        } catch (RuntimeException e) {
            return monoError(logger, e);
        }
    }

    /**
     * Creates a queue with the {@link CreateQueueOptions} and given queue name.
     *
     * @param queueName Name of the queue to create.
     * @param queueOptions Options about the queue to create.
     *
     * @return A Mono that completes with information about the created queue.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If the request body was invalid, the queue quota is exceeded, or an error
     *     occurred processing the request.
     * @throws IllegalArgumentException if {@code queueName} is an empty string.
     * @throws NullPointerException if {@code queueName} or {@code queueOptions} is null.
     * @throws ResourceExistsException if a queue exists with the same {@link QueueProperties#getName() queueName}.
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/update-entity">Create or Update Entity</a>
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<QueueProperties> createQueue(String queueName, CreateQueueOptions queueOptions) {
        return createQueueWithResponse(queueName, queueOptions).map(Response::getValue);
    }

    /**
     * Creates a queue and returns the created queue in addition to the HTTP response.
     *
     * @param queueName Name of the queue to create.
     * @param queueOptions Options about the queue to create.
     *
     * @return A Mono that returns the created queue in addition to the HTTP response.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If the request body was invalid, the queue quota is exceeded, or an error
     *     occurred processing the request.
     * @throws IllegalArgumentException if {@code queueName} is an empty string.
     * @throws NullPointerException if {@code queueName} or {@code queueOptions} is null.
     * @throws ResourceExistsException if a queue exists with the same {@link QueueProperties#getName() queueName}.
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/update-entity">Create or Update Entity</a>
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<QueueProperties>> createQueueWithResponse(String queueName, CreateQueueOptions queueOptions) {
        return withContext(context -> createQueueWithResponse(queueName, queueOptions, context));
    }

    /**
     * Creates a rule under the given topic and subscription
     *
     * @param topicName Name of the topic associated with rule.
     * @param subscriptionName Name of the subscription associated with the rule.
     * @param ruleName Name of the rule.
     *
     * @return A Mono that completes with information about the created rule.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If the request body was invalid, the quota is exceeded, or an error occurred
     *     processing the request.
     * @throws IllegalArgumentException if {@code topicName} or {@code ruleName} are are empty strings.
     * @throws NullPointerException if {@code topicName} or {@code ruleName} are are null.
     * @throws ResourceExistsException if a rule exists with the same topic, subscription, and rule name.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<RuleProperties> createRule(String topicName, String subscriptionName, String ruleName) {
        try {
            return createRule(topicName, subscriptionName, ruleName, new CreateRuleOptions());
        } catch (RuntimeException e) {
            return monoError(logger, e);
        }
    }

    /**
     * Creates a rule with the {@link CreateRuleOptions}.
     *
     * @param topicName Name of the topic associated with rule.
     * @param subscriptionName Name of the subscription associated with the rule.
     * @param ruleName Name of the rule.
     * @param ruleOptions Information about the rule to create.
     *
     * @return A Mono that completes with information about the created rule.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If the request body was invalid, the quota is exceeded, or an error occurred
     *     processing the request.
     * @throws IllegalArgumentException if {@code topicName} or {@code ruleName} are are empty strings.
     * @throws NullPointerException if {@code topicName}, {@code ruleName}, or {@code ruleOptions}
     *     are are null.
     * @throws ResourceExistsException if a rule exists with the same topic and rule name.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<RuleProperties> createRule(String topicName, String subscriptionName, String ruleName,
        CreateRuleOptions ruleOptions) {

        return createRuleWithResponse(topicName, subscriptionName, ruleName, ruleOptions)
            .map(Response::getValue);
    }

    /**
     * Creates a rule and returns the created rule in addition to the HTTP response.
     *
     * @param topicName Name of the topic associated with rule.
     * @param subscriptionName Name of the subscription associated with the rule.
     * @param ruleName Name of the rule.
     * @param ruleOptions Information about the rule to create.
     *
     * @return A Mono that returns the created rule in addition to the HTTP response.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If the request body was invalid, the quota is exceeded, or an error occurred
     *     processing the request.
     * @throws IllegalArgumentException if {@code topicName} or {@code ruleName} are are empty strings.
     * @throws NullPointerException if {@code topicName}, {@code ruleName}, or {@code ruleOptions}
     *     are are null.
     * @throws ResourceExistsException if a rule exists with the same topic and rule name.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<RuleProperties>> createRuleWithResponse(String topicName, String subscriptionName,
        String ruleName, CreateRuleOptions ruleOptions) {
        return withContext(context -> createRuleWithResponse(topicName, subscriptionName, ruleName, ruleOptions,
            context));
    }

    /**
     * Creates a subscription with the given topic and subscription names.
     *
     * @param topicName Name of the topic associated with subscription.
     * @param subscriptionName Name of the subscription.
     *
     * @return A Mono that completes with information about the created subscription.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If the request body was invalid, the quota is exceeded, or an error occurred
     *     processing the request.
     * @throws IllegalArgumentException if {@code topicName} or {@code subscriptionName} are are empty strings.
     * @throws NullPointerException if {@code topicName} or {@code subscriptionName} are are null.
     * @throws ResourceExistsException if a subscription exists with the same topic and subscription name.
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/update-entity">Create or Update Entity</a>
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<SubscriptionProperties> createSubscription(String topicName, String subscriptionName) {
        try {
            return createSubscription(topicName, subscriptionName, new CreateSubscriptionOptions());
        } catch (RuntimeException e) {
            return monoError(logger, e);
        }
    }

    /**
     * Creates a subscription with the {@link CreateSubscriptionOptions}.
     *
     * @param topicName Name of the topic associated with subscription.
     * @param subscriptionName Name of the subscription.
     * @param subscriptionOptions Information about the subscription to create.
     *
     * @return A Mono that completes with information about the created subscription.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If the request body was invalid, the quota is exceeded, or an error occurred
     *     processing the request.
     * @throws IllegalArgumentException if {@code topicName} or {@code subscriptionName} are are empty strings.
     * @throws NullPointerException if {@code topicName}, {@code subscriptionName}, or {@code subscriptionOptions}
     *     are are null.
     * @throws ResourceExistsException if a subscription exists with the same topic and subscription name.
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/update-entity">Create or Update Entity</a>
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<SubscriptionProperties> createSubscription(String topicName, String subscriptionName,
        CreateSubscriptionOptions subscriptionOptions) {

        return createSubscriptionWithResponse(topicName, subscriptionName, subscriptionOptions)
            .map(Response::getValue);
    }

    /**
     * Creates a subscription and returns the created subscription in addition to the HTTP response.
     *
     * @param topicName Name of the topic associated with subscription.
     * @param subscriptionName Name of the subscription.
     * @param subscriptionOptions Information about the subscription to create.
     *
     * @return A Mono that returns the created subscription in addition to the HTTP response.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If the request body was invalid, the quota is exceeded, or an error occurred
     *     processing the request.
     * @throws IllegalArgumentException if {@code topicName} or {@code subscriptionName} are are empty strings.
     * @throws NullPointerException if {@code topicName}, {@code subscriptionName}, or {@code subscriptionOptions}
     *     are are null.
     * @throws ResourceExistsException if a subscription exists with the same topic and subscription name.
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/update-entity">Create or Update Entity</a>
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<SubscriptionProperties>> createSubscriptionWithResponse(String topicName,
        String subscriptionName, CreateSubscriptionOptions subscriptionOptions) {
        return withContext(context -> createSubscriptionWithResponse(topicName, subscriptionName, subscriptionOptions,
            context));
    }

    /**
     * Creates a topic with the given name.
     *
     * @param topicName Name of the topic to create.
     *
     * @return A Mono that completes with information about the created topic.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If the request body was invalid, the topic quota is exceeded, or an error
     *     occurred processing the request.
     * @throws NullPointerException if {@code topicName} is null.
     * @throws IllegalArgumentException if {@code topicName} is an empty string.
     * @throws ResourceExistsException if a topic exists with the same {@code topicName}.
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/update-entity">Create or Update Entity</a>
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<TopicProperties> createTopic(String topicName) {
        try {
            return createTopic(topicName, new CreateTopicOptions());
        } catch (RuntimeException e) {
            return monoError(logger, e);
        }
    }

    /**
     * Creates a topic with the {@link CreateTopicOptions}.
     *
     * @param topicName Name of the topic to create.
     * @param topicOptions The options used to create the topic.
     *
     * @return A Mono that completes with information about the created topic.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If the request body was invalid, the topic quota is exceeded, or an error
     *     occurred processing the request.
     * @throws IllegalArgumentException if {@code topicName} is an empty string.
     * @throws NullPointerException if {@code topicName} or {@code topicOptions} is null.
     * @throws ResourceExistsException if a topic exists with the same {@code topicName}.
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/update-entity">Create or Update Entity</a>
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<TopicProperties> createTopic(String topicName, CreateTopicOptions topicOptions) {
        return createTopicWithResponse(topicName, topicOptions).map(Response::getValue);
    }

    /**
     * Creates a topic and returns the created topic in addition to the HTTP response.
     *
     * @param topicName Name of the topic to create.
     * @param topicOptions The options used to create the topic.
     *
     * @return A Mono that returns the created topic in addition to the HTTP response.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If the request body was invalid, the topic quota is exceeded, or an error
     *     occurred processing the request.
     * @throws IllegalArgumentException if {@link TopicProperties#getName() topic.getName()} is null or an empty
     *     string.
     * @throws NullPointerException if {@code topicName} or {@code topicOptions} is null.
     * @throws ResourceExistsException if a topic exists with the same {@code topicName}.
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/update-entity">Create or Update Entity</a>
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<TopicProperties>> createTopicWithResponse(String topicName, CreateTopicOptions topicOptions) {
        return withContext(context -> createTopicWithResponse(topicName, topicOptions, context));
    }

    /**
     * Deletes a queue the matching {@code queueName}.
     *
     * @param queueName Name of queue to delete.
     *
     * @return A Mono that completes when the queue is deleted.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If error occurred processing the request.
     * @throws NullPointerException if {@code queueName} is null.
     * @throws IllegalArgumentException if {@code queueName} is an empty string.
     * @throws ResourceNotFoundException if the {@code queueName} does not exist.
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/delete-queue">Delete Queue</a>
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Void> deleteQueue(String queueName) {
        return deleteQueueWithResponse(queueName).then();
    }

    /**
     * Deletes a queue the matching {@code queueName} and returns the HTTP response.
     *
     * @param queueName Name of queue to delete.
     *
     * @return A Mono that completes when the queue is deleted and returns the HTTP response.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If error occurred processing the request.
     * @throws NullPointerException if {@code queueName} is null.
     * @throws IllegalArgumentException if {@code queueName} is an empty string.
     * @throws ResourceNotFoundException if the {@code queueName} does not exist.
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/delete-queue">Delete Queue</a>
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<Void>> deleteQueueWithResponse(String queueName) {
        return withContext(context -> deleteQueueWithResponse(queueName, context));
    }

    /**
     * Deletes a rule the matching {@code ruleName}.
     *
     * @param topicName Name of topic associated with rule to delete.
     * @param subscriptionName Name of the subscription associated with the rule to delete.
     * @param ruleName Name of rule to delete.
     *
     * @return A Mono that completes when the rule is deleted.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If error occurred processing the request.
     * @throws IllegalArgumentException if {@code topicName} or {@code ruleName} is an empty string.
     * @throws NullPointerException if {@code topicName} or {@code ruleName} is null.
     * @throws ResourceNotFoundException if the {@code ruleName} does not exist.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Void> deleteRule(String topicName, String subscriptionName, String ruleName) {
        return deleteRuleWithResponse(topicName, subscriptionName, ruleName).then();
    }

    /**
     * Deletes a rule the matching {@code ruleName} and returns the HTTP response.
     *
     * @param topicName Name of topic associated with rule to delete.
     * @param subscriptionName Name of the subscription associated with the rule to delete.
     * @param ruleName Name of rule to delete.
     *
     * @return A Mono that completes when the rule is deleted and returns the HTTP response.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If error occurred processing the request.
     * @throws IllegalArgumentException if {@code topicName}, {@code subscriptionName}, or {@code ruleName} is an
     *     empty string.
     * @throws NullPointerException if {@code topicName}, {@code subscriptionName}, or {@code ruleName} is null.
     * @throws ResourceNotFoundException if the {@code ruleName} does not exist.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<Void>> deleteRuleWithResponse(String topicName, String subscriptionName,
        String ruleName) {
        return withContext(context -> deleteRuleWithResponse(topicName, subscriptionName, ruleName, context));
    }

    /**
     * Deletes a subscription the matching {@code subscriptionName}.
     *
     * @param topicName Name of topic associated with subscription to delete.
     * @param subscriptionName Name of subscription to delete.
     *
     * @return A Mono that completes when the subscription is deleted.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If error occurred processing the request.
     * @throws IllegalArgumentException if {@code topicName} or {@code subscriptionName} is an empty string.
     * @throws NullPointerException if {@code topicName} or {@code subscriptionName} is null.
     * @throws ResourceNotFoundException if the {@code subscriptionName} does not exist.
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/delete-subscription">Delete Subscription</a>
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Void> deleteSubscription(String topicName, String subscriptionName) {
        return deleteSubscriptionWithResponse(topicName, subscriptionName).then();
    }

    /**
     * Deletes a subscription the matching {@code subscriptionName} and returns the HTTP response.
     *
     * @param topicName Name of topic associated with subscription to delete.
     * @param subscriptionName Name of subscription to delete.
     *
     * @return A Mono that completes when the subscription is deleted and returns the HTTP response.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If error occurred processing the request.
     * @throws IllegalArgumentException if {@code topicName} or {@code subscriptionName} is an empty string.
     * @throws NullPointerException if {@code topicName} or {@code subscriptionName} is null.
     * @throws ResourceNotFoundException if the {@code subscriptionName} does not exist.
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/delete-subscription">Delete Subscription</a>
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<Void>> deleteSubscriptionWithResponse(String topicName, String subscriptionName) {
        return withContext(context -> deleteSubscriptionWithResponse(topicName, subscriptionName, context));
    }

    /**
     * Deletes a topic the matching {@code topicName}.
     *
     * @param topicName Name of topic to delete.
     *
     * @return A Mono that completes when the topic is deleted.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If error occurred processing the request.
     * @throws IllegalArgumentException if {@code topicName} is an empty string.
     * @throws NullPointerException if {@code topicName} is null.
     * @throws ResourceNotFoundException if the {@code topicName} does not exist.
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/delete-topic">Delete Topic</a>
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Void> deleteTopic(String topicName) {
        return deleteTopicWithResponse(topicName).then();
    }

    /**
     * Deletes a topic the matching {@code topicName} and returns the HTTP response.
     *
     * @param topicName Name of topic to delete.
     *
     * @return A Mono that completes when the topic is deleted and returns the HTTP response.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If error occurred processing the request.
     * @throws IllegalArgumentException if {@code topicName} is an empty string.
     * @throws NullPointerException if {@code topicName} is null.
     * @throws ResourceNotFoundException if the {@code topicName} does not exist.
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/delete-topic">Delete Topic</a>
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<Void>> deleteTopicWithResponse(String topicName) {
        return withContext(context -> deleteTopicWithResponse(topicName, context));
    }

    /**
     * Gets information about the queue.
     *
     * @param queueName Name of queue to get information about.
     *
     * @return A Mono that completes with information about the queue.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If error occurred processing the request.
     * @throws IllegalArgumentException if {@code queueName} is an empty string.
     * @throws NullPointerException if {@code queueName} is null.
     * @throws ResourceNotFoundException if the {@code queueName} does not exist.
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/get-entity">Get Entity</a>
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<QueueProperties> getQueue(String queueName) {
        return getQueueWithResponse(queueName).map(Response::getValue);
    }

    /**
     * Gets information about the queue along with its HTTP response.
     *
     * @param queueName Name of queue to get information about.
     *
     * @return A Mono that completes with information about the queue and the associated HTTP response.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If error occurred processing the request.
     * @throws IllegalArgumentException if {@code queueName} is an empty string.
     * @throws NullPointerException if {@code queueName} is null.
     * @throws ResourceNotFoundException if the {@code queueName} does not exist.
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/get-entity">Get Entity</a>
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<QueueProperties>> getQueueWithResponse(String queueName) {
        return withContext(context -> getQueueWithResponse(queueName, context, Function.identity()));
    }

    /**
     * Gets whether or not a queue with {@code queueName} exists in the Service Bus namespace.
     *
     * @param queueName Name of the queue.
     *
     * @return A Mono that completes indicating whether or not the queue exists.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If error occurred processing the request.
     * @throws IllegalArgumentException if {@code queueName} is an empty string.
     * @throws NullPointerException if {@code queueName} is null.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Boolean> getQueueExists(String queueName) {
        return getQueueExistsWithResponse(queueName).map(Response::getValue);
    }

    /**
     * Gets whether or not a queue with {@code queueName} exists in the Service Bus namespace.
     *
     * @param queueName Name of the queue.
     *
     * @return A Mono that completes indicating whether or not the queue exists along with its HTTP response.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If error occurred processing the request.
     * @throws IllegalArgumentException if {@code queueName} is an empty string.
     * @throws NullPointerException if {@code queueName} is null.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<Boolean>> getQueueExistsWithResponse(String queueName) {
        return getEntityExistsWithResponse(getQueueWithResponse(queueName));
    }

    /**
     * Gets runtime properties about the queue.
     *
     * @param queueName Name of queue to get information about.
     *
     * @return A Mono that completes with runtime properties about the queue.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If error occurred processing the request.
     * @throws IllegalArgumentException if {@code queueName} is an empty string.
     * @throws NullPointerException if {@code queueName} is null.
     * @throws ResourceNotFoundException if the {@code queueName} does not exist.
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/get-entity">Get Entity</a>
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<QueueRuntimeProperties> getQueueRuntimeProperties(String queueName) {
        return getQueueRuntimePropertiesWithResponse(queueName).map(response -> response.getValue());
    }

    /**
     * Gets runtime properties about the queue along with its HTTP response.
     *
     * @param queueName Name of queue to get information about.
     *
     * @return A Mono that completes with runtime properties about the queue and the associated HTTP response.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If error occurred processing the request.
     * @throws IllegalArgumentException if {@code queueName} is an empty string.
     * @throws NullPointerException if {@code queueName} is null.
     * @throws ResourceNotFoundException if the {@code queueName} does not exist.
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/get-entity">Get Entity</a>
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<QueueRuntimeProperties>> getQueueRuntimePropertiesWithResponse(String queueName) {
        return withContext(context -> getQueueWithResponse(queueName, context, QueueRuntimeProperties::new));
    }

    /**
     * Gets information about the Service Bus namespace.
     *
     * @return A Mono that completes with information about the Service Bus namespace.
     * @throws ClientAuthenticationException if the client's credentials do not have access to the namespace.
     * @throws HttpResponseException If error occurred processing the request.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<NamespaceProperties> getNamespaceProperties() {
        return getNamespacePropertiesWithResponse().map(Response::getValue);
    }

    /**
     * Gets information about the Service Bus namespace along with its HTTP response.
     *
     * @return A Mono that completes with information about the namespace and the associated HTTP response.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If error occurred processing the request.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<NamespaceProperties>> getNamespacePropertiesWithResponse() {
        return withContext(this::getNamespacePropertiesWithResponse);
    }

    /**
     * Gets a rule from the service namespace.
     *
     * Only following data types are deserialized in Filters and Action parameters - string, int, long, boolean, double,
     * and OffsetDateTime. Other data types would return its string value.
     *
     * @param topicName The name of the topic relative to service bus namespace.
     * @param subscriptionName The subscription name the rule belongs to.
     * @param ruleName The name of the rule to retrieve.
     *
     * @return The associated rule.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<RuleProperties> getRule(String topicName, String subscriptionName, String ruleName) {
        return getRuleWithResponse(topicName, subscriptionName, ruleName).map(response -> response.getValue());
    }

    /**
     * Gets a rule from the service namespace.
     *
     * Only following data types are deserialized in Filters and Action parameters - string, int, long, bool, double,
     * and OffsetDateTime. Other data types would return its string value.
     *
     * @param topicName The name of the topic relative to service bus namespace.
     * @param subscriptionName The subscription name the rule belongs to.
     * @param ruleName The name of the rule to retrieve.
     *
     * @return The associated rule with the corresponding HTTP response.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<RuleProperties>> getRuleWithResponse(String topicName, String subscriptionName,
        String ruleName) {
        return withContext(context -> getRuleWithResponse(topicName, subscriptionName, ruleName, context));
    }

    /**
     * Gets information about the queue.
     *
     * @param topicName Name of topic associated with subscription.
     * @param subscriptionName Name of subscription to get information about.
     *
     * @return A Mono that completes with information about the subscription.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If error occurred processing the request.
     * @throws IllegalArgumentException if {@code topicName} or {@code subscriptionName} are empty strings.
     * @throws NullPointerException if {@code topicName} or {@code subscriptionName} are null.
     * @throws ResourceNotFoundException if the {@code subscriptionName} does not exist in the {@code topicName}.
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/get-entity">Get Entity</a>
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<SubscriptionProperties> getSubscription(String topicName, String subscriptionName) {
        return getSubscriptionWithResponse(topicName, subscriptionName).map(Response::getValue);
    }

    /**
     * Gets information about the subscription along with its HTTP response.
     *
     * @param topicName Name of topic associated with subscription.
     * @param subscriptionName Name of subscription to get information about.
     *
     * @return A Mono that completes with information about the subscription and the associated HTTP response.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If error occurred processing the request.
     * @throws IllegalArgumentException if {@code topicName} or {@code subscriptionName} are empty strings.
     * @throws NullPointerException if {@code topicName} or {@code subscriptionName} are null.
     * @throws ResourceNotFoundException if the {@code subscriptionName} does not exist.
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/get-entity">Get Entity</a>
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<SubscriptionProperties>> getSubscriptionWithResponse(String topicName,
        String subscriptionName) {
        return withContext(context -> getSubscriptionWithResponse(topicName, subscriptionName, context,
            Function.identity()));
    }

    /**
     * Gets whether or not a subscription within a topic exists.
     *
     * @param topicName Name of topic associated with subscription.
     * @param subscriptionName Name of the subscription.
     *
     * @return A Mono that completes indicating whether or not the subscription exists.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If error occurred processing the request.
     * @throws IllegalArgumentException if {@code subscriptionName} is an empty string.
     * @throws NullPointerException if {@code subscriptionName} is null.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Boolean> getSubscriptionExists(String topicName, String subscriptionName) {
        return getSubscriptionExistsWithResponse(topicName, subscriptionName).map(Response::getValue);
    }

    /**
     * Gets whether or not a subscription within a topic exists.
     *
     * @param topicName Name of topic associated with subscription.
     * @param subscriptionName Name of the subscription.
     *
     * @return A Mono that completes indicating whether or not the subscription exists along with its HTTP response.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If error occurred processing the request.
     * @throws IllegalArgumentException if {@code subscriptionName} is an empty string.
     * @throws NullPointerException if {@code subscriptionName} is null.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<Boolean>> getSubscriptionExistsWithResponse(String topicName, String subscriptionName) {
        return getEntityExistsWithResponse(getSubscriptionWithResponse(topicName, subscriptionName));
    }

    /**
     * Gets runtime properties about the subscription.
     *
     * @param topicName Name of topic associated with subscription.
     * @param subscriptionName Name of subscription to get information about.
     *
     * @return A Mono that completes with runtime properties about the subscription.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If error occurred processing the request.
     * @throws IllegalArgumentException if {@code topicName} or {@code subscriptionName} are empty strings.
     * @throws NullPointerException if {@code topicName} or {@code subscriptionName} are null.
     * @throws ResourceNotFoundException if the {@code subscriptionName} does not exist.
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/get-entity">Get Entity</a>
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<SubscriptionRuntimeProperties> getSubscriptionRuntimeProperties(
        String topicName, String subscriptionName) {
        return getSubscriptionRuntimePropertiesWithResponse(topicName, subscriptionName)
            .map(response -> response.getValue());
    }

    /**
     * Gets runtime properties about the subscription.
     *
     * @param topicName Name of topic associated with subscription.
     * @param subscriptionName Name of subscription to get information about.
     *
     * @return A Mono that completes with runtime properties about the subscription.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If error occurred processing the request.
     * @throws IllegalArgumentException if {@code subscriptionName} is an empty string.
     * @throws NullPointerException if {@code subscriptionName} is null.
     * @throws ResourceNotFoundException if the {@code subscriptionName} does not exist.
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/get-entity">Get Entity</a>
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<SubscriptionRuntimeProperties>> getSubscriptionRuntimePropertiesWithResponse(
        String topicName, String subscriptionName) {
        return withContext(context -> getSubscriptionWithResponse(topicName, subscriptionName, context,
            SubscriptionRuntimeProperties::new));
    }

    /**
     * Gets information about the topic.
     *
     * @param topicName Name of topic to get information about.
     *
     * @return A Mono that completes with information about the topic.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If error occurred processing the request.
     * @throws IllegalArgumentException if {@code topicName} is an empty string.
     * @throws NullPointerException if {@code topicName} is null.
     * @throws ResourceNotFoundException if the {@code topicName} does not exist.
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/get-entity">Get Entity</a>
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<TopicProperties> getTopic(String topicName) {
        return getTopicWithResponse(topicName).map(Response::getValue);
    }

    /**
     * Gets information about the topic along with its HTTP response.
     *
     * @param topicName Name of topic to get information about.
     *
     * @return A Mono that completes with information about the topic and the associated HTTP response.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If error occurred processing the request.
     * @throws IllegalArgumentException if {@code topicName} is an empty string.
     * @throws NullPointerException if {@code topicName} is null.
     * @throws ResourceNotFoundException if the {@code topicName} does not exist.
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/get-entity">Get Entity</a>
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<TopicProperties>> getTopicWithResponse(String topicName) {
        return withContext(context -> getTopicWithResponse(topicName, context, Function.identity()));
    }

    /**
     * Gets whether or not a topic with {@code topicName} exists in the Service Bus namespace.
     *
     * @param topicName Name of the topic.
     *
     * @return A Mono that completes indicating whether or not the topic exists.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If error occurred processing the request.
     * @throws IllegalArgumentException if {@code topicName} is an empty string.
     * @throws NullPointerException if {@code topicName} is null.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Boolean> getTopicExists(String topicName) {
        return getTopicExistsWithResponse(topicName).map(Response::getValue);
    }

    /**
     * Gets whether or not a topic with {@code topicName} exists in the Service Bus namespace.
     *
     * @param topicName Name of the topic.
     *
     * @return A Mono that completes indicating whether or not the topic exists along with its HTTP response.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If error occurred processing the request.
     * @throws IllegalArgumentException if {@code topicName} is an empty string.
     * @throws NullPointerException if {@code topicName} is null.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<Boolean>> getTopicExistsWithResponse(String topicName) {
        return getEntityExistsWithResponse(getTopicWithResponse(topicName));
    }

    /**
     * Gets runtime properties about the topic.
     *
     * @param topicName Name of topic to get information about.
     *
     * @return A Mono that completes with runtime properties about the topic.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If error occurred processing the request.
     * @throws IllegalArgumentException if {@code topicName} is an empty string.
     * @throws NullPointerException if {@code topicName} is null.
     * @throws ResourceNotFoundException if the {@code topicName} does not exist.
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/get-entity">Get Entity</a>
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<TopicRuntimeProperties> getTopicRuntimeProperties(String topicName) {
        return getTopicRuntimePropertiesWithResponse(topicName).map(response -> response.getValue());
    }

    /**
     * Gets runtime properties about the topic with its HTTP response.
     *
     * @param topicName Name of topic to get information about.
     *
     * @return A Mono that completes with runtime properties about the topic and the associated HTTP response.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If error occurred processing the request.
     * @throws IllegalArgumentException if {@code topicName} is an empty string.
     * @throws NullPointerException if {@code topicName} is null.
     * @throws ResourceNotFoundException if the {@code topicName} does not exist.
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/get-entity">Get Entity</a>
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<TopicRuntimeProperties>> getTopicRuntimePropertiesWithResponse(String topicName) {
        return withContext(context -> getTopicWithResponse(topicName, context, TopicRuntimeProperties::new));
    }

    /**
     * Fetches all the queues in the Service Bus namespace.
     *
     * @return A Flux of {@link QueueProperties queues} in the Service Bus namespace.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/enumeration">List entities, subscriptions, or
     *     authorization rules</a>
     */
    @ServiceMethod(returns = ReturnType.COLLECTION)
    public PagedFlux<QueueProperties> listQueues() {
        return new PagedFlux<>(
            () -> withContext(context -> listQueuesFirstPage(context)),
            token -> withContext(context -> listQueuesNextPage(token, context)));
    }

    /**
     * Fetches all the rules for a topic and subscription.
     *
     * @param topicName The topic name under which all the rules need to be retrieved.
     * @param subscriptionName The name of the subscription for which all rules need to be retrieved.
     *
     * @return A Flux of {@link RuleProperties rules} for the {@code topicName} and {@code subscriptionName}.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws NullPointerException if {@code topicName} or {@code subscriptionName} is null.
     * @throws IllegalArgumentException if {@code topicName} or {@code subscriptionName} is an empty string.
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/enumeration">List entities, rules, or
     *     authorization rules</a>
     */
    @ServiceMethod(returns = ReturnType.COLLECTION)
    public PagedFlux<RuleProperties> listRules(String topicName, String subscriptionName) {
        if (topicName == null) {
            return pagedFluxError(logger, new NullPointerException("'topicName' cannot be null."));
        } else if (topicName.isEmpty()) {
            return pagedFluxError(logger, new IllegalArgumentException("'topicName' cannot be an empty string."));
        }

        return new PagedFlux<>(
            () -> withContext(context -> listRulesFirstPage(topicName, subscriptionName, context)),
            token -> withContext(context -> listRulesNextPage(topicName, subscriptionName, token, context)));
    }

    /**
     * Fetches all the subscriptions for a topic.
     *
     * @param topicName The topic name under which all the subscriptions need to be retrieved.
     *
     * @return A Flux of {@link SubscriptionProperties subscriptions} for the {@code topicName}.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws NullPointerException if {@code topicName} is null.
     * @throws IllegalArgumentException if {@code topicName} is an empty string.
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/enumeration">List entities, subscriptions, or
     *     authorization rules</a>
     */
    @ServiceMethod(returns = ReturnType.COLLECTION)
    public PagedFlux<SubscriptionProperties> listSubscriptions(String topicName) {
        if (topicName == null) {
            return pagedFluxError(logger, new NullPointerException("'topicName' cannot be null."));
        } else if (topicName.isEmpty()) {
            return pagedFluxError(logger, new IllegalArgumentException("'topicName' cannot be an empty string."));
        }

        return new PagedFlux<>(
            () -> withContext(context -> listSubscriptionsFirstPage(topicName, context)),
            token -> withContext(context -> listSubscriptionsNextPage(topicName, token, context)));
    }

    /**
     * Fetches all the topics in the Service Bus namespace.
     *
     * @return A Flux of {@link TopicProperties topics} in the Service Bus namespace.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/enumeration">List entities, subscriptions, or
     *     authorization rules</a>
     */
    @ServiceMethod(returns = ReturnType.COLLECTION)
    public PagedFlux<TopicProperties> listTopics() {
        return new PagedFlux<>(
            () -> withContext(context -> listTopicsFirstPage(context)),
            token -> withContext(context -> listTopicsNextPage(token, context)));
    }

    /**
     * Updates a queue with the given {@link QueueProperties}. The {@link QueueProperties} must be fully populated as
     * all of the properties are replaced. If a property is not set the service default value is used.
     *
     * The suggested flow is:
     * <ol>
     *     <li>{@link #getQueue(String) Get queue description.}</li>
     *     <li>Update the required elements.</li>
     *     <li>Pass the updated description into this method.</li>
     * </ol>
     *
     * <p>
     * There are a subset of properties that can be updated. More information can be found in the links below. They are:
     * <ul>
     * <li>{@link QueueProperties#setDefaultMessageTimeToLive(Duration) DefaultMessageTimeToLive}</li>
     * <li>{@link QueueProperties#setLockDuration(Duration) LockDuration}</li>
     * <li>{@link QueueProperties#setDuplicateDetectionHistoryTimeWindow(Duration) DuplicateDetectionHistoryTimeWindow}
     * </li>
     * <li>{@link QueueProperties#setMaxDeliveryCount(Integer) MaxDeliveryCount}</li>
     * </ul>
     *
     * @param queue Information about the queue to update. You must provide all the property values that are desired
     *     on the updated entity. Any values not provided are set to the service default values.
     *
     * @return A Mono that completes with the updated queue.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If the request body was invalid, the queue quota is exceeded, or an error
     *     occurred processing the request.
     * @throws NullPointerException if {@code queue} is null.
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/update-entity">Create or Update Entity</a>
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/update-queue">Update Queue</a>
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<QueueProperties> updateQueue(QueueProperties queue) {
        return updateQueueWithResponse(queue).map(Response::getValue);
    }

    /**
     * Updates a queue with the given {@link QueueProperties}. The {@link QueueProperties} must be fully populated as
     * all of the properties are replaced. If a property is not set the service default value is used.
     *
     * The suggested flow is:
     * <ol>
     *     <li>{@link #getQueue(String) Get queue description.}</li>
     *     <li>Update the required elements.</li>
     *     <li>Pass the updated description into this method.</li>
     * </ol>
     *
     * <p>
     * There are a subset of properties that can be updated. More information can be found in the links below. They are:
     * <ul>
     * <li>{@link QueueProperties#setDefaultMessageTimeToLive(Duration) DefaultMessageTimeToLive}</li>
     * <li>{@link QueueProperties#setLockDuration(Duration) LockDuration}</li>
     * <li>{@link QueueProperties#setDuplicateDetectionHistoryTimeWindow(Duration) DuplicateDetectionHistoryTimeWindow}
     * </li>
     * <li>{@link QueueProperties#setMaxDeliveryCount(Integer) MaxDeliveryCount}</li>
     * </ul>
     *
     * @param queue Information about the queue to update. You must provide all the property values that are desired
     *     on the updated entity. Any values not provided are set to the service default values.
     *
     * @return A Mono that returns the updated queue in addition to the HTTP response.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If the request body was invalid, the queue quota is exceeded, or an error
     *     occurred processing the request.
     * @throws NullPointerException if {@code queue} is null.
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/update-entity">Create or Update Entity</a>
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/update-queue">Update Queue</a>
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<QueueProperties>> updateQueueWithResponse(QueueProperties queue) {
        return withContext(context -> updateQueueWithResponse(queue, context));
    }

    /**
     * Updates a rule with the given {@link RuleProperties}. The {@link RuleProperties} must be fully populated as all
     * of the properties are replaced. If a property is not set the service default value is used.
     *
     * The suggested flow is:
     * <ol>
     *     <li>{@link #getRule(String, String, String) Get rule description.}</li>
     *     <li>Update the required elements.</li>
     *     <li>Pass the updated description into this method.</li>
     * </ol>
     *
     * @param topicName The topic name under which the rule is updated.
     * @param subscriptionName The name of the subscription for which the rule is updated.
     * @param rule Information about the rule to update. You must provide all the property values that are desired
     *     on the updated entity. Any values not provided are set to the service default values.
     *
     * @return A Mono that returns the updated rule.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If the request body was invalid, the rule quota is exceeded, or an error
     *     occurred processing the request.
     * @throws IllegalArgumentException if {@link RuleProperties#getName()} is null or an empty string.
     * @throws NullPointerException if {@code rule} is null.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<RuleProperties> updateRule(String topicName, String subscriptionName, RuleProperties rule) {
        return updateRuleWithResponse(topicName, subscriptionName, rule).map(Response::getValue);
    }

    /**
     * Updates a rule with the given {@link RuleProperties}. The {@link RuleProperties} must be fully populated as all
     * of the properties are replaced. If a property is not set the service default value is used.
     *
     * The suggested flow is:
     * <ol>
     *     <li>{@link #getRule(String, String, String) Get rule description.}</li>
     *     <li>Update the required elements.</li>
     *     <li>Pass the updated description into this method.</li>
     * </ol>
     *
     * @param topicName The topic name under which the rule is updated.
     * @param subscriptionName The name of the subscription for which the rule is updated.
     * @param rule Information about the rule to update. You must provide all the property values that are desired
     *     on the updated entity. Any values not provided are set to the service default values.
     *
     * @return A Mono that returns the updated rule in addition to the HTTP response.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If the request body was invalid, the rule quota is exceeded, or an error
     *     occurred processing the request.
     * @throws IllegalArgumentException if {@link RuleProperties#getName()} is null or an empty string.
     * @throws NullPointerException if {@code rule} is null.
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/update-entity">Create or Update Entity</a>
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<RuleProperties>> updateRuleWithResponse(String topicName, String subscriptionName,
        RuleProperties rule) {

        return withContext(context -> updateRuleWithResponse(topicName, subscriptionName, rule, context));
    }

    /**
     * Updates a subscription with the given {@link SubscriptionProperties}. The {@link SubscriptionProperties} must be
     * fully populated as all of the properties are replaced. If a property is not set the service default value is
     * used.
     *
     * The suggested flow is:
     * <ol>
     *     <li>{@link #getSubscription(String, String) Get subscription description.}</li>
     *     <li>Update the required elements.</li>
     *     <li>Pass the updated description into this method.</li>
     * </ol>
     *
     * <p>
     * There are a subset of properties that can be updated. More information can be found in the links below. They are:
     * <ul>
     * <li>{@link SubscriptionProperties#setDefaultMessageTimeToLive(Duration) DefaultMessageTimeToLive}</li>
     * <li>{@link SubscriptionProperties#setLockDuration(Duration) LockDuration}</li>
     * <li>{@link SubscriptionProperties#setMaxDeliveryCount(int) MaxDeliveryCount}</li>
     * </ul>
     *
     * @param subscription Information about the subscription to update. You must provide all the property values
     *     that are desired on the updated entity. Any values not provided are set to the service default values.
     *
     * @return A Mono that returns the updated subscription.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If the request body was invalid, the subscription quota is exceeded, or an
     *     error occurred processing the request.
     * @throws IllegalArgumentException if {@link SubscriptionProperties#getTopicName()} or {@link
     *     SubscriptionProperties#getSubscriptionName()} is null or an empty string.
     * @throws NullPointerException if {@code subscription} is null.
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/update-entity">Create or Update Entity</a>
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<SubscriptionProperties> updateSubscription(SubscriptionProperties subscription) {
        return updateSubscriptionWithResponse(subscription).map(Response::getValue);
    }

    /**
     * Updates a subscription with the given {@link SubscriptionProperties}. The {@link SubscriptionProperties} must be
     * fully populated as all of the properties are replaced. If a property is not set the service default value is
     * used.
     *
     * The suggested flow is:
     * <ol>
     *     <li>{@link #getSubscription(String, String) Get subscription description.}</li>
     *     <li>Update the required elements.</li>
     *     <li>Pass the updated description into this method.</li>
     * </ol>
     *
     * <p>
     * There are a subset of properties that can be updated. More information can be found in the links below. They are:
     * <ul>
     * <li>{@link SubscriptionProperties#setDefaultMessageTimeToLive(Duration) DefaultMessageTimeToLive}</li>
     * <li>{@link SubscriptionProperties#setLockDuration(Duration) LockDuration}</li>
     * <li>{@link SubscriptionProperties#setMaxDeliveryCount(int) MaxDeliveryCount}</li>
     * </ul>
     *
     * @param subscription Information about the subscription to update. You must provide all the property values
     *     that are desired on the updated entity. Any values not provided are set to the service default values.
     *
     * @return A Mono that returns the updated subscription in addition to the HTTP response.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If the request body was invalid, the subscription quota is exceeded, or an
     *     error occurred processing the request.
     * @throws IllegalArgumentException if {@link SubscriptionProperties#getTopicName()} or {@link
     *     SubscriptionProperties#getSubscriptionName()} is null or an empty string.
     * @throws NullPointerException if {@code subscription} is null.
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/update-entity">Create or Update Entity</a>
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<SubscriptionProperties>> updateSubscriptionWithResponse(
        SubscriptionProperties subscription) {

        return withContext(context -> updateSubscriptionWithResponse(subscription, context));
    }

    /**
     * Updates a topic with the given {@link TopicProperties}. The {@link TopicProperties} must be fully populated as
     * all of the properties are replaced. If a property is not set the service default value is used.
     *
     * The suggested flow is:
     * <ol>
     *     <li>{@link #getTopic(String) Get topic description.}</li>
     *     <li>Update the required elements.</li>
     *     <li>Pass the updated description into this method.</li>
     * </ol>
     *
     * <p>
     * There are a subset of properties that can be updated. More information can be found in the links below. They are:
     * <ul>
     * <li>{@link TopicProperties#setDefaultMessageTimeToLive(Duration) DefaultMessageTimeToLive}</li>
     * <li>{@link TopicProperties#setDuplicateDetectionHistoryTimeWindow(Duration) DuplicateDetectionHistoryTimeWindow}
     * </li>
     * </ul>
     *
     * @param topic Information about the topic to update. You must provide all the property values that are desired
     *     on the updated entity. Any values not provided are set to the service default values.
     *
     * @return A Mono that completes with the updated topic.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If the request body was invalid, the topic quota is exceeded, or an error
     *     occurred processing the request.
     * @throws IllegalArgumentException if {@link TopicProperties#getName() topic.getName()} is null or an empty
     *     string.
     * @throws NullPointerException if {@code topic} is null.
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/update-entity">Create or Update Entity</a>
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/update-topic">Update Topic</a>
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<TopicProperties> updateTopic(TopicProperties topic) {
        return updateTopicWithResponse(topic).map(Response::getValue);
    }

    /**
     * Updates a topic with the given {@link TopicProperties}. The {@link TopicProperties} must be fully populated as
     * all of the properties are replaced. If a property is not set the service default value is used.
     *
     * The suggested flow is:
     * <ol>
     *     <li>{@link #getTopic(String) Get topic description.}</li>
     *     <li>Update the required elements.</li>
     *     <li>Pass the updated description into this method.</li>
     * </ol>
     *
     * <p>
     * There are a subset of properties that can be updated. More information can be found in the links below. They are:
     * <ul>
     * <li>{@link TopicProperties#setDefaultMessageTimeToLive(Duration) DefaultMessageTimeToLive}</li>
     * <li>{@link TopicProperties#setDuplicateDetectionHistoryTimeWindow(Duration) DuplicateDetectionHistoryTimeWindow}
     * </li>
     * </ul>
     *
     * @param topic Information about the topic to update. You must provide all the property values that are desired
     *     on the updated entity. Any values not provided are set to the service default values.
     *
     * @return A Mono that completes with the updated topic and its HTTP response.
     * @throws ClientAuthenticationException if the client's credentials do not have access to modify the
     *     namespace.
     * @throws HttpResponseException If the request body was invalid, the topic quota is exceeded, or an error
     *     occurred processing the request.
     * @throws IllegalArgumentException if {@link TopicProperties#getName() topic.getName()} is null or an empty
     *     string.
     * @throws NullPointerException if {@code topic} is null.
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/update-entity">Create or Update Entity</a>
     * @see <a href="https://docs.microsoft.com/rest/api/servicebus/update-topic">Update Topic</a>
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<TopicProperties>> updateTopicWithResponse(TopicProperties topic) {
        return withContext(context -> updateTopicWithResponse(topic, context));
    }

    /**
     * Creates a queue with its context.
     *
     * @param createQueueOptions Queue to create.
     * @param context Context to pass into request.
     *
     * @return A Mono that completes with the created {@link QueueProperties}.
     */
    Mono<Response<QueueProperties>> createQueueWithResponse(String queueName, CreateQueueOptions createQueueOptions,
        Context context) {
        if (queueName == null) {
            return monoError(logger, new NullPointerException("'queueName' cannot be null."));
        } else if (queueName.isEmpty()) {
            return monoError(logger, new IllegalArgumentException("'queueName' cannot be empty."));
        }

        if (createQueueOptions == null) {
            return monoError(logger, new NullPointerException("'createQueueOptions' cannot be null."));
        } else if (context == null) {
            return monoError(logger, new NullPointerException("'context' cannot be null."));
        }
        final Context contextWithHeaders = context.addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE)
            .addData(AZURE_REQUEST_HTTP_HEADERS_KEY, new HttpHeaders());

        final String forwardToEntity = createQueueOptions.getForwardTo();
        if (!CoreUtils.isNullOrEmpty(forwardToEntity)) {
            addSupplementaryAuthHeader(SERVICE_BUS_SUPPLEMENTARY_AUTHORIZATION_HEADER_NAME,
                forwardToEntity, contextWithHeaders);
            createQueueOptions.setForwardTo(getAbsoluteUrlFromEntity(forwardToEntity));
        }

        final String forwardDlqToEntity = createQueueOptions.getForwardDeadLetteredMessagesTo();
        if (!CoreUtils.isNullOrEmpty(forwardDlqToEntity)) {
            addSupplementaryAuthHeader(SERVICE_BUS_DLQ_SUPPLEMENTARY_AUTHORIZATION_HEADER_NAME,
                forwardDlqToEntity, contextWithHeaders);
            createQueueOptions.setForwardDeadLetteredMessagesTo(getAbsoluteUrlFromEntity(forwardDlqToEntity));
        }

        final QueueDescription description = EntityHelper.getQueueDescription(createQueueOptions);
        final CreateQueueBodyContent content = new CreateQueueBodyContent()
            .setType(CONTENT_TYPE)
            .setQueueDescription(description);
        final CreateQueueBody createEntity = new CreateQueueBody()
            .setContent(content);

        try {
            return entityClient.putWithResponseAsync(queueName, createEntity, null, contextWithHeaders)
                .onErrorMap(ServiceBusAdministrationAsyncClient::mapException)
                .map(this::deserializeQueue);
        } catch (RuntimeException ex) {
            return monoError(logger, ex);
        }
    }

    /**
     * Creates a rule with its context.
     *
     * @param ruleOptions Rule to create.
     * @param context Context to pass into request.
     *
     *
     * @return A Mono that completes with the created {@link RuleProperties}.
     */
    Mono<Response<RuleProperties>> createRuleWithResponse(String topicName, String subscriptionName, String ruleName,
        CreateRuleOptions ruleOptions, Context context) {
        if (topicName == null) {
            return monoError(logger, new NullPointerException("'topicName' cannot be null."));
        } else if (topicName.isEmpty()) {
            return monoError(logger, new IllegalArgumentException("'topicName' cannot be empty."));
        }

        if (subscriptionName == null) {
            return monoError(logger, new NullPointerException("'subscriptionName' cannot be null."));
        } else if (subscriptionName.isEmpty()) {
            return monoError(logger, new IllegalArgumentException("'subscriptionName' cannot be empty."));
        }

        if (ruleName == null) {
            return monoError(logger, new NullPointerException("'ruleName' cannot be null."));
        } else if (ruleName.isEmpty()) {
            return monoError(logger, new IllegalArgumentException("'ruleName' cannot be empty."));
        }

        if (ruleOptions == null) {
            return monoError(logger, new NullPointerException("'rule' cannot be null."));
        }

        final RuleActionImpl action = ruleOptions.getAction() != null
            ? EntityHelper.toImplementation(ruleOptions.getAction())
            : null;
        final RuleFilterImpl filter = ruleOptions.getFilter() != null
            ? EntityHelper.toImplementation(ruleOptions.getFilter())
            : null;
        final RuleDescription rule = new RuleDescription()
            .setAction(action)
            .setFilter(filter)
            .setName(ruleName);

        final CreateRuleBodyContent content = new CreateRuleBodyContent()
            .setType(CONTENT_TYPE)
            .setRuleDescription(rule);
        final CreateRuleBody createEntity = new CreateRuleBody().setContent(content);

        final Context withTracing = context.addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE);

        try {
            return managementClient.getRules().putWithResponseAsync(topicName, subscriptionName, ruleName, createEntity,
                null, withTracing)
                .onErrorMap(ServiceBusAdministrationAsyncClient::mapException)
                .map(response -> deserializeRule(response));
        } catch (RuntimeException ex) {
            return monoError(logger, ex);
        }
    }

    /**
     * Creates a subscription with its context.
     *
     * @param subscriptionOptions Subscription to create.
     * @param context Context to pass into request.
     *
     * @return A Mono that completes with the created {@link SubscriptionProperties}.
     */
    Mono<Response<SubscriptionProperties>> createSubscriptionWithResponse(String topicName, String subscriptionName,
        CreateSubscriptionOptions subscriptionOptions, Context context) {
        if (topicName == null) {
            return monoError(logger, new NullPointerException("'topicName' cannot be null."));
        } else if (topicName.isEmpty()) {
            return monoError(logger, new IllegalArgumentException("'topicName' cannot be empty."));
        }

        if (subscriptionName == null) {
            return monoError(logger, new NullPointerException("'subscriptionName' cannot be null."));
        } else if (subscriptionName.isEmpty()) {
            return monoError(logger, new IllegalArgumentException("'subscriptionName' cannot be empty."));
        }

        if (subscriptionOptions == null) {
            return monoError(logger, new NullPointerException("'subscription' cannot be null."));
        }

        final Context contextWithHeaders = context.addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE)
            .addData(AZURE_REQUEST_HTTP_HEADERS_KEY, new HttpHeaders());
        final String forwardToEntity = subscriptionOptions.getForwardTo();
        if (!CoreUtils.isNullOrEmpty(forwardToEntity)) {
            addSupplementaryAuthHeader(SERVICE_BUS_SUPPLEMENTARY_AUTHORIZATION_HEADER_NAME,
                forwardToEntity, contextWithHeaders);
            subscriptionOptions.setForwardTo(getAbsoluteUrlFromEntity(forwardToEntity));
        }

        final String forwardDlqToEntity = subscriptionOptions.getForwardDeadLetteredMessagesTo();
        if (!CoreUtils.isNullOrEmpty(forwardDlqToEntity)) {
            addSupplementaryAuthHeader(SERVICE_BUS_DLQ_SUPPLEMENTARY_AUTHORIZATION_HEADER_NAME,
                forwardDlqToEntity, contextWithHeaders);
            subscriptionOptions.setForwardDeadLetteredMessagesTo(getAbsoluteUrlFromEntity(forwardDlqToEntity));
        }

        final SubscriptionDescription subscription = EntityHelper.getSubscriptionDescription(subscriptionOptions);
        final CreateSubscriptionBodyContent content = new CreateSubscriptionBodyContent()
            .setType(CONTENT_TYPE)
            .setSubscriptionDescription(subscription);
        final CreateSubscriptionBody createEntity = new CreateSubscriptionBody().setContent(content);

        try {
            return managementClient.getSubscriptions().putWithResponseAsync(topicName, subscriptionName, createEntity,
                null, contextWithHeaders)
                .onErrorMap(ServiceBusAdministrationAsyncClient::mapException)
                .map(response -> deserializeSubscription(topicName, response));
        } catch (RuntimeException ex) {
            return monoError(logger, ex);
        }
    }

    /**
     * Creates a topicOptions with its context.
     *
     * @param topicOptions Topic to create.
     * @param context Context to pass into request.
     *
     * @return A Mono that completes with the created {@link TopicProperties}.
     */
    Mono<Response<TopicProperties>> createTopicWithResponse(String topicName, CreateTopicOptions topicOptions,
        Context context) {
        if (topicName == null) {
            return monoError(logger, new NullPointerException("'topicName' cannot be null."));
        } else if (topicName.isEmpty()) {
            return monoError(logger, new IllegalArgumentException("'topicName' cannot be empty."));
        }

        if (topicOptions == null) {
            return monoError(logger, new NullPointerException("'topicOptions' cannot be null"));
        } else if (context == null) {
            return monoError(logger, new NullPointerException("'context' cannot be null."));
        }

        final TopicDescription topic = EntityHelper.getTopicDescription(topicOptions);
        final CreateTopicBodyContent content = new CreateTopicBodyContent()
            .setType(CONTENT_TYPE)
            .setTopicDescription(topic);
        final CreateTopicBody createEntity = new CreateTopicBody()
            .setContent(content);

        final Context withTracing = context.addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE);

        try {
            return entityClient.putWithResponseAsync(topicName, createEntity, null, withTracing)
                .onErrorMap(ServiceBusAdministrationAsyncClient::mapException)
                .map(this::deserializeTopic);
        } catch (RuntimeException ex) {
            return monoError(logger, ex);
        }
    }

    /**
     * Deletes a queue with its context.
     *
     * @param queueName Name of queue to delete.
     * @param context Context to pass into request.
     *
     * @return A Mono that completes when the queue is deleted.
     */
    Mono<Response<Void>> deleteQueueWithResponse(String queueName, Context context) {
        if (queueName == null) {
            return monoError(logger, new NullPointerException("'queueName' cannot be null"));
        } else if (queueName.isEmpty()) {
            return monoError(logger, new IllegalArgumentException("'queueName' cannot be an empty string."));
        } else if (context == null) {
            return monoError(logger, new NullPointerException("'context' cannot be null."));
        }

        final Context withTracing = context.addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE);

        try {
            return entityClient.deleteWithResponseAsync(queueName, withTracing)
                .onErrorMap(ServiceBusAdministrationAsyncClient::mapException)
                .map(response -> {
                    return new SimpleResponse<>(response.getRequest(), response.getStatusCode(),
                        response.getHeaders(), null);
                });
        } catch (RuntimeException ex) {
            return monoError(logger, ex);
        }
    }

    /**
     * Deletes a queue with its context.
     *
     * @param topicName Name of topic to delete.
     * @param subscriptionName Name of the subscription for the rule.
     * @param ruleName Name of the rule.
     * @param context Context to pass into request.
     *
     * @return A Mono that completes with the created {@link QueueProperties}.
     */
    Mono<Response<Void>> deleteRuleWithResponse(String topicName, String subscriptionName, String ruleName,
        Context context) {
        if (topicName == null) {
            return monoError(logger, new NullPointerException("'topicName' cannot be null"));
        } else if (topicName.isEmpty()) {
            return monoError(logger, new IllegalArgumentException("'topicName' cannot be an empty string."));
        } else if (subscriptionName == null) {
            return monoError(logger, new NullPointerException("'subscriptionName' cannot be null"));
        } else if (subscriptionName.isEmpty()) {
            return monoError(logger, new IllegalArgumentException("'subscriptionName' cannot be an empty string."));
        } else if (ruleName == null) {
            return monoError(logger, new NullPointerException("'ruleName' cannot be null"));
        } else if (ruleName.isEmpty()) {
            return monoError(logger, new IllegalArgumentException("'ruleName' cannot be an empty string."));
        } else if (context == null) {
            return monoError(logger, new NullPointerException("'context' cannot be null."));
        }

        final Context withTracing = context.addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE);

        try {
            return rulesClient.deleteWithResponseAsync(topicName, subscriptionName, ruleName, withTracing)
                .onErrorMap(ServiceBusAdministrationAsyncClient::mapException)
                .map(response -> new SimpleResponse<>(response.getRequest(), response.getStatusCode(),
                    response.getHeaders(), null));
        } catch (RuntimeException ex) {
            return monoError(logger, ex);
        }
    }

    /**
     * Deletes a subscription with its context.
     *
     * @param topicName Name of topic associated with subscription to delete.
     * @param subscriptionName Name of subscription to delete.
     * @param context Context to pass into request.
     *
     * @return A Mono that completes with the created {@link SubscriptionProperties}.
     */
    Mono<Response<Void>> deleteSubscriptionWithResponse(String topicName, String subscriptionName, Context context) {
        if (subscriptionName == null) {
            return monoError(logger, new NullPointerException("'subscriptionName' cannot be null"));
        } else if (subscriptionName.isEmpty()) {
            return monoError(logger, new IllegalArgumentException("'subscriptionName' cannot be an empty string."));
        } else if (topicName == null) {
            return monoError(logger, new NullPointerException("'topicName' cannot be null"));
        } else if (topicName.isEmpty()) {
            return monoError(logger, new IllegalArgumentException("'topicName' cannot be an empty string."));
        } else if (context == null) {
            return monoError(logger, new NullPointerException("'context' cannot be null."));
        }

        final Context withTracing = context.addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE);

        try {
            return managementClient.getSubscriptions().deleteWithResponseAsync(topicName, subscriptionName,
                withTracing)
                .onErrorMap(ServiceBusAdministrationAsyncClient::mapException)
                .map(response -> new SimpleResponse<>(response.getRequest(), response.getStatusCode(),
                    response.getHeaders(), null));
        } catch (RuntimeException ex) {
            return monoError(logger, ex);
        }
    }

    /**
     * Deletes a topic with its context.
     *
     * @param topicName Name of topic to delete.
     * @param context Context to pass into request.
     *
     * @return A Mono that completes with the created {@link TopicProperties}.
     */
    Mono<Response<Void>> deleteTopicWithResponse(String topicName, Context context) {
        if (topicName == null) {
            return monoError(logger, new NullPointerException("'topicName' cannot be null"));
        } else if (topicName.isEmpty()) {
            return monoError(logger, new IllegalArgumentException("'topicName' cannot be an empty string."));
        } else if (context == null) {
            return monoError(logger, new NullPointerException("'context' cannot be null."));
        }

        final Context withTracing = context.addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE);

        try {
            return entityClient.deleteWithResponseAsync(topicName, withTracing)
                .onErrorMap(ServiceBusAdministrationAsyncClient::mapException)
                .map(response -> new SimpleResponse<>(response.getRequest(), response.getStatusCode(),
                    response.getHeaders(), null));
        } catch (RuntimeException ex) {
            return monoError(logger, ex);
        }
    }

    /**
     * Gets whether an entity exists.
     *
     * @param getEntityOperation Operation to get information about entity. If {@link ResourceNotFoundException} is
     *     thrown, then it is mapped to false.
     * @param <T> Entity type.
     *
     * @return True if the entity exists, false otherwise.
     */
    <T> Mono<Response<Boolean>> getEntityExistsWithResponse(Mono<Response<T>> getEntityOperation) {
        return getEntityOperation.map(response -> {
            // When an entity does not exist, it does not have any description object in it.
            final boolean exists = response.getValue() != null;
            return (Response<Boolean>) new SimpleResponse<>(response.getRequest(), response.getStatusCode(),
                response.getHeaders(), exists);
        })
            .onErrorResume(ResourceNotFoundException.class, exception -> {
                final HttpResponse response = exception.getResponse();
                final Response<Boolean> result = new SimpleResponse<>(response.getRequest(), response.getStatusCode(),
                    response.getHeaders(), false);

                return Mono.just(result);
            });
    }

    /**
     * Gets a queue with its context.
     *
     * @param queueName Name of queue to fetch information for.
     * @param context Context to pass into request.
     *
     * @return A Mono that completes with the {@link QueueProperties}.
     */
    <T> Mono<Response<T>> getQueueWithResponse(String queueName, Context context,
        Function<QueueProperties, T> mapper) {
        if (queueName == null) {
            return monoError(logger, new NullPointerException("'queueName' cannot be null"));
        } else if (queueName.isEmpty()) {
            return monoError(logger, new IllegalArgumentException("'queueName' cannot be empty."));
        } else if (context == null) {
            return monoError(logger, new NullPointerException("'context' cannot be null."));
        }

        final Context withTracing = context.addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE);

        try {
            return entityClient.getWithResponseAsync(queueName, true, withTracing)
                .onErrorMap(ServiceBusAdministrationAsyncClient::mapException)
                .handle((response, sink) -> {
                    final Response<QueueProperties> deserialize = deserializeQueue(response);

                    // if this is null, then the queue could not be found.
                    if (deserialize.getValue() == null) {
                        final HttpResponse notFoundResponse = new EntityNotFoundHttpResponse<>(deserialize);
                        sink.error(new ResourceNotFoundException(String.format("Queue '%s' does not exist.", queueName),
                            notFoundResponse));
                    } else {
                        final T mapped = mapper.apply(deserialize.getValue());
                        sink.next(new SimpleResponse<>(response.getRequest(), response.getStatusCode(),
                            response.getHeaders(), mapped));
                    }
                });
        } catch (RuntimeException ex) {
            return monoError(logger, ex);
        }
    }

    Mono<Response<RuleProperties>> getRuleWithResponse(String topicName, String subscriptionName,
        String ruleName, Context context) {
        final Context withTracing = context.addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE);

        try {
            return rulesClient.getWithResponseAsync(topicName, subscriptionName, ruleName, true, withTracing)
                .onErrorMap(ServiceBusAdministrationAsyncClient::mapException)
                .map(this::deserializeRule);
        } catch (RuntimeException ex) {
            return monoError(logger, ex);
        }
    }

    /**
     * Gets a subscription with its context.
     *
     * @param topicName Name of the topic associated with the subscription.
     * @param subscriptionName Name of subscription to fetch information for.
     * @param context Context to pass into request.
     *
     * @return A Mono that completes with the {@link SubscriptionProperties}.
     */
    <T> Mono<Response<T>> getSubscriptionWithResponse(String topicName, String subscriptionName, Context context,
        Function<SubscriptionProperties, T> mapper) {
        if (topicName == null) {
            return monoError(logger, new NullPointerException("'topicName' cannot be null."));
        } else if (topicName.isEmpty()) {
            return monoError(logger, new IllegalArgumentException("'topicName' cannot be an empty string."));
        } else if (subscriptionName == null) {
            return monoError(logger, new NullPointerException("'subscriptionName' cannot be null."));
        } else if (subscriptionName.isEmpty()) {
            return monoError(logger, new IllegalArgumentException("'subscriptionName' cannot be an empty string."));
        } else if (context == null) {
            return monoError(logger, new NullPointerException("'context' cannot be null."));
        }

        final Context withTracing = context.addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE);

        try {
            return managementClient.getSubscriptions().getWithResponseAsync(topicName, subscriptionName, true,
                withTracing)
                .onErrorMap(ServiceBusAdministrationAsyncClient::mapException)
                .handle((response, sink) -> {
                    final Response<SubscriptionProperties> deserialize = deserializeSubscription(topicName, response);

                    // if this is null, then the queue could not be found.
                    if (deserialize.getValue() == null) {
                        final HttpResponse notFoundResponse = new EntityNotFoundHttpResponse<>(deserialize);
                        sink.error(new ResourceNotFoundException(String.format(
                            "Subscription '%s' in topic '%s' does not exist.", topicName, subscriptionName),
                            notFoundResponse));
                    } else {
                        final T mapped = mapper.apply(deserialize.getValue());
                        sink.next(new SimpleResponse<>(response.getRequest(), response.getStatusCode(),
                            response.getHeaders(), mapped));
                    }
                });
        } catch (RuntimeException ex) {
            return monoError(logger, ex);
        }
    }

    /**
     * Gets the namespace properties with its context.
     *
     * @param context Context to pass into request.
     *
     * @return A Mono that completes with the {@link NamespaceProperties}.
     */
    Mono<Response<NamespaceProperties>> getNamespacePropertiesWithResponse(Context context) {
        return managementClient.getNamespaces().getWithResponseAsync(context).handle((response, sink) -> {
            final NamespacePropertiesEntry entry = response.getValue();
            if (entry == null || entry.getContent() == null) {
                sink.error(new AzureException(
                    "There was no content inside namespace response. Entry: " + response));
                return;
            }

            final NamespaceProperties namespaceProperties = entry.getContent().getNamespaceProperties();
            final Response<NamespaceProperties> result = new SimpleResponse<>(response.getRequest(),
                response.getStatusCode(), response.getHeaders(), namespaceProperties);

            sink.next(result);
        });
    }

    /**
     * Gets a topic with its context.
     *
     * @param topicName Name of topic to fetch information for.
     * @param context Context to pass into request.
     *
     * @return A Mono that completes with the {@link TopicProperties}.
     */
    <T> Mono<Response<T>> getTopicWithResponse(String topicName, Context context,
        Function<TopicProperties, T> mapper) {
        if (topicName == null) {
            return monoError(logger, new NullPointerException("'topicName' cannot be null"));
        } else if (topicName.isEmpty()) {
            return monoError(logger, new IllegalArgumentException("'topicName' cannot be empty."));
        } else if (context == null) {
            return monoError(logger, new NullPointerException("'context' cannot be null."));
        }

        final Context withTracing = context.addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE);

        try {
            return entityClient.getWithResponseAsync(topicName, true, withTracing)
                .onErrorMap(ServiceBusAdministrationAsyncClient::mapException)
                .handle((response, sink) -> {
                    final Response<TopicProperties> deserialize = deserializeTopic(response);

                    // if this is null, then the queue could not be found.
                    if (deserialize.getValue() == null) {
                        final HttpResponse notFoundResponse = new EntityNotFoundHttpResponse<>(deserialize);
                        sink.error(new ResourceNotFoundException(String.format("Topic '%s' does not exist.", topicName),
                            notFoundResponse));
                    } else {
                        final T mapped = mapper.apply(deserialize.getValue());
                        sink.next(new SimpleResponse<>(response.getRequest(), response.getStatusCode(),
                            response.getHeaders(), mapped));
                    }
                });
        } catch (RuntimeException ex) {
            return monoError(logger, ex);
        }
    }

    /**
     * Gets the first page of queues with context.
     *
     * @param context Context to pass into request.
     *
     * @return A Mono that completes with a page of queues.
     */
    Mono<PagedResponse<QueueProperties>> listQueuesFirstPage(Context context) {
        final Context withTracing = context.addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE);

        try {
            return listQueues(0, withTracing);
        } catch (RuntimeException e) {
            return monoError(logger, e);
        }
    }

    /**
     * Gets the next page of queues with context.
     *
     * @param continuationToken Number of items to skip in feed.
     * @param context Context to pass into request.
     *
     * @return A Mono that completes with a page of queues or empty if there are no items left.
     */
    Mono<PagedResponse<QueueProperties>> listQueuesNextPage(String continuationToken, Context context) {
        if (continuationToken == null || continuationToken.isEmpty()) {
            return Mono.empty();
        }

        try {
            final Context withTracing = context.addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE);
            final int skip = Integer.parseInt(continuationToken);

            return listQueues(skip, withTracing);
        } catch (RuntimeException e) {
            return monoError(logger, e);
        }
    }

    /**
     * Gets the first page of rules with context.
     *
     * @param context Context to pass into request.
     *
     * @return A Mono that completes with a page of rules.
     */
    Mono<PagedResponse<RuleProperties>> listRulesFirstPage(String topicName, String subscriptionName, Context context) {
        final Context withTracing = context.addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE);

        try {
            return listRules(topicName, subscriptionName, 0, withTracing);
        } catch (RuntimeException e) {
            return monoError(logger, e);
        }
    }

    /**
     * Gets the next page of rules with context.
     *
     * @param continuationToken Number of items to skip in feed.
     * @param context Context to pass into request.
     *
     * @return A Mono that completes with a page of rules or empty if there are no items left.
     */
    Mono<PagedResponse<RuleProperties>> listRulesNextPage(String topicName, String subscriptionName,
        String continuationToken, Context context) {
        if (continuationToken == null || continuationToken.isEmpty()) {
            return Mono.empty();
        }

        try {
            final Context withTracing = context.addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE);
            final int skip = Integer.parseInt(continuationToken);

            return listRules(topicName, subscriptionName, skip, withTracing);
        } catch (RuntimeException e) {
            return monoError(logger, e);
        }
    }

    /**
     * Gets the first page of subscriptions with context.
     *
     * @param context Context to pass into request.
     *
     * @return A Mono that completes with a page of subscriptions.
     */
    Mono<PagedResponse<SubscriptionProperties>> listSubscriptionsFirstPage(String topicName, Context context) {
        final Context withTracing = context.addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE);

        try {
            return listSubscriptions(topicName, 0, withTracing);
        } catch (RuntimeException e) {
            return monoError(logger, e);
        }
    }

    /**
     * Gets the next page of subscriptions with context.
     *
     * @param continuationToken Number of items to skip in feed.
     * @param context Context to pass into request.
     *
     * @return A Mono that completes with a page of subscriptions or empty if there are no items left.
     */
    Mono<PagedResponse<SubscriptionProperties>> listSubscriptionsNextPage(String topicName, String continuationToken,
        Context context) {
        if (continuationToken == null || continuationToken.isEmpty()) {
            return Mono.empty();
        }

        try {
            final Context withTracing = context.addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE);
            final int skip = Integer.parseInt(continuationToken);

            return listSubscriptions(topicName, skip, withTracing);
        } catch (RuntimeException e) {
            return monoError(logger, e);
        }
    }

    /**
     * Gets the first page of topics with context.
     *
     * @param context Context to pass into request.
     *
     * @return A Mono that completes with a page of topics.
     */
    Mono<PagedResponse<TopicProperties>> listTopicsFirstPage(Context context) {
        final Context withTracing = context.addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE);

        try {
            return listTopics(0, withTracing);
        } catch (RuntimeException e) {
            return monoError(logger, e);
        }
    }

    /**
     * Gets the next page of topics with context.
     *
     * @param continuationToken Number of items to skip in feed.
     * @param context Context to pass into request.
     *
     * @return A Mono that completes with a page of topics or empty if there are no items left.
     */
    Mono<PagedResponse<TopicProperties>> listTopicsNextPage(String continuationToken, Context context) {
        if (continuationToken == null || continuationToken.isEmpty()) {
            return Mono.empty();
        }

        try {
            final Context withTracing = context.addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE);
            final int skip = Integer.parseInt(continuationToken);

            return listTopics(skip, withTracing);
        } catch (RuntimeException e) {
            return monoError(logger, e);
        }
    }

    /**
     * Updates a queue with its context.
     *
     * @param queue Information about the queue to update. You must provide all the property values that are desired
     *     on the updated entity. Any values not provided are set to the service default values.
     * @param context Context to pass into request.
     *
     * @return A Mono that completes with the updated {@link QueueProperties}.
     */
    Mono<Response<QueueProperties>> updateQueueWithResponse(QueueProperties queue, Context context) {
        if (queue == null) {
            return monoError(logger, new NullPointerException("'queue' cannot be null"));
        } else if (context == null) {
            return monoError(logger, new NullPointerException("'context' cannot be null."));
        }

        final Context contextWithHeaders = context.addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE)
            .addData(AZURE_REQUEST_HTTP_HEADERS_KEY, new HttpHeaders());
        final String forwardToEntity = queue.getForwardTo();
        if (!CoreUtils.isNullOrEmpty(forwardToEntity)) {
            addSupplementaryAuthHeader(SERVICE_BUS_SUPPLEMENTARY_AUTHORIZATION_HEADER_NAME,
                forwardToEntity, contextWithHeaders);
            queue.setForwardTo(getAbsoluteUrlFromEntity(forwardToEntity));
        }

        final String forwardDlqToEntity = queue.getForwardDeadLetteredMessagesTo();
        if (!CoreUtils.isNullOrEmpty(forwardDlqToEntity)) {
            addSupplementaryAuthHeader(SERVICE_BUS_DLQ_SUPPLEMENTARY_AUTHORIZATION_HEADER_NAME,
                forwardDlqToEntity, contextWithHeaders);
            queue.setForwardDeadLetteredMessagesTo(getAbsoluteUrlFromEntity(forwardDlqToEntity));
        }

        final QueueDescription queueDescription = EntityHelper.toImplementation(queue);
        final CreateQueueBodyContent content = new CreateQueueBodyContent()
            .setType(CONTENT_TYPE)
            .setQueueDescription(queueDescription);
        final CreateQueueBody createEntity = new CreateQueueBody()
            .setContent(content);

        try {
            // If-Match == "*" to unconditionally update. This is in line with the existing client library behaviour.
            return entityClient.putWithResponseAsync(queue.getName(), createEntity, "*", contextWithHeaders)
                .onErrorMap(ServiceBusAdministrationAsyncClient::mapException)
                .map(response -> deserializeQueue(response));
        } catch (RuntimeException ex) {
            return monoError(logger, ex);
        }
    }

    /**
     * Updates a rule with its context.
     *
     * @param rule Information about the rule to update. You must provide all the property values that are desired
     *     on the updated entity. Any values not provided are set to the service default values.
     * @param context Context to pass into request.
     *
     * @return A Mono that completes with the updated {@link RuleProperties}.
     */
    Mono<Response<RuleProperties>> updateRuleWithResponse(String topicName, String subscriptionName,
        RuleProperties rule, Context context) {
        if (rule == null) {
            return monoError(logger, new NullPointerException("'rule' cannot be null"));
        } else if (context == null) {
            return monoError(logger, new NullPointerException("'context' cannot be null."));
        }

        final RuleDescription implementation = EntityHelper.toImplementation(rule);
        final CreateRuleBodyContent content = new CreateRuleBodyContent()
            .setType(CONTENT_TYPE)
            .setRuleDescription(implementation);
        final CreateRuleBody ruleBody = new CreateRuleBody()
            .setContent(content);
        final Context withTracing = context.addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE);

        try {
            // If-Match == "*" to unconditionally update. This is in line with the existing client library behaviour.
            return managementClient.getRules().putWithResponseAsync(topicName, subscriptionName, rule.getName(),
                ruleBody, "*", withTracing)
                .onErrorMap(ServiceBusAdministrationAsyncClient::mapException)
                .map(response -> deserializeRule(response));
        } catch (RuntimeException ex) {
            return monoError(logger, ex);
        }
    }

    /**
     * Updates a subscription with its context.
     *
     * @param subscription Information about the subscription to update. You must provide all the property values
     *     that are desired on the updated entity. Any values not provided are set to the service default values.
     * @param context Context to pass into request.
     *
     * @return A Mono that completes with the updated {@link SubscriptionProperties}.
     */
    Mono<Response<SubscriptionProperties>> updateSubscriptionWithResponse(SubscriptionProperties subscription,
        Context context) {
        if (subscription == null) {
            return monoError(logger, new NullPointerException("'subscription' cannot be null"));
        } else if (context == null) {
            return monoError(logger, new NullPointerException("'context' cannot be null."));
        }
        final Context contextWithHeaders = context.addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE)
            .addData(AZURE_REQUEST_HTTP_HEADERS_KEY, new HttpHeaders());
        final String forwardToEntity = subscription.getForwardTo();
        if (!CoreUtils.isNullOrEmpty(forwardToEntity)) {
            addSupplementaryAuthHeader(SERVICE_BUS_SUPPLEMENTARY_AUTHORIZATION_HEADER_NAME,
                forwardToEntity, contextWithHeaders);
            subscription.setForwardTo(getAbsoluteUrlFromEntity(forwardToEntity));
        }

        final String forwardDlqToEntity = subscription.getForwardDeadLetteredMessagesTo();
        if (!CoreUtils.isNullOrEmpty(forwardDlqToEntity)) {
            addSupplementaryAuthHeader(SERVICE_BUS_DLQ_SUPPLEMENTARY_AUTHORIZATION_HEADER_NAME,
                forwardDlqToEntity, contextWithHeaders);
            subscription.setForwardDeadLetteredMessagesTo(getAbsoluteUrlFromEntity(forwardDlqToEntity));
        }

        final String topicName = subscription.getTopicName();
        final String subscriptionName = subscription.getSubscriptionName();
        final SubscriptionDescription implementation = EntityHelper.toImplementation(subscription);
        final CreateSubscriptionBodyContent content = new CreateSubscriptionBodyContent()
            .setType(CONTENT_TYPE)
            .setSubscriptionDescription(implementation);
        final CreateSubscriptionBody createEntity = new CreateSubscriptionBody()
            .setContent(content);

        try {
            // If-Match == "*" to unconditionally update. This is in line with the existing client library behaviour.
            return managementClient.getSubscriptions().putWithResponseAsync(topicName, subscriptionName, createEntity,
                "*", contextWithHeaders)
                .onErrorMap(ServiceBusAdministrationAsyncClient::mapException)
                .map(response -> deserializeSubscription(topicName, response));
        } catch (RuntimeException ex) {
            return monoError(logger, ex);
        }
    }

    /**
     * Updates a topic with its context.
     *
     * @param topic Information about the topic to update. You must provide all the property values that are desired
     *     on the updated entity. Any values not provided are set to the service default values.
     * @param context Context to pass into request.
     *
     * @return A Mono that completes with the updated {@link TopicProperties}.
     */
    Mono<Response<TopicProperties>> updateTopicWithResponse(TopicProperties topic, Context context) {
        if (topic == null) {
            return monoError(logger, new NullPointerException("'topic' cannot be null"));
        } else if (context == null) {
            return monoError(logger, new NullPointerException("'context' cannot be null."));
        }

        final TopicDescription implementation = EntityHelper.toImplementation(topic);
        final CreateTopicBodyContent content = new CreateTopicBodyContent()
            .setType(CONTENT_TYPE)
            .setTopicDescription(implementation);
        final CreateTopicBody createEntity = new CreateTopicBody()
            .setContent(content);
        final Context withTracing = context.addData(AZ_TRACING_NAMESPACE_KEY, AZ_TRACING_NAMESPACE_VALUE);

        try {
            // If-Match == "*" to unconditionally update. This is in line with the existing client library behaviour.
            return entityClient.putWithResponseAsync(topic.getName(), createEntity, "*", withTracing)
                .onErrorMap(ServiceBusAdministrationAsyncClient::mapException)
                .map(response -> deserializeTopic(response));
        } catch (RuntimeException ex) {
            return monoError(logger, ex);
        }
    }

    private <T> T deserialize(Object object, Class<T> clazz) {
        if (object == null) {
            return null;
        }

        final String contents = String.valueOf(object);
        if (contents.isEmpty()) {
            return null;
        }

        try {
            return serializer.deserialize(contents, clazz);
        } catch (IOException e) {
            throw logger.logExceptionAsError(new RuntimeException(String.format(
                "Exception while deserializing. Body: [%s]. Class: %s", contents, clazz), e));
        }
    }

    /**
     * Given an HTTP response, will deserialize it into a strongly typed Response object.
     *
     * @param response HTTP response to deserialize response body from.
     * @param clazz Class to deserialize response type into.
     * @param <T> Class type to deserialize response into.
     *
     * @return A Response with a strongly typed response value.
     */
    private <T> Response<T> deserialize(Response<Object> response, Class<T> clazz) {
        final T deserialize = deserialize(response.getValue(), clazz);

        return new SimpleResponse<>(response.getRequest(), response.getStatusCode(), response.getHeaders(),
            deserialize);
    }

    /**
     * Converts a Response into its corresponding {@link QueueDescriptionEntry} then mapped into {@link
     * QueueProperties}.
     *
     * @param response HTTP Response to deserialize.
     *
     * @return The corresponding HTTP response with convenience properties set.
     */
    private Response<QueueProperties> deserializeQueue(Response<Object> response) {
        final QueueDescriptionEntry entry = deserialize(response.getValue(), QueueDescriptionEntry.class);

        // This was an empty response (ie. 204).
        if (entry == null) {
            return new SimpleResponse<>(response.getRequest(), response.getStatusCode(), response.getHeaders(), null);
        } else if (entry.getContent() == null) {
            logger.info("entry.getContent() is null. The entity may not exist. {}", entry);
            return new SimpleResponse<>(response.getRequest(), response.getStatusCode(), response.getHeaders(), null);
        } else if (entry.getContent().getQueueDescription() == null) {
            final TopicDescriptionEntry entryTopic = deserialize(response.getValue(), TopicDescriptionEntry.class);
            if (entryTopic != null && entryTopic.getContent() != null && entryTopic.getContent().getTopicDescription() != null) {
                logger.warning("'{}' is not a queue, it is a topic.", entryTopic.getTitle());
                return new SimpleResponse<>(response.getRequest(), response.getStatusCode(), response.getHeaders(), null);
            }
        }

        final QueueProperties result = EntityHelper.toModel(entry.getContent().getQueueDescription());
        final String queueName = getTitleValue(entry.getTitle());
        EntityHelper.setQueueName(result, queueName);

        return new SimpleResponse<>(response.getRequest(), response.getStatusCode(), response.getHeaders(), result);
    }

    /**
     * Converts a Response into its corresponding {@link RuleDescriptionEntry} then mapped into {@link RuleProperties}.
     *
     * @param response HTTP Response to deserialize.
     *
     * @return The corresponding HTTP response with convenience properties set.
     */
    private Response<RuleProperties> deserializeRule(Response<Object> response) {
        final RuleDescriptionEntry entry = deserialize(response.getValue(), RuleDescriptionEntry.class);

        // This was an empty response (ie. 204).
        if (entry == null) {
            return new SimpleResponse<>(response.getRequest(), response.getStatusCode(), response.getHeaders(), null);
        } else if (entry.getContent() == null) {
            logger.info("entry.getContent() is null. The entity may not exist. {}", entry);
            return new SimpleResponse<>(response.getRequest(), response.getStatusCode(), response.getHeaders(), null);
        }

        final RuleDescription description = entry.getContent().getRuleDescription();
        final RuleProperties result = EntityHelper.toModel(description);

        return new SimpleResponse<>(response.getRequest(), response.getStatusCode(), response.getHeaders(), result);
    }

    /**
     * Converts a Response into its corresponding {@link SubscriptionDescriptionEntry} then mapped into {@link
     * SubscriptionProperties}.
     *
     * @param response HTTP Response to deserialize.
     *
     * @return The corresponding HTTP response with convenience properties set.
     */
    private Response<SubscriptionProperties> deserializeSubscription(String topicName, Response<Object> response) {
        final SubscriptionDescriptionEntry entry = deserialize(response.getValue(), SubscriptionDescriptionEntry.class);

        // This was an empty response (ie. 204).
        if (entry == null) {
            return new SimpleResponse<>(response.getRequest(), response.getStatusCode(), response.getHeaders(), null);
        } else if (entry.getContent() == null) {
            logger.warning("entry.getContent() is null. There should have been content returned. Entry: {}", entry);
            return new SimpleResponse<>(response.getRequest(), response.getStatusCode(), response.getHeaders(), null);
        }

        final SubscriptionProperties subscription = EntityHelper.toModel(
            entry.getContent().getSubscriptionDescription());
        final String subscriptionName = getTitleValue(entry.getTitle());
        EntityHelper.setSubscriptionName(subscription, subscriptionName);
        EntityHelper.setTopicName(subscription, topicName);

        return new SimpleResponse<>(response.getRequest(), response.getStatusCode(), response.getHeaders(),
            subscription);
    }

    /**
     * Converts a Response into its corresponding {@link TopicDescriptionEntry} then mapped into {@link
     * QueueProperties}.
     *
     * @param response HTTP Response to deserialize.
     *
     * @return The corresponding HTTP response with convenience properties set.
     */
    private Response<TopicProperties> deserializeTopic(Response<Object> response) {
        final TopicDescriptionEntry entry = deserialize(response.getValue(), TopicDescriptionEntry.class);

        // This was an empty response (ie. 204).
        if (entry == null) {
            return new SimpleResponse<>(response.getRequest(), response.getStatusCode(), response.getHeaders(), null);
        } else if (entry.getContent() == null) {
            logger.warning("entry.getContent() is null. There should have been content returned. Entry: {}", entry);
            return new SimpleResponse<>(response.getRequest(), response.getStatusCode(), response.getHeaders(), null);
        } else if (entry.getContent().getTopicDescription() == null) {
            final QueueDescriptionEntry entryQueue = deserialize(response.getValue(), QueueDescriptionEntry.class);
            if (entryQueue != null && entryQueue.getContent() != null && entryQueue.getContent().getQueueDescription() != null) {
                logger.warning("'{}' is not a topic, it is a queue.", entryQueue.getTitle());
                return new SimpleResponse<>(response.getRequest(), response.getStatusCode(), response.getHeaders(), null);
            }
        }

        final TopicProperties result = EntityHelper.toModel(entry.getContent().getTopicDescription());
        final String topicName = getTitleValue(entry.getTitle());
        EntityHelper.setTopicName(result, topicName);

        return new SimpleResponse<>(response.getRequest(), response.getStatusCode(), response.getHeaders(), result);
    }

    /**
     * Creates a {@link FeedPage} given the elements and a set of response links to get the next link from.
     *
     * @param entities Entities in the feed.
     * @param responseLinks Links returned from the feed.
     * @param <TResult> Type of Service Bus entities in page.
     *
     * @return A {@link FeedPage} indicating whether this can be continued or not.
     * @throws MalformedURLException if the "next" page link does not contain a well-formed URL.
     */
    private <TResult, TFeed> FeedPage<TResult> extractPage(Response<TFeed> response, List<TResult> entities,
        List<ResponseLink> responseLinks)
        throws MalformedURLException, UnsupportedEncodingException {
        final Optional<ResponseLink> nextLink = responseLinks.stream()
            .filter(link -> link.getRel().equalsIgnoreCase("next"))
            .findFirst();

        if (!nextLink.isPresent()) {
            return new FeedPage<>(response.getStatusCode(), response.getHeaders(), response.getRequest(), entities);
        }

        final URL url = new URL(nextLink.get().getHref());
        final String decode = URLDecoder.decode(url.getQuery(), StandardCharsets.UTF_8.name());
        final Optional<Integer> skipParameter = Arrays.stream(decode.split("&amp;|&"))
            .map(part -> part.split("=", 2))
            .filter(parts -> parts[0].equalsIgnoreCase("$skip") && parts.length == 2)
            .map(parts -> Integer.valueOf(parts[1]))
            .findFirst();

        if (skipParameter.isPresent()) {
            return new FeedPage<>(response.getStatusCode(), response.getHeaders(), response.getRequest(), entities,
                skipParameter.get());
        } else {
            logger.warning("There should have been a skip parameter for the next page.");
            return new FeedPage<>(response.getStatusCode(), response.getHeaders(), response.getRequest(), entities);
        }
    }

    /**
     * Helper method that invokes the service method, extracts the data and translates it to a PagedResponse.
     *
     * @param skip Number of elements to skip.
     * @param context Context for the query.
     *
     * @return A Mono that completes with a paged response of queues.
     */
    private Mono<PagedResponse<QueueProperties>> listQueues(int skip, Context context) {
        return managementClient.listEntitiesWithResponseAsync(QUEUES_ENTITY_TYPE, skip, NUMBER_OF_ELEMENTS, context)
            .onErrorMap(ServiceBusAdministrationAsyncClient::mapException)
            .flatMap(response -> {
                final Response<QueueDescriptionFeed> feedResponse = deserialize(response, QueueDescriptionFeed.class);
                final QueueDescriptionFeed feed = feedResponse.getValue();
                if (feed == null) {
                    logger.warning("Could not deserialize QueueDescriptionFeed. skip {}, top: {}", skip,
                        NUMBER_OF_ELEMENTS);
                    return Mono.empty();
                }

                final List<QueueProperties> entities = feed.getEntry().stream()
                    .filter(e -> e.getContent() != null && e.getContent().getQueueDescription() != null)
                    .map(e -> {
                        final String queueName = getTitleValue(e.getTitle());
                        final QueueProperties queueProperties = EntityHelper.toModel(
                            e.getContent().getQueueDescription());

                        EntityHelper.setQueueName(queueProperties, queueName);

                        return queueProperties;
                    })
                    .collect(Collectors.toList());
                try {
                    return Mono.just(extractPage(feedResponse, entities, feed.getLink()));
                } catch (MalformedURLException | UnsupportedEncodingException error) {
                    return Mono.error(new RuntimeException("Could not parse response into FeedPage<QueueDescription>",
                        error));
                }
            });
    }


    /**
     * Helper method that invokes the service method, extracts the data and translates it to a PagedResponse.
     *
     * @param skip Number of elements to skip.
     * @param context Context for the query.
     *
     * @return A Mono that completes with a paged response of rules.
     */
    private Mono<PagedResponse<RuleProperties>> listRules(String topicName, String subscriptionName, int skip,
        Context context) {
        return managementClient.listRulesWithResponseAsync(topicName, subscriptionName, skip, NUMBER_OF_ELEMENTS,
            context)
            .onErrorMap(ServiceBusAdministrationAsyncClient::mapException)
            .flatMap(response -> {
                final Response<RuleDescriptionFeed> feedResponse = deserialize(response,
                    RuleDescriptionFeed.class);

                final RuleDescriptionFeed feed = feedResponse.getValue();
                if (feed == null) {
                    logger.warning("Could not deserialize RuleDescriptionFeed. skip {}, top: {}", skip,
                        NUMBER_OF_ELEMENTS);
                    return Mono.empty();
                }

                final List<RuleProperties> entities = feed.getEntry().stream()
                    .filter(e -> e.getContent() != null && e.getContent().getRuleDescription() != null)
                    .map(e -> {
                        return EntityHelper.toModel(e.getContent().getRuleDescription());
                    })
                    .collect(Collectors.toList());
                try {
                    return Mono.just(extractPage(feedResponse, entities, feed.getLink()));
                } catch (MalformedURLException | UnsupportedEncodingException error) {
                    return Mono.error(new RuntimeException(
                        "Could not parse response into FeedPage<RuleDescription>", error));
                }
            });
    }

    /**
     * Helper method that invokes the service method, extracts the data and translates it to a PagedResponse.
     *
     * @param skip Number of elements to skip.
     * @param context Context for the query.
     *
     * @return A Mono that completes with a paged response of subscriptions.
     */
    private Mono<PagedResponse<SubscriptionProperties>> listSubscriptions(String topicName, int skip,
        Context context) {
        return managementClient.listSubscriptionsWithResponseAsync(topicName, skip, NUMBER_OF_ELEMENTS, context)
            .onErrorMap(ServiceBusAdministrationAsyncClient::mapException)
            .flatMap(response -> {
                final Response<SubscriptionDescriptionFeed> feedResponse = deserialize(response,
                    SubscriptionDescriptionFeed.class);

                final SubscriptionDescriptionFeed feed = feedResponse.getValue();
                if (feed == null) {
                    logger.warning("Could not deserialize SubscriptionDescriptionFeed. skip {}, top: {}", skip,
                        NUMBER_OF_ELEMENTS);
                    return Mono.empty();
                }

                final List<SubscriptionProperties> entities = feed.getEntry().stream()
                    .filter(e -> e.getContent() != null && e.getContent().getSubscriptionDescription() != null)
                    .map(e -> {
                        final String subscriptionName = getTitleValue(e.getTitle());
                        final SubscriptionProperties description = EntityHelper.toModel(
                            e.getContent().getSubscriptionDescription());

                        EntityHelper.setTopicName(description, topicName);
                        EntityHelper.setSubscriptionName(description, subscriptionName);

                        return description;
                    })
                    .collect(Collectors.toList());
                try {
                    return Mono.just(extractPage(feedResponse, entities, feed.getLink()));
                } catch (MalformedURLException | UnsupportedEncodingException error) {
                    return Mono.error(new RuntimeException(
                        "Could not parse response into FeedPage<SubscriptionDescription>", error));
                }
            });
    }

    /**
     * Helper method that invokes the service method, extracts the data and translates it to a PagedResponse.
     *
     * @param skip Number of elements to skip.
     * @param context Context for the query.
     *
     * @return A Mono that completes with a paged response of topics.
     */
    private Mono<PagedResponse<TopicProperties>> listTopics(int skip, Context context) {
        return managementClient.listEntitiesWithResponseAsync(TOPICS_ENTITY_TYPE, skip, NUMBER_OF_ELEMENTS, context)
            .onErrorMap(ServiceBusAdministrationAsyncClient::mapException)
            .flatMap(response -> {
                final Response<TopicDescriptionFeed> feedResponse = deserialize(response, TopicDescriptionFeed.class);
                final TopicDescriptionFeed feed = feedResponse.getValue();
                if (feed == null) {
                    logger.warning("Could not deserialize TopicDescriptionFeed. skip {}, top: {}", skip,
                        NUMBER_OF_ELEMENTS);
                    return Mono.empty();
                }

                final List<TopicProperties> entities = feed.getEntry().stream()
                    .filter(e -> e.getContent() != null && e.getContent().getTopicDescription() != null)
                    .map(e -> {
                        final String topicName = getTitleValue(e.getTitle());
                        final TopicProperties topicProperties = EntityHelper.toModel(
                            e.getContent().getTopicDescription());
                        EntityHelper.setTopicName(topicProperties, topicName);

                        return topicProperties;
                    })
                    .collect(Collectors.toList());
                try {
                    return Mono.just(extractPage(feedResponse, entities, feed.getLink()));
                } catch (MalformedURLException | UnsupportedEncodingException error) {
                    return Mono.error(new RuntimeException("Could not parse response into FeedPage<TopicDescription>",
                        error));
                }
            });
    }

    /**
     * Check that the additional headers field is present and add the additional auth header
     *
     * @param headerName name of the header to be added
     * @param context current request context
     *
     * @return boolean representing the outcome of adding header operation
     */
    private void addSupplementaryAuthHeader(String headerName, String entity, Context context) {
        context.getData(AZURE_REQUEST_HTTP_HEADERS_KEY)
            .ifPresent(headers -> {
                if (headers instanceof HttpHeaders) {
                    HttpHeaders customHttpHeaders = (HttpHeaders) headers;
                    customHttpHeaders.add(headerName, entity);
                }
            });
    }

    /**
     * Checks if the given entity is an absolute URL, if so return it.
     * Otherwise, construct the URL from the given entity and return that.
     *
     * @param entity : entity to forward messages to.
     *
     * @return Forward to Entity represented as an absolute URL
     */
    private String getAbsoluteUrlFromEntity(String entity) {
        // Check if passed entity is an absolute URL
        try {
            URL url = new URL(entity);
            return url.toString();
        } catch (MalformedURLException ex) {
            // Entity is not a URL, continue.
        }
        UrlBuilder urlBuilder = new UrlBuilder();
        urlBuilder.setScheme("https");
        urlBuilder.setHost(managementClient.getEndpoint());
        urlBuilder.setPath(entity);

        try {
            URL url = urlBuilder.toUrl();
            return url.toString();
        } catch (MalformedURLException ex) {
            // This is not expected.
            logger.error("Failed to construct URL using the endpoint:'{}' and entity:'{}'",
                managementClient.getEndpoint(), entity);
            logger.logThrowableAsError(ex);
        }
        return null;
    }

    /**
     * Given an XML title element, returns the XML text inside. Jackson deserializes Objects as LinkedHashMaps. XML text
     * is represented as an entry with an empty string as the key.
     *
     * For example, the text returned from this {@code <title text="text/xml">QueueName</title>} is "QueueName".
     *
     * @param responseTitle XML title element.
     *
     * @return The XML text inside the title. {@code null} is returned if there is no value.
     */
    @SuppressWarnings("unchecked")
    private String getTitleValue(Object responseTitle) {
        if (!(responseTitle instanceof Map)) {
            return null;
        }

        final Map<String, String> map;
        try {
            map = (Map<String, String>) responseTitle;
            return map.get("");
        } catch (ClassCastException error) {
            logger.warning("Unable to cast to Map<String,String>. Title: {}", responseTitle, error);
            return null;
        }
    }

    /**
     * Maps an exception from the ATOM APIs to its associated {@link HttpResponseException}.
     *
     * @param exception Exception from the ATOM API.
     *
     * @return The corresponding {@link HttpResponseException} or {@code throwable} if it is not an instance of {@link
     *     ServiceBusManagementErrorException}.
     */
    private static Throwable mapException(Throwable exception) {
        if (!(exception instanceof ServiceBusManagementErrorException)) {
            return exception;
        }

        final ServiceBusManagementErrorException managementError = ((ServiceBusManagementErrorException) exception);
        final ServiceBusManagementError error = managementError.getValue();
        final HttpResponse errorHttpResponse = managementError.getResponse();

        final int statusCode = error != null && error.getCode() != null
            ? error.getCode()
            : errorHttpResponse.getStatusCode();
        final String errorDetail = error != null && error.getDetail() != null
            ? error.getDetail()
            : managementError.getMessage();

        switch (statusCode) {
            case 401:
                return new ClientAuthenticationException(errorDetail, managementError.getResponse(), exception);
            case 404:
                return new ResourceNotFoundException(errorDetail, managementError.getResponse(), exception);
            case 409:
                return new ResourceExistsException(errorDetail, managementError.getResponse(), exception);
            case 412:
                return new ResourceModifiedException(errorDetail, managementError.getResponse(), exception);
            default:
                return new HttpResponseException(errorDetail, managementError.getResponse(), exception);
        }
    }

    /**
     * A page of Service Bus entities.
     *
     * @param <T> The entity description from Service Bus.
     */
    private static final class FeedPage<T> implements PagedResponse<T> {
        private final int statusCode;
        private final HttpHeaders header;
        private final HttpRequest request;
        private final IterableStream<T> entries;
        private final String continuationToken;

        /**
         * Creates a page that does not have any more pages.
         *
         * @param entries Items in the page.
         */
        private FeedPage(int statusCode, HttpHeaders header, HttpRequest request, List<T> entries) {
            this.statusCode = statusCode;
            this.header = header;
            this.request = request;
            this.entries = new IterableStream<>(entries);
            this.continuationToken = null;
        }

        /**
         * Creates an instance that has additional pages to fetch.
         *
         * @param entries Items in the page.
         * @param skip Number of elements to "skip".
         */
        private FeedPage(int statusCode, HttpHeaders header, HttpRequest request, List<T> entries, int skip) {
            this.statusCode = statusCode;
            this.header = header;
            this.request = request;
            this.entries = new IterableStream<>(entries);
            this.continuationToken = String.valueOf(skip);
        }

        @Override
        public IterableStream<T> getElements() {
            return entries;
        }

        @Override
        public String getContinuationToken() {
            return continuationToken;
        }

        @Override
        public int getStatusCode() {
            return statusCode;
        }

        @Override
        public HttpHeaders getHeaders() {
            return header;
        }

        @Override
        public HttpRequest getRequest() {
            return request;
        }

        @Override
        public void close() {
        }
    }

    private static final class EntityNotFoundHttpResponse<T> extends HttpResponse {
        private final int statusCode;
        private final HttpHeaders headers;

        private EntityNotFoundHttpResponse(Response<T> response) {
            super(response.getRequest());
            this.headers = response.getHeaders();
            this.statusCode = response.getStatusCode();
        }

        @Override
        public int getStatusCode() {
            return statusCode;
        }

        @Override
        public String getHeaderValue(String name) {
            return headers.getValue(name);
        }

        @Override
        public HttpHeaders getHeaders() {
            return headers;
        }

        @Override
        public Flux<ByteBuffer> getBody() {
            return Flux.empty();
        }

        @Override
        public Mono<byte[]> getBodyAsByteArray() {
            return Mono.empty();
        }

        @Override
        public Mono<String> getBodyAsString() {
            return Mono.empty();
        }

        @Override
        public Mono<String> getBodyAsString(Charset charset) {
            return Mono.empty();
        }
    }
}