Confluent Kafka: Как в клиенте confluent-kafka-python определены сериализация и разбиение?

1) В Java-клиенте с помощью02.production.put() мы можем добавить сериализаторы для целых чисел и строк. Как мы это делаем для клиента confluent-kafka-python? 2) Как мы сериализуем записи protoBuf?

2 ответа

В последней версии, я думаю, это не дает такого свойства конфигурации: «value.serializer», поэтому нам понадобится пользователь SerializingProducer, а значение value.serializer должно быть обратным вызовом.

      from confluent_kafka import SerializingProducer
import json


def json_serializer(msg, s_obj):
    return json.dumps(msg).encode('utf-8')


conf = {'bootstrap.servers': 'localhost:29092',
        'value.serializer': json_serializer}

producer_client = SerializingProducer(conf)


def producer_json_msg(topic_name, **args):
    producer_client.produce(topic_name, **args)

producer_json_msg('test_two', value={'test': "this is fine"})
producer_client.flush()

На вопрос № 1: Добавить key.serializer а также value.serializer в конфиге, как показано ниже:

from confluent_kafka import Producer
import socket
conf = {'bootstrap.servers': "host1:9092,host2:9092",
    'client.id': socket.gethostname(),
    'key.serializer': 'key serializer classpath',
    'value.serializer': 'value serializer classpath',
    'default.topic.config': {'acks': 'all'}}
producer = Producer(conf)

На вопрос № 2: Реализуйте свой собственный класс ProtobufSerializer и установите для него {key|value}.serializer.

Другие вопросы по тегам