В чем причина TypeError: неразрешимый тип: 'TopicAndPartition', когда KafkaUtils.createDirectStream?
Я хочу использовать сообщение Кафки от любого произвольного смещения на KafkaUtils.createDirectStream
,
Мой исходный код:
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition
def functionToCreateContext():
sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
ssc = StreamingContext(sc, 2)
kvs = KafkaUtils.createDirectStream(
ssc,
['test123'],
{"metadata.broker.list": "localhost:9092"},
{TopicAndPartition("test123", 0): 100, TopicAndPartition("test123", 1): 100}
)
#kvs = kvs.checkpoint(10)
lines = kvs.map(lambda x: x[1])
counts = lines.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)
counts.pprint()
return ssc
if __name__ == "__main__":
ssc = StreamingContext.getOrCreate("./checkpoint", functionToCreateContext())
ssc.start()
ssc.awaitTermination()
но получите ошибку, как показано ниже:
Traceback (most recent call last):
File "/usr/local/spark-1.6.0-bin-hadoop2.6/examples/src/main/python/streaming/direct_kafka_wordcount.py", line 56, in <module>
ssc = StreamingContext.getOrCreate("./checkpoint", functionToCreateContext())
File "/usr/local/spark-1.6.0-bin-hadoop2.6/examples/src/main/python/streaming/direct_kafka_wordcount.py", line 45, in functionToCreateContext
{TopicAndPartition("test123", 0): 100, TopicAndPartition("test123", 1): 100}
TypeError: unhashable type: 'TopicAndPartition'
Исходный код pyspark:
@staticmethod
def createDirectStream(ssc, topics, kafkaParams, fromOffsets=None,
keyDecoder=utf8_decoder, valueDecoder=utf8_decoder,
messageHandler=None):
class TopicAndPartition(object):
"""
Represents a specific top and partition for Kafka.
"""
def __init__(self, topic, partition):
"""
Create a Python TopicAndPartition to map to the Java related object
:param topic: Kafka topic name.
:param partition: Kafka partition id.
"""
self._topic = topic
self._partition = partition
def _jTopicAndPartition(self, helper):
return helper.createTopicAndPartition(self._topic, self._partition)
.........
jfromOffsets = dict([(k._jTopicAndPartition(helper),
v) for (k, v) in fromOffsets.items()])
fromOffsets должен быть dict, ключ dict должен быть TopicAndPartition
объект.
Есть идеи для этого?
1 ответ
Решение
pyspark
есть ошибка для Python3, TopicAndPartition
класс отсутствует hash
метод, поэтому вы должны изменить python3 на python2, ошибка исчезла.
тогда следует привести смещение от int к long:
{TopicAndPartition("test123", 0): long(100), TopicAndPartition("test123", 1): long(100)}