Avro Текстовый файл, созданный Flume Twitter Agent, который не читается на Java

Невозможно прочитать и проанализировать файл, созданный потоковой передачей данных с помощью Twitter-агента Flume, а также с помощью Java или Avro Tools. Мое требование - преобразовать формат avro в формат JSON.

При использовании любого из методов я получаю исключение: org.apache.avro.AvroRuntimeException: java.io.IOException: Block size invalid or too large for this implementation: -40

Я использую ванильный конфиг Hadoop в кластере псевдоузлов и версия hadoop 2.7.1

Версия Flume 1.6.0

Файл конфигурации flume для твиттера и код java для разбора файла avro прилагается ниже:

TwitterAgent.sources=Twitter
TwitterAgent.channels=MemChannel
TwitterAgent.sinks=HDFS
TwitterAgent.sources.Twitter.type=org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.channels=MemChannel

TwitterAgent.sources.Twitter.consumerKey=xxxxxxxxxxxxxx
TwitterAgent.sources.Twitter.consumerSecret=xxxxxxxxxxxxxxxx
TwitterAgent.sources.Twitter.accessToken=xxxxxxxxxxxxxxxx
TwitterAgent.sources.Twitter.accessTokenSecret=xxxxxxxxxxxxxx

TwitterAgent.sources.Twitter.keywords=Modi,PMO,Narendra Modi,BJP

TwitterAgent.sinks.HDFS.channel=MemChannel
TwitterAgent.sinks.HDFS.type=hdfs
TwitterAgent.sinks.HDFS.hdfs.path=hdfs://localhost:9000/user/ashish/Twitter_Data
TwitterAgent.sinks.HDFS.hdfs.fileType=DataStream
TwitterAgent.sinks.HDFS.hdfs.writeformat=Text
TwitterAgent.sinks.HDFS.hdfs.batchSize=100
TwitterAgent.sinks.HDFS.hdfs.rollSize=0
TwitterAgent.sinks.HDFS.hdfs.rollCount=10
TwitterAgent.sinks.HDFS.hdfs.rollInterval=30
TwitterAgent.channels.MemChannel.type=memory
TwitterAgent.channels.MemChannel.capacity=10000
TwitterAgent.channels.MemChannel.transactionCapacity=100

import org.apache.avro.file.DataFileReader;
import org.apache.avro.file.FileReader;
import org.apache.avro.file.SeekableInput;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.mapred.FsInput;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import java.io.IOException;

public class AvroReader {

    public static void main(String[] args) throws IOException {
        Path path = new Path("hdfs://localhost:9000/user/ashish/Twitter_Data/FlumeData.1449656815028");
        Configuration config = new Configuration();
        SeekableInput input = new FsInput(path, config);
        DatumReader<GenericRecord> reader = new GenericDatumReader<>();
        FileReader<GenericRecord> fileReader = DataFileReader.openReader(input, reader);

        for (GenericRecord datum : fileReader) {
            System.out.println("value = " + datum);
        }

        fileReader.close();
    }
}

Трассировка стека исключений, которую я получил:

2015-12-09 17:48:19,291 WARN  [main] util.NativeCodeLoader (NativeCodeLoader.java:<clinit>(62)) - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    value = {"id": "674535686809120768", "user_friends_count": 1260, "user_location": "ユウサリ", "user_description": "「テガミバチ」に登場するザジのbotです。追加してほしい言葉などの希望があればDMでお願いします。リムーブする際はブロックでお願いします。", "user_statuses_count": 47762, "user_followers_count": 1153, "user_name": "ザジ", "user_screen_name": "zazie_bot", "created_at": "2015-12-09T15:56:54Z", "text": "@ill_akane_bot お前、なんか、\u2026すっげー楽しそうだな\u2026", "retweet_count": 0, "retweeted": false, "in_reply_to_user_id": 204695477, "source": "<a href=\"http:\/\/twittbot.net\/\" rel=\"nofollow\">twittbot.net<\/a>", "in_reply_to_status_id": 674535430423887872, "media_url_https": null, "expanded_url": null}
    Exception in thread "main" org.apache.avro.AvroRuntimeException: java.io.IOException: Block size invalid or too large for this implementation: -40
        at org.apache.avro.file.DataFileStream.hasNextBlock(DataFileStream.java:275)
        at org.apache.avro.file.DataFileStream.hasNext(DataFileStream.java:197)
        at avro.AvroReader.main(AvroReader.java:24)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
    Caused by: java.io.IOException: Block size invalid or too large for this implementation: -40
        at org.apache.avro.file.DataFileStream.hasNextBlock(DataFileStream.java:266)
        ... 7 more

Также мне нужно указать схему Avro для правильного чтения файла Avro, если да, то где?

1 ответ

Я тоже встречал эту проблему. Хотя я вижу ваш файл данных, который больше не существует. Я проверил этот файл данных, который должен совпадать с вашим.

Я обнаружил, что мой файл данных уже был файлом контейнера avro, что означает, что он имеет свою схему и данные.

Файл avro, который я получил, был очень неправильным, потому что он должен включать в себя только один заголовок, содержащий схему avro, но на самом деле в его файле несколько заголовков.

Другое дело, что твиты уже в формате JSON, почему Flume конвертировать их в формат AVRO?

Другие вопросы по тегам