SchemaRegistryApacheAvroSerializer.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.data.schemaregistry.apacheavro;

import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.serializer.ObjectSerializer;
import com.azure.core.util.serializer.TypeReference;
import com.azure.data.schemaregistry.SchemaRegistryAsyncClient;
import com.azure.data.schemaregistry.models.SchemaFormat;
import com.azure.data.schemaregistry.models.SchemaProperties;
import org.apache.avro.Schema;
import reactor.core.publisher.Mono;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;

import static com.azure.core.util.FluxUtil.monoError;

/**
 * Schema Registry-based serializer implementation for Avro data format using Apache Avro.
 */
public final class SchemaRegistryApacheAvroSerializer implements ObjectSerializer {

    static final byte[] RECORD_FORMAT_INDICATOR = new byte[]{0x00, 0x00, 0x00, 0x00};
    static final int SCHEMA_ID_SIZE = 32;
    static final int RECORD_FORMAT_INDICATOR_SIZE = 4;

    private final ClientLogger logger = new ClientLogger(SchemaRegistryApacheAvroSerializer.class);
    private final SchemaRegistryAsyncClient schemaRegistryClient;
    private final AvroSerializer avroSerializer;
    private final SerializerOptions serializerOptions;

    /**
     * Creates a new instance.
     *
     * @param schemaRegistryClient Client that interacts with Schema Registry.
     * @param avroSerializer Serializer implemented using Apache Avro.
     * @param serializerOptions Options to configure the serializer with.
     */
    SchemaRegistryApacheAvroSerializer(SchemaRegistryAsyncClient schemaRegistryClient,
        AvroSerializer avroSerializer, SerializerOptions serializerOptions) {
        this.schemaRegistryClient = Objects.requireNonNull(schemaRegistryClient,
            "'schemaRegistryClient' cannot be null.");
        this.avroSerializer = Objects.requireNonNull(avroSerializer,
            "'avroSchemaRegistryUtils' cannot be null.");
        this.serializerOptions = Objects.requireNonNull(serializerOptions, "'serializerOptions' cannot be null.");
    }

    /**
     * Deserializes the {@code inputStream} into a strongly-typed object.
     *
     * @param inputStream The stream to read from.
     * @param typeReference Type reference of the strongly-typed object.
     * @param <T> Strongly typed object.
     *
     * @return The deserialized object. If {@code inputStream} is null then {@code null} is returned.
     *
     * @throws NullPointerException if {@code typeReference} is null.
     */
    @Override
    public <T> T deserialize(InputStream inputStream, TypeReference<T> typeReference) {
        return deserializeAsync(inputStream, typeReference).block();
    }

    /**
     * Deserializes the {@code inputStream} into a strongly-typed object.
     *
     * @param inputStream The stream to read from.
     * @param typeReference Type reference of the strongly-typed object.
     * @param <T> Strongly typed object.
     *
     * @return A Mono that completes with the deserialized object. If {@code inputStream} is null, then Mono completes
     *     with an empty Mono.
     *
     * @throws NullPointerException if {@code typeReference} is null.
     */
    @Override
    public <T> Mono<T> deserializeAsync(InputStream inputStream, TypeReference<T> typeReference) {
        if (inputStream == null) {
            return Mono.empty();
        }

        if (typeReference == null) {
            return monoError(logger, new NullPointerException("'typeReference' cannot be null."));
        }

        return Mono.fromCallable(() -> {
            byte[] payload = new byte[inputStream.available()];
            while (true) {
                if (inputStream.read(payload) == -1) {
                    break;
                }
            }
            return payload;
        })
            .flatMap(payload -> {
                if (payload.length == 0) {
                    return Mono.empty();
                }

                final ByteBuffer buffer = ByteBuffer.wrap(payload);
                final byte[] recordFormatIndicator = new byte[RECORD_FORMAT_INDICATOR_SIZE];

                buffer.get(recordFormatIndicator);

                if (!Arrays.equals(recordFormatIndicator, RECORD_FORMAT_INDICATOR)) {
                    return Mono.error(
                        new IllegalStateException("Illegal format: unsupported record format indicator in payload"));
                }

                final String schemaId = getSchemaIdFromPayload(buffer);

                return this.schemaRegistryClient.getSchema(schemaId)
                    .handle((registryObject, sink) -> {
                        byte[] payloadSchema = registryObject.getDefinition().getBytes(StandardCharsets.UTF_8);
                        int start = buffer.position() + buffer.arrayOffset();
                        int length = buffer.limit() - SCHEMA_ID_SIZE;
                        byte[] b = Arrays.copyOfRange(buffer.array(), start, start + length);

                        final T decode = avroSerializer.decode(b, payloadSchema, typeReference);
                        sink.next(decode);
                    });
            });
    }

    /**
     * Serializes the {@code object} into the {@code outputStream}.
     *
     * @param outputStream Output stream to write serialization of {@code object} to.
     * @param object The object to serialize into {@code outputStream}.
     *
     * @throws NullPointerException if {@code outputStream} or {@code object} is null.
     */
    @Override
    public void serialize(OutputStream outputStream, Object object) {
        serializeAsync(outputStream, object).block();
    }

    /**
     * Serializes the {@code object} into the {@code outputStream}.
     *
     * @param outputStream Output stream to write serialization of {@code object} to.
     * @param object The object to serialize into {@code outputStream}.
     *
     * @return A Mono that completes when the object has been serialized into the stream.
     *
     * @throws NullPointerException if {@code outputStream} or {@code object} is null.
     */
    @Override
    public Mono<Void> serializeAsync(OutputStream outputStream, Object object) {
        if (outputStream == null) {
            return monoError(logger, new NullPointerException("'outputStream' cannot be null."));
        }

        if (object == null) {
            return monoError(logger, new NullPointerException(
                "Null object, behavior should be defined in concrete serializer implementation."));
        }

        Schema schema;
        try {
            schema = AvroSerializer.getSchema(object);
        } catch (IllegalArgumentException exception) {
            return monoError(logger, exception);
        }

        final String schemaFullName = schema.getFullName();
        final String schemaString = schema.toString();

        return this.maybeRegisterSchema(serializerOptions.getSchemaGroup(), schemaFullName, schemaString)
            .handle((id, sink) -> {
                ByteBuffer recordFormatIndicatorBuffer = ByteBuffer
                    .allocate(RECORD_FORMAT_INDICATOR_SIZE)
                    .put(new byte[]{0x00, 0x00, 0x00, 0x00});
                ByteBuffer idBuffer = ByteBuffer
                    .allocate(SCHEMA_ID_SIZE)
                    .put(id.getBytes(StandardCharsets.UTF_8));
                try {
                    outputStream.write(recordFormatIndicatorBuffer.array());
                    outputStream.write(idBuffer.array());
                    outputStream.write(avroSerializer.encode(object));
                    sink.complete();
                } catch (IOException e) {
                    sink.error(new UncheckedIOException(e.getMessage(), e));
                }
            });
    }

    /**
     * If auto-registering is enabled, register schema against Schema Registry. If auto-registering is disabled, fetch
     * schema ID for provided schema. Requires pre-registering of schema against registry.
     *
     * @param schemaGroup Schema group where schema should be registered.
     * @param schemaName name of schema
     * @param schemaString string representation of schema being stored - must match group schema type
     *
     * @return string representation of schema ID
     */
    private Mono<String> maybeRegisterSchema(String schemaGroup, String schemaName, String schemaString) {
        if (serializerOptions.autoRegisterSchemas()) {
            return this.schemaRegistryClient
                .registerSchema(schemaGroup, schemaName, schemaString, SchemaFormat.AVRO)
                .map(SchemaProperties::getId);
        } else {
            return this.schemaRegistryClient.getSchemaProperties(
                schemaGroup, schemaName, schemaString, SchemaFormat.AVRO).map(properties -> properties.getId());
        }
    }

    /**
     * @param buffer full payload bytes
     *
     * @return String representation of schema ID
     */
    private static String getSchemaIdFromPayload(ByteBuffer buffer) {
        byte[] schemaGuidByteArray = new byte[SCHEMA_ID_SIZE];
        buffer.get(schemaGuidByteArray);

        return new String(schemaGuidByteArray, StandardCharsets.UTF_8);
    }
}