Как записать файл в Kafka Producer
Я пытаюсь загрузить простой текстовый файл вместо стандартного ввода в Kafka. После загрузки Кафки я выполнил следующие действия:
Начал зоопарк:
bin/zookeeper-server-start.sh config/zookeeper.properties
Запущенный сервер
bin/kafka-server-start.sh config/server.properties
Создана тема под названием "тест":
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
Побежал продюсер:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Test1
Test2
Слушал Потребитель:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Test1
Test2
Вместо стандартного ввода я хочу передать файл данных или даже простой текстовый файл Производителю, который может быть просмотрен непосредственно Потребителем. Любая помощь будет принята с благодарностью. Спасибо!
4 ответа
Вы можете передать это в:
kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic
--new-producer < my_file.txt
Нашел nlucaroni.
От 0.9.0:
kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic < my_file.txt
$ kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic < my_file.txt
у меня работал в Кафке-0.9.0
Вот несколько способов, которые немного более обобщены, но могут быть излишними для простого файла
хвост
tail -n0 -F my_file.txt | kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic
объяснение
tail
читает с конца файла по мере его роста или непрерывного добавления в него логов-n0
указывает на выходные строки 0, поэтому выбирается только новая строка-F
следует за файлом по имени вместо дескриптора, следовательно, он работает, даже если он повернут
Syslog-нг
options {
flush_lines (0);
time_reopen (10);
log_fifo_size (1000);
long_hostnames (off);
use_dns (no);
use_fqdn (no);
create_dirs (no);
keep_hostname (no);
};
source s_file {
file("path to my-file.txt" flags(no-parse));
}
destination loghost {
tcp("*.*.*.*" port(5140));
}
потребляющий
nc -k -l 5140 | kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic
Объяснение (от man nc
)
-k' Forces nc to stay listening for another connection after its current connection is completed. It is an error to use this option without the -l option.
-l' Used to specify that nc should listen for an incoming connection rather than initiate a connection to a remote host. It is an error to use this option in conjunction with the -p, -s, or -z options. Additionally, any timeouts specified with the -w option are ignored.
ссылка
echo "Hello" | kafka-console-producer.sh --broker-list localhost:9092 --topic my_topic