Присоединение DStream и RDD с контрольными точками
Я боролся за объединение DStream и RDD. Чтобы установить сцену:
- Искра - 2.3.1
- Python - 3.6.3
РДД
Я читаю в RDD из файла CSV, разделяю записи и создаю пару RDD.
sku_prices = sc.textFile("sku-catalog.csv")\
.map(lambda line: line.split(","))\
.map(lambda fields: (fields[0], float(fields[1])))
Это выход из sku_prices.collect()
:
[('0003003001', 19.25),
('0001017002', 2.25),
('0001017003', 3.5),
('0003013001', 18.75),
('0004017002', 16.5),
('0002008001', 2.25),
('0004002001', 10.75),
('0005020002', 10.5),
('0001004002', 3.5),
('0002016003', 14.25)]
DStream
Я читаю DStream от Кафки.
orders = kstream.map(lambda n: n[1]).map(lambda n: json.loads(n))
items = orders.map(lambda order: order['items'])\
.flatMap(lambda items: [(i['sku'], i['count']) for i in items])\
.reduceByKey(lambda x, y: x + y)
Когда я бегу pprint()
на orders
Я получаю вывод, который выглядит так:
-------------------------------------------
Time: 2018-09-03 06:57:20
-------------------------------------------
('0004002001', 3)
('0002016003', 1)
('0003013001', 1)
Присоединиться
Теперь я хочу присоединиться к items
Dtream к sku_prices
РДД. Я знаю, что не могу сделать это соединение напрямую, но мое чтение предполагает, что я могу использовать transform()
метод на DStream, чтобы сделать работу. Итак, вот что у меня есть:
items.transform(lambda rdd: rdd.join(sku_prices)).pprint()
Я ожидаю получить DStream, который выглядит примерно так:
-------------------------------------------
Time: 2018-09-03 06:57:20
-------------------------------------------
('0004002001', (3, 10.75))
('0002016003', (1, 14.25))
('0003013001', (1, 18.75))
Документация Spark предполагает, что это должно работать, и это работает: именно этот результат я и получаю!:)
Checkpointing
Однако я также хочу сделать операцию с состоянием, поэтому мне нужно ввести контрольные точки.
ssc.checkpoint("checkpoint")
Просто добавив результаты контрольных точек в этой ошибке на transform()
:
Похоже, что вы пытаетесь передать СДР или сослаться на СДР из действия или преобразования. Преобразования и действия СДР могут вызываться только драйвером, но не внутри других преобразований; например, rdd1.map(lambda x: rdd2.values.count() * x) недопустим, поскольку преобразование значений и действие count не могут быть выполнены внутри преобразования rdd1.map.
Ответ на эту тему предполагает, что контрольные точки и внешние СДР не смешиваются. Это можно обойти? Можно ли присоединить DStream и RDD, когда в StreamingContext включена контрольная точка?
Спасибо, Андрей.