ExternalChildResourceCollectionImpl.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.resourcemanager.resources.fluentcore.arm.collection.implementation;
import com.azure.resourcemanager.resources.fluentcore.arm.models.ExternalChildResource;
import com.azure.resourcemanager.resources.fluentcore.arm.models.implementation.ExternalChildResourceImpl;
import com.azure.resourcemanager.resources.fluentcore.dag.TaskGroup;
import reactor.core.Exceptions;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ReplayProcessor;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
/**
* Base class for cached {@link ExternalChildResourcesCachedImpl}
* and non-cached {@link ExternalChildResourcesNonCachedImpl}
* externalized child resource collection.
* (Internal use only)
*
* @param <FluentModelTImpl> the implementation of {@param FluentModelT}
* @param <FluentModelT> the fluent model type of the child resource
* @param <InnerModelT> Azure inner resource class type representing the child resource
* @param <ParentImplT> the parent Azure resource impl class type that implements {@link ParentT}
* @param <ParentT> the parent interface
*/
public abstract class ExternalChildResourceCollectionImpl<
FluentModelTImpl extends ExternalChildResourceImpl<FluentModelT, InnerModelT, ParentImplT, ParentT>,
FluentModelT extends ExternalChildResource<FluentModelT, ParentT>,
InnerModelT,
ParentImplT extends ParentT,
ParentT> {
/**
* The parent resource of this collection of child resources.
*/
private final ParentImplT parent;
/**
* the TaskGroup of parent resource. This is used to schedule the "post run" works
* when post run is enabled via {@link this#enablePostRunMode()}
*/
private final TaskGroup parentTaskGroup;
/**
* The child resource instances that this collection contains.
*/
protected ConcurrentMap<String, FluentModelTImpl> childCollection = new ConcurrentSkipListMap<>();
/**
* Indicates how the pending operations on the child resources are performed, true
* if operations are performed through "post run" task, false if operations are
* performed via explicit call to {@link this#commitAsync()}.
*/
private boolean isPostRunMode;
/**
* Used to construct error string, this is user friendly name of the child resource (e.g. Subnet, Extension).
*/
protected final String childResourceName;
/**
* Creates a new ExternalChildResourcesImpl.
*
* @param parent the parent Azure resource
* @param parentTaskGroup the TaskGroup the parent Azure resource belongs to
* @param childResourceName the child resource name
*/
protected ExternalChildResourceCollectionImpl(ParentImplT parent,
TaskGroup parentTaskGroup, String childResourceName) {
this.parent = parent;
this.parentTaskGroup = parentTaskGroup;
this.childResourceName = childResourceName;
this.isPostRunMode = true;
}
/**
* Indicates that the pending operations on the resources are performed as "post run" tasks.
*/
public void enablePostRunMode() {
this.isPostRunMode = true;
}
/**
* Indicates that the pending operations on the resources are performed via explicit call to
* {@link this#commitAsync()}.
*/
public void enableCommitMode() {
this.isPostRunMode = false;
}
/**
* Clear the child collection.
*/
public void clear() {
for (FluentModelTImpl child : childCollection.values()) {
child.clear();
}
this.childCollection.clear();
}
/**
* Mark the given child resource as the post run dependent of the parent of this collection.
*
* @param childResource the child resource
*/
protected FluentModelTImpl prepareForFutureCommitOrPostRun(FluentModelTImpl childResource) {
if (this.isPostRunMode) {
if (!childResource.taskGroup().dependsOn(this.parentTaskGroup)) {
this.parentTaskGroup.addPostRunDependentTaskGroup(childResource.taskGroup());
}
}
return childResource;
}
/**
* Commits the changes in the external child resource childCollection.
* <p/>
* This method returns a Flux stream, its Flux's onNext will be called for each successfully
* committed resource followed by one call to 'onCompleted' or one call to 'onError' with a
* {@link RuntimeException} containing the list of exceptions where each exception describes the reason
* for failure of a resource commit.
*
* @return the Flux stream
*/
public Flux<FluentModelTImpl> commitAsync() {
if (this.isPostRunMode) {
return Flux.error(
new IllegalStateException("commitAsync() cannot be invoked when 'post run' mode is enabled"));
}
final ExternalChildResourceCollectionImpl<FluentModelTImpl, FluentModelT, InnerModelT, ParentImplT, ParentT>
self = this;
List<FluentModelTImpl> items = new ArrayList<>();
for (FluentModelTImpl item : this.childCollection.values()) {
items.add(item);
}
final List<Throwable> exceptionsList = Collections.synchronizedList(new ArrayList<>());
final List<FluentModelTImpl> successfullyRemoved = new ArrayList<>();
ReplayProcessor<FluentModelTImpl> aggregatedErrorStream = ReplayProcessor.create();
Flux<FluentModelTImpl> deleteStream = Flux.fromIterable(items)
.filter(childResource ->
childResource.pendingOperation() == ExternalChildResourceImpl.PendingOperation.ToBeRemoved)
.flatMap(childResource -> childResource.deleteResourceAsync()
.map(response -> childResource)
.doOnSuccess(fluentModelT -> {
childResource.setPendingOperation(ExternalChildResourceImpl.PendingOperation.None);
self.childCollection.remove(childResource.name());
successfullyRemoved.add(childResource);
})
.onErrorResume(throwable -> {
exceptionsList.add(throwable);
return Mono.empty();
}));
Flux<FluentModelTImpl> createStream = Flux.fromIterable(items)
.filter(childResource ->
childResource.pendingOperation() == ExternalChildResourceImpl.PendingOperation.ToBeCreated)
.flatMap(childResource -> childResource.createResourceAsync()
.map(fluentModelT -> childResource)
.doOnNext(fluentModelT ->
childResource.setPendingOperation(ExternalChildResourceImpl.PendingOperation.None))
.onErrorResume(throwable -> {
self.childCollection.remove(childResource.name());
exceptionsList.add(throwable);
return Mono.empty();
}));
Flux<FluentModelTImpl> updateStream = Flux.fromIterable(items)
.filter(childResource ->
childResource.pendingOperation() == ExternalChildResourceImpl.PendingOperation.ToBeUpdated)
.flatMap(childResource -> childResource.updateResourceAsync()
.map(e -> childResource)
.doOnNext(resource ->
resource.setPendingOperation(ExternalChildResourceImpl.PendingOperation.None))
.onErrorResume(throwable -> {
exceptionsList.add(throwable);
return Mono.empty();
}));
Flux<FluentModelTImpl> operationsStream = Flux.merge(deleteStream, createStream, updateStream)
.doOnTerminate(() -> {
if (clearAfterCommit()) {
self.childCollection.clear();
}
if (successfullyRemoved.size() > 0) {
for (FluentModelTImpl removed : successfullyRemoved) {
aggregatedErrorStream.sink().next(removed);
}
}
if (!exceptionsList.isEmpty()) {
aggregatedErrorStream.sink().error(Exceptions.multiple(exceptionsList));
} else {
aggregatedErrorStream.sink().complete();
}
});
return Flux.concat(operationsStream, aggregatedErrorStream);
}
/**
* Commits the changes in the external child resource childCollection.
* <p/>
* This method returns a observable stream, either its observer's onError will be called with
* {@link RuntimeException} if some resources failed to commit or onNext will be called if all resources
* committed successfully.
*
* @return the Mono stream
*/
public Mono<List<FluentModelTImpl>> commitAndGetAllAsync() {
return commitAsync().collect(() -> new ArrayList<FluentModelTImpl>(),
(state, item) -> state.add(item));
}
/**
* Finds a child resource with the given key.
*
* @param key the child resource key
* @return null if no child resource exists with the given name else the child resource
*/
protected FluentModelTImpl find(String key) {
for (Map.Entry<String, FluentModelTImpl> entry : this.childCollection.entrySet()) {
if (entry.getKey().equalsIgnoreCase(key)) {
return entry.getValue();
}
}
return null;
}
/**
* @return the parent Azure resource of the external child resource
*/
protected ParentImplT getParent() {
return parent;
}
/**
* @return true if the child resource collection needs to be cleared after the commit.
*/
protected abstract boolean clearAfterCommit();
}