gpt4 book ai didi

database - 如何比较两个kafka流或数据库表之间的(10亿条记录)数据

转载 作者:行者123 更新时间:2023-12-05 07:30:42 25 4
gpt4 key购买 nike

我们正在通过 CDC 将数据从 DB2 (table-1) 发送到 Kafka 主题 (topic-1)。我们需要在 DB2 数据和 Kafka 主题之间进行协调。我们有两个选择 -

a) 将所有 kafka 主题数据放入 DB2(作为 table-1-copy),然后进行左外连接(在 table-1 和 table-1-copy 之间)以查看不匹配的记录,创建增量并将其推回卡夫卡。问题:可扩展性——我们的数据集大约有十亿条记录,我不确定 DB2 DBA 是否会让我们运行如此庞大的连接操作(可能很容易持续超过 15-20 分钟)。

b) 再次将 DB2 推回并行的 kafka 主题(topic-1-copy),然后执行一些基于 kafka 流的解决方案以在 kafka topic-1 和 topic-1-copy 之间进行左外连接。我仍然在思考 kafka 流和左外连接。我不确定(在 kafka 流中使用窗口系统)我是否能够将 topic-1 的全部内容与 topic-1-copy 进行比较。

更糟糕的是,kafka中的topic-1是一个紧凑的topic,因此,当我们将数据从 DB2 推回 Kafka topic-1-copy 时,我们无法确定地启动 kafka topic-compaction 周期以确保 topic-1 和 topic-1-copy 在运行任何类型的比较之前都已完全压缩对他们进行操作。

c) 有没有其他我们可以考虑的框架选项?

理想的解决方案必须针对任何规模的数据进行扩展。

最佳答案

我看不出您不能在 Kafka Streams 或 KSQL 中执行此操作的原因。两者都支持表-表连接。这是假设支持数据格式。

键压缩不会影响结果,因为 Streams 和 KSQL 都会构建连接两个表的正确最终状态。如果压缩已经运行,需要处理的数据量可能会更少,但结果是一样的。

例如,在 ksqlDB 中,您可以将两个主题作为表导入并执行连接,然后通过 topic-1 表为 null 进行过滤以查找缺失的列表行。

-- example using 0.9 ksqlDB, assuming a INT primary key:

-- create table from main topic:
CREATE TABLE_1
(ROWKEY INT PRIMARY KEY, <other column defs>)
WITH (kafka_topic='topic-1', value_format='?');

-- create table from second topic:
CREATE TABLE_2
(ROWKEY INT PRIMARY KEY, <other column defs>)
WITH (kafka_topic='topic-1-copy', value_format='?');

-- create a table containing only the missing keys:
CREATE MISSING AS
SELECT T2.* FROM TABLE_2 T2 LEFT JOIN TABLE_1 T1
WHERE T1.ROWKEY = null;

这种方法的好处是缺失行的 MISSING 表会自动更新:当您从源 DB2 实例中提取缺失的行并将它们生成到 topic-1 那么“MISSING”表中的行将被删除,即您会看到为 MISSING 主题生成的墓碑。

您甚至可以扩展此方法以查找不再存在于源数据库中的 topic-1 中的行:

-- using the same DDL statements for TABLE_1 and TABLE_2 from above

-- perform the join:
CREATE JOINED AS
SELECT * FROM TABLE_2 T2 FULL OUTER JOIN TABLE_1 T1;

-- detect rows in the DB that aren't in the topic:
CREATE MISSING AS
SELECT * FROM JOINED
WHERE T1_ROWKEY = null;

-- detect rows in the topic that aren't in the DB:
CREATE EXTRA AS
SELECT * FROM JOINED
WHERE T2_ROWKEY = null;

当然,您需要相应地调整集群的大小。 ksqlDB 集群越大,处理数据的速度就越快。它还需要磁盘容量来实现表。

您可以根据主题上的分区数设置的最大并行化量。如果您只有 1 个分区,则数据将按顺序处理。如果运行 100 个分区,则可以使用 100 个 CPU 内核处理数据,前提是您运行了足够多的 ksqlDB 实例。 (默认情况下,每个 ksqlDB 节点将为每个查询创建 4 个流处理线程,(尽管如果服务器有更多内核,您可以增加它!))。

关于database - 如何比较两个kafka流或数据库表之间的(10亿条记录)数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52129604/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com