Flink AvroRuntimeException: не конкретный класс
Я пытаюсь использовать реестр конфлюентных схем в интерактивной оболочке Flink Scala, чтобы начать работу с текущей версией Flink 0.10.1. Дополнительный контекст доступен здесь https://github.com/geoHeil/streaming-reference/tree/5-basic-flink-setup
Моя проблема заключается в попытке инициализировать сериализатор из ConfluentRegistryAvroDeserializationSchema
не удается:
```scala
val serializer = ConfluentRegistryAvroDeserializationSchema.forSpecific[Tweet](classOf[Tweet], schemaRegistryUrl)
error: type arguments [Tweet] conform to the bounds of none of the overloaded alternatives of
value forSpecific: [T <: org.apache.avro.specific.SpecificRecord](x$1: Class[T], x$2: String, x$3: Int)org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema[T] <and> [T <: org.apache.avro.specific.SpecificRecord](x$1: Class[T], x$2: String)org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema[T]
```
Оболочка настроена так (т.е. дополнительные JAR для поддержки реестра avro или схемы добавляются следующим образом:):
```bash
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka_2.11/1.10.1/flink-connector-kafka_2.11-1.10.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka-base_2.11/1.10.1/flink-connector-kafka-base_2.11-1.10.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/0.10.2.1/kafka-clients-0.10.2.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.10.1/flink-avro-confluent-registry-1.10.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.10.1/flink-avro-1.10.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/flink/force-shading/1.10.1/force-shading-1.10.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/avro/avro/1.8.2/avro-1.8.2.jar -P lib/
export TERM=xterm-color
./bin/start-scala-shell.sh local
```
Я пытаюсь выполнить следующий фрагмент:
```scala
import org.apache.flink.streaming.connectors.kafka.{
FlinkKafkaConsumer,
FlinkKafkaProducer
}
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
import java.util.Properties
senv.enableCheckpointing(5000)
final case class Tweet(tweet_id: Option[String], text: Option[String], source: Option[String], geo: Option[String], place: Option[String], lang: Option[String], created_at: Option[String], timestamp_ms: Option[String], coordinates: Option[String], user_id: Option[Long], user_name: Option[String], screen_name: Option[String], user_created_at: Option[String], followers_count: Option[Long], friends_count: Option[Long], user_lang: Option[String], user_location: Option[String], hashtags: Option[Seq[String]])
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
val schemaRegistryUrl = "http://localhost:8081"
val serializer = ConfluentRegistryAvroDeserializationSchema.forSpecific[Tweet](classOf[Tweet], schemaRegistryUrl)
```
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/no-subject-td36269.html и http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Avro-from-avrohugger-still-invalid-td36274.html - это ссылки на список рассылки Flinks.
редактировать
Первая подсказка, на которую я наткнулся: https://github.com/zladovan/gradle-avrohugger-plugin. Мне нужно изменить классы case на одну из Avro Specfic или общую запись при создании классов. Но я также изо всех сил пытаюсь заставить его работать.
В case class Tweet
приведенный выше пример был создан из https://github.com/geoHeil/streaming-reference/blob/5-basic-flink-setup/common/models/src/main/avro/Tweet.avsc с использованием https://github.com/zladovan/gradle-avrohugger-plugin в стандартном (то есть классическом) режиме.
Однако его нужно переместить в SpecificRecord
формат https://github.com/zladovan/gradle-avrohugger-plugin, чтобы иметьTweet
класс, который совместим. Этот довольно длинный. Для полноты он доступен по адресу https://gist.github.com/geoHeil/8b15d44d07e11c32a461b78365e0c158.
Работа по-прежнему не выполняется:
Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class: class com.github.geoheil.streamingreference.Tweet
даже для возможно совместимого класса. Так что это (пока) не полное решение. Хотя согласно https://issues.apache.org/jira/browse/FLINK-12501 он уже должен работать:
org.apache.avro.AvroRuntimeException: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.avro.AvroRuntimeException: Not a Specific class: class com.github.geoheil.streamingreference.Tweet
at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:227)
at org.apache.flink.formats.avro.AvroDeserializationSchema.checkAvroInitialized(AvroDeserializationSchema.java:147)
at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.checkAvroInitialized(RegistryAvroDeserializationSchema.java:79)
at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:64)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:718)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:200)
Caused by: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.avro.AvroRuntimeException: Not a Specific class: class com.github.geoheil.streamingreference.Tweet
at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2234)
at avro.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3965)
at avro.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3969)
at avro.shaded.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4829)
at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:225)
... 9 more
Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class: class com.github.geoheil.streamingreference.Tweet
at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:285)
at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218)
at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215)
at avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
at avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
at avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)
... 13 more