spring-data-elasticsearch ResponseConverter 源码

  • 2022-08-16
  • 浏览 (342)

spring-data-elasticsearch ResponseConverter 代码

文件路径:/src/main/java/org/springframework/data/elasticsearch/client/elc/ResponseConverter.java

/*
 * Copyright 2021-2022 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.springframework.data.elasticsearch.client.elc;

import static org.springframework.data.elasticsearch.client.elc.JsonUtils.*;

import co.elastic.clients.elasticsearch._types.BulkIndexByScrollFailure;
import co.elastic.clients.elasticsearch._types.ErrorCause;
import co.elastic.clients.elasticsearch._types.Time;
import co.elastic.clients.elasticsearch._types.query_dsl.Query;
import co.elastic.clients.elasticsearch.cluster.HealthResponse;
import co.elastic.clients.elasticsearch.core.DeleteByQueryResponse;
import co.elastic.clients.elasticsearch.core.UpdateByQueryResponse;
import co.elastic.clients.elasticsearch.core.mget.MultiGetError;
import co.elastic.clients.elasticsearch.core.mget.MultiGetResponseItem;
import co.elastic.clients.elasticsearch.indices.*;
import co.elastic.clients.elasticsearch.indices.get_mapping.IndexMappingRecord;
import co.elastic.clients.json.JsonpMapper;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.elasticsearch.ElasticsearchErrorCause;
import org.springframework.data.elasticsearch.core.IndexInformation;
import org.springframework.data.elasticsearch.core.MultiGetItem;
import org.springframework.data.elasticsearch.core.cluster.ClusterHealth;
import org.springframework.data.elasticsearch.core.document.Document;
import org.springframework.data.elasticsearch.core.index.AliasData;
import org.springframework.data.elasticsearch.core.index.Settings;
import org.springframework.data.elasticsearch.core.index.TemplateData;
import org.springframework.data.elasticsearch.core.mapping.IndexCoordinates;
import org.springframework.data.elasticsearch.core.query.ByQueryResponse;
import org.springframework.data.elasticsearch.core.reindex.ReindexResponse;
import org.springframework.data.elasticsearch.support.DefaultStringObjectMap;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

/**
 * Class to convert Elasticsearch responses into Spring Data Elasticsearch classes.
 *
 * @author Peter-Josef Meisch
 * @since 4.4
 */
class ResponseConverter {

	/** Logger used by the conversion methods, e.g. to warn when a response contains no mapping for an index. */
	private static final Logger LOGGER = LoggerFactory.getLogger(ResponseConverter.class);

	/** Mapper that is passed to {@code toJson} whenever a client object is serialized to a JSON string. */
	private final JsonpMapper jsonpMapper;

	/**
	 * Creates a converter that uses the given mapper for JSON serialization.
	 *
	 * @param jsonpMapper the mapper handed to {@code toJson}; it is stored as is, no null check is done here
	 */
	public ResponseConverter(JsonpMapper jsonpMapper) {
		this.jsonpMapper = jsonpMapper;
	}

	// region cluster client
	/**
	 * Converts a cluster health response of the Elasticsearch client into a {@link ClusterHealth}.
	 *
	 * @param healthResponse the response returned by the client, must not be {@literal null}
	 * @return the cluster health built from the values of the response
	 */
	public ClusterHealth clusterHealth(HealthResponse healthResponse) {

		Assert.notNull(healthResponse, "healthResponse must not be null");

		// the percentage is parsed with Double.parseDouble, the status is passed on as its toString() value
		double activeShardsPercent = Double.parseDouble(healthResponse.activeShardsPercentAsNumber());
		String status = healthResponse.status().toString();

		return ClusterHealth.builder() //
				// cluster
				.withClusterName(healthResponse.clusterName()) //
				.withStatus(status) //
				.withTimedOut(healthResponse.timedOut()) //
				// nodes
				.withNumberOfNodes(healthResponse.numberOfNodes()) //
				.withNumberOfDataNodes(healthResponse.numberOfDataNodes()) //
				// shards
				.withActiveShards(healthResponse.activeShards()) //
				.withActivePrimaryShards(healthResponse.activePrimaryShards()) //
				.withActiveShardsPercent(activeShardsPercent) //
				.withInitializingShards(healthResponse.initializingShards()) //
				.withRelocatingShards(healthResponse.relocatingShards()) //
				.withUnassignedShards(healthResponse.unassignedShards()) //
				.withDelayedUnassignedShards(healthResponse.delayedUnassignedShards()) //
				// tasks
				.withNumberOfPendingTasks(healthResponse.numberOfPendingTasks()) //
				.withNumberOfInFlightFetch(healthResponse.numberOfInFlightFetch()) //
				.withTaskMaxWaitingTimeMillis(healthResponse.taskMaxWaitingInQueueMillis().toEpochMilli()) //
				.build();
	}
	// endregion

	// region indices client
	/**
	 * Extracts the settings of one index from a get-settings response.
	 *
	 * @param getIndicesSettingsResponse the response returned by the client, must not be {@literal null}
	 * @param indexName the name used to look up the index in the response, must not be {@literal null}
	 * @return the default settings merged with the explicit settings of the index; empty settings when the response
	 *         has no entry for the index name
	 */
	public Settings indicesGetSettings(GetIndicesSettingsResponse getIndicesSettingsResponse, String indexName) {

		Assert.notNull(getIndicesSettingsResponse, "getIndicesSettingsResponse must not be null");
		Assert.notNull(indexName, "indexName must not be null");

		Settings result = new Settings();
		IndexState state = getIndicesSettingsResponse.get(indexName);

		if (state == null) {
			return result;
		}

		// the defaults are merged first, the explicit settings of the index are merged afterwards
		IndexSettings defaults = state.defaults();
		if (defaults != null) {
			result.merge(settingsOf(defaults));
		}

		IndexSettings explicit = state.settings();
		if (explicit != null) {
			result.merge(settingsOf(explicit));
		}

		return result;
	}

	/**
	 * Parses the JSON representation of the given index settings. When the settings object has no {@code index()}
	 * part, the parsed settings are put under an "index" key of a new {@link Settings} object.
	 */
	private Settings settingsOf(IndexSettings indexSettings) {

		Settings parsed = Settings.parse(toJson(indexSettings, jsonpMapper));

		if (indexSettings.index() != null) {
			return parsed;
		}

		return new Settings().append("index", parsed);
	}

	/**
	 * Extracts the mapping of one index from a get-mapping response.
	 *
	 * @param getMappingResponse the response returned by the client, must not be {@literal null}
	 * @param indexCoordinates provides the index name to look up, must not be {@literal null}
	 * @return the mapping as a {@link Document}; an empty document when the response contains no mappings or when no
	 *         entry can be picked for the index name
	 */
	public Document indicesGetMapping(GetMappingResponse getMappingResponse, IndexCoordinates indexCoordinates) {

		Assert.notNull(getMappingResponse, "getMappingResponse must not be null");
		Assert.notNull(indexCoordinates, "indexCoordinates must not be null");

		Map<String, IndexMappingRecord> recordsByIndex = getMappingResponse.result();

		if (recordsByIndex == null || recordsByIndex.isEmpty()) {
			return Document.create();
		}

		IndexMappingRecord mappingRecord = recordsByIndex.get(indexCoordinates.getIndexName());

		if (mappingRecord == null) {
			// this can happen when the mapping was requested with an alias; the single returned entry is used then
			if (recordsByIndex.size() == 1) {
				mappingRecord = recordsByIndex.values().iterator().next();
			} else {
				LOGGER.warn("no mapping returned for index {}", indexCoordinates.getIndexName());
				return Document.create();
			}
		}

		return Document.parse(toJson(mappingRecord.mappings(), jsonpMapper));
	}

	/**
	 * Converts a get-index response into a list with one {@link IndexInformation} per returned index.
	 *
	 * @param getIndexResponse the response returned by the client, must not be {@literal null}
	 * @return the index informations, each built from index name, settings, mappings and aliases
	 */
	public List<IndexInformation> indicesGetIndexInformations(GetIndexResponse getIndexResponse) {

		Assert.notNull(getIndexResponse, "getIndexResponse must not be null");

		List<IndexInformation> result = new ArrayList<>();

		for (Map.Entry<String, IndexState> indexEntry : getIndexResponse.result().entrySet()) {
			IndexState state = indexEntry.getValue();

			// missing settings or mappings are replaced by empty objects
			Settings settings;
			if (state.settings() != null) {
				settings = Settings.parse(toJson(state.settings(), jsonpMapper));
			} else {
				settings = new Settings();
			}

			Document mappings;
			if (state.mappings() != null) {
				mappings = Document.parse(toJson(state.mappings(), jsonpMapper));
			} else {
				mappings = Document.create();
			}

			List<AliasData> aliases = new ArrayList<>();
			for (Map.Entry<String, Alias> aliasEntry : state.aliases().entrySet()) {
				aliases.add(indicesGetAliasData(aliasEntry.getKey(), aliasEntry.getValue()));
			}

			result.add(IndexInformation.of(indexEntry.getKey(), settings, mappings, aliases));
		}

		return result;
	}

	/**
	 * Converts a get-alias response into a map from index name to the aliases defined for that index.
	 *
	 * @param getAliasResponse the response returned by the client, must not be {@literal null}
	 * @return a map with one entry per index of the response, the value being the set of converted aliases
	 */
	public Map<String, Set<AliasData>> indicesGetAliasData(GetAliasResponse getAliasResponse) {

		Assert.notNull(getAliasResponse, "getAliasResponse must not be null");

		Map<String, Set<AliasData>> aliasesByIndex = new HashMap<>();

		getAliasResponse.result().forEach((indexName, indexAliases) -> {
			Set<AliasData> converted = indexAliases.aliases().entrySet().stream() //
					.map(entry -> indicesGetAliasData(entry.getKey(), entry.getValue())) //
					.collect(Collectors.toCollection(HashSet::new));
			aliasesByIndex.put(indexName, converted);
		});

		return aliasesByIndex;
	}

	/**
	 * Builds an {@link AliasData} from an {@link Alias}. The filter query, if there is one, is serialized to JSON
	 * and parsed into a {@link Document}; otherwise {@literal null} is passed as filter.
	 */
	private AliasData indicesGetAliasData(String aliasName, Alias alias) {

		Document filterDocument = null;
		Query filter = alias.filter();

		if (filter != null) {
			String json = toJson(filter, jsonpMapper);

			if (json != null) {
				filterDocument = Document.parse(json);
			}
		}

		return AliasData.of(aliasName, filterDocument, alias.indexRouting(), alias.searchRouting(), alias.isWriteIndex(),
				alias.isHidden());
	}

	/**
	 * Builds an {@link AliasData} from an {@link AliasDefinition}. The filter query, if there is one, is serialized
	 * to JSON and parsed into a {@link Document}. {@literal null} is passed as the last argument of
	 * {@code AliasData.of}, where the {@link Alias} variant passes {@code isHidden()}.
	 */
	private AliasData indicesGetAliasData(String aliasName, AliasDefinition alias) {

		Document filterDocument = null;
		Query filter = alias.filter();

		if (filter != null) {
			String json = toJson(filter, jsonpMapper);

			if (json != null) {
				filterDocument = Document.parse(json);
			}
		}

		return AliasData.of(aliasName, filterDocument, alias.indexRouting(), alias.searchRouting(), alias.isWriteIndex(),
				null);
	}

	/**
	 * Converts the entry of one template from a get-template response into a {@link TemplateData}.
	 *
	 * @param getTemplateResponse the response returned by the client, must not be {@literal null}
	 * @param templateName the name used to look up the template in the response, must not be {@literal null}
	 * @return the converted template data, or {@literal null} when the response has no entry for the template name
	 */
	@Nullable
	public TemplateData indicesGetTemplateData(GetTemplateResponse getTemplateResponse, String templateName) {

		Assert.notNull(getTemplateResponse, "getTemplateResponse must not be null");
		Assert.notNull(templateName, "templateName must not be null");

		TemplateMapping template = getTemplateResponse.get(templateName);

		if (template == null) {
			return null;
		}

		Settings settings = new Settings();
		template.settings().forEach((key, jsonData) -> {
			String json = jsonData.toJson().toString();

			if (!key.contains(".")) {
				// a key without a dot: the JSON is parsed into a map
				settings.put(key, new DefaultStringObjectMap<>().fromJson(json));
			} else {
				// returned string contains " quotes
				settings.put(key, json.replaceAll("^\"|\"$", ""));
			}
		});

		Map<String, AliasData> aliases = template.aliases().entrySet().stream() //
				.collect(Collectors.toMap( //
						Map.Entry::getKey, //
						entry -> indicesGetAliasData(entry.getKey(), entry.getValue())));

		Document mapping = Document.parse(toJson(template.mappings(), jsonpMapper));

		TemplateData.TemplateDataBuilder builder = TemplateData.builder() //
				.withIndexPatterns(template.indexPatterns().toArray(new String[0])) //
				.withOrder(template.order()) //
				.withSettings(settings) //
				.withMapping(mapping) //
				.withAliases(aliases);

		// the version is only set when the template has one
		if (template.version() != null) {
			builder.withVersion(template.version().intValue());
		}

		return builder.build();
	}

	// endregion

	// region document operations
	/**
	 * Converts a reindex response of the Elasticsearch client into a Spring Data Elasticsearch
	 * {@link ReindexResponse}.
	 *
	 * @param reindexResponse the response returned by the client, must not be {@literal null}
	 * @return the converted response including the converted failures
	 */
	public ReindexResponse reindexResponse(co.elastic.clients.elasticsearch.core.ReindexResponse reindexResponse) {

		Assert.notNull(reindexResponse, "reindexResponse must not be null");

		List<ReindexResponse.Failure> convertedFailures = new ArrayList<>();
		for (BulkIndexByScrollFailure failure : reindexResponse.failures()) {
			convertedFailures.add(reindexResponseFailureOf(failure));
		}

		// noinspection ConstantConditions
		return ReindexResponse.builder() //
				.withTook(timeToLong(reindexResponse.took())) //
				.withTimedOut(reindexResponse.timedOut()) //
				.withTotal(reindexResponse.total()) //
				.withCreated(reindexResponse.created()) //
				.withUpdated(reindexResponse.updated()) //
				.withDeleted(reindexResponse.deleted()) //
				.withBatches(reindexResponse.batches()) //
				.withVersionConflicts(reindexResponse.versionConflicts()) //
				.withNoops(reindexResponse.noops()) //
				.withBulkRetries(reindexResponse.retries().bulk()) //
				.withSearchRetries(reindexResponse.retries().search()) //
				.withThrottledMillis(reindexResponse.throttledMillis().toEpochMilli()) //
				.withRequestsPerSecond(reindexResponse.requestsPerSecond()) //
				.withThrottledUntilMillis(reindexResponse.throttledUntilMillis().toEpochMilli()) //
				.withFailures(convertedFailures) //
				.build();
	}

	/**
	 * Converts a single bulk failure of the client into a {@link ReindexResponse.Failure}, copying index, type, id
	 * and status and converting the cause.
	 */
	private ReindexResponse.Failure reindexResponseFailureOf(BulkIndexByScrollFailure failure) {

		ElasticsearchErrorCause errorCause = toErrorCause(failure.cause());

		// seqno, term, aborted are not available in the new client
		return ReindexResponse.Failure.builder() //
				.withIndex(failure.index()) //
				.withType(failure.type()) //
				.withId(failure.id()) //
				.withStatus(failure.status()) //
				.withErrorCause(errorCause) //
				.build();
	}

	/**
	 * Converts a single bulk failure of the client into a {@link ByQueryResponse.Failure}, copying index, type, id
	 * and status and converting the cause.
	 */
	private ByQueryResponse.Failure byQueryResponseFailureOf(BulkIndexByScrollFailure failure) {

		ElasticsearchErrorCause errorCause = toErrorCause(failure.cause());

		return ByQueryResponse.Failure.builder() //
				.withIndex(failure.index()) //
				.withType(failure.type()) //
				.withId(failure.id()) //
				.withStatus(failure.status()) //
				.withErrorCause(errorCause) //
				.build();
	}

	/**
	 * Extracts the failure from a multi-get response item.
	 *
	 * @param itemResponse the response item to inspect
	 * @return the converted failure built from the index, id and error of the item's failure, or {@literal null}
	 *         when the item is not a failure
	 */
	@Nullable
	public static MultiGetItem.Failure getFailure(MultiGetResponseItem<EntityAsMap> itemResponse) {

		if (!itemResponse.isFailure()) {
			return null;
		}

		MultiGetError error = itemResponse.failure();

		if (error == null) {
			return null;
		}

		return MultiGetItem.Failure.of(error.index(), null, error.id(), null, toErrorCause(error.error()));
	}

	/**
	 * Converts a delete-by-query response of the client into a {@link ByQueryResponse}. Values that are
	 * {@literal null} in the client response are not passed to the builder.
	 *
	 * @param response the response returned by the client
	 * @return the converted response including the converted failures
	 */
	public ByQueryResponse byQueryResponse(DeleteByQueryResponse response) {
		// this conversion is written out for both DeleteByQueryResponse and UpdateByQueryResponse, because the
		// Elasticsearch responses do not share a common class
		// noinspection DuplicatedCode
		List<ByQueryResponse.Failure> convertedFailures = new ArrayList<>();
		for (BulkIndexByScrollFailure failure : response.failures()) {
			convertedFailures.add(byQueryResponseFailureOf(failure));
		}

		ByQueryResponse.ByQueryResponseBuilder responseBuilder = ByQueryResponse.builder();

		// timing
		if (response.took() != null) {
			responseBuilder.withTook(response.took());
		}
		if (response.timedOut() != null) {
			responseBuilder.withTimedOut(response.timedOut());
		}

		// counters
		if (response.total() != null) {
			responseBuilder.withTotal(response.total());
		}
		if (response.deleted() != null) {
			responseBuilder.withDeleted(response.deleted());
		}
		if (response.batches() != null) {
			responseBuilder.withBatches(Math.toIntExact(response.batches()));
		}
		if (response.versionConflicts() != null) {
			responseBuilder.withVersionConflicts(response.versionConflicts());
		}
		if (response.noops() != null) {
			responseBuilder.withNoops(response.noops());
		}

		// retries
		if (response.retries() != null) {
			responseBuilder.withBulkRetries(response.retries().bulk());
			responseBuilder.withSearchRetries(response.retries().search());
		}

		responseBuilder.withFailures(convertedFailures);

		return responseBuilder.build();
	}

	/**
	 * Converts an update-by-query response of the client into a {@link ByQueryResponse}. Values that are
	 * {@literal null} in the client response are not passed to the builder.
	 *
	 * @param response the response returned by the client
	 * @return the converted response including the number of updated documents and the converted failures
	 */
	public ByQueryResponse byQueryResponse(UpdateByQueryResponse response) {
		// the code for the methods taking a DeleteByQueryResponse or a UpdateByQueryResponse is duplicated because the
		// Elasticsearch responses do not share a common class
		// noinspection DuplicatedCode
		List<ByQueryResponse.Failure> failures = response.failures().stream().map(this::byQueryResponseFailureOf)
				.collect(Collectors.toList());

		ByQueryResponse.ByQueryResponseBuilder builder = ByQueryResponse.builder();

		if (response.took() != null) {
			builder.withTook(response.took());
		}

		if (response.timedOut() != null) {
			builder.withTimedOut(response.timedOut());
		}

		if (response.total() != null) {
			builder.withTotal(response.total());
		}

		// the number of updated documents is the main result of an update by query; it must be copied as well,
		// otherwise the converted response does not carry the count the client returned
		if (response.updated() != null) {
			builder.withUpdated(response.updated());
		}

		if (response.deleted() != null) {
			builder.withDeleted(response.deleted());
		}

		if (response.batches() != null) {
			// throws an ArithmeticException if the value does not fit into an int
			builder.withBatches(Math.toIntExact(response.batches()));
		}

		if (response.versionConflicts() != null) {
			builder.withVersionConflicts(response.versionConflicts());
		}

		if (response.noops() != null) {
			builder.withNoops(response.noops());
		}

		if (response.retries() != null) {
			builder.withBulkRetries(response.retries().bulk());
			builder.withSearchRetries(response.retries().search());
		}

		builder.withFailures(failures);

		return builder.build();
	}

	// endregion
	// region helper functions

	/**
	 * Extracts a long value from a {@link Time}: when {@code isTime()} is true, the string returned by
	 * {@code time()} is parsed with {@link Long#parseLong(String)}, otherwise the value of {@code offset()} is
	 * returned.
	 *
	 * @throws NumberFormatException if the {@code time()} string cannot be parsed as a long
	 */
	private long timeToLong(Time time) {
		return time.isTime() ? Long.parseLong(time.time()) : time.offset();
	}

	/**
	 * Recursively converts an {@link ErrorCause} of the client into an {@link ElasticsearchErrorCause}, including
	 * its caused-by, root-cause and suppressed entries.
	 *
	 * @param errorCause the error cause to convert, may be {@literal null}
	 * @return the converted error cause, or {@literal null} when the argument is {@literal null}
	 */
	@Nullable
	private static ElasticsearchErrorCause toErrorCause(@Nullable ErrorCause errorCause) {

		if (errorCause == null) {
			return null;
		}

		ElasticsearchErrorCause causedBy = toErrorCause(errorCause.causedBy());

		List<ElasticsearchErrorCause> rootCauses = new ArrayList<>();
		for (ErrorCause rootCause : errorCause.rootCause()) {
			rootCauses.add(toErrorCause(rootCause));
		}

		List<ElasticsearchErrorCause> suppressed = new ArrayList<>();
		for (ErrorCause suppressedCause : errorCause.suppressed()) {
			suppressed.add(toErrorCause(suppressedCause));
		}

		return new ElasticsearchErrorCause( //
				errorCause.type(), //
				errorCause.reason(), //
				errorCause.stackTrace(), //
				causedBy, //
				rootCauses, //
				suppressed);
	}

	// endregion
}

相关信息

spring-data-elasticsearch 源码目录

相关文章

spring-data-elasticsearch Aggregation 源码

spring-data-elasticsearch AutoCloseableElasticsearchClient 源码

spring-data-elasticsearch ChildTemplate 源码

spring-data-elasticsearch ClusterTemplate 源码

spring-data-elasticsearch CriteriaFilterProcessor 源码

spring-data-elasticsearch CriteriaQueryException 源码

spring-data-elasticsearch CriteriaQueryProcessor 源码

spring-data-elasticsearch DocumentAdapters 源码

spring-data-elasticsearch ElasticsearchAggregation 源码

spring-data-elasticsearch ElasticsearchAggregations 源码

0  赞