Верблюд: как разделить, а затем агрегировать, когда количество элементов меньше размера партии
У меня есть маршрут Camel, который читает файл из S3 и обрабатывает входной файл следующим образом:
- Разобрать каждый ряд в POJO (Студент) с помощью Bindy
- Разделить вывод по телу ()
- Агрегировать по атрибуту тела (
.semester
) и размер партии 2 - Вызов службы сохранения для загрузки в БД в заданных пакетах
Проблема в том, что при размере пакета 2 и нечетном количестве записей всегда есть одна запись, которая не сохраняется.
Предоставленный код - это Kotlin, но он не должен сильно отличаться от эквивалентного кода Java (за исключением косой черты перед "\${simple expression}" или отсутствия точек с запятой для завершения операторов.
Если я установлю размер пакета на 1, то каждая запись будет сохранена, в противном случае последняя запись никогда не будет сохранена.
Я несколько раз проверял документацию для обработчика сообщений, но, похоже, он не охватывает этот конкретный сценарий.
Я также установил [completionTimeout
|completionInterval
] в дополнение к completionSize
но это не имеет никакого значения.
Кто-нибудь сталкивался с этой проблемой раньше?
val csvDataFormat = BindyCsvDataFormat(Student::class.java)
from("aws-s3://$student-12-bucket?amazonS3Client=#amazonS3&delay=5000")
.log("A new Student input file has been received in S3: '\${header.CamelAwsS3BucketName}/\${header.CamelAwsS3Key}'")
.to("direct:move-input-s3-object-to-in-progress")
.to("direct:process-s3-file")
.to("direct:move-input-s3-object-to-completed")
.end()
from("direct:process-s3-file")
.unmarshal(csvDataFormat)
.split(body())
.streaming()
.parallelProcessing()
.aggregate(simple("\${body.semester}"), GroupedBodyAggregationStrategy())
.completionSize(2)
.bean(persistenceService)
.end()
Для входного CSV-файла, включающего семь (7) записей, это сгенерированный вывод (с некоторыми добавленными записями отладки):
ПРЕДУПРЕЖДЕНИЕ 19540 --- [student-12-move] cassinternal.S3AbortableInputStream: не все байты были прочитаны из S3ObjectInputStream, что прерывает HTTP-соединение. Это, вероятно, ошибка и может привести к неоптимальному поведению. Запросите только те байты, которые вам нужны, через ранжированный GET или истощите входной поток после использования. INFO 19540 --- [student-12-move] student-workflow-main: В S3 получен новый входной файл Student: 'student-12-bucket/inbox/foo.csv' INFO 19540 --- [student-12-move] move-input-s3-объект-в-процессе: перемещение S3-файла 'inbox/foo.csv' в папку 'in-progress'... INFO 19540 --- [student-12-move] student-workflow-main: перемещенный входной файл S3 'in-progress/foo.csv' в папку 'in-progress'... INFO 19540 --- [student-12-move] pre-process-s3-file-records: Начать сохранение в базе данных... DEBUG 19540 --- [read #7 - Split] cbidsStudentPersistenceServiceImpl: сохранение записи в базе данных: Student(id=7, имя =Student 7, семестр =2nd, javaMarks=25) DEBUG 19540 -- [read # 7 - Split] cbidsStudentPersistenceServiceImpl: сохранение записи в базе данных: Student(id=5, имя =Student 5, семестр =2nd, javaMarks=81) DEBUG 19540 --- [read #3 - Split] cbidsStudentPersistenceServiceImpl: сохранение записи в базу данных: Студент (id=6, имя = Студент 6, семестр =1-й, javaMarks=15) DEBUG 19540 --- [read #3 - Split] cbidsStuden tPersistenceServiceImpl: сохранение записи в базе данных: Student(id=2, имя =Student 2, семестр =1st, javaMarks=62) DEBUG 19540 --- [read #2 - Split] cbidsStudentPersistenceServiceImpl: сохранение записи в базе данных: Student(id=3, name=Student 3, semester=2nd, javaMarks=72) DEBUG 19540 --- [read #2 - Split] cbidsStudentPersistenceServiceImpl: сохранение записи в базу данных: Student(id=1, имя =Student 1, семестр =2nd, javaMarks=87) INFO 19540 --- [device-12-move] устройство-группа-workflow-main: завершить предварительную обработку записей файла CSV S3... INFO 19540 --- [student-12-move] move-input-s3-объект к завершению: перемещение файла S3 'in-progress/foo.csv' в папку 'complete'... INFO 19540 --- [студент-12-перемещение] device-group-workflow-main: перемещенный файл S3 'in-progress/foo.csv' в папку 'complete'...
1 ответ
Если вам нужно немедленно завершить ваше сообщение, вы можете указать предикат завершения, который основан на свойствах обмена, установленных сплиттером. Я не пробовал это, но я думаю,
.completionPredicate( simple( "${exchangeProperty.CamelSplitComplete}" ) )
обработает последнее сообщение.
Моя другая проблема в том, что вы установили parallelProcessing
в вашем сплиттере, что может означать, что сообщения не обрабатываются по порядку. Это действительно сплиттер, к которому вы хотите применить параллельную обработку, или агрегатор? Похоже, вы ничего не делаете с разделенными записями, кроме как агрегируете их, а затем обрабатываете, так что может быть лучше переместить parallelProcessing
инструкция к агрегатору.