有没有办法从 .NET 使用 Kafka Ksql 推送查询
Is there a way to consume a Kafka Ksql Push query from .NET
我目前正在使用 Kafka 消费者在 .NET 中处理大量 Kafka 消息。
我处理的第 1 步是解析 JSON 并根据 JSON 中特定字段的值丢弃许多消息。
我一开始就不想处理(特别是不想下载)那些不需要的消息。
看起来像一个 kSql 查询 - 编写为推送查询 - 可以有效地过滤掉我需要处理的消息。
我如何通过 .NET 使用这些?我看到一些文档提到 REST API,但我怀疑这是个好主意,我需要在一天的高峰时间每分钟处理超过 100 000 条记录。(如果我可以有选择地下载和处理消息, 我只会处理当前体积的三分之一。)
很遗憾,我无法控制发布者,因此我无法更改what/how 消息已发布。
是的,您可以使用 ksqlDB 来执行此操作
-- Declare a stream on the source topic
-- Because it's JSON you'll need to specify the schema
CREATE STREAM my_source (COL1 VARCHAR, COL2 INT)
WITH (KAFKA_TOPIC='my_source_topic', VALUE_FORMAT='JSON');
-- Apply the filter to the stream, with the results written
-- to a new stream (backed by a new topic)
CREATE STREAM target WITH (KAFKA_TOPIC='my_target_topic') AS
SELECT * FROM my_source WHERE COL1='FOO';
然后在您的应用程序中使用 the REST API 运行 推送查询,它将仅使用过滤后的消息:
SELECT * FROM target EMIT CHANGES;
除了 ksqlDB,您可能还想看看这个社区最近发布的项目:https://github.com/LGouellec/kafka-streams-dotnet
您可以通过以下方式使用 ksqldb Linq 提供程序。
使用 Nuget 包管理器安装包:
Install-Package ksqlDB.RestApi.Client
使用 C# (.NET) 创建查询:
var ksqlDbUrl = @"http:\localhost:8088";
var contextOptions = new KSqlDBContextOptions(ksqlDbUrl);
await using var context = new KSqlDBContext(contextOptions);
using var subscription = context.CreateQueryStream<Message>() //stream name
.Where(p => p.RowTime >= 1510923225000) // add your own conditions
//....
.Select(l => new { l.Id, l.Message, l.RowTime })
.Subscribe(onNext: message =>
{
}, onError: error => { }, onCompleted: () => { });
上面的C#代码等价于下面的ksql:
SELECT Id, Message, RowTime FROM Messages WHERE RowTime >= 1510923225000 EMIT CHANGES;
更多运营商的项目 Wiki。
我目前正在使用 Kafka 消费者在 .NET 中处理大量 Kafka 消息。
我处理的第 1 步是解析 JSON 并根据 JSON 中特定字段的值丢弃许多消息。
我一开始就不想处理(特别是不想下载)那些不需要的消息。
看起来像一个 kSql 查询 - 编写为推送查询 - 可以有效地过滤掉我需要处理的消息。
我如何通过 .NET 使用这些?我看到一些文档提到 REST API,但我怀疑这是个好主意,我需要在一天的高峰时间每分钟处理超过 100 000 条记录。(如果我可以有选择地下载和处理消息, 我只会处理当前体积的三分之一。)
很遗憾,我无法控制发布者,因此我无法更改what/how 消息已发布。
是的,您可以使用 ksqlDB 来执行此操作
-- Declare a stream on the source topic
-- Because it's JSON you'll need to specify the schema
CREATE STREAM my_source (COL1 VARCHAR, COL2 INT)
WITH (KAFKA_TOPIC='my_source_topic', VALUE_FORMAT='JSON');
-- Apply the filter to the stream, with the results written
-- to a new stream (backed by a new topic)
CREATE STREAM target WITH (KAFKA_TOPIC='my_target_topic') AS
SELECT * FROM my_source WHERE COL1='FOO';
然后在您的应用程序中使用 the REST API 运行 推送查询,它将仅使用过滤后的消息:
SELECT * FROM target EMIT CHANGES;
除了 ksqlDB,您可能还想看看这个社区最近发布的项目:https://github.com/LGouellec/kafka-streams-dotnet
您可以通过以下方式使用 ksqldb Linq 提供程序。
使用 Nuget 包管理器安装包:
Install-Package ksqlDB.RestApi.Client
使用 C# (.NET) 创建查询:
var ksqlDbUrl = @"http:\localhost:8088";
var contextOptions = new KSqlDBContextOptions(ksqlDbUrl);
await using var context = new KSqlDBContext(contextOptions);
using var subscription = context.CreateQueryStream<Message>() //stream name
.Where(p => p.RowTime >= 1510923225000) // add your own conditions
//....
.Select(l => new { l.Id, l.Message, l.RowTime })
.Subscribe(onNext: message =>
{
}, onError: error => { }, onCompleted: () => { });
上面的C#代码等价于下面的ksql:
SELECT Id, Message, RowTime FROM Messages WHERE RowTime >= 1510923225000 EMIT CHANGES;
更多运营商的项目 Wiki。