< Summary

Class: Azure.Messaging.EventGrid.EventGridPublisherClient
Assembly: Azure.Messaging.EventGrid
File(s): C:\Git\azure-sdk-for-net\sdk\eventgrid\Azure.Messaging.EventGrid\src\Customization\EventGridPublisherClient.cs
Covered lines: 132
Uncovered lines: 13
Coverable lines: 145
Total lines: 339
Line coverage: 91% (132 of 145)
Covered branches: 26
Total branches: 28
Branch coverage: 92.8% (26 of 28)

Metrics

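| Method | Cyclomatic complexity | Line coverage | Branch coverage |
| --- | --- | --- | --- |
| get__hostName() | - | 100% | 100% |
| .ctor() | - | 100% | 100% |
| .ctor(...) | - | 0% | 100% |
| .ctor(...) | - | 0% | 100% |
| .ctor(...) | - | 100% | 75% |
| .ctor(...) | - | 100% | 75% |
| SendEventsAsync() | - | 100% | 100% |
| SendEvents(...) | - | 100% | 100% |
| PublishEventsInternal() | - | 91.18% | 100% |
| SendEventsAsync() | - | 100% | 100% |
| SendEvents(...) | - | 100% | 100% |
| PublishCloudEventsInternal() | - | 93.02% | 100% |
| SendEventsAsync() | - | 100% | 100% |
| SendEvents(...) | - | 100% | 100% |
| PublishCustomEventsInternal() | - | 86.36% | 100% |
| BuildSharedAccessSignature(...) | - | 100% | 100% |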

File(s)

C:\Git\azure-sdk-for-net\sdk\eventgrid\Azure.Messaging.EventGrid\src\Customization\EventGridPublisherClient.cs

# | Line | Line coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Collections.Generic;
 6using System.Globalization;
 7using System.IO;
 8using System.Linq;
 9using System.Security.Cryptography;
 10using System.Text;
 11using System.Text.Json;
 12using System.Threading;
 13using System.Threading.Tasks;
 14using System.Web;
 15using Azure.Core;
 16using Azure.Core.Pipeline;
 17using Azure.Core.Serialization;
 18using Azure.Messaging.EventGrid.Models;
 19
 20namespace Azure.Messaging.EventGrid
 21{
 22    /// <summary>
 23    /// Client used to interact with the Event Grid service.
 24    /// </summary>
 25    public class EventGridPublisherClient
 26    {
 27        private readonly ServiceRestClient _serviceRestClient;
 28        private readonly ClientDiagnostics _clientDiagnostics;
 4429        private string _hostName => _endpoint.Host;
 30        private readonly Uri _endpoint;
 31        private readonly AzureKeyCredential _key;
 32        private readonly string _apiVersion;
 33        private readonly ObjectSerializer _dataSerializer;
 34
 35        /// <summary>Initializes a new instance of the <see cref="EventGridPublisherClient"/> class.</summary>
 4436        protected EventGridPublisherClient()
 37        {
 4438        }
 39
 40        /// <summary>Initializes a new instance of the <see cref="EventGridPublisherClient"/> class.</summary>
 41        /// <param name="endpoint">Topic endpoint. For example, "https://TOPIC-NAME.REGION-NAME-1.eventgrid.azure.net/ap
 42        /// <param name="credential">Credential used to connect to Azure.</param>
 43        public EventGridPublisherClient(Uri endpoint, AzureKeyCredential credential)
 044            : this(endpoint, credential, new EventGridPublisherClientOptions())
 45        {
 046        }
 47
 48        /// <summary>Initializes a new instance of the <see cref="EventGridPublisherClient"/> class.</summary>
 49        /// <param name="endpoint">Topic endpoint. For example, "https://TOPIC-NAME.REGION-NAME-1.eventgrid.azure.net/ap
 50        /// <param name="credential">Credential used to connect to Azure.</param>
 51        public EventGridPublisherClient(Uri endpoint, EventGridSharedAccessSignatureCredential credential)
 052            : this(endpoint, credential, new EventGridPublisherClientOptions())
 53        {
 054        }
 55
 56        /// <summary>Initializes a new instance of the <see cref="EventGridPublisherClient"/> class.</summary>
 57        /// <param name="endpoint">Topic endpoint. For example, "https://TOPIC-NAME.REGION-NAME-1.eventgrid.azure.net/ap
 58        /// <param name="credential">Credential used to connect to Azure.</param>
 59        /// <param name="options">Configuring options.</param>
 4060        public EventGridPublisherClient(Uri endpoint, AzureKeyCredential credential, EventGridPublisherClientOptions opt
 61        {
 4062            Argument.AssertNotNull(credential, nameof(credential));
 4063            options ??= new EventGridPublisherClientOptions();
 4064            _dataSerializer = options.DataSerializer ?? new JsonObjectSerializer();
 4065            _apiVersion = options.Version.GetVersionString();
 4066            _endpoint = endpoint;
 4067            _key = credential;
 4068            HttpPipeline pipeline = HttpPipelineBuilder.Build(options, new AzureKeyCredentialPolicy(credential, Constant
 4069            _serviceRestClient = new ServiceRestClient(new ClientDiagnostics(options), pipeline, options.Version.GetVers
 4070            _clientDiagnostics = new ClientDiagnostics(options);
 4071        }
 72
 73        /// <summary>
 74        /// Initializes a new instance of the <see cref="EventGridPublisherClient"/> class.
 75        /// </summary>
 76        /// <param name="endpoint">Topic endpoint. For example, "https://TOPIC-NAME.REGION-NAME-1.eventgrid.azure.net/ap
 77        /// <param name="credential">Credential used to connect to Azure.</param>
 78        /// <param name="options">Configuring options.</param>
 479        public EventGridPublisherClient(Uri endpoint, EventGridSharedAccessSignatureCredential credential, EventGridPubl
 80        {
 481            Argument.AssertNotNull(credential, nameof(credential));
 482            options ??= new EventGridPublisherClientOptions();
 483            _dataSerializer = options.DataSerializer ?? new JsonObjectSerializer();
 484            _endpoint = endpoint;
 485            HttpPipeline pipeline = HttpPipelineBuilder.Build(options, new EventGridSharedAccessSignatureCredentialPolic
 486            _serviceRestClient = new ServiceRestClient(new ClientDiagnostics(options), pipeline, options.Version.GetVers
 487            _clientDiagnostics = new ClientDiagnostics(options);
 488        }
 89
 90        /// <summary> Publishes a batch of EventGridEvents to an Azure Event Grid topic. </summary>
 91        /// <param name="events"> An array of events to be published to Event Grid. </param>
 92        /// <param name="cancellationToken"> The cancellation token to use. </param>
 93        public virtual async Task<Response> SendEventsAsync(IEnumerable<EventGridEvent> events, CancellationToken cancel
 894            => await PublishEventsInternal(events, true /*async*/, cancellationToken).ConfigureAwait(false);
 95
 96        /// <summary> Publishes a batch of EventGridEvents to an Azure Event Grid topic. </summary>
 97        /// <param name="events"> An array of events to be published to Event Grid. </param>
 98        /// <param name="cancellationToken"> The cancellation token to use. </param>
 99        public virtual Response SendEvents(IEnumerable<EventGridEvent> events, CancellationToken cancellationToken = def
 8100            => PublishEventsInternal(events, false /*async*/, cancellationToken).EnsureCompleted();
 101
 102        /// <summary> Publishes a batch of EventGridEvents to an Azure Event Grid topic. </summary>
 103        /// <param name="events"> An array of events to be published to Event Grid. </param>
 104        /// <param name="async">Whether to invoke the operation asynchronously.</param>
 105        /// <param name="cancellationToken"> The cancellation token to use. </param>
 106        private async Task<Response> PublishEventsInternal(IEnumerable<EventGridEvent> events, bool async, CancellationT
 107        {
 16108            using DiagnosticScope scope = _clientDiagnostics.CreateScope($"{nameof(EventGridPublisherClient)}.{nameof(Se
 16109            scope.Start();
 110
 111            try
 112            {
 113                // List of events cannot be null
 16114                Argument.AssertNotNull(events, nameof(events));
 115
 16116                List<EventGridEventInternal> eventsWithSerializedPayloads = new List<EventGridEventInternal>();
 352117                foreach (EventGridEvent egEvent in events)
 118                {
 119                    // Individual events cannot be null
 160120                    Argument.AssertNotNull(egEvent, nameof(egEvent));
 121
 160122                    MemoryStream stream = new MemoryStream();
 160123                    _dataSerializer.Serialize(stream, egEvent.Data, egEvent.Data.GetType(), cancellationToken);
 160124                    stream.Position = 0;
 160125                    JsonDocument data = JsonDocument.Parse(stream);
 126
 160127                    EventGridEventInternal newEGEvent = new EventGridEventInternal(
 160128                            egEvent.Id,
 160129                            egEvent.Subject,
 160130                            data.RootElement,
 160131                            egEvent.EventType,
 160132                            egEvent.EventTime,
 160133                            egEvent.DataVersion)
 160134                    {
 160135                        Topic = egEvent.Topic
 160136                    };
 137
 160138                    eventsWithSerializedPayloads.Add(newEGEvent);
 139                }
 16140                if (async)
 141                {
 142                    // Publish asynchronously if called via an async path
 8143                    return await _serviceRestClient.PublishEventsAsync(
 8144                        _hostName,
 8145                        eventsWithSerializedPayloads,
 8146                        cancellationToken).ConfigureAwait(false);
 147                }
 148                else
 149                {
 8150                    return _serviceRestClient.PublishEvents(
 8151                        _hostName,
 8152                        eventsWithSerializedPayloads,
 8153                        cancellationToken);
 154                }
 155            }
 0156            catch (Exception e)
 157            {
 0158                scope.Failed(e);
 0159                throw;
 160            }
 16161        }
 162
 163        /// <summary> Publishes a batch of CloudEvents to an Azure Event Grid topic. </summary>
 164        /// <param name="events"> An array of events to be published to Event Grid. </param>
 165        /// <param name="cancellationToken"> The cancellation token to use. </param>
 166        public virtual async Task<Response> SendEventsAsync(IEnumerable<CloudEvent> events, CancellationToken cancellati
 10167            => await PublishCloudEventsInternal(events, true /*async*/, cancellationToken).ConfigureAwait(false);
 168
 169        /// <summary> Publishes a batch of CloudEvents to an Azure Event Grid topic. </summary>
 170        /// <param name="events"> An array of events to be published to Event Grid. </param>
 171        /// <param name="cancellationToken"> The cancellation token to use. </param>
 172        public virtual Response SendEvents(IEnumerable<CloudEvent> events, CancellationToken cancellationToken = default
 10173            => PublishCloudEventsInternal(events, false /*async*/, cancellationToken).EnsureCompleted();
 174
 175        /// <summary> Publishes a batch of CloudEvents to an Azure Event Grid topic. </summary>
 176        /// <param name="events"> An array of events to be published to Event Grid. </param>
 177        /// <param name="async">Whether to invoke the operation asynchronously.</param>
 178        /// <param name="cancellationToken"> The cancellation token to use. </param>
 179        private async Task<Response> PublishCloudEventsInternal(IEnumerable<CloudEvent> events, bool async, Cancellation
 180        {
 20181            using DiagnosticScope scope = _clientDiagnostics.CreateScope($"{nameof(EventGridPublisherClient)}.{nameof(Se
 20182            scope.Start();
 183
 184            try
 185            {
 186                // List of events cannot be null
 20187                Argument.AssertNotNull(events, nameof(events));
 188
 20189                List<CloudEventInternal> eventsWithSerializedPayloads = new List<CloudEventInternal>();
 480190                foreach (CloudEvent cloudEvent in events)
 191                {
 192                    // Individual events cannot be null
 220193                    Argument.AssertNotNull(cloudEvent, nameof(cloudEvent));
 194
 220195                    CloudEventInternal newCloudEvent = new CloudEventInternal(
 220196                        cloudEvent.Id,
 220197                        cloudEvent.Source,
 220198                        cloudEvent.Type,
 220199                        "1.0")
 220200                    {
 220201                        Time = cloudEvent.Time,
 220202                        Datacontenttype = cloudEvent.DataContentType,
 220203                        Dataschema = cloudEvent.DataSchema,
 220204                        Subject = cloudEvent.Subject
 220205                    };
 206
 600207                    foreach (KeyValuePair<string, object> kvp in cloudEvent.ExtensionAttributes)
 208                    {
 80209                        newCloudEvent.Add(kvp.Key, new CustomModelSerializer(kvp.Value, _dataSerializer, cancellationTok
 210                    }
 211
 212                    // The 'Data' property is optional for CloudEvents
 220213                    if (cloudEvent.Data != null)
 214                    {
 180215                        if (cloudEvent.Data is IEnumerable<byte> enumerable)
 216                        {
 40217                            newCloudEvent.DataBase64 = Convert.ToBase64String(enumerable.ToArray());
 218                        }
 140219                        else if (cloudEvent.Data is ReadOnlyMemory<byte> memory)
 220                        {
 20221                            newCloudEvent.DataBase64 = Convert.ToBase64String(memory.ToArray());
 222                        }
 223                        else
 224                        {
 120225                            MemoryStream stream = new MemoryStream();
 120226                            _dataSerializer.Serialize(stream, cloudEvent.Data, cloudEvent.Data.GetType(), cancellationTo
 120227                            stream.Position = 0;
 120228                            JsonDocument data = JsonDocument.Parse(stream);
 120229                            newCloudEvent.Data = data.RootElement;
 230                        }
 231                    }
 220232                    eventsWithSerializedPayloads.Add(newCloudEvent);
 233                }
 20234                if (async)
 235                {
 236                    // Publish asynchronously if called via an async path
 10237                    return await _serviceRestClient.PublishCloudEventEventsAsync(
 10238                        _hostName,
 10239                        eventsWithSerializedPayloads,
 10240                        cancellationToken).ConfigureAwait(false);
 241                }
 242                else
 243                {
 10244                    return _serviceRestClient.PublishCloudEventEvents(
 10245                        _hostName,
 10246                        eventsWithSerializedPayloads,
 10247                        cancellationToken);
 248                }
 249            }
 0250            catch (Exception e)
 251            {
 0252                scope.Failed(e);
 0253                throw;
 254            }
 20255        }
 256
 257        /// <summary> Publishes a batch of custom events to an Azure Event Grid topic. </summary>
 258        /// <param name="events"> An array of events to be published to Event Grid. </param>
 259        /// <param name="cancellationToken"> The cancellation token to use. </param>
 260        public virtual async Task<Response> SendEventsAsync(IEnumerable<object> events, CancellationToken cancellationTo
 4261            => await PublishCustomEventsInternal(events, true /*async*/, cancellationToken).ConfigureAwait(false);
 262
 263        /// <summary> Publishes a batch of custom events to an Azure Event Grid topic. </summary>
 264        /// <param name="events"> An array of events to be published to Event Grid. </param>
 265        /// <param name="cancellationToken"> The cancellation token to use. </param>
 266        public virtual Response SendEvents(IEnumerable<object> events, CancellationToken cancellationToken = default)
 4267            => PublishCustomEventsInternal(events, false /*async*/, cancellationToken).EnsureCompleted();
 268
 269        private async Task<Response> PublishCustomEventsInternal(IEnumerable<object> events, bool async, CancellationTok
 270        {
 8271            using DiagnosticScope scope = _clientDiagnostics.CreateScope($"{nameof(EventGridPublisherClient)}.{nameof(Se
 8272            scope.Start();
 273
 274            try
 275            {
 8276                List<CustomModelSerializer> serializedEvents = new List<CustomModelSerializer>();
 176277                foreach (object customEvent in events)
 278                {
 80279                    serializedEvents.Add(
 80280                        new CustomModelSerializer(
 80281                            customEvent,
 80282                            _dataSerializer,
 80283                            cancellationToken));
 284                }
 8285                if (async)
 286                {
 4287                    return await _serviceRestClient.PublishCustomEventEventsAsync(
 4288                    _hostName,
 4289                    serializedEvents,
 4290                    cancellationToken).ConfigureAwait(false);
 291                }
 292                else
 293                {
 4294                    return _serviceRestClient.PublishCustomEventEvents(
 4295                    _hostName,
 4296                    serializedEvents,
 4297                    cancellationToken);
 298                }
 299            }
 0300            catch (Exception e)
 301            {
 0302                scope.Failed(e);
 0303                throw;
 304            }
 8305        }
 306
 307        /// <summary>
 308        /// Creates a SAS token for use with Event Grid service.
 309        /// </summary>
 310        /// <param name="endpoint">The path for the event grid topic to which you're sending events. For example, "https
 311        /// <param name="expirationUtc">Time at which the SAS token becomes invalid for authentication.</param>
 312        /// <param name="key">Key credential used to generate the token.</param>
 313        /// <param name="apiVersion">Service version to use when handling requests made with the SAS token.</param>
 314        /// <returns>Returns the generated SAS token string.</returns>
 315        public static string BuildSharedAccessSignature(Uri endpoint, DateTimeOffset expirationUtc, AzureKeyCredential k
 316        {
 317            const char Resource = 'r';
 318            const char Expiration = 'e';
 319            const char Signature = 's';
 320
 4321            var uriBuilder = new RequestUriBuilder();
 4322            uriBuilder.Reset(endpoint);
 4323            uriBuilder.AppendQuery("api-version", apiVersion.GetVersionString(), true);
 4324            string encodedResource = HttpUtility.UrlEncode(uriBuilder.ToString());
 4325            var culture = CultureInfo.CreateSpecificCulture("en-US");
 4326            var encodedExpirationUtc = HttpUtility.UrlEncode(expirationUtc.ToString(culture));
 327
 4328            string unsignedSas = $"{Resource}={encodedResource}&{Expiration}={encodedExpirationUtc}";
 4329            using (var hmac = new HMACSHA256(Convert.FromBase64String(key.Key)))
 330            {
 4331                string signature = Convert.ToBase64String(hmac.ComputeHash(Encoding.UTF8.GetBytes(unsignedSas)));
 4332                string encodedSignature = HttpUtility.UrlEncode(signature);
 4333                string signedSas = $"{unsignedSas}&{Signature}={encodedSignature}";
 334
 4335                return signedSas;
 336            }
 4337        }
 338    }
 339}