Confluent Kafka: Как в клиенте confluent-kafka-python определены сериализация и разбиение?
1) В Java-клиенте с помощью02.production.put() мы можем добавить сериализаторы для целых чисел и строк. Как мы это делаем для клиента confluent-kafka-python? 2) Как мы сериализуем записи protoBuf?
2 ответа
В последней версии, я думаю, это не дает такого свойства конфигурации: «value.serializer», поэтому нам понадобится пользователь SerializingProducer, а значение value.serializer должно быть обратным вызовом.
from confluent_kafka import SerializingProducer
import json
def json_serializer(msg, s_obj):
return json.dumps(msg).encode('utf-8')
conf = {'bootstrap.servers': 'localhost:29092',
'value.serializer': json_serializer}
producer_client = SerializingProducer(conf)
def producer_json_msg(topic_name, **args):
producer_client.produce(topic_name, **args)
producer_json_msg('test_two', value={'test': "this is fine"})
producer_client.flush()
На вопрос № 1: Добавить key.serializer
а также value.serializer
в конфиге, как показано ниже:
from confluent_kafka import Producer
import socket
conf = {'bootstrap.servers': "host1:9092,host2:9092",
'client.id': socket.gethostname(),
'key.serializer': 'key serializer classpath',
'value.serializer': 'value serializer classpath',
'default.topic.config': {'acks': 'all'}}
producer = Producer(conf)
На вопрос № 2: Реализуйте свой собственный класс ProtobufSerializer и установите для него {key|value}.serializer.