Как остановить ИЛИ изменить задержку Spring Integration Poller

Я использую Spring Integration для чтения файлов из каталога, используя следующую конфигурацию. Однако я ищу, чтобы остановить опрос, как только я нашел какой-либо файл, пока служба не перезапускается снова Есть ли способ изменить задержку поллера во время выполнения или запустить / остановить поллер во время выполнения?

@Bean
public MessageChannel fileInputChannel() {
    return new DirectChannel();
}

@Bean
@InboundChannelAdapter(channel = "fileInputChannel", poller = @Poller(cron = "0 0/10 19-23,0-6 ? * *", maxMessagesPerPoll = "1"))
public MessageSource<File> fileReadingMessageSource() {
    FileReadingMessageSource source = new FileReadingMessageSource();
    File directory = new File(localFtpDirectory);
    if (clearLocalDir && directory.isDirectory() && directory.exists()) {
        LOG.info("Clear directory {} on startup of service", directory);
        Arrays.stream(directory.listFiles()).forEach(File::delete);
    }
    source.setDirectory(directory);
    source.setFilter(new LastModifiedFileFilter(remoteFileFilter));
    return source;
}

@Bean
@ServiceActivator(inputChannel = "fileInputChannel")
public MessageHandler fileHandler() {
    return new MessageHandlerService();
}

2 ответа

У меня было аналогичное требование, но принятый ответ меня не удовлетворил. Упомянутые здесь функции запуска / остановки определяются контракт, и типичный сценарий, который я вижу, - это завершение работы приложения или обновление контекста приложения. Просто, когда я наивно пытался это использовать, у меня действительно очень быстро возникали проблемы.

Моим требованием было скорее приостановить опрос и возобновить его позже. Этого можно легко добиться с помощью совета, и его очень легко реализовать. Однако получить эти знания было непросто, и я закончил исследование исходного кода фреймворка (действительно, я мог что-то пропустить в документе).

          @Bean
    public MessagePollingControlAdvice messagePollingControlAdvice() {
        return new MessagePollingControlAdvice();
    }

    //call this method for your DSL bridge configuration etc..
    Consumer<GenericEndpointSpec<BridgeHandler>> pollerConfiguration() {
        return b -> b.poller(pollerFactory -> pollerFactory.advice(messagePollingControlAdvice()));
    }
   

   public static class MessagePollingControlAdvice implements ReceiveMessageAdvice {
    private volatile boolean pollingActive = false;

    @Override
    public boolean beforeReceive(Object source) {
        return pollingActive;
    }

    @Override
    public Message<?> afterReceive(Message<?> result, Object source) {
        return result;
    }

    public boolean isPollingActive() {
        return pollingActive;
    }

    //call this method from whatever place in your code to activate/deactivate poller
    public void setPollingActive(boolean pollingActive) {
        this.pollingActive = pollingActive;
    }
   }

Есть эта аннотация для использования вместе с @InboundChannelAdapter:

/**
 * When used alongside an EIP annotation (and no {@code @Bean}), specifies the bean name of
 * the consumer bean with the handler bean being {@code id.handler} (for a consuming
 * endpoint) or {@code id.source} for a message source (e.g. inbound channel adapter).
 * <p>
 * When there is also a {@code @Bean} annotation, this is the name of the consumer or
 * source polling bean (the handler or source gets the normal {@code @Bean} name). When
 * using on a {@code MessageHandler @Bean}, it is recommended to name the bean
 * {@code foo.handler} when using {@code @EndpointId("foo"}. This will align with
 * conventions in the framework. Similarly, for a message source, use
 * {@code @Bean("bar.source"} and {@code @EndpointId("bar")}.
 * <p>
 * <b>This is not allowed if there are multiple EIP annotations on the same method.</b>
 *
 * @author Gary Russell
 *
 * @since 5.0.4
 */
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface EndpointId {

и есть реализация EIP Cotrol Bus: https://docs.spring.io/spring-integration/docs/5.0.7.RELEASE/reference/html/system-management-chapter.html.

С этим вы можете отправить start()/stop() Командные сообщения, когда вам удобно.

Другие вопросы по тегам