- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我目前正在使用 Kafka Connect S3 Sink Connector 3.3.1将 Kafka 消息复制到 S3,并且在处理后期数据时出现 OutOfMemory 错误。
我知道这看起来是一个很长的问题,但我尽力让它清晰易懂。
我非常感谢您的帮助。
高级信息
CustomByteArrayFormat
的作用类(见下面的配置)Record
对数据进行分区和分桶时间戳CustomTimeBasedPartitioner
扩展 io.confluent.connect.storage.partitioner.TimeBasedPartitioner
其唯一目的是覆盖 generatePartitionedPath
方法将主题放在路径的末尾。 timestamp.extractor
连接器的配置设置为 Record
当这些 OOM 错误发生时 Wallclock
(即Kafka Connect进程的时间)不要抛出OOM错误,所有迟到的数据都可以处理,但迟到的数据不再正确分桶YYYY/MM/dd/HH/mm/topic-name
中连接器重新打开的时间 Record
正确存储数据时时间戳,它做了太多的并行读取导致 OOM 错误"partition.duration.ms": "600000"
参数在每小时 6 个 10 分钟路径中生成连接器存储桶数据(2018/06/20/12/[00|10|20|30|40|50]
for 2018-06-20 at 12pm)24h * 6 = 144
中输出数据。不同的 S3 路径。 "flush.size": "100000"
暗示如果有更多 dans 100,000 条消息被读取,它们应该被提交到文件(从而释放内存)"flush.size"
要读取的记录数 每个分区 在 promise 之前?或者 每个连接器的任务 ? "rotate.schedule.interval.ms": "600000"
配置是即使 flush.size
的 100,000 条消息,数据也将每 10 分钟提交一次。还没有达到。 "partition.duration.ms": "600000"
配置){
"name": "xxxxxxx",
"config": {
"connector.class": "io.confluent.connect.s3.S3SinkConnector",
"s3.region": "us-east-1",
"partition.duration.ms": "600000",
"topics.dir": "xxxxx",
"flush.size": "100000",
"schema.compatibility": "NONE",
"topics": "xxxxxx,xxxxxx",
"tasks.max": "16",
"s3.part.size": "52428800",
"timezone": "UTC",
"locale": "en",
"format.class": "xxx.xxxx.xxx.CustomByteArrayFormat",
"partitioner.class": "xxx.xxxx.xxx.CustomTimeBasedPartitioner",
"schema.generator.class": "io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator",
"name": "xxxxxxxxx",
"storage.class": "io.confluent.connect.s3.storage.S3Storage",
"s3.bucket.name": "xxxxxxx",
"rotate.schedule.interval.ms": "600000",
"path.format": "YYYY/MM/dd/HH/mm",
"timestamp.extractor": "Record"
}
bootstrap.servers=XXXXXX
key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
consumer.auto.offset.reset=earliest
consumer.max.partition.fetch.bytes=2097152
consumer.partition.assignment.strategy=org.apache.kafka.clients.consumer.RoundRobinAssignor
group.id=xxxxxxx
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
rest.advertised.host.name=XXXX
2018-06-21 14:54:48,644] ERROR Task XXXXXXXXXXXXX-15 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerSinkTask:482)
java.lang.OutOfMemoryError: Java heap space
[2018-06-21 14:54:48,645] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerSinkTask:483)
[2018-06-21 14:54:48,645] ERROR Task XXXXXXXXXXXXXXXX-15 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:148)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:484)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:265)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:182)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:150)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
最佳答案
我终于能够理解堆大小使用在 Kafka Connect S3 连接器中是如何工作的
paths
paths
分区取决于 partitioner.class
范围; partition.duration.ms
然后将确定每个分区的持续时间 paths
. s3.part.size
每个 Kafka 分区(对于读取的所有主题)和每个分区的字节数 paths
timestamp.extractor
设置为 Record
, partition.duration.ms
设置为 1 小时,s3.part.size
设置为 50 MB20 * 50 MB
= 1 GB; timestamp.extractor
被设置为 Record
,具有与较早时间相对应的时间戳的消息,然后读取它们的时间将缓冲在此较早时间缓冲区中。因此,实际上,连接器至少需要 20 * 50 MB * 2h
= 2 GB 内存,因为总是有延迟事件,如果有延迟超过 1 小时的事件,则需要更多; timestamp.extractor
,则情况并非如此。设置为 Wallclock
因为就 Kafka Connect 而言,几乎永远不会有迟到的事件。 rotate.schedule.interval.ms
时间过去了rotate.interval.ms
时间过去了在 timestamp.extractor
方面时间 timestamp.extractor
设置为 Record
, 10 分钟 Record
时间可以少于或多于 10 分钟的实际时间rotate.interval.ms
设置为 10 分钟,则此条件将每秒触发一次(应该如此); rotate.interval.ms
的事件才会触发此条件。自上次触发条件以来已经过去。 flush.size
消息已被阅读少于 min(rotate.schedule.interval.ms
, rotate.interval.ms)
rotate.interval.ms
,如果没有足够的消息,这种情况可能永远不会触发。 Kafka partitions * s3.part.size
至少堆大小Record
分区的时间戳,您应该将其乘以 max lateness in milliseconds / partition.duration.ms
max lateness in milliseconds
中不断发生延迟事件。 . consumer.max.partition.fetch.bytes
从 Kafka 读取时每个分区的字节数关于amazon-s3 - TimeBasedPartitioner 的 Kafka Connect S3 连接器 OutOfMemory 错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50971065/
我们被要求从 Tomcat 1.6 迁移到 TomEE。在我们的应用程序中,我们使用 tomcat 作为嵌入式服务器。当我们尝试根据以下教程“http://www.copperykeenclaws.c
我需要一些有关配置文件的帮助。我已经在我的项目中包含了MySql.Data.dll,但是我如何告诉配置文件这个dll包含MySQL连接器? NHibernate.Connection.Dr
我花了几个小时盯着这段代码。请新鲜的眼睛! 这是查询的简化版本: You have an error in your SQL syntax; check the manual that corresp
我正在开发一个应用程序,它可以在不使用任何网络服务的情况下将数据插入数据库。我用 MySQL Workbench 在我的笔记本上创建了一个数据库。我可以使用模拟器将数据插入数据库,但我无法使用手机将数
我刚刚发现了 Zimbra,并且有一个用 Java 编写的连接器。我一直在网上寻找其他人的一些文档或经验,但找不到任何东西。是否有任何关于 API 的良好文档,以便我可以开始并检查可以用它做什么? 谢
我正在使用 C++ mysql 连接器在我的 mysql 数据库中执行操作。 我的 C++ 程序是一个实时应用程序(rest api),它始终在云端运行,始终等待用户请求。 当我启动第一种类型的程序时
我有一个 C 进程正在快速写入 mysql 数据库~每秒 10 次。此过程使用 MySql C 连接器。 运行约2分钟后,进程挂起,系统监视器显示 "futex_wait_queue_me" ,还有
有谁知道使用 TraceListener 为 MySQL 连接器/网络启用跟踪的方法我希望它记录实际针对数据库运行的 SQL 查询,即查看参数值被替换的 SQL。 最佳答案 从 mysql 5.1.2
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 要求我们推荐或查找工具、库或最喜欢的场外资源的问题对于 Stack Overflow 来说是偏离主题的,
作为 EF 菜鸟,我正在尝试将 Entity Framework 6 Code First 与我安装在我的开发计算机上的 MySql Server 5.6 结合使用。 我做了一个非常小的测试控制台项目
我需要将 Google Spreadsheet 与 JasperReports Server 一起使用,为此我需要一个用于 Google Spreadsheet 的 JDBC 连接器。 我找到了这个
我已经安装了 Tycho m2e 连接器,如下所述:http://codeandme.blogspot.ru/2012/12/tycho-build-1-building-plug-ins.html
我被要求使用 SAP .NET 连接器。我目前使用 .NET 4.0 和 VS2010。有什么我需要降级的吗? 另外,有没有人知道有关如何使用它的任何当前在线教程?我所拥有的只是来自 SAP 的信息,
我想知道什么是 m2e 连接器。除了这个页面,我在互联网上没有找到太多描述它们的内容: http://objectledge.org/confluence/display/TOOLS/M2E+Conn
是否可以通过 VGA 连接器镜像屏幕?找不到任何关于此的内容。 最佳答案 我一直在寻找和你一样的东西。上周末我写了一个小的 UIApplication 类别来添加镜像支持。我在 Google Code
我正在开发从 SQL Server Db 提取 CDC 数据的逻辑应用程序。我正在使用“获取行”操作,但当我尝试使用过滤查询参数时,问题就出现了。 代码 eq '793'(有效) __$operati
用例: 应用程序使用spark处理数据5分钟,要处理的数据可能是数据存储中数十万条记录的数据。 数据存储的选择是Elastic Search。 问题: 我们在Elasticsearch中是否有用于 S
我已经安装了 hadoop 3 版本的 GCS 连接器,并将以下配置添加到 core-site.xml,如 Install.md 中所述.目的是将数据从本地集群中的 hdfs 迁移到云存储。 核心站点
如何删除 debezium 连接器。我正在关注本教程 https://debezium.io/documentation/reference/tutorial.html我看到了注册连接器的方法,但不知
如何删除 debezium 连接器。我正在关注本教程 https://debezium.io/documentation/reference/tutorial.html我看到了注册连接器的方法,但不知
我是一名优秀的程序员,十分优秀!