如何在不需要 DAG 的情况下在 Hazelcast Jet 中使用 Twitter 流源?

How to use a Twitter stream source in Hazelcast Jet without needing a DAG?

我想对直播的推文进行简单分析。

如何在不需要 DAG 的情况下在 Hazelcast Jet 中使用 Twitter 流源?

详情

Twitter的封装API还不错StreamTwitterP.java

但是,调用者将其用作 DAG 的一部分,c/o:

Vertex twitterSource = 
  dag.newVertex("twitter", StreamTwitterP.streamTwitterP(properties, terms));

我的用例不需要 DAG 的强大功能,因此我宁愿避免这种不必要的额外复杂性。

为了避免 DAG,我希望使用 SourceBuilder 为实时推文流定义新的数据源。

我假设它的代码类似于上面提到的 StreamTwitterP.java,但是我不清楚是否适合使用 Hazelcast JET 的 API。

我指的是 SourceBuilder example from the docs

您可以将处理器转换为管道源:

Pipeline p = Pipeline.create();
p.drawFrom(Sources.<String>streamFromProcessor("twitter", 
    streamTwitterP(properties, terms)))
...

还有使用 SourceBuilder here.

的 twitterSource 版本