Pig Join не возвращает результатов

Я застрял на этой проблеме более двенадцати часов. У меня есть сценарий Pig, который работает на Amazon Web Services. В настоящее время я просто запускаю свой скрипт в интерактивном режиме. Я пытаюсь получить средние значения для большого набора данных о климатических показаниях с метеостанций; однако эти данные не содержат информацию о стране или штате, поэтому их необходимо объединить с другой таблицей, в которой они есть.

Таблица состояния:

719990 99999 LILLOOET                      CN CA BC WKF   +50683 -121933 +02780
719994 99999 SEDCO 710                     CN CA    CWQJ  +46500 -048500 +00000
720000 99999 BOGUS AMERICAN                US US          -99999 -999999 -99999
720001 99999 PEASON RIDGE/RANGE            US US LA K02R  +31400 -093283 +01410
720002 99999 HALLOCK(AWS)                  US US MN K03Y  +48783 -096950 +02500
720003 99999 DEER PARK(AWS)                US US WA K07S  +47967 -117433 +06720
720004 99999 MASON                         US US MI K09G  +42567 -084417 +02800
720005 99999 GASTONIA                      US US NC K0A6  +35200 -081150 +02440

Таблица климата: (Я понимаю, что это не содержит ничего, чтобы удовлетворить условию соединения, но полный набор данных делает.)

STN--- WBAN   YEARMODA    TEMP       DEWP      SLP        STP       VISIB      WDSP     MXSPD   GUST    MAX     MIN   PRCP   SNDP   FRSHTT
010010 99999  20090101    23.3 24    15.6 24  1033.2 24  1032.0 24   13.5  6    9.6 24   17.5  999.9    27.9*   16.7   0.00G 999.9  001000
010010 99999  20090102    27.3 24    20.5 24  1026.1 24  1024.9 24   13.7  5   14.6 24   23.3  999.9    28.9    25.3*  0.00G 999.9  001000
010010 99999  20090103    25.2 24    18.4 24  1028.3 24  1027.1 24   15.5  6    4.2 24    9.7  999.9    26.2*   23.9*  0.00G 999.9  001000
010010 99999  20090104    27.7 24    23.2 24  1019.3 24  1018.1 24    6.7  6    8.6 24   13.6  999.9    29.8    24.8   0.00G 999.9  011000
010010 99999  20090105    19.3 24    13.0 24  1015.5 24  1014.3 24    5.6  6   17.5 24   25.3  999.9    26.2*   10.2*  0.05G 999.9  001000
010010 99999  20090106    12.9 24     2.9 24  1019.6 24  1018.3 24    8.2  6   15.5 24   25.3  999.9    19.0*    8.8   0.02G 999.9  001000
010010 99999  20090107    26.2 23    20.7 23   998.6 23   997.4 23    6.6  6   12.1 22   21.4  999.9    31.5    19.2*  0.00G 999.9  011000
010010 99999  20090108    21.5 24    15.2 24   995.3 24   994.1 24   12.4  5   12.8 24   25.3  999.9    24.6*   19.2*  0.05G 999.9  011000
010010 99999  20090109    27.5 23    24.5 23   982.5 23   981.3 23    7.9  5   20.2 22   33.0  999.9    34.2    20.1*  0.00G 999.9  011000
010010 99999  20090110    22.5 23    16.7 23   977.2 23   976.1 23   11.9  6   15.5 23   35.0  999.9    28.9*   17.2   0.09G 999.9  000000

Я загружаю климатические данные с помощью TextLoader, применяю регулярное выражение для получения полей и отфильтровываю нули из набора результатов. Затем я делаю то же самое с данными штата, но фильтрую их по стране, являющейся США.

Пакеты имеют следующую схему: CLIMATE_REMOVE_EMPTY: {станция: int, wban: int, год: int, месяц: int, день: int,temp: double} STATES_FILTER_US: {station: int,wban: int, имя: chararray, wmo: chararray, fips: chararray, состояние: chararray}

Мне нужно выполнить операцию соединения с (station,wban), чтобы я мог получить получившуюся сумку со станцией, wban, годом, месяцем и временными значениями. Когда я выполняю сброс на получившуюся сумку, он говорит, что она прошла успешно; однако дамп возвращает 0 результатов. Это выход.

HadoopVersion   PigVersion      UserId  StartedAt       FinishedAt      Features
1.0.3   0.9.2-amzn      hadoop  2013-05-03 00:10:51     2013-05-03 00:12:42         HASH_JOIN,FILTER

Success!

Job Stats (time in seconds):
JobId   Maps    Reduces MaxMapTime      MinMapTIme      AvgMapTime          MaxReduceTime   MinReduceTime   AvgReduceTime   Alias   Feature Outputs
job_201305030005_0001   2       1       36      15      25      33      33      33              CLIMATE,CLIMATE_REMOVE_NULL,RAW_CLIMATE,RAW_STATES,STATES,STATES_FILTER_US,STATE_CLIMATE_JO    IN   HASH_JOIN       hdfs://10.204.30.125:9000/tmp/temp-204730737/tmp1776606203,

Input(s):
Successfully read 30587 records from: "hiddenbucket"
Successfully read 21027 records from: "hiddenbucket"

Output(s):
Successfully stored 0 records in: "hdfs://10.204.30.125:9000/tmp/temp-204730737/tmp1776606203"

Counters:
Total records written : 0
Total bytes written : 0
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0

Я понятия не имею, почему мой это содержит 0 результатов. Мое извлечение данных кажется правильным. и работа успешна. Это приводит меня к мысли, что условие соединения никогда не выполняется. Я знаю, что во входных файлах есть некоторые данные, которые должны удовлетворять условию соединения, но он абсолютно ничего не возвращает.

Единственное, что выглядит подозрительно, - это предупреждение, которое гласит: "Обнаружено предупреждение ACCESSING_NON_EXISTENT_FIELD 26001 раз".

Я не совсем уверен, куда идти отсюда. Поскольку работа не дает сбоев, я не вижу ошибок или чего-либо в отладке.

Я не уверен, что это что-то значит, но вот другие вещи, которые выделяются: когда я пытаюсь проиллюстрировать STATE_CLIMATE_JOIN, я получаю исключение nullPointerException - ОШИБКА 2997: Обнаружено исключение IOException. Исключение: ноль

Когда я пытаюсь проиллюстрировать STATES, я получаю java.lang.IndexOutOfBoundsException: Index: 1, Size: 1

Вот мой полный код:

--Piggy Bank Functions
register file:/home/hadoop/lib/pig/piggybank.jar
DEFINE EXTRACT org.apache.pig.piggybank.evaluation.string.EXTRACT();

--Load Climate Data
RAW_CLIMATE = LOAD 'hiddenbucket' USING TextLoader as (line:chararray);
RAW_STATES= LOAD 'hiddenbucket' USING TextLoader as (line:chararray);

CLIMATE= 
  FOREACH 
    RAW_CLIMATE
  GENERATE   
    FLATTEN ((tuple(int,int,int,int,int,double))
      EXTRACT(line,'^(\\d{6})\\s+(\\d{5})\\s+(\\d{4})(\\d{2})(\\d{2})\\s+(\\d{1,3}\\.\\d{1})')
    ) 
    AS (
      station: int,
  wban: int,
  year: int,
  month: int,
  day: int,
  temp: double
    )
  ;

STATES= 
  FOREACH 
    RAW_STATES
  GENERATE   
    FLATTEN ((tuple(int,int,chararray,chararray,chararray,chararray))
      EXTRACT(line,'^(\\d{6})\\s+(\\d{5})\\s+(\\S+)\\s+(\\w{2})\\s+(\\w{2})\\s+(\\w{2})')
    ) 
    AS (
      station: int,
  wban: int,
  name: chararray,
  wmo: chararray,
      fips: chararray,
      state: chararray
      )
    ;

CLIMATE_REMOVE_NULL = FILTER CLIMATE BY station IS NOT NULL;
STATES_FILTER_US = FILTER STATES BY (fips == 'US');
STATE_CLIMATE_JOIN = JOIN CLIMATE_REMOVE_NULL BY (station), STATES_FILTER_US BY (station);

Заранее спасибо. Я в недоумении здесь.

- EDIT-- Я наконец получил это на работу! Мое регулярное выражение для разбора STATE_DATA было недействительным.

0 ответов

Другие вопросы по тегам