RntbdMetrics.java
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation.directconnectivity.rntbd;
import com.azure.cosmos.implementation.directconnectivity.RntbdTransportClient;
import com.azure.cosmos.implementation.guava25.net.PercentEscaper;
import com.codahale.metrics.ConsoleReporter;
import com.codahale.metrics.MetricRegistry;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Tags;
import io.micrometer.core.instrument.Timer;
import io.micrometer.core.instrument.composite.CompositeMeterRegistry;
import io.micrometer.core.instrument.config.NamingConvention;
import io.micrometer.core.instrument.distribution.HistogramSnapshot;
import io.micrometer.core.instrument.dropwizard.DropwizardConfig;
import io.micrometer.core.instrument.dropwizard.DropwizardMeterRegistry;
import io.micrometer.core.instrument.util.HierarchicalNameMapper;
import io.micrometer.core.lang.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
@SuppressWarnings("UnstableApiUsage")
@JsonPropertyOrder({
"tags", "concurrentRequests", "requests", "responseErrors", "responseSuccesses", "completionRate", "responseRate",
"requestSize", "responseSize", "channelsAcquired", "channelsAvailable", "requestQueueLength", "usedDirectMemory",
"usedHeapMemory"
})
public final class RntbdMetrics {
// region Fields
private static final PercentEscaper PERCENT_ESCAPER = new PercentEscaper("_-", false);
private static final Logger logger = LoggerFactory.getLogger(RntbdMetrics.class);
private static final CompositeMeterRegistry registry = new CompositeMeterRegistry();
static {
try {
int step = Integer.getInteger("azure.cosmos.monitoring.consoleLogging.step", 0);
if (step > 0) {
RntbdMetrics.add(RntbdMetrics.consoleLoggingRegistry(step));
}
} catch (Throwable error) {
logger.error("failed to initialize console logging registry due to ", error);
}
}
private final RntbdEndpoint endpoint;
private final DistributionSummary requestSize;
private final Timer requests;
private final Timer responseErrors;
private final DistributionSummary responseSize;
private final Timer responseSuccesses;
private final Tags tags;
private final RntbdTransportClient transportClient;
// endregion
// region Constructors
public RntbdMetrics(RntbdTransportClient client, RntbdEndpoint endpoint) {
this.transportClient = client;
this.endpoint = endpoint;
this.tags = Tags.of(client.tag(), endpoint.tag());
this.requests = registry.timer(nameOf("requests"), tags);
this.responseErrors = registry.timer(nameOf("responseErrors"), tags);
this.responseSuccesses = registry.timer(nameOf("responseSuccesses"), tags);
Gauge.builder(nameOf("endpoints"), client, RntbdTransportClient::endpointCount)
.description("endpoint count")
.tag(client.tag().getKey(), client.tag().getValue())
.register(registry);
Gauge.builder(nameOf("endpointsEvicted"), client, RntbdTransportClient::endpointEvictionCount)
.description("endpoint eviction count")
.tag(client.tag().getKey(), client.tag().getValue())
.register(registry);
Gauge.builder(nameOf("concurrentRequests"), endpoint, RntbdEndpoint::concurrentRequests)
.description("executing or queued request count")
.tags(this.tags)
.register(registry);
Gauge.builder(nameOf("requestQueueLength"), endpoint, RntbdEndpoint::requestQueueLength)
.description("queued request count")
.tags(this.tags)
.register(registry);
Gauge.builder(nameOf("channelsAcquired"), endpoint, RntbdEndpoint::channelsAcquiredMetric)
.description("acquired channel count")
.tags(this.tags)
.register(registry);
Gauge.builder(nameOf("channelsAvailable"), endpoint, RntbdEndpoint::channelsAvailableMetric)
.description("available channel count")
.tags(this.tags)
.register(registry);
Gauge.builder(nameOf("usedDirectMemory"), endpoint, x -> x.usedDirectMemory())
.description("Java direct memory usage (MiB)")
.baseUnit("bytes")
.tags(this.tags)
.register(registry);
Gauge.builder(nameOf("usedHeapMemory"), endpoint, x -> x.usedHeapMemory())
.description("Java heap memory usage (MiB)")
.baseUnit("MiB")
.tags(this.tags)
.register(registry);
this.requestSize = DistributionSummary.builder(nameOf("requestSize"))
.description("Request size (bytes)")
.baseUnit("bytes")
.tags(this.tags)
.register(registry);
this.responseSize = DistributionSummary.builder(nameOf("responseSize"))
.description("Response size (bytes)")
.baseUnit("bytes")
.tags(this.tags)
.register(registry);
}
// endregion
// region Accessors
public static void add(MeterRegistry registry) {
RntbdMetrics.registry.add(registry);
}
@JsonProperty
public int channelsAcquired() {
return this.endpoint.channelsAcquiredMetric();
}
@JsonProperty
public int channelsAvailable() {
return this.endpoint.channelsAvailableMetric();
}
/***
* Computes the number of successful (non-error) responses received divided by the number of completed requests.
*
* @return number of successful (non-error) responses received divided by the number of completed requests.
*/
@JsonProperty
public double completionRate() {
return this.responseSuccesses.count() / (double) this.requests.count();
}
@JsonProperty
public long concurrentRequests() {
return this.endpoint.concurrentRequests();
}
@JsonProperty
public int endpoints() {
return this.transportClient.endpointCount();
}
@JsonProperty
public int requestQueueLength() {
return this.endpoint.requestQueueLength();
}
@JsonProperty
public HistogramSnapshot requestSize() {
return this.requestSize.takeSnapshot();
}
@JsonProperty
public HistogramSnapshot requests() {
return this.requests.takeSnapshot();
}
@JsonProperty
public HistogramSnapshot responseErrors() {
return this.responseErrors.takeSnapshot();
}
/***
* Computes the number of successful (non-error) responses received divided by the number of requests sent
*
* @return The number of successful (non-error) responses received divided by the number of requests sent
*/
@JsonProperty
public double responseRate() {
return this.responseSuccesses.count() / (double) (this.requests.count() + this.endpoint.concurrentRequests());
}
@JsonProperty
public HistogramSnapshot responseSize() {
return this.responseSize.takeSnapshot();
}
@JsonProperty
public HistogramSnapshot responseSuccesses() {
return this.responseSuccesses.takeSnapshot();
}
@JsonProperty
public Iterable<Tag> tags() {
return this.tags;
}
@JsonProperty
public long usedDirectMemory() {
return this.endpoint.usedDirectMemory();
}
@JsonProperty
public long usedHeapMemory() {
return this.endpoint.usedHeapMemory();
}
// endregion
// region Methods
public void markComplete(RntbdRequestRecord requestRecord) {
requestRecord.stop(this.requests, requestRecord.isCompletedExceptionally()
? this.responseErrors
: this.responseSuccesses);
this.requestSize.record(requestRecord.requestLength());
this.responseSize.record(requestRecord.responseLength());
}
@Override
public String toString() {
return RntbdObjectMapper.toString(this);
}
// endregion
// region Private
static String escape(String value) {
return PERCENT_ESCAPER.escape(value);
}
private static MeterRegistry consoleLoggingRegistry(final int step) {
final MetricRegistry dropwizardRegistry = new MetricRegistry();
ConsoleReporter consoleReporter = ConsoleReporter
.forRegistry(dropwizardRegistry)
.convertRatesTo(TimeUnit.SECONDS)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.build();
consoleReporter.start(step, TimeUnit.SECONDS);
DropwizardConfig dropwizardConfig = new DropwizardConfig() {
@Override
public String get(@Nullable String key) {
return null;
}
@Override
public String prefix() {
return "console";
}
};
final MeterRegistry consoleLoggingRegistry = new DropwizardMeterRegistry(
dropwizardConfig, dropwizardRegistry, HierarchicalNameMapper.DEFAULT, Clock.SYSTEM) {
@Override
protected Double nullGaugeValue() {
return Double.NaN;
}
};
consoleLoggingRegistry.config().namingConvention(NamingConvention.dot);
return consoleLoggingRegistry;
}
private static String nameOf(final String member) {
return "azure.cosmos.directTcp." + member;
}
// endregion
}