gpt4 book ai didi

amazon-s3 - 强制 Confluence s3 水槽冲洗

转载 作者:行者123 更新时间:2023-12-02 23:04:38 27 4
gpt4 key购买 nike

我设置了 kafka connect s3 接收器,持续时间设置为 1 小时,并且还设置了相当大的刷新计数,比如 10,000。现在如果kafka channel 中消息不多,s3接收器会尝试将它们缓冲在内存中,并等待累积到flush计数,然后将它们一起上传并将偏移量提交给自己的消费者组。

但想想这种情况。如果在 channel 里,我只发送5000条消息。然后就没有s3水槽冲洗了。那么过了很长一段时间,这5000条消息最终会因为保留时间的原因被从kafka中逐出。但这些消息仍然在s3接收器的内存中,而不是在s3中。这是非常危险的,例如,如果我们重新启动 s3 Sink 或运行 s3 Sink 的机器崩溃了。然后我们就丢失了这 5,000 条消息。我们无法从 kafka 中再次找到它们,因为它已被删除。

s3 sink 会发生这种情况吗?或者有一些设置强制它在一段时间后刷新?

最佳答案

如果从 Kafka 到 S3 的流没有恒定的记录流,您可以使用该属性

rotate.schedule.interval.ms

按计划的时间间隔刷新记录。

请注意,在重新处理的情况下,如果使用此选项,您的下游系统应该能够处理重复项。这是因为,如果连接器计划从 Kafka 重新导出记录,则根据挂钟刷新此类记录可能会导致不同文件中出现重复项。

作为旁注,如果您使用属性:

rotate.interval.ms

使用 Wallclock 时间戳提取器 (timestamp.extractor=Wallclock),您的记录将被刷新,而无需设置 rotate.schedule.interval.ms.但这意味着您的分区程序依赖于挂钟,因此您应该能够考虑重复记录。

连接器能够通过确定性分区器对恒定的记录流提供一次性交付,并具有各种时间戳提取器,例如依赖于记录的时间戳 (Record) 或时间戳的提取器。字段时间戳 (RecordField) 。

分区的配置属性 here

关于amazon-s3 - 强制 Confluence s3 水槽冲洗,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50761999/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com