创建需要 STREAM 并提供 BATCH 的 Kapacitor UDF (Python)

Creating Kapacitor UDF that wants a STREAM and provides a BATCH (Python)

我在制作需要 STREAM 并提供 BATCH 的 UDF 时遇到了问题。

这样:

def info(self):
    response = udf_pb2.Response()
    response.info.wants = udf_pb2.STREAM
    response.info.provides = udf_pb2.BATCH
    response.info.options['field'].valueTypes.append(udf_pb2.STRING)
    return response

有没有人有示例代码?我在网上搜索(论坛、文档),但所有示例都是针对 BATCH-BACH、STREAM-STREAM 或 BATCH-STREAM 的。

我在示例中看到,在“end_batch(self, end_req)”方法中编写对 Kapacitor 的响应时,对于 BATCH 具有的某种“通信”是必要的结束了,在一个例子中,这是这样制作的:

def end_batch(self, end_req):
    # Send begin batch with count of outliers
    self._begin_response.begin.size = len(self._batch)
    self._agent.write_response(self._begin_response)

    response = udf_pb2.Response()


                                  ...    


    # Send an identical end batch back to Kapacitor
        # HERE
    response.end.CopyFrom(end_req)
    self._agent.write_response(response)

为了发送 BATCH,我必须从“point(self, point)”方法发送它,但无法访问 end_req 对象并且不知道如何创建一个.

提前致谢! 再见!

希望这仍然有意义,我将创建一个 STREAM-STREAM UDF 并将其通过管道传输到 window 节点中。您可以保留 window 数据的副本,就像在他们的移动平均示例中一样,并对其进行任何批量分析。如果您知道如何编写 STREAM-BATCH UDF,我很乐意看到它,但没有我的回答那么难看。

编辑

jdv 绝对是正确的,我最后的回答肯定更像是评论。这是 python 中的 STREAM-BATCH UDF,它只是回显批处理流中传入的数据。它仍然有点坏,因为它无法序列化 handler snapshot 方法中的点 class。因此,每当它需要拍摄快照时它就会崩溃,可以通过使用不同的序列化方法(如酸洗)或通过编写 JSON encoder/decoder 来解决。我会抽出时间解决这个问题,但我的工作周快结束了。制作 STREAM-BATCH UDF 所需要做的主要事情是构造批处理开始和结束消息,这分别在 createEndBatch 和 createStartBatch 方法中完成。

编辑 2

通过结合使用 protobufs 方法和 json 修复了序列化。