Raku Cro 服务订阅数据 "in the background" 一般指南

Raku Cro service subscribing to data "in the background" general guidance

我正在尝试组合一个 Cro 服务,该服务有一个 react/whenever 块“在后台”消耗数据 因此,与使用 Cro 的 websocket 使用的许多示例不同,这与可能的路由无关通过浏览器访问。

我的用例是使用通过 MQTT 主题接收的消息并对其进行一些处理。在开发的后期阶段,我可能会用这些数据创建供应,但现在,当接收到数据时,它将存储在一个变量中,并根据特定条件,通过 http post 发送到另一个服务.

我的想法是在 Cro::HTTP::Server 设置中包含一个 provider(),如下所示:

use Cro::HTTP::Log::File;
use Cro::HTTP::Server;

use Routes;
use DataProvider; # Here

my Cro::Service $http = Cro::HTTP::Server.new(
        http => <1.1>,
        host => ...,
        port => ...,
        application => [routes(), provider()], # Made this into an array of subs?
        after => [
            Cro::HTTP::Log::File.new(logs => $*OUT, errors => $*ERR)
        ]
    );

并且在 DataProvider.pm6 中:

use MQTT::Client;

sub provider() is export {
    my $mqtt  = MQTT::Client.new: server => 'localhost';
    react {
        whenever $mqtt.subscribe('some/mqtt/topic') {
            say "+ topic: { .<topic> } => { .<message>.decode("utf8-c8") }";
        }
    }
}

这会引发一堆错误:

A react block:
  in sub provider at DataProvider.pm6 (DataProvider) line 5
  in block <unit> at service.p6 line 26

Died because of the exception:
    Invocant of method 'write' must be an object instance of type
    'IO::Socket::Async', not a type object of type 'IO::Socket::Async'.  Did
    you forget a '.new'?
      in method subscribe at /home/cam/raku/share/perl6/site/sources/42C762836A951A1C11586214B78AD34262EC465F (MQTT::Client) line 133
      in sub provider at DataProvider.pm6 (DataProvider) line 6
      in block <unit> at service.p6 line 26

老实说,我完全猜测这就是我处理在 Cro 服务后台订阅数据的需要的方式,但我找不到任何关于什么可能被认为是推荐的方法。

最初我在主 service.pm6 文件中有我的 react/whenever 块,但这似乎不正确。并且需要包裹在 start{} 块中,因为正如我刚刚了解到的那样,反应正在阻塞 :) 并且 cro 无法真正启动。

但是遵循路由的实现方式似乎合乎逻辑,但我遗漏了一些东西。该错误与设置新方法有关,但我不认为这是根本原因。 Routes.pm6 没有构造函数。

谁能给我指出正确的方向?

根本原因

The error speaks about setting up a new method, but I'm not convinced that is the root cause.

这不是关于设置一种新方法。它是关于一个应该定义而不是未定义的值。这通常意味着无法尝试初始化它,这通常意味着无法 call .new.

Can anyone point me in the right direction please?

希望这个问题对您有所帮助。

查找有关推荐方法的信息

I am totally guessing that this is how I would approach the need to subscribe to data in the background of a Cro service, but I was not able to find any information on what might be considered the recommended approach.

列出您在 Getting started with Cro 中遵循的哪些快速入门步骤可能会对您有所帮助,包括基础知识以及最后的“了解”步骤。

错误信息

A react block:
  in sub provider ...

Died because of the exception:
    ... 
      in method subscribe ...

错误消息以内置 react 构造报告它捕获到异常(并通过抛出自己的异常作为响应来处理)开始。 "backtrace" 对应于 react 在您的代码中出现的位置,从最初的“A react block:”开始缩进。

错误消息继续 react 构造总结了它自己的异常 (Died because ...) 并通过报告 原始 异常来解释自己,进一步缩进,在后续行中。这包括 另一个 回溯,这次一个对应于原始异常,它可能发生在具有不同调用堆栈的不同线程上。

(Raku 的所有结构化多线程构造[1] 使用这种两部分错误报告方法来捕获异常并通过抛出处理另一个例外。)


第一个回溯指示 react 行:

in sub provider at DataProvider.pm6 (DataProvider) line 5
use MQTT::Client;

sub provider() is export {
    my $mqtt  = MQTT::Client.new: server => 'localhost';
    react {

第二个回溯是关于原始异常的:

    Invocant of method 'write' must be an object instance of type
    'IO::Socket::Async', not a type object of type 'IO::Socket::Async'. ...
      in method subscribe at ... (MQTT::Client) line 133

报告在 MQTT::Client 的第 133 行调用的 write 方法需要它的 invocant is an instance of type 'IO::Socket::Async'. The value it got was of that type but was not an instance, but instead a "type object"。 (非本机类型的所有值都是 类型对象 它们类型的实例。)。

错误信息的结尾是:

  Did you forget a '.new'?

这是一个基于现实的简洁提示,即在需要实例时遇到类型对象的原因中有 100 次有 99 次是代码未能初始化变量。 (类型对象的用途之一是在 Perl 等语言中充当“未定义”的角色。)

所以,你能明白为什么应该是 'IO::Socket::Async' 的初始化实例的东西却变成了未初始化的实例吗?

脚注

[1] Raku 的并行、并发和异步构造遵循结构化编程 范例。请参阅 Parallelism, Concurrency, and Asynchrony in Raku,了解 Jonathan Worthington 对这种整体方法的视频演示。像 react 这样的结构化构造可以清楚地观察、包含和管理在其执行范围内任何地方发生的事件,包括错误异常等错误,即使它们发生在其他线程上也是如此。

感谢所有提供信息的人,这是一次非常有价值的学习练习。

将附加子例程与 application 参数中的 router() 一起传递给 Cro::HTTP::Server.new 的方法带来了更多麻烦。 (不允许数组,并且破坏了路由)

相反,我将后台工作移到了它自己的 class 中,并赋予它更类似于 Cro::HTTP::Serverstartstop 方法。

我的新方法:

service.pm6

use Cro::HTTP::Log::File;
use Cro::HTTP::Server;

use Routes;
use KlineDataSubscriber; # Moved mqtt functionality here 
use Database;

my $dsn         = "host=localhost port=5432 dbname=act user=.. password=..";
my $dbh         = Database.new :$dsn;

my $mqtt-host   = 'localhost';
my $subscriber  = KlineDataSubscriber.new :$mqtt-host;

$subscriber.start; # Inspired by $http.start below

my Cro::Service $http = Cro::HTTP::Server.new(
    http => <1.1>,
    host => ...,
    port => ...,
    application => routes($dbh), # Basically back the way it was originally 
    after => [
        Cro::HTTP::Log::File.new(logs => $*OUT, errors => $*ERR)
    ]
);

$http.start;
say "Listening at...";
react {
    whenever signal(SIGINT) {
        say "Shutting down...";
        $subscriber.stop;
        $http.stop;
        done;
    }
}

并且在 KlineDataSubscriber.pm6

use MQTT::Client;

class KlineDataSubscriber {
    has Str $.mqtt-host is required;
    has MQTT::Client $.mqtt = Nil;

    submethod TWEAK() {
        $!mqtt = MQTT::Client.new: server => $!mqtt-host;
        await $!mqtt.connect;
    }

    method start(Str $topic = 'act/feed/exchange/binance/kline-closed/+/json') {
        start {
            react {
                whenever $!mqtt.subscribe($topic) {
                    say "+ topic: { .<topic> } => { .<message>.decode("utf8-c8") }";
                }
            }
        }
    }

    method stop() {
        # TODO Figure how to unsubscribe and cleanup nicely
    }
}

这对我来说更像是“Cro 惯用语”,但我很乐意得到纠正。 更重要的是,它按预期工作,我觉得在某种程度上是未来的证明。我应该能够创建一个供应,使实时数据可用于路由器,并将数据推送到任何连接的 Web 客户端。

我还打算有一个 http GET 端点 /status 进行各种检查以确保一切正常

你现在看起来很好,但当我第一次看到这个时,我做了这个 https://github.com/jonathanstowe/Cro-MQTT 它将 MQTT 客户端变成第一个 class Cro 服务。

我还没有发布它,但它可能具有指导意义。