< Summary

Class: Microsoft.Azure.ServiceBus.Amqp.AmqpSubscriptionClient
Assembly: Microsoft.Azure.ServiceBus
File(s): C:\Git\azure-sdk-for-net\sdk\servicebus\Microsoft.Azure.ServiceBus\src\Amqp\AmqpSubscriptionClient.cs
Covered lines: 0
Uncovered lines: 94
Coverable lines: 94
Total lines: 201
Line coverage: 0% (0 of 94)
Covered branches: 0
Total branches: 20
Branch coverage: 0% (0 of 20)

Metrics

Method | Cyclomatic complexity | Line coverage | Branch coverage
.cctor() | - | 0% | 100%
.ctor(...) | - | 0% | 100%
get_InnerReceiver() | - | 0% | 0%
get_PrefetchCount() | - | 0% | 100%
set_PrefetchCount(...) | - | 0% | 0%
get_ServiceBusConnection() | - | 0% | 100%
get_RetryPolicy() | - | 0% | 100%
get_CbsTokenProvider() | - | 0% | 100%
get_ReceiveMode() | - | 0% | 100%
get_Path() | - | 0% | 100%
CloseAsync() | - | 0% | 0%
OnAddRuleAsync() | - | 0% | 0%
OnRemoveRuleAsync() | - | 0% | 0%
OnGetRulesAsync() | - | 0% | 0%

File(s)

C:\Git\azure-sdk-for-net\sdk\servicebus\Microsoft.Azure.ServiceBus\src\Amqp\AmqpSubscriptionClient.cs

# | Line | Line coverage
 1// Copyright (c) Microsoft. All rights reserved.
 2// Licensed under the MIT license. See LICENSE file in the project root for full license information.
 3
 4namespace Microsoft.Azure.ServiceBus.Amqp
 5{
 6    using System;
 7    using System.Collections.Generic;
 8    using System.Threading.Tasks;
 9    using Azure.Amqp;
 10    using Azure.Amqp.Encoding;
 11    using Core;
 12    using Framing;
 13    using Primitives;
 14
 15    internal sealed class AmqpSubscriptionClient : IInnerSubscriptionClient
 16    {
 17        int prefetchCount; // Backing field for the PrefetchCount property.
 18        readonly object syncLock; // Lock object held while innerReceiver is being created.
 19        MessageReceiver innerReceiver; // Null until the InnerReceiver getter creates it.
 20
 21        static AmqpSubscriptionClient()
 22        {
                // Registers each rule-related codec (true/false/correlation/SQL filters, empty/SQL
                // rule actions, and the rule description) with AmqpCodec, passing the codec's Name,
                // its Code and a factory lambda that creates a new codec instance.
                // NOTE(review): every statement below is cut off at the right edge of this listing;
                // consult the original source file for the full text.
 023            AmqpCodec.RegisterKnownTypes(AmqpTrueFilterCodec.Name, AmqpTrueFilterCodec.Code, () => new AmqpTrueFilterCod
 024            AmqpCodec.RegisterKnownTypes(AmqpFalseFilterCodec.Name, AmqpFalseFilterCodec.Code, () => new AmqpFalseFilter
 025            AmqpCodec.RegisterKnownTypes(AmqpCorrelationFilterCodec.Name, AmqpCorrelationFilterCodec.Code, () => new Amq
 026            AmqpCodec.RegisterKnownTypes(AmqpSqlFilterCodec.Name, AmqpSqlFilterCodec.Code, () => new AmqpSqlFilterCodec(
 027            AmqpCodec.RegisterKnownTypes(AmqpEmptyRuleActionCodec.Name, AmqpEmptyRuleActionCodec.Code, () => new AmqpEmp
 028            AmqpCodec.RegisterKnownTypes(AmqpSqlRuleActionCodec.Name, AmqpSqlRuleActionCodec.Code, () => new AmqpSqlRule
 029            AmqpCodec.RegisterKnownTypes(AmqpRuleDescriptionCodec.Name, AmqpRuleDescriptionCodec.Code, () => new AmqpRul
 030        }
 31
 032        public AmqpSubscriptionClient(
 033            string path,
 034            ServiceBusConnection servicebusConnection,
 035            RetryPolicy retryPolicy,
 036            ICbsTokenProvider cbsTokenProvider,
 037            int prefetchCount = 0,
 038            ReceiveMode mode = ReceiveMode.ReceiveAndDelete)
 39        {
                // Only stores the supplied settings; no MessageReceiver is created here.
                // innerReceiver stays null until the InnerReceiver getter is first read.
 040            this.syncLock = new object();
 041            this.Path = path;
 042            this.ServiceBusConnection = servicebusConnection;
 043            this.RetryPolicy = retryPolicy;
 044            this.CbsTokenProvider = cbsTokenProvider;
                // Assigned through the property so its "value < 0" check also applies to the
                // constructor argument.
 045            this.PrefetchCount = prefetchCount;
 046            this.ReceiveMode = mode;
 047        }
 48
 49        public MessageReceiver InnerReceiver
 50        {
 51            get
 52            {
                    // Creates the MessageReceiver on first access and returns the same instance
                    // on every later call. The first null check skips the lock once the receiver
                    // exists; the second one, made while holding syncLock, keeps a second caller
                    // that was waiting on the lock from creating another receiver.
 053                if (this.innerReceiver == null)
 54                {
 055                    lock (this.syncLock)
 56                    {
 057                        if (this.innerReceiver == null)
 58                        {
                                // Built from the values captured by the constructor, plus the
                                // PrefetchCount in effect at the time of creation.
 059                            this.innerReceiver = new MessageReceiver(
 060                                this.Path,
 061                                MessagingEntityType.Subscriber,
 062                                this.ReceiveMode,
 063                                this.ServiceBusConnection,
 064                                this.CbsTokenProvider,
 065                                this.RetryPolicy,
 066                                this.PrefetchCount);
 67                        }
 068                    }
 69                }
 70
 071                return this.innerReceiver;
 72            }
 73        }
 74
 75        /// <summary>
 76        /// Gets or sets the number of messages that the subscription client can simultaneously request.
 77        /// </summary>
 78        /// <value>The number of messages that the subscription client can simultaneously request.</value>
 79        public int PrefetchCount
 80        {
 081            get => this.prefetchCount;
 82            set
 83            {
                    // Negative values are rejected before anything is stored.
                    // NOTE(review): the throw statement below is cut off at the right edge of this listing.
 084                if (value < 0)
 85                {
 086                    throw Fx.Exception.ArgumentOutOfRange(nameof(this.PrefetchCount), value, "Value cannot be less than 
 87                }
 088                this.prefetchCount = value;
                    // If the receiver has already been created, forward the new value to it;
                    // otherwise the stored value is handed to the receiver when it is constructed.
 089                if (this.innerReceiver != null)
 90                {
 091                    this.innerReceiver.PrefetchCount = value;
 92                }
 093            }
 94        }
 95
 096        ServiceBusConnection ServiceBusConnection { get; } // Set by the constructor; passed to the MessageReceiver, and its OperationTimeout is used for rule requests.
 97
 098        RetryPolicy RetryPolicy { get; } // Set by the constructor; passed to the MessageReceiver.
 99
 0100        ICbsTokenProvider CbsTokenProvider { get; } // Set by the constructor; passed to the MessageReceiver.
 101
 0102        ReceiveMode ReceiveMode { get; } // Set by the constructor; passed to the MessageReceiver.
 103
 0104        string Path { get; } // Set by the constructor; passed to the MessageReceiver together with MessagingEntityType.Subscriber.
 105
 106        public Task CloseAsync()
 107        {
 0108            return this.innerReceiver?.CloseAsync();
 109        }
 110
 111        public async Task OnAddRuleAsync(RuleDescription description)
 112        {
                // Builds an AddRuleOperation request carrying the rule's name and its description
                // map, sends it with InnerReceiver.ExecuteRequestResponseAsync, and throws when the
                // response status is anything other than OK. Reading InnerReceiver creates the
                // receiver if it does not exist yet.
 113            try
 114            {
 0115                var amqpRequestMessage = AmqpRequestMessage.CreateRequest(
 0116                    ManagementConstants.Operations.AddRuleOperation,
 0117                    this.ServiceBusConnection.OperationTimeout,
 0118                    null);
 0119                amqpRequestMessage.Map[ManagementConstants.Properties.RuleName] = description.Name;
 0120                amqpRequestMessage.Map[ManagementConstants.Properties.RuleDescription] =
 0121                    AmqpMessageConverter.GetRuleDescriptionMap(description);
 122
                     // NOTE(review): the statement below is cut off at the right edge of this listing.
 0123                var response = await this.InnerReceiver.ExecuteRequestResponseAsync(amqpRequestMessage).ConfigureAwait(f
 124
 0125                if (response.StatusCode != AmqpResponseStatusCode.OK)
 126                {
 0127                    throw response.ToMessagingContractException();
 128                }
 0129            }
 130            catch (Exception exception)
 131            {
                     // Every exception raised above, including the one thrown for a non-OK status,
                     // is passed to AmqpExceptionHelper.GetClientException and its result is thrown.
 0132                throw AmqpExceptionHelper.GetClientException(exception);
 133            }
 0134        }
 135
 136        public async Task OnRemoveRuleAsync(string ruleName)
 137        {
                // Builds a RemoveRuleOperation request carrying the rule name, sends it with
                // InnerReceiver.ExecuteRequestResponseAsync, and throws when the response status is
                // anything other than OK. Reading InnerReceiver creates the receiver if it does
                // not exist yet.
 138            try
 139            {
 0140                var amqpRequestMessage =
 0141                    AmqpRequestMessage.CreateRequest(
 0142                        ManagementConstants.Operations.RemoveRuleOperation,
 0143                        this.ServiceBusConnection.OperationTimeout,
 0144                        null);
 0145                amqpRequestMessage.Map[ManagementConstants.Properties.RuleName] = ruleName;
 146
                     // NOTE(review): the statement below is cut off at the right edge of this listing.
 0147                var response = await this.InnerReceiver.ExecuteRequestResponseAsync(amqpRequestMessage).ConfigureAwait(f
 148
 0149                if (response.StatusCode != AmqpResponseStatusCode.OK)
 150                {
 0151                    throw response.ToMessagingContractException();
 152                }
 0153            }
 154            catch (Exception exception)
 155            {
                     // Every exception raised above, including the one thrown for a non-OK status,
                     // is passed to AmqpExceptionHelper.GetClientException and its result is thrown.
 0156                throw AmqpExceptionHelper.GetClientException(exception);
 157            }
 0158        }
 159
 160        public async Task<IList<RuleDescription>> OnGetRulesAsync(int top, int skip)
 161        {
                // Builds an EnumerateRulesOperation request with the given top and skip values,
                // sends it with InnerReceiver.ExecuteRequestResponseAsync, and converts the response
                // into a list of RuleDescription. Reading InnerReceiver creates the receiver if it
                // does not exist yet.
 162            try
 163            {
 0164                var amqpRequestMessage =
 0165                    AmqpRequestMessage.CreateRequest(
 0166                        ManagementConstants.Operations.EnumerateRulesOperation,
 0167                        this.ServiceBusConnection.OperationTimeout,
 0168                        null);
 0169                amqpRequestMessage.Map[ManagementConstants.Properties.Top] = top;
 0170                amqpRequestMessage.Map[ManagementConstants.Properties.Skip] = skip;
 171
                     // NOTE(review): the statement below is cut off at the right edge of this listing.
 0172                var response = await this.InnerReceiver.ExecuteRequestResponseAsync(amqpRequestMessage).ConfigureAwait(f
 0173                var ruleDescriptions = new List<RuleDescription>();
 0174                if (response.StatusCode == AmqpResponseStatusCode.OK)
 175                {
                         // Each entry of the Rules list is a map whose RuleDescription value is cast
                         // to the AMQP codec type and then converted to a RuleDescription.
 0176                    var ruleList = response.GetListValue<AmqpMap>(ManagementConstants.Properties.Rules);
 0177                    foreach (var entry in ruleList)
 178                    {
 0179                        var amqpRule = (AmqpRuleDescriptionCodec)entry[ManagementConstants.Properties.RuleDescription];
 0180                        var ruleDescription = AmqpMessageConverter.GetRuleDescription(amqpRule);
 0181                        ruleDescriptions.Add(ruleDescription);
 182                    }
 183                }
 0184                else if (response.StatusCode == AmqpResponseStatusCode.NoContent)
 185                {
 186                    // Do nothing. Return empty list;
 187                }
 188                else
 189                {
                         // Any status other than OK or NoContent is turned into an exception.
 0190                    throw response.ToMessagingContractException();
 191                }
 192
 0193                return ruleDescriptions;
 194            }
 195            catch (Exception exception)
 196            {
                     // Every exception raised above, including the one thrown for an unexpected
                     // status, is passed to AmqpExceptionHelper.GetClientException and its result is thrown.
 0197                throw AmqpExceptionHelper.GetClientException(exception);
 198            }
 0199        }
 200    }
 201}