spring-batch RemoteChunkingManagerStepBuilder 源码

  • 2022-08-16
  • 浏览 (267)

spring-batch RemoteChunkingManagerStepBuilder 代码

文件路径:/spring-batch-integration/src/main/java/org/springframework/batch/integration/chunk/RemoteChunkingManagerStepBuilder.java

/*
 * Copyright 2019 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *       https://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.springframework.batch.integration.chunk;

import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.ItemReadListener;
import org.springframework.batch.core.ItemWriteListener;
import org.springframework.batch.core.SkipListener;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.step.builder.FaultTolerantStepBuilder;
import org.springframework.batch.core.step.builder.StepBuilder;
import org.springframework.batch.core.step.item.KeyGenerator;
import org.springframework.batch.core.step.skip.SkipPolicy;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.repeat.CompletionPolicy;
import org.springframework.batch.repeat.RepeatOperations;
import org.springframework.batch.repeat.exception.ExceptionHandler;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.PollableChannel;
import org.springframework.retry.RetryPolicy;
import org.springframework.retry.backoff.BackOffPolicy;
import org.springframework.retry.policy.RetryContextCache;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.interceptor.TransactionAttribute;
import org.springframework.util.Assert;

/**
 * Builder for a manager step in a remote chunking setup. This builder creates and sets a
 * {@link ChunkMessageChannelItemWriter} on the manager step.
 *
 * <p>
 * If no {@code messagingTemplate} is provided through
 * {@link RemoteChunkingManagerStepBuilder#messagingTemplate(MessagingTemplate)}, this
 * builder will create one and set its default channel to the {@code outputChannel}
 * provided through
 * {@link RemoteChunkingManagerStepBuilder#outputChannel(MessageChannel)}.
 * </p>
 *
 * <p>
 * If a {@code messagingTemplate} is provided, it is assumed that it is fully configured
 * and that its default channel is set to an output channel on which requests to workers
 * will be sent.
 * </p>
 *
 * @param <I> type of input items
 * @param <O> type of output items
 * @since 4.2
 * @author Mahmoud Ben Hassine
 */
public class RemoteChunkingManagerStepBuilder<I, O> extends FaultTolerantStepBuilder<I, O> {

	private MessagingTemplate messagingTemplate;

	private PollableChannel inputChannel;

	private MessageChannel outputChannel;

	private final int DEFAULT_MAX_WAIT_TIMEOUTS = 40;

	private static final long DEFAULT_THROTTLE_LIMIT = 6;

	private int maxWaitTimeouts = DEFAULT_MAX_WAIT_TIMEOUTS;

	private long throttleLimit = DEFAULT_THROTTLE_LIMIT;

	/**
	 * Create a new {@link RemoteChunkingManagerStepBuilder}.
	 * @param stepName name of the manager step
	 */
	public RemoteChunkingManagerStepBuilder(String stepName) {
		super(new StepBuilder(stepName));
	}

	/**
	 * Set the input channel on which replies from workers will be received. The provided
	 * input channel will be set as a reply channel on the
	 * {@link ChunkMessageChannelItemWriter} created by this builder.
	 * @param inputChannel the input channel
	 * @return this builder instance for fluent chaining
	 *
	 * @see ChunkMessageChannelItemWriter#setReplyChannel
	 */
	public RemoteChunkingManagerStepBuilder<I, O> inputChannel(PollableChannel inputChannel) {
		Assert.notNull(inputChannel, "inputChannel must not be null");
		this.inputChannel = inputChannel;
		return this;
	}

	/**
	 * Set the output channel on which requests to workers will be sent. By using this
	 * setter, a default messaging template will be created and the output channel will be
	 * set as its default channel.
	 * <p>
	 * Use either this setter or
	 * {@link RemoteChunkingManagerStepBuilder#messagingTemplate(MessagingTemplate)} to
	 * provide a fully configured messaging template.
	 * </p>
	 * @param outputChannel the output channel.
	 * @return this builder instance for fluent chaining
	 *
	 * @see RemoteChunkingManagerStepBuilder#messagingTemplate(MessagingTemplate)
	 */
	public RemoteChunkingManagerStepBuilder<I, O> outputChannel(MessageChannel outputChannel) {
		Assert.notNull(outputChannel, "outputChannel must not be null");
		this.outputChannel = outputChannel;
		return this;
	}

	/**
	 * Set the {@link MessagingTemplate} to use to send data to workers. <strong>The
	 * default channel of the messaging template must be set</strong>.
	 * <p>
	 * Use either this setter to provide a fully configured messaging template or provide
	 * an output channel through
	 * {@link RemoteChunkingManagerStepBuilder#outputChannel(MessageChannel)} and a
	 * default messaging template will be created.
	 * </p>
	 * @param messagingTemplate the messaging template to use
	 * @return this builder instance for fluent chaining
	 * @see RemoteChunkingManagerStepBuilder#outputChannel(MessageChannel)
	 */
	public RemoteChunkingManagerStepBuilder<I, O> messagingTemplate(MessagingTemplate messagingTemplate) {
		Assert.notNull(messagingTemplate, "messagingTemplate must not be null");
		this.messagingTemplate = messagingTemplate;
		return this;
	}

	/**
	 * The maximum number of times to wait at the end of a step for a non-null result from
	 * the remote workers. This is a multiplier on the receive timeout set separately on
	 * the gateway. The ideal value is a compromise between allowing slow workers time to
	 * finish, and responsiveness if there is a dead worker. Defaults to 40.
	 * @param maxWaitTimeouts the maximum number of wait timeouts
	 * @return this builder instance for fluent chaining
	 * @see ChunkMessageChannelItemWriter#setMaxWaitTimeouts(int)
	 */
	public RemoteChunkingManagerStepBuilder<I, O> maxWaitTimeouts(int maxWaitTimeouts) {
		Assert.isTrue(maxWaitTimeouts > 0, "maxWaitTimeouts must be greater than zero");
		this.maxWaitTimeouts = maxWaitTimeouts;
		return this;
	}

	/**
	 * Public setter for the throttle limit. This limits the number of pending requests
	 * for chunk processing to avoid overwhelming the receivers.
	 * @param throttleLimit the throttle limit to set
	 * @return this builder instance for fluent chaining
	 * @see ChunkMessageChannelItemWriter#setThrottleLimit(long)
	 */
	public RemoteChunkingManagerStepBuilder<I, O> throttleLimit(long throttleLimit) {
		Assert.isTrue(throttleLimit > 0, "throttleLimit must be greater than zero");
		this.throttleLimit = throttleLimit;
		return this;
	}

	/**
	 * Build a manager {@link TaskletStep}.
	 * @return the configured manager step
	 * @see RemoteChunkHandlerFactoryBean
	 */
	public TaskletStep build() {
		Assert.notNull(this.inputChannel, "An InputChannel must be provided");
		Assert.state(this.outputChannel == null || this.messagingTemplate == null,
				"You must specify either an outputChannel or a messagingTemplate but not both.");

		// configure messaging template
		if (this.messagingTemplate == null) {
			this.messagingTemplate = new MessagingTemplate();
			this.messagingTemplate.setDefaultChannel(this.outputChannel);
			if (this.logger.isDebugEnabled()) {
				this.logger.debug("No messagingTemplate was provided, using a default one");
			}
		}

		// configure item writer
		ChunkMessageChannelItemWriter<O> chunkMessageChannelItemWriter = new ChunkMessageChannelItemWriter<>();
		chunkMessageChannelItemWriter.setMessagingOperations(this.messagingTemplate);
		chunkMessageChannelItemWriter.setMaxWaitTimeouts(this.maxWaitTimeouts);
		chunkMessageChannelItemWriter.setThrottleLimit(this.throttleLimit);
		chunkMessageChannelItemWriter.setReplyChannel(this.inputChannel);
		super.writer(chunkMessageChannelItemWriter);

		return super.build();
	}

	/*
	 * The following methods override those from parent builders and return the current
	 * builder type. FIXME: Change parent builders to be generic and return current
	 * builder type in each method.
	 */

	@Override
	public RemoteChunkingManagerStepBuilder<I, O> reader(ItemReader<? extends I> reader) {
		super.reader(reader);
		return this;
	}

	@Override
	public RemoteChunkingManagerStepBuilder<I, O> repository(JobRepository jobRepository) {
		super.repository(jobRepository);
		return this;
	}

	@Override
	public RemoteChunkingManagerStepBuilder<I, O> transactionManager(PlatformTransactionManager transactionManager) {
		super.transactionManager(transactionManager);
		return this;
	}

	@Override
	public RemoteChunkingManagerStepBuilder<I, O> listener(Object listener) {
		super.listener(listener);
		return this;
	}

	@Override
	public RemoteChunkingManagerStepBuilder<I, O> listener(SkipListener<? super I, ? super O> listener) {
		super.listener(listener);
		return this;
	}

	@Override
	public RemoteChunkingManagerStepBuilder<I, O> listener(ChunkListener listener) {
		super.listener(listener);
		return this;
	}

	@Override
	public RemoteChunkingManagerStepBuilder<I, O> transactionAttribute(TransactionAttribute transactionAttribute) {
		super.transactionAttribute(transactionAttribute);
		return this;
	}

	@Override
	public RemoteChunkingManagerStepBuilder<I, O> listener(org.springframework.retry.RetryListener listener) {
		super.listener(listener);
		return this;
	}

	@Override
	public RemoteChunkingManagerStepBuilder<I, O> keyGenerator(KeyGenerator keyGenerator) {
		super.keyGenerator(keyGenerator);
		return this;
	}

	@Override
	public RemoteChunkingManagerStepBuilder<I, O> retryLimit(int retryLimit) {
		super.retryLimit(retryLimit);
		return this;
	}

	@Override
	public RemoteChunkingManagerStepBuilder<I, O> retryPolicy(RetryPolicy retryPolicy) {
		super.retryPolicy(retryPolicy);
		return this;
	}

	@Override
	public RemoteChunkingManagerStepBuilder<I, O> backOffPolicy(BackOffPolicy backOffPolicy) {
		super.backOffPolicy(backOffPolicy);
		return this;
	}

	@Override
	public RemoteChunkingManagerStepBuilder<I, O> retryContextCache(RetryContextCache retryContextCache) {
		super.retryContextCache(retryContextCache);
		return this;
	}

	@Override
	public RemoteChunkingManagerStepBuilder<I, O> skipLimit(int skipLimit) {
		super.skipLimit(skipLimit);
		return this;
	}

	@Override
	public RemoteChunkingManagerStepBuilder<I, O> noSkip(Class<? extends Throwable> type) {
		super.noSkip(type);
		return this;
	}

	@Override
	public RemoteChunkingManagerStepBuilder<I, O> skip(Class<? extends Throwable> type) {
		super.skip(type);
		return this;
	}

	@Override
	public RemoteChunkingManagerStepBuilder<I, O> skipPolicy(SkipPolicy skipPolicy) {
		super.skipPolicy(skipPolicy);
		return this;
	}

	@Override
	public RemoteChunkingManagerStepBuilder<I, O> noRollback(Class<? extends Throwable> type) {
		super.noRollback(type);
		return this;
	}

	@Override
	public RemoteChunkingManagerStepBuilder<I, O> noRetry(Class<? extends Throwable> type) {
		super.noRetry(type);
		return this;
	}

	@Override
	public RemoteChunkingManagerStepBuilder<I, O> retry(Class<? extends Throwable> type) {
		super.retry(type);
		return this;
	}

	@Override
	public RemoteChunkingManagerStepBuilder<I, O> stream(ItemStream stream) {
		super.stream(stream);
		return this;
	}

	@Override
	public RemoteChunkingManagerStepBuilder<I, O> chunk(int chunkSize) {
		super.chunk(chunkSize);
		return this;
	}

	@Override
	public RemoteChunkingManagerStepBuilder<I, O> chunk(CompletionPolicy completionPolicy) {
		super.chunk(completionPolicy);
		return this;
	}

	/**
	 * This method will throw a {@link UnsupportedOperationException} since the item
	 * writer of the manager step in a remote chunking setup will be automatically set to
	 * an instance of {@link ChunkMessageChannelItemWriter}.
	 *
	 * When building a manager step for remote chunking, no item writer must be provided.
	 * @throws UnsupportedOperationException if an item writer is provided
	 * @see ChunkMessageChannelItemWriter
	 * @see RemoteChunkHandlerFactoryBean#setChunkWriter(ItemWriter)
	 */
	@Override
	public RemoteChunkingManagerStepBuilder<I, O> writer(ItemWriter<? super O> writer)
			throws UnsupportedOperationException {
		throw new UnsupportedOperationException(
				"When configuring a manager step " + "for remote chunking, the item writer will be automatically set "
						+ "to an instance of ChunkMessageChannelItemWriter. The item writer "
						+ "must not be provided in this case.");
	}

	@Override
	public RemoteChunkingManagerStepBuilder<I, O> readerIsTransactionalQueue() {
		super.readerIsTransactionalQueue();
		return this;
	}

	@Override
	public RemoteChunkingManagerStepBuilder<I, O> listener(ItemReadListener<? super I> listener) {
		super.listener(listener);
		return this;
	}

	@Override
	public RemoteChunkingManagerStepBuilder<I, O> listener(ItemWriteListener<? super O> listener) {
		super.listener(listener);
		return this;
	}

	@Override
	public RemoteChunkingManagerStepBuilder<I, O> chunkOperations(RepeatOperations repeatTemplate) {
		super.chunkOperations(repeatTemplate);
		return this;
	}

	@Override
	public RemoteChunkingManagerStepBuilder<I, O> exceptionHandler(ExceptionHandler exceptionHandler) {
		super.exceptionHandler(exceptionHandler);
		return this;
	}

	@Override
	public RemoteChunkingManagerStepBuilder<I, O> stepOperations(RepeatOperations repeatTemplate) {
		super.stepOperations(repeatTemplate);
		return this;
	}

	@Override
	public RemoteChunkingManagerStepBuilder<I, O> startLimit(int startLimit) {
		super.startLimit(startLimit);
		return this;
	}

	@Override
	public RemoteChunkingManagerStepBuilder<I, O> listener(StepExecutionListener listener) {
		super.listener(listener);
		return this;
	}

	@Override
	public RemoteChunkingManagerStepBuilder<I, O> allowStartIfComplete(boolean allowStartIfComplete) {
		super.allowStartIfComplete(allowStartIfComplete);
		return this;
	}

	@Override
	public RemoteChunkingManagerStepBuilder<I, O> processor(ItemProcessor<? super I, ? extends O> itemProcessor) {
		super.processor(itemProcessor);
		return this;
	}

}

相关信息

spring-batch 源码目录

相关文章

spring-batch AsynchronousFailureException 源码

spring-batch ChunkHandler 源码

spring-batch ChunkMessageChannelItemWriter 源码

spring-batch ChunkProcessorChunkHandler 源码

spring-batch ChunkRequest 源码

spring-batch ChunkResponse 源码

spring-batch JmsRedeliveredExtractor 源码

spring-batch MessageSourcePollerInterceptor 源码

spring-batch RemoteChunkHandlerFactoryBean 源码

spring-batch RemoteChunkingManagerStepBuilderFactory 源码

0  赞