gpt4 book ai didi

amazon-s3 - TimeBasedPartitioner 的 Kafka Connect S3 连接器 OutOfMemory 错误

转载 作者:行者123 更新时间:2023-12-04 17:43:19 25 4
gpt4 key购买 nike

我目前正在使用 Kafka Connect S3 Sink Connector 3.3.1将 Kafka 消息复制到 S3,并且在处理后期数据时出现 OutOfMemory 错误。

我知道这看起来是一个很长的问题,但我尽力让它清晰易懂。
我非常感谢您的帮助。

高级信息

  • 连接器对 Kafka 消息进行简单的字节到字节复制,并在字节数组的开头添加消息的长度(用于解压缩)。
  • 这就是CustomByteArrayFormat的作用类(见下面的配置)
  • 根据 Record 对数据进行分区和分桶时间戳
  • CustomTimeBasedPartitioner扩展 io.confluent.connect.storage.partitioner.TimeBasedPartitioner其唯一目的是覆盖 generatePartitionedPath方法将主题放在路径的末尾。
  • Kafka Connect 进程的总堆大小为 24GB(只有一个节点)
  • 连接器每秒处理 8,000 到 10,000 条消息
  • 每条消息的大小接近 1 KB
  • Kafka 主题有 32 个分区

  • OutOfMemory 错误的上下文
  • 这些错误仅在连接器关闭数小时且必须 catch 数据时才会发生
  • 重新打开连接器时,它开始追赶,但很快就会失败并出现 OutOfMemory 错误

  • 可能但不完整的解释
  • timestamp.extractor连接器的配置设置为 Record当这些 OOM 错误发生时
  • 将此配置切换到 Wallclock (即Kafka Connect进程的时间)不要抛出OOM错误,所有迟到的数据都可以处理,但迟到的数据不再正确分桶
  • 所有迟到的数据都将被分桶在 YYYY/MM/dd/HH/mm/topic-name 中连接器重新打开的时间
  • 所以我的猜测是,当连接器试图根据 Record 正确存储数据时时间戳,它做了太多的并行读取导致 OOM 错误
  • "partition.duration.ms": "600000"参数在每小时 6 个 10 分钟路径中生成连接器存储桶数据(2018/06/20/12/[00|10|20|30|40|50] for 2018-06-20 at 12pm)
  • 因此,对于 24 小时的延迟数据,连接器必须在 24h * 6 = 144 中输出数据。不同的 S3 路径。
  • 每个 10 分钟文件夹包含 10,000 条消息/秒 * 600 秒 = 6,000,000 条消息,大小为 6 GB
  • 如果确实是并行读取,那将使 864GB 的数据进入内存
  • 我认为我必须正确配置一组给定的参数以避免那些 OOM 错误,但我觉得我没有看到大局
  • "flush.size": "100000"暗示如果有更多 dans 100,000 条消息被读取,它们应该被提交到文件(从而释放内存)
  • 对于 1KB 的消息,这意味着每 100MB 提交一次
  • 但即使有 144 个并行读数,总共也只能得到 14.4 GB,这小于可用的 24 GB 堆大小
  • "flush.size"要读取的记录数 每个分区 在 promise 之前?或者 每个连接器的任务 ?
  • 我理解的方式"rotate.schedule.interval.ms": "600000"配置是即使 flush.size 的 100,000 条消息,数据也将每 10 分钟提交一次。还没有达到。

  • 我的主要问题是让我计划内存使用的数学是什么:
  • 每秒的数量或记录
  • 记录的大小
  • 我从
  • 中读取的主题的 Kafka 分区数
  • 连接器任务的数量(如果相关)
  • 每小时写入的桶数(这里是 6,因为 "partition.duration.ms": "600000" 配置)
  • 要处理的延迟数据的最大小时数

  • 配置

    S3 接收器连接器配置
    {
    "name": "xxxxxxx",
    "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "s3.region": "us-east-1",
    "partition.duration.ms": "600000",
    "topics.dir": "xxxxx",
    "flush.size": "100000",
    "schema.compatibility": "NONE",
    "topics": "xxxxxx,xxxxxx",
    "tasks.max": "16",
    "s3.part.size": "52428800",
    "timezone": "UTC",
    "locale": "en",
    "format.class": "xxx.xxxx.xxx.CustomByteArrayFormat",
    "partitioner.class": "xxx.xxxx.xxx.CustomTimeBasedPartitioner",
    "schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
    "name": "xxxxxxxxx",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "s3.bucket.name": "xxxxxxx",
    "rotate.schedule.interval.ms": "600000",
    "path.format": "YYYY/MM/dd/HH/mm",
    "timestamp.extractor": "Record"
    }

    worker 配置
    bootstrap.servers=XXXXXX
    key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
    value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false
    consumer.auto.offset.reset=earliest
    consumer.max.partition.fetch.bytes=2097152
    consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
    group.id=xxxxxxx
    config.storage.topic=connect-configs
    offset.storage.topic=connect-offsets
    status.storage.topic=connect-status
    rest.advertised.host.name=XXXX

    编辑 :

    我忘了添加一个我遇到的错误的例子:
    2018-06-21 14:54:48,644] ERROR Task XXXXXXXXXXXXX-15 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:482)
    java.lang.OutOfMemoryError: Java heap space
    [2018-06-21 14:54:48,645] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:483)
    [2018-06-21 14:54:48,645] ERROR Task XXXXXXXXXXXXXXXX-15 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
    org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:484)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

    最佳答案

    我终于能够理解堆大小使用在 Kafka Connect S3 连接器中是如何工作的

  • S3 Connector 会将每个 Kafka 分区的数据写入分区 paths
  • 方式那些paths分区取决于 partitioner.class范围;
  • 默认按时间戳,取值partition.duration.ms然后将确定每个分区的持续时间 paths .
  • S3 连接器将分配一个缓冲区 s3.part.size每个 Kafka 分区(对于读取的所有主题)和每个分区的字节数 paths
  • 读取 20 个分区的示例,timestamp.extractor设置为 Record , partition.duration.ms设置为 1 小时,s3.part.size设置为 50 MB
  • 每小时所需的堆大小等于 20 * 50 MB = 1 GB;
  • 但是,timestamp.extractor被设置为 Record ,具有与较早时间相对应的时间戳的消息,然后读取它们的时间将缓冲在此较早时间缓冲区中。因此,实际上,连接器至少需要 20 * 50 MB * 2h = 2 GB 内存,因为总是有延迟事件,如果有延迟超过 1 小时的事件,则需要更多;
  • 请注意,如果 timestamp.extractor,则情况并非如此。设置为 Wallclock因为就 Kafka Connect 而言,几乎永远不会有迟到的事件。
  • 这些缓冲区在 3 个条件下被刷新(即离开内存)
  • rotate.schedule.interval.ms时间过去了
  • 此冲洗条件是 总是 触发。
  • rotate.interval.ms时间过去了timestamp.extractor 方面时间
  • 这意味着如果 timestamp.extractor设置为 Record , 10 分钟 Record时间可以少于或多于 10 分钟的实际时间
  • 例如,处理迟到的数据时,10 分钟的数据将在几秒钟内处理完毕,如果 rotate.interval.ms设置为 10 分钟,则此条件将每秒触发一次(应该如此);
  • 相反,如果事件流中出现暂停,则直到看到时间戳显示超过 rotate.interval.ms 的事件才会触发此条件。自上次触发条件以来已经过去。
  • flush.size消息已被阅读少于 min(rotate.schedule.interval.ms , rotate.interval.ms)
  • 至于rotate.interval.ms ,如果没有足够的消息,这种情况可能永远不会触发。
  • 因此,您需要规划 Kafka partitions * s3.part.size至少堆大小
  • 如果您使用的是 Record分区的时间戳,您应该将其乘以 max lateness in milliseconds / partition.duration.ms
  • 这是最坏的情况,您在所有分区和所有范围 max lateness in milliseconds 中不断发生延迟事件。 .
  • S3 连接器还将缓冲 consumer.max.partition.fetch.bytes从 Kafka 读取时每个分区的字节数
  • 默认设置为 2.1 MB。
  • 最后,您不应该认为所有 Heap Size 都可用于缓冲 Kafka 消息,因为其中还有很多不同的对象
  • 一个安全的考虑是确保 Kafka 消息的缓冲不超过总可用堆大小的 50%。
  • 关于amazon-s3 - TimeBasedPartitioner 的 Kafka Connect S3 连接器 OutOfMemory 错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50971065/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com