Как мне использовать hadoop с активным центром?
Я использую версию 5.1 Active Pivot, но планирую обновить до 5.2. Я хотел бы прочитать данные с использованием CsvSource и получать обновления в реальном времени.
1 ответ
Вступление
В этой статье рассказывается о том, как читать данные из Hadoop в Active Pivot. Это было протестировано с Active Pivot 5.1 и 5.2. Короче говоря, у вас есть 2 способа восполнить этот пробел:
Использование смонтированной HDFS делает вашу HDFS похожей на диск
Использование Hadoop Java API
Использование смонтированной HDFS
Вы можете легко смонтировать HDFS с помощью определенных дистрибутивов Hadoop. (Пример: монтировать HDFS с Cloudera CDH 5 было легко.)
После этого у вас будет точка монтирования на вашем сервере Active Pivot, связанная с вашей HDFS, и она будет вести себя как обычный диск. (По крайней мере, для чтения у письма есть некоторые ограничения)
Например, если у вас есть файлы CSV в вашей HDFS, вы сможете напрямую использовать Active Pivot Csv Source.
Использование Hadoop Java API
Другой способ - использовать Hadoop Java API: http://hadoop.apache.org/docs/current/api/
Несколько основных классов для использования:
org.apache.hadoop.fs.FileSystem
- Используется для общих операций с Hadoop.
org.apache.hadoop.conf.Configuration
- Используется для настройки объекта FileSystem.
org.apache.hadoop.hdfs.client.HdfsAdmin
- Можно использовать для просмотра событий (например, новый файл добавлен в HDFS)
Примечание. Отслеживание событий доступно для Hadoop 2.6.0 и выше. Для предыдущей версии Hadoop вы могли либо создать собственную, либо использовать смонтированную HDFS с существующим FileWatcher.
зависимости
Вам понадобится несколько зависимостей Hadoop.
Остерегайтесь, могут быть конфликты между зависимостями Hadoop и Active Pivot на Jaxb.
В следующем файле pom.xml решение этой проблемы состояло в том, чтобы исключить зависимости Jaxb из зависимостей Hadoop.
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>2.6.0</version>
</dependency>
<!-- These 2 dependencies have conflicts with ActivePivotCva on Jaxb -->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.6.0</version>
<exclusions>
<exclusion>
<groupId>com.sun.xml.bind</groupId>
<artifactId>jaxb-impl</artifactId>
</exclusion>
<exclusion>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.1</version>
<exclusions>
<exclusion>
<groupId>com.sun.xml.bind</groupId>
<artifactId>jaxb-impl</artifactId>
</exclusion>
<exclusion>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
</exclusion>
</exclusions>
</dependency>
свойства
Вам нужно будет определить как минимум 2 свойства:
Адрес Hadoop (например: hdfs://localhost:9000)
Путь HDFS к вашим файлам (например: /user/quartetfs/data/)
Если ваш кластер защищен, вам необходимо выяснить, как получить к нему удаленный доступ безопасным способом.
Пример чтения файла из Hadoop
// Configuring
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://localhost:9000");
FileSystem hdfs = FileSystem.get(this.conf);
Path filePath = new Path(/user/username/input/file.txt);
// Reading
BufferedReader bfr =new BufferedReader(new InputStreamReader(hdfs.open(filePath)));
String str = null;
while ((str = bfr.readLine()) != null)
{
System.out.println(str);
}
Hadoop Source
Теперь, когда вы можете читать с HDFS, вы можете написать свой Hadoop Source, как и для других источников.
Например, вы можете создать HadoopSource, реализующий ISource.
И вы можете запустить его в вашем SourceConfig, где вы получите свои свойства из вашей среды.
Наблюдение за событиями (например, новые файлы)
Если вы хотите получить файлы, хранящиеся в HDFS, вы можете создать другой класс, отслеживающий события.
Примером может служить следующий код, в котором у вас будут свои собственные методы для обработки определенных событий. (Например, в следующем коде: onCreation(), onAppend())
protected HdfsAdmin admin;
protected String threadName;
public void run()
{
DFSInotifyEventInputStream eventStream;
try
{
eventStream = admin.getInotifyEventStream();
LOGGER.info(" - Thread: " + this.threadName + "Starting catching events.");
while (true)
{
try
{
Event event = eventStream.take();
// Possible eventType: CREATE, APPEND, CLOSE, RENAME, METADATA, UNLINK
switch (event.getEventType())
{
case CREATE:
CreateEvent createEvent = (CreateEvent) event;
onCreation(createEvent.getPath());
break;
case APPEND:
AppendEvent appendEvent = (AppendEvent) event;
onAppend(appendEvent.getPath());
break;
default:
break;
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (MissingEventsException e) {
e.printStackTrace();
}
}
} catch (IOException e1) {
LOGGER.severe(" - Thread: " + this.threadName + "Failure to start the eventStream");
e1.printStackTrace();
}
}
То, что я сделал для моего метода onCreation (не показан), заключался в том, чтобы хранить вновь созданные файлы в параллельной очереди, чтобы мой HadoopSource мог получать несколько файлов параллельно.
-
Если у меня недостаточно ясности по определенным аспектам или у вас есть вопросы, не стесняйтесь спрашивать.