1
2
3
4 package com.microsoft.azure.eventprocessorhost;
5
6 import com.google.gson.Gson;
7 import com.microsoft.azure.storage.AccessCondition;
8 import com.microsoft.azure.storage.CloudStorageAccount;
9 import com.microsoft.azure.storage.StorageErrorCodeStrings;
10 import com.microsoft.azure.storage.StorageException;
11 import com.microsoft.azure.storage.StorageExtendedErrorInformation;
12 import com.microsoft.azure.storage.blob.BlobListingDetails;
13 import com.microsoft.azure.storage.blob.BlobProperties;
14 import com.microsoft.azure.storage.blob.BlobRequestOptions;
15 import com.microsoft.azure.storage.blob.CloudBlob;
16 import com.microsoft.azure.storage.blob.CloudBlobClient;
17 import com.microsoft.azure.storage.blob.CloudBlobContainer;
18 import com.microsoft.azure.storage.blob.CloudBlobDirectory;
19 import com.microsoft.azure.storage.blob.CloudBlockBlob;
20 import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
21 import com.microsoft.azure.storage.blob.LeaseState;
22 import com.microsoft.azure.storage.blob.ListBlobItem;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26 import java.io.IOException;
27 import java.net.URISyntaxException;
28 import java.nio.file.Path;
29 import java.nio.file.Paths;
30 import java.security.InvalidKeyException;
31 import java.util.ArrayList;
32 import java.util.EnumSet;
33 import java.util.HashMap;
34 import java.util.Hashtable;
35 import java.util.Iterator;
36 import java.util.List;
37 import java.util.NoSuchElementException;
38 import java.util.concurrent.CompletableFuture;
39 import java.util.concurrent.CompletionException;
40 import java.util.regex.Matcher;
41 import java.util.regex.Pattern;
42
43
44 class AzureStorageCheckpointLeaseManager implements ICheckpointManager, ILeaseManager {
45 private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(AzureStorageCheckpointLeaseManager.class);
46 private static final String METADATA_OWNER_NAME = "OWNINGHOST";
47
48 private final String storageConnectionString;
49 private final String storageBlobPrefix;
50 private final BlobRequestOptions leaseOperationOptions = new BlobRequestOptions();
51 private final BlobRequestOptions checkpointOperationOptions = new BlobRequestOptions();
52 private final BlobRequestOptions renewRequestOptions = new BlobRequestOptions();
53 private HostContext hostContext;
54 private String storageContainerName;
55 private CloudBlobClient storageClient;
56 private CloudBlobContainer eventHubContainer;
57 private CloudBlobDirectory consumerGroupDirectory;
58 private Gson gson;
59
60 private Hashtable<String, Checkpoint> latestCheckpoint = new Hashtable<String, Checkpoint>();
61
62 AzureStorageCheckpointLeaseManager(String storageConnectionString, String storageContainerName) {
63 this(storageConnectionString, storageContainerName, "");
64 }
65
66 AzureStorageCheckpointLeaseManager(String storageConnectionString, String storageContainerName, String storageBlobPrefix) {
67 if ((storageConnectionString == null) || storageConnectionString.trim().isEmpty()) {
68 throw new IllegalArgumentException("Provide valid Azure Storage connection string when using Azure Storage");
69 }
70 this.storageConnectionString = storageConnectionString;
71
72 if ((storageContainerName != null) && storageContainerName.trim().isEmpty()) {
73 throw new IllegalArgumentException("Azure Storage container name must be a valid container name or null to use the default");
74 }
75 this.storageContainerName = storageContainerName;
76
77
78
79 this.storageBlobPrefix = (storageBlobPrefix != null) ? storageBlobPrefix.trim() : "";
80 }
81
82
83
84
85 void initialize(HostContext hostContext) throws InvalidKeyException, URISyntaxException, StorageException {
86 this.hostContext = hostContext;
87
88 if (this.storageContainerName == null) {
89 this.storageContainerName = this.hostContext.getEventHubPath();
90 }
91
92
93
94
95 Pattern p = Pattern.compile("^(?-i)(?:[a-z0-9]|(?<=[0-9a-z])-(?=[0-9a-z])){3,63}$");
96 Matcher m = p.matcher(this.storageContainerName);
97 if (!m.find()) {
98 throw new IllegalArgumentException("EventHub names must conform to the following rules to be able to use it with EventProcessorHost: "
99 + "Must start with a letter or number, and can contain only letters, numbers, and the dash (-) character. "
100 + "Every dash (-) character must be immediately preceded and followed by a letter or number; consecutive dashes are not permitted in container names. "
101 + "All letters in a container name must be lowercase. "
102 + "Must be from 3 to 63 characters long.");
103 }
104
105 this.storageClient = CloudStorageAccount.parse(this.storageConnectionString).createCloudBlobClient();
106
107 this.eventHubContainer = this.storageClient.getContainerReference(this.storageContainerName);
108
109
110
111 this.consumerGroupDirectory = this.eventHubContainer.getDirectoryReference(this.storageBlobPrefix + this.hostContext.getConsumerGroupName());
112
113 this.gson = new Gson();
114
115 this.leaseOperationOptions.setMaximumExecutionTimeInMs(this.hostContext.getPartitionManagerOptions().getLeaseDurationInSeconds() * 1000);
116 this.storageClient.setDefaultRequestOptions(this.leaseOperationOptions);
117 this.checkpointOperationOptions.setMaximumExecutionTimeInMs(this.hostContext.getPartitionManagerOptions().getCheckpointTimeoutInSeconds() * 1000);
118
119
120
121 this.renewRequestOptions.setMaximumExecutionTimeInMs(this.hostContext.getPartitionManagerOptions().getLeaseDurationInSeconds() * 1000);
122 }
123
124 @Override
125 public CompletableFuture<Boolean> checkpointStoreExists() {
126 return storeExistsInternal(this.checkpointOperationOptions, EventProcessorHostActionStrings.CHECKING_CHECKPOINT_STORE,
127 "Failure while checking checkpoint store existence");
128 }
129
130
131
132
133
134
135
136 @Override
137 public CompletableFuture<Void> createCheckpointStoreIfNotExists() {
138
139
140 return CompletableFuture.completedFuture(null);
141 }
142
143 @Override
144 public CompletableFuture<Void> deleteCheckpointStore() {
145 return deleteStoreInternal(this.checkpointOperationOptions);
146 }
147
148 @Override
149 public CompletableFuture<Checkpoint> getCheckpoint(String partitionId) {
150 CompletableFuture<Checkpoint> future = null;
151
152 try {
153 AzureBlobLease lease = getLeaseInternal(partitionId, this.checkpointOperationOptions);
154 Checkpoint checkpoint = null;
155 if (lease != null) {
156 if ((lease.getOffset() != null) && !lease.getOffset().isEmpty()) {
157 checkpoint = new Checkpoint(partitionId);
158 checkpoint.setOffset(lease.getOffset());
159 checkpoint.setSequenceNumber(lease.getSequenceNumber());
160 }
161
162 }
163 future = CompletableFuture.completedFuture(checkpoint);
164 } catch (URISyntaxException | IOException | StorageException e) {
165 future = new CompletableFuture<Checkpoint>();
166 future.completeExceptionally(LoggingUtils.wrapException(e, EventProcessorHostActionStrings.GETTING_CHECKPOINT));
167 }
168
169 return future;
170 }
171
172 @Override
173 public CompletableFuture<Void> createAllCheckpointsIfNotExists(List<String> partitionIds) {
174
175
176 return CompletableFuture.completedFuture(null);
177 }
178
179 @Override
180 public CompletableFuture<Void> updateCheckpoint(CompleteLease lease, Checkpoint checkpoint) {
181 AzureBlobLease"../../../../com/microsoft/azure/eventprocessorhost/AzureBlobLease.html#AzureBlobLease">AzureBlobLeasereBlobLease.html#AzureBlobLease">AzureBlobLease updatedLease = new AzureBlobLease"../../../../com/microsoft/azure/eventprocessorhost/AzureBlobLease.html#AzureBlobLease">AzureBlobLease((AzureBlobLease) lease);
182 TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(checkpoint.getPartitionId(),
183 "Checkpointing at " + checkpoint.getOffset() + " // " + checkpoint.getSequenceNumber()));
184 updatedLease.setOffset(checkpoint.getOffset());
185 updatedLease.setSequenceNumber(checkpoint.getSequenceNumber());
186
187 CompletableFuture<Void> future = null;
188
189 try {
190 if (updateLeaseInternal(updatedLease, this.checkpointOperationOptions)) {
191 future = CompletableFuture.completedFuture(null);
192 } else {
193 TRACE_LOGGER.warn(this.hostContext.withHostAndPartition(lease, "Lease lost"));
194 future = new CompletableFuture<Void>();
195 future.completeExceptionally(LoggingUtils.wrapException(new RuntimeException("Lease lost while updating checkpoint"),
196 EventProcessorHostActionStrings.UPDATING_CHECKPOINT));
197 }
198 } catch (StorageException | IOException e) {
199 TRACE_LOGGER.warn(this.hostContext.withHostAndPartition(lease, "Failure updating checkpoint"), e);
200 future = new CompletableFuture<Void>();
201 future.completeExceptionally(LoggingUtils.wrapException(e, EventProcessorHostActionStrings.UPDATING_CHECKPOINT));
202 }
203
204 return future;
205 }
206
207 @Override
208 public CompletableFuture<Void> deleteCheckpoint(String partitionId) {
209
210 return CompletableFuture.completedFuture(null);
211 }
212
213
214
215
216
217
218 @Override
219 public int getLeaseDurationInMilliseconds() {
220 return this.hostContext.getPartitionManagerOptions().getLeaseDurationInSeconds() * 1000;
221 }
222
223 @Override
224 public CompletableFuture<Boolean> leaseStoreExists() {
225 return storeExistsInternal(this.leaseOperationOptions, EventProcessorHostActionStrings.CHECKING_LEASE_STORE,
226 "Failure while checking lease store existence");
227 }
228
229 private CompletableFuture<Boolean> storeExistsInternal(BlobRequestOptions options, String action, String trace) {
230 CompletableFuture<Boolean> future = null;
231 try {
232 future = CompletableFuture.completedFuture(this.eventHubContainer.exists(null, options, null));
233 } catch (StorageException e) {
234 TRACE_LOGGER.error(this.hostContext.withHost(trace), e);
235 future = new CompletableFuture<Boolean>();
236 future.completeExceptionally(LoggingUtils.wrapException(e, action));
237 }
238 return future;
239 }
240
241 @Override
242 public CompletableFuture<Void> createLeaseStoreIfNotExists() {
243 CompletableFuture<Void> future = null;
244
245 try {
246
247 this.eventHubContainer.createIfNotExists(this.leaseOperationOptions, null);
248 TRACE_LOGGER.info(this.hostContext.withHost("Created lease store OK or it already existed"));
249 future = CompletableFuture.completedFuture(null);
250 } catch (StorageException e) {
251 future = new CompletableFuture<Void>();
252 future.completeExceptionally(LoggingUtils.wrapException(e, EventProcessorHostActionStrings.CREATING_LEASE_STORE));
253 TRACE_LOGGER.error(this.hostContext.withHost("Failure while creating lease store"), e);
254 }
255
256 return future;
257 }
258
259 @Override
260 public CompletableFuture<Void> deleteLeaseStore() {
261 return deleteStoreInternal(this.leaseOperationOptions);
262 }
263
264 private CompletableFuture<Void> deleteStoreInternal(BlobRequestOptions options) {
265 CompletableFuture<Void> future = null;
266
267 try {
268 for (ListBlobItem blob : this.eventHubContainer.listBlobs(null, false, EnumSet.noneOf(BlobListingDetails.class), options, null)) {
269 if (blob instanceof CloudBlobDirectory) {
270 for (ListBlobItem subBlob : ((CloudBlobDirectory) blob).listBlobs(null, false, EnumSet.noneOf(BlobListingDetails.class), options, null)) {
271 ((CloudBlockBlob) subBlob).deleteIfExists(DeleteSnapshotsOption.NONE, null, options, null);
272 }
273 } else if (blob instanceof CloudBlockBlob) {
274 ((CloudBlockBlob) blob).deleteIfExists(DeleteSnapshotsOption.NONE, null, options, null);
275 }
276 }
277
278 this.eventHubContainer.deleteIfExists(null, options, null);
279
280 future = CompletableFuture.completedFuture(null);
281 } catch (StorageException | URISyntaxException e) {
282 TRACE_LOGGER.error(this.hostContext.withHost("Failure while deleting lease store"), e);
283 future = new CompletableFuture<Void>();
284 future.completeExceptionally(new CompletionException(e));
285 }
286
287 return future;
288 }
289
290 @Override
291 public CompletableFuture<CompleteLease> getLease(String partitionId) {
292 CompletableFuture<CompleteLease> future = null;
293
294 try {
295 future = CompletableFuture.completedFuture(getLeaseInternal(partitionId, this.leaseOperationOptions));
296 } catch (URISyntaxException | IOException | StorageException e) {
297 TRACE_LOGGER.warn(this.hostContext.withHostAndPartition(partitionId, "Failure while getting lease details"), e);
298 future = new CompletableFuture<CompleteLease>();
299 future.completeExceptionally(LoggingUtils.wrapException(e, EventProcessorHostActionStrings.GETTING_LEASE));
300 }
301
302 return future;
303 }
304
305 private AzureBlobLease getLeaseInternal(String partitionId, BlobRequestOptions options) throws URISyntaxException, IOException, StorageException {
306 AzureBlobLease retval = null;
307
308 CloudBlockBlob leaseBlob = this.consumerGroupDirectory.getBlockBlobReference(partitionId);
309 if (leaseBlob.exists(null, options, null)) {
310 retval = downloadLease(leaseBlob, options);
311 }
312
313 return retval;
314 }
315
316 @Override
317 public CompletableFuture<List<BaseLease>> getAllLeases() {
318 CompletableFuture<List<BaseLease>> future = null;
319
320 try {
321 ArrayList<BaseLease> infos = new ArrayList<BaseLease>();
322 EnumSet<BlobListingDetails> details = EnumSet.of(BlobListingDetails.METADATA);
323 Iterable<ListBlobItem> leaseBlobs = this.consumerGroupDirectory.listBlobs("", true, details, this.leaseOperationOptions, null);
324 leaseBlobs.forEach((lbi) -> {
325 CloudBlob blob = (CloudBlob) lbi;
326 BlobProperties bp = blob.getProperties();
327 HashMap<String, String> metadata = blob.getMetadata();
328 Path p = Paths.get(lbi.getUri().getPath());
329 Path pFileName = p.getFileName();
330 String partitionId = pFileName != null ? pFileName.toString() : "";
331 infos.add(new BaseLease(partitionId, metadata.get(AzureStorageCheckpointLeaseManager.METADATA_OWNER_NAME),
332 (bp.getLeaseState() == LeaseState.LEASED)));
333 });
334 future = CompletableFuture.completedFuture(infos);
335 } catch (URISyntaxException | StorageException | NoSuchElementException e) {
336 Throwable effective = e;
337 if (e instanceof NoSuchElementException) {
338
339
340 effective = e.getCause();
341 }
342
343 TRACE_LOGGER.warn(this.hostContext.withHost("Failure while getting lease state details"), e);
344 future = new CompletableFuture<>();
345 future.completeExceptionally(LoggingUtils.wrapException(effective, EventProcessorHostActionStrings.GETTING_LEASE));
346 }
347
348 return future;
349 }
350
351
352
353
354 @Override
355 public CompletableFuture<Void> createAllLeasesIfNotExists(List<String> partitionIds) {
356 CompletableFuture<Void> future = null;
357
358
359
360 int blobCount = 0;
361 try {
362 Iterable<ListBlobItem> leaseBlobs = this.consumerGroupDirectory.listBlobs("", true, null, this.leaseOperationOptions, null);
363 Iterator<ListBlobItem> blobIterator = leaseBlobs.iterator();
364 while (blobIterator.hasNext()) {
365 blobCount++;
366 blobIterator.next();
367 }
368 } catch (URISyntaxException | StorageException e) {
369 TRACE_LOGGER.error(this.hostContext.withHost("Exception checking lease existence - leaseContainerName: " + this.storageContainerName + " consumerGroupName: "
370 + this.hostContext.getConsumerGroupName() + " storageBlobPrefix: " + this.storageBlobPrefix), e);
371 future = new CompletableFuture<Void>();
372 future.completeExceptionally(LoggingUtils.wrapException(e, EventProcessorHostActionStrings.CREATING_LEASES));
373 }
374
375 if (future == null) {
376
377 if (blobCount == partitionIds.size()) {
378
379 future = CompletableFuture.completedFuture(null);
380 } else {
381
382 ArrayList<CompletableFuture<CompleteLease>> createFutures = new ArrayList<CompletableFuture<CompleteLease>>();
383
384 for (String id : partitionIds) {
385 CompletableFuture<CompleteLease> oneCreate = CompletableFuture.supplyAsync(() -> {
386 CompleteLease returnLease = null;
387 try {
388 returnLease = createLeaseIfNotExistsInternal(id, this.leaseOperationOptions);
389 } catch (URISyntaxException | IOException | StorageException e) {
390 TRACE_LOGGER.error(this.hostContext.withHostAndPartition(id,
391 "Exception creating lease - leaseContainerName: " + this.storageContainerName + " consumerGroupName: " + this.hostContext.getConsumerGroupName()
392 + " storageBlobPrefix: " + this.storageBlobPrefix), e);
393 throw LoggingUtils.wrapException(e, EventProcessorHostActionStrings.CREATING_LEASES);
394 }
395 return returnLease;
396 }, this.hostContext.getExecutor());
397 createFutures.add(oneCreate);
398 }
399
400 CompletableFuture<?>[] dummy = new CompletableFuture<?>[createFutures.size()];
401 future = CompletableFuture.allOf(createFutures.toArray(dummy));
402 }
403 }
404
405 return future;
406 }
407
408 private AzureBlobLease createLeaseIfNotExistsInternal(String partitionId, BlobRequestOptions options) throws URISyntaxException, IOException, StorageException {
409 AzureBlobLease returnLease = null;
410 try {
411 CloudBlockBlob leaseBlob = this.consumerGroupDirectory.getBlockBlobReference(partitionId);
412 returnLease = new AzureBlobLease(partitionId, leaseBlob, this.leaseOperationOptions);
413 uploadLease(returnLease, leaseBlob, AccessCondition.generateIfNoneMatchCondition("*"), UploadActivity.Create, options);
414
415 TRACE_LOGGER.info(this.hostContext.withHostAndPartition(partitionId,
416 "CreateLeaseIfNotExist OK - leaseContainerName: " + this.storageContainerName + " consumerGroupName: " + this.hostContext.getConsumerGroupName()
417 + " storageBlobPrefix: " + this.storageBlobPrefix));
418 } catch (StorageException se) {
419 StorageExtendedErrorInformation extendedErrorInfo = se.getExtendedErrorInformation();
420 if ((extendedErrorInfo != null)
421 && ((extendedErrorInfo.getErrorCode().compareTo(StorageErrorCodeStrings.BLOB_ALREADY_EXISTS) == 0)
422 || (extendedErrorInfo.getErrorCode().compareTo(StorageErrorCodeStrings.LEASE_ID_MISSING) == 0))) {
423
424 TRACE_LOGGER.info(this.hostContext.withHostAndPartition(partitionId, "Lease already exists"));
425 returnLease = getLeaseInternal(partitionId, options);
426 } else {
427 throw se;
428 }
429 }
430
431 return returnLease;
432 }
433
434 @Override
435 public CompletableFuture<Void> deleteLease(CompleteLease lease) {
436 CompletableFuture<Void> future = null;
437
438 TRACE_LOGGER.info(this.hostContext.withHostAndPartition(lease, "Deleting lease"));
439 try {
440
441
442 ((AzureBlobLease) lease).getBlob().deleteIfExists();
443 future = CompletableFuture.completedFuture(null);
444 } catch (StorageException e) {
445 TRACE_LOGGER.error(this.hostContext.withHostAndPartition(lease, "Exception deleting lease"), e);
446 future = new CompletableFuture<Void>();
447 future.completeExceptionally(LoggingUtils.wrapException(e, EventProcessorHostActionStrings.DELETING_LEASE));
448 }
449
450 return future;
451 }
452
453 @Override
454 public CompletableFuture<Boolean> acquireLease(CompleteLease lease) {
455 CompletableFuture<Boolean> future = null;
456
457 try {
458
459
460 future = CompletableFuture.completedFuture(acquireLeaseInternal((AzureBlobLease) lease));
461 } catch (IOException | StorageException e) {
462 TRACE_LOGGER.warn(this.hostContext.withHostAndPartition(lease, "Failure acquiring lease"), e);
463 future = new CompletableFuture<Boolean>();
464 future.completeExceptionally(LoggingUtils.wrapException(e, EventProcessorHostActionStrings.ACQUIRING_LEASE));
465 }
466
467 return future;
468 }
469
470 private boolean acquireLeaseInternal(AzureBlobLease lease) throws IOException, StorageException {
471 TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(lease, "Acquiring lease"));
472
473 CloudBlockBlob leaseBlob = lease.getBlob();
474 boolean succeeded = true;
475 String newLeaseId = EventProcessorHost.safeCreateUUID();
476 if ((newLeaseId == null) || newLeaseId.isEmpty()) {
477 throw new IllegalArgumentException("acquireLeaseSync: newLeaseId really is " + ((newLeaseId == null) ? "null" : "empty"));
478 }
479 try {
480 String newToken = null;
481 leaseBlob.downloadAttributes();
482 if (leaseBlob.getProperties().getLeaseState() == LeaseState.LEASED) {
483 TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(lease, "changeLease"));
484 if ((lease.getToken() == null) || lease.getToken().isEmpty()) {
485
486
487
488
489
490
491
492 succeeded = false;
493 } else {
494 newToken = leaseBlob.changeLease(newLeaseId, AccessCondition.generateLeaseCondition(lease.getToken()));
495 }
496 } else {
497 TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(lease, "acquireLease"));
498 newToken = leaseBlob.acquireLease(this.hostContext.getPartitionManagerOptions().getLeaseDurationInSeconds(), newLeaseId);
499 }
500 if (succeeded) {
501 lease.setToken(newToken);
502 lease.setOwner(this.hostContext.getHostName());
503 lease.incrementEpoch();
504 uploadLease(lease, leaseBlob, AccessCondition.generateLeaseCondition(lease.getToken()), UploadActivity.Acquire, this.leaseOperationOptions);
505 }
506 } catch (StorageException se) {
507 if (wasLeaseLost(se, lease.getPartitionId())) {
508 succeeded = false;
509 } else {
510 throw se;
511 }
512 }
513
514 return succeeded;
515 }
516
517 @Override
518 public CompletableFuture<Boolean> renewLease(CompleteLease lease) {
519 CompletableFuture<Boolean> future = null;
520
521 try {
522 future = CompletableFuture.completedFuture(renewLeaseInternal(lease));
523 } catch (StorageException se) {
524 future = new CompletableFuture<Boolean>();
525 future.completeExceptionally(LoggingUtils.wrapException(se, EventProcessorHostActionStrings.RENEWING_LEASE));
526 }
527
528 return future;
529 }
530
531 private boolean renewLeaseInternal(CompleteLease lease) throws StorageException {
532 TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(lease, "Renewing lease"));
533
534 boolean result = false;
535
536
537
538 AzureBlobLease../com/microsoft/azure/eventprocessorhost/AzureBlobLease.html#AzureBlobLease">AzureBlobLease azLease = (AzureBlobLease) lease;
539 CloudBlockBlob leaseBlob = azLease.getBlob();
540
541 try {
542 leaseBlob.renewLease(AccessCondition.generateLeaseCondition(azLease.getToken()), this.renewRequestOptions, null);
543 result = true;
544 } catch (StorageException se) {
545 if (!wasLeaseLost(se, azLease.getPartitionId())) {
546 throw se;
547 }
548 }
549
550 return result;
551 }
552
553 @Override
554 public CompletableFuture<Void> releaseLease(CompleteLease lease) {
555 TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(lease, "Releasing lease"));
556
557 CompletableFuture<Void> future = null;
558
559
560
561 AzureBlobLease../com/microsoft/azure/eventprocessorhost/AzureBlobLease.html#AzureBlobLease">AzureBlobLease inLease = (AzureBlobLease) lease;
562 CloudBlockBlob leaseBlob = inLease.getBlob();
563
564 try {
565 String leaseId = inLease.getToken();
566 AzureBlobLeasereBlobLease.html#AzureBlobLease">AzureBlobLease releasedCopy = new AzureBlobLease(inLease);
567 releasedCopy.setToken("");
568 releasedCopy.setOwner("");
569 uploadLease(releasedCopy, leaseBlob, AccessCondition.generateLeaseCondition(leaseId), UploadActivity.Release, this.leaseOperationOptions);
570 leaseBlob.releaseLease(AccessCondition.generateLeaseCondition(leaseId));
571 future = CompletableFuture.completedFuture(null);
572 } catch (StorageException se) {
573 if (wasLeaseLost(se, lease.getPartitionId())) {
574
575 future = CompletableFuture.completedFuture(null);
576 } else {
577 future = new CompletableFuture<Void>();
578 future.completeExceptionally(LoggingUtils.wrapException(se, EventProcessorHostActionStrings.RELEASING_LEASE));
579 }
580 } catch (IOException ie) {
581 future = new CompletableFuture<Void>();
582 future.completeExceptionally(LoggingUtils.wrapException(ie, EventProcessorHostActionStrings.RELEASING_LEASE));
583 }
584
585 return future;
586 }
587
588 @Override
589 public CompletableFuture<Boolean> updateLease(CompleteLease lease) {
590 CompletableFuture<Boolean> future = null;
591
592 try {
593
594
595 boolean result = updateLeaseInternal((AzureBlobLease) lease, this.leaseOperationOptions);
596 future = CompletableFuture.completedFuture(result);
597 } catch (StorageException | IOException e) {
598 TRACE_LOGGER.warn(this.hostContext.withHostAndPartition(lease, "Failure updating lease"), e);
599 future = new CompletableFuture<Boolean>();
600 future.completeExceptionally(LoggingUtils.wrapException(e, EventProcessorHostActionStrings.UPDATING_LEASE));
601 }
602
603 return future;
604 }
605
606 public boolean updateLeaseInternal(AzureBlobLease lease, BlobRequestOptions options) throws StorageException, IOException {
607 if (lease == null) {
608 return false;
609 }
610
611 TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(lease, "Updating lease"));
612
613 String token = lease.getToken();
614 if ((token == null) || (token.length() == 0)) {
615 return false;
616 }
617
618
619
620
621 boolean result = renewLeaseInternal(lease);
622 if (result) {
623 CloudBlockBlob leaseBlob = lease.getBlob();
624 try {
625 uploadLease(lease, leaseBlob, AccessCondition.generateLeaseCondition(token), UploadActivity.Update, options);
626
627 } catch (StorageException se) {
628 if (wasLeaseLost(se, lease.getPartitionId())) {
629 result = false;
630 } else {
631 throw se;
632 }
633 } catch (IOException ie) {
634 throw ie;
635 }
636 }
637
638
639 return result;
640 }
641
642 private AzureBlobLease downloadLease(CloudBlockBlob blob, BlobRequestOptions options) throws StorageException, IOException {
643 String jsonLease = blob.downloadText(null, null, options, null);
644 TRACE_LOGGER.debug(this.hostContext.withHost("Raw JSON downloaded: " + jsonLease));
645 AzureBlobLease rehydrated = this.gson.fromJson(jsonLease, AzureBlobLease.class);
646 AzureBlobLeaseAzureBlobLease.html#AzureBlobLease">AzureBlobLease blobLease = new AzureBlobLease(rehydrated, blob, this.leaseOperationOptions);
647
648 if (blobLease.getOffset() != null) {
649 this.latestCheckpoint.put(blobLease.getPartitionId(), blobLease.getCheckpoint());
650 }
651
652 return blobLease;
653 }
654
655 private void uploadLease(AzureBlobLease lease, CloudBlockBlob blob, AccessCondition condition, UploadActivity activity, BlobRequestOptions options)
656 throws StorageException, IOException {
657 if (activity != UploadActivity.Create) {
658
659
660
661
662
663
664 Checkpoint cached = this.latestCheckpoint.get(lease.getPartitionId());
665 if ((cached != null) && ((cached.getSequenceNumber() > lease.getSequenceNumber()) || (lease.getOffset() == null))) {
666 lease.setOffset(cached.getOffset());
667 lease.setSequenceNumber(cached.getSequenceNumber());
668 TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(lease,
669 "Replacing stale offset/seqno while uploading lease"));
670 } else if (lease.getOffset() != null) {
671 this.latestCheckpoint.put(lease.getPartitionId(), lease.getCheckpoint());
672 }
673 }
674
675 String jsonLease = this.gson.toJson(lease);
676 blob.uploadText(jsonLease, null, condition, options, null);
677
678
679 TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(lease,
680 "Raw JSON uploading for " + activity + ": " + jsonLease));
681
682 if ((activity == UploadActivity.Acquire) || (activity == UploadActivity.Release)) {
683 blob.downloadAttributes();
684 HashMap<String, String> metadata = blob.getMetadata();
685 switch (activity) {
686 case Acquire:
687
688 metadata.put(AzureStorageCheckpointLeaseManager.METADATA_OWNER_NAME, lease.getOwner());
689 break;
690
691 case Release:
692
693 metadata.remove(AzureStorageCheckpointLeaseManager.METADATA_OWNER_NAME);
694 break;
695
696 default:
697
698 break;
699 }
700 blob.setMetadata(metadata);
701 blob.uploadMetadata(condition, options, null);
702 }
703
704 }
705
706 private boolean wasLeaseLost(StorageException se, String partitionId) {
707 boolean retval = false;
708 TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(partitionId, "WAS LEASE LOST? Http " + se.getHttpStatusCode()));
709 if (se.getExtendedErrorInformation() != null) {
710 TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(partitionId,
711 "Http " + se.getExtendedErrorInformation().getErrorCode() + " :: " + se.getExtendedErrorInformation().getErrorMessage()));
712 }
713 if ((se.getHttpStatusCode() == 409) ||
714 (se.getHttpStatusCode() == 412)) {
715 StorageExtendedErrorInformation extendedErrorInfo = se.getExtendedErrorInformation();
716 if (extendedErrorInfo != null) {
717 String errorCode = extendedErrorInfo.getErrorCode();
718 TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(partitionId, "Error code: " + errorCode));
719 TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(partitionId, "Error message: " + extendedErrorInfo.getErrorMessage()));
720 if ((errorCode.compareTo(StorageErrorCodeStrings.LEASE_LOST) == 0)
721 || (errorCode.compareTo(StorageErrorCodeStrings.LEASE_ID_MISMATCH_WITH_LEASE_OPERATION) == 0)
722 || (errorCode.compareTo(StorageErrorCodeStrings.LEASE_ID_MISMATCH_WITH_BLOB_OPERATION) == 0)
723 || (errorCode.compareTo(StorageErrorCodeStrings.LEASE_ALREADY_PRESENT) == 0)) {
724 retval = true;
725 }
726 }
727 }
728 return retval;
729 }
730
731 private enum UploadActivity { Create, Acquire, Release, Update }
732 }