Как потреблять агрегированный обмен в apache camel с streamz dsl

Я пытаюсь использовать верблюд, чтобы найти последний файл, доступный в конечной точке в файловой системе только для чтения, и использовать его. Так как у меня есть приложение, основанное на akka streams, и поэтому я решил использовать библиотеку streamz

Вот мое текущее решение и проблема, которую я вижу с ним. Я уверен, что я могу найти лучшее решение здесь, я просто не могу понять это сам.

Мое текущее решение заключается в настройке маршрута на конечной точке только для чтения, которая объединяет все сопоставленные файлы с родительским путем, являющимся ключом корреляции, и выбирает последний. Я использую sortBy в настройках конечной точки, чтобы последний измененный файл перешел в последний обмен. То, что я также делаю здесь, это то, что я записываю этот вывод в другую конечную точку файла (локальную копию), а затем я receive настройка метода на этой конечной точке. Я хотел бы избежать этого дополнительного копирования файлов и, возможно, передать агрегированный обмен прямо в receive метод?

object FileConsumerStarter extends App {

  import FeedConsumer._

  val endpoint = """file:poller/src/main/resources/file-poller?"""
  val fileRegex = "file.*\\.csv"
  setupSource(endpoint, fileRegex)
      .to(Sink.foreach(a => println(a)))
      .run()

}

import java.util.UUID
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import org.apache.camel.{Exchange}
import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy
import org.apache.camel.builder.RouteBuilder
import streamz.camel.StreamContext
import streamz.camel.akka.scaladsl.receive

trait FeedConsumer {

  implicit val actorSystem = ActorSystem("FeedConsumer")
  implicit val actorMaterializer = ActorMaterializer()
  implicit val streamContext = StreamContext()
  streamContext.camelContext.start()

  val defaultEndpointSettings = Seq(
    "autoCreate=false", 
    "charset=utf-8",
    "delay=10000",
    "noop=true",
    "sortBy=file:modified",
    "eagerMaxMessagesPerPoll=false",
    "readLock=none"
  )

  def setupSource(camelFileEndpoint: String, camelFileEndpointFileRegex: String, endpointSettings:Seq[String] = defaultEndpointSettings) = {

    val endpointString = (camelFileEndpoint.last match {
      case '?' => camelFileEndpoint
      case _ => camelFileEndpoint + '?'
    }
      ) + (s"include=$camelFileEndpointFileRegex" +: endpointSettings).mkString("&")

    val someUniqId = UUID.randomUUID()

    val rb = new RouteBuilder() {
      override def configure(): Unit =
        from(endpointString)
          .aggregate(header(Exchange.FILE_PARENT), new UseLatestAggregationStrategy())
            .completionTimeout(1000L)
            .ignoreInvalidCorrelationKeys()
          .to(s"file:///tmp/file_manager/${someUniqId}")
    }

    streamContext.camelContext.addRoutes(rb)

    val settings = Seq(
      "delete=true",
      pickOneFile,
      "readLock=markerFile",
      "delay=2000"
    ).mkString("&")

    receive[String](s"file:///tmp/gcm_file_manager/${someUniqId}?${settings}")

  }

object FeedConsumer extends FeedConsumer

0 ответов

Другие вопросы по тегам