EventGridPublisherAsyncClient.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.messaging.eventgrid;
import com.azure.core.annotation.ReturnType;
import com.azure.core.annotation.ServiceClient;
import com.azure.core.annotation.ServiceMethod;
import com.azure.core.credential.AzureKeyCredential;
import com.azure.core.credential.AzureSasCredential;
import com.azure.core.http.HttpPipeline;
import com.azure.core.http.rest.Response;
import com.azure.core.models.CloudEvent;
import com.azure.core.util.BinaryData;
import com.azure.core.util.Context;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.tracing.TracerProxy;
import com.azure.messaging.eventgrid.implementation.Constants;
import com.azure.messaging.eventgrid.implementation.EventGridPublisherClientImpl;
import com.azure.messaging.eventgrid.implementation.EventGridPublisherClientImplBuilder;
import com.fasterxml.jackson.databind.util.RawValue;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import static com.azure.core.util.FluxUtil.monoError;
import static com.azure.core.util.FluxUtil.withContext;
import static com.azure.core.util.tracing.Tracer.AZ_TRACING_NAMESPACE_KEY;
/**
* A service client that publishes events to an EventGrid topic or domain asynchronously.
* Use {@link EventGridPublisherClientBuilder} to create an instance of this client.
*
* <p><strong>Create EventGridPublisherAsyncClient for CloudEvent Samples</strong></p>
* <!-- src_embed com.azure.messaging.eventgrid.EventGridPublisherAsyncClient#CreateCloudEventClient -->
* <pre>
* // Create a client to send events of CloudEvent schema (com.azure.core.models.CloudEvent)
* EventGridPublisherAsyncClient&lt;CloudEvent&gt; cloudEventPublisherClient = new EventGridPublisherClientBuilder()
* .endpoint(System.getenv("AZURE_EVENTGRID_CLOUDEVENT_ENDPOINT")) // make sure it accepts CloudEvent
* .credential(new AzureKeyCredential(System.getenv("AZURE_EVENTGRID_CLOUDEVENT_KEY")))
* .buildCloudEventPublisherAsyncClient();
* </pre>
* <!-- end com.azure.messaging.eventgrid.EventGridPublisherAsyncClient#CreateCloudEventClient -->
*
* <p><strong>Send CloudEvent Samples</strong></p>
* <!-- src_embed com.azure.messaging.eventgrid.EventGridPublisherAsyncClient#SendCloudEvent -->
* <pre>
* // Create a com.azure.core.models.CloudEvent.
* User user = new User("Stephen", "James");
* CloudEvent cloudEventDataObject = new CloudEvent("/cloudevents/example/source", "Example.EventType",
* BinaryData.fromObject(user), CloudEventDataFormat.JSON, "application/json");
*
* // Send a single CloudEvent
* cloudEventPublisherClient.sendEvent(cloudEventDataObject).block();
*
* // Send a list of CloudEvents to the EventGrid service altogether.
* // This has better performance than sending one by one.
* cloudEventPublisherClient.sendEvents(Arrays.asList(
* cloudEventDataObject
* // add more CloudEvents objects
* )).block();
* </pre>
* <!-- end com.azure.messaging.eventgrid.EventGridPublisherAsyncClient#SendCloudEvent -->
*
* <p><strong>Create EventGridPublisherAsyncClient for EventGridEvent Samples</strong></p>
* <!-- src_embed com.azure.messaging.eventgrid.EventGridPublisherAsyncClient#CreateEventGridEventClient -->
* <pre>
* // Create a client to send events of EventGridEvent schema
* EventGridPublisherAsyncClient&lt;EventGridEvent&gt; eventGridEventPublisherClient = new EventGridPublisherClientBuilder()
* .endpoint(System.getenv("AZURE_EVENTGRID_EVENT_ENDPOINT")) // make sure it accepts EventGridEvent
* .credential(new AzureKeyCredential(System.getenv("AZURE_EVENTGRID_EVENT_KEY")))
* .buildEventGridEventPublisherAsyncClient();
* </pre>
* <!-- end com.azure.messaging.eventgrid.EventGridPublisherAsyncClient#CreateEventGridEventClient -->
*
* <p><strong>Send EventGridEvent Samples</strong></p>
* <!-- src_embed com.azure.messaging.eventgrid.EventGridPublisherAsyncClient#SendEventGridEvent -->
* <pre>
* // Create an EventGridEvent
* User user = new User("John", "James");
* EventGridEvent eventGridEvent = new EventGridEvent("/EventGridEvents/example/source",
* "Example.EventType", BinaryData.fromObject(user), "0.1");
*
* // Send a single EventGridEvent
* eventGridEventPublisherClient.sendEvent(eventGridEvent).block();
*
* // Send a list of EventGridEvents to the EventGrid service altogether.
* // This has better performance than sending one by one.
* eventGridEventPublisherClient.sendEvents(Arrays.asList(
* eventGridEvent
* // add more EventGridEvents objects
* )).block();
* </pre>
* <!-- end com.azure.messaging.eventgrid.EventGridPublisherAsyncClient#SendEventGridEvent -->
*
* <p><strong>Create EventGridPublisherAsyncClient for Custom Event Schema Samples</strong></p>
* <!-- src_embed com.azure.messaging.eventgrid.EventGridPublisherAsyncClient#CreateCustomEventClient -->
* <pre>
* // Create a client to send events of custom event
* EventGridPublisherAsyncClient&lt;BinaryData&gt; customEventPublisherClient = new EventGridPublisherClientBuilder()
* .endpoint(System.getenv("AZURE_CUSTOM_EVENT_ENDPOINT")) // make sure it accepts custom events
* .credential(new AzureKeyCredential(System.getenv("AZURE_CUSTOM_EVENT_KEY")))
* .buildCustomEventPublisherAsyncClient();
* </pre>
* <!-- end com.azure.messaging.eventgrid.EventGridPublisherAsyncClient#CreateCustomEventClient -->
*
* <p><strong>Send Custom Event Schema Samples</strong></p>
* <!-- src_embed com.azure.messaging.eventgrid.EventGridPublisherAsyncClient#SendCustomEvent -->
* <pre>
* // Create a custom event object (both POJO and Map work)
* Map&lt;String, Object&gt; customEvent = new HashMap&lt;String, Object&gt;() {
* {
* put("id", UUID.randomUUID().toString());
* put("subject", "Test");
* put("foo", "bar");
* put("type", "Microsoft.MockPublisher.TestEvent");
* put("data", 100.0);
* put("dataVersion", "0.1");
* }
* };
*
* // Send a single custom event
* customEventPublisherClient.sendEvent(BinaryData.fromObject(customEvent)).block();
*
* // Send a list of custom events to the EventGrid service altogether.
* // This has better performance than sending one by one.
* customEventPublisherClient.sendEvents(Arrays.asList(
* BinaryData.fromObject(customEvent)
* // add more custom events in BinaryData
* )).block();
* </pre>
* <!-- end com.azure.messaging.eventgrid.EventGridPublisherAsyncClient#SendCustomEvent -->
*
* @see EventGridEvent
* @see com.azure.core.models.CloudEvent
*/
@ServiceClient(builder = EventGridPublisherClientBuilder.class, isAsync = true)
public final class EventGridPublisherAsyncClient<T> {
// Host value passed as the first argument of every publish call made through 'impl'.
private final String hostname;
// Generated service client built in the constructor from the HTTP pipeline and the service version.
private final EventGridPublisherClientImpl impl;
// Instance logger used by the instance send methods when reporting invalid arguments.
private final ClientLogger logger = new ClientLogger(EventGridPublisherAsyncClient.class);
// Event type this client was created for; the send methods dispatch on it.
private final Class<T> eventClass;
// Format of the expiration timestamp embedded in a SAS token. The locale is pinned because the
// pattern contains the AM/PM marker 'a', whose text otherwise follows the JVM default locale and
// would make the generated token depend on the machine it was created on.
private static final DateTimeFormatter SAS_DATE_TIME_FORMATER =
    DateTimeFormatter.ofPattern("M/d/yyyy h:m:s a", java.util.Locale.US);
// JCA algorithm name used both for the Mac instance and for the signing key spec.
private static final String HMAC_SHA256 = "hmacSHA256";
// Query parameter name appended to the endpoint before it is signed.
private static final String API_VERSION = "api-version";
// Static logger used by the static generateSas methods, which have no instance to borrow 'logger' from.
private static final ClientLogger LOGGER = new ClientLogger(EventGridPublisherAsyncClient.class);
EventGridPublisherAsyncClient(HttpPipeline pipeline, String hostname, EventGridServiceVersion serviceVersion,
Class<T> eventClass) {
this.impl = new EventGridPublisherClientImplBuilder()
.pipeline(pipeline)
.apiVersion(serviceVersion.getVersion())
.buildClient();
this.hostname = hostname;
this.eventClass = eventClass;
}
/**
 * Generates a shared access signature that provides time-limited authentication for requests to the
 * Event Grid service, signed for the service API version returned by
 * {@link EventGridServiceVersion#getLatest()}.
 *
 * @param endpoint the endpoint of the Event Grid topic or domain.
 * @param keyCredential the access key obtained from the Event Grid topic or domain.
 * @param expirationTime the time at which the signature expires and stops providing authentication.
 *
 * @return the shared access signature string which can be used to construct an instance of
 * {@link AzureSasCredential}.
 *
 * @throws NullPointerException if endpoint, keyCredential or expirationTime is {@code null}.
 * @throws RuntimeException if java security doesn't have algorithm "hmacSHA256".
 */
public static String generateSas(String endpoint, AzureKeyCredential keyCredential, OffsetDateTime expirationTime) {
    final EventGridServiceVersion latestVersion = EventGridServiceVersion.getLatest();
    return generateSas(endpoint, keyCredential, expirationTime, latestVersion);
}
/**
 * Generates a shared access signature that provides time-limited authentication for requests to the
 * Event Grid service.
 *
 * <p>The returned token has the shape {@code r={resource}&e={expiration}&s={signature}}, where the resource is
 * the URL-encoded endpoint with the {@code api-version} query parameter appended, the expiration is the
 * URL-encoded UTC expiration time, and the signature is the URL-encoded Base64 HMAC-SHA256 of the
 * {@code r=...&e=...} portion, keyed with the Base64-decoded access key.</p>
 *
 * @param endpoint the endpoint of the Event Grid topic or domain.
 * @param keyCredential the access key obtained from the Event Grid topic or domain.
 * @param expirationTime the time at which the signature expires and stops providing authentication.
 * @param apiVersion the EventGrid service api version defined in {@link EventGridServiceVersion}
 *
 * @return the shared access signature string which can be used to construct an instance of
 * {@link AzureSasCredential}.
 *
 * @throws NullPointerException if endpoint, keyCredential, expirationTime or apiVersion is {@code null}.
 * @throws RuntimeException if java security doesn't have algorithm "hmacSHA256", the UTF-8 encoding is
 * unavailable, or the key is rejected when the HMAC is initialized.
 */
public static String generateSas(String endpoint, AzureKeyCredential keyCredential, OffsetDateTime expirationTime,
    EventGridServiceVersion apiVersion) {
    if (Objects.isNull(endpoint)) {
        throw LOGGER.logExceptionAsError(new NullPointerException("'endpoint' cannot be null."));
    }
    if (Objects.isNull(keyCredential)) {
        throw LOGGER.logExceptionAsError(new NullPointerException("'keyCredential' cannot be null."));
    }
    if (Objects.isNull(expirationTime)) {
        throw LOGGER.logExceptionAsError(new NullPointerException("'expirationTime' cannot be null."));
    }
    // Previously a null apiVersion surfaced as an unlogged, message-less NullPointerException further down.
    if (Objects.isNull(apiVersion)) {
        throw LOGGER.logExceptionAsError(new NullPointerException("'apiVersion' cannot be null."));
    }
    try {
        final String resKey = "r";
        final String expKey = "e";
        final String signKey = "s";
        final Charset charset = StandardCharsets.UTF_8;

        // The signed resource is the endpoint together with its api-version query parameter.
        final String resource = String.format("%s?%s=%s", endpoint, API_VERSION, apiVersion.getVersion());
        final String encodedResource = URLEncoder.encode(resource, charset.name());

        // The expiration is always expressed in UTC, whatever offset the caller supplied.
        final String expiration = expirationTime.atZoneSameInstant(ZoneOffset.UTC).format(SAS_DATE_TIME_FORMATER);
        final String encodedExpiration = URLEncoder.encode(expiration, charset.name());

        final String unsignedSas = String.format("%s=%s&%s=%s", resKey, encodedResource, expKey, encodedExpiration);

        // The access key is Base64 text; its decoded bytes are the HMAC key.
        final Mac hmac = Mac.getInstance(HMAC_SHA256);
        hmac.init(new SecretKeySpec(Base64.getDecoder().decode(keyCredential.getKey()), HMAC_SHA256));
        final String signature = Base64.getEncoder().encodeToString(hmac.doFinal(unsignedSas.getBytes(charset)));
        final String encodedSignature = URLEncoder.encode(signature, charset.name());

        return String.format("%s&%s=%s", unsignedSas, signKey, encodedSignature);
    } catch (NoSuchAlgorithmException | UnsupportedEncodingException | InvalidKeyException e) {
        throw LOGGER.logExceptionAsError(new RuntimeException(e));
    }
}
/**
 * Publishes the given events to the set topic or domain.
 *
 * @param events the events to publish.
 *
 * @return A {@link Mono} that completes when the events are sent to the service. The {@link Mono} signals a
 * {@link NullPointerException} if events is {@code null}.
 */
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Void> sendEvents(Iterable<T> events) {
    // The context is supplied by FluxUtil.withContext and forwarded to the package-private overload.
    return withContext(callContext -> this.sendEvents(events, callContext));
}
/**
 * Routes the events to the publish path matching the event class this client was created with.
 * Any event class other than {@link CloudEvent} or {@link EventGridEvent} is handled as custom events
 * carried in {@link BinaryData}.
 *
 * @param events the events to publish.
 * @param context the context forwarded to the selected publish path.
 *
 * @return A {@link Mono} that completes when the events are sent to the service.
 */
@SuppressWarnings("unchecked")
Mono<Void> sendEvents(Iterable<T> events, Context context) {
    // NOTE(review): the unchecked casts presume T matches eventClass; confirm against the builder.
    if (CloudEvent.class == this.eventClass) {
        return sendCloudEvents((Iterable<CloudEvent>) events, context);
    }
    if (EventGridEvent.class == this.eventClass) {
        return sendEventGridEvents((Iterable<EventGridEvent>) events, context);
    }
    return sendCustomEvents((Iterable<BinaryData>) events, context);
}
/**
 * Publishes the given events to the set topic or domain and gives the response issued by EventGrid.
 *
 * @param events the events to publish.
 *
 * @return the response from the EventGrid service. The {@link Mono} signals a {@link NullPointerException}
 * if events is {@code null}.
 */
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Response<Void>> sendEventsWithResponse(Iterable<T> events) {
    // The context is supplied by FluxUtil.withContext and forwarded to the package-private overload.
    return withContext(callContext -> sendEventsWithResponse(events, callContext));
}
/**
 * Routes the events to the response-returning publish path matching the event class this client was
 * created with. Any event class other than {@link CloudEvent} or {@link EventGridEvent} is handled as
 * custom events carried in {@link BinaryData}.
 *
 * @param events the events to publish.
 * @param context the context forwarded to the selected publish path.
 *
 * @return the response from the EventGrid service.
 */
@SuppressWarnings("unchecked")
Mono<Response<Void>> sendEventsWithResponse(Iterable<T> events, Context context) {
    // NOTE(review): the unchecked casts presume T matches eventClass; confirm against the builder.
    if (CloudEvent.class == this.eventClass) {
        return sendCloudEventsWithResponse((Iterable<CloudEvent>) events, context);
    }
    if (EventGridEvent.class == this.eventClass) {
        return sendEventGridEventsWithResponse((Iterable<EventGridEvent>) events, context);
    }
    return sendCustomEventsWithResponse((Iterable<BinaryData>) events, context);
}
/**
 * Publishes the given event to the set topic or domain.
 *
 * @param event the event to publish.
 *
 * @return A {@link Mono} that completes when the event is sent to the service. The {@link Mono} signals a
 * {@link NullPointerException} if event is {@code null}.
 */
@ServiceMethod(returns = ReturnType.SINGLE)
public Mono<Void> sendEvent(T event) {
    // Reject null here: a singleton list holding null only fails later, in the publish pipeline or in
    // the trace-placeholder loop, with an error that does not name the offending argument.
    if (event == null) {
        return monoError(logger, new NullPointerException("'event' cannot be null."));
    }
    List<T> events = Collections.singletonList(event);
    return withContext(context -> sendEvents(events, context));
}
/**
 * Converts the events with {@code EventGridEvent::toImpl}, collects them into a list and publishes
 * that list through the implementation client.
 *
 * @param events the events to publish; a {@code null} value yields an error {@link Mono}.
 * @param context the caller context; {@code null} is replaced by {@link Context#NONE}.
 *
 * @return A {@link Mono} that completes when the events are sent to the service.
 */
Mono<Void> sendEventGridEvents(Iterable<EventGridEvent> events, Context context) {
    if (events == null) {
        return monoError(logger, new NullPointerException("'events' cannot be null."));
    }

    final Context baseContext;
    if (context == null) {
        baseContext = Context.NONE;
    } else {
        baseContext = context;
    }
    // Tag the call with the Event Grid tracing namespace.
    final Context tracingContext =
        baseContext.addData(AZ_TRACING_NAMESPACE_KEY, Constants.EVENT_GRID_TRACING_NAMESPACE_VALUE);

    return Flux.fromIterable(events)
        .map(EventGridEvent::toImpl)
        .collectList()
        .flatMap(implEvents -> this.impl.publishEventsAsync(this.hostname, implEvents, tracingContext));
}
/**
 * Collects the cloud events into a list and publishes that list through the implementation client.
 * Before publishing, the events are passed to {@code addCloudEventTracePlaceHolder}, which may add
 * trace extension attributes to the caller's event objects.
 *
 * @param events the events to publish; a {@code null} value yields an error {@link Mono}.
 * @param context the caller context; {@code null} is replaced by {@link Context#NONE}.
 *
 * @return A {@link Mono} that completes when the events are sent to the service.
 */
Mono<Void> sendCloudEvents(Iterable<CloudEvent> events, Context context) {
    if (events == null) {
        return monoError(logger, new NullPointerException("'events' cannot be null."));
    }

    final Context baseContext = (context == null) ? Context.NONE : context;
    // Tag the call with the Event Grid tracing namespace.
    final Context tracingContext =
        baseContext.addData(AZ_TRACING_NAMESPACE_KEY, Constants.EVENT_GRID_TRACING_NAMESPACE_VALUE);

    addCloudEventTracePlaceHolder(events);

    return Flux.fromIterable(events)
        .collectList()
        .flatMap(cloudEvents ->
            this.impl.publishCloudEventEventsAsync(this.hostname, cloudEvents, tracingContext));
}
/**
 * Wraps the string form of each custom event in a {@link RawValue}, collects the wrapped values into a
 * list and publishes that list through the implementation client.
 *
 * @param events the events to publish; a {@code null} value yields an error {@link Mono}.
 * @param context the caller context; {@code null} is replaced by {@link Context#NONE}.
 *
 * @return A {@link Mono} that completes when the events are sent to the service.
 */
Mono<Void> sendCustomEvents(Iterable<BinaryData> events, Context context) {
    if (events == null) {
        return monoError(logger, new NullPointerException("'events' cannot be null."));
    }

    Context baseContext = context;
    if (baseContext == null) {
        baseContext = Context.NONE;
    }
    // Tag the call with the Event Grid tracing namespace.
    final Context tracingContext =
        baseContext.addData(AZ_TRACING_NAMESPACE_KEY, Constants.EVENT_GRID_TRACING_NAMESPACE_VALUE);

    return Flux.fromIterable(events)
        // The explicit type argument keeps the collected list typed as List<Object>.
        .<Object>map(payload -> new RawValue(payload.toString()))
        .collectList()
        .flatMap(rawEvents ->
            this.impl.publishCustomEventEventsAsync(this.hostname, rawEvents, tracingContext));
}
/**
 * Converts the events with {@code EventGridEvent::toImpl}, collects them into a list and publishes
 * that list through the implementation client, returning the service response.
 *
 * @param events the events to publish; a {@code null} value yields an error {@link Mono}.
 * @param context the caller context; {@code null} is replaced by {@link Context#NONE}.
 *
 * @return the response from the EventGrid service.
 */
Mono<Response<Void>> sendEventGridEventsWithResponse(Iterable<EventGridEvent> events, Context context) {
    if (events == null) {
        return monoError(logger, new NullPointerException("'events' cannot be null."));
    }

    final Context baseContext;
    if (context == null) {
        baseContext = Context.NONE;
    } else {
        baseContext = context;
    }
    // Tag the call with the Event Grid tracing namespace.
    final Context tracingContext =
        baseContext.addData(AZ_TRACING_NAMESPACE_KEY, Constants.EVENT_GRID_TRACING_NAMESPACE_VALUE);

    return Flux.fromIterable(events)
        .map(EventGridEvent::toImpl)
        .collectList()
        .flatMap(implEvents ->
            this.impl.publishEventsWithResponseAsync(this.hostname, implEvents, tracingContext));
}
/**
 * Collects the cloud events into a list and publishes that list through the implementation client,
 * returning the service response. Before publishing, the events are passed to
 * {@code addCloudEventTracePlaceHolder}, which may add trace extension attributes to the caller's
 * event objects.
 *
 * @param events the events to publish; a {@code null} value yields an error {@link Mono}.
 * @param context the caller context; {@code null} is replaced by {@link Context#NONE}.
 *
 * @return the response from the EventGrid service.
 */
Mono<Response<Void>> sendCloudEventsWithResponse(Iterable<CloudEvent> events, Context context) {
    if (events == null) {
        return monoError(logger, new NullPointerException("'events' cannot be null."));
    }

    final Context baseContext = (context == null) ? Context.NONE : context;
    // Tag the call with the Event Grid tracing namespace.
    final Context tracingContext =
        baseContext.addData(AZ_TRACING_NAMESPACE_KEY, Constants.EVENT_GRID_TRACING_NAMESPACE_VALUE);

    addCloudEventTracePlaceHolder(events);

    return Flux.fromIterable(events)
        .collectList()
        .flatMap(cloudEvents ->
            this.impl.publishCloudEventEventsWithResponseAsync(this.hostname, cloudEvents, tracingContext));
}
/**
 * Wraps the string form of each custom event in a {@link RawValue}, collects the wrapped values into a
 * list and publishes that list through the implementation client, returning the service response.
 *
 * @param events the events to publish; a {@code null} value yields an error {@link Mono}.
 * @param context the caller context; {@code null} is replaced by {@link Context#NONE}.
 *
 * @return the response from the EventGrid service.
 */
Mono<Response<Void>> sendCustomEventsWithResponse(Iterable<BinaryData> events, Context context) {
    if (events == null) {
        return monoError(logger, new NullPointerException("'events' cannot be null."));
    }

    Context baseContext = context;
    if (baseContext == null) {
        baseContext = Context.NONE;
    }
    // Tag the call with the Event Grid tracing namespace.
    final Context tracingContext =
        baseContext.addData(AZ_TRACING_NAMESPACE_KEY, Constants.EVENT_GRID_TRACING_NAMESPACE_VALUE);

    return Flux.fromIterable(events)
        // The explicit type argument keeps the collected list typed as List<Object>.
        .<Object>map(payload -> new RawValue(payload.toString()))
        .collectList()
        .flatMap(rawEvents ->
            this.impl.publishCustomEventEventsWithResponseAsync(this.hostname, rawEvents, tracingContext));
}
/**
 * When tracing is enabled, adds placeholder trace-parent and trace-state extension attributes to every
 * event that has no extension attributes at all, or has neither of the two trace attributes set.
 * Events that already carry at least one of the two attributes are left untouched.
 *
 * @param events the events to inspect; matching events are modified in place.
 */
private void addCloudEventTracePlaceHolder(Iterable<CloudEvent> events) {
    if (!TracerProxy.isTracingEnabled()) {
        return;
    }
    for (CloudEvent cloudEvent : events) {
        final boolean carriesTraceAttribute = cloudEvent.getExtensionAttributes() != null
            && (cloudEvent.getExtensionAttributes().get(Constants.TRACE_PARENT) != null
                || cloudEvent.getExtensionAttributes().get(Constants.TRACE_STATE) != null);
        if (carriesTraceAttribute) {
            continue;
        }
        cloudEvent.addExtensionAttribute(Constants.TRACE_PARENT, Constants.TRACE_PARENT_PLACEHOLDER_UUID);
        cloudEvent.addExtensionAttribute(Constants.TRACE_STATE, Constants.TRACE_STATE_PLACEHOLDER_UUID);
    }
}
}