Как непрерывно отправлять данные на кафку?
Я пытаюсь непрерывно отправлять данные (сниффинг пакетов с помощью tshark) брокеру / потребителю kafka.
Вот шаги, за которыми я следовал:
1. Запустил зоопарк:
kafka/bin/zookeeper-server-start.sh ../kafka//config/zookeeper.properties
2. Запустил кафку на сервере:
kafka/bin/kafka-server-start.sh ../kafka/config/server.properties
3. Начат кафка потребителя:
kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic \
'my-topic' --from-beginning
4. Написал следующий скрипт на Python для отправки данных потребителю:
from kafka import KafkaProducer
import subprocess
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send('my-topic', subprocess.check_output(['tshark','-i','wlan0']))
но это остается на терминале прокудера и выводит:
Capturing on 'wlan0'
605
^C
ничего не передается потребителю.
Я знаю, что могу использовать pyshark
реализовать tshark на питоне:
import pyshark
capture = pyshark.LiveCapture(interface='eth0')
capture.sniff(timeout=5)
capture1=capture[0]
print capture1
Но я не знаю, как непрерывно отправлять захваченные пакеты от производителя к потребителю. Любой совет?
Спасибо!
1 ответ
Проверьте следующую ссылку.
http://zdatainc.com/2014/07/real-time-streaming-apache-storm-apache-kafka/
Реализация производителя Kafka Здесь определяются основные части кода для производителя Kafka, который использовался для тестирования нашего кластера. В основном классе мы настраиваем каналы данных и потоки:
LOGGER.debug("Setting up streams");
PipedInputStream send = new PipedInputStream(BUFFER_LEN);
PipedOutputStream input = new PipedOutputStream(send);
LOGGER.debug("Setting up connections");
LOGGER.debug("Setting up file reader");
BufferedFileReader reader = new BufferedFileReader(filename, input);
LOGGER.debug("Setting up kafka producer");
KafkaProducer kafkaProducer = new KafkaProducer(topic, send);
LOGGER.debug("Spinning up threads");
Thread source = new Thread(reader);
Thread kafka = new Thread(kafkaProducer);
source.start();
kafka.start();
LOGGER.debug("Joining");
kafka.join();
The BufferedFileReader in its own thread reads off the data from disk:
rd = new BufferedReader(new FileReader(this.fileToRead));
wd = new BufferedWriter(new OutputStreamWriter(this.outputStream, ENC));
int b = -1;
while ((b = rd.read()) != -1)
{
wd.write(b);
}
Finally, the KafkaProducer sends asynchronous messages to the Kafka Cluster:
rd = new BufferedReader(new InputStreamReader(this.inputStream, ENC));
String line = null;
producer = new Producer<Integer, String>(conf);
while ((line = rd.readLine()) != null)
{
producer.send(new KeyedMessage<Integer, String>(this.topic, line));
}
Doing these operations on separate threads gives us the benefit of having disk reads not block the Kafka producer or vice-versa, enabling maximum performance tunable by the size of the buffer.
Implementing the Storm Topology
Topology Definition
Moving onward to Storm, here we define the topology and how each bolt will be talking to each other:
TopologyBuilder topology = new TopologyBuilder();
topology.setSpout("kafka_spout", new KafkaSpout(kafkaConf), 4);
topology.setBolt("twitter_filter", new TwitterFilterBolt(), 4)
.shuffleGrouping("kafka_spout");
topology.setBolt("text_filter", new TextFilterBolt(), 4)
.shuffleGrouping("twitter_filter");
topology.setBolt("stemming", new StemmingBolt(), 4)
.shuffleGrouping("text_filter");
topology.setBolt("positive", new PositiveSentimentBolt(), 4)
.shuffleGrouping("stemming");
topology.setBolt("negative", new NegativeSentimentBolt(), 4)
.shuffleGrouping("stemming");
topology.setBolt("join", new JoinSentimentsBolt(), 4)
.fieldsGrouping("positive", new Fields("tweet_id"))
.fieldsGrouping("negative", new Fields("tweet_id"));
topology.setBolt("score", new SentimentScoringBolt(), 4)
.shuffleGrouping("join");
topology.setBolt("hdfs", new HDFSBolt(), 4)
.shuffleGrouping("score");
topology.setBolt("nodejs", new NodeNotifierBolt(), 4)
.shuffleGrouping("score");
Примечательно, что данные перетасовываются на каждый болт до тех пор, пока они не будут соединены, поскольку очень важно, чтобы одинаковые твиты передавались одному и тому же экземпляру соединительного болта.