< Summary

Class:Microsoft.Azure.Batch.ODATAMonitorControl
Assembly:Microsoft.Azure.Batch
File(s):C:\Git\azure-sdk-for-net\sdk\batch\Microsoft.Azure.Batch\src\ODATAMonitor.cs
Covered lines:8
Uncovered lines:0
Coverable lines:8
Total lines:387
Line coverage:100% (8 of 8)
Covered branches:2
Total branches:2
Branch coverage:100% (2 of 2)

Metrics

MethodCyclomatic complexity Line coverage Branch coverage
.ctor()-100%100%
.cctor()-100%100%
get_DelayBetweenDataFetch()-100%100%
set_DelayBetweenDataFetch(...)-100%100%

File(s)

C:\Git\azure-sdk-for-net\sdk\batch\Microsoft.Azure.Batch\src\ODATAMonitor.cs

#LineLine coverage
 1// Copyright (c) Microsoft Corporation. All rights reserved.
 2// Licensed under the MIT License. See License.txt in the project root for license information.
 3
 4using System;
 5using System.Collections.Generic;
 6using System.Linq;
 7using System.Text;
 8using System.Threading.Tasks;
 9
 10namespace Microsoft.Azure.Batch
 11{
 12    using System.Threading;
 13
 14    /// <summary>
 15    /// Contains control settings used for optimal retrieval of state data via OData predicates.
 16    /// </summary>
 17    public class ODATAMonitorControl
 18    {
 319        private TimeSpan _delayBetweenDataFetch = new TimeSpan(0, 0, seconds: 2);
 120        private static TimeSpan _lowerBoundDelayBetweenRefresh = new TimeSpan(0, 0, 0, 0, milliseconds : 500);
 21
 22        /// <summary>
 23        /// The minimum time between attempts to fetch data for a monitored instance.
 24        /// </summary>
 25        public TimeSpan DelayBetweenDataFetch
 26        {
 27            get
 28            {
 629                return _delayBetweenDataFetch;
 30            }
 31            set
 32            {
 33                // forbid values that are too small... avoid DOS of server
 134                if (value < _lowerBoundDelayBetweenRefresh)
 35                {
 136                    value = _lowerBoundDelayBetweenRefresh;
 37                }
 38
 139                _delayBetweenDataFetch = value;
 140            }
 41        }
 42
 43        /// <summary>
 44        /// The maximum number of characters allowed for a single OData predicate.
 45        /// </summary>
 346        public int ODATAPredicateLimit = 500;
 47    }
 48
 49    /// <summary>
 50    /// Use a lambda to construct the correct ListFoo calls with correct params.
 51    /// </summary>
 52    /// <typeparam name="T"></typeparam>
 53    /// <returns></returns>
 54    internal delegate IPagedEnumerable<T> ListDelegate<T>();
 55
 56    /// <summary>
 57    /// A class that leverages OData predicates to poll the states of instances.
 58    /// </summary>
 59    internal class ODATAMonitor
 60    {
 61        /// <summary>
 62        /// Polls the collection of instances until each has passed the condition at least once.
 63        /// </summary>
 64        /// <typeparam name="T"></typeparam>
 65        /// <param name="collectionToMonitor"></param>
 66        /// <param name="condition"></param>
 67        /// <param name="getName"></param>
 68        /// <param name="listObjects"></param>
 69        /// <param name="cancellationToken"></param>
 70        /// <param name="detailLevel">Controls the detail level of the data returned by a call to the Azure Batch Servic
 71        /// <param name="control"></param>
 72        /// <returns></returns>
 73        public static async Task WhenAllAsync<T>(
 74            IEnumerable<T> collectionToMonitor,
 75            Func<T, bool> condition,
 76            Func<T, string> getName,
 77            ListDelegate<T> listObjects,
 78            CancellationToken cancellationToken,
 79            ODATADetailLevel detailLevel,
 80            ODATAMonitorControl control) where T : IRefreshable
 81        {
 82            if (null == collectionToMonitor)
 83            {
 84                throw new ArgumentNullException("collectionToMonitor");
 85            }
 86
 87            if (null == condition)
 88            {
 89                throw new ArgumentNullException("condition");
 90            }
 91
 92            RefreshViaODATAFilterList<T> odataRefresher = new RefreshViaODATAFilterList<T>(cancellationToken, detailLeve
 93
 94            // populate for first pass
 95            foreach (T curT in collectionToMonitor)
 96            {
 97                // filter out the instances that already meet the condition
 98                if (!condition(curT))
 99                {
 100                    MonitorLastCall<T> box = new MonitorLastCall<T>(curT, /* flags each instance as available to refresh
 101
 102                    odataRefresher.CurrentPassQueue.Enqueue(box);
 103                }
 104            }
 105
 106            // process the instances in the current pass... swap queues to begin next pass
 107            while (!odataRefresher.CancellationToken.IsCancellationRequested &&
 108                odataRefresher.CurrentPassQueue.Count > 0)
 109            {
 110                // get next instance
 111                MonitorLastCall<T> nextInstanceToProcess = odataRefresher.CurrentPassQueue.Dequeue();
 112
 113                // build an odata filter with as many names as the limit will allow and call refresh instances as needed
 114                Task asyncProcessOneInstance = odataRefresher.ProcessOneInstance(nextInstanceToProcess, getName);
 115
 116                // a-wait for completion
 117                await asyncProcessOneInstance.ConfigureAwait(continueOnCapturedContext: false);
 118
 119                // if the current pass queue is empty, swap the queues to begin next pass
 120                if (0 == odataRefresher.CurrentPassQueue.Count)
 121                {
 122                    odataRefresher.SwapQueues();
 123
 124                    // if we appear to be done, the stringbuilder might have the last predicate in it so flush that
 125                    if (0 == odataRefresher.CurrentPassQueue.Count)
 126                    {
 127                        // start the call to process the last predicate
 128                        Task asyncListTask = odataRefresher.CallListAndProcessResults();
 129
 130                        // a-wait for completion
 131                        await asyncListTask.ConfigureAwait(continueOnCapturedContext: false);
 132
 133                        // if the last predicate created new work... the swap will bring it into a new pass
 134                        odataRefresher.SwapQueues();
 135                    }
 136                }
 137            }
 138
 139            //Were we cancelled?
 140            odataRefresher.CancellationToken.ThrowIfCancellationRequested();
 141        }
 142
 143        /// <summary>
 144        /// Will accept instances of T and construct a filter predicate.
 145        /// The predicate will be used (and reset) when it is full or if
 146        /// the given instance was to recently refreshed (DOS avoidance).
 147        /// </summary>
 148        /// <typeparam name="T"></typeparam>
 149        internal class RefreshViaODATAFilterList<T>
 150        {
 151#region constructors
 152
 153            private RefreshViaODATAFilterList()
 154            {
 155            }
 156
 157            internal RefreshViaODATAFilterList(
 158                            CancellationToken cancellationToken,
 159                            ODATADetailLevel odataDetail,
 160                            Func<T, bool> condition,
 161                            ListDelegate<T> listObjects,
 162                            ODATAMonitorControl odataMonitorControl)
 163            {
 164                this.CancellationToken = cancellationToken;
 165                _odataDetailLevel = odataDetail;
 166                _condition = condition;
 167                _listObjects = listObjects;
 168                _odataMonitorControl = odataMonitorControl;
 169
 170                CurrentPassQueue = new Queue<MonitorLastCall<T>>();
 171                NextPassQueue = new Queue<MonitorLastCall<T>>();
 172            }
 173
 174#endregion constructors
 175
 176            // queue that holds the instances being refreshed on this pass
 177            public Queue<MonitorLastCall<T>> CurrentPassQueue = new Queue<MonitorLastCall<T>>();
 178
 179            // queue that holds the instances that have been refreshed and failed the condition...
 180            // to be refreshed again on the next pass
 181            public Queue<MonitorLastCall<T>> NextPassQueue = new Queue<MonitorLastCall<T>>();
 182
 183            public CancellationToken CancellationToken { get; private set; }
 184
 185            /// <summary>
 186            /// Holds the delegate that determines of the given instance no longer needs to be polled.
 187            /// </summary>
 188            private readonly Func<T, bool> _condition;
 189
 190            /// <summary>
 191            /// Delegate to fetch new instances fresh state data.
 192            /// </summary>
 193            private readonly ListDelegate<T> _listObjects;
 194
 195            /// <summary>
 196            /// To be updated with filter predicate for refreshing instance state data.
 197            /// </summary>
 198            private readonly ODATADetailLevel _odataDetailLevel;
 199
 200            /// <summary>
 201            /// Holds control data for this odata monitor.
 202            /// </summary>
 203            private readonly ODATAMonitorControl _odataMonitorControl;
 204
 205            // ODATA predicated will be constructed in this
 206            private readonly StringBuilder _odataFilterSB = new StringBuilder();
 207
 208            private const string IdPrefix = "id eq '";
 209            private const string IdPostfix = "'";
 210            private const string OrOperator = " or ";
 211
 212            internal async Task ProcessOneInstance(MonitorLastCall<T> nextInstance, Func<T, string> getName)
 213            {
 214                // we will loop until this is null
 215                MonitorLastCall<T> processThisInstance = nextInstance;
 216
 217                while (null != processThisInstance)
 218                {
 219                    bool usePredicateToCallList = false;
 220                    DateTime utcNow = DateTime.UtcNow;
 221
 222                    // if it is not too early, we can use this instance
 223                    if (utcNow >= nextInstance.DoNotRefreshUntilUTC)
 224                    {
 225                        // now check to see if it will fit
 226                        // remember the limit on the name is 64 and the limit on the predicate is 500
 227                        // the assumption is that even if these #s evolve at least one max-sized-name will fit.
 228
 229                        StringBuilder possibleAdditionalExpression = new StringBuilder();
 230
 231                        // if this is not the first name then we must "or" this name in
 232                        if (0 != _odataFilterSB.Length)
 233                        {
 234                            possibleAdditionalExpression.Append(OrOperator);
 235                        }
 236
 237                        // add in the name prefix
 238                        possibleAdditionalExpression.Append(IdPrefix);
 239
 240                        // get the name of the object
 241                        string name = getName(nextInstance.Instance);
 242
 243                        // add in the name
 244                        possibleAdditionalExpression.Append(name);
 245
 246                        // add the name postfix
 247                        possibleAdditionalExpression.Append(IdPostfix);
 248
 249                        // if it will fit then append the clause to the predicate
 250                        if ((_odataFilterSB.Length + possibleAdditionalExpression.Length) < _odataMonitorControl.ODATAPr
 251                        {
 252                            // amend the predicate to refresh the object
 253                            _odataFilterSB.Append(possibleAdditionalExpression.ToString());
 254
 255                            processThisInstance = null;  // we are done
 256                        }
 257                        else
 258                        {
 259                            // it will not fit so we are done with this predicate
 260                            usePredicateToCallList = true;
 261                        }
 262                    }
 263                    else // if the next instance cannot be refreshed yet we may need to wait a bit
 264                    {
 265                        // if we have work to do... return to process that work and use up time
 266                        if (_odataFilterSB.Length > 0)
 267                        {
 268                            usePredicateToCallList = true;
 269                        }
 270                        else  // if we have no work to do we should delay until the instance can be refreshed
 271                        {
 272                            TimeSpan delayThisMuch = processThisInstance.DoNotRefreshUntilUTC - utcNow;
 273
 274                            if (delayThisMuch.Ticks > 0)
 275                            {
 276                                await System.Threading.Tasks.Task.Delay(delayThisMuch).ConfigureAwait(continueOnCaptured
 277                            }
 278                        }
 279                    }
 280
 281                    // should we call the server with the current predicate data?
 282                    if (usePredicateToCallList)
 283                    {
 284                        usePredicateToCallList = false;
 285
 286                        // start the new list operation
 287                        Task asyncListTask = CallListAndProcessResults();
 288
 289                        // wait for completion
 290                        await asyncListTask.ConfigureAwait(continueOnCapturedContext: false);
 291                    }
 292                }
 293            }
 294
 295            /// <summary>
 296            /// Will swap the queues.  This is the transition from one pass to the next.
 297            /// </summary>
 298            internal void SwapQueues()
 299            {
 300                Queue<MonitorLastCall<T>> tmp = this.CurrentPassQueue;
 301
 302                this.CurrentPassQueue = this.NextPassQueue;
 303                this.NextPassQueue = tmp;
 304            }
 305
 306            /// <summary>
 307            /// Uses list func to fetch fresh data on the instances in the predicate.
 308            /// Instances that fail the condition are enqueued for the next pass.
 309            /// </summary>
 310            /// <returns></returns>
 311            internal async Task CallListAndProcessResults()
 312            {
 313                // extract the predicate that restricts the list call
 314                string predicate = _odataFilterSB.ToString();
 315
 316                // clear the sb for the next batch
 317                _odataFilterSB.Clear();
 318
 319                // early exit if there is no work to do
 320                if (string.IsNullOrWhiteSpace(predicate))
 321                {
 322                    return;
 323                }
 324
 325                // update the detail level
 326                _odataDetailLevel.FilterClause = predicate;
 327
 328                // get the enumerable to refresh the instances
 329                IPagedEnumerable<T> tEnumberable = _listObjects();
 330
 331                // get the enumerator for asycn walk of the collection
 332                IPagedEnumerator<T> tEnumerator = tEnumberable.GetPagedEnumerator();
 333
 334                // used to enumerate until out of data
 335                bool thereIsMoreData;
 336
 337                do
 338                {
 339                    // move to next instance, possibley make call to server to get next batch
 340                    Task<bool> asyncTask = tEnumerator.MoveNextAsync(this.CancellationToken);
 341
 342                    thereIsMoreData = await asyncTask.ConfigureAwait(continueOnCapturedContext: false);
 343
 344                    if (thereIsMoreData)
 345                    {
 346                        // get the current instance
 347                        T refreshedInstance = tEnumerator.Current;
 348
 349                        // test it to see if it is done
 350                        if (!_condition(refreshedInstance))
 351                        {
 352                            // we will have to refresh it again so put in queue for next pass
 353
 354                            // box it up
 355                            MonitorLastCall<T> mlc = new MonitorLastCall<T>(refreshedInstance, DateTime.UtcNow + _odataM
 356
 357                            // enqueue it for next pass
 358                            this.NextPassQueue.Enqueue(mlc);
 359                        }
 360                    }
 361                }
 362                while (thereIsMoreData);
 363            }
 364        }
 365    }
 366
 367    /// <summary>
 368    /// A class to track the last time an instance was refreshed/monitored.
 369    /// </summary>
 370    /// <typeparam name="T"></typeparam>
 371    internal class MonitorLastCall<T>
 372    {
 373        private MonitorLastCall()
 374        {
 375        }
 376
 377        internal MonitorLastCall(T instance, DateTime timestamp)
 378        {
 379            this.DoNotRefreshUntilUTC = timestamp;
 380            this.Instance = instance;
 381        }
 382
 383        public DateTime DoNotRefreshUntilUTC { get; internal set; }
 384
 385        public T Instance { get; internal set; }
 386    }
 387}