Akka Kafka Producersettings: перегруженное значение метода применяется с альтернативами:
Я снова и снова сталкиваюсь с проблемой, когда помещаю настройки производителя в свой код. Когда у меня его нет, все работает отлично. Ниже приведен файл одного файла, он содержит весь код, я пытаюсь записать файл в поток Кафка. И получаю эту ошибку компиляции.
package somePackage
import java.nio.file.Paths
import akka.Done
import akka.actor.{Actor, ActorLogging, ActorSystem, Cancellable, Props}
import akka.kafka.ProducerSettings
import akka.serialization.ByteArraySerializer
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.{FileIO, Sink}
import akka.util.ByteString
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer
import scala.concurrent.Future
// Lines Producer
class LinesProducer (implicit mat: Materializer) extends Actor with ActorLogging {
import LinesProducerCompanion._
override def preStart():Unit = {
log.info("Not doing anything in PreStart!")
}
def receive = {
case Start =>
log.info("LinesProducer Started.")
val future:Future[Done] = FileIO.fromPath(Paths.get("C:\\Users\\tnkteja\\Documents\\GitHub\\scala-immersion-program\\miniproject-1\\1000-genomes.csv"))
.map( line => {new ProducerRecord[Array[Byte],ByteString]("genomes0", line)})
.runWith( Sink.ignore )
}
override def postStop():Unit = {
log.info("Doing nothing in postStop!")
}
}
object LinesProducerCompanion {
val props = Props[LinesProducer]
case object Start
case object Stop
}
object Application extends App {
implicit val system:ActorSystem = ActorSystem("some")
implicit val materializer:ActorMaterializer= ActorMaterializer()
implicit val executor = system.dispatcher
val LinesProducer = system.actorOf(LinesProducerCompanion.props, "LinesProducer")
val producerSetting = ProducerSettings(system, new ByteArraySerializer(), new StringSerializer).withBootstrapServers("localhost:9092")
LinesProducer ! LinesProducerCompanion.Start
// This example app will ping pong 3 times and thereafter terminate the ActorSystem -
// see counter logic in PingActor
//system.awaitTermination()
}
и ошибка
info] Resolving org.fusesource.jansi#jansi;1.4 ...
[info] Done updating.
[info] Set current project to project (in build file:/C:/Users/tnkteja/Documents/GitHub/scala-immersion-program/miniproject-1/)
[info] Compiling 1 Scala source to C:\Users\tnkteja\Documents\GitHub\scala-immersion-program\miniproject-1\target\scala-2.12\classes...
[error] C:\Users\tnkteja\Documents\GitHub\scala-immersion-program\miniproject-1\src\main\scala\Application.scala:29: overloaded method value apply with alternatives:
[error] [K, V](config: com.typesafe.config.Config, keySerializer: org.apache.kafka.common.serialization.Serializer[K], valueSerializer: org.apache.kafka.common.serialization.Serializer[V
])akka.kafka.ProducerSettings[K,V] <and>
[error] [K, V](system: akka.actor.ActorSystem, keySerializer: org.apache.kafka.common.serialization.Serializer[K], valueSerializer: org.apache.kafka.common.serialization.Serializer[V])ak
ka.kafka.ProducerSettings[K,V] <and>
[error] [K, V](config: com.typesafe.config.Config, keySerializer: Option[org.apache.kafka.common.serialization.Serializer[K]], valueSerializer: Option[org.apache.kafka.common.serializati
on.Serializer[V]])akka.kafka.ProducerSettings[K,V] <and>
[error] [K, V](system: akka.actor.ActorSystem, keySerializer: Option[org.apache.kafka.common.serialization.Serializer[K]], valueSerializer: Option[org.apache.kafka.common.serialization.S
erializer[V]])akka.kafka.ProducerSettings[K,V]
[error] cannot be applied to (akka.actor.ActorSystem, akka.serialization.ByteArraySerializer, org.apache.kafka.common.serialization.StringSerializer)
[error] val producerSetting = ProducerSettings(system, new ByteArraySerializer(), new StringSerializer).withBootstrapServers("localhost:9092")
[error] ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 8 s, completed 6 May, 2017 10:42:15 AM
2 ответа
Вы пробовали использовать Кафку ByteArraySerializer
вместо?
import org.apache.kafka.common.serialization.ByteArraySerializer
Конструктор экземпляров ByteArraySerializer() Akka объявлен устаревшим.
Этот код исправил проблему.
пакет com.miniproject1
import akka.Done
import akka.actor.{Actor, ActorLogging, Props}
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.Materializer
import akka.stream.scaladsl.Source
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
// Futures need execution context to reuse allocated thread pools
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
import scala.io.Source.fromFile
class LinesProducer(implicit mat: Materializer) extends Actor with ActorLogging {
import LinesProducerCompanion._
override def preStart(): Unit = {
super.preStart()
log.info("Not doing anything in PreStart!")
}
override def receive: Receive = {
case Start => {
val producerSettings = ProducerSettings(context.system, new ByteArraySerializer, new StringSerializer)
.withBootstrapServers("192.168.99.100:9092")
log.info("Initializing writer")
val kafkaSink = Producer.plainSink(producerSettings)
//
val done: Future[Done] = Source.fromIterator(() => fromFile("C:\\Users\\tnkteja\\Documents\\GitHub\\scala-immersion-program\\miniproject-1\\1000-genomes.csv").getLines().drop(1))
.map(line => {new ProducerRecord[Array[Byte], String]("genomes0", line)})
.runWith(kafkaSink)
done.onComplete({
success =>
log.info("Writing to kafka Complete!")
context.stop(self)
})
done.onFailure {
case ex =>
log.info("*********************Stopping********************")
context.stop(self)
}
}
}
}
object LinesProducerCompanion{
val props = Props[LinesProducer]
case object Start
}