Как потреблять агрегированный обмен в apache camel с streamz dsl
Я пытаюсь использовать верблюд, чтобы найти последний файл, доступный в конечной точке в файловой системе только для чтения, и использовать его. Так как у меня есть приложение, основанное на akka streams, и поэтому я решил использовать библиотеку streamz
Вот мое текущее решение и проблема, которую я вижу с ним. Я уверен, что я могу найти лучшее решение здесь, я просто не могу понять это сам.
Мое текущее решение заключается в настройке маршрута на конечной точке только для чтения, которая объединяет все сопоставленные файлы с родительским путем, являющимся ключом корреляции, и выбирает последний. Я использую sortBy в настройках конечной точки, чтобы последний измененный файл перешел в последний обмен. То, что я также делаю здесь, это то, что я записываю этот вывод в другую конечную точку файла (локальную копию), а затем я receive
настройка метода на этой конечной точке. Я хотел бы избежать этого дополнительного копирования файлов и, возможно, передать агрегированный обмен прямо в receive
метод?
object FileConsumerStarter extends App {
import FeedConsumer._
val endpoint = """file:poller/src/main/resources/file-poller?"""
val fileRegex = "file.*\\.csv"
setupSource(endpoint, fileRegex)
.to(Sink.foreach(a => println(a)))
.run()
}
import java.util.UUID
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import org.apache.camel.{Exchange}
import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy
import org.apache.camel.builder.RouteBuilder
import streamz.camel.StreamContext
import streamz.camel.akka.scaladsl.receive
trait FeedConsumer {
implicit val actorSystem = ActorSystem("FeedConsumer")
implicit val actorMaterializer = ActorMaterializer()
implicit val streamContext = StreamContext()
streamContext.camelContext.start()
val defaultEndpointSettings = Seq(
"autoCreate=false",
"charset=utf-8",
"delay=10000",
"noop=true",
"sortBy=file:modified",
"eagerMaxMessagesPerPoll=false",
"readLock=none"
)
def setupSource(camelFileEndpoint: String, camelFileEndpointFileRegex: String, endpointSettings:Seq[String] = defaultEndpointSettings) = {
val endpointString = (camelFileEndpoint.last match {
case '?' => camelFileEndpoint
case _ => camelFileEndpoint + '?'
}
) + (s"include=$camelFileEndpointFileRegex" +: endpointSettings).mkString("&")
val someUniqId = UUID.randomUUID()
val rb = new RouteBuilder() {
override def configure(): Unit =
from(endpointString)
.aggregate(header(Exchange.FILE_PARENT), new UseLatestAggregationStrategy())
.completionTimeout(1000L)
.ignoreInvalidCorrelationKeys()
.to(s"file:///tmp/file_manager/${someUniqId}")
}
streamContext.camelContext.addRoutes(rb)
val settings = Seq(
"delete=true",
pickOneFile,
"readLock=markerFile",
"delay=2000"
).mkString("&")
receive[String](s"file:///tmp/gcm_file_manager/${someUniqId}?${settings}")
}
object FeedConsumer extends FeedConsumer