// SynchronizedAccessor.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.identity.implementation;

import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ReplayProcessor;

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

/**
 * Synchronizes reactor threads accessing/instantiating a common value {@code T}.
 *
 * <p>The configured supplier is invoked at most once, by the first subscriber that claims the internal
 * work-in-progress flag. Its outcome (a value or a failure) is published through a replaying processor so that
 * concurrent and later subscribers observe the same outcome without invoking the supplier again. The flag is never
 * reset, so a failed supplier is not retried.</p>
 *
 * @param <T> The value being instantiated / accessed.
 */
public class SynchronizedAccessor<T> {
    // Flipped to true by the first subscriber that wins the right to invoke the supplier; never reset.
    private final AtomicBoolean wip;
    // Fast-path copy of the supplied value; remains null until the supplier has returned a value.
    private volatile T cache;
    // Carries the supplier's outcome to subscribers that did not invoke the supplier themselves.
    private final ReplayProcessor<T> replayProcessor = ReplayProcessor.create(1);
    private final FluxSink<T> sink = replayProcessor.sink(FluxSink.OverflowStrategy.BUFFER);
    private final Supplier<T> supplier;

    /**
     * Creates an accessor that lazily obtains its value from the given supplier.
     *
     * @param supplier produces the value on the first subscription to {@link #getValue()}; it is not invoked here.
     */
    public SynchronizedAccessor(Supplier<T> supplier) {
        this.wip = new AtomicBoolean(false);
        this.supplier = supplier;
    }

    /**
     * Get the value from the configured supplier.
     *
     * <p>Nothing happens until the returned {@link Mono} is subscribed to. The first subscriber runs the supplier
     * on its own thread; every other subscriber receives the outcome from the replaying processor.</p>
     *
     * @return a {@link Mono} emitting the output {@code T}, or the failure raised while obtaining it
     */
    public Mono<T> getValue() {
        return Mono.defer(() -> {
            // Single volatile read so the null check and the returned value cannot disagree.
            final T cached = cache;
            if (cached != null) {
                return Mono.just(cached);
            }
            if (!wip.getAndSet(true)) {
                try {
                    final T value = supplier.get();
                    cache = value;
                    sink.next(value);
                } catch (Exception e) {
                    sink.error(e);
                } catch (Error e) {
                    // The wip flag is already claimed for good, so without a terminal signal every other
                    // subscriber would wait on the processor forever. Publish the failure for them, then
                    // rethrow so this caller sees the Error exactly as it did before.
                    sink.error(e);
                    throw e;
                }
            }
            return replayProcessor.next();
        });
    }
}