View Javadoc
1   // Copyright (c) Microsoft Corporation. All rights reserved.
2   // Licensed under the MIT License.
3   
4   package com.microsoft.azure.eventprocessorhost;
5   
6   import com.microsoft.azure.eventhubs.EventPosition;
7   import com.microsoft.azure.eventhubs.PartitionReceiver;
8   
9   import java.time.Duration;
10  import java.util.Locale;
11  import java.util.function.Consumer;
12  import java.util.function.Function;
13  
14  import org.slf4j.Logger;
15  import org.slf4j.LoggerFactory;
16  
17  /***
18   * Options affecting the behavior of the event processor host instance in general.
19   */
20  public final class EventProcessorOptions {
21      private Consumer<ExceptionReceivedEventArgs> exceptionNotificationHandler = null;
22      private Boolean invokeProcessorAfterReceiveTimeout = false;
23      private boolean receiverRuntimeMetricEnabled = false;
24      private int maxBatchSize = 10;
25      private int prefetchCount = 300;
26      private Duration receiveTimeOut = Duration.ofMinutes(1);
27      private Function<String, EventPosition> initialPositionProvider = (partitionId) -> {
28          return EventPosition.fromStartOfStream();
29      };
30  
31      private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(EventProcessorOptions.class);
32  
33      public EventProcessorOptions() {
34      }
35  
36      /***
37       * Returns an EventProcessorOptions instance with all options set to the default values.
38       *
39       * The default values are:
40       * <pre>
41       * MaxBatchSize: 10
42       * ReceiveTimeOut: 1 minute
43       * PrefetchCount: 300
44       * InitialPositionProvider: uses the last checkpoint, or START_OF_STREAM
45       * InvokeProcessorAfterReceiveTimeout: false
46       * ReceiverRuntimeMetricEnabled: false
47       * </pre>
48       *
49       * @return an EventProcessorOptions instance with all options set to the default values
50       */
51      public static EventProcessorOptions getDefaultOptions() {
52          return new EventProcessorOptions();
53      }
54  
55      /**
56       * Sets a handler which receives notification of general exceptions.
57       * <p>
58       * Exceptions which occur while processing events from a particular Event Hub partition are delivered
59       * to the onError method of the event processor for that partition. This handler is called on occasions
60       * when there is no event processor associated with the throwing activity, or the event processor could
61       * not be created.
62       * <p>
63       * The handler is not expected to do anything about the exception. If it is possible to recover, the
64       * event processor host instance will recover automatically.
65       *
66       * @param notificationHandler Handler which is called when an exception occurs. Set to null to stop handling.
67       */
68      public void setExceptionNotification(Consumer<ExceptionReceivedEventArgs> notificationHandler) {
69          this.exceptionNotificationHandler = notificationHandler;
70      }
71  
72      /**
73       * Returns the maximum number of events that will be passed to one call to IEventProcessor.onEvents
74       *
75       * @return the maximum maximum number of events that will be passed to one call to IEventProcessor.onEvents
76       */
77      public int getMaxBatchSize() {
78          return this.maxBatchSize;
79      }
80  
81      /**
82       * Sets the maximum number of events that will be passed to one call to IEventProcessor.onEvents
83       *
84       * @param maxBatchSize the maximum number of events that will be passed to one call to IEventProcessor.onEvents
85       */
86      public void setMaxBatchSize(int maxBatchSize) {
87          this.maxBatchSize = maxBatchSize;
88      }
89  
90      /**
91       * Returns the timeout for receive operations.
92       *
93       * @return the timeout for receive operations
94       */
95      public Duration getReceiveTimeOut() {
96          return this.receiveTimeOut;
97      }
98  
99      /**
100      * Sets the timeout for receive operations.
101      *
102      * @param receiveTimeOut new timeout for receive operations
103      */
104     public void setReceiveTimeOut(Duration receiveTimeOut) {
105         this.receiveTimeOut = receiveTimeOut;
106     }
107 
108     /***
109      * Returns the current prefetch count for the underlying event hub client.
110      *
111      * @return the current prefetch count for the underlying client
112      */
113     public int getPrefetchCount() {
114         return this.prefetchCount;
115     }
116 
117     /***
118      * Sets the prefetch count for the underlying event hub client.
119      *
120      * The default is 300. This controls how many events are received in advance.
121      *
122      * @param prefetchCount  The new prefetch count.
123      */
124     public void setPrefetchCount(int prefetchCount) {
125         if (prefetchCount < PartitionReceiver.MINIMUM_PREFETCH_COUNT) {
126             throw new IllegalArgumentException(String.format(Locale.US,
127                     "PrefetchCount has to be above %s", PartitionReceiver.MINIMUM_PREFETCH_COUNT));
128         }
129 
130         if (prefetchCount > PartitionReceiver.MAXIMUM_PREFETCH_COUNT) {
131             throw new IllegalArgumentException(String.format(Locale.US,
132                     "PrefetchCount has to be below %s", PartitionReceiver.MAXIMUM_PREFETCH_COUNT));
133         }
134 
135         this.prefetchCount = prefetchCount;
136     }
137 
138     /***
139      * If there is no checkpoint for a partition, the initialPositionProvider function is used to determine
140      * the position at which to start receiving events for that partition.
141      *
142      * @return the current initial position provider function
143      */
144     public Function<String, EventPosition> getInitialPositionProvider() {
145         return this.initialPositionProvider;
146     }
147 
148     /***
149      * Sets the function used to determine the position at which to start receiving events for a
150      * partition if there is no checkpoint for that partition.
151      *
152      * The provider function takes one argument, the partition id (a String), and returns the desired position.
153      *
154      * @param initialPositionProvider The new provider function.
155      */
156     public void setInitialPositionProvider(Function<String, EventPosition> initialPositionProvider) {
157         this.initialPositionProvider = initialPositionProvider;
158     }
159 
160     /***
161      * Returns whether the EventProcessorHost will call IEventProcessor.onEvents() with an empty iterable
162      * when a receive timeout occurs (true) or not (false).
163      *
164      * Defaults to false.
165      *
166      * @return true if EventProcessorHost will call IEventProcessor.OnEvents on receive timeout, false otherwise
167      */
168     public Boolean getInvokeProcessorAfterReceiveTimeout() {
169         return this.invokeProcessorAfterReceiveTimeout;
170     }
171 
172     /**
173      * Changes whether the EventProcessorHost will call IEventProcessor.onEvents() with an empty iterable
174      * when a receive timeout occurs (true) or not (false).
175      * <p>
176      * The default is false (no call).
177      *
178      * @param invokeProcessorAfterReceiveTimeout the new value for what to do
179      */
180     public void setInvokeProcessorAfterReceiveTimeout(Boolean invokeProcessorAfterReceiveTimeout) {
181         this.invokeProcessorAfterReceiveTimeout = invokeProcessorAfterReceiveTimeout;
182     }
183 
184     /**
185      * Knob to enable/disable runtime metric of the receiver. If this is set to true,
186      * the first parameter {@link com.microsoft.azure.eventprocessorhost.PartitionContext#runtimeInformation} of
187      * {@link IEventProcessor#onEvents(com.microsoft.azure.eventprocessorhost.PartitionContext, java.lang.Iterable)} will be populated.
188      * <p>
189      * Enabling this knob will add 3 additional properties to all raw AMQP events received.
190      *
191      * @return the {@link boolean} indicating, whether, the runtime metric of the receiver was enabled
192      */
193     public boolean getReceiverRuntimeMetricEnabled() {
194         return this.receiverRuntimeMetricEnabled;
195     }
196 
197     /**
198      * Knob to enable/disable runtime metric of the receiver. If this is set to true,
199      * the first parameter {@link com.microsoft.azure.eventprocessorhost.PartitionContext#runtimeInformation} of
200      * {@link IEventProcessor#onEvents(com.microsoft.azure.eventprocessorhost.PartitionContext, java.lang.Iterable)} will be populated.
201      * <p>
202      * Enabling this knob will add 3 additional properties to all raw AMQP events received.
203      *
204      * @param value the {@link boolean} to indicate, whether, the runtime metric of the receiver should be enabled
205      */
206     public void setReceiverRuntimeMetricEnabled(boolean value) {
207         this.receiverRuntimeMetricEnabled = value;
208     }
209 
210     void notifyOfException(String hostname, Exception exception, String action) {
211         notifyOfException(hostname, exception, action, ExceptionReceivedEventArgs.NO_ASSOCIATED_PARTITION);
212     }
213 
214     void notifyOfException(String hostname, Exception exception, String action, String partitionId) {
215         // Capture handler so it doesn't get set to null between test and use
216         Consumer<ExceptionReceivedEventArgs> handler = this.exceptionNotificationHandler;
217         if (handler != null) {
218             try {
219                 handler.accept(new ExceptionReceivedEventArgs(hostname, exception, action, partitionId));
220             } catch (Exception e) {
221                 TRACE_LOGGER.error("host " + hostname + ": caught exception from user-provided exception notification handler", e);
222             }
223         }
224     }
225 
226     /***
227      * A prefab initial position provider that starts from the first event available.
228      *
229      * How to use this initial position provider: setInitialPositionProvider(new EventProcessorOptions.StartOfStreamInitialPositionProvider());
230      */
231     public class StartOfStreamInitialPositionProvider implements Function<String, EventPosition> {
232         @Override
233         public EventPosition apply(String t) {
234             return EventPosition.fromStartOfStream();
235         }
236     }
237 
238     /***
239      * A prefab initial position provider that starts from the next event that becomes available.
240      *
241      * How to use this initial position provider: setInitialPositionProvider(new EventProcessorOptions.EndOfStreamInitialPositionProvider());
242      */
243     public class EndOfStreamInitialPositionProvider implements Function<String, EventPosition> {
244         @Override
245         public EventPosition apply(String t) {
246             return EventPosition.fromEndOfStream();
247         }
248     }
249 }