< Summary

Class:Azure.Messaging.EventHubs.Processor.Samples.Sample10_RunningWithDifferentStorageVersion
Assembly:Azure.Messaging.EventHubs.Processor.Samples
File(s):C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs.Processor\samples\Sample10_RunningWithDifferentStorageVersion.cs
Covered lines:0
Uncovered lines:54
Coverable lines:54
Total lines:214
Line coverage:0% (0 of 54)
Covered branches:0
Total branches:24
Branch coverage:0% (0 of 24)

Metrics

MethodCyclomatic complexity Line coverage Branch coverage
get_Name()-0%100%
get_Description()-0%100%
RunAsync()-0%0%
get_Version()-0%100%
OnSendingRequest(...)-0%100%

File(s)

C:\Git\azure-sdk-for-net\sdk\eventhub\Azure.Messaging.EventHubs.Processor\samples\Sample10_RunningWithDifferentStorageVersion.cs

#LineLine coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License.
 3
 4using System;
 5using System.Collections.Generic;
 6using System.Text;
 7using System.Threading;
 8using System.Threading.Tasks;
 9using Azure.Core;
 10using Azure.Core.Pipeline;
 11using Azure.Messaging.EventHubs.Consumer;
 12using Azure.Messaging.EventHubs.Processor.Samples.Infrastructure;
 13using Azure.Messaging.EventHubs.Producer;
 14using Azure.Storage.Blobs;
 15
 16/*
 17 * The following sample can be used if the environment you are targeting supports a different version of Storage Blobs S
 18 * typically available on Azure. For example, if you are running Event Hubs on an Azure Stack Hub version 2002, the high
 19 * version for the Storage service is version 2017-11-09. In this case, you will need to use the following code to chang
 20 * service API version to 2017-11-09. For more information on the Azure Storage service versions supported on Azure Stac
 21 * refer to <see href="http://docs.microsoft.com/en-us/azure-stack/user/azure-stack-acs-differences"/>.
 22 */
 23
 24namespace Azure.Messaging.EventHubs.Processor.Samples
 25{
 26    /// <summary>
 27    ///   An example of running the Event Processor with a different version of the Azure Storage service.
 28    /// </summary>
 29    ///
 30    public class Sample10_RunningWithDifferentStorageVersion : IEventHubsBlobCheckpointSample
 31    {
 32        /// <summary>
 33        ///   The name of the sample.
 34        /// </summary>
 35        ///
 036        public string Name => nameof(Sample10_RunningWithDifferentStorageVersion);
 37
 38        /// <summary>
 39        ///   A short description of the sample.
 40        /// </summary>
 41        ///
 042        public string Description => "An example of running the Event Processor with a different version of the Azure St
 43
 44        /// <summary>
 45        ///   Runs the sample using the specified Event Hubs and Azure storage connection information.
 46        /// </summary>
 47        ///
 48        /// <param name="eventHubsConnectionString">The connection string for the Event Hubs namespace that the sample s
 49        /// <param name="eventHubName">The name of the Event Hub, sometimes known as its path, that the sample should ru
 50        /// <param name="blobStorageConnectionString">The connection string for the storage account where checkpoints an
 51        /// <param name="blobContainerName">The name of the blob storage container where checkpoints and state should be
 52        ///
 53        public async Task RunAsync(string eventHubsConnectionString,
 54                                   string eventHubName,
 55                                   string blobStorageConnectionString,
 56                                   string blobContainerName)
 57        {
 58            // Sometimes the environment you are targeting uses a service version that's different from the one set by d
 59            // SDK.  The SDK targets the Azure cloud by default, and you may need to manually specify a different versio
 60            // are trying to run in another platform.
 61            //
 62            // For instance, this will happen when you try to run the Event Processor Client on an Azure Stack Hub versi
 63            // processor makes use of the Azure Storage SDK, which sets a default Azure Storage service version that's d
 64            // the ones expected by the Azure Stack Hub.  This sample illustrates how to work around this issue by using
 65            // policy to request the Azure Storage SDK to change its service version.
 66            //
 67            // In order to do this, create a Blob Client Options instance and add a Storage Api Version Policy to it.  T
 68            // will be passed to the Blob Container Client used by the processor.  The implementation of the policy is p
 69            // the end of this sample.
 70
 071            string consumerGroup = EventHubConsumerClient.DefaultConsumerGroupName;
 72
 073            BlobClientOptions storageClientOptions = new BlobClientOptions();
 074            storageClientOptions.AddPolicy(new StorageApiVersionPolicy(), HttpPipelinePosition.PerCall);
 75
 076            BlobContainerClient storageClient = new BlobContainerClient(blobStorageConnectionString, blobContainerName, 
 077            EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, eventHubsConnectionS
 78
 79            // For this example, we'll create a simple event handler that writes to the
 80            // console each time it was invoked.
 81
 082            int eventsProcessed = 0;
 83
 84            Task processEventHandler(ProcessEventArgs eventArgs)
 85            {
 086                if (eventArgs.CancellationToken.IsCancellationRequested)
 87                {
 088                    return Task.CompletedTask;
 89                }
 90
 091                if (eventArgs.HasEvent)
 92                {
 093                    ++eventsProcessed;
 094                    Console.WriteLine($"Event Received: { Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()) }");
 95                }
 96
 097                return Task.CompletedTask;
 98            }
 99
 100            // For this example, exceptions will just be logged to the console.
 101
 102            Task processErrorHandler(ProcessErrorEventArgs eventArgs)
 103            {
 0104                if (eventArgs.CancellationToken.IsCancellationRequested)
 105                {
 0106                    return Task.CompletedTask;
 107                }
 108
 0109                Console.WriteLine("===============================");
 0110                Console.WriteLine($"The error handler was invoked during the operation: { eventArgs.Operation ?? "Unknow
 0111                Console.WriteLine("===============================");
 0112                Console.WriteLine();
 113
 0114                return Task.CompletedTask;
 115            }
 116
 0117            processor.ProcessEventAsync += processEventHandler;
 0118            processor.ProcessErrorAsync += processErrorHandler;
 119
 120            try
 121            {
 122                // To begin, we'll publish a batch of events for our processor to receive. Because we are not specifying
 123                // the Event Hubs service will automatically route it to a single partition.
 124
 0125                var expectedEvents = new List<EventData>()
 0126                {
 0127                   new EventData(Encoding.UTF8.GetBytes("First Event, Single Batch")),
 0128                   new EventData(Encoding.UTF8.GetBytes("Second Event, Single Batch")),
 0129                   new EventData(Encoding.UTF8.GetBytes("Third Event, Single Batch"))
 0130                };
 131
 0132                await using (var producer = new EventHubProducerClient(eventHubsConnectionString, eventHubName))
 133                {
 0134                    using EventDataBatch eventBatch = await producer.CreateBatchAsync();
 135
 0136                    for (int index = 0; index < expectedEvents.Count; ++index)
 137                    {
 0138                        eventBatch.TryAdd(expectedEvents[index]);
 139                    }
 140
 0141                    await producer.SendAsync(eventBatch);
 0142                }
 143
 0144                Console.WriteLine("Starting the Event Processor client...");
 0145                Console.WriteLine();
 146
 0147                await processor.StartProcessingAsync();
 148
 0149                using var cancellationSource = new CancellationTokenSource();
 0150                cancellationSource.CancelAfter(TimeSpan.FromSeconds(90));
 151
 0152                while ((!cancellationSource.IsCancellationRequested) && (eventsProcessed < expectedEvents.Count))
 153                {
 0154                    await Task.Delay(TimeSpan.FromMilliseconds(250));
 155                }
 156
 157                // Once we arrive at this point, either cancellation was requested or we have processed all of our event
 158                // both cases, we'll want to shut down the processor.
 159
 0160                Console.WriteLine();
 0161                Console.WriteLine("Stopping the processor...");
 162
 0163                await processor.StopProcessingAsync();
 0164            }
 165            finally
 166            {
 167                // It is encouraged that you unregister your handlers when you have finished
 168                // using the Event Processor to ensure proper cleanup.  This is especially
 169                // important when using lambda expressions or handlers in any form that may
 170                // contain closure scopes or hold other references.
 171
 0172                processor.ProcessEventAsync -= processEventHandler;
 0173                processor.ProcessErrorAsync -= processErrorHandler;
 174            }
 175
 176            // The Event Processor client has been stopped and is not explicitly disposable; there
 177            // is nothing further that we need to do for cleanup.
 178
 0179            Console.WriteLine();
 0180        }
 181
 182        /// <summary>
 183        ///   A pipeline policy to be applied to a Blob Container Client.  This policy will be applied to every
 184        ///   request sent by the client, making it possible to specify the Azure Storage version they will target.
 185        /// </summary>
 186        ///
 187        private class StorageApiVersionPolicy : HttpPipelineSynchronousPolicy
 188        {
 189            /// <summary>
 190            ///   The Azure Storage version we want to use.
 191            /// </summary>
 192            ///
 193            /// <remarks>
 194            ///   2017-11-09 is the latest version available in Azure Stack Hub 2002.  Other available versions could
 195            ///   always be specified as long as all operations used by the Event Processor Client are supported.
 196            /// </remarks>
 197            ///
 0198            private string Version => @"2017-11-09";
 199
 200            /// <summary>
 201            ///   A method that will be called before a request is sent to the Azure Storage service.  Here we are
 202            ///   overriding this method and injecting the version we want to change to into the request headers.
 203            /// </summary>
 204            ///
 205            /// <param name="message">The message to be sent to the Azure Storage service.</param>
 206            ///
 207            public override void OnSendingRequest(HttpMessage message)
 208            {
 0209                base.OnSendingRequest(message);
 0210                message.Request.Headers.SetValue("x-ms-version", Version);
 0211            }
 212        }
 213    }
 214}