gpt4 book ai didi

java - 为什么元数据添加到这个 Kafka 连接器的输出中?

转载 作者:行者123 更新时间:2023-12-01 09:11:41 25 4
gpt4 key购买 nike

我有一个 Kafka 连接器,其中包含 SourceTask 实现中的 poll() 方法的以下代码。

@Override
public List<SourceRecord> poll() throws InterruptedException
{
SomeType item = mQueue.take();
List<SourceRecord> records = new ArrayList<>();
SourceRecord[] sourceRecords = new SourceRecord[]{
new SourceRecord(null, null, "data", null,
Schema.STRING_SCHEMA, "foo",
Schema.STRING_SCHEMA, "bar")
};
Collections.addAll(records, sourceRecords);

return records;
}

如果我将使用者附加到数据主题,我会收到从连接器发送的以下消息:

{"schema":{"type":"string","optional":false},"payload":"foo"}   {"schema":{"type":"string","optional":false},"payload":"bar"}

如果我使用以下命令直接向主题发布消息:

echo -e 'foo,bar' > /tmp/test_kafka.txt
cat /tmp/test_kafka.txt | kafka-console-producer.sh --broker-list kafka-host:9092 --topic data --property parse.key=true --property key.separator=,

然后附加同一个消费者,我收到此消息:

foo bar

这是我期望看到的连接器实现的输出,而不是我收到的 {"schema":... 消息。

如何更改 poll() 的实现,以便在消息的实际键和值中不出现架构元数据的情况下发送消息?

最佳答案

好吧,事实证明这只是因为我在 connect-standalone.properties 中有以下几行

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

我应该有

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.storage.StringConverter

作为替代解决方案,我还可以将以下设置从 true 更改为 false

value.converter.schemas.enable=false

然后在我的处理器类中,我将代码更改为:

SourceRecord[] sourceRecords = new SourceRecord[]{
new SourceRecord(null, null, "data", null,
Schema.STRING_SCHEMA, "foo",
null, "bar")
};

这有所不同,因为我不再为该值指定架构。

关于java - 为什么元数据添加到这个 Kafka 连接器的输出中?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40878365/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com