Spring Integration - Обработка устаревших сессий sftp

Я реализовал следующий сценарий:

  1. QueueChannel, содержащий сообщения в виде байта []
  2. MessageHandler, опрашивающий канал очереди и загружающий файлы по sftp
  3. Трансформатор, прослушивающий errorChannel и отправляющий извлеченную полезную нагрузку из сообщения с ошибкой обратно в queueChannel (рассматриваемый как обработчик ошибок для обработки сообщений с ошибками, чтобы ничего не потерялось)

Если сервер sftp подключен, все работает как положено.

Если сервер sftp не работает, сообщение об ошибке, которое приходит как преобразователь:

org.springframework.messaging.MessagingException: Failed to obtain pooled item; nested exception is java.lang.IllegalStateException: failed to create SFTP Session

Преобразователь ничего не может с этим поделать, так как failMessage полезной нагрузки имеет значение null и выдает исключение. Трансформатор теряет сообщение.

Как я могу настроить свой поток так, чтобы трансформер получал правильное сообщение с соответствующей полезной нагрузкой неудачно загруженного файла?

Моя конфигурация:

  @Bean
  public MessageChannel toSftpChannel() {
    final QueueChannel channel = new QueueChannel();
    channel.setLoggingEnabled(true);
    return new QueueChannel();
  }

  @Bean
  public MessageChannel toSplitter() {
    return new PublishSubscribeChannel();
  }

  @Bean
  @ServiceActivator(inputChannel = "toSftpChannel", poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1"))
  public MessageHandler handler() {
    final SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
    handler.setRemoteDirectoryExpression(new LiteralExpression(sftpRemoteDirectory));
    handler.setFileNameGenerator(message -> {
      if (message.getPayload() instanceof byte[]) {
        return (String) message.getHeaders().get("name");
      } else {
        throw new IllegalArgumentException("byte[] expected in Payload!");
      }
    });
    return handler;
  }

  @Bean
  public SessionFactory<LsEntry> sftpSessionFactory() {
    final DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);

    final Properties jschProps = new Properties();
    jschProps.put("StrictHostKeyChecking", "no");
    jschProps.put("PreferredAuthentications", "publickey,password");
    factory.setSessionConfig(jschProps);

    factory.setHost(sftpHost);
    factory.setPort(sftpPort);
    factory.setUser(sftpUser);
    if (sftpPrivateKey != null) {
      factory.setPrivateKey(sftpPrivateKey);
      factory.setPrivateKeyPassphrase(sftpPrivateKeyPassphrase);
    } else {
      factory.setPassword(sftpPasword);
    }
    factory.setAllowUnknownKeys(true);
    return new CachingSessionFactory<>(factory);
  }

  @Bean
  @Splitter(inputChannel = "toSplitter")
  public DmsDocumentMessageSplitter splitter() {
    final DmsDocumentMessageSplitter splitter = new DmsDocumentMessageSplitter();
    splitter.setOutputChannelName("toSftpChannel");
    return splitter;
  }

  @Transformer(inputChannel = "errorChannel", outputChannel = "toSftpChannel")
  public Message<?> errorChannelHandler(ErrorMessage errorMessage) throws RuntimeException {

    Message<?> failedMessage = ((MessagingException) errorMessage.getPayload())
      .getFailedMessage();
    return MessageBuilder.withPayload(failedMessage)
                         .copyHeadersIfAbsent(failedMessage.getHeaders())
                         .build();
  }

  @MessagingGateway 
  public interface UploadGateway {

    @Gateway(requestChannel = "toSplitter")
    void upload(@Payload List<byte[]> payload, @Header("header") DmsDocumentUploadRequestHeader header);
  }

Спасибо..

Обновить

@Bean(PollerMetadata.DEFAULT_POLLER)
@Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.READ_COMMITTED)
  PollerMetadata poller() {
    return Pollers
      .fixedRate(5000)
      .maxMessagesPerPoll(1)
      .receiveTimeout(500)
      .taskExecutor(taskExecutor())
      .transactionSynchronizationFactory(transactionSynchronizationFactory())
      .get();
  }

  @Bean
  @ServiceActivator(inputChannel = "toMessageStore", poller = @Poller(PollerMetadata.DEFAULT_POLLER))
  public BridgeHandler bridge() {
    BridgeHandler bridgeHandler = new BridgeHandler();
    bridgeHandler.setOutputChannelName("toSftpChannel");
    return bridgeHandler;
  }

1 ответ

Решение

nullfailedMessage это ошибка; воспроизведенный INT-4421.

Я бы не рекомендовал использовать QueueChannel по этому сценарию. Если вы используете прямой канал, вы можете настроить рекомендации по повторным попыткам доставки. когда повторные попытки исчерпаны (если так настроено), исключение будет возвращено вызывающему потоку.

Добавьте совет к SftpMessageHandler"s adviceChain имущество.

РЕДАКТИРОВАТЬ

Вы можете обойти "пропущенное" failMessage, вставив мост между опрашиваемым каналом и адаптером sftp:

@Bean
@ServiceActivator(inputChannel = "toSftpChannel", poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "1"))
public BridgeHandler bridge() {
    BridgeHandler bridgeHandler = new BridgeHandler();
    bridgeHandler.setOutputChannelName("toRealSftpChannel");
    return bridgeHandler;
}

@Bean
@ServiceActivator(inputChannel = "toRealSftpChannel")
public MessageHandler handler() {
    final SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
    handler.setRemoteDirectoryExpression(new LiteralExpression("foo"));
    handler.setFileNameGenerator(message -> {
        if (message.getPayload() instanceof byte[]) {
            return (String) message.getHeaders().get("name");
        }
        else {
            throw new IllegalArgumentException("byte[] expected in Payload!");
        }
    });
    return handler;
}
Другие вопросы по тегам