Присоединение DStream и RDD с контрольными точками

Я боролся за объединение DStream и RDD. Чтобы установить сцену:

  • Искра - 2.3.1
  • Python - 3.6.3

РДД

Я читаю в RDD из файла CSV, разделяю записи и создаю пару RDD.

sku_prices = sc.textFile("sku-catalog.csv")\
    .map(lambda line: line.split(","))\
    .map(lambda fields: (fields[0], float(fields[1])))

Это выход из sku_prices.collect():

[('0003003001', 19.25),
 ('0001017002', 2.25),
 ('0001017003', 3.5),
 ('0003013001', 18.75),
 ('0004017002', 16.5),
 ('0002008001', 2.25),
 ('0004002001', 10.75),
 ('0005020002', 10.5),
 ('0001004002', 3.5),
 ('0002016003', 14.25)]

DStream

Я читаю DStream от Кафки.

orders = kstream.map(lambda n: n[1]).map(lambda n: json.loads(n))

items = orders.map(lambda order: order['items'])\
              .flatMap(lambda items: [(i['sku'], i['count']) for i in items])\
              .reduceByKey(lambda x, y: x + y)

Когда я бегу pprint() на orders Я получаю вывод, который выглядит так:

-------------------------------------------
Time: 2018-09-03 06:57:20
-------------------------------------------
('0004002001', 3)
('0002016003', 1)
('0003013001', 1)

Присоединиться

Теперь я хочу присоединиться к items Dtream к sku_prices РДД. Я знаю, что не могу сделать это соединение напрямую, но мое чтение предполагает, что я могу использовать transform() метод на DStream, чтобы сделать работу. Итак, вот что у меня есть:

items.transform(lambda rdd: rdd.join(sku_prices)).pprint()

Я ожидаю получить DStream, который выглядит примерно так:

-------------------------------------------
Time: 2018-09-03 06:57:20
-------------------------------------------
('0004002001', (3, 10.75))
('0002016003', (1, 14.25))
('0003013001', (1, 18.75))

Документация Spark предполагает, что это должно работать, и это работает: именно этот результат я и получаю!:)

Checkpointing

Однако я также хочу сделать операцию с состоянием, поэтому мне нужно ввести контрольные точки.

ssc.checkpoint("checkpoint")

Просто добавив результаты контрольных точек в этой ошибке на transform():

Похоже, что вы пытаетесь передать СДР или сослаться на СДР из действия или преобразования. Преобразования и действия СДР могут вызываться только драйвером, но не внутри других преобразований; например, rdd1.map(lambda x: rdd2.values.count() * x) недопустим, поскольку преобразование значений и действие count не могут быть выполнены внутри преобразования rdd1.map.

Ответ на эту тему предполагает, что контрольные точки и внешние СДР не смешиваются. Это можно обойти? Можно ли присоединить DStream и RDD, когда в StreamingContext включена контрольная точка?

Спасибо, Андрей.

0 ответов

Другие вопросы по тегам