apache-kafka - Kafka KSQL simple join does not work

Reposted. Author: 行者123. Updated: 2023-12-04 05:21:40

I am using Confluent 4.1 with rekeyed data in a stream and a table.

1) Create the stream

   CREATE STREAM session_details_stream (Media varchar ,SessionIdTime varchar,SessionIdSeq long) with (kafka_topic = 'sessionDetails', value_format = 'json');

2) Create the rekeyed stream. This script does not work, although it worked before. Why?

CREATE STREAM session_details_stream_rekeyed as select Media,SessionIdTime ,SessionIdSeq,CONCAT(SessionIdTime,SessionIdSeq) as root from SESSION_DETAILS_STREAM  partition by root;

So I then created the following scripts:

CREATE STREAM session_details_stream_update as select Media,SessionIdTime ,SessionIdSeq,CONCAT(SessionIdTime,SessionIdSeq) as root from SESSION_DETAILS_STREAM  partition by SessionIdTime;
CREATE STREAM session_details_stream_rekeyed as select Media,SessionIdTime ,SessionIdSeq,root from session_details_stream_update partition by root;

The result of session_details_stream_rekeyed looks fine:

ksql> select * from session_details_stream_rekeyed;
1526411486488 | 2018-02-05T15:16:07.113+02:001| tex | 2018-02-05T15:16:07.113+02:001 | 1 | 2018-02-05T15:16:07.113+02:001

3) Create the streams for the topic:

 CREATE STREAM voip_details_stream (SessionIdTime varchar,SessionIdSeq long) with (kafka_topic = 'voipDetails', value_format = 'json');
CREATE STREAM voip_details_stream_update as select SessionIdTime ,SessionIdSeq, CONCAT(SESSIONIDTIME,SESSIONIDSEQ) as root from voip_details_stream partition by SessionIdTime;
CREATE STREAM voip_details_stream_rekeyed6 as select SessionIdTime ,SessionIdSeq,root from voip_details_stream_update partition by root;



ksql> select * from voip_details_stream_rekeyed6;
1526411479438 | 2018-02-05T15:16:07.113+02:001 | 2018-02-05T15:16:07.113+02:00 | 1 | 2018-02-05T15:16:07.113+02:001

4) Create the table

 CREATE TABLE voipDetails_table_test(SessionIdTime varchar,SessionIdSeq long,root varchar) WITH (kafka_topic='VOIP_DETAILS_STREAM_REKEYED6', value_format='JSON', KEY='root');

ksql> select * from voip_details_table;

1526411479438 | 2018-02-05T15:16:07.113+02:001 | 2018-02-05T15:16:07.113+02:00 | 1 | 2018-02-05T15:16:07.113+02:001

5) Then I run a left join

select  c.root,u.root from session_details_stream_rekeyed c LEFT JOIN voipDetails_table_test u On c.root  = u.root;

1526411477780 | 2018-02-05T15:16:07.113+02:001 | 2018-02-05T15:16:07.113+02:001 | null

Where is the problem?

Best answer

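tl;dr: In a stream-table join, the table messages must already exist (and carry an earlier timestamp) when the stream messages are processed. If you re-emit the source stream messages after the table topic has been populated, the join will succeed.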

Sample data

Use kafkacat to populate the topics (feeding the data to stdin):

cat > /tmp/msgs <<EOF
{"Media":"Foo","SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1}
{"Media":"Foo","SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2}
EOF
kafkacat -b localhost:9092 -P -t sessionDetails < /tmp/msgs


cat > /tmp/msgs <<EOF
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1a"}
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1b"}
{"SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2,"Details":"Bar2"}
EOF
kafkacat -b localhost:9092 -P -t voipDetails < /tmp/msgs

Verify the topic contents:

Robin@asgard02 ~> kafkacat -b localhost:9092 -C -t sessionDetails
{"Media":"Foo","SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1}
{"Media":"Foo","SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2}

Robin@asgard02 ~> kafkacat -b localhost:9092 -C -t voipDetails
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1a"}
{"SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1,"Details":"Bar1b"}
{"SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2,"Details":"Bar2"}

Declare the source streams

ksql> CREATE STREAM session_details_stream \
(Media varchar ,SessionIdTime varchar,SessionIdSeq long) \
WITH (KAFKA_TOPIC = 'sessionDetails', VALUE_FORMAT = 'json');

Message
----------------
Stream created
----------------
ksql> CREATE STREAM voip_details_stream \
(SessionIdTime varchar,SessionIdSeq long, Details varchar) \
WITH (KAFKA_TOPIC = 'voipDetails', VALUE_FORMAT = 'json');

Message
----------------
Stream created
----------------
ksql> select * from session_details_stream;
1526553130864 | null | Foo | 2018-05-17 11:25:33 BST | 1
1526553130865 | null | Foo | 2018-05-17 11:26:33 BST | 2
^CQuery terminated
ksql> select * from voip_details_stream;
1526553143176 | null | 2018-05-17 11:25:33 BST | 1 | Bar1a
1526553143176 | null | 2018-05-17 11:25:33 BST | 1 | Bar1b
1526553143176 | null | 2018-05-17 11:26:33 BST | 2 | Bar2
^CQuery terminated

Repartition each topic on SessionIdTime+SessionIdSeq

ksql> CREATE STREAM SESSION AS \
SELECT Media, CONCAT(SessionIdTime,SessionIdSeq) AS root \
FROM session_details_stream \
PARTITION BY root;

Message
----------------------------
Stream created and running
----------------------------


ksql> SELECT ROWTIME, ROWKEY, root, media FROM SESSION;
1526553130864 | 2018-05-17 11:25:33 BST1 | 2018-05-17 11:25:33 BST1 | Foo
1526553130865 | 2018-05-17 11:26:33 BST2 | 2018-05-17 11:26:33 BST2 | Foo


ksql> CREATE STREAM VOIP AS \
SELECT CONCAT(SessionIdTime,SessionIdSeq) AS root, details \
FROM voip_details_stream \
PARTITION BY root;

Message
----------------------------
Stream created and running
----------------------------
ksql>

Declare the table

ksql> CREATE TABLE VOIP_TABLE (root VARCHAR, details VARCHAR) \
WITH (KAFKA_TOPIC='VOIP', VALUE_FORMAT='JSON', KEY='root');

Message
---------------
Table created
---------------
ksql> SELECT ROWTIME, ROWKEY, root, details FROM VOIP;
1526553143176 | 2018-05-17 11:26:33 BST2 | 2018-05-17 11:26:33 BST2 | Bar2
1526553143176 | 2018-05-17 11:25:33 BST1 | 2018-05-17 11:25:33 BST1 | Bar1a
1526553143176 | 2018-05-17 11:25:33 BST1 | 2018-05-17 11:25:33 BST1 | Bar1b

Join the SESSION stream to the VOIP table

ksql> SELECT s.ROWTIME, s.root, s.media, v.details \
FROM SESSION s \
LEFT OUTER JOIN VOIP_TABLE v ON S.root = V.root;
1526553130864 | 2018-05-17 11:25:33 BST1 | Foo | null
1526553130865 | 2018-05-17 11:26:33 BST2 | Foo | null

Leave the JOIN query above running. Re-emit the SESSION messages to the source topic (use kafkacat to send the same messages to sessionDetails as above):

1526553862403 | 2018-05-17 11:25:33 BST1 | Foo | Bar1a
1526553988639 | 2018-05-17 11:26:33 BST2 | Foo | Bar2
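
The re-emit step can be scripted. The following is only a sketch: the function names session_messages and resend_session_messages are hypothetical, and it assumes the same broker address (localhost:9092) and sample messages used earlier in this answer. The messages are piped to kafkacat on stdin, so each line is produced as a separate message.

```shell
#!/bin/sh
# Print the two session messages from the sample data, one JSON document per line.
session_messages() {
  cat <<'EOF'
{"Media":"Foo","SessionIdTime":"2018-05-17 11:25:33 BST","SessionIdSeq":1}
{"Media":"Foo","SessionIdTime":"2018-05-17 11:26:33 BST","SessionIdSeq":2}
EOF
}

# Hypothetical wrapper: produce the same messages to sessionDetails again.
# Assumes a broker on localhost:9092, as elsewhere in this answer.
resend_session_messages() {
  session_messages | kafkacat -b localhost:9092 -P -t sessionDetails
}
```

Calling resend_session_messages from a second terminal while the JOIN query is still running gives the re-sent records a new, later ROWTIME, which is what lets them find the existing table rows.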

Per Rohan Desai on the Confluent Community Slack:

The problem is that the rowtime of the record from your stream is earlier than the rowtime of the record in your table that you expect it to join with. So when the stream record is processed there is no corresponding record in the table

Look at the messages on the source table using ROWTIME to see the message timestamp (not to be confused with the timestamp-based root):

ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') , ROWTIME, root, details from VOIP WHERE root='2018-05-17 11:26:33 BST2';
2018-05-17 11:32:23 | 1526553143176 | 2018-05-17 11:26:33 BST2 | Bar2
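
To cross-check a ROWTIME value outside KSQL, the epoch milliseconds can be converted in the shell. This is a sketch under assumptions: the helper name rowtime_to_utc is hypothetical, and it relies on a date command that accepts -d @SECONDS (GNU coreutils does). It prints UTC, so the result is one hour behind the BST times shown in the KSQL output here.

```shell
#!/bin/sh
# Hypothetical helper: convert a ROWTIME (epoch milliseconds) to a UTC string.
# Assumes a `date` that accepts `-d @SECONDS` (GNU coreutils does).
rowtime_to_utc() {
  date -u -d "@$(( $1 / 1000 ))" '+%Y-%m-%d %H:%M:%S'
}

rowtime_to_utc 1526553143176   # ROWTIME of the VOIP row shown above
```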

Compare this with the messages on the source session stream topic:

ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss') , ROWTIME, root, media from SESSION WHERE root='2018-05-17 11:26:33 BST2';
2018-05-17 11:32:10 | 1526553130865 | 2018-05-17 11:26:33 BST2 | Foo
2018-05-17 11:46:28 | 1526553988639 | 2018-05-17 11:26:33 BST2 | Foo

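The first of these (at 11:32:10/1526553130865) is earlier than the corresponding VOIP message (shown above) and causes the null join result we saw first. The second, dated later (11:46:28/1526553988639), produces the successful join we saw subsequently: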

1526553988639 | 2018-05-17 11:26:33 BST2 | Foo | Bar2
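
The timestamp rule described here can be checked with plain shell arithmetic. The sketch below is a simplified model of the behaviour explained in this answer, not KSQL's implementation: the helper name join_result is hypothetical, and it treats a table row as visible to a stream record only if the table row's ROWTIME is not later than the stream record's ROWTIME. The timestamps are the ones from the query output above.

```shell
#!/bin/sh
# Hypothetical helper modelling the rule described above:
# prints "match" if the table row already existed when the stream record
# was timestamped (table_ts <= stream_ts), otherwise "null".
join_result() {
  stream_ts=$1
  table_ts=$2
  if [ "$table_ts" -le "$stream_ts" ]; then
    echo "match"
  else
    echo "null"
  fi
}

table_ts=1526553143176          # VOIP row for root '2018-05-17 11:26:33 BST2'
first_stream_ts=1526553130865   # original SESSION message
second_stream_ts=1526553988639  # re-emitted SESSION message

echo "first:  $(join_result "$first_stream_ts" "$table_ts")"
echo "second: $(join_result "$second_stream_ts" "$table_ts")"
```

The first SESSION message yields null because it is about 12 seconds older than the VOIP row, while the re-emitted message yields a match.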

Regarding "apache-kafka - Kafka KSQL simple join does not work", the original question can be found on Stack Overflow: https://stackoverflow.com/questions/50371518/
