gpt4 book ai didi

apache-kafka - Kafka KSQL Re分区和rekey问题

转载 作者:行者123 更新时间:2023-12-04 04:22:07 26 4
gpt4 key购买 nike

我定义了一个流

CREATE STREAM QUOTE (quoteId VARCHAR,
counterPartyId VARCHAR)
WITH (KAFKA_TOPIC='quotes',
VALUE_FORMAT='JSON',
KEY='quoteId');

我想汇总到目前为止我得到了多少报价,以及该事件的最后一个 quoteId

CREATE TABLE KQUOTE AS
SELECT Max(CAST(quoteId as INT)) as quoteId,COUNT(*) AS COUNT
FROM QUOTE
GROUP BY 1;

将此表转为流,因为我想知道聚合结果历史记录。 (好像我必须使用底层主题来创建流。不能直接从表“KQUOTE”创建流)。

CREATE stream KQuoteStream (quoteId VARCHAR,
count INT)
WITH (KAFKA_TOPIC='KQUOTE',
VALUE_FORMAT='JSON',
KEY='quoteId');

我希望上面使用 RAWKEY quoteId,但事实并非如此。正如我们在下面看到的,RAWKEY 始终为 1(因为我们在创建表 kquote 时按常量 1 分组)。

ksql> select * from KQuoteStream;
1574121797111 | 1 | 806 | 20
1574121979291 | 1 | 807 | 21

尝试按quoteId重新分区流,将RAWKEY改为quoteId

CREATE stream KQuoteStreamByQuoteId
as
SELECT quoteId, COUNT FROM KQuoteStream PARTITION BY quoteId;

RAMKEY 还是常量 1

ksql> select * from KQuoteStreamByQuoteId;
1574121797111 | 1 | 806 | 20
1574121979291 | 1 | 807 | 21

顺便说一句:所有主题都具有与 1 相同的分区,以使事情变得更简单。有人知道吗?非常感谢 !

最佳答案

这绝对是您发现的一个有趣的错误!

这里的技巧是要理解 WITH(KEY='quoteId') 实际上 任何事情,它是对 ksqlDB 的提示,表明关键字段发生了也作为 quoteId 存在于值中。然后,当您PARTITION BY quoteId 时,它认为您是按行键进行分区,所以它什么都不做!我同意这种行为非常不直观,这就是为什么我们计划删除 WITH(KEY=...) 功能以支持更直观的功能(待定)。

与此同时,解决方法应该是在创建 KQuoteStream指定键,这样 KSQL 就不会优化重新分区。

关于apache-kafka - Kafka KSQL Re分区和rekey问题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58924870/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com