Flink AvroRuntimeException: не конкретный класс

Я пытаюсь использовать реестр конфлюентных схем в интерактивной оболочке Flink Scala, чтобы начать работу с текущей версией Flink 0.10.1. Дополнительный контекст доступен здесь https://github.com/geoHeil/streaming-reference/tree/5-basic-flink-setup

Моя проблема заключается в попытке инициализировать сериализатор из ConfluentRegistryAvroDeserializationSchema не удается:

```scala
val serializer = ConfluentRegistryAvroDeserializationSchema.forSpecific[Tweet](classOf[Tweet], schemaRegistryUrl)
error: type arguments [Tweet] conform to the bounds of none of the overloaded alternatives of
value forSpecific: [T <: org.apache.avro.specific.SpecificRecord](x$1: Class[T], x$2: String, x$3: Int)org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema[T] <and> [T <: org.apache.avro.specific.SpecificRecord](x$1: Class[T], x$2: String)org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema[T]
```

Оболочка настроена так (т.е. дополнительные JAR для поддержки реестра avro или схемы добавляются следующим образом:):

```bash
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka_2.11/1.10.1/flink-connector-kafka_2.11-1.10.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka-base_2.11/1.10.1/flink-connector-kafka-base_2.11-1.10.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/0.10.2.1/kafka-clients-0.10.2.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/flink/flink-avro-confluent-registry/1.10.1/flink-avro-confluent-registry-1.10.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/flink/flink-avro/1.10.1/flink-avro-1.10.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/flink/force-shading/1.10.1/force-shading-1.10.1.jar -P lib/
wget https://repo1.maven.org/maven2/org/apache/avro/avro/1.8.2/avro-1.8.2.jar -P lib/

export TERM=xterm-color
./bin/start-scala-shell.sh local
```

Я пытаюсь выполнить следующий фрагмент:

```scala
import org.apache.flink.streaming.connectors.kafka.{
  FlinkKafkaConsumer,
  FlinkKafkaProducer
}
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema
import java.util.Properties

senv.enableCheckpointing(5000)

final case class Tweet(tweet_id: Option[String], text: Option[String], source: Option[String], geo: Option[String], place: Option[String], lang: Option[String], created_at: Option[String], timestamp_ms: Option[String], coordinates: Option[String], user_id: Option[Long], user_name: Option[String], screen_name: Option[String], user_created_at: Option[String], followers_count: Option[Long], friends_count: Option[Long], user_lang: Option[String], user_location: Option[String], hashtags: Option[Seq[String]])

val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("group.id", "test")
val schemaRegistryUrl = "http://localhost:8081"
val serializer = ConfluentRegistryAvroDeserializationSchema.forSpecific[Tweet](classOf[Tweet], schemaRegistryUrl)

```

http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/no-subject-td36269.html и http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Avro-from-avrohugger-still-invalid-td36274.html - это ссылки на список рассылки Flinks.

редактировать

Первая подсказка, на которую я наткнулся: https://github.com/zladovan/gradle-avrohugger-plugin. Мне нужно изменить классы case на одну из Avro Specfic или общую запись при создании классов. Но я также изо всех сил пытаюсь заставить его работать.

В case class Tweetприведенный выше пример был создан из https://github.com/geoHeil/streaming-reference/blob/5-basic-flink-setup/common/models/src/main/avro/Tweet.avsc с использованием https://github.com/zladovan/gradle-avrohugger-plugin в стандартном (то есть классическом) режиме.

Однако его нужно переместить в SpecificRecordформат https://github.com/zladovan/gradle-avrohugger-plugin, чтобы иметьTweetкласс, который совместим. Этот довольно длинный. Для полноты он доступен по адресу https://gist.github.com/geoHeil/8b15d44d07e11c32a461b78365e0c158.

Работа по-прежнему не выполняется:

Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class: class com.github.geoheil.streamingreference.Tweet

даже для возможно совместимого класса. Так что это (пока) не полное решение. Хотя согласно https://issues.apache.org/jira/browse/FLINK-12501 он уже должен работать:

org.apache.avro.AvroRuntimeException: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.avro.AvroRuntimeException: Not a Specific class: class com.github.geoheil.streamingreference.Tweet
    at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:227)
    at org.apache.flink.formats.avro.AvroDeserializationSchema.checkAvroInitialized(AvroDeserializationSchema.java:147)
    at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.checkAvroInitialized(RegistryAvroDeserializationSchema.java:79)
    at org.apache.flink.formats.avro.RegistryAvroDeserializationSchema.deserialize(RegistryAvroDeserializationSchema.java:64)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:718)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:200)
Caused by: avro.shaded.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.avro.AvroRuntimeException: Not a Specific class: class com.github.geoheil.streamingreference.Tweet
    at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2234)
    at avro.shaded.com.google.common.cache.LocalCache.get(LocalCache.java:3965)
    at avro.shaded.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3969)
    at avro.shaded.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4829)
    at org.apache.avro.specific.SpecificData.getSchema(SpecificData.java:225)
    ... 9 more
Caused by: org.apache.avro.AvroRuntimeException: Not a Specific class: class com.github.geoheil.streamingreference.Tweet
    at org.apache.avro.specific.SpecificData.createSchema(SpecificData.java:285)
    at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:218)
    at org.apache.avro.specific.SpecificData$2.load(SpecificData.java:215)
    at avro.shaded.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3568)
    at avro.shaded.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2350)
    at avro.shaded.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2313)
    at avro.shaded.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2228)
    ... 13 more

0 ответов

Другие вопросы по тегам