Как мне использовать hadoop с активным центром?

Я использую версию 5.1 Active Pivot, но планирую обновить до 5.2. Я хотел бы прочитать данные с использованием CsvSource и получать обновления в реальном времени.

1 ответ

Вступление

В этой статье рассказывается о том, как читать данные из Hadoop в Active Pivot. Это было протестировано с Active Pivot 5.1 и 5.2. Короче говоря, у вас есть 2 способа восполнить этот пробел:

  • Использование смонтированной HDFS делает вашу HDFS похожей на диск

  • Использование Hadoop Java API

Использование смонтированной HDFS

Вы можете легко смонтировать HDFS с помощью определенных дистрибутивов Hadoop. (Пример: монтировать HDFS с Cloudera CDH 5 было легко.)

После этого у вас будет точка монтирования на вашем сервере Active Pivot, связанная с вашей HDFS, и она будет вести себя как обычный диск. (По крайней мере, для чтения у письма есть некоторые ограничения)

Например, если у вас есть файлы CSV в вашей HDFS, вы сможете напрямую использовать Active Pivot Csv Source.

Использование Hadoop Java API

Другой способ - использовать Hadoop Java API: http://hadoop.apache.org/docs/current/api/

Несколько основных классов для использования:

org.apache.hadoop.fs.FileSystem - Используется для общих операций с Hadoop.

org.apache.hadoop.conf.Configuration - Используется для настройки объекта FileSystem.

org.apache.hadoop.hdfs.client.HdfsAdmin - Можно использовать для просмотра событий (например, новый файл добавлен в HDFS)

Примечание. Отслеживание событий доступно для Hadoop 2.6.0 и выше. Для предыдущей версии Hadoop вы могли либо создать собственную, либо использовать смонтированную HDFS с существующим FileWatcher.

зависимости

Вам понадобится несколько зависимостей Hadoop.

Остерегайтесь, могут быть конфликты между зависимостями Hadoop и Active Pivot на Jaxb.

В следующем файле pom.xml решение этой проблемы состояло в том, чтобы исключить зависимости Jaxb из зависимостей Hadoop.

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.6.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-auth</artifactId>
    <version>2.6.0</version>
</dependency>
<!-- These 2 dependencies have conflicts with ActivePivotCva on Jaxb -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.6.0</version>
    <exclusions>
        <exclusion>
            <groupId>com.sun.xml.bind</groupId>
            <artifactId>jaxb-impl</artifactId>
        </exclusion>
        <exclusion>
        <groupId>javax.xml.bind</groupId>
        <artifactId>jaxb-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-core</artifactId>
    <version>1.2.1</version>
    <exclusions>
        <exclusion>
            <groupId>com.sun.xml.bind</groupId>
            <artifactId>jaxb-impl</artifactId>
        </exclusion>
        <exclusion>
            <groupId>javax.xml.bind</groupId>
            <artifactId>jaxb-api</artifactId>
        </exclusion>
    </exclusions>
</dependency>

свойства

Вам нужно будет определить как минимум 2 свойства:

  • Адрес Hadoop (например: hdfs://localhost:9000)

  • Путь HDFS к вашим файлам (например: /user/quartetfs/data/)

Если ваш кластер защищен, вам необходимо выяснить, как получить к нему удаленный доступ безопасным способом.

Пример чтения файла из Hadoop

// Configuring

Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://localhost:9000");

FileSystem hdfs = FileSystem.get(this.conf);
Path filePath = new Path(/user/username/input/file.txt);

// Reading
BufferedReader bfr =new BufferedReader(new InputStreamReader(hdfs.open(filePath)));
String str = null;

while ((str = bfr.readLine()) != null)
{
       System.out.println(str);
}

Hadoop Source

Теперь, когда вы можете читать с HDFS, вы можете написать свой Hadoop Source, как и для других источников.

Например, вы можете создать HadoopSource, реализующий ISource.

И вы можете запустить его в вашем SourceConfig, где вы получите свои свойства из вашей среды.

Наблюдение за событиями (например, новые файлы)

Если вы хотите получить файлы, хранящиеся в HDFS, вы можете создать другой класс, отслеживающий события.

Примером может служить следующий код, в котором у вас будут свои собственные методы для обработки определенных событий. (Например, в следующем коде: onCreation(), onAppend())

protected HdfsAdmin admin;
protected String threadName;

public void run()
{
    DFSInotifyEventInputStream eventStream;

    try
    {
      eventStream = admin.getInotifyEventStream();
      LOGGER.info(" - Thread: " + this.threadName + "Starting catching events.");

      while (true)
      {

        try
        {
          Event event = eventStream.take();

          // Possible eventType: CREATE, APPEND, CLOSE, RENAME, METADATA, UNLINK
          switch (event.getEventType())
          {
          case CREATE:
            CreateEvent createEvent = (CreateEvent) event;
            onCreation(createEvent.getPath());
            break;

          case APPEND:
            AppendEvent appendEvent = (AppendEvent) event;
            onAppend(appendEvent.getPath());
            break;

          default:
            break;
          }

        } catch (InterruptedException e) {
          e.printStackTrace();

        } catch (MissingEventsException e) {
          e.printStackTrace();
        }
      }
    } catch (IOException e1) {
      LOGGER.severe(" - Thread: " + this.threadName + "Failure to start the eventStream");
      e1.printStackTrace();
    }
  }

То, что я сделал для моего метода onCreation (не показан), заключался в том, чтобы хранить вновь созданные файлы в параллельной очереди, чтобы мой HadoopSource мог получать несколько файлов параллельно.

-

Если у меня недостаточно ясности по определенным аспектам или у вас есть вопросы, не стесняйтесь спрашивать.

Другие вопросы по тегам