OrderByUtils.java

// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
package com.azure.cosmos.implementation.query;

import com.azure.cosmos.implementation.ClientSideRequestStatistics;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.Utils;
import com.azure.cosmos.implementation.feedranges.FeedRangeEpkImpl;
import com.azure.cosmos.implementation.query.orderbyquery.OrderByRowResult;
import com.azure.cosmos.implementation.query.orderbyquery.OrderbyRowComparer;
import com.azure.cosmos.implementation.BadRequestException;
import com.azure.cosmos.BridgeInternal;
import com.azure.cosmos.models.ModelBridgeInternal;
import com.azure.cosmos.implementation.Resource;
import com.azure.cosmos.implementation.QueryMetrics;
import com.azure.cosmos.implementation.RequestChargeTracker;
import com.azure.cosmos.implementation.ResourceId;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair;
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

class OrderByUtils {

    public static <T extends Resource> Flux<OrderByRowResult<T>> orderedMerge(Class<T> klass,
                                                                              OrderbyRowComparer<T> consumeComparer,
                                                                              RequestChargeTracker tracker,
                                                                              List<DocumentProducer<T>> documentProducers,
                                                                              Map<String, QueryMetrics> queryMetricsMap,
                                                                              Map<FeedRangeEpkImpl, OrderByContinuationToken> targetRangeToOrderByContinuationTokenMap,
                                                                              List<ClientSideRequestStatistics> clientSideRequestStatisticsList) {
        @SuppressWarnings("unchecked")
        Flux<OrderByRowResult<T>>[] fluxes = documentProducers
                .subList(0, documentProducers.size())
                .stream()
                .map(producer ->
                        toOrderByQueryResultObservable(klass, producer, tracker, queryMetricsMap,
                                                       targetRangeToOrderByContinuationTokenMap,
                                                       consumeComparer.getSortOrders(), clientSideRequestStatisticsList))
                .toArray(Flux[]::new);
        return Flux.mergeOrdered(consumeComparer, fluxes);
    }

    private static <T extends Resource> Flux<OrderByRowResult<T>> toOrderByQueryResultObservable(Class<T> klass,
                                                                                                 DocumentProducer<T> producer,
                                                                                                 RequestChargeTracker tracker,
                                                                                                 Map<String, QueryMetrics> queryMetricsMap,
                                                                                                 Map<FeedRangeEpkImpl, OrderByContinuationToken> targetRangeToOrderByContinuationTokenMap,
                                                                                                 List<SortOrder> sortOrders,
                                                                                                 List<ClientSideRequestStatistics> clientSideRequestStatisticsList) {
        return producer
                .produceAsync()
                   .transformDeferred(new OrderByUtils.PageToItemTransformer<T>(klass, tracker, queryMetricsMap,
                                                                      targetRangeToOrderByContinuationTokenMap,
                                                                      sortOrders, clientSideRequestStatisticsList));
    }

    private static class PageToItemTransformer<T extends Resource> implements
        Function<Flux<DocumentProducer<T>.DocumentProducerFeedResponse>, Flux<OrderByRowResult<T>>> {
        private final RequestChargeTracker tracker;
        private final Class<T> klass;
        private final Map<String, QueryMetrics> queryMetricsMap;
        private final Map<FeedRangeEpkImpl, OrderByContinuationToken> targetRangeToOrderByContinuationTokenMap;
        private final List<SortOrder> sortOrders;
        private final List<ClientSideRequestStatistics> clientSideRequestStatisticsList;

        public PageToItemTransformer(
            Class<T> klass, RequestChargeTracker tracker, Map<String, QueryMetrics> queryMetricsMap,
            Map<FeedRangeEpkImpl, OrderByContinuationToken> targetRangeToOrderByContinuationTokenMap,
            List<SortOrder> sortOrders, List<ClientSideRequestStatistics> clientSideRequestStatisticsList) {
            this.klass = klass;
            this.tracker = tracker;
            this.queryMetricsMap = queryMetricsMap;
            this.targetRangeToOrderByContinuationTokenMap = targetRangeToOrderByContinuationTokenMap;
            this.sortOrders = sortOrders;
            this.clientSideRequestStatisticsList = clientSideRequestStatisticsList;
        }

        @Override
        public Flux<OrderByRowResult<T>> apply(Flux<DocumentProducer<T>.DocumentProducerFeedResponse> source) {
            return source.flatMap(documentProducerFeedResponse -> {
                clientSideRequestStatisticsList.addAll(
                    BridgeInternal.getClientSideRequestStatisticsList(documentProducerFeedResponse
                                                                   .pageResult.getCosmosDiagnostics()));

                for (String key : BridgeInternal.queryMetricsFromFeedResponse(documentProducerFeedResponse.pageResult)
                                      .keySet()) {
                    if (queryMetricsMap.containsKey(key)) {
                        QueryMetrics qm = BridgeInternal.queryMetricsFromFeedResponse(documentProducerFeedResponse.pageResult).get(key);
                        queryMetricsMap.get(key).add(qm);
                    } else {
                        queryMetricsMap.put(key, BridgeInternal.queryMetricsFromFeedResponse(documentProducerFeedResponse.pageResult).get(key));
                    }
                }
                List<T> results = documentProducerFeedResponse.pageResult.getResults();
                OrderByContinuationToken orderByContinuationToken =
                    targetRangeToOrderByContinuationTokenMap.get(documentProducerFeedResponse.sourceFeedRange);
                if (orderByContinuationToken != null) {
                    Pair<Boolean, ResourceId> booleanResourceIdPair = ResourceId.tryParse(orderByContinuationToken.getRid());
                    if (!booleanResourceIdPair.getLeft()) {
                        return Flux.error(new BadRequestException(String.format("INVALID Rid in the continuation token %s for OrderBy~Context.",
                                orderByContinuationToken.getCompositeContinuationToken().getToken())));
                    }
                    ResourceId continuationTokenRid = booleanResourceIdPair.getRight();

                    String queryInfoExecution = documentProducerFeedResponse.pageResult.getResponseHeaders().getOrDefault(
                        HttpConstants.HttpHeaders.QUERY_EXECUTION_INFO, null);
                    QueryExecutionInfo queryExecutionInfo = Utils.parse(queryInfoExecution, QueryExecutionInfo.class);

                    results = results.stream()
                            .filter(tOrderByRowResult -> {
                                // When we resume a query on a partition there is a possibility that we only read a partial page from the backend
                                // meaning that will we repeat some documents if we didn't do anything about it.
                                // The solution is to filter all the documents that come before in the sort order, since we have already emitted them to the client.
                                // The key is to seek until we get an order by value that matches the order by value we left off on.
                                // Once we do that we need to seek to the correct _rid within the term,
                                // since there might be many documents with the same order by value we left off on.
                                List<QueryItem> queryItems = new ArrayList<QueryItem>();
                                ArrayNode arrayNode = (ArrayNode)ModelBridgeInternal.getObjectFromJsonSerializable(tOrderByRowResult, "orderByItems");
                                for (JsonNode jsonNode : arrayNode) {
                                    QueryItem queryItem = new QueryItem(jsonNode.toString());
                                    queryItems.add(queryItem);
                                }

                                // Check  if its the same orderby item from the token
                                long cmp = 0;
                                for (int i = 0; i < sortOrders.size(); i++) {
                                    cmp = ItemComparator.getInstance().compare(orderByContinuationToken.getOrderByItems()[i].getItem(),
                                            queryItems.get(i).getItem());
                                    if (cmp != 0) {
                                        cmp = sortOrders.get(i).equals(SortOrder.Descending) ? -cmp : cmp;
                                        break;
                                    }
                                }

                                if (cmp == 0) {
                                    // Once the item matches the order by items from the continuation tokens
                                    // We still need to remove all the documents that have a lower rid in the rid sort order.
                                    // If there is a tie in the sort order the documents should be in _rid order in the same direction as the index (given by the backend)
                                    cmp = (continuationTokenRid.getDocument() - ResourceId.tryParse(tOrderByRowResult.getResourceId()).getRight().getDocument());
                                    if ((queryExecutionInfo == null) || queryExecutionInfo.getReverseRidEnabled())
                                    {
                                        // If reverse rid is enabled on the backend then fallback to the old way of doing it.
                                        // So if it's ORDER BY c.age ASC, c.name DESC the _rids are ASC
                                        // If it's ORDER BY c.age DESC, c.name DESC the _rids are DESC
                                        if (sortOrders.get(0) == SortOrder.Descending)
                                        {
                                            cmp = -cmp;
                                        }
                                    }
                                    else
                                    {
                                        // Go by the whatever order the index wants
                                        if (queryExecutionInfo.getReverseIndexScan())
                                        {
                                            cmp = -cmp;
                                        }
                                    }

                                    return (cmp <= 0);
                                }
                                return true;

                            })
                            .collect(Collectors.toList());

                }

                tracker.addCharge(documentProducerFeedResponse.pageResult.getRequestCharge());
                Flux<T> x = Flux.fromIterable(results);

                return x.map(r -> new OrderByRowResult<T>(
                        klass,
                        ModelBridgeInternal.toJsonFromJsonSerializable(r),
                        documentProducerFeedResponse.sourceFeedRange,
                        documentProducerFeedResponse.pageResult.getContinuationToken()));
            }, 1);
        }
    }

}