Как использовать групповые и оконные потоки в потоковой передаче faust kafka?
Мне нужно создать приложение для робастности, которое должно обрабатывать группировку с последующей обработкой окон.
Теперь я использую ниже для окон,
class Sample(faust.Record):
master_mac: str
uuid: str
slave_mac: str
rawData: str
rssi: int
app = faust.App('sample_app', broker='kafka://localhost:9092')
my_topic = app.topic('out', key_type=str, value_type=Sample)
@app.agent(my_topic)
async def process(samples):
async for sample in samples.take(5000, within=5):
print("sample :: ", sample)
if __name__ == '__main__':
worker = Worker(app, loglevel="INFO")
worker.execute_from_commandline()
Принимая во внимание, что мне нужно сделать концепцию группировки в дополнение к оконному
class Sample(faust.Record):
master_mac: str
uuid: str
slave_mac: str
rawData: str
rssi: int
app = faust.App('sample_app', broker='kafka://localhost:9092')
my_topic = app.topic('out', key_type=str, value_type=Sample)
@app.agent(my_topic)
async def process(samples):
async for sample in samples.group_by(Sample.master_mac and Sample.slave_mac):
print("sample :: ", sample)
if __name__ == '__main__':
worker = Worker(app, loglevel="INFO")
worker.execute_from_commandline()
Может ли кто-нибудь помочь мне в этом, чтобы добиться как окон, так и группировки вместе?