Фауст. Как создать уплотняющую тему
Я хочу создать тему, которая автоматически сжимается kafka, используя поток faust. Я использую такой код:
import asyncio
import random
import faust
app = faust.App(
'mysample',
broker=<...>,
value_serializer='raw',
key_serializer='raw',
topic_replication_factor=3,
)
topic = app.topic('mytest5', partitions=1, compacting=True, acks=False)
count = 1
@app.timer(2.0, on_leader=True)
async def publish_greetings():
global count
print('PUBLISHING ON LEADER!')
await topic.send(key=str(count), value=str(random.random()))
count += 1
@app.agent(topic)
async def say(greetings):
async for k, v in greetings.items():
print(f'{k}, {v}')
Я пробовал также варианты internal
, retention
, deleting
с созданием темы и каждый раз, когда я менял название темы, поэтому должна была быть создана новая тема. Но я не думаю, что кафка действительно сжимает тему. Каждый раз, когда читаю тему, я вижу полную историю событий.
Как создать уплотняющую тему с фаустом?