View Javadoc
1   // Copyright (c) Microsoft Corporation. All rights reserved.
2   // Licensed under the MIT License.
3   
4   package com.microsoft.azure.eventprocessorhost;
5   
6   import com.google.gson.Gson;
7   import com.microsoft.azure.storage.AccessCondition;
8   import com.microsoft.azure.storage.CloudStorageAccount;
9   import com.microsoft.azure.storage.StorageErrorCodeStrings;
10  import com.microsoft.azure.storage.StorageException;
11  import com.microsoft.azure.storage.StorageExtendedErrorInformation;
12  import com.microsoft.azure.storage.blob.BlobListingDetails;
13  import com.microsoft.azure.storage.blob.BlobProperties;
14  import com.microsoft.azure.storage.blob.BlobRequestOptions;
15  import com.microsoft.azure.storage.blob.CloudBlob;
16  import com.microsoft.azure.storage.blob.CloudBlobClient;
17  import com.microsoft.azure.storage.blob.CloudBlobContainer;
18  import com.microsoft.azure.storage.blob.CloudBlobDirectory;
19  import com.microsoft.azure.storage.blob.CloudBlockBlob;
20  import com.microsoft.azure.storage.blob.DeleteSnapshotsOption;
21  import com.microsoft.azure.storage.blob.LeaseState;
22  import com.microsoft.azure.storage.blob.ListBlobItem;
23  import org.slf4j.Logger;
24  import org.slf4j.LoggerFactory;
25  
26  import java.io.IOException;
27  import java.net.URISyntaxException;
28  import java.nio.file.Path;
29  import java.nio.file.Paths;
30  import java.security.InvalidKeyException;
31  import java.util.ArrayList;
32  import java.util.EnumSet;
33  import java.util.HashMap;
34  import java.util.Hashtable;
35  import java.util.Iterator;
36  import java.util.List;
37  import java.util.NoSuchElementException;
38  import java.util.concurrent.CompletableFuture;
39  import java.util.concurrent.CompletionException;
40  import java.util.regex.Matcher;
41  import java.util.regex.Pattern;
42  
43  
44  class AzureStorageCheckpointLeaseManager implements ICheckpointManager, ILeaseManager {
45      private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(AzureStorageCheckpointLeaseManager.class);
46      private static final String METADATA_OWNER_NAME = "OWNINGHOST";
47  
48      private final String storageConnectionString;
49      private final String storageBlobPrefix;
50      private final BlobRequestOptions leaseOperationOptions = new BlobRequestOptions();
51      private final BlobRequestOptions checkpointOperationOptions = new BlobRequestOptions();
52      private final BlobRequestOptions renewRequestOptions = new BlobRequestOptions();
53      private HostContext hostContext;
54      private String storageContainerName;
55      private CloudBlobClient storageClient;
56      private CloudBlobContainer eventHubContainer;
57      private CloudBlobDirectory consumerGroupDirectory;
58      private Gson gson;
59  
60      private Hashtable<String, Checkpoint> latestCheckpoint = new Hashtable<String, Checkpoint>();
61  
62      AzureStorageCheckpointLeaseManager(String storageConnectionString, String storageContainerName) {
63          this(storageConnectionString, storageContainerName, "");
64      }
65  
66      AzureStorageCheckpointLeaseManager(String storageConnectionString, String storageContainerName, String storageBlobPrefix) {
67          if ((storageConnectionString == null) || storageConnectionString.trim().isEmpty()) {
68              throw new IllegalArgumentException("Provide valid Azure Storage connection string when using Azure Storage");
69          }
70          this.storageConnectionString = storageConnectionString;
71  
72          if ((storageContainerName != null) && storageContainerName.trim().isEmpty()) {
73              throw new IllegalArgumentException("Azure Storage container name must be a valid container name or null to use the default");
74          }
75          this.storageContainerName = storageContainerName;
76  
77          // Convert all-whitespace prefix to empty string. Convert null prefix to empty string.
78          // Then the rest of the code only has one case to worry about.
79          this.storageBlobPrefix = (storageBlobPrefix != null) ? storageBlobPrefix.trim() : "";
80      }
81  
82      // The EventProcessorHost can't pass itself to the AzureStorageCheckpointLeaseManager constructor
83      // because it is still being constructed. Do other initialization here also because it might throw and
84      // hence we don't want it in the constructor.
85      void initialize(HostContext hostContext) throws InvalidKeyException, URISyntaxException, StorageException {
86          this.hostContext = hostContext;
87  
88          if (this.storageContainerName == null) {
89              this.storageContainerName = this.hostContext.getEventHubPath();
90          }
91  
92          // Validate that the event hub name is also a legal storage container name.
93          // Regex pattern is copied from .NET version. The syntax for Java regexes seems to be the same.
94          // Error message is also copied from .NET version.
95          Pattern p = Pattern.compile("^(?-i)(?:[a-z0-9]|(?<=[0-9a-z])-(?=[0-9a-z])){3,63}$");
96          Matcher m = p.matcher(this.storageContainerName);
97          if (!m.find()) {
98              throw new IllegalArgumentException("EventHub names must conform to the following rules to be able to use it with EventProcessorHost: "
99                      + "Must start with a letter or number, and can contain only letters, numbers, and the dash (-) character. "
100                     + "Every dash (-) character must be immediately preceded and followed by a letter or number; consecutive dashes are not permitted in container names. "
101                     + "All letters in a container name must be lowercase. "
102                     + "Must be from 3 to 63 characters long.");
103         }
104 
105         this.storageClient = CloudStorageAccount.parse(this.storageConnectionString).createCloudBlobClient();
106 
107         this.eventHubContainer = this.storageClient.getContainerReference(this.storageContainerName);
108 
109         // storageBlobPrefix is either empty or a real user-supplied string. Either way we can just
110         // stick it on the front and get the desired result.
111         this.consumerGroupDirectory = this.eventHubContainer.getDirectoryReference(this.storageBlobPrefix + this.hostContext.getConsumerGroupName());
112 
113         this.gson = new Gson();
114 
115         this.leaseOperationOptions.setMaximumExecutionTimeInMs(this.hostContext.getPartitionManagerOptions().getLeaseDurationInSeconds() * 1000);
116         this.storageClient.setDefaultRequestOptions(this.leaseOperationOptions);
117         this.checkpointOperationOptions.setMaximumExecutionTimeInMs(this.hostContext.getPartitionManagerOptions().getCheckpointTimeoutInSeconds() * 1000);
118         // The only option that .NET sets on renewRequestOptions is ServerTimeout, which doesn't exist in Java equivalent.
119         // Keep it separate in case we need to change something later.
120         // Only used for leases, not checkpoints, so set max execution time to lease value
121         this.renewRequestOptions.setMaximumExecutionTimeInMs(this.hostContext.getPartitionManagerOptions().getLeaseDurationInSeconds() * 1000);
122     }
123 
124     @Override
125     public CompletableFuture<Boolean> checkpointStoreExists() {
126         return storeExistsInternal(this.checkpointOperationOptions, EventProcessorHostActionStrings.CHECKING_CHECKPOINT_STORE,
127             "Failure while checking checkpoint store existence");
128     }
129 
130 
131     //
132     // In this implementation, checkpoints are data that's actually in the lease blob, so checkpoint operations
133     // turn into lease operations under the covers.
134     //
135 
136     @Override
137     public CompletableFuture<Void> createCheckpointStoreIfNotExists() {
138         // Because we control the caller, we know that this method will only be called after createLeaseStoreIfNotExists.
139         // In this implementation, it's the same store, so the store will always exist if execution reaches here.
140         return CompletableFuture.completedFuture(null);
141     }
142 
143     @Override
144     public CompletableFuture<Void> deleteCheckpointStore() {
145         return deleteStoreInternal(this.checkpointOperationOptions);
146     }
147 
148     @Override
149     public CompletableFuture<Checkpoint> getCheckpoint(String partitionId) {
150         CompletableFuture<Checkpoint> future = null;
151 
152         try {
153             AzureBlobLease lease = getLeaseInternal(partitionId, this.checkpointOperationOptions);
154             Checkpoint checkpoint = null;
155             if (lease != null) {
156                 if ((lease.getOffset() != null) && !lease.getOffset().isEmpty()) {
157                     checkpoint = new Checkpoint(partitionId);
158                     checkpoint.setOffset(lease.getOffset());
159                     checkpoint.setSequenceNumber(lease.getSequenceNumber());
160                 }
161                 // else offset is null meaning no checkpoint stored for this partition so return null
162             }
163             future = CompletableFuture.completedFuture(checkpoint);
164         } catch (URISyntaxException | IOException | StorageException e) {
165             future = new CompletableFuture<Checkpoint>();
166             future.completeExceptionally(LoggingUtils.wrapException(e, EventProcessorHostActionStrings.GETTING_CHECKPOINT));
167         }
168 
169         return future;
170     }
171 
172     @Override
173     public CompletableFuture<Void> createAllCheckpointsIfNotExists(List<String> partitionIds) {
174         // Because we control the caller, we know that this method will only be called after createAllLeasesIfNotExists.
175         // In this implementation checkpoints are in the same blobs as leases, so the blobs will already exist if execution reaches here.
176         return CompletableFuture.completedFuture(null);
177     }
178 
179     @Override
180     public CompletableFuture<Void> updateCheckpoint(CompleteLease lease, Checkpoint checkpoint) {
181         AzureBlobLease"../../../../com/microsoft/azure/eventprocessorhost/AzureBlobLease.html#AzureBlobLease">AzureBlobLeasereBlobLease.html#AzureBlobLease">AzureBlobLease updatedLease = new AzureBlobLease"../../../../com/microsoft/azure/eventprocessorhost/AzureBlobLease.html#AzureBlobLease">AzureBlobLease((AzureBlobLease) lease);
182         TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(checkpoint.getPartitionId(),
183                 "Checkpointing at " + checkpoint.getOffset() + " // " + checkpoint.getSequenceNumber()));
184         updatedLease.setOffset(checkpoint.getOffset());
185         updatedLease.setSequenceNumber(checkpoint.getSequenceNumber());
186 
187         CompletableFuture<Void> future = null;
188 
189         try {
190             if (updateLeaseInternal(updatedLease, this.checkpointOperationOptions)) {
191                 future = CompletableFuture.completedFuture(null);
192             } else {
193                 TRACE_LOGGER.warn(this.hostContext.withHostAndPartition(lease, "Lease lost"));
194                 future = new CompletableFuture<Void>();
195                 future.completeExceptionally(LoggingUtils.wrapException(new RuntimeException("Lease lost while updating checkpoint"),
196                         EventProcessorHostActionStrings.UPDATING_CHECKPOINT));
197             }
198         } catch (StorageException | IOException e) {
199             TRACE_LOGGER.warn(this.hostContext.withHostAndPartition(lease, "Failure updating checkpoint"), e);
200             future = new CompletableFuture<Void>();
201             future.completeExceptionally(LoggingUtils.wrapException(e, EventProcessorHostActionStrings.UPDATING_CHECKPOINT));
202         }
203 
204         return future;
205     }
206 
207     @Override
208     public CompletableFuture<Void> deleteCheckpoint(String partitionId) {
209         // Not currently used by EventProcessorHost.
210         return CompletableFuture.completedFuture(null);
211     }
212 
213 
214     //
215     // Lease operations.
216     //
217 
218     @Override
219     public int getLeaseDurationInMilliseconds() {
220         return this.hostContext.getPartitionManagerOptions().getLeaseDurationInSeconds() * 1000;
221     }
222 
223     @Override
224     public CompletableFuture<Boolean> leaseStoreExists() {
225         return storeExistsInternal(this.leaseOperationOptions, EventProcessorHostActionStrings.CHECKING_LEASE_STORE,
226                 "Failure while checking lease store existence");
227     }
228 
229     private CompletableFuture<Boolean> storeExistsInternal(BlobRequestOptions options, String action, String trace) {
230         CompletableFuture<Boolean> future = null;
231         try {
232             future = CompletableFuture.completedFuture(this.eventHubContainer.exists(null, options, null));
233         } catch (StorageException e) {
234             TRACE_LOGGER.error(this.hostContext.withHost(trace), e);
235             future = new CompletableFuture<Boolean>();
236             future.completeExceptionally(LoggingUtils.wrapException(e, action));
237         }
238         return future;
239     }
240 
241     @Override
242     public CompletableFuture<Void> createLeaseStoreIfNotExists() {
243         CompletableFuture<Void> future = null;
244 
245         try {
246             // returns true if the container was created, false if it already existed -- we don't care
247             this.eventHubContainer.createIfNotExists(this.leaseOperationOptions, null);
248             TRACE_LOGGER.info(this.hostContext.withHost("Created lease store OK or it already existed"));
249             future = CompletableFuture.completedFuture(null);
250         } catch (StorageException e) {
251             future = new CompletableFuture<Void>();
252             future.completeExceptionally(LoggingUtils.wrapException(e, EventProcessorHostActionStrings.CREATING_LEASE_STORE));
253             TRACE_LOGGER.error(this.hostContext.withHost("Failure while creating lease store"), e);
254         }
255 
256         return future;
257     }
258 
259     @Override
260     public CompletableFuture<Void> deleteLeaseStore() {
261         return deleteStoreInternal(this.leaseOperationOptions);
262     }
263 
264     private CompletableFuture<Void> deleteStoreInternal(BlobRequestOptions options) {
265         CompletableFuture<Void> future = null;
266 
267         try {
268             for (ListBlobItem blob : this.eventHubContainer.listBlobs(null, false, EnumSet.noneOf(BlobListingDetails.class), options, null)) {
269                 if (blob instanceof CloudBlobDirectory) {
270                     for (ListBlobItem subBlob : ((CloudBlobDirectory) blob).listBlobs(null, false, EnumSet.noneOf(BlobListingDetails.class), options, null)) {
271                         ((CloudBlockBlob) subBlob).deleteIfExists(DeleteSnapshotsOption.NONE, null, options, null);
272                     }
273                 } else if (blob instanceof CloudBlockBlob) {
274                     ((CloudBlockBlob) blob).deleteIfExists(DeleteSnapshotsOption.NONE, null, options, null);
275                 }
276             }
277 
278             this.eventHubContainer.deleteIfExists(null, options, null);
279 
280             future = CompletableFuture.completedFuture(null);
281         } catch (StorageException | URISyntaxException e) {
282             TRACE_LOGGER.error(this.hostContext.withHost("Failure while deleting lease store"), e);
283             future = new CompletableFuture<Void>();
284             future.completeExceptionally(new CompletionException(e));
285         }
286 
287         return future;
288     }
289 
290     @Override
291     public CompletableFuture<CompleteLease> getLease(String partitionId) {
292         CompletableFuture<CompleteLease> future = null;
293 
294         try {
295             future = CompletableFuture.completedFuture(getLeaseInternal(partitionId, this.leaseOperationOptions));
296         } catch (URISyntaxException | IOException | StorageException e) {
297             TRACE_LOGGER.warn(this.hostContext.withHostAndPartition(partitionId, "Failure while getting lease details"), e);
298             future = new CompletableFuture<CompleteLease>();
299             future.completeExceptionally(LoggingUtils.wrapException(e, EventProcessorHostActionStrings.GETTING_LEASE));
300         }
301 
302         return future;
303     }
304 
305     private AzureBlobLease getLeaseInternal(String partitionId, BlobRequestOptions options) throws URISyntaxException, IOException, StorageException {
306         AzureBlobLease retval = null;
307 
308         CloudBlockBlob leaseBlob = this.consumerGroupDirectory.getBlockBlobReference(partitionId); // getBlockBlobReference does not take options
309         if (leaseBlob.exists(null, options, null)) {
310             retval = downloadLease(leaseBlob, options);
311         }
312 
313         return retval;
314     }
315 
316     @Override
317     public CompletableFuture<List<BaseLease>> getAllLeases() {
318         CompletableFuture<List<BaseLease>> future = null;
319 
320         try {
321             ArrayList<BaseLease> infos = new ArrayList<BaseLease>();
322             EnumSet<BlobListingDetails> details = EnumSet.of(BlobListingDetails.METADATA);
323             Iterable<ListBlobItem> leaseBlobs = this.consumerGroupDirectory.listBlobs("", true, details, this.leaseOperationOptions, null);
324             leaseBlobs.forEach((lbi) -> {
325                 CloudBlob blob = (CloudBlob) lbi;
326                 BlobProperties bp = blob.getProperties();
327                 HashMap<String, String> metadata = blob.getMetadata();
328                 Path p = Paths.get(lbi.getUri().getPath());
329                 Path pFileName = p.getFileName();
330                 String partitionId = pFileName != null ? pFileName.toString() : "";
331                 infos.add(new BaseLease(partitionId, metadata.get(AzureStorageCheckpointLeaseManager.METADATA_OWNER_NAME),
332                         (bp.getLeaseState() == LeaseState.LEASED)));
333             });
334             future = CompletableFuture.completedFuture(infos);
335         } catch (URISyntaxException | StorageException | NoSuchElementException e) {
336             Throwable effective = e;
337             if (e instanceof NoSuchElementException) {
338                 // If there is a StorageException in the forEach, it arrives wrapped in a NoSuchElementException.
339                 // Strip the misleading NoSuchElementException to provide a meaningful error for the user.
340                 effective = e.getCause();
341             }
342 
343             TRACE_LOGGER.warn(this.hostContext.withHost("Failure while getting lease state details"), e);
344             future = new CompletableFuture<>();
345             future.completeExceptionally(LoggingUtils.wrapException(effective, EventProcessorHostActionStrings.GETTING_LEASE));
346         }
347 
348         return future;
349     }
350 
351     // NOTE NOTE NOTE: this is the one place where this lease manager implementation returns an uncompleted future.
352     // This is to support creating the blobs in parallel, which can be an important part of fast startup.
353     // Because it happens during startup, when no user code is running, it cannot deadlock with checkpointing.
354     @Override
355     public CompletableFuture<Void> createAllLeasesIfNotExists(List<String> partitionIds) {
356         CompletableFuture<Void> future = null;
357 
358         // Optimization: list the blobs currently existing in the directory. If there are the
359         // expected number of blobs, then we can skip doing the creates.
360         int blobCount = 0;
361         try {
362             Iterable<ListBlobItem> leaseBlobs = this.consumerGroupDirectory.listBlobs("", true, null, this.leaseOperationOptions, null);
363             Iterator<ListBlobItem> blobIterator = leaseBlobs.iterator();
364             while (blobIterator.hasNext()) {
365                 blobCount++;
366                 blobIterator.next();
367             }
368         } catch (URISyntaxException | StorageException e) {
369             TRACE_LOGGER.error(this.hostContext.withHost("Exception checking lease existence - leaseContainerName: " + this.storageContainerName + " consumerGroupName: "
370                     + this.hostContext.getConsumerGroupName() + " storageBlobPrefix: " + this.storageBlobPrefix), e);
371             future = new CompletableFuture<Void>();
372             future.completeExceptionally(LoggingUtils.wrapException(e, EventProcessorHostActionStrings.CREATING_LEASES));
373         }
374 
375         if (future == null) {
376             // No error checking the list, so keep going
377             if (blobCount == partitionIds.size()) {
378                 // All expected blobs found, so short-circuit
379                 future = CompletableFuture.completedFuture(null);
380             } else {
381                 // Create the blobs in parallel
382                 ArrayList<CompletableFuture<CompleteLease>> createFutures = new ArrayList<CompletableFuture<CompleteLease>>();
383 
384                 for (String id : partitionIds) {
385                     CompletableFuture<CompleteLease> oneCreate = CompletableFuture.supplyAsync(() -> {
386                         CompleteLease returnLease = null;
387                         try {
388                             returnLease = createLeaseIfNotExistsInternal(id, this.leaseOperationOptions);
389                         } catch (URISyntaxException | IOException | StorageException e) {
390                             TRACE_LOGGER.error(this.hostContext.withHostAndPartition(id,
391                                     "Exception creating lease - leaseContainerName: " + this.storageContainerName + " consumerGroupName: " + this.hostContext.getConsumerGroupName()
392                                             + " storageBlobPrefix: " + this.storageBlobPrefix), e);
393                             throw LoggingUtils.wrapException(e, EventProcessorHostActionStrings.CREATING_LEASES);
394                         }
395                         return returnLease;
396                     }, this.hostContext.getExecutor());
397                     createFutures.add(oneCreate);
398                 }
399 
400                 CompletableFuture<?>[] dummy = new CompletableFuture<?>[createFutures.size()];
401                 future = CompletableFuture.allOf(createFutures.toArray(dummy));
402             }
403         }
404 
405         return future;
406     }
407 
408     private AzureBlobLease createLeaseIfNotExistsInternal(String partitionId, BlobRequestOptions options) throws URISyntaxException, IOException, StorageException {
409         AzureBlobLease returnLease = null;
410         try {
411             CloudBlockBlob leaseBlob = this.consumerGroupDirectory.getBlockBlobReference(partitionId); // getBlockBlobReference does not take options
412             returnLease = new AzureBlobLease(partitionId, leaseBlob, this.leaseOperationOptions);
413             uploadLease(returnLease, leaseBlob, AccessCondition.generateIfNoneMatchCondition("*"), UploadActivity.Create, options);
414             // Do not set metadata on creation. No metadata/no owner value indicates that the lease is unowned.
415             TRACE_LOGGER.info(this.hostContext.withHostAndPartition(partitionId,
416                     "CreateLeaseIfNotExist OK - leaseContainerName: " + this.storageContainerName + " consumerGroupName: " + this.hostContext.getConsumerGroupName()
417                             + " storageBlobPrefix: " + this.storageBlobPrefix));
418         } catch (StorageException se) {
419             StorageExtendedErrorInformation extendedErrorInfo = se.getExtendedErrorInformation();
420             if ((extendedErrorInfo != null)
421                     && ((extendedErrorInfo.getErrorCode().compareTo(StorageErrorCodeStrings.BLOB_ALREADY_EXISTS) == 0)
422                             || (extendedErrorInfo.getErrorCode().compareTo(StorageErrorCodeStrings.LEASE_ID_MISSING) == 0))) { // occurs when somebody else already has leased the blob
423                 // The blob already exists.
424                 TRACE_LOGGER.info(this.hostContext.withHostAndPartition(partitionId, "Lease already exists"));
425                 returnLease = getLeaseInternal(partitionId, options);
426             } else {
427                 throw se;
428             }
429         }
430 
431         return returnLease;
432     }
433 
434     @Override
435     public CompletableFuture<Void> deleteLease(CompleteLease lease) {
436         CompletableFuture<Void> future = null;
437 
438         TRACE_LOGGER.info(this.hostContext.withHostAndPartition(lease, "Deleting lease"));
439         try {
440             // Fetching leases (using getLease) from AzureStorageCheckpointLeaseManager as the ILeaseManager returns an
441             // AzureBlobLease. This unchecked cast won't fail.
442             ((AzureBlobLease) lease).getBlob().deleteIfExists();
443             future = CompletableFuture.completedFuture(null);
444         } catch (StorageException e) {
445             TRACE_LOGGER.error(this.hostContext.withHostAndPartition(lease, "Exception deleting lease"), e);
446             future = new CompletableFuture<Void>();
447             future.completeExceptionally(LoggingUtils.wrapException(e, EventProcessorHostActionStrings.DELETING_LEASE));
448         }
449 
450         return future;
451     }
452 
453     @Override
454     public CompletableFuture<Boolean> acquireLease(CompleteLease lease) {
455         CompletableFuture<Boolean> future = null;
456 
457         try {
458             // Fetching leases (using getLease) from AzureStorageCheckpointLeaseManager as the ILeaseManager returns an
459             // AzureBlobLease. This unchecked cast won't fail.
460             future = CompletableFuture.completedFuture(acquireLeaseInternal((AzureBlobLease) lease));
461         } catch (IOException | StorageException e) {
462             TRACE_LOGGER.warn(this.hostContext.withHostAndPartition(lease, "Failure acquiring lease"), e);
463             future = new CompletableFuture<Boolean>();
464             future.completeExceptionally(LoggingUtils.wrapException(e, EventProcessorHostActionStrings.ACQUIRING_LEASE));
465         }
466 
467         return future;
468     }
469 
470     private boolean acquireLeaseInternal(AzureBlobLease lease) throws IOException, StorageException {
471         TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(lease, "Acquiring lease"));
472 
473         CloudBlockBlob leaseBlob = lease.getBlob();
474         boolean succeeded = true;
475         String newLeaseId = EventProcessorHost.safeCreateUUID();
476         if ((newLeaseId == null) || newLeaseId.isEmpty()) {
477             throw new IllegalArgumentException("acquireLeaseSync: newLeaseId really is " + ((newLeaseId == null) ? "null" : "empty"));
478         }
479         try {
480             String newToken = null;
481             leaseBlob.downloadAttributes();
482             if (leaseBlob.getProperties().getLeaseState() == LeaseState.LEASED) {
483                 TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(lease, "changeLease"));
484                 if ((lease.getToken() == null) || lease.getToken().isEmpty()) {
485                     // We reach here in a race condition: when this instance of EventProcessorHost scanned the
486                     // lease blobs, this partition was unowned (token is empty) but between then and now, another
487                     // instance of EPH has established a lease (getLeaseState() is LEASED). We normally enforce
488                     // that we only steal the lease if it is still owned by the instance which owned it when we
489                     // scanned, but we can't do that when we don't know who owns it. The safest thing to do is just
490                     // fail the acquisition. If that means that one EPH instance gets more partitions than it should,
491                     // rebalancing will take care of that quickly enough.
492                     succeeded = false;
493                 } else {
494                     newToken = leaseBlob.changeLease(newLeaseId, AccessCondition.generateLeaseCondition(lease.getToken()));
495                 }
496             } else {
497                 TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(lease, "acquireLease"));
498                 newToken = leaseBlob.acquireLease(this.hostContext.getPartitionManagerOptions().getLeaseDurationInSeconds(), newLeaseId);
499             }
500             if (succeeded) {
501                 lease.setToken(newToken);
502                 lease.setOwner(this.hostContext.getHostName());
503                 lease.incrementEpoch(); // Increment epoch each time lease is acquired or stolen by a new host
504                 uploadLease(lease, leaseBlob, AccessCondition.generateLeaseCondition(lease.getToken()), UploadActivity.Acquire, this.leaseOperationOptions);
505             }
506         } catch (StorageException se) {
507             if (wasLeaseLost(se, lease.getPartitionId())) {
508                 succeeded = false;
509             } else {
510                 throw se;
511             }
512         }
513 
514         return succeeded;
515     }
516 
517     @Override
518     public CompletableFuture<Boolean> renewLease(CompleteLease lease) {
519         CompletableFuture<Boolean> future = null;
520 
521         try {
522             future = CompletableFuture.completedFuture(renewLeaseInternal(lease));
523         } catch (StorageException se) {
524             future = new CompletableFuture<Boolean>();
525             future.completeExceptionally(LoggingUtils.wrapException(se, EventProcessorHostActionStrings.RENEWING_LEASE));
526         }
527 
528         return future;
529     }
530 
531     private boolean renewLeaseInternal(CompleteLease lease) throws StorageException {
532         TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(lease, "Renewing lease"));
533 
534         boolean result = false;
535 
536         // Fetching leases (using getLease) from AzureStorageCheckpointLeaseManager as the ILeaseManager returns an
537         // AzureBlobLease. This unchecked cast won't fail.
538         AzureBlobLease../com/microsoft/azure/eventprocessorhost/AzureBlobLease.html#AzureBlobLease">AzureBlobLease azLease = (AzureBlobLease) lease;
539         CloudBlockBlob leaseBlob = azLease.getBlob();
540 
541         try {
542             leaseBlob.renewLease(AccessCondition.generateLeaseCondition(azLease.getToken()), this.renewRequestOptions, null);
543             result = true;
544         } catch (StorageException se) {
545             if (!wasLeaseLost(se, azLease.getPartitionId())) {
546                 throw se;
547             }
548         }
549 
550         return result;
551     }
552 
553     @Override
554     public CompletableFuture<Void> releaseLease(CompleteLease lease) {
555         TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(lease, "Releasing lease"));
556 
557         CompletableFuture<Void> future = null;
558 
559         // Fetching leases (using getLease) from AzureStorageCheckpointLeaseManager as the ILeaseManager returns an
560         // AzureBlobLease. This unchecked cast won't fail.
561         AzureBlobLease../com/microsoft/azure/eventprocessorhost/AzureBlobLease.html#AzureBlobLease">AzureBlobLease inLease = (AzureBlobLease) lease;
562         CloudBlockBlob leaseBlob = inLease.getBlob();
563 
564         try {
565             String leaseId = inLease.getToken();
566             AzureBlobLeasereBlobLease.html#AzureBlobLease">AzureBlobLease releasedCopy = new AzureBlobLease(inLease);
567             releasedCopy.setToken("");
568             releasedCopy.setOwner("");
569             uploadLease(releasedCopy, leaseBlob, AccessCondition.generateLeaseCondition(leaseId), UploadActivity.Release, this.leaseOperationOptions);
570             leaseBlob.releaseLease(AccessCondition.generateLeaseCondition(leaseId));
571             future = CompletableFuture.completedFuture(null);
572         } catch (StorageException se) {
573             if (wasLeaseLost(se, lease.getPartitionId())) {
574                 // If the lease was already lost, then the intent of releasing it has been achieved.
575                 future = CompletableFuture.completedFuture(null);
576             } else {
577                 future = new CompletableFuture<Void>();
578                 future.completeExceptionally(LoggingUtils.wrapException(se, EventProcessorHostActionStrings.RELEASING_LEASE));
579             }
580         } catch (IOException ie) {
581             future = new CompletableFuture<Void>();
582             future.completeExceptionally(LoggingUtils.wrapException(ie, EventProcessorHostActionStrings.RELEASING_LEASE));
583         }
584 
585         return future;
586     }
587 
588     @Override
589     public CompletableFuture<Boolean> updateLease(CompleteLease lease) {
590         CompletableFuture<Boolean> future = null;
591 
592         try {
593             // Fetching leases (using getLease) from AzureStorageCheckpointLeaseManager as the ILeaseManager returns an
594             // AzureBlobLease. This unchecked cast won't fail.
595             boolean result = updateLeaseInternal((AzureBlobLease) lease, this.leaseOperationOptions);
596             future = CompletableFuture.completedFuture(result);
597         } catch (StorageException | IOException e) {
598             TRACE_LOGGER.warn(this.hostContext.withHostAndPartition(lease, "Failure updating lease"), e);
599             future = new CompletableFuture<Boolean>();
600             future.completeExceptionally(LoggingUtils.wrapException(e, EventProcessorHostActionStrings.UPDATING_LEASE));
601         }
602 
603         return future;
604     }
605 
606     public boolean updateLeaseInternal(AzureBlobLease lease, BlobRequestOptions options) throws StorageException, IOException {
607         if (lease == null) {
608             return false;
609         }
610 
611         TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(lease, "Updating lease"));
612 
613         String token = lease.getToken();
614         if ((token == null) || (token.length() == 0)) {
615             return false;
616         }
617 
618         // Renew the lease to make sure the update will go through.
619         // Renewing the lease is always logically a lease operation, even if it is part of writing a checkpoint, so
620         // don't pass options.
621         boolean result = renewLeaseInternal(lease);
622         if (result) {
623             CloudBlockBlob leaseBlob = lease.getBlob();
624             try {
625                 uploadLease(lease, leaseBlob, AccessCondition.generateLeaseCondition(token), UploadActivity.Update, options);
626                 // Success! Result is already true, so pass it up unchanged
627             } catch (StorageException se) {
628                 if (wasLeaseLost(se, lease.getPartitionId())) {
629                     result = false;
630                 } else {
631                     throw se;
632                 }
633             } catch (IOException ie) {
634                 throw ie;
635             }
636         }
637         // else could not renew lease due to lease loss. Result is already false, so pass it up unchanged
638 
639         return result;
640     }
641 
642     private AzureBlobLease downloadLease(CloudBlockBlob blob, BlobRequestOptions options) throws StorageException, IOException {
643         String jsonLease = blob.downloadText(null, null, options, null);
644         TRACE_LOGGER.debug(this.hostContext.withHost("Raw JSON downloaded: " + jsonLease));
645         AzureBlobLease rehydrated = this.gson.fromJson(jsonLease, AzureBlobLease.class);
646         AzureBlobLeaseAzureBlobLease.html#AzureBlobLease">AzureBlobLease blobLease = new AzureBlobLease(rehydrated, blob, this.leaseOperationOptions);
647 
648         if (blobLease.getOffset() != null) {
649             this.latestCheckpoint.put(blobLease.getPartitionId(), blobLease.getCheckpoint());
650         }
651 
652         return blobLease;
653     }
654 
655     private void uploadLease(AzureBlobLease lease, CloudBlockBlob blob, AccessCondition condition, UploadActivity activity, BlobRequestOptions options)
656             throws StorageException, IOException {
657         if (activity != UploadActivity.Create) {
658             // It is possible for AzureBlobLease objects in memory to have stale offset/sequence number fields if a
659             // checkpoint was written but PartitionManager hasn't done its ten-second sweep which downloads new copies
660             // of all the leases. This can happen because we're trying to maintain the fiction that checkpoints and leases
661             // are separate -- which they can be in other implementations -- even though they are completely intertwined
662             // in this implementation. To prevent writing stale checkpoint data to the store, merge the checkpoint data
663             // from the most recently written checkpoint into this write, if needed.
664             Checkpoint cached = this.latestCheckpoint.get(lease.getPartitionId()); // HASHTABLE
665             if ((cached != null) && ((cached.getSequenceNumber() > lease.getSequenceNumber()) || (lease.getOffset() == null))) {
666                 lease.setOffset(cached.getOffset());
667                 lease.setSequenceNumber(cached.getSequenceNumber());
668                 TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(lease,
669                         "Replacing stale offset/seqno while uploading lease"));
670             } else if (lease.getOffset() != null) {
671                 this.latestCheckpoint.put(lease.getPartitionId(), lease.getCheckpoint());
672             }
673         }
674 
675         String jsonLease = this.gson.toJson(lease);
676         blob.uploadText(jsonLease, null, condition, options, null);
677         // During create, we blindly try upload and it may throw. Doing the logging after the upload
678         // avoids a spurious trace in that case.
679         TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(lease,
680                 "Raw JSON uploading for " + activity + ": " + jsonLease));
681 
682         if ((activity == UploadActivity.Acquire) || (activity == UploadActivity.Release)) {
683             blob.downloadAttributes();
684             HashMap<String, String> metadata = blob.getMetadata();
685             switch (activity) {
686                 case Acquire:
687                     // Add owner in metadata
688                     metadata.put(AzureStorageCheckpointLeaseManager.METADATA_OWNER_NAME, lease.getOwner());
689                     break;
690 
691                 case Release:
692                     // Remove owner in metadata
693                     metadata.remove(AzureStorageCheckpointLeaseManager.METADATA_OWNER_NAME);
694                     break;
695 
696                 default:
697                     // Should never get here, but passing the metadata through unchanged is harmless.
698                     break;
699             }
700             blob.setMetadata(metadata);
701             blob.uploadMetadata(condition, options, null);
702         }
703         // else don't touch metadata
704     }
705 
706     private boolean wasLeaseLost(StorageException se, String partitionId) {
707         boolean retval = false;
708         TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(partitionId, "WAS LEASE LOST? Http " + se.getHttpStatusCode()));
709         if (se.getExtendedErrorInformation() != null) {
710             TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(partitionId,
711                     "Http " + se.getExtendedErrorInformation().getErrorCode() + " :: " + se.getExtendedErrorInformation().getErrorMessage()));
712         }
713         if ((se.getHttpStatusCode() == 409) || // conflict
714                 (se.getHttpStatusCode() == 412)) { // precondition failed
715             StorageExtendedErrorInformation extendedErrorInfo = se.getExtendedErrorInformation();
716             if (extendedErrorInfo != null) {
717                 String errorCode = extendedErrorInfo.getErrorCode();
718                 TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(partitionId, "Error code: " + errorCode));
719                 TRACE_LOGGER.debug(this.hostContext.withHostAndPartition(partitionId, "Error message: " + extendedErrorInfo.getErrorMessage()));
720                 if ((errorCode.compareTo(StorageErrorCodeStrings.LEASE_LOST) == 0)
721                         || (errorCode.compareTo(StorageErrorCodeStrings.LEASE_ID_MISMATCH_WITH_LEASE_OPERATION) == 0)
722                         || (errorCode.compareTo(StorageErrorCodeStrings.LEASE_ID_MISMATCH_WITH_BLOB_OPERATION) == 0)
723                         || (errorCode.compareTo(StorageErrorCodeStrings.LEASE_ALREADY_PRESENT) == 0)) {
724                     retval = true;
725                 }
726             }
727         }
728         return retval;
729     }
730 
731     private enum UploadActivity { Create, Acquire, Release, Update }
732 }