如何从 pipelinedb 中的连续视图流式传输输出?

How to stream output from a continous view in pipelinedb?

我已经设置了 pipelinedb,效果很好!我想知道是否可以在更新视图中的值后从连续视图中流式传输数据?也就是说,让一些外部进程对视图的更改采取行动。

我希望将视图生成的指标流式传输到仪表板中,我不想使用轮询数据库来实现此目的。

如果您需要文档以外的帮助,请查看我们关于 output streams and continuous transforms for help on how to do this, and feel free to ping us in our Gitter channel 的技术文档中的部分。

我觉得自己有点像个白痴,试图使用 Didactic 提供的工具找出答案。也许我是盲人,但我仍然没有找到办法。我找到了包含连续触发器的 9.3 版本的数据库,但此后已被删除,我不想切换到旧版本的数据库。

这有点令人难过,但我想它已从项目的开源版本中移出,以适应同一家公司提供的实时分析仪表板项目。

无论哪种方式。我通过使用存储过程解决了这个问题。与内置函数提供的功能相比,它可能效率稍低,但我每分钟访问数据库几千次,而我的 VM CPU 和 RAM 只是对我打哈欠。

CREATE OR REPLACE FUNCTION all_insert(text,text)
  RETURNS void AS
 $BODY$
DECLARE
    result text;
BEGIN
    INSERT INTO all_in (streamid, generalinput) values(, );
    SELECT array_to_json(array_agg(json_build_object('streamId', streamid, 'total', count)))::text into result from totals;
    PERFORM pg_notify('totals', result);
END
$BODY$
LANGUAGE plpgsql;

所以我的插入和通知是通过查询这个单一的存储过程来完成的。然后我的应用程序只需监听 PSQL 数据库通知事件并适当地处理它们。在上面的示例中,应用程序将收到一个 JSON 对象,其中包含特定的流 ID 以及与之关联的总数。

从 0.9.5 开始,连续触发器已被删除,以支持使用输出流和连续转换。 (首先由 DidacticTactic 建议)。连续视图的输出本质上是一个流,这意味着您可以基于它创建连续视图或转换。

简单示例:

  1. 首先创建流和连续视图。
CREATE STREAM s (
    x int
);

CREATE CONTINUOUS VIEW hourly_cv AS
    SELECT
        hour(arrival_timestamp) AS ts,
        SUM(x) AS sum
    FROM s GROUP BY ts;
  1. 每个连续视图现在都有一个输出流。您可以使用 output_of 基于视图的输出创建转换。在转换中,您可以访问分别代表旧值和新值的元组 oldnew。 (0.9.7 有第三个称为 delta)因此您可以创建一个使用 'hourly_cv' 输出的转换,如下所示:
CREATE CONTINUOUS TRANSFORM hourly_ct AS
    SELECT
        (new).sum
    FROM output_of('hourly_cv')
    THEN EXECUTE PROCEDURE update();
  1. 在这个例子中,我调用了 update,我们仍然需要定义它。它需要是一个 returns 触发器的函数。
CREATE OR REPLACE FUNCTION update()
    RETURNS trigger AS
    $$
    BEGIN
        // Do anything you want here.
        RETURN NEW;
    END;
    $$
    LANGUAGE plpgsql;

我发现 0.9.5 release notes blog post 有助于理解输出流以及为什么不再需要连续触发器。