
java - Using a schemaless JSON converter with the Kafka HBase connector

Reposted. Author: 太空宇宙. Updated: 2023-11-04 09:59:17

I am using the HBase sink connector for Kafka ( https://github.com/mravi/kafka-connect-hbase ). I am trying to set up this connector using its JsonConverter as the event parser class, configured as follows:

{
"name": "test-hbase",
"config": {
"connector.class": "io.svectors.hbase.sink.HBaseSinkConnector",
"tasks.max": "1",
"topics": "hbase_connect",
"zookeeper.quorum": "xxxxx.xxxx.xx.xx,xxxxx.xxxx.xx.xx,xxxxx.xxxx.xx.xx",
"event.parser.class": "io.svectors.hbase.parser.JsonEventParser",
"hbase.hbase_connect.rowkey.columns": "id",
"hbase.hbase_connect.family": "col1"
}
}
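As a side note, connector configs like the one above are submitted to the Connect REST API as JSON, and strict JSON parsers reject things like trailing commas, so it is worth validating the body before POSTing it. A minimal sketch using only the Python standard library (the config strings here are shortened, hypothetical examples):

```python
import json

# Hypothetical connector config body with a stray trailing comma,
# a common copy/paste slip that strict JSON parsers reject:
bad = '{"name": "test-hbase", "config": {"tasks.max": "1",}}'

try:
    json.loads(bad)
    parsed = True
except json.JSONDecodeError:
    parsed = False

print(parsed)  # False: the Connect REST API would likewise reject this body

# The same body without the trailing comma parses cleanly:
good = '{"name": "test-hbase", "config": {"tasks.max": "1"}}'
config = json.loads(good)
print(config["name"])
```

Running a check like this locally gives a clearer error than the HTTP response from the Connect worker.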

These are the Kafka Connect distributed worker properties I am running with:

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
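For context on these settings: with `value.converter.schemas.enable=false`, `JsonConverter` hands the sink the raw JSON object as a schemaless map; with it set to `true`, every message must be wrapped in a `{"schema": ..., "payload": ...}` envelope and only the payload becomes the value. A minimal sketch of that convention in plain Python (this only mimics the envelope rule, it is not the actual Connect classes):

```python
import json

def extract_value(raw: str, schemas_enable: bool):
    """Mimic how JsonConverter interprets a message body.

    With schemas enabled, the body must be a schema/payload envelope
    and only the payload is handed to the sink task; with schemas
    disabled, the whole JSON object is the value.
    """
    obj = json.loads(raw)
    if schemas_enable:
        return obj["payload"]  # raises KeyError if the envelope is missing
    return obj

schemaless = '{"id": "9", "name": "wis"}'
enveloped = ('{"schema": {"type": "struct", "fields": []}, '
             '"payload": {"id": "9", "name": "wis"}}')

print(extract_value(schemaless, schemas_enable=False))
print(extract_value(enveloped, schemas_enable=True))
```

Both calls yield the same record dict; the difference is only in what the wire format must look like.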

The problem is that when I produce a JSON message without a schema, the connector throws the NullPointerException below:

 [2018-12-10 16:45:06,607] ERROR WorkerSinkTask{id=hbase_connect-0}
Task threw an uncaught and unrecoverable exception.
Task is being killed and will not recover until manually restarted.
(org.apache.kafka.connect.runtime.WorkerSinkTask:515)
java.lang.NullPointerException
at io.svectors.hbase.util.ToPutFunction.apply(ToPutFunction.java:78)
at io.svectors.hbase.sink.HBaseSinkTask.lambda$4(HBaseSinkTask.java:105)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at io.svectors.hbase.sink.HBaseSinkTask.lambda$3(HBaseSinkTask.java:105)
at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1321)
at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169)
at java.util.HashMap$EntrySpliterator.forEachRemaining(HashMap.java:1696)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at io.svectors.hbase.sink.HBaseSinkTask.put(HBaseSinkTask.java:104)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:495)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:288)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2018-12-10 16:45:06,607] ERROR WorkerSinkTask{id=hbase_connect-0}
Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:517)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:288)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

This is the message I produced:

{"id": "9","name": "wis"}
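If upgrading the connector is not an option, one possible workaround (an assumption on my part, not something this connector documents) is to enable `value.converter.schemas.enable=true` and send the same record wrapped in a schema/payload envelope, so the parser receives field schemas instead of nulls. A sketch of what that envelope would look like for this record (the field types are an illustrative guess):

```python
import json

# The original schemaless record:
record = {"id": "9", "name": "wis"}

# The same record wrapped for value.converter.schemas.enable=true.
# Field names match the record; the string types are assumed here.
enveloped = {
    "schema": {
        "type": "struct",
        "fields": [
            {"field": "id", "type": "string", "optional": False},
            {"field": "name", "type": "string", "optional": False},
        ],
    },
    "payload": record,
}

# This JSON string is what would be produced to the topic:
print(json.dumps(enveloped))
```

The trade-off is that every producer must emit the envelope, which roughly doubles the message size for small records.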

Any suggestions on this error?

Best Answer

This is a known issue with the connector's schemaless JSON support.

It has been fixed in the version tracked here: https://github.com/nishutayal/kafka-connect-hbase/issues/5

Regarding java - Using a schemaless JSON converter with the Kafka HBase connector, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/53703370/
