Невозможно подключиться к узлу с идентификатором 1: [Рабочий]: Ошибка: ConnectionError("Нет подключения к узлу с идентификатором")
Я пытаюсь использовать robinhood / faust, но безуспешно!
Я уже создал продюсера, который успешно вставляет в исходную тему в моем экземпляре confluent-kafka localhost!
но фауст не может подключиться к localhost.
Мой app.py:
import faust
import base64
import random
from datetime import datetime
SOURCE_TOPIC="input_msgs"
TARGET_TOPIC="output_msgs"
app = faust.App("messages-stream",
broker="kafka://"+'localhost:9092',
topic_partitions=1,
store="memory://")
class OriginalMessage(faust.Record):
msg: str
class TransformedMessage(faust.Record):
msg_id: int
msg_data: str
msg_base64: str
created_at: float
source_topic: str
target_topic: str
deleted: bool
topic = app.topic(SOURCE_TOPIC, value_type=OriginalMessage)
out_topic = app.topic(TARGET_TOPIC, partitions=1)
table = app.Table(
"output_msgs",
default=TransformedMessage,
partitions=1,
changelog_topic=out_topic,
)
print('Initializing Thread Processor...')
@app.agent(topic)
async def transformedmessage(messageevents):
async for transfmessage in messageevents:
try:
table[transfmessage.msg_id] = random.randint(1, 999999)
table[transfmessage.msg_data] = transfmessage.msg
table[transfmessage.msg_base64] = base64.b64encode(transfmessage.msg)
table[transfmessage.created_at] = datetime.now().isoformat()
table[transfmessage.source_topic] = SOURCE_TOPIC
table[transfmessage.target_topic] = TARGET_TOPIC
table[transfmessage.deleted] = False
except Exception as e:
print(f"Error: {e}")
if __name__ == "__main__":
app.main()
ошибка
[2020-01-24 18:05:36,910] [55712] [ERROR] Unable connect to node with id 1: [Errno 8] nodename nor servname provided, or not known
[2020-01-24 18:05:36,910] [55712] [ERROR] [^Worker]: Error: ConnectionError('No connection to node with id 1')
"No connection to node with id {}".format(node_id))
kafka.errors.ConnectionError: ConnectionError: No connection to node with id 1
Я использую: faust -A app worker -l debug
1 ответ
Я столкнулся с этой ошибкой и, к счастью, несколько предвидел ее, так что выяснить проблему было несложно.
В Confluent вам необходимо настроить домен, который должен использоваться для доступа ко всем брокерам Kafka, которые загружаются для вас. Я действительно не знал, насколько важным будет домен, поэтому я просто вставлял что-то случайное, пока не застрял.
Конечно, я застрял здесь, как и вы, поэтому я запустил Wireshark, чтобы посмотреть, что происходит между Faust и сервером начальной загрузки. Оказывается, диалог начальной загрузки выглядит примерно так:
..............faust-1.10.4..PLAIN <-- client name
................PLAIN <-- authentication protocol
....foobar.foobar.foobar2000. <-- credentials!
b0.svs.cluster.local..#.......b1.svs.cluster.local..# <-- individual Kafka brokers
Они соответствуют шаблону выбранных мной доменных имен, которые описаны в документации Confluent: https://docs.confluent.io/current/installation/operator/co-endpoints.html
Если эти имена не разрешаются, вы получите расплывчатую ошибку, потому что, несмотря на успешную загрузку, клиент Kafka не может, следовательно, фактически подключиться к конечным точкам. Итак, выберите домен, который вы фактически контролируете, или поместите нужные вам ответы в свой локальный/etc/hosts
или аналогичный файл.
17 123.111.222.1 b0.svs.cluster.local
18 123.111.222.2 b1.svs.cluster.local
После перезапуска Faust загрузка и подключение Kafka завершились успешно.