Как я могу прочитать все файлы из каталога aws s3 в одном опросе, используя apache-camel

Я пытаюсь достичь:

  1. Прочитайте все файлы из каталога s3.
  2. Скопируйте все файлы в резервную папку на s3.
  3. собрать все содержимое файла в один файл и скопировать его в другой каталог на s3.

Но я застрял на первой точке, чтобы прочитать все файлы в одном опросе.

my from router : aws-s3://${camel.bucket.name}?amazonS3Client=#s3Client&prefix=some_path_on_s3&deleteAfterRead=true&delay=100s

for example if, some_path_on_s3 -> has 2 files say first.txt and 
second.txt

according to camel documentation, it has to read both the files in a 
single poll, but is reading 1 file per poll.

I also tried with parameter,  maxMessagesPerPoll=2 but no luck. It 
still reads one file per poll.

Есть ли способ получить все файлы из каталога s3 в одном опросе?

2 ответа

Правда в том, что он отправляет по одному файлу за раз, но он подтверждает весь пакет за опрос.

maxMessagesPerPoll только создает ограничение на количество файлов, читаемых за пакет. Я думаю, что информация, которую вы ищете, находится в заголовках партий верблюдов на каждой бирже:

CamelBatchComplete: логическое значение, указывающее последний Exchange в пакете. Это верно только для последней записи.

CamelBatchIndex: текущий индекс пакета. Начинается с 0.

CamelBatchSize: общее количество обменов, которые были опрошены в этом пакете.

С помощью этой информации вы можете многоадресно передать сообщение, а затем внедрить агрегатор для объединения файлов на одном маршруте, когда CamelBatchComplete = true, и резервного копирования файлов на другом.

Найти больше информации здесь:

Пакетный потребитель

Multicast

  • Я получил это работает здесь,

    from("file://<some_path_to_dir>")
    .routeId("some_route_id")
    .to("backup_dir")
    .to("direct:aggregate")
    .end();
    
    
    
    from("direct:aggregate")
    .routeId("aggregate_router")
    .aggregate(constant(true), new GroupedExchangeAggregationStrategy())
            .completionPredicate(exchange -> {
                List<Exchange> list = exchange.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
                Exchange latestExchange = list.get(list.size() - 1);
                return (boolean) latestExchange.getProperty(Exchange.BATCH_COMPLETE);
            })
    .to("direct:merge");
    
    
    from("direct:merge")
            .routeId("merge_router")
            .process(new Processor() {
                @Override
                public void process(Exchange exchange) throws Exception {
                    List<Exchange> list = exchange.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
                    StringBuilder builder = new StringBuilder();
                    for(Exchange ex : list){
                        builder.append(ex.getIn().getBody(String.class));
                    }
    
                    exchange.getIn().setBody(builder.toString());
                    // set any other necessary header if required here
                    // example, if aws s3 is the endpoint, set the S3Constants.KEY header here
                }
            })
    .to("some_final_endpoint");
    
Другие вопросы по тегам