Создание Kapacitor UDF, который хочет STREAM и предоставляет BATCH (Python)
У меня проблемы с созданием UDF, который хочет STREAM и предоставляет пакет.
Сюда:
def info(self):
response = udf_pb2.Response()
response.info.wants = udf_pb2.STREAM
response.info.provides = udf_pb2.BATCH
response.info.options['field'].valueTypes.append(udf_pb2.STRING)
return response
Есть кто-нибудь с примером кода?? Я искал по сети (foruns, документация), но все примеры для BATCH-BACH, STREAM-STREAM или BATCH-STREAM.
В примерах я видел, что при написании ответа на Kapacitor в методе "end_batch(self, end_req)" необходимо как бы "связать", что BATCH закончился, в примере это было сделано следующим образом:
def end_batch(self, end_req):
# Send begin batch with count of outliers
self._begin_response.begin.size = len(self._batch)
self._agent.write_response(self._begin_response)
response = udf_pb2.Response()
...
# Send an identical end batch back to Kapacitor
# HERE
response.end.CopyFrom(end_req)
self._agent.write_response(response)
Чтобы отправить BATCH, я должен отправить его из метода "point(self, point)", но не могу получить доступ к объекту end_req и не знаю, как его создать.
Заранее спасибо! Пока-пока!
1 ответ
Надеюсь, что это все еще актуально, я бы сделал UDF STREAM-STREAM и направил его в оконный узел. Вы можете сохранить копию окна данных, как в примере с их скользящим средним, и выполнить любой пакетный анализ по этому вопросу. Если бы вы выяснили, как написать UDF STREAM-BATCH, я бы с удовольствием посмотрел его, хотя и менее безобразно, чем мой ответ.
редактировать
JDV был определенно прав, мой последний ответ был скорее комментарием. Вот UDF STREAM-BATCH в python, он просто отображает данные, поступившие в поток в пакете. Он все еще немного сломан, потому что не может сериализовать класс точек в методе снимка обработчика. Поэтому, когда ему нужно сделать снимок, он падает, его можно решить, используя другой метод сериализации, такой как выборка, или записав кодировщик / декодер JSON для точки. Я найду время, чтобы исправить это, но моя рабочая неделя почти закончена. Главное, что нужно сделать для создания UDF STREAM-BATCH, - это создать сообщения начала и конца пакета, что делается в методах createEndBatch и createStartBatch соответственно.
Редактировать 2
Исправлена сериализация с использованием комбинации метода protobufs и json.