gpt4 book ai didi

c# - 有没有办法从 .NET 使用 Kafka Ksql 推送查询

转载 作者:行者123 更新时间:2023-12-04 15:27:54 24 4
gpt4 key购买 nike

我目前正在使用 Kafka 消费者在 .NET 中处理大量 Kafka 消息。

我处理的第 1 步是解析 JSON 并根据 JSON 中特定字段的值丢弃许多消息。

我不想首先处理(特别是不下载)那些不需要的消息。

看起来像一个 kSql 查询 - 编写为推送查询 - 可以有效地过滤掉我需要处理的消息。

我如何通过 .NET 使用这些?我看到一些文档提到了 REST API,但我怀疑这是个好主意,我需要在一天的高峰时间每分钟处理超过 100 000 条记录。(如果我可以选择性地下载和处理消息,我只会正在处理当前体积的大约三分之一。)

不幸的是,我无法控制发布者,所以我无法更改发布消息的内容/方式。

最佳答案

是的,你可以使用 ksqlDB 来做到这一点

-- Declare a stream on the source topic
-- Because it's JSON you'll need to specify the schema
CREATE STREAM my_source (COL1 VARCHAR, COL2 INT)
WITH (KAFKA_TOPIC='my_source_topic', VALUE_FORMAT='JSON');

-- Apply the filter to the stream, with the results written
-- to a new stream (backed by a new topic)
CREATE STREAM target WITH (KAFKA_TOPIC='my_target_topic') AS
SELECT * FROM my_source WHERE COL1='FOO';

然后使用 the REST API从您的应用程序中运行一个推送查询,该查询将仅使用过滤后的消息:

SELECT * FROM target EMIT CHANGES;

除了 ksqlDB,您可能还想看看这个社区最近发布的项目:https://github.com/LGouellec/kafka-streams-dotnet

关于c# - 有没有办法从 .NET 使用 Kafka Ksql 推送查询,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61879865/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com