< Summary

Class:Microsoft.Azure.ServiceBus.TopicClient
Assembly:Microsoft.Azure.ServiceBus
File(s):C:\Git\azure-sdk-for-net\sdk\servicebus\Microsoft.Azure.ServiceBus\src\TopicClient.cs
Covered lines:0
Uncovered lines:59
Coverable lines:59
Total lines:242
Line coverage:0% (0 of 59)
Covered branches:0
Total branches:22
Branch coverage:0% (0 of 22)

Metrics

MethodCyclomatic complexity Line coverage Branch coverage
.ctor(...)-0%0%
.ctor(...)-0%0%
.ctor(...)-0%100%
.ctor(...)-0%0%
get_TopicName()-0%100%
get_OperationTimeout()-0%100%
set_OperationTimeout(...)-0%100%
get_Path()-0%100%
get_ServiceBusConnection()-0%100%
get_InnerSender()-0%0%
get_CbsTokenProvider()-0%100%
SendAsync(...)-0%100%
SendAsync(...)-0%100%
ScheduleMessageAsync(...)-0%100%
CancelScheduledMessageAsync(...)-0%100%
get_RegisteredPlugins()-0%100%
RegisterPlugin(...)-0%100%
UnregisterPlugin(...)-0%100%
OnClosingAsync()-0%0%

File(s)

C:\Git\azure-sdk-for-net\sdk\servicebus\Microsoft.Azure.ServiceBus\src\TopicClient.cs

#LineLine coverage
 1// Copyright (c) Microsoft. All rights reserved.
 2// Licensed under the MIT license. See LICENSE file in the project root for full license information.
 3
 4namespace Microsoft.Azure.ServiceBus
 5{
 6    using System;
 7    using System.Collections.Generic;
 8    using System.Threading.Tasks;
 9    using Microsoft.Azure.Amqp;
 10    using Microsoft.Azure.ServiceBus.Core;
 11    using Microsoft.Azure.ServiceBus.Primitives;
 12
 13    /// <summary>
 14    /// TopicClient can be used for all basic interactions with a Service Bus topic.
 15    /// </summary>
 16    /// <example>
 17    /// Create a new TopicClient
 18    /// <code>
 19    /// ITopicClient topicClient = new TopicClient(
 20    ///     namespaceConnectionString,
 21    ///     topicName,
 22    ///     RetryExponential);
 23    /// </code>
 24    ///
 25    /// Send a message to the topic:
 26    /// <code>
 27    /// byte[] data = GetData();
 28    /// await topicClient.SendAsync(data);
 29    /// </code>
 30    /// </example>
 31    /// <remarks>It uses AMQP protocol for communicating with servicebus.</remarks>
 32    public class TopicClient : ClientEntity, ITopicClient
 33    {
 34        readonly object syncLock;
 35        MessageSender innerSender;
 36
 37        /// <summary>
 38        /// Instantiates a new <see cref="TopicClient"/> to perform operations on a topic.
 39        /// </summary>
 40        /// <param name="connectionStringBuilder"><see cref="ServiceBusConnectionStringBuilder"/> having namespace and t
 41        /// <param name="retryPolicy">Retry policy for topic operations. Defaults to <see cref="RetryPolicy.Default"/></
 42        /// <remarks>Creates a new connection to the topic, which is opened during the first send operation.</remarks>
 43        public TopicClient(ServiceBusConnectionStringBuilder connectionStringBuilder, RetryPolicy retryPolicy = null)
 044            : this(connectionStringBuilder?.GetNamespaceConnectionString(), connectionStringBuilder?.EntityPath, retryPo
 45        {
 046        }
 47
 48        /// <summary>
 49        /// Instantiates a new <see cref="TopicClient"/> to perform operations on a topic.
 50        /// </summary>
 51        /// <param name="connectionString">Namespace connection string. Must not contain topic information.</param>
 52        /// <param name="entityPath">Path to the topic</param>
 53        /// <param name="retryPolicy">Retry policy for topic operations. Defaults to <see cref="RetryPolicy.Default"/></
 54        /// <remarks>Creates a new connection to the topic, which is opened during the first send operation.</remarks>
 55        public TopicClient(string connectionString, string entityPath, RetryPolicy retryPolicy = null)
 056            : this(new ServiceBusConnection(connectionString), entityPath, retryPolicy ?? RetryPolicy.Default)
 57        {
 058            if (string.IsNullOrWhiteSpace(connectionString))
 59            {
 060                throw Fx.Exception.ArgumentNullOrWhiteSpace(connectionString);
 61            }
 62
 063            this.OwnsConnection = true;
 064        }
 65
 66        /// <summary>
 67        /// Creates a new instance of the Topic client using the specified endpoint, entity path, and token provider.
 68        /// </summary>
 69        /// <param name="endpoint">Fully qualified domain name for Service Bus. Most likely, {yournamespace}.servicebus.
 70        /// <param name="entityPath">Topic path.</param>
 71        /// <param name="tokenProvider">Token provider which will generate security tokens for authorization.</param>
 72        /// <param name="transportType">Transport type.</param>
 73        /// <param name="retryPolicy">Retry policy for topic operations. Defaults to <see cref="RetryPolicy.Default"/></
 74        /// <remarks>Creates a new connection to the topic, which is opened during the first send operation.</remarks>
 75        public TopicClient(
 76            string endpoint,
 77            string entityPath,
 78            ITokenProvider tokenProvider,
 79            TransportType transportType = TransportType.Amqp,
 80            RetryPolicy retryPolicy = null)
 081            : this(new ServiceBusConnection(endpoint, transportType, retryPolicy) {TokenProvider = tokenProvider}, entit
 82        {
 083            this.OwnsConnection = true;
 084        }
 85
 86        /// <summary>
 87        /// Creates a new instance of the Topic client on a given <see cref="ServiceBusConnection"/>
 88        /// </summary>
 89        /// <param name="serviceBusConnection">Connection object to the service bus namespace.</param>
 90        /// <param name="entityPath">Topic path.</param>
 91        /// <param name="retryPolicy">Retry policy for topic operations. Defaults to <see cref="RetryPolicy.Default"/></
 92        public TopicClient(ServiceBusConnection serviceBusConnection, string entityPath, RetryPolicy retryPolicy)
 093            : base(nameof(TopicClient), entityPath, retryPolicy)
 94        {
 095            MessagingEventSource.Log.TopicClientCreateStart(serviceBusConnection?.Endpoint.Authority, entityPath);
 96
 097            if (string.IsNullOrWhiteSpace(entityPath))
 98            {
 099                throw Fx.Exception.ArgumentNullOrWhiteSpace(entityPath);
 100            }
 0101            this.ServiceBusConnection = serviceBusConnection ?? throw new ArgumentNullException(nameof(serviceBusConnect
 0102            this.syncLock = new object();
 0103            this.TopicName = entityPath;
 0104            this.OwnsConnection = false;
 0105            this.ServiceBusConnection.ThrowIfClosed();
 106
 0107            if (this.ServiceBusConnection.TokenProvider != null)
 108            {
 0109                this.CbsTokenProvider = new TokenProviderAdapter(this.ServiceBusConnection.TokenProvider, this.ServiceBu
 110            }
 111            else
 112            {
 0113                throw new ArgumentNullException($"{nameof(ServiceBusConnection)} doesn't have a valid token provider");
 114            }
 115
 0116            MessagingEventSource.Log.TopicClientCreateStop(serviceBusConnection.Endpoint.Authority, entityPath, this.Cli
 0117        }
 118
 119        /// <summary>
 120        /// Gets the name of the topic.
 121        /// </summary>
 0122        public string TopicName { get; }
 123
 124        /// <summary>
 125        /// Duration after which individual operations will timeout.
 126        /// </summary>
 127        public override TimeSpan OperationTimeout
 128        {
 0129            get => this.ServiceBusConnection.OperationTimeout;
 0130            set => this.ServiceBusConnection.OperationTimeout = value;
 131        }
 132
 133        /// <summary>
 134        /// Gets the name of the topic.
 135        /// </summary>
 0136        public override string Path => this.TopicName;
 137
 138        /// <summary>
 139        /// Connection object to the service bus namespace.
 140        /// </summary>
 0141        public override ServiceBusConnection ServiceBusConnection { get; }
 142
 143        internal MessageSender InnerSender
 144        {
 145            get
 146            {
 0147                if (this.innerSender == null)
 148                {
 0149                    lock (this.syncLock)
 150                    {
 0151                        if (this.innerSender == null)
 152                        {
 0153                            this.innerSender = new MessageSender(
 0154                                this.TopicName,
 0155                                null,
 0156                                MessagingEntityType.Topic,
 0157                                this.ServiceBusConnection,
 0158                                this.CbsTokenProvider,
 0159                                this.RetryPolicy);
 160                        }
 0161                    }
 162                }
 163
 0164                return this.innerSender;
 165            }
 166        }
 167
 0168        ICbsTokenProvider CbsTokenProvider { get; }
 169
 170        /// <summary>
 171        /// Sends a message to Service Bus.
 172        /// </summary>
 173        public Task SendAsync(Message message)
 174        {
 0175            return this.SendAsync(new[] { message });
 176        }
 177
 178        /// <summary>
 179        /// Sends a list of messages to Service Bus.
 180        /// When called on partitioned entities, messages meant for different partitions cannot be batched together.
 181        /// </summary>
 182        public Task SendAsync(IList<Message> messageList)
 183        {
 0184            this.ThrowIfClosed();
 0185            return this.InnerSender.SendAsync(messageList);
 186        }
 187
 188        /// <summary>
 189        /// Schedules a message to appear on Service Bus at a later time.
 190        /// </summary>
 191        /// <param name="message">The <see cref="Message"/> that needs to be scheduled.</param>
 192        /// <param name="scheduleEnqueueTimeUtc">The UTC time at which the message should be available for processing.</
 193        /// <returns>The sequence number of the message that was scheduled.</returns>
 194        public Task<long> ScheduleMessageAsync(Message message, DateTimeOffset scheduleEnqueueTimeUtc)
 195        {
 0196            this.ThrowIfClosed();
 0197            return this.InnerSender.ScheduleMessageAsync(message, scheduleEnqueueTimeUtc);
 198        }
 199
 200        /// <summary>
 201        /// Cancels a message that was scheduled.
 202        /// </summary>
 203        /// <param name="sequenceNumber">The <see cref="Message.SystemPropertiesCollection.SequenceNumber"/> of the mess
 204        public Task CancelScheduledMessageAsync(long sequenceNumber)
 205        {
 0206            this.ThrowIfClosed();
 0207            return this.InnerSender.CancelScheduledMessageAsync(sequenceNumber);
 208        }
 209
 210        /// <summary>
 211        /// Gets a list of currently registered plugins for this TopicClient.
 212        /// </summary>
 0213        public override IList<ServiceBusPlugin> RegisteredPlugins => this.InnerSender.RegisteredPlugins;
 214
 215        /// <summary>
 216        /// Registers a <see cref="ServiceBusPlugin"/> to be used with this topic client.
 217        /// </summary>
 218        public override void RegisterPlugin(ServiceBusPlugin serviceBusPlugin)
 219        {
 0220            this.ThrowIfClosed();
 0221            this.InnerSender.RegisterPlugin(serviceBusPlugin);
 0222        }
 223
 224        /// <summary>
 225        /// Unregisters a <see cref="ServiceBusPlugin"/>.
 226        /// </summary>
 227        /// <param name="serviceBusPluginName">The name <see cref="ServiceBusPlugin.Name"/> to be unregistered</param>
 228        public override void UnregisterPlugin(string serviceBusPluginName)
 229        {
 0230            this.ThrowIfClosed();
 0231            this.InnerSender.UnregisterPlugin(serviceBusPluginName);
 0232        }
 233
 234        protected override async Task OnClosingAsync()
 235        {
 0236            if (this.innerSender != null)
 237            {
 0238                await this.innerSender.CloseAsync().ConfigureAwait(false);
 239            }
 0240        }
 241    }
 242}