< Summary

Class:Azure.Messaging.ServiceBus.Amqp.AmqpRuleManager
Assembly:Azure.Messaging.ServiceBus
File(s):C:\Git\azure-sdk-for-net\sdk\servicebus\Azure.Messaging.ServiceBus\src\Amqp\AmqpRuleManager.cs
Covered lines:0
Uncovered lines:112
Coverable lines:112
Total lines:311
Line coverage:0% (0 of 112)
Covered branches:0
Total branches:20
Branch coverage:0% (0 of 20)

Metrics

MethodCyclomatic complexity Line coverage Branch coverage
get_IsClosed()-0%100%
.cctor()-0%100%
.ctor(...)-0%0%
AddRuleAsync()-0%100%
<AddRuleAsync()-0%100%
AddRuleInternalAsync()-0%0%
RemoveRuleAsync()-0%100%
<RemoveRuleAsync()-0%100%
RemoveRuleInternalAsync()-0%0%
GetRulesAsync()-0%100%
<GetRulesAsync()-0%100%
GetRulesInternalAsync()-0%0%
CloseAsync()-0%0%

File(s)

C:\Git\azure-sdk-for-net\sdk\servicebus\Azure.Messaging.ServiceBus\src\Amqp\AmqpRuleManager.cs

#LineLine coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Collections.Generic;
 6using System.Threading;
 7using System.Threading.Tasks;
 8using Azure.Core;
 9using Azure.Messaging.ServiceBus.Amqp.Framing;
 10using Azure.Messaging.ServiceBus.Core;
 11using Azure.Messaging.ServiceBus.Management;
 12using Microsoft.Azure.Amqp;
 13using Microsoft.Azure.Amqp.Encoding;
 14
 15namespace Azure.Messaging.ServiceBus.Amqp
 16{
 17#pragma warning disable CA1001 // Types that own disposable fields should be disposable. AmqpRuleManager does not own co
 18    internal class AmqpRuleManager : TransportRuleManager
 19#pragma warning restore CA1001 // Types that own disposable fields should be disposable
 20    {
 21        /// <summary>
 22        /// The path of the Service Bus subscription to which the rule manager is bound.
 23        /// </summary>
 24        ///
 25        private readonly string _subscriptionPath;
 26
 27        /// <summary>
 28        /// The policy to use for determining retry behavior for when an operation fails.
 29        /// </summary>
 30        private readonly ServiceBusRetryPolicy _retryPolicy;
 31
 32        /// <summary>
 33        /// The identifier for the rule manager.
 34        /// </summary>
 35        private readonly string _identifier;
 36
 37        /// <summary>
 38        /// The AMQP connection scope responsible for managing transport constructs for this instance.
 39        /// </summary>
 40        ///
 41        private readonly AmqpConnectionScope _connectionScope;
 42
 43        /// <summary>
 44        /// Indicates whether or not this instance has been closed.
 45        /// </summary>
 46        private bool _closed = false;
 47
 48        /// <summary>
 49        /// Indicates whether or not this rule manager has been closed.
 50        /// </summary>
 51        ///
 52        /// <value>
 53        /// <c>true</c> if the rule manager is closed; otherwise, <c>false</c>.
 54        /// </value>
 055        public override bool IsClosed => _closed;
 56
 57        private readonly FaultTolerantAmqpObject<RequestResponseAmqpLink> _managementLink;
 58
 59        static AmqpRuleManager()
 60        {
 061            AmqpCodec.RegisterKnownTypes(AmqpTrueRuleFilterCodec.Name, AmqpTrueRuleFilterCodec.Code, () => new AmqpTrueR
 062            AmqpCodec.RegisterKnownTypes(AmqpFalseRuleFilterCodec.Name, AmqpFalseRuleFilterCodec.Code, () => new AmqpFal
 063            AmqpCodec.RegisterKnownTypes(AmqpCorrelationRuleFilterCodec.Name, AmqpCorrelationRuleFilterCodec.Code, () =>
 064            AmqpCodec.RegisterKnownTypes(AmqpSqlRuleFilterCodec.Name, AmqpSqlRuleFilterCodec.Code, () => new AmqpSqlRule
 065            AmqpCodec.RegisterKnownTypes(AmqpEmptyRuleActionCodec.Name, AmqpEmptyRuleActionCodec.Code, () => new AmqpEmp
 066            AmqpCodec.RegisterKnownTypes(AmqpSqlRuleActionCodec.Name, AmqpSqlRuleActionCodec.Code, () => new AmqpSqlRule
 067            AmqpCodec.RegisterKnownTypes(AmqpRuleDescriptionCodec.Name, AmqpRuleDescriptionCodec.Code, () => new AmqpRul
 068        }
 69
 70        /// <summary>
 71        /// Initializes a new instance of the <see cref="AmqpRuleManager"/> class.
 72        /// </summary>
 73        ///
 74        /// <param name="subscriptionPath">The path of the Service Bus subscription to which the rule manager is bound.<
 75        /// <param name="connectionScope">The AMQP connection context for operations.</param>
 76        /// <param name="retryPolicy">The retry policy to consider when an operation fails.</param>
 77        /// <param name="identifier">The identifier for the rule manager.</param>
 78        ///
 79        /// <remarks>
 80        /// As an internal type, this class performs only basic sanity checks against its arguments.  It
 81        /// is assumed that callers are trusted and have performed deep validation.
 82        ///
 83        /// Any parameters passed are assumed to be owned by this instance and safe to mutate or dispose;
 84        /// creation of clones or otherwise protecting the parameters is assumed to be the purview of the
 85        /// caller.
 86        /// </remarks>
 087        public AmqpRuleManager(
 088            string subscriptionPath,
 089            AmqpConnectionScope connectionScope,
 090            ServiceBusRetryPolicy retryPolicy,
 091            string identifier)
 92        {
 093            Argument.AssertNotNullOrEmpty(subscriptionPath, nameof(subscriptionPath));
 094            Argument.AssertNotNull(connectionScope, nameof(connectionScope));
 095            Argument.AssertNotNull(retryPolicy, nameof(retryPolicy));
 96
 097            _subscriptionPath = subscriptionPath;
 098            _connectionScope = connectionScope;
 099            _retryPolicy = retryPolicy;
 0100            _identifier = identifier;
 101
 0102            _managementLink = new FaultTolerantAmqpObject<RequestResponseAmqpLink>(
 0103                timeout => _connectionScope.OpenManagementLinkAsync(
 0104                    _subscriptionPath,
 0105                    _identifier,
 0106                    timeout,
 0107                    CancellationToken.None),
 0108                link =>
 0109                {
 0110                    link.Session?.SafeClose();
 0111                    link.SafeClose();
 0112                });
 0113        }
 114
 115        /// <summary>
 116        /// Adds a rule to the current subscription to filter the messages reaching from topic to the subscription.
 117        /// </summary>
 118        ///
 119        /// <param name="description">The rule description that provides the rule to add.</param>
 120        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 121        ///
 122        /// <remarks>
 123        /// You can add rules to the subscription that decides which messages from the topic should reach the subscripti
 124        /// A default <see cref="TrueRuleFilter"/> rule named <see cref="RuleProperties.DefaultRuleName"/> is always add
 125        /// You can add multiple rules with distinct names to the same subscription.
 126        /// Multiple filters combine with each other using logical OR condition. i.e., If any filter succeeds, the messa
 127        /// </remarks>
 128        ///
 129        /// <returns>A task instance that represents the asynchronous add rule operation.</returns>
 130        public override async Task AddRuleAsync(
 131            RuleProperties description,
 132            CancellationToken cancellationToken = default) =>
 0133            await _retryPolicy.RunOperation(
 0134                async (timeout) =>
 0135                await AddRuleInternalAsync(
 0136                    description,
 0137                    timeout).ConfigureAwait(false),
 0138                _connectionScope,
 0139                cancellationToken).ConfigureAwait(false);
 140
 141        /// <summary>
 142        /// Adds a rule to the current subscription to filter the messages reaching from topic to the subscription.
 143        /// </summary>
 144        ///
 145        /// <param name="description">The rule description that provides the rule to add.</param>
 146        /// <param name="timeout">The per-try timeout specified in the RetryOptions.</param>
 147        ///
 148        /// <returns>A task instance that represents the asynchronous add rule operation.</returns>
 149        private async Task AddRuleInternalAsync(
 150            RuleProperties description,
 151            TimeSpan timeout)
 152        {
 153            // Create an AmqpRequest Message to add rule
 0154            var amqpRequestMessage = AmqpRequestMessage.CreateRequest(
 0155                     ManagementConstants.Operations.AddRuleOperation,
 0156                     timeout,
 0157                     null);
 0158            amqpRequestMessage.Map[ManagementConstants.Properties.RuleName] = description.Name;
 0159            amqpRequestMessage.Map[ManagementConstants.Properties.RuleDescription] = AmqpMessageConverter.GetRuleDescrip
 160
 0161            AmqpResponseMessage response = await ManagementUtilities.ExecuteRequestResponseAsync(
 0162                    _connectionScope,
 0163                    _managementLink,
 0164                    amqpRequestMessage,
 0165                    timeout).ConfigureAwait(false);
 166
 0167            if (response.StatusCode != AmqpResponseStatusCode.OK)
 168            {
 0169                throw response.ToMessagingContractException();
 170            }
 0171        }
 172
 173        /// <summary>
 174        /// Removes the rule on the subscription identified by <paramref name="ruleName" />.
 175        /// </summary>
 176        ///
 177        /// <param name="ruleName">Name of the rule</param>
 178        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 179        ///
 180        /// <returns>A task instance that represents the asynchronous remove rule operation.</returns>
 181        public override async Task RemoveRuleAsync(
 182            string ruleName,
 183            CancellationToken cancellationToken = default) =>
 0184            await _retryPolicy.RunOperation(
 0185                async (timeout) =>
 0186                await RemoveRuleInternalAsync(
 0187                    ruleName,
 0188                    timeout).ConfigureAwait(false),
 0189                _connectionScope,
 0190                cancellationToken).ConfigureAwait(false);
 191
 192        /// <summary>
 193        /// Removes the rule on the subscription identified by <paramref name="ruleName" />.
 194        /// </summary>
 195        ///
 196        /// <param name="ruleName">Name of the rule</param>
 197        /// <param name="timeout">The per-try timeout specified in the RetryOptions.</param>
 198        ///
 199        /// <returns>A task instance that represents the asynchronous remove rule operation.</returns>
 200        private async Task RemoveRuleInternalAsync(
 201            string ruleName,
 202            TimeSpan timeout)
 203        {
 204            // Create an AmqpRequest Message to remove rule
 0205            var amqpRequestMessage = AmqpRequestMessage.CreateRequest(
 0206                        ManagementConstants.Operations.RemoveRuleOperation,
 0207                        timeout,
 0208                        null);
 0209            amqpRequestMessage.Map[ManagementConstants.Properties.RuleName] = ruleName;
 210
 0211            AmqpResponseMessage response = await ManagementUtilities.ExecuteRequestResponseAsync(
 0212                    _connectionScope,
 0213                    _managementLink,
 0214                    amqpRequestMessage,
 0215                    timeout).ConfigureAwait(false);
 216
 0217            if (response.StatusCode != AmqpResponseStatusCode.OK)
 218            {
 0219                throw response.ToMessagingContractException();
 220            }
 0221        }
 222
 223        /// <summary>
 224        /// Get all rules associated with the subscription.
 225        /// </summary>
 226        ///
 227        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 228        ///
 229        /// <returns>Returns a list of rules description</returns>
 230        public override async Task<IList<RuleProperties>> GetRulesAsync(CancellationToken cancellationToken = default)
 231        {
 0232            IList<RuleProperties> rulesDescription = null;
 233
 0234            await _retryPolicy.RunOperation(
 0235                async (timeout) =>
 0236                rulesDescription = await GetRulesInternalAsync(timeout).ConfigureAwait(false),
 0237                _connectionScope,
 0238                cancellationToken).ConfigureAwait(false);
 239
 0240            return rulesDescription;
 0241        }
 242
 243        /// <summary>
 244        /// Get all rules associated with the subscription.
 245        /// </summary>
 246        ///
 247        /// <param name="timeout">The per-try timeout specified in the RetryOptions.</param>
 248        ///
 249        /// <returns>Returns a list of rules description</returns>
 250        private async Task<IList<RuleProperties>> GetRulesInternalAsync(TimeSpan timeout)
 251        {
 0252            var amqpRequestMessage = AmqpRequestMessage.CreateRequest(
 0253                    ManagementConstants.Operations.EnumerateRulesOperation,
 0254                    timeout,
 0255                    null);
 0256            amqpRequestMessage.Map[ManagementConstants.Properties.Top] = int.MaxValue;
 0257            amqpRequestMessage.Map[ManagementConstants.Properties.Skip] = 0;
 258
 0259            var response = await ManagementUtilities.ExecuteRequestResponseAsync(
 0260                _connectionScope,
 0261                _managementLink,
 0262                amqpRequestMessage,
 0263                timeout).ConfigureAwait(false);
 0264            var ruleDescriptions = new List<RuleProperties>();
 0265            if (response.StatusCode == AmqpResponseStatusCode.OK)
 266            {
 0267                var ruleList = response.GetListValue<AmqpMap>(ManagementConstants.Properties.Rules);
 0268                foreach (var entry in ruleList)
 269                {
 0270                    var amqpRule = (AmqpRuleDescriptionCodec)entry[ManagementConstants.Properties.RuleDescription];
 0271                    var ruleDescription = AmqpMessageConverter.GetRuleDescription(amqpRule);
 0272                    ruleDescriptions.Add(ruleDescription);
 273                }
 274            }
 0275            else if (response.StatusCode == AmqpResponseStatusCode.NoContent)
 276            {
 277                // Do nothing. Return empty list;
 278            }
 279            else
 280            {
 0281                throw response.ToMessagingContractException();
 282            }
 283
 0284            return ruleDescriptions;
 0285        }
 286
 287        /// <summary>
 288        /// Closes the connection to the transport rule manager instance.
 289        /// </summary>
 290        ///
 291        /// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request t
 292        public override async Task CloseAsync(CancellationToken cancellationToken)
 293        {
 0294            if (_closed)
 295            {
 0296                return;
 297            }
 298
 0299            _closed = true;
 300
 0301            if (_managementLink?.TryGetOpenedObject(out var _) == true)
 302            {
 0303                cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();
 0304                await _managementLink.CloseAsync().ConfigureAwait(false);
 305            }
 306
 0307            _managementLink?.Dispose();
 308
 0309        }
 310    }
 311}