Могу ли я получить пример кода для использования сообщения avro kafka?

Я только что установил платформу Datatorrent RTS (Apache Apex) и запустил демонстрацию pi. Я хочу использовать "avro" сообщения от kafka, а затем собирать и хранить данные в формате hdf. Можно ли получить пример кода для этого или кафки?

3 ответа

Решение

Вот код для полного рабочего приложения, использующего новый оператор ввода Kafka и оператор вывода файла из Apex Malhar. Он преобразует байтовые массивы в строки и записывает их в HDFS, используя скользящие файлы с ограниченным размером (в данном примере 1 КБ); пока размер файла не достигнет границы, он будет иметь временное имя с .tmp расширение. Вы можете вставить дополнительные операторы между этими двумя, как предложено DevT в /questions/13453143/mogu-li-ya-poluchit-primer-koda-dlya-ispolzovaniya-soobscheniya-avro-kafka/13453152#13453152):

package com.example.myapexapp;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.DAG;

import com.datatorrent.lib.io.ConsoleOutputOperator;
import com.datatorrent.lib.io.fs.AbstractFileInputOperator.FileLineInputOperator;
import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;

@ApplicationAnnotation(name="MyFirstApplication")
public class KafkaApp implements StreamingApplication
{

  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    KafkaSinglePortInputOperator in = dag.addOperator("in", new KafkaSinglePortInputOperator());
    in.setInitialPartitionCount(1);
    in.setTopics("test");
    in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
    //in.setClusters("localhost:2181");
    in.setClusters("localhost:9092");   // NOTE: need broker address, not zookeeper

    LineOutputOperator out = dag.addOperator("out", new LineOutputOperator());
    out.setFilePath("/tmp/FromKafka");
    out.setFileName("test");
    out.setMaxLength(1024);        // max size of rolling output file

    // create stream connecting input adapter to output adapter
    dag.addStream("data", in.outputPort, out.input);
  }
}

/**
 * Converts each tuple to a string and writes it as a new line to the output file
 */
class LineOutputOperator extends AbstractFileOutputOperator<byte[]>
{
  private static final String NL = System.lineSeparator();
  private static final Charset CS = StandardCharsets.UTF_8;
  private String fileName;

  @Override
  public byte[] getBytesForTuple(byte[] t) { return (new String(t, CS) + NL).getBytes(CS); }

  @Override
  protected String getFileName(byte[] tuple) { return fileName; }

  public String getFileName() { return fileName; }
  public void setFileName(final String v) { fileName = v; }
}

На высоком уровне код вашего приложения будет похож на

KafkaSinglePortStringInputOperator -> AvroToPojo -> Агрегатор измерений -> Реализация AbstractFileOutputOperator

KafkaSinglePortStringInputOperator - если вы работаете с другим типом данных, вы можете использовать KafkaSinglePortByteArrayInputOperator или написать собственную реализацию.

AvroToPojo - https://github.com/apache/incubator-apex-malhar/blob/5075ce0ef75afccdff2edf4c044465340176a148/contrib/src/main/java/com/datatorrent/contrib/avro/AvroToPojo.java

Этот оператор преобразует GenericRecord в заданный пользователем POJO. Пользователь должен указать класс POJO, который должен быть выдан, если используется отражение. В настоящее время это используется для чтения GenericRecords из файлов-контейнеров, и поддерживаются только примитивные типы. Для чтения из Kafka вы можно моделировать ваш оператор по аналогичным линиям и добавить объект Schema для разбора входящих записей. Должно работать что-то вроде ниже в методе processTuple, Schema schema = new Schema.Parser().parse()); GenericDatumReader reader = new GenericDatumReader(схема);

Агрегатор измерений - вы можете выбрать один из приведенных здесь агрегаторов - https://github.com/apache/incubator-apex-malhar/tree/5075ce0ef75afccdff2edf4c044465340176a148/library/src/main/java/org/apache/apex/malhar/lib/dimensions или написать собственный агрегатор в том же духе.

FileWriter - из примера в посте выше.

Пример кода для чтения из Kafka и записи в JDBC.

github.com/tweise/apex-samples/tree/master/exactly-once

Блог, объясняющий приведенный выше код.

www.datatorrent.com/blog/end-to-end-exactly-once-with-apache-apex/

Авро операторов можно найти https://github.com/apache/incubator-apex-malhar/search?utf8=%E2%9C%93&q=avro

Контрольные примеры содержат примеры использования.

Другие вопросы по тегам