Ожидаемая временная метка в заголовках событий Flume, но она была нулевой

Я использую приведенные ниже сведения о конфигурации, чтобы вставить каналы Twitter в HDFS с помощью Flume, но получаю ожидаемую временную метку в заголовках событий Flume, но она была нулевой

twitter.conf

TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS

TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.consumerKey = xxxxxxxxxxxxxxxxxxxxx
TwitterAgent.sources.Twitter.consumerSecret = xxxxxxxxxxxxxxxxxxxxxxxx
TwitterAgent.sources.Twitter.accessToken =  xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
TwitterAgent.sources.Twitter.accessTokenSecret = xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
TwitterAgent.sources.Twitter.keywords = bigdata, hadoop, hive, hbase
TwitterAgent.sinks.HDFS.channel = MemChannel
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = /user/farooque/bigdata/tweets/%Y/%m/%d/%H/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 10
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000

TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 100

Бег с командой

$ flume-ng agent --conf-file twitter.conf --name TwitterAgent

где twitter.conf мое имя файла конфигурации

Но получаю ошибку как:

java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null
        at com.google.common.base.Preconditions.checkNotNull(Preconditions.java:204)
        at org.apache.flume.formatter.output.BucketPath.replaceShorthand(BucketPath.java:200)
        at org.apache.flume.formatter.output.BucketPath.escapeString(BucketPath.java:396)
        at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:388)
        at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
        at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
        at java.lang.Thread.run(Thread.java:745)
15/06/04 18:26:01 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.

Ищете дополнительную помощь?

3 ответа

Решение

В twitter.conf добавлено еще одно свойство config как

TwitterAgent.sinks.HDFS.hdfs.useLocalTimeStamp = true

и проблема была решена.

Для получения более подробной информации см. Hadoop tutorial.info

С опцией "TwitterAgent.sinks.HDFS.hdfs.useLocalTimeStamp = true" он будет использовать временную метку места назначения (то есть приемника HDFS). Вместо этого, если вы хотите использовать временную метку фактического события, тогда мы должны использовать перехватчики. Используйте строку ниже в файле конфигурации или свойств.

TwitterAgent.sources.Twitter.interceptors = interceptor1
TwitterAgent.sources.Twitter.interceptors.interceptor1.type = timestamp

Ты используешь org.apache.flume.source.twitter.TwitterSource который Apache предоставил Twitter Source. Не поставляется со встроенным timestamp в Flume Event. Итак, у вас есть 2 варианта здесь:

1) Либо используйте com.cloudera.flume.source.TwitterSource в вашем конфигурационном файле.

2) Или вы можете добавить TwitterAgent.sinks.HDFS.hdfs.useLocalTimeStamp = true свойство в вашем конфигурационном файле.

Обратите внимание, что вы столкнулись с этой проблемой, потому что вы указали параметры метки времени в пути HDFS /user/farooque/bigdata/tweets/%Y/%m/%d/%H/, Если вы не укажете их, исходные коды Apache и Cloudera будут работать без проблем.

Другие вопросы по тегам