gpt4 book ai didi

java - Samza/Kafka 更新元数据失败

转载 作者:塔克拉玛干 更新时间:2023-11-03 03:57:46 25 4
gpt4 key购买 nike

我目前正在编写一个 Samza 脚本,它只会从 Kafka 主题获取数据并将数据输出到另一个 Kafka 主题。我写了一个非常基本的 StreamTask 但是在执行时我遇到了错误。

错误如下:

Exception in thread "main" org.apache.samza.SamzaException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 193 ms.
at org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer.send(CoordinatorStreamSystemProducer.java:112)
at org.apache.samza.coordinator.stream.CoordinatorStreamSystemProducer.writeConfig(CoordinatorStreamSystemProducer.java:129)
at org.apache.samza.job.JobRunner.run(JobRunner.scala:79)
at org.apache.samza.job.JobRunner$.main(JobRunner.scala:48)
at org.apache.samza.job.JobRunner.main(JobRunner.scala)
Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 193 ms

我不完全确定如何配置或让脚本写入所需的 Kafka 元数据。下面是我的 StreamTask 代码和属性文件。在属性文件中,我添加了元数据部分以查看这是否有助于之后的过程,但无济于事。这是正确的方向还是我完全错过了什么?

import org.apache.samza.task.StreamTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.system.SystemStream;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;

/*
* Take all messages received and send them to
* a Kafka topic called "words"
*/

public class TestStreamTask implements StreamTask{

private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka" , "words"); // create new system stream for kafka topic "words"

@Override
public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator){

String message = (String) envelope.getMessage(); // pull message from stream

for(String word : message.split(" "))
collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, word, 1)); // output messsage to new system stream for kafka topic "words"
}
}

# Job
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=test-words

# YARN
yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz

# Task
task.class=samza.examples.wikipedia.task.TestStreamTask
task.inputs=kafka.test
task.checkpoint.factory=org.apache.samza.checkpoint.kafka.KafkaCheckpointManagerFactory
task.checkpoint.system=kafka
task.checkpoint.replication.factor=1

# Metrics
metrics.reporters=snapshot,jmx
metrics.reporter.snapshot.class=org.apache.samza.metrics.reporter.MetricsSnapshotReporterFactory
metrics.reporter.snapshot.stream=kafka.metrics
metrics.reporter.jmx.class=org.apache.samza.metrics.reporter.JmxReporterFactory

# Serializers
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
serializers.registry.metrics.class=org.apache.samza.serializers.MetricsSnapshotSerdeFactory

# Systems
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.msg.serde=string
systems.kafka.consumer.zookeeper.connect=localhost:2181/
systems.kafka.consumer.auto.offset.reset=largest
systems.kafka.producer.bootstrap.servers=localhost:9092

# Metadata
systems.kafka.metadata.bootstrap.servers=localhost:9092

最佳答案

这个问题是关于 Kafka 0.8 的,如果我没记错的话应该是不支持的。

这个事实,再加上人们只是偶尔遇到这个问题,但并非总是如此(近年来似乎没有人为此苦苦挣扎),让我非常有信心升级到更新版本的Kafka 会解决这个问题。

关于java - Samza/Kafka 更新元数据失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30653213/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com