Как динамически зарегистрировать адаптер входящего потока в Spring Integration?
Я пытаюсь реализовать агрегатор RSS/Atom в spring-integration
и я в основном использую Java DSL, чтобы написать свой IntegrationFlow
, Требование этого агрегатора заключается в том, что каналы могут быть добавлены / удалены во время выполнения. То есть каналы не известны во время разработки.
Я нашел, что это просто использовать основные Feed.inboundAdapter()
с помощью тестового URL и извлеките ссылки из канала с помощью трансформатора, а затем передайте его outbound-file-adapter
сохранить ссылки в файл. Тем не менее, я очень застрял при попытке прочитать (тысячи) URL-адресов фида из inbound-file-adapter
запустить файл через FileSplitter
а затем передать каждый результат Message<String>
содержащий URL канала, чтобы затем зарегистрировать новый Feed.inboundAdapter()
, Это невозможно с Java DSL?
В идеале мне бы понравилось, если бы я мог сделать следующее:
@Bean
public IntegrationFlow getFeedsFromFile() throws MalformedURLException {
return IntegrationFlows.from(inboundFileChannel(), e -> e.poller(Pollers.fixedDelay(10000)))
.handle(new FileSplitter())
//register new Feed.inboundAdapter(payload.toString()) foreach Message<String> containing feed url coming from FileSplitter
.transform(extractLinkFromFeedEntry())
.handle(appendLinkToFile())
.get();
}
Хотя после многократного прочтения весеннего интеграционного Java-кода DSL (и изучения всего этого на ходу) я просто не могу понять, что это можно сделать таким образом. Итак... А) это так? Б) это должно быть? В) предложения?
Такое ощущение, что я должен быть в состоянии взять вывод .handle(new FileSplitter())
и передать это в .handleWithAdapter(Feed.inboundAdapter(/*stuff here*/))
но DSL только ссылки outbound-adapter
там. Входящие адаптеры на самом деле просто подкласс AbstractMessageSource
и, кажется, единственное место, где вы можете указать один из них, - это аргумент IntegrationFlows.from(/*stuff here*/)
метод.
Я бы подумал, что было бы возможно взять входные данные из файла, разбить его построчно, использовать эти выходные данные для регистрации адаптеров входящих каналов, опросить эти каналы, извлечь новые ссылки из каналов по мере их появления и добавить их в файл, Похоже, что это не так.
Есть ли какие-то умные подклассы, которые я могу сделать, чтобы сделать эту работу?
В противном случае... и я подозреваю, что это будет ответом, я нашел пружинный пример Dynamic Ftp Channel Resolver и этот ответ о том, как адаптировать его динамически регистрировать вещи для входящего случая...
Так это путь? Любая помощь / руководство приветствуется. После пролистывания кода DSL и чтения документации в течение нескольких дней, я думаю, что у меня будет возможность реализовать пример динамического ftp и адаптировать его для работы с FeedEntryMessageSource... в этом случае мой вопрос... этот пример динамического ftp работает с конфигурацией XML, но возможно ли это сделать с помощью конфигурации Java или Java DSL?
Обновить
Я реализовал решение следующим образом:
@SpringBootApplication
class MonsterFeedApplication {
public static void main(String[] args) throws IOException {
ConfigurableApplicationContext parent = SpringApplication.run(MonsterFeedApplication.class, args);
parent.setId("parent");
String[] feedUrls = {
"https://1nichi.wordpress.com/feed/",
"http://jcmuofficialblog.com/feed/"};
List<ConfigurableApplicationContext> children = new ArrayList<>();
int n = 0;
for(String feedUrl : feedUrls) {
AnnotationConfigApplicationContext child = new AnnotationConfigApplicationContext();
child.setId("child" + ++n);
children.add(child);
child.setParent(parent);
child.register(DynamicFeedAdapter.class);
StandardEnvironment env = new StandardEnvironment();
Properties props = new Properties();
props.setProperty("feed.url", feedUrl);
PropertiesPropertySource pps = new PropertiesPropertySource("feed", props);
env.getPropertySources().addLast(pps);
child.setEnvironment(env);
child.refresh();
}
System.out.println("Press any key to exit...");
System.in.read();
for (ConfigurableApplicationContext child : children) {
child.close();
}
parent.close();
}
@Bean
public IntegrationFlow aggregateFeeds() {
return IntegrationFlows.from("feedChannel")
.transform(extractLinkFromFeed())
.handle(System.out::println)
.get();
}
@Bean
public MessageChannel feedChannel() {
return new DirectChannel();
}
@Bean
public AbstractPayloadTransformer<SyndEntry, String> extractLinkFromFeed() {
return new AbstractPayloadTransformer<SyndEntry, String>() {
@Override
protected String transformPayload(SyndEntry payload) throws Exception {
return payload.getLink();
}
};
}
}
DynamicFeedAdapter.java
@Configuration
@EnableIntegration
public class DynamicFeedAdapter {
@Value("${feed.url}")
public String feedUrl;
@Bean
public static PropertySourcesPlaceholderConfigurer pspc() {
return new PropertySourcesPlaceholderConfigurer();
}
@Bean
public IntegrationFlow feedAdapter() throws MalformedURLException {
URL url = new URL(feedUrl);
return IntegrationFlows
.from(s -> s.feed(url, "feedTest"),
e -> e.poller(p -> p.fixedDelay(10000)))
.channel("feedChannel")
.get();
}
}
И это работает, если и только если у меня есть один из URL-адресов, определенных в application.properties
как feed.url=[insert url here]
, Иначе это не говорит мне "невозможно разрешить свойство {feed.url}". Я подозреваю, что происходит там, что @Bean
определены в DynamicFeedAdapter.java
все получают синглтоны с нетерпением инициализируются, так что кроме бинов, создаваемых вручную в нашем цикле for в методе main (которые работают нормально, потому что им было введено свойство feed.url), у нас есть блуждающий синглтон, который был с энтузиазмом инициализирован, и если нет feed.url определен в application.properties, затем он не может разрешить свойство, и все идет на ура. Теперь из того, что я знаю о весне, я знаю, что это должно быть возможно @Lazy
инициализировать бобы в DynamicFeedAdapter.java
таким образом, мы не сталкиваемся с этим одним нежелательным беспризорным ребенком-проблемой. Проблема сейчас... если я просто отмечу feedAdapter()
@Lazy
тогда бобы никогда не инициализируются. Как я могу их инициализировать сам?
Обновление - проблема решена
Не проверяя его, я думаю, что проблема заключается в том, что boot находит DynamicFeedAdapter во время сканирования компонентов. Простое решение - переместить его в одноуровневую упаковку. Если MonsterFeedApplication находится в com.acme.foo, поместите класс конфигурации адаптера в com.acme.bar. Таким образом, boot не будет считать это "частью" приложения
Это была действительно проблема. После реализации предложения Гэри все работает отлично.
1 ответ
Посмотрите ответ на этот вопрос и его ответ на аналогичный вопрос об адаптерах входящей почты.
По сути, каждый адаптер канала создается в дочернем контексте, который параметризован.
В этом случае дочерние контексты создаются в main()
метод, но нет никаких причин, что это не может быть сделано в службе, вызванной .handle()
,