Как отправить данные в Websocket из ошибочного приложения
В данный момент я работаю над сценарием использования Kafka и robinhood, чтобы обработать данные, поступающие с Kafka. Я успешно выполнил вычисление, и результаты, которые мне нужны, выводятся на консоль, в которой работает мой сборщик ошибок.
Теперь я хочу найти способ получать результаты не только в консоли, но и на HTML-странице. Я взглянул на библиотеку websockets, но не могу заставить ее работать в сочетании с faust. Я получаю ошибку Crashed reason=RuntimeError('This event loop is already running')
Я думаю, что это вызвано тем, что код выполняется для каждого обрабатываемого сообщения.
Любая помощь высоко ценится
Это код, который я использую:
import faust, datetime, websockets, asyncio
app = faust.App(
'UseCase',
broker='kafka://localhost:29092',
)
usecase_topic = app.topic('usecase',partitions=8)
usecase_table = app.Table('usecase', default=int)
checkfailure = {}
@app.agent(usecase_topic)
async def process_record(records):
async for record in records:
#count records for each Sensor
print(record)
sensor = record['ext_id']
usecase_table[sensor] += 1
#print(f'Records for Sensor {sensor}: {usecase_table[sensor]}')
#write current timestamp of record and previous timestamp for each sensor to usecase_table dict
currtime_id = record['ext_id']+'c'
prevtime_id = record['ext_id']+'p'
usecase_table[currtime_id] = datetime.datetime.strptime(record['tag_tsp'], "%Y%m%d%H%M%S.%f")
#print current time
print(f'Current time for Sensor {sensor}: {usecase_table[currtime_id]}')
#calculate and print timestamp delta; if no previous value is given print message
if usecase_table[prevtime_id] == 0:
print(f'no previous timestamp for sensor {sensor}')
else:
usecase_table[prevtime_id] = datetime.datetime.strptime(usecase_table[prevtime_id], "%Y%m%d%H%M%S.%f")
print(f'previous time for Sensor {sensor}: {usecase_table[prevtime_id]}')
tsdelta = usecase_table[currtime_id] - usecase_table[prevtime_id]
tsdelta_id = record['ext_id']+'t'
usecase_table[tsdelta_id] = str(tsdelta)
print(f'Sensor: {sensor} timestamp delta: {usecase_table[tsdelta_id]}')
#calculate value delta
currvalue_id = record['ext_id']+'cv'
prevvalue_id = record['ext_id']+'pv'
usecase_table[currvalue_id] = record['tag_value_int']
print(f'current value for Sensor {sensor}: {usecase_table[currvalue_id]}')
if usecase_table[prevvalue_id] == 0:
print(f'no previous record for sensor {sensor}')
else:
print(f'previous value for Sensor {sensor}: {usecase_table[prevvalue_id]}')
vdelta = usecase_table[currvalue_id] - usecase_table[prevvalue_id]
vdelta_id = record['ext_id']+'v'
usecase_table[vdelta_id] = vdelta
print(f'Sensor: {sensor} value delta:{usecase_table[vdelta_id]}')
#calculate cycle time
if usecase_table[prevtime_id] != 0 and usecase_table[prevvalue_id] != 0 and usecase_table[vdelta_id] != 0:
cycletime = tsdelta / usecase_table[vdelta_id]
cyclemsg = f'Sensor {sensor}; Cycletime {cycletime}'
print(cyclemsg)
#add timestamp to checkfailure dict
checkfailure[sensor] = datetime.datetime.strptime(record['tag_tsp'], "%Y%m%d%H%M%S.%f")
#check if newest timestamp for a sensor is older than 10 secs
for key in checkfailure:
if datetime.datetime.now() - checkfailure[key] >= datetime.timedelta(seconds=10):
failuremsg = f'Error: Sensor {key}'
print(failuremsg)
#send results to websocket
async def send_result(websocket,path):
results = cyclemsg + failuremsg
await websockets.send(results)
start_server = websockets.serve(send_result, '127.0.0.1', 5678)
asyncio.get_event_loop().run_until_complete(start_server)
#set previous value and timestamp to current
usecase_table[prevtime_id] = record['tag_tsp']
usecase_table[prevvalue_id] = record['tag_value_int']
2 ответа
Нормально быть смущенным этим сообщением об ошибке asyncio:)
Вы не можете позвонить loop.run_until_complete
из async def
функция.
Что вам нужно сделать, это запустить сервер websocket в фоновом режиме. Это должно быть легко, и он использует asyncio.ensure_future
, но вы также хотите, чтобы сервер веб-сокетов корректно завершал работу при выходе из приложения.
По этой причине Faust использует "сервисы", и вы можете определить сервис для вашего сервера веб-сокетов:
import faust
import websockets
from mode import Service
from websockets.exceptions import ConnectionClosed
from websockets.server import WebSocketServerProtocol
class App(faust.App):
def on_init(self):
self.websockets = Websockets(self)
async def on_start(self):
await self.add_runtime_dependency(self.websockets)
class Websockets(Service):
def __init__(self, app, bind: str = 'localhost', port: int = 9999, **kwargs):
self.app = app
self.bind = bind
self.port = port
super().__init__(**kwargs)
async def on_message(self, ws, message):
...
async def on_messages(self,
ws: WebSocketServerProtocol,
path: str) -> None:
try:
async for message in ws:
await self.on_message(ws, message)
except ConnectionClosed:
await self.on_close(ws)
except asyncio.CancelledError:
pass
async def on_close(self, ws):
# called when websocket socket is closed.
...
@Service.task
def _background_server(self):
await websockets.serve(self.on_messages, self.bind, self.port)
app = App('UseCase')
# [...]
Другой вариант, если вы хотите использовать сервер Faust aiohttp:
import os
import logging
import aiohttp
from app import app
from faust.web import Request, Response, View, Web
logger = logging.getLogger(__name__)
@app.page('/healthcheck')
async def websocket_handler(self, request):
ws = aiohttp.web.WebSocketResponse()
await ws.prepare(request)
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
if msg.data == 'close':
await ws.close()
else:
await ws.send_str(msg.data + '/answer')
elif msg.type == aiohttp.WSMsgType.ERROR:
print('ws connection closed with exception %s' %
ws.exception())
print('websocket connection closed')
return ws
протестируйте это с помощью:wscat -c ws://localhost:6066/healthcheck