Как я могу загрузить каждый файл в папку, используя PIG?
У меня есть папка с файлами, которые создаются ежедневно, и все они хранят информацию одного типа. Я хотел бы создать сценарий, который загружает последние 10 из них, объединяет их, а затем запускает на них какой-то другой код. Поскольку у pig уже есть метод ls, мне было интересно, есть ли для меня простой способ получить последние 10 созданных файлов и загрузить их под общими именами, используя тот же загрузчик и параметры. Я думаю, это будет выглядеть примерно так:
REGISTER /usr/local/lib/hadoop/hadoop-lzo-0.4.13.jar;
REGISTER /usr/local/lib/hadoop/elephant-bird-2.0.5.jar;
FOREACH file in some_path:
file = LOAD 'file'
USING com.twitter.elephantbird.pig.load.LzoTokenizedLoader('\\t')
AS (i1, i2, i3);
3 ответа
Ответ Дональда Майнера по-прежнему работает отлично, но, по-моему, теперь есть лучший подход к этому с использованием Embedded Pig в Python. У О'Рейли есть краткое объяснение здесь. Также есть презентация о том, почему это то, что вы хотели бы сделать, и как это работает здесь. Короче говоря, есть много функциональных возможностей, к которым было бы неплохо иметь доступ, прежде чем запускать сценарий Pig для определения частей сценария. Оборачивая и / или динамически генерируя части скрипта в Jython, давайте сделаем это. Радуйтесь!
Это не то, что я смог сделать из коробки, и это то, что можно сделать вне сценария с помощью какого-либо сценария-оболочки или вспомогательного сценария (bash, perl и т. Д.). Если вы пишете сценарий, называется last10.sh
, что бы вывести ваши последние 10 файлов через запятую:
$ ./last10.sh
/input/file38,/input/file39,...,/input/file48
Примерно так должно получиться для последних 10 файлов:
hadoop fs -ls /input/ | sort -k6,7 | tail -n10 | awk '{print $8}' | tr '\n' ','
Вы могли бы сделать:
$ pig -p files="`last10.sh`" my_mr.pig
Затем, в вашем сценарии свиньи, выполните:
data = LOAD '$files'
USING com.twitter.elephantbird.pig.load.LzoTokenizedLoader('\\t')
AS (i1, i2, i3);
Pig загружает отдельные файлы, если они разделены запятыми, как это. Это было бы эквивалентно выполнению:
data = LOAD '/input/file38,/input/file39,...,/input/file48'
USING com.twitter.elephantbird.pig.load.LzoTokenizedLoader('\\t')
AS (i1, i2, i3);
Мне нравится выше 2 подходов. Просто хотел дать еще один вариант для энтузиастов. Действие Java в oozie выплевывает файл в месте, настроенном с помощью "oozie.action.output.properties", и действие Pig принимает его, который передается сценарию pig. Это определенно не элегантное решение по сравнению с вышеприведенным 2. У меня были проблемы с настройкой встроенного поросенка с использованием java schedule в oozie, поэтому мне пришлось пойти с этим решением.
<workflow-app xmlns='uri:oozie:workflow:0.1' name='java-wf'>
<start to='java1' />
<action name='java1'>
<java>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<main-class>org.apache.oozie.test.MyTest</main-class>
<arg>${outputFileName}</arg>
<capture-output/>
</java>
<ok to="pig1" />
<error to="fail" />
</action>
<action name='pig1'>
<pig>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>${queueName}</value>
</property>
</configuration>
<script>script.pig</script>
<param>MY_VAR=${wf:actionData('java1')['PASS_ME']}</param>
</pig>
<ok to="end" />
<error to="fail" />
</action>
<kill name="fail">
<message>Pig failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name='end' />