gpt4 book ai didi

jdbc - 如何为 kafka 连接源连接器使用时间戳或时间戳+递增模式?

转载 作者:行者123 更新时间:2023-12-03 21:03:32 25 4
gpt4 key购买 nike

我有一个数据库(Mariadb)关系,其中一列“修改”为“bigint(10)”,表示时间戳,我相信unix时间格式。当我尝试以“时间戳”或“时间戳+递增”模式运行 kafka 源连接器时,没有事件被推送到主题中。如果我只运行递增,新条目将被推送到主题。有人可以提示我错误配置连接器的位置吗?或者连接器无法识别 unix 时间格式的时间戳?

我尝试运行具有以下属性的连接器(仅基于时间戳检索):

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name":"only_ts",
"config": {
"numeric.mapping": "best_fit",
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://mariadb/moodle",
"connection.user": "user",
"connection.password": "",
"topic.prefix": "only_ts_",
"mode": "timestamp",
"timestamp.column.name":"modified",
"table.whitelist":"mdl_forum_posts",
"poll.intervals.ms": 10000
}
}'

每当我创建条目或更新条目时,我都希望看到来自“mdl_forum_posts”的条目被推送到 kafka 主题“only_ts_mdl_forum_posts”中。但是,使用此连接器,没有任何 react 。
如果我只使用“递增”模式,这可以正常工作并且符合预期。但是为了获得数据库更新,我需要添加模式时间戳。

“描述 mdl_forum_posts”的输出
+---------------+--------------+------+-----+---------+----------------+

| Field | Type | Null | Key | Default | Extra |

+---------------+--------------+------+-----+---------+----------------+

| id | bigint(10) | NO | PRI | NULL | auto_increment |

| discussion | bigint(10) | NO | MUL | 0 | |

| parent | bigint(10) | NO | MUL | 0 | |

| userid | bigint(10) | NO | MUL | 0 | |

| created | bigint(10) | NO | MUL | 0 | |

| modified | bigint(10) | NO | | 0 | |

| mailed | tinyint(2) | NO | MUL | 0 | |

| subject | varchar(255) | NO | | | |

| message | longtext | NO | | NULL | |

| messageformat | tinyint(2) | NO | | 0 | |

| messagetrust | tinyint(2) | NO | | 0 | |

| attachment | varchar(100) | NO | | | |

| totalscore | smallint(4) | NO | | 0 | |

| mailnow | bigint(10) | NO | | 0 | |

| deleted | tinyint(1) | NO | | 0 | |

+---------------+--------------+------+-----+---------+----------------+

和“显示创建表moodle.mdl_forum_posts;”的输出:
| mdl_forum_posts | CREATE TABLE mdl_forum_posts (

id bigint(10) NOT NULL AUTO_INCREMENT,

discussion bigint(10) NOT NULL DEFAULT '0',

parent bigint(10) NOT NULL DEFAULT '0',

userid bigint(10) NOT NULL DEFAULT '0',

created bigint(10) NOT NULL DEFAULT '0',

modified bigint(10) NOT NULL DEFAULT '0',

mailed tinyint(2) NOT NULL DEFAULT '0',

subject varchar(255) NOT NULL DEFAULT '',

message longtext NOT NULL,

messageformat tinyint(2) NOT NULL DEFAULT '0',

messagetrust tinyint(2) NOT NULL DEFAULT '0',

attachment varchar(100) NOT NULL DEFAULT '',

totalscore smallint(4) NOT NULL DEFAULT '0',

mailnow bigint(10) NOT NULL DEFAULT '0',

deleted tinyint(1) NOT NULL DEFAULT '0',

PRIMARY KEY (id),

KEY mdl_forupost_use_ix (userid),

KEY mdl_forupost_cre_ix (created),

KEY mdl_forupost_mai_ix (mailed),

KEY mdl_forupost_dis_ix (discussion),

KEY mdl_forupost_par_ix (parent)

) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COMMENT='All posts are stored in this table' |

“已修改”列中的示例条目是:
select modified from mdl_forum_posts;
1557487199

它是 unix 时间的时间戳,如下所示:
select from_unixtime(modified) from mdl_forum_posts;
2019-05-10 11:19:59

有关相关连接器的相关日志(仅时间戳)似乎显示了一些查询?
kafka-connect_1    | [2019-05-10 11:48:47,434] DEBUG TimestampIncrementingTableQuerier{table="moodle"."mdl_forum_posts", query='null', topicPrefix='only_ts_', incrementingColumn='', timestampColumns=[modified]} prepared SQL query: SELECT * FROM `moodle`.`mdl_forum_posts` WHERE `moodle`.`mdl_forum_posts`.`modified` > ? AND `moodle`.`mdl_forum_posts`.`modified` < ? ORDER BY `moodle`.`mdl_forum_posts`.`modified` ASC (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier)
kafka-connect_1 | [2019-05-10 11:48:47,435] DEBUG Resetting querier TimestampIncrementingTableQuerier{table="moodle"."mdl_forum_posts", query='null', topicPrefix='only_ts_', incrementingColumn='', timestampColumns=[modified]} (io.confluent.connect.jdbc.source.JdbcSourceTask)

最佳答案

我有同样的问题。对我来说唯一的解决方法是这里提到的:https://github.com/confluentinc/kafka-connect-jdbc/issues/566 .
这意味着 timestamp Unix 时间戳 (bigint) 列的模式可以与自定义查询一起使用。您只需要使用自己的where clause .例如在你的情况下,它可能是这样的:

SELECT id 
FROM mdl_forum_posts
WHERE to_timestamp(modified/1000) > ? AND to_timestamp(modified/1000) < ? ORDER BY modified ASC
--
to_timestamp是数据库方言中的日期转换函数。并请注意 --允许评论自动生成 where clause .

关于jdbc - 如何为 kafka 连接源连接器使用时间戳或时间戳+递增模式?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56075201/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com