Как я могу прочитать все файлы из каталога aws s3 в одном опросе, используя apache-camel
Я пытаюсь достичь:
- Прочитайте все файлы из каталога s3.
- Скопируйте все файлы в резервную папку на s3.
- собрать все содержимое файла в один файл и скопировать его в другой каталог на s3.
Но я застрял на первой точке, чтобы прочитать все файлы в одном опросе.
my from router :
aws-s3://${camel.bucket.name}?amazonS3Client=#s3Client&prefix=some_path_on_s3&deleteAfterRead=true&delay=100s
for example if, some_path_on_s3 -> has 2 files say first.txt and
second.txt
according to camel documentation, it has to read both the files in a
single poll, but is reading 1 file per poll.
I also tried with parameter, maxMessagesPerPoll=2 but no luck. It
still reads one file per poll.
Есть ли способ получить все файлы из каталога s3 в одном опросе?
2 ответа
Правда в том, что он отправляет по одному файлу за раз, но он подтверждает весь пакет за опрос.
maxMessagesPerPoll только создает ограничение на количество файлов, читаемых за пакет. Я думаю, что информация, которую вы ищете, находится в заголовках партий верблюдов на каждой бирже:
CamelBatchComplete: логическое значение, указывающее последний Exchange в пакете. Это верно только для последней записи.
CamelBatchIndex: текущий индекс пакета. Начинается с 0.
CamelBatchSize: общее количество обменов, которые были опрошены в этом пакете.
С помощью этой информации вы можете многоадресно передать сообщение, а затем внедрить агрегатор для объединения файлов на одном маршруте, когда CamelBatchComplete = true, и резервного копирования файлов на другом.
Найти больше информации здесь:
Я получил это работает здесь,
from("file://<some_path_to_dir>") .routeId("some_route_id") .to("backup_dir") .to("direct:aggregate") .end(); from("direct:aggregate") .routeId("aggregate_router") .aggregate(constant(true), new GroupedExchangeAggregationStrategy()) .completionPredicate(exchange -> { List<Exchange> list = exchange.getProperty(Exchange.GROUPED_EXCHANGE, List.class); Exchange latestExchange = list.get(list.size() - 1); return (boolean) latestExchange.getProperty(Exchange.BATCH_COMPLETE); }) .to("direct:merge"); from("direct:merge") .routeId("merge_router") .process(new Processor() { @Override public void process(Exchange exchange) throws Exception { List<Exchange> list = exchange.getProperty(Exchange.GROUPED_EXCHANGE, List.class); StringBuilder builder = new StringBuilder(); for(Exchange ex : list){ builder.append(ex.getIn().getBody(String.class)); } exchange.getIn().setBody(builder.toString()); // set any other necessary header if required here // example, if aws s3 is the endpoint, set the S3Constants.KEY header here } }) .to("some_final_endpoint");