LockContainer.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.messaging.servicebus.implementation;

import com.azure.core.util.logging.ClientLogger;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/**
 * Container to store items that are periodically cleaned.
 */
public class LockContainer<T> implements AutoCloseable {
    private final ClientLogger logger = new ClientLogger(LockContainer.class);
    private final ConcurrentHashMap<String, OffsetDateTime> lockTokenExpirationMap = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, T> lockTokenItemMap = new ConcurrentHashMap<>();
    private final AtomicBoolean isDisposed = new AtomicBoolean();
    private final Disposable cleanupOperation;
    private final Consumer<T> onExpired;

    public LockContainer(Duration cleanupInterval) {
        this(cleanupInterval, t -> {
        });
    }

    public LockContainer(Duration cleanupInterval, Consumer<T> onExpired) {
        Objects.requireNonNull(cleanupInterval, "'cleanupInterval' cannot be null.");

        this.onExpired = Objects.requireNonNull(onExpired, "'onExpired' cannot be null.");
        this.cleanupOperation = Flux.interval(cleanupInterval).subscribe(e -> {
            if (lockTokenExpirationMap.isEmpty()) {
                return;
            }

            final OffsetDateTime now = OffsetDateTime.now();
            final List<String> expired = lockTokenExpirationMap.entrySet().stream()
                .filter(entry -> entry.getValue() != null && entry.getValue().isBefore(now))
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());

            expired.forEach(this::remove);
        });
    }

    /**
     * Adds or updates the expiration time on a lock token. If the expiration time in the container is larger than
     * {@code lockTokenExpiration}, then the current container value is used.
     *
     * @param lockToken Token to update associated lock expiration.
     * @param item Item to hold in the container.
     * @param lockTokenExpiration Time at which the lock token expires.
     *
     * @return The updated value in the container. If the expiration time in the container is larger than {@code
     *     lockTokenExpiration}, then the current container value is used.
     */
    public OffsetDateTime addOrUpdate(String lockToken, OffsetDateTime lockTokenExpiration, T item) {
        if (isDisposed.get()) {
            throw logger.logExceptionAsError(new IllegalStateException("Cannot perform operations on a disposed set."));
        }

        Objects.requireNonNull(lockToken, "'lockToken' cannot be null.");
        Objects.requireNonNull(item, "'item' cannot be null.");
        Objects.requireNonNull(lockTokenExpiration, "'lockTokenExpiration' cannot be null.");


        final OffsetDateTime computed = lockTokenExpirationMap.compute(lockToken, (key, existing) -> {
            if (existing == null) {
                return lockTokenExpiration;
            } else {
                return existing.isBefore(lockTokenExpiration)
                    ? lockTokenExpiration
                    : existing;
            }
        });

        lockTokenItemMap.put(lockToken, item);

        return computed;
    }

    /**
     * Gets whether or not the lock token is held in the container and has not expired.
     *
     * @param lockToken Lock token to check.
     *
     * @return {@code true} if the lock token is in the container and has not expired; {@code false} otherwise.
     */
    public boolean containsUnexpired(String lockToken) {
        if (isDisposed.get()) {
            throw logger.logExceptionAsError(new IllegalStateException("Cannot perform operations on a disposed set."));
        }

        final OffsetDateTime value = lockTokenExpirationMap.getOrDefault(lockToken, OffsetDateTime.MIN);
        return value.isAfter(OffsetDateTime.now());
    }

    /**
     * Removes the lock token from the map.
     *
     * @param lockToken Token to remove.
     */
    public void remove(String lockToken) {
        lockTokenExpirationMap.remove(lockToken);
        final T remove = lockTokenItemMap.remove(lockToken);

        if (remove != null) {
            onExpired.accept(remove);
        }
    }

    @Override
    public void close() {
        if (isDisposed.getAndSet(true)) {
            return;
        }

        cleanupOperation.dispose();

        for (String key : lockTokenExpirationMap.keySet().toArray(new String[0])) {
            remove(key);
        }
    }

    public boolean isClosed() {
        return isDisposed.get();
    }
}