Q: как раскрутить пакеты из сложной структуры данных в PIG

Изначально у меня есть такая структура:

+-------+-------+----+----+----+-----+
| time  | type  | s1 | s2 | id | p1  |
+-------+-------+----+----+----+-----+
| 10:30 | send  | a  | b  |  1 | 110 |
| 10:35 | send  | c  | d  |  1 | 120 |
| 10:31 | reply | e  | f  |  3 | 221 |
| 10:33 | reply | a  | c  |  1 | 210 |
| 10:34 | send  | a  | a  |  3 | 113 |
| 10:32 | reply | c  | d  |  3 | 157 |
+-------+-------+----+----+----+-----+

Я хочу нормализовать таблицу:

  1. сгруппировать записи по идентификатору,
  2. внутри каждой группы найдите самую старую запись типа отправки,
  3. замените s1, s2 других записей значениями из этой самой старой записи типа отправки

```

+-------+-------+----+----+----+-----+
| time  | type  | s1 | s2 | id | p1  |
+-------+-------+----+----+----+-----+
| 10:30 | send  | a  | b  |  1 | 110 |
| 10:35 | send  | a  | b  |  1 | 120 |
| 10:33 | reply | a  | b  |  1 | 210 |
| 10:31 | reply | a  | a  |  3 | 221 |
| 10:34 | send  | a  | a  |  3 | 113 |
| 10:32 | reply | a  | a  |  3 | 157 |
+-------+-------+----+----+----+-----+

Вот как я пытался решить проблему:

events_groupby_id = GROUP events BY id;
events_normalized = FOREACH events_groupby_id {
   f_reqs = FILTER events BY type matches 'send';
   o_reqs = ORDER events BY time ASC;
   req = LIMIT o_reqs 1;
   GENERATE req, events;
};

Я застрял здесь. Потому что я обнаружил, что events_normalized стал сложной структурой с вложенными сумками, и я не знаю, как правильно расплющить.

события_нормализованные | req:bag{:tuple()} | события: мешок {: кортеж ()}

Отсюда, что я должен сделать, чтобы получить структуру данных, которую я хочу? Буду очень признателен, если кто-нибудь сможет мне помочь. Спасибо.

1 ответ

Вы можете распаковать сумки в events_normalized с помощью FLATTEN:

events_flattened = FOREACH events_normalized GENERATE 
    FLATTEN(req), 
    FLATTEN(events);

Это создает перекрестный продукт между req а также events, но так как есть только один кортеж в reqвы получите только одну запись для каждой из ваших первоначальных записей. Схема для events_flattened является:

req::time | req::type | req::s1 | req::s2 | req::id | req::p1 | events::time | events::type | events::s1 | events::s2 | events::id | events::p1

Теперь вы можете ссылаться на поля, которые хотите сохранить, используя events для оригинальных записей и req для замен из самой старой записи типа отправки:

final = FOREACH events_flattened GENERATE 
    events::time AS time, 
    events::type AS type, 
    req::s1 AS s1, 
    req::s2 AS s2, 
    events::id AS id, 
    events::p1 AS p1;
Другие вопросы по тегам