gpt4 book ai didi

apache-kafka - 在分布式模式下使用 HDFS 连接器接收器避免来自 Kafka 连接的小文件

转载 作者:行者123 更新时间:2023-12-05 03:05:17 24 4
gpt4 key购买 nike

我们有一个主题,消息速率为每秒 1 msg,有 3 个分区,我正在使用 HDFS 连接器将数据以 AVRo 格式(默认)写入 HDFS,它生成大小为 KBS 的文件,所以我尝试更改HDFS 属性中的以下属性。

"flush.size":"5000","rotate.interval.ms":"7200000"

但是输出仍然是小文件,所以我需要弄清楚以下几点来解决这个问题:

  1. flush.size 属性是强制性的,如果我们不提及 flus.size 属性,数据将如何刷新?

  2. 如果我们提到刷新大小为 5000 并且旋转间隔为 2 小时,则前 3 个间隔每 2 小时刷新一次数据,但之后它随机刷新数据,请找到文件的计时创建(19:14,21:14,23:15,01:15,06:59,08:59,12:40,14:40)--突出显示了不匹配的间隔。是因为提到的属性的覆盖?这让我想到了第三个问题。

  3. 如果我们提到以下所有属性 (flush.size,rotate.interval.ms,rotate.schedule.interval.ms),flush 的偏好是什么

  4. 提高msg的速率,减少partition,实际上是flush的数据变大了,是不是只有这样才能控制小文件,如果输入事件的速率变化且不稳定?

如果您能分享有关在使用 HDFS 连接器连接的 kafka 中处理小文件的文档,那将会很有帮助,谢谢。

最佳答案

如果您使用的是 TimeBasedPartitioner,并且消息的时间戳不会一直增加,那么当它在 的时间间隔内看到一条具有较小时间戳的消息时,您将以单个编写器任务转储文件而告终rotate.interval.ms 读取任何给定记录。

如果你想拥有一致的双小时分区窗口,那么你应该使用 rotate.interval.ms=-1 来禁用它,然后使用 rotate.schedule.interval.ms 到分区持续时间窗口内的某个合理数字。

例如您每 2 小时有 7200 条消息,并且不清楚每条消息有多大,但可以说是 1MB。然后,您将在缓冲区中保存约 7GB 的数据,并且您需要调整 Connect 堆大小以保存这么多数据。

出现顺序是

  1. 预定轮换,从整点开始
  2. 刷新大小或“基于消息”的时间轮换,以先发生者为准,或者存在被视为“在”当前批处理开始之前的记录

而且我认为存储连接器的齐平尺寸是强制性的


总体来说,像Uber的Hudi或者之前Camus Sweeper的Kafka-HDFS工具这样的系统,处理小文件的能力更强。 Connect Sink Tasks 只关心从 Kafka 消费,并写入下游系统;框架本身不承认 Hadoop 更喜欢更大的文件。

关于apache-kafka - 在分布式模式下使用 HDFS 连接器接收器避免来自 Kafka 连接的小文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51157978/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com