Исключение: ошибка Pulsar: несовместимая схема

Я новичок в Pulsar и просто изучаю функциональные возможности нового проекта. Я пытаюсь использовать очень простой пример для отправки данных от производителя на основе схемы. Чтобы дать некоторую информацию, моя идея состоит в том, чтобы отправить данные из apache-pulsar в базу данных Clickhouse. Я завершил настройку коннектора приемника и проверил то же самое с помощью приведенных ниже команд.

статус стоков bin/pulsar-admin --tenant public --namespace default --name jdbc-clickhouse-sink

список приемников bin/pulsar-admin --tenant public --namespace default Вывод:[ "jdbc-clickhouse-sink" ]

Итак, у меня есть таблица, созданная в Clickhouse DB. Я хочу, чтобы данные были отправлены в тему, которая должна быть сохранена в базе данных. При этом я хочу сохранить согласованность схемы, поэтому я хочу настроить схему. Пример кода ниже

      import pulsar
from pulsar.schema import *

class Example(Record):
    a = Integer()
    b = Integer()
    c = Integer()


client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer(
                    topic='my-topic',
                    schema=AvroSchema(Example) )

producer.send(Example(  a=444 , b=62, c=999 ))

Когда я запускаю приведенный выше код, я получаю следующую ошибку

      ---------------------------------------------------------------------------
Exception                                 Traceback (most recent call last)
<ipython-input-114-3b0aa7d0415f> in <module>
      9 
     10 client = pulsar.Client('pulsar://localhost:6650' class="ansi-blue-fg">)
---> 11 producer = client.create_producer(
     12                     topic='my-topic',
     13                     schema=AvroSchema(Example) )

~/opt/anaconda3/lib/python3.8/site-packages/pulsar/__init__.py in 
create_producer(self, topic, producer_name, schema, initial_sequence_id, 
send_timeout_millis, compression_type, max_pending_messages, 
max_pending_messages_across_partitions, block_if_queue_full, batching_enabled, 
batching_max_messages, batching_max_allowed_size_in_bytes, 
batching_max_publish_delay_ms, message_routing_mode, properties, batching_type)
    560 
    561         p = Producer()
--> 562         p._producer = self._client.create_producer(topic, conf)
    563         p._schema = schema
    564         return p

Exception: Pulsar error: IncompatibleSchema

Может кто-нибудь помочь, что мне здесь не хватает

1 ответ

Убедитесь, что у вас установлен клиент Pulsar Python с avro.

pip3 установить fastavro pip3 установить pytz pip3 установить pulsar-client[avro]

Посмотрите мой пример Python здесь со схемой https://github.com/tspannhw/FLiP-Pi-Weather/blob/main/weather.py

Посмотрите мои примеры https://github.com/tspannhw/FLiP-Stream2Clickhouse

Проверьте, что ваша схема bin/pulsar-admin получает постоянство: //public/default/my-topic

Документация по Python https://pulsar.apache.org/api/python/https://pulsar.apache.org/api/python/schema/schema.m.html#pulsar.schema.schema.AvroSchema

Функции, доступные для каждого клиента Pulsarhttps://docs.google.com/spreadsheets/d/1YHYTkIXR8-Ql103u-IMI18TXLlGStK8uJjDsOOA0T20/edit

Вам может понадобиться сгенерировать класс из реального файла Avro Schema, что обычно и делается в Java.

См. этот пример:

https://github.com/ta1meng/pulsar-python-avro-schema-examples

Если вам не нужен Avro, JsonSchema не требует этого дополнительного шага.

Другие вопросы по тегам