有没有办法从 .NET 使用 Kafka Ksql 推送查询

Is there a way to consume a Kafka Ksql Push query from .NET

我目前正在使用 Kafka 消费者在 .NET 中处理大量 Kafka 消息。

我处理的第 1 步是解析 JSON 并根据 JSON 中特定字段的值丢弃许多消息。

我一开始就不想处理(特别是不想下载)那些不需要的消息。

看起来像一个 kSql 查询 - 编写为推送查询 - 可以有效地过滤掉我需要处理的消息。

我如何通过 .NET 使用这些?我看到一些文档提到 REST API,但我怀疑这是个好主意,我需要在一天的高峰时间每分钟处理超过 100 000 条记录。(如果我可以有选择地下载和处理消息, 我只会处理当前体积的三分之一。)

很遗憾,我无法控制发布者,因此我无法更改what/how 消息已发布。

是的,您可以使用 ksqlDB 来执行此操作

-- Declare a stream on the source topic
-- Because it's JSON you'll need to specify the schema
CREATE STREAM my_source (COL1 VARCHAR, COL2 INT) 
  WITH (KAFKA_TOPIC='my_source_topic', VALUE_FORMAT='JSON');

-- Apply the filter to the stream, with the results written
-- to a new stream (backed by a new topic)
CREATE STREAM target WITH (KAFKA_TOPIC='my_target_topic') AS
  SELECT * FROM my_source WHERE COL1='FOO';

然后在您的应用程序中使用 the REST API 运行 推送查询,它将仅使用过滤后的消息:

SELECT * FROM target EMIT CHANGES;

除了 ksqlDB,您可能还想看看这个社区最近发布的项目:https://github.com/LGouellec/kafka-streams-dotnet

您可以通过以下方式使用 ksqldb Linq 提供程序。

使用 Nuget 包管理器安装包:

Install-Package ksqlDB.RestApi.Client

使用 C# (.NET) 创建查询:

var ksqlDbUrl = @"http:\localhost:8088";
var contextOptions = new KSqlDBContextOptions(ksqlDbUrl);
      
await using var context = new KSqlDBContext(contextOptions);

using var subscription = context.CreateQueryStream<Message>() //stream name
  .Where(p => p.RowTime >= 1510923225000) // add your own conditions
  //....
  .Select(l => new { l.Id, l.Message, l.RowTime })
  .Subscribe(onNext: message =>
  {
  }, onError: error => {  }, onCompleted: () => { });

上面的C#代码等价于下面的ksql:

SELECT Id, Message, RowTime FROM Messages WHERE RowTime >= 1510923225000 EMIT CHANGES;

更多运营商的项目 Wiki