Самостоятельное присоединение к потоку kafka с обновленным временем потока
У меня есть поток со статусом сайта (подключен / отключен), агрегированный за минуту, где агрегирование подавлено, и для каждого минутного ключа у нас есть только одна запись. Теперь я хочу написать в новую «тревожную» тему сообщения, когда у нас есть изменение состояния от одной минуты к другой.
например 17.03.21 12:00 - подключено, 17.09.21 12:01 - отключено 17.09.21, мы хотим написать оповещение об отключении на 12:01.
Я пытаюсь сделать это, чтобы взять stream1 и повторно использовать его для stream2, в котором время увеличено на 1 минуту. Затем выполните соединение с окном 0, чтобы объединить каждую агрегацию с результатом предыдущей минуты и отфильтровать те, которые имеют разные состояния.
например stream1 17.03.21 12:00 - подключено, 17.09.21 12:01 - отключено streams2 будет 17.03.21 12:01 - подключено, 17.09.21 12:02 - отключено нулевое окно присоединится только 17/09/21 12:01 состояние другое, поэтому запись останется в потоке, и я напишу ее в тему оповещения.
Проблема в том, что соединение не было с записью, которую я ожидал, потребовалось 12:00 из потока 1 с 12:01 из потока 2 и 12:01 с 12:02 (нулевое окно без льгот). Похоже, что изменение, которое я сделал для времени, не повлияло, и соединение по-прежнему считает, что 12:01 - это 12:00. Что мне не хватает?