EventGridPublisherAsyncClient.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.messaging.eventgrid;
import com.azure.core.annotation.ReturnType;
import com.azure.core.annotation.ServiceClient;
import com.azure.core.annotation.ServiceMethod;
import com.azure.core.http.HttpPipeline;
import com.azure.core.http.rest.Response;
import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.serializer.SerializerAdapter;
import com.azure.core.util.tracing.TracerProxy;
import com.azure.messaging.eventgrid.implementation.Constants;
import com.azure.messaging.eventgrid.implementation.EventGridPublisherClientImpl;
import com.azure.messaging.eventgrid.implementation.EventGridPublisherClientImplBuilder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import static com.azure.core.util.FluxUtil.monoError;
import static com.azure.core.util.FluxUtil.withContext;
import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY;
/**
 * A service client that publishes events to an EventGrid topic or domain. Use {@link EventGridPublisherClientBuilder}
 * to create an instance of this client. This uses Project Reactor (https://projectreactor.io/) to handle asynchronous
 * programming.
 *
 * <p>Every send method reports a {@code null} {@code events} argument as a {@link NullPointerException} delivered
 * through the returned {@link Mono}; this applies to the public methods and to the package-private overloads that
 * take an explicit {@link Context}.</p>
 *
 * @see EventGridEvent
 * @see CloudEvent
 */
@ServiceClient(builder = EventGridPublisherClientBuilder.class, isAsync = true)
public final class EventGridPublisherAsyncClient {

    /** Message used for every rejected {@code null} events argument. */
    private static final String EVENTS_NULL_MESSAGE = "'events' cannot be null.";

    private final String hostname;

    private final EventGridPublisherClientImpl impl;

    private final EventGridServiceVersion serviceVersion;

    private final ClientLogger logger = new ClientLogger(EventGridPublisherAsyncClient.class);

    /**
     * Creates the client around a generated implementation client built from the given pipeline and serializer.
     *
     * @param pipeline the HTTP pipeline handed to the implementation client builder.
     * @param hostname the host passed to every publish call on the implementation client.
     * @param serializerAdapter the serializer handed to the implementation client builder.
     * @param serviceVersion the service version reported by {@link #getServiceVersion()}.
     */
    EventGridPublisherAsyncClient(HttpPipeline pipeline, String hostname, SerializerAdapter serializerAdapter,
        EventGridServiceVersion serviceVersion) {
        this.impl = new EventGridPublisherClientImplBuilder()
            .pipeline(pipeline)
            .serializerAdapter(serializerAdapter)
            .buildClient();

        // currently the service version is hardcoded into the Impl client, but once another service version gets
        // released we should add this to the impl builder options
        this.serviceVersion = serviceVersion;
        this.hostname = hostname;
    }

    /**
     * Get the service version of the Rest API.
     *
     * @return the Service version of the rest API
     */
    public EventGridServiceVersion getServiceVersion() {
        return this.serviceVersion;
    }

    /**
     * Publishes the given EventGrid events to the set topic or domain.
     *
     * @param events the EventGrid events to publish.
     *
     * @return the completion, or a {@link NullPointerException} error if {@code events} is {@code null}.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Void> sendEvents(Iterable<EventGridEvent> events) {
        if (events == null) {
            return monoError(logger, new NullPointerException(EVENTS_NULL_MESSAGE));
        }
        return withContext(context -> sendEvents(events, context));
    }

    /**
     * Publishes the given EventGrid events using an explicit context.
     *
     * @param events the EventGrid events to publish.
     * @param context the context to send with the request; {@code null} is treated as {@link Context#NONE}.
     *
     * @return the completion, or a {@link NullPointerException} error if {@code events} is {@code null}.
     */
    Mono<Void> sendEvents(Iterable<EventGridEvent> events, Context context) {
        if (events == null) {
            // Report through the Mono instead of letting Flux.fromIterable throw at assembly time.
            return monoError(logger, new NullPointerException(EVENTS_NULL_MESSAGE));
        }
        final Context tracingContext = withTracingNamespace(context);
        return Flux.fromIterable(events)
            .map(EventGridEvent::toImpl)
            .collectList()
            .flatMap(list -> this.impl.publishEventsAsync(this.hostname, list, tracingContext));
    }

    /**
     * Publishes the given cloud events to the set topic or domain.
     *
     * @param events the cloud events to publish.
     *
     * @return the completion, or a {@link NullPointerException} error if {@code events} is {@code null}.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Void> sendCloudEvents(Iterable<CloudEvent> events) {
        if (events == null) {
            return monoError(logger, new NullPointerException(EVENTS_NULL_MESSAGE));
        }
        return withContext(context -> sendCloudEvents(events, context));
    }

    /**
     * Publishes the given cloud events using an explicit context. When tracing is enabled, placeholder trace
     * extension attributes are added to the passed-in events before they are converted for sending.
     *
     * @param events the cloud events to publish.
     * @param context the context to send with the request; {@code null} is treated as {@link Context#NONE}.
     *
     * @return the completion, or a {@link NullPointerException} error if {@code events} is {@code null}.
     */
    Mono<Void> sendCloudEvents(Iterable<CloudEvent> events, Context context) {
        if (events == null) {
            // Must be checked before the placeholder pass, which iterates the events.
            return monoError(logger, new NullPointerException(EVENTS_NULL_MESSAGE));
        }
        final Context tracingContext = withTracingNamespace(context);
        this.addCloudEventTracePlaceHolder(events);
        return Flux.fromIterable(events)
            .map(CloudEvent::toImpl)
            .collectList()
            .flatMap(list -> this.impl.publishCloudEventEventsAsync(this.hostname, list, tracingContext));
    }

    /**
     * Publishes the given custom events to the set topic or domain.
     *
     * @param events the custom events to publish.
     *
     * @return the completion, or a {@link NullPointerException} error if {@code events} is {@code null}.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Void> sendCustomEvents(Iterable<Object> events) {
        if (events == null) {
            return monoError(logger, new NullPointerException(EVENTS_NULL_MESSAGE));
        }
        return withContext(context -> sendCustomEvents(events, context));
    }

    /**
     * Publishes the given custom events using an explicit context. The events are passed to the implementation
     * client as-is, without any conversion.
     *
     * @param events the custom events to publish.
     * @param context the context to send with the request; {@code null} is treated as {@link Context#NONE}.
     *
     * @return the completion, or a {@link NullPointerException} error if {@code events} is {@code null}.
     */
    Mono<Void> sendCustomEvents(Iterable<Object> events, Context context) {
        if (events == null) {
            return monoError(logger, new NullPointerException(EVENTS_NULL_MESSAGE));
        }
        final Context tracingContext = withTracingNamespace(context);
        return Flux.fromIterable(events)
            .collectList()
            .flatMap(list -> this.impl.publishCustomEventEventsAsync(this.hostname, list, tracingContext));
    }

    /**
     * Publishes the given EventGrid events to the set topic or domain and gives the response issued by EventGrid.
     *
     * @param events the EventGrid events to publish.
     *
     * @return the response from the EventGrid service, or a {@link NullPointerException} error if {@code events}
     * is {@code null}.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<Void>> sendEventsWithResponse(Iterable<EventGridEvent> events) {
        if (events == null) {
            return monoError(logger, new NullPointerException(EVENTS_NULL_MESSAGE));
        }
        return withContext(context -> sendEventsWithResponse(events, context));
    }

    /**
     * Publishes the given EventGrid events using an explicit context and gives the response issued by EventGrid.
     *
     * @param events the EventGrid events to publish.
     * @param context the context to send with the request; {@code null} is treated as {@link Context#NONE}.
     *
     * @return the response from the EventGrid service, or a {@link NullPointerException} error if {@code events}
     * is {@code null}.
     */
    Mono<Response<Void>> sendEventsWithResponse(Iterable<EventGridEvent> events, Context context) {
        if (events == null) {
            return monoError(logger, new NullPointerException(EVENTS_NULL_MESSAGE));
        }
        final Context tracingContext = withTracingNamespace(context);
        return Flux.fromIterable(events)
            .map(EventGridEvent::toImpl)
            .collectList()
            .flatMap(list -> this.impl.publishEventsWithResponseAsync(this.hostname, list, tracingContext));
    }

    /**
     * Publishes the given cloud events to the set topic or domain and gives the response issued by EventGrid.
     *
     * @param events the cloud events to publish.
     *
     * @return the response from the EventGrid service, or a {@link NullPointerException} error if {@code events}
     * is {@code null}.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<Void>> sendCloudEventsWithResponse(Iterable<CloudEvent> events) {
        if (events == null) {
            return monoError(logger, new NullPointerException(EVENTS_NULL_MESSAGE));
        }
        return withContext(context -> sendCloudEventsWithResponse(events, context));
    }

    /**
     * Publishes the given cloud events using an explicit context and gives the response issued by EventGrid. When
     * tracing is enabled, placeholder trace extension attributes are added to the passed-in events before they are
     * converted for sending.
     *
     * @param events the cloud events to publish.
     * @param context the context to send with the request; {@code null} is treated as {@link Context#NONE}.
     *
     * @return the response from the EventGrid service, or a {@link NullPointerException} error if {@code events}
     * is {@code null}.
     */
    Mono<Response<Void>> sendCloudEventsWithResponse(Iterable<CloudEvent> events, Context context) {
        if (events == null) {
            // Must be checked before the placeholder pass, which iterates the events.
            return monoError(logger, new NullPointerException(EVENTS_NULL_MESSAGE));
        }
        final Context tracingContext = withTracingNamespace(context);
        this.addCloudEventTracePlaceHolder(events);
        return Flux.fromIterable(events)
            .map(CloudEvent::toImpl)
            .collectList()
            .flatMap(list -> this.impl.publishCloudEventEventsWithResponseAsync(this.hostname, list,
                tracingContext));
    }

    /**
     * Publishes the given custom events to the set topic or domain and gives the response issued by EventGrid.
     *
     * @param events the custom events to publish.
     *
     * @return the response from the EventGrid service, or a {@link NullPointerException} error if {@code events}
     * is {@code null}.
     */
    @ServiceMethod(returns = ReturnType.SINGLE)
    public Mono<Response<Void>> sendCustomEventsWithResponse(Iterable<Object> events) {
        if (events == null) {
            return monoError(logger, new NullPointerException(EVENTS_NULL_MESSAGE));
        }
        return withContext(context -> sendCustomEventsWithResponse(events, context));
    }

    /**
     * Publishes the given custom events using an explicit context and gives the response issued by EventGrid. The
     * events are passed to the implementation client as-is, without any conversion.
     *
     * @param events the custom events to publish.
     * @param context the context to send with the request; {@code null} is treated as {@link Context#NONE}.
     *
     * @return the response from the EventGrid service, or a {@link NullPointerException} error if {@code events}
     * is {@code null}.
     */
    Mono<Response<Void>> sendCustomEventsWithResponse(Iterable<Object> events, Context context) {
        if (events == null) {
            return monoError(logger, new NullPointerException(EVENTS_NULL_MESSAGE));
        }
        final Context tracingContext = withTracingNamespace(context);
        return Flux.fromIterable(events)
            .collectList()
            .flatMap(list -> this.impl.publishCustomEventEventsWithResponseAsync(this.hostname, list,
                tracingContext));
    }

    /**
     * Returns the given context (or {@link Context#NONE} when it is {@code null}) with the EventGrid tracing
     * namespace entry added.
     */
    private static Context withTracingNamespace(Context context) {
        final Context base = context != null ? context : Context.NONE;
        return base.addData(AZ_TRACING_NAMESPACE_KEY, Constants.EVENT_GRID_TRACING_NAMESPACE_VALUE);
    }

    /**
     * When tracing is enabled, adds the placeholder trace-parent and trace-state extension attributes to every
     * event that carries neither of them. The passed-in events are modified in place.
     */
    private void addCloudEventTracePlaceHolder(Iterable<CloudEvent> events) {
        if (!TracerProxy.isTracingEnabled()) {
            return;
        }
        for (CloudEvent event : events) {
            if (event == null) {
                // Leave null elements alone here; Flux.fromIterable rejects them when the pipeline runs, so the
                // failure is reported through the returned Mono instead of as a bare NPE from this helper.
                continue;
            }
            final boolean hasNoTraceAttributes = event.getExtensionAttributes() == null
                || (event.getExtensionAttributes().get(Constants.TRACE_PARENT) == null
                    && event.getExtensionAttributes().get(Constants.TRACE_STATE) == null);
            if (hasNoTraceAttributes) {
                event.addExtensionAttribute(Constants.TRACE_PARENT, Constants.TRACE_PARENT_PLACEHOLDER_UUID);
                event.addExtensionAttribute(Constants.TRACE_STATE, Constants.TRACE_STATE_PLACEHOLDER_UUID);
            }
        }
    }
}