1 // Copyright (c) Microsoft Corporation. All rights reserved.
2 // Licensed under the MIT License.
3
4 package com.microsoft.azure.eventprocessorhost;
5
6 import com.microsoft.azure.eventhubs.EventPosition;
7 import com.microsoft.azure.eventhubs.PartitionReceiver;
8
9 import java.time.Duration;
10 import java.util.Locale;
11 import java.util.function.Consumer;
12 import java.util.function.Function;
13
14 import org.slf4j.Logger;
15 import org.slf4j.LoggerFactory;
16
17 /***
18 * Options affecting the behavior of the event processor host instance in general.
19 */
20 public final class EventProcessorOptions {
21 private Consumer<ExceptionReceivedEventArgs> exceptionNotificationHandler = null;
22 private Boolean invokeProcessorAfterReceiveTimeout = false;
23 private boolean receiverRuntimeMetricEnabled = false;
24 private int maxBatchSize = 10;
25 private int prefetchCount = 300;
26 private Duration receiveTimeOut = Duration.ofMinutes(1);
27 private Function<String, EventPosition> initialPositionProvider = (partitionId) -> {
28 return EventPosition.fromStartOfStream();
29 };
30
31 private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(EventProcessorOptions.class);
32
33 public EventProcessorOptions() {
34 }
35
36 /***
37 * Returns an EventProcessorOptions instance with all options set to the default values.
38 *
39 * The default values are:
40 * <pre>
41 * MaxBatchSize: 10
42 * ReceiveTimeOut: 1 minute
43 * PrefetchCount: 300
44 * InitialPositionProvider: uses the last checkpoint, or START_OF_STREAM
45 * InvokeProcessorAfterReceiveTimeout: false
46 * ReceiverRuntimeMetricEnabled: false
47 * </pre>
48 *
49 * @return an EventProcessorOptions instance with all options set to the default values
50 */
51 public static EventProcessorOptions getDefaultOptions() {
52 return new EventProcessorOptions();
53 }
54
55 /**
56 * Sets a handler which receives notification of general exceptions.
57 * <p>
58 * Exceptions which occur while processing events from a particular Event Hub partition are delivered
59 * to the onError method of the event processor for that partition. This handler is called on occasions
60 * when there is no event processor associated with the throwing activity, or the event processor could
61 * not be created.
62 * <p>
63 * The handler is not expected to do anything about the exception. If it is possible to recover, the
64 * event processor host instance will recover automatically.
65 *
66 * @param notificationHandler Handler which is called when an exception occurs. Set to null to stop handling.
67 */
68 public void setExceptionNotification(Consumer<ExceptionReceivedEventArgs> notificationHandler) {
69 this.exceptionNotificationHandler = notificationHandler;
70 }
71
72 /**
73 * Returns the maximum number of events that will be passed to one call to IEventProcessor.onEvents
74 *
75 * @return the maximum maximum number of events that will be passed to one call to IEventProcessor.onEvents
76 */
77 public int getMaxBatchSize() {
78 return this.maxBatchSize;
79 }
80
81 /**
82 * Sets the maximum number of events that will be passed to one call to IEventProcessor.onEvents
83 *
84 * @param maxBatchSize the maximum number of events that will be passed to one call to IEventProcessor.onEvents
85 */
86 public void setMaxBatchSize(int maxBatchSize) {
87 this.maxBatchSize = maxBatchSize;
88 }
89
90 /**
91 * Returns the timeout for receive operations.
92 *
93 * @return the timeout for receive operations
94 */
95 public Duration getReceiveTimeOut() {
96 return this.receiveTimeOut;
97 }
98
99 /**
100 * Sets the timeout for receive operations.
101 *
102 * @param receiveTimeOut new timeout for receive operations
103 */
104 public void setReceiveTimeOut(Duration receiveTimeOut) {
105 this.receiveTimeOut = receiveTimeOut;
106 }
107
108 /***
109 * Returns the current prefetch count for the underlying event hub client.
110 *
111 * @return the current prefetch count for the underlying client
112 */
113 public int getPrefetchCount() {
114 return this.prefetchCount;
115 }
116
117 /***
118 * Sets the prefetch count for the underlying event hub client.
119 *
120 * The default is 300. This controls how many events are received in advance.
121 *
122 * @param prefetchCount The new prefetch count.
123 */
124 public void setPrefetchCount(int prefetchCount) {
125 if (prefetchCount < PartitionReceiver.MINIMUM_PREFETCH_COUNT) {
126 throw new IllegalArgumentException(String.format(Locale.US,
127 "PrefetchCount has to be above %s", PartitionReceiver.MINIMUM_PREFETCH_COUNT));
128 }
129
130 if (prefetchCount > PartitionReceiver.MAXIMUM_PREFETCH_COUNT) {
131 throw new IllegalArgumentException(String.format(Locale.US,
132 "PrefetchCount has to be below %s", PartitionReceiver.MAXIMUM_PREFETCH_COUNT));
133 }
134
135 this.prefetchCount = prefetchCount;
136 }
137
138 /***
139 * If there is no checkpoint for a partition, the initialPositionProvider function is used to determine
140 * the position at which to start receiving events for that partition.
141 *
142 * @return the current initial position provider function
143 */
144 public Function<String, EventPosition> getInitialPositionProvider() {
145 return this.initialPositionProvider;
146 }
147
148 /***
149 * Sets the function used to determine the position at which to start receiving events for a
150 * partition if there is no checkpoint for that partition.
151 *
152 * The provider function takes one argument, the partition id (a String), and returns the desired position.
153 *
154 * @param initialPositionProvider The new provider function.
155 */
156 public void setInitialPositionProvider(Function<String, EventPosition> initialPositionProvider) {
157 this.initialPositionProvider = initialPositionProvider;
158 }
159
160 /***
161 * Returns whether the EventProcessorHost will call IEventProcessor.onEvents() with an empty iterable
162 * when a receive timeout occurs (true) or not (false).
163 *
164 * Defaults to false.
165 *
166 * @return true if EventProcessorHost will call IEventProcessor.OnEvents on receive timeout, false otherwise
167 */
168 public Boolean getInvokeProcessorAfterReceiveTimeout() {
169 return this.invokeProcessorAfterReceiveTimeout;
170 }
171
172 /**
173 * Changes whether the EventProcessorHost will call IEventProcessor.onEvents() with an empty iterable
174 * when a receive timeout occurs (true) or not (false).
175 * <p>
176 * The default is false (no call).
177 *
178 * @param invokeProcessorAfterReceiveTimeout the new value for what to do
179 */
180 public void setInvokeProcessorAfterReceiveTimeout(Boolean invokeProcessorAfterReceiveTimeout) {
181 this.invokeProcessorAfterReceiveTimeout = invokeProcessorAfterReceiveTimeout;
182 }
183
184 /**
185 * Knob to enable/disable runtime metric of the receiver. If this is set to true,
186 * the first parameter {@link com.microsoft.azure.eventprocessorhost.PartitionContext#runtimeInformation} of
187 * {@link IEventProcessor#onEvents(com.microsoft.azure.eventprocessorhost.PartitionContext, java.lang.Iterable)} will be populated.
188 * <p>
189 * Enabling this knob will add 3 additional properties to all raw AMQP events received.
190 *
191 * @return the {@link boolean} indicating, whether, the runtime metric of the receiver was enabled
192 */
193 public boolean getReceiverRuntimeMetricEnabled() {
194 return this.receiverRuntimeMetricEnabled;
195 }
196
197 /**
198 * Knob to enable/disable runtime metric of the receiver. If this is set to true,
199 * the first parameter {@link com.microsoft.azure.eventprocessorhost.PartitionContext#runtimeInformation} of
200 * {@link IEventProcessor#onEvents(com.microsoft.azure.eventprocessorhost.PartitionContext, java.lang.Iterable)} will be populated.
201 * <p>
202 * Enabling this knob will add 3 additional properties to all raw AMQP events received.
203 *
204 * @param value the {@link boolean} to indicate, whether, the runtime metric of the receiver should be enabled
205 */
206 public void setReceiverRuntimeMetricEnabled(boolean value) {
207 this.receiverRuntimeMetricEnabled = value;
208 }
209
210 void notifyOfException(String hostname, Exception exception, String action) {
211 notifyOfException(hostname, exception, action, ExceptionReceivedEventArgs.NO_ASSOCIATED_PARTITION);
212 }
213
214 void notifyOfException(String hostname, Exception exception, String action, String partitionId) {
215 // Capture handler so it doesn't get set to null between test and use
216 Consumer<ExceptionReceivedEventArgs> handler = this.exceptionNotificationHandler;
217 if (handler != null) {
218 try {
219 handler.accept(new ExceptionReceivedEventArgs(hostname, exception, action, partitionId));
220 } catch (Exception e) {
221 TRACE_LOGGER.error("host " + hostname + ": caught exception from user-provided exception notification handler", e);
222 }
223 }
224 }
225
226 /***
227 * A prefab initial position provider that starts from the first event available.
228 *
229 * How to use this initial position provider: setInitialPositionProvider(new EventProcessorOptions.StartOfStreamInitialPositionProvider());
230 */
231 public class StartOfStreamInitialPositionProvider implements Function<String, EventPosition> {
232 @Override
233 public EventPosition apply(String t) {
234 return EventPosition.fromStartOfStream();
235 }
236 }
237
238 /***
239 * A prefab initial position provider that starts from the next event that becomes available.
240 *
241 * How to use this initial position provider: setInitialPositionProvider(new EventProcessorOptions.EndOfStreamInitialPositionProvider());
242 */
243 public class EndOfStreamInitialPositionProvider implements Function<String, EventPosition> {
244 @Override
245 public EventPosition apply(String t) {
246 return EventPosition.fromEndOfStream();
247 }
248 }
249 }