Как использовать кафку на торнадо?
Я пытаюсь сделать простое приложение для чата, используя торнадо на основе этого
Но я также хочу использовать Кафку для хранения сообщений. Как я могу это сделать?
Теперь я использовал это, чтобы сделать потребителя и каким-то образом он работает, но он печатает только на консоли, и мне нужно, чтобы сообщения отображались на веб-странице, как приложение tornade, только они сохраняются в kafka.
Вот мой код app.py на данный момент
#!/usr/bin/env python
#
# Copyright 2009 Facebook
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import tornado.escape
import tornado.ioloop
import tornado.web
import os.path
import uuid
from tornado.concurrent import Future
from tornado import gen
from tornado.options import define, options, parse_command_line
from pykafka import KafkaClient
define("port", default=8888, help="run on the given port", type=int)
define("debug", default=False, help="run in debug mode")
class MessageBuffer(object):
def __init__(self):
self.waiters = set()
self.cache = []
self.cache_size = 200
def wait_for_messages(self, cursor=None):
# Construct a Future to return to our caller. This allows
# wait_for_messages to be yielded from a coroutine even though
# it is not a coroutine itself. We will set the result of the
# Future when results are available.
result_future = Future()
if cursor:
new_count = 0
for msg in reversed(self.cache):
if msg["id"] == cursor:
break
new_count += 1
if new_count:
result_future.set_result(self.cache[-new_count:])
return result_future
self.waiters.add(result_future)
return result_future
def cancel_wait(self, future):
self.waiters.remove(future)
# Set an empty result to unblock any coroutines waiting.
future.set_result([])
def new_messages(self, messages):
logging.info("Sending new message to %r listeners", len(self.waiters))
for future in self.waiters:
future.set_result(messages)
self.waiters = set()
self.cache.extend(messages)
if len(self.cache) > self.cache_size:
self.cache = self.cache[-self.cache_size:]
client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics['test']
consumer = topic.get_simple_consumer()
for message in consumer:
if message is not None:
print message.value
# Making this a non-singleton is left as an exercise for the reader.
global_message_buffer = MessageBuffer()
class MainHandler(tornado.web.RequestHandler):
def get(self):
self.render("index.html", messages=global_message_buffer.cache)
class MessageNewHandler(tornado.web.RequestHandler):
def post(self):
message = {
"id": str(uuid.uuid4()),
"body": self.get_argument("body"),
}
# to_basestring is necessary for Python 3's json encoder,
# which doesn't accept byte strings.
message["html"] = tornado.escape.to_basestring(
self.render_string("message.html", message=message))
if self.get_argument("next", None):
self.redirect(self.get_argument("next"))
else:
self.write(message)
global_message_buffer.new_messages([message])
class MessageUpdatesHandler(tornado.web.RequestHandler):
@gen.coroutine
def post(self):
cursor = self.get_argument("cursor", None)
# Save the future returned by wait_for_messages so we can cancel
# it in wait_for_messages
self.future = global_message_buffer.wait_for_messages(cursor=cursor)
messages = yield self.future
if self.request.connection.stream.closed():
return
self.write(dict(messages=messages))
def on_connection_close(self):
global_message_buffer.cancel_wait(self.future)
def main():
parse_command_line()
app = tornado.web.Application(
[
(r"/", MainHandler),
(r"/a/message/new", MessageNewHandler),
(r"/a/message/updates", MessageUpdatesHandler),
],
cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__",
template_path=os.path.join(os.path.dirname(__file__), "templates"),
static_path=os.path.join(os.path.dirname(__file__), "static"),
xsrf_cookies=True,
debug=options.debug,
)
app.listen(options.port)
tornado.ioloop.IOLoop.current().start()
if __name__ == "__main__":
main()
2 ответа
Я знаю, что это старый вопрос, но в случае, если он пригодится кому-то еще, можно использовать торнадо и модуль python-kafka вместе (хотя предложение @sixstone об использовании kiel также является хорошим).
Поскольку python-kafaka блокируется, и нам также нужно запустить основной цикл торнадо, нам нужно разделить потоки. В следующем (длинном) примере я создаю поток для вызова python-kafka и сохраняю IOLoop торнадо в основном потоке.
Пример довольно длинный, поскольку он также использует веб-сокеты для публикации сообщений, как только они будут получены. Надеемся, что дополнительная сложность стоит дополнительных нескольких строк для тех, кто хочет объединить уведомления в режиме реального времени через веб-сокеты с торнадо и кафкой.
from __future__ import absolute_import, print_function
import collections
import threading
import jinja2
from kafka import KafkaConsumer
import tornado.web
import tornado.websocket
# A global to store some history...
message_history = collections.deque([], maxlen=100)
class KafkaWebSocket(tornado.websocket.WebSocketHandler):
# Keep track of open sockets globally, so that we can
# communicate with them conveniently.
open_sockets = set()
@classmethod
def write_to_all(cls, message):
removable = set()
for ws in cls.open_sockets:
if not ws.ws_connection or not ws.ws_connection.stream.socket:
removable.add(ws)
else:
ws.write_message(message)
for ws in removable:
cls.open_sockets.remove(ws)
def open(self):
# We don't want these sockets to be buffered.
self.set_nodelay(True)
type(self).open_sockets.add(self)
class MainHandler(tornado.web.RequestHandler):
template = """
<html>
<head>
<link href="//netdna.bootstrapcdn.com/twitter-bootstrap/2.3.1/css/bootstrap-combined.no-icons.min.css" rel="stylesheet">
<script type="text/javascript" src="//ajax.googleapis.com/ajax/libs/jquery/1.8.2/jquery.min.js"></script>
</head>
<body>
<div class="container">
{{ messages|length }} messages in cache:
<br><br>
<div id="messages">
{% for message in messages %}
<div>{{ message }}</div>
{% endfor %}
</div>
</div>
<footer class="footer">
<div class="container">
Web socket status: <div id="message">Not connected</div>
</div>
</footer>
<script>
var loc = window.location, new_uri;
if (loc.protocol === "https:") {
new_uri = "wss:";
} else {
new_uri = "ws:";
}
new_uri += "//" + loc.host;
new_uri += loc.pathname + "ws";
var ws = new WebSocket(new_uri);
var $message = $('#message');
ws.onopen = function(){
$message.attr("class", 'label label-success');
$message.text('open');
};
ws.onmessage = function(ev){
$message.attr("class", 'label label-info');
$message.hide();
$message.fadeIn("slow");
$message.text('recieved message ' + new Date().toLocaleString());
$('#messages').append("<div>" + ev.data + "</div>")
};
ws.onclose = function(ev){
$message.attr("class", 'label label-important');
$message.text('closed');
};
ws.onerror = function(ev){
$message.attr("class", 'label label-warning');
$message.text('error occurred');
};
</script>
</body>
</html>
"""
def get(self):
env = jinja2.Environment()
template = env.from_string(self.template)
self.write(template.render(messages=message_history))
class Consumer(threading.Thread):
daemon = True
def __init__(self, kafka_consumer):
self._consumer = kafka_consumer
super(Consumer, self).__init__()
def run(self):
for message in self._consumer:
message = str(message)
message_history.append(message)
KafkaWebSocket.write_to_all(message)
def make_app(*args, **kwargs):
return tornado.web.Application([
(r"/?", MainHandler),
(r"/ws/?", KafkaWebSocket),
], *args, **kwargs)
if __name__ == "__main__":
kafka_consumer = Consumer(KafkaConsumer('mytopic'))
# Start the kafka consumer thread.
kafka_consumer.start()
app = make_app()
app.listen(8889)
io_loop = tornado.ioloop.IOLoop.current()
io_loop.start()
Согласно https://github.com/dpkp/kafka-python/issues/560, наиболее закрытое решение Java в python не совместимо с какой-либо асинхронной средой (asyncio, twisted или tornado), поэтому мы должны использовать другую библиотеку который совместим с торнадо. Следующий работает для меня: