Как я могу загрузить каждый файл в папку, используя PIG?

У меня есть папка с файлами, которые создаются ежедневно, и все они хранят информацию одного типа. Я хотел бы создать сценарий, который загружает последние 10 из них, объединяет их, а затем запускает на них какой-то другой код. Поскольку у pig уже есть метод ls, мне было интересно, есть ли для меня простой способ получить последние 10 созданных файлов и загрузить их под общими именами, используя тот же загрузчик и параметры. Я думаю, это будет выглядеть примерно так:

REGISTER /usr/local/lib/hadoop/hadoop-lzo-0.4.13.jar;
REGISTER /usr/local/lib/hadoop/elephant-bird-2.0.5.jar;
FOREACH file in some_path:
    file = LOAD 'file' 
    USING com.twitter.elephantbird.pig.load.LzoTokenizedLoader('\\t') 
    AS (i1, i2, i3);

3 ответа

Решение

Ответ Дональда Майнера по-прежнему работает отлично, но, по-моему, теперь есть лучший подход к этому с использованием Embedded Pig в Python. У О'Рейли есть краткое объяснение здесь. Также есть презентация о том, почему это то, что вы хотели бы сделать, и как это работает здесь. Короче говоря, есть много функциональных возможностей, к которым было бы неплохо иметь доступ, прежде чем запускать сценарий Pig для определения частей сценария. Оборачивая и / или динамически генерируя части скрипта в Jython, давайте сделаем это. Радуйтесь!

Это не то, что я смог сделать из коробки, и это то, что можно сделать вне сценария с помощью какого-либо сценария-оболочки или вспомогательного сценария (bash, perl и т. Д.). Если вы пишете сценарий, называется last10.sh, что бы вывести ваши последние 10 файлов через запятую:

$ ./last10.sh
/input/file38,/input/file39,...,/input/file48

Примерно так должно получиться для последних 10 файлов:

hadoop fs -ls /input/ | sort -k6,7 | tail -n10 | awk '{print $8}' | tr '\n' ','

Вы могли бы сделать:

$ pig -p files="`last10.sh`" my_mr.pig

Затем, в вашем сценарии свиньи, выполните:

data = LOAD '$files'
       USING com.twitter.elephantbird.pig.load.LzoTokenizedLoader('\\t')
       AS (i1, i2, i3);

Pig загружает отдельные файлы, если они разделены запятыми, как это. Это было бы эквивалентно выполнению:

data = LOAD '/input/file38,/input/file39,...,/input/file48'
       USING com.twitter.elephantbird.pig.load.LzoTokenizedLoader('\\t')
       AS (i1, i2, i3);

Мне нравится выше 2 подходов. Просто хотел дать еще один вариант для энтузиастов. Действие Java в oozie выплевывает файл в месте, настроенном с помощью "oozie.action.output.properties", и действие Pig принимает его, который передается сценарию pig. Это определенно не элегантное решение по сравнению с вышеприведенным 2. У меня были проблемы с настройкой встроенного поросенка с использованием java schedule в oozie, поэтому мне пришлось пойти с этим решением.

<workflow-app xmlns='uri:oozie:workflow:0.1' name='java-wf'>
<start to='java1' />

<action name='java1'>
    <java>
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <configuration>
           <property>
                <name>mapred.job.queue.name</name>
                <value>${queueName}</value>
            </property>
        </configuration>
        <main-class>org.apache.oozie.test.MyTest</main-class>
        <arg>${outputFileName}</arg>
        <capture-output/>
    </java>
    <ok to="pig1" />
    <error to="fail" />
</action>


<action name='pig1'>
    <pig>
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <configuration>
            <property>
                <name>mapred.job.queue.name</name>
                <value>${queueName}</value>
            </property>
        </configuration>
        <script>script.pig</script>
        <param>MY_VAR=${wf:actionData('java1')['PASS_ME']}</param>
    </pig>
    <ok to="end" />
    <error to="fail" />
</action>

<kill name="fail">
    <message>Pig failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name='end' />

Другие вопросы по тегам