gpt4 book ai didi

apache-kafka - 将 kafka-connect-transform-archive 与 HdfsSinkConnector 一起使用时刷新大小

转载 作者:行者123 更新时间:2023-12-05 07:23:37 25 4
gpt4 key购买 nike

我想在我的数据湖中保留 Kafka 主题中的数据。

在担心 key 之前,我能够使用 HdfsSinkConnector 将 Avro 值保存在数据湖上的文件中。每个文件中消息值的数量由 HdfsSinkConnector 的“flush.size”属性决定。

一切顺利。接下来我也想保留 key 。为此,我使用了 kafka-connect-transform-archive,它将 String 键和 Avro 值包装到一个新的 Avro 模式中。

这很好用……除了 HdfsSinkConnector 的 flush.size 现在被忽略了。保存在数据湖中的每个文件只有 1 条消息。

因此,这两种情况是 1) 仅保存值,每个文件中值的数量由 flush.size 确定,以及 2) 保存键和值,每个文件只包含一条消息,而 flush.size 被忽略。

这两种情况之间的唯一区别是指定存档转换的 HdfsSinkConnector 配置。

"transforms": "tran",
"transforms.tran.type": "com.github.jcustenborder.kafka.connect.archive.Archive"

kafka-connect-transform-archive 是否通过设计忽略刷新大小,或者是否需要一些额外的配置才能在数据湖上为每个文件保存多个键值消息?

最佳答案

我在使用 kafka gcs 接收器连接器时遇到了同样的问题。

在 com.github.jcustenborder.kafka.connect.archive.Archive 代码中,每条消息都会创建一个新的 Schema。

private R applyWithSchema(R r) {
final Schema schema = SchemaBuilder.struct()
.name("com.github.jcustenborder.kafka.connect.archive.Storage")
.field("key", r.keySchema())
.field("value", r.valueSchema())
.field("topic", Schema.STRING_SCHEMA)
.field("timestamp", Schema.INT64_SCHEMA);
Struct value = new Struct(schema)
.put("key", r.key())
.put("value", r.value())
.put("topic", r.topic())
.put("timestamp", r.timestamp());
return r.newRecord(r.topic(), r.kafkaPartition(), null, null, schema, value, r.timestamp());

如果您查看 kafka transform InsertField$Value 方法,您会发现它使用 SynchronizedCache 以便每次都检索相同的模式。

https://github.com/axbaretto/kafka/blob/ba633e40ea77f28d8f385c7a92ec9601e218fb5b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java#L170

因此,您只需创建一个架构(在应用函数之外)或使用相同的 SynchronizedCache 代码。

关于apache-kafka - 将 kafka-connect-transform-archive 与 HdfsSinkConnector 一起使用时刷新大小,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55865349/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com