gpt4 book ai didi

amazon-s3 - kafka s3 接收器连接器在获取 NULL 数据时崩溃

转载 作者:行者123 更新时间:2023-12-04 02:35:42 24 4
gpt4 key购买 nike

我有一个工作 s3 接收器连接器,直到源连接器发送一个 NULL 值; s3 连接器崩溃。当我从 MS SQL db 中删除一条记录时出现问题。源连接器将删除信息发送到 s3 连接器并且 s3 连接器崩溃。我删除并重新创建了一个不同名称的 s3 连接器,没有任何改变。

    org.apache.kafka.connect.errors.ConnectException: Null valued records are not writeable with current behavior.on.null.values 'settings.
at io.confluent.connect.s3.format.avro.AvroRecordWriterProvider$1.write(AvroRecordWriterProvider.java:91)
at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:502)
at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:275)
at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:220)
at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:189)
at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:190)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2020-05-24 10:10:50,577 WARN WorkerSinkTask{id=minio-connector1-0} Ignoring invalid task provided offset filesql1.dbo.Files-0/OffsetAndMetadata{offset=16, leaderEpoch=null, metadata=''} -- not yet consumed, taskOffset=16 currentOffset=0 (org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-minio-connector1-0]
2020-05-24 10:10:50,577 ERROR WorkerSinkTask{id=minio-connector1-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-minio-connector1-0]
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:568)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:326)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:228)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:196)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:184)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:234)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: Null valued records are not writeable with current behavior.on.null.values 'settings.
at io.confluent.connect.s3.format.avro.AvroRecordWriterProvider$1.write(AvroRecordWriterProvider.java:91)
at io.confluent.connect.s3.TopicPartitionWriter.writeRecord(TopicPartitionWriter.java:502)
at io.confluent.connect.s3.TopicPartitionWriter.checkRotationOrAppend(TopicPartitionWriter.java:275)
at io.confluent.connect.s3.TopicPartitionWriter.executeState(TopicPartitionWriter.java:220)
at io.confluent.connect.s3.TopicPartitionWriter.write(TopicPartitionWriter.java:189)
at io.confluent.connect.s3.S3SinkTask.put(S3SinkTask.java:190)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:546)
... 10 more
2020-05-24 10:10:50,577 ERROR WorkerSinkTask{id=minio-connector1-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask) [task-thread-minio-connector1-0]

...这是我的 s3 连接器配置:
    apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
name: "minio-connector1"
labels:
strimzi.io/cluster: mssql-minio-connect-cluster
spec:
class: io.confluent.connect.s3.S3SinkConnector
config:
storage.class: io.confluent.connect.s3.storage.S3Storage
partitioner.class: io.confluent.connect.storage.partitioner.DefaultPartitioner
tasks.max: '1'
topics: filesql1.dbo.Files
s3.bucket.name: dosyalar
s3.part.size: '5242880'
flush.size: '2'
format: binary
schema.compatibility: NONE
max.request.size: "536870912"
store.url: http://minio.dev-kik.io
format.class: io.confluent.connect.s3.format.avro.AvroFormat
key.converter: io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url: http://schema-registry-cp-schema-registry:8081
value.converter: io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url: http://schema-registry-cp-schema-registry:8081
internal.key.converter: org.apache.kafka.connect.json.JsonConverter
internal.value.converter: org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable: true
value.converter.schemas.enable: true
schema.generator.class: io.confluent.connect.storage.hive.schema.DefaultSchemaGenerator

我有两个问题:

1)
如何让 s3 连接器再次运行?

2)
不能期望不从源数据库中删除记录。如何防止 s3 连接器再次崩溃?

最佳答案

请查看连接器文档并查找 behavior.on.null.values 。您可以将其设置为 ignore

关于amazon-s3 - kafka s3 接收器连接器在获取 NULL 数据时崩溃,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61984664/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com