gpt4 book ai didi

java - KafkaUtils.createDirectStream 未采用正确的参数 - Spark Streaming + Kafka

转载 作者:行者123 更新时间:2023-11-30 06:20:46 26 4
gpt4 key购买 nike

我有一个应用程序,它将序列化的 Twitter 数据发送到 Kafka 主题。到目前为止一切都很好。

消费者应用程序应该读取数据并将其反序列化。现在,当我调用 KafkaUtils.createDirectStream 时,我认为我输入了正确的参数(正如您将在抛出的错误中看到的那样),所以我无法理解为什么它不起作用。

The method createDirectStream(JavaStreamingContext, Class -K-, Class -V-, Class -KD-, Class -VD-, Map -String,String-, Set -String-) in the type KafkaUtils is not applicable for the arguments (JavaStreamingContext, Class-String-, Class-Status-, Class -StringDeserializer-, Class -StatusDeserializer-, Map-String,String-, Set-String-)

检查 Spark Javadoc ,我的参数对我来说仍然是正确的。

我的代码是:

Set<String> topics = new HashSet<>();
topics.add("twitter-test");
JavaStreamingContext jssc = new JavaStreamingContext(jsc, new Duration(duration));
Map<String, String> props = new HashMap<>();
//some properties...
JavaPairInputDStream messages = KafkaUtils.createDirectStream(jssc, String.class, Status.class, org.apache.kafka.common.serialization.StringDeserializer.class, stream_data.StatusDeserializer.class, props, topics);

状态序列化器代码:

public class StatusSerializer implements Serializer<Status> {

@Override public byte[] serialize(String s, Status o) {

try {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
oos.writeObject(o);
oos.close();
byte[] b = baos.toByteArray();
return b;
} catch (IOException e) {
return new byte[0];
}
}

@Override public void close() {

}

@Override
public void configure(Map<String, ?> configs, boolean isKey) {


}

}

最佳答案

看起来问题出在“stream_data.StatusDeserializer.class”。您能请一下这个自定义反序列化器类的代码吗?另外,你能看看这个Kafka Consumer for Spark written in Scala for Kafka API 0.10: custom AVRO deserializer .

在 KafkaParam 参数中包含以下内容。

key.deserializer -> classOf[StringDeserializer]
value.deserializer -> classOf[StatusDeserializer]

关于java - KafkaUtils.createDirectStream 未采用正确的参数 - Spark Streaming + Kafka,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48205828/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com