< Summary

Class:Azure.Messaging.EventHubs.Amqp.AmqpEventBatch
Assembly:Azure.Messaging.EventHubs
File(s):C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs\src\Amqp\AmqpEventBatch.cs
Covered lines:43
Uncovered lines:0
Coverable lines:43
Total lines:187
Line coverage:100% (43 of 43)
Covered branches:7
Total branches:8
Branch coverage:87.5% (7 of 8)

Metrics

MethodCyclomatic complexity Line coverage Branch coverage
get_MaximumSizeInBytes()-100%100%
get_SizeInBytes()-100%100%
get_Count()-100%100%
get_MessageConverter()-100%100%
get_Options()-100%100%
get_BatchEvents()-100%100%
.ctor(...)-100%100%
TryAdd(...)-100%83.33%
Clear()-100%100%
AsEnumerable()-100%100%
Dispose()-100%100%

File(s)

C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs\src\Amqp\AmqpEventBatch.cs

#LineLine coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Collections.Generic;
 6using System.Globalization;
 7using System.Linq;
 8using Azure.Core;
 9using Azure.Messaging.EventHubs.Core;
 10using Azure.Messaging.EventHubs.Producer;
 11using Microsoft.Azure.Amqp;
 12
 13namespace Azure.Messaging.EventHubs.Amqp
 14{
 15    /// <summary>
 16    ///   A set of events with known size constraints, based on messages to be sent
 17    ///   using an AMQP-based transport.
 18    /// </summary>
 19    ///
 20    internal class AmqpEventBatch : TransportEventBatch
 21    {
 22        /// <summary>The amount of bytes to reserve as overhead for a small message.</summary>
 23        private const byte OverheadBytesSmallMessage = 5;
 24
 25        /// <summary>The amount of bytes to reserve as overhead for a large message.</summary>
 26        private const byte OverheadBytesLargeMessage = 8;
 27
 28        /// <summary>The maximum number of bytes that a message may be to be considered small.</summary>
 29        private const byte MaximumBytesSmallMessage = 255;
 30
 31        /// <summary>The size of the batch, in bytes, to reserve for the AMQP message overhead.</summary>
 32        private readonly long ReservedSize;
 33
 34        /// <summary>A flag that indicates whether or not the instance has been disposed.</summary>
 35        private volatile bool _disposed = false;
 36
 37        /// <summary>The size of the batch, in bytes, as it will be sent via the AMQP transport.</summary>
 38        private long _sizeBytes = 0;
 39
 40        /// <summary>
 41        ///   The maximum size allowed for the batch, in bytes.  This includes the events in the batch as
 42        ///   well as any overhead for the batch itself when sent to the Event Hubs service.
 43        /// </summary>
 44        ///
 7645        public override long MaximumSizeInBytes { get; }
 46
 47        /// <summary>
 48        ///   The size of the batch, in bytes, as it will be sent to the Event Hubs
 49        ///   service.
 50        /// </summary>
 51        ///
 652        public override long SizeInBytes => _sizeBytes;
 53
 54        /// <summary>
 55        ///   The count of events contained in the batch.
 56        /// </summary>
 57        ///
 858        public override int Count => BatchEvents.Count;
 59
 60        /// <summary>
 61        ///   The converter to use for translating <see cref="EventData" /> into the corresponding AMQP message.
 62        /// </summary>
 63        ///
 7464        private AmqpMessageConverter MessageConverter { get; }
 65
 66        /// <summary>
 67        ///   The set of options to apply to the batch.
 68        /// </summary>
 69        ///
 7670        private CreateBatchOptions Options { get; }
 71
 72        /// <summary>
 73        ///   The set of events that have been added to the batch.
 74        /// </summary>
 75        ///
 19676        private List<EventData> BatchEvents { get; } = new List<EventData>();
 77
 78        /// <summary>
 79        ///   Initializes a new instance of the <see cref="AmqpEventBatch"/> class.
 80        /// </summary>
 81        ///
 82        /// <param name="messageConverter">The converter to use for translating <see cref="EventData" /> into the corres
 83        /// <param name="options">The set of options to apply to the batch.</param>
 84        ///
 6285        public AmqpEventBatch(AmqpMessageConverter messageConverter,
 6286                              CreateBatchOptions options)
 87        {
 6288            Argument.AssertNotNull(messageConverter, nameof(messageConverter));
 6089            Argument.AssertNotNull(options, nameof(options));
 5890            Argument.AssertNotNull(options.MaximumSizeInBytes, nameof(options.MaximumSizeInBytes));
 91
 5692            MessageConverter = messageConverter;
 5693            Options = options;
 5694            MaximumSizeInBytes = options.MaximumSizeInBytes.Value;
 95
 96            // Initialize the size by reserving space for the batch envelope.
 97
 5698            using AmqpMessage envelope = messageConverter.CreateBatchFromEvents(Enumerable.Empty<EventData>(), options.P
 5699            ReservedSize = envelope.SerializedMessageSize;
 56100            _sizeBytes = ReservedSize;
 101
 112102        }
 103
 104        /// <summary>
 105        ///   Attempts to add an event to the batch, ensuring that the size
 106        ///   of the batch does not exceed its maximum.
 107        /// </summary>
 108        ///
 109        /// <param name="eventData">The event to attempt to add to the batch.</param>
 110        ///
 111        /// <returns><c>true</c> if the event was added; otherwise, <c>false</c>.</returns>
 112        ///
 113        public override bool TryAdd(EventData eventData)
 114        {
 78115            Argument.AssertNotNull(eventData, nameof(eventData));
 76116            Argument.AssertNotDisposed(_disposed, nameof(EventDataBatch));
 117
 74118            AmqpMessage eventMessage = MessageConverter.CreateMessageFromEvent(eventData, Options.PartitionKey);
 119
 120            try
 121            {
 122                // Calculate the size for the event, based on the AMQP message size and accounting for a
 123                // bit of reserved overhead size.
 124
 74125                var size = _sizeBytes
 74126                    + eventMessage.SerializedMessageSize
 74127                    + (eventMessage.SerializedMessageSize <= MaximumBytesSmallMessage
 74128                        ? OverheadBytesSmallMessage
 74129                        : OverheadBytesLargeMessage);
 130
 74131                if (size > MaximumSizeInBytes)
 132                {
 4133                    return false;
 134                }
 135
 70136                _sizeBytes = size;
 70137                BatchEvents.Add(eventData);
 138
 70139                return true;
 140            }
 141            finally
 142            {
 74143                eventMessage?.Dispose();
 74144            }
 74145        }
 146
 147        /// <summary>
 148        ///   Clears the batch, removing all events and resetting the
 149        ///   available size.
 150        /// </summary>
 151        ///
 152        public override void Clear()
 153        {
 42154            BatchEvents.Clear();
 42155            _sizeBytes = ReservedSize;
 42156        }
 157
 158        /// <summary>
 159        ///   Represents the batch as an enumerable set of transport-specific
 160        ///   representations of an event.
 161        /// </summary>
 162        ///
 163        /// <typeparam name="T">The transport-specific event representation being requested.</typeparam>
 164        ///
 165        /// <returns>The set of events as an enumerable of the requested type.</returns>
 166        ///
 167        public override IEnumerable<T> AsEnumerable<T>()
 168        {
 16169            if (typeof(T) != typeof(EventData))
 170            {
 2171                throw new FormatException(string.Format(CultureInfo.CurrentCulture, Resources.UnsupportedTransportEventT
 172            }
 173
 14174            return (IEnumerable<T>)BatchEvents;
 175        }
 176
 177        /// <summary>
 178        ///   Performs the task needed to clean up resources used by the <see cref="AmqpEventBatch" />.
 179        /// </summary>
 180        ///
 181        public override void Dispose()
 182        {
 38183            _disposed = true;
 38184            Clear();
 38185        }
 186    }
 187}