WordCount с Apache Crunch в HBase Standalone
В настоящее время я оцениваю Apache Crunch. Я последовал простому примеру задания WordCount MapReduce: после этого я пытаюсь сохранить результаты в автономной базе данных HBase. HBase работает (проверено с помощью jps и оболочки HBase), как описано здесь: http://hbase.apache.org/book/quickstart.html
Теперь я принимаю пример для записи в HBase:
Pipeline pipeline = new MRPipeline(WordCount.class,getConf());
PCollection<String> lines = pipeline.readTextFile(inputPath);
PTable<String,Long> counts = noStopWords.count();
pipeline.write(counts, new HBaseTarget("wordCountOutTable");
PipelineResult result = pipeline.done();
Я получаю исключение: "исключение:java.lang.illegalArgumentException: HBaseTarget поддерживает только операции вставки и удаления"
Любые подсказки, что пошло не так?
1 ответ
PTable может быть PCollection, но HBaseTarget может обрабатывать только объекты Put или Delete. Таким образом, вы должны преобразовать PTable в PCollection, где каждый элемент коллекции - это либо Put, либо Delete. Взгляните на Crunch-Примеры, где это сделано.
Пример преобразования может выглядеть так:
public PCollection<Put> createPut(final PTable<String, String> counts) {
return counts.parallelDo("Convert to puts", new DoFn<Pair<String, String>, Put>() {
@Override
public void process(final Pair<String, String> input, final Emitter<Put> emitter) {
Put put;
// input.first is used as row key
put = new Put(Bytes.toBytes(input.first()));
// the value (input.second) is added with its family and qualifier
put.add(COLUMN_FAMILY_TARGET, COLUMN_QUALIFIER_TARGET_TEXT, Bytes.toBytes(input.second()));
emitter.emit(put);
}
}, Writables.writables(Put.class));
}