gpt4 book ai didi

cassandra - 配置使用 Kafka Connect Cassandra Source 时写入 Kafka Topic 的内容

转载 作者:行者123 更新时间:2023-12-04 00:31:14 25 4
gpt4 key购买 nike

我正在做一个高峰期,我们希望将数据发布到 Cassandra 表中,然后发布到 Kafka 主题。我们正在考虑使用 Kafka Connect 和 Stream Reactor 连接器。

我正在使用 Kafka 0.10.0.1

我正在使用 DataMountaineer Stream Reactor 0.2.4

我将 Stream Reactor 的 jar 文件放入 Kafka libs 文件夹,并以分布式模式运行 Kafka Connect

bin/connect-distributed.sh config/connect-distributed.properties

我添加了 Cassandra Source 连接器,如下所示:
curl -X POST -H "Content-Type: application/json" -d @config/connect-idoc-cassandra-source.json.txt localhost:8083/connectors

当我将数据添加到 Cassandra 表时,我看到它被添加到使用 Kafka 命令行消费者的主题中
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic idocs-topic --from-beginning

以下是目前正在写入主题的内容示例:
{
"schema": {
"type": "struct",
"fields": [{
"type": "string",
"optional": true,
"field": "idoc_id"
}, {
"type": "string",
"optional": true,
"field": "idoc_event_ts"
}, {
"type": "string",
"optional": true,
"field": "json_doc"
}],
"optional": false,
"name": "idoc.idocs_events"
},
"payload": {
"idoc_id": "dc4ab8a0-fdf8-11e6-8285-1bce55915fdd",
"idoc_event_ts": "dc4ab8a1-fdf8-11e6-8285-1bce55915fdd",
"json_doc": "{\"foo\":\"bar\"}"
}}

我想写的话题是 json_doc的值柱子。

这是我的 Cassandra 源配置中的内容
{
"name": "cassandra-idocs",
"config": {
"tasks.max": "1",
"connector.class": "com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceConnector",
"connect.cassandra.key.space": "idoc",
"connect.cassandra.source.kcql": "INSERT INTO idocs-topic SELECT json_doc FROM idocs_events PK idoc_event_ts",
"connect.cassandra.import.mode": "incremental",
"connect.cassandra.contact.points": "localhost",
"connect.cassandra.port": 9042,
"connect.cassandra.import.poll.interval": 10000
}}

如何更改 Kafka Connect Cassandra Source 的配置方式,以便只有 json_doc 的值被写入主题,所以它看起来像这样:
{"foo":"bar"}

Kassandra Connect Query Language似乎是要走的路,但它不限制写入 KCQL 中指定的列的内容。

更新

看到这个 answer on StackOverflow并更改了 connect-distributed.properties 中的转换器文件来自 JsonConverterStringConverter .

结果是现在将其写入主题:
Struct{idoc_id=74597cf0-fdf7-11e6-8285-1bce55915fdd,idoc_event_ts=74597cf1-fdf7-11e6-8285-1bce55915fdd,json_doc={"foo":"bar"}}

更新 2

更改了 connect-distributed.properties 中的转换器文件回 JsonConverter .然后还禁用了模式。
key.converter.schemas.enable=false
value.converter.schemas.enable=false

结果是现在将其写入主题:
{
"idoc_id": "dc4ab8a0-fdf8-11e6-8285-1bce55915fdd",
"idoc_event_ts": "dc4ab8a1-fdf8-11e6-8285-1bce55915fdd",
"json_doc": "{\"foo\":\"bar\"}"
}

备注
使用快照版本中的代码并将 KCQL 更改为
INSERT INTO idocs-topic 
SELECT json_doc, idoc_event_ts
FROM idocs_events
IGNORE idoc_event_ts
PK idoc_event_ts

在主题上产生这个结果
{"json_doc": "{\"foo\":\"bar\"}"}

谢谢

最佳答案

事实证明,在 DataMountaineer Stream Reactor 0.2.4 的 Cassandra Source 中,我尝试做的事情是不可能的。但是,快照版本(我认为将成为 0.2.5 版)将支持这一点。

以下是它的工作方式:

1) 在 connect-distributed.properties 中设置转换器文件到 StringConverter .

2) 将 Cassandra Source 连接器的 JSON 配置中的 KCQL 设置为

INSERT INTO idocs-topic 
SELECT json_doc, idoc_event_ts
FROM idocs_events
IGNORE idoc_event_ts
PK idoc_event_ts
WITHUNWRAP

这将导致 json_doc 的值列被发布到 Kafka 主题,没有任何架构信息或列名本身。

所以如果列 json_doc包含值 {"foo":"bar"}那么这就是将出现在主题上的内容:
{"foo":"bar"}

以下是有关 KCQL 在快照版本中如何工作的一些背景信息。
SELECT现在将仅检索该表中在 KCQL 中指定的列。最初它总是检索所有列。需要注意的是,PK列必须是 SELECT的一部分。使用 incremental 时的声明进口模式。如果 PK 列的值不应该包含在发布到 Kafka 主题的消息中,则将其添加到 IGNORE语句(如上例所示)。
WITHUNWRAP是 KCQL 的新功能,它将告诉 Cassandra Source 连接器创建 SourceRecord使用字符串 Schema类型(而不是结构)。在此模式下,只有 SELECT 中的列的值语句将存储为 SourceRecord 的值.如果 SELECT 中有不止一列应用 IGNORE 后的声明语句然后将值附加到一起并用逗号分隔。

关于cassandra - 配置使用 Kafka Connect Cassandra Source 时写入 Kafka Topic 的内容,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42519927/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com