Spring Integration - Обработка устаревших сессий sftp
Я реализовал следующий сценарий:
- QueueChannel, содержащий сообщения в виде байта []
- MessageHandler, опрашивающий канал очереди и загружающий файлы по sftp
- Трансформатор, прослушивающий errorChannel и отправляющий извлеченную полезную нагрузку из сообщения с ошибкой обратно в queueChannel (рассматриваемый как обработчик ошибок для обработки сообщений с ошибками, чтобы ничего не потерялось)
Если сервер sftp подключен, все работает как положено.
Если сервер sftp не работает, сообщение об ошибке, которое приходит как преобразователь:
org.springframework.messaging.MessagingException: Failed to obtain pooled item; nested exception is java.lang.IllegalStateException: failed to create SFTP Session
Преобразователь ничего не может с этим поделать, так как failMessage полезной нагрузки имеет значение null и выдает исключение. Трансформатор теряет сообщение.
Как я могу настроить свой поток так, чтобы трансформер получал правильное сообщение с соответствующей полезной нагрузкой неудачно загруженного файла?
Моя конфигурация:
@Bean
public MessageChannel toSftpChannel() {
final QueueChannel channel = new QueueChannel();
channel.setLoggingEnabled(true);
return new QueueChannel();
}
@Bean
public MessageChannel toSplitter() {
return new PublishSubscribeChannel();
}
@Bean
@ServiceActivator(inputChannel = "toSftpChannel", poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1"))
public MessageHandler handler() {
final SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
handler.setRemoteDirectoryExpression(new LiteralExpression(sftpRemoteDirectory));
handler.setFileNameGenerator(message -> {
if (message.getPayload() instanceof byte[]) {
return (String) message.getHeaders().get("name");
} else {
throw new IllegalArgumentException("byte[] expected in Payload!");
}
});
return handler;
}
@Bean
public SessionFactory<LsEntry> sftpSessionFactory() {
final DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
final Properties jschProps = new Properties();
jschProps.put("StrictHostKeyChecking", "no");
jschProps.put("PreferredAuthentications", "publickey,password");
factory.setSessionConfig(jschProps);
factory.setHost(sftpHost);
factory.setPort(sftpPort);
factory.setUser(sftpUser);
if (sftpPrivateKey != null) {
factory.setPrivateKey(sftpPrivateKey);
factory.setPrivateKeyPassphrase(sftpPrivateKeyPassphrase);
} else {
factory.setPassword(sftpPasword);
}
factory.setAllowUnknownKeys(true);
return new CachingSessionFactory<>(factory);
}
@Bean
@Splitter(inputChannel = "toSplitter")
public DmsDocumentMessageSplitter splitter() {
final DmsDocumentMessageSplitter splitter = new DmsDocumentMessageSplitter();
splitter.setOutputChannelName("toSftpChannel");
return splitter;
}
@Transformer(inputChannel = "errorChannel", outputChannel = "toSftpChannel")
public Message<?> errorChannelHandler(ErrorMessage errorMessage) throws RuntimeException {
Message<?> failedMessage = ((MessagingException) errorMessage.getPayload())
.getFailedMessage();
return MessageBuilder.withPayload(failedMessage)
.copyHeadersIfAbsent(failedMessage.getHeaders())
.build();
}
@MessagingGateway
public interface UploadGateway {
@Gateway(requestChannel = "toSplitter")
void upload(@Payload List<byte[]> payload, @Header("header") DmsDocumentUploadRequestHeader header);
}
Спасибо..
Обновить
@Bean(PollerMetadata.DEFAULT_POLLER)
@Transactional(propagation = Propagation.REQUIRED, isolation = Isolation.READ_COMMITTED)
PollerMetadata poller() {
return Pollers
.fixedRate(5000)
.maxMessagesPerPoll(1)
.receiveTimeout(500)
.taskExecutor(taskExecutor())
.transactionSynchronizationFactory(transactionSynchronizationFactory())
.get();
}
@Bean
@ServiceActivator(inputChannel = "toMessageStore", poller = @Poller(PollerMetadata.DEFAULT_POLLER))
public BridgeHandler bridge() {
BridgeHandler bridgeHandler = new BridgeHandler();
bridgeHandler.setOutputChannelName("toSftpChannel");
return bridgeHandler;
}
1 ответ
null
failedMessage
это ошибка; воспроизведенный INT-4421.
Я бы не рекомендовал использовать QueueChannel
по этому сценарию. Если вы используете прямой канал, вы можете настроить рекомендации по повторным попыткам доставки. когда повторные попытки исчерпаны (если так настроено), исключение будет возвращено вызывающему потоку.
Добавьте совет к SftpMessageHandler
"s adviceChain
имущество.
РЕДАКТИРОВАТЬ
Вы можете обойти "пропущенное" failMessage, вставив мост между опрашиваемым каналом и адаптером sftp:
@Bean
@ServiceActivator(inputChannel = "toSftpChannel", poller = @Poller(fixedDelay = "5000", maxMessagesPerPoll = "1"))
public BridgeHandler bridge() {
BridgeHandler bridgeHandler = new BridgeHandler();
bridgeHandler.setOutputChannelName("toRealSftpChannel");
return bridgeHandler;
}
@Bean
@ServiceActivator(inputChannel = "toRealSftpChannel")
public MessageHandler handler() {
final SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
handler.setRemoteDirectoryExpression(new LiteralExpression("foo"));
handler.setFileNameGenerator(message -> {
if (message.getPayload() instanceof byte[]) {
return (String) message.getHeaders().get("name");
}
else {
throw new IllegalArgumentException("byte[] expected in Payload!");
}
});
return handler;
}