Scio / apache beam java.lang.IllegalArgumentException: невозможно сериализовать метод
Я пытаюсь использовать поток данных для перемещения некоторых данных из паба sub в облачное хранилище. Мне нужно предоставить временную метку для scio / beam, чтобы она могла группировать данные в окна.
У меня есть простой класс case, который моделирует мое событие, выглядит так (некоторые поля удалены)
case class DataEvent(source: String,
record: AnyRef,
timestampUtc: DateTime,
publishedUtc: DateTime)
Мой трубопровод начинается с этого. События в sub sub как в формате json, и я использую json4s для десериализации:
sc
.pubsubSubscription[String]("subscription")
.map(event => parse(event).camelizeKeys.extract[DataEvent])
.timestampBy({ event => event.timestampUtc.toInstant })
В той же области я определил формат implcit для Json4s
implicit val formats: Formats = new DefaultFormats {
override def dateFormatter = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
} ++ org.json4s.ext.JodaTimeSerializers.all
Я использую json4s.ext, который поддерживает jodatime, обратите внимание, что datetime в классе case - это время joda. Кажется, есть некоторые проблемы с этой библиотекой расширений, потому что я получаю следующее исключение:
java.lang.IllegalArgumentException: unable to serialize anonymous function map@{PubSubToGcsJob.scala:78}
java.lang.IllegalArgumentException: unable to serialize anonymous function map@{PubSubToGcsJob.scala:78}
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:591)
at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:435)
at com.spotify.scio.values.PCollectionWrapper$class.parDo(PCollectionWrapper.scala:58)
at com.spotify.scio.values.SCollectionImpl.parDo(SCollection.scala:1181)
at com.spotify.scio.values.SCollection$class.map(SCollection.scala:359)
at com.spotify.scio.values.SCollectionImpl.map(SCollection.scala:1181)
at PubSubToGcsJob$.main(PubSubToGcsJob.scala:78)
at PubSubToGcsJob.main(PubSubToGcsJob.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
Caused by: java.io.NotSerializableException: org.json4s.ext.IntervalSerializer$$anon$1
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at scala.collection.immutable.List$SerializationProxy.writeObject(List.scala:468)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:49)
at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
at org.apache.beam.sdk.transforms.ParDo$SingleOutput.<init>(ParDo.java:591)
at org.apache.beam.sdk.transforms.ParDo.of(ParDo.java:435)
at com.spotify.scio.values.PCollectionWrapper$class.parDo(PCollectionWrapper.scala:58)
at com.spotify.scio.values.SCollectionImpl.parDo(SCollection.scala:1181)
at com.spotify.scio.values.SCollection$class.map(SCollection.scala:359)
at com.spotify.scio.values.SCollectionImpl.map(SCollection.scala:1181)
at PubSubToGcsJob$.main(PubSubToGcsJob.scala:78)
at PubSubToGcsJob.main(PubSubToGcsJob.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
Я попытался обойти, где я сделал тип строк timestampUtc и Опубликовал Utc и просто проанализировал строку внутри моего конвейера следующим образом:
val formatter = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
and in the pipeline
.timestampBy({ event => formatter.parseDateTime(event.timestampUtc).toInstant })
Но я получаю похожее исключение:
java.lang.IllegalArgumentException: unable to serialize org.apache.beam.sdk.transforms.WithTimestamps$AddTimestampsDoFn@7b7a1a8f
java.lang.IllegalArgumentException: unable to serialize org.apache.beam.sdk.transforms.WithTimestamps$AddTimestampsDoFn@7b7a1a8f
at org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:53)
at org.apache.beam.sdk.util.SerializableUtils.clone(SerializableUtils.java:90)
... etc
Почему это происходит и как мне это решить?
Спасибо
1 ответ
Это происходит, когда лямбда-функция извлекает что-то не сериализуемое из своего закрытия. В этом случае я подозреваю val formatter
,
Одним из обходных путей является перемещение val
объекту-компаньону, поэтому он статически инициализируется на рабочих и не требует прохождения через ser / de. Например:
object Util {
val formatter = ...
}
Для получения дополнительной информации см.: https://www.lyh.me/lambda-serialization.html