
java - Kafka Streams exception: Could not find a public no-argument constructor for org.apache.kafka.common.serialization.Serdes$WrapperSerde

Reposted. Author: 行者123. Updated: 2023-12-05 03:05:22

I get the following error stack trace when using Kafka Streams.

Update: following @matthias-j-sax's suggestion, I implemented my own Serdes with a default constructor for WrapperSerde, but I still get the following exception:

org.apache.kafka.streams.errors.StreamsException: stream-thread [streams-request-count-4c239508-6abe-4901-bd56-d53987494770-StreamThread-1] Failed to rebalance.
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests (StreamThread.java:836)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce (StreamThread.java:784)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop (StreamThread.java:750)
at org.apache.kafka.streams.processor.internals.StreamThread.run (StreamThread.java:720)
Caused by: org.apache.kafka.streams.errors.StreamsException: Failed to configure value serde class myapps.serializer.Serdes$WrapperSerde
at org.apache.kafka.streams.StreamsConfig.defaultValueSerde (StreamsConfig.java:972)
at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<init> (AbstractProcessorContext.java:59)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.<init> (ProcessorContextImpl.java:42)
at org.apache.kafka.streams.processor.internals.StreamTask.<init> (StreamTask.java:136)
at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask (StreamThread.java:405)
at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask (StreamThread.java:369)
at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks (StreamThread.java:354)
at org.apache.kafka.streams.processor.internals.TaskManager.addStreamTasks (TaskManager.java:148)
at org.apache.kafka.streams.processor.internals.TaskManager.createTasks (TaskManager.java:107)
at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned (StreamThread.java:260)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete (ConsumerCoordinator.java:259)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded (AbstractCoordinator.java:367)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup (AbstractCoordinator.java:316)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll (ConsumerCoordinator.java:290)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce (KafkaConsumer.java:1149)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll (KafkaConsumer.java:1115)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests (StreamThread.java:827)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce (StreamThread.java:784)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop (StreamThread.java:750)
at org.apache.kafka.streams.processor.internals.StreamThread.run (StreamThread.java:720)
Caused by: java.lang.NullPointerException
at myapps.serializer.Serdes$WrapperSerde.configure (Serdes.java:30)
at org.apache.kafka.streams.StreamsConfig.defaultValueSerde (StreamsConfig.java:968)
at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<init> (AbstractProcessorContext.java:59)
at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.<init> (ProcessorContextImpl.java:42)
at org.apache.kafka.streams.processor.internals.StreamTask.<init> (StreamTask.java:136)
at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask (StreamThread.java:405)
at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask (StreamThread.java:369)
at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks (StreamThread.java:354)
at org.apache.kafka.streams.processor.internals.TaskManager.addStreamTasks (TaskManager.java:148)
at org.apache.kafka.streams.processor.internals.TaskManager.createTasks (TaskManager.java:107)
at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned (StreamThread.java:260)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete (ConsumerCoordinator.java:259)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded (AbstractCoordinator.java:367)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup (AbstractCoordinator.java:316)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll (ConsumerCoordinator.java:290)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce (KafkaConsumer.java:1149)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll (KafkaConsumer.java:1115)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests (StreamThread.java:827)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce (StreamThread.java:784)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop (StreamThread.java:750)
at org.apache.kafka.streams.processor.internals.StreamThread.run (StreamThread.java:720)

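Here is my use case:

I receive JSON responses as the input to the stream, and I want to count the requests whose status code is not 200. I first read the Kafka Streams documentation in the official docs and on Confluent, then implemented WordCountDemo, which worked fine. When I then tried to write the code below, I ran into this exception. I am new to Kafka Streams; I went through the stack trace but could not understand the context, so I came here for help.

Here is my code: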

LogCount.java

package myapps;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import myapps.serializer.JsonDeserializer;
import myapps.serializer.JsonSerializer;
import myapps.Request;

public class LogCount {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-request-count");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        JsonSerializer<Request> requestJsonSerializer = new JsonSerializer<>();
        JsonDeserializer<Request> requestJsonDeserializer = new JsonDeserializer<>(Request.class);
        Serde<Request> requestSerde = Serdes.serdeFrom(requestJsonSerializer, requestJsonDeserializer);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, requestSerde.getClass().getName());
        final StreamsBuilder builder = new StreamsBuilder();

        KStream<String, Request> source = builder.stream("streams-requests-input");
        source.filter((k, v) -> v.getHttpStatusCode() != 200)
              .groupByKey()
              .count()
              .toStream()
              .to("streams-requests-output", Produced.with(Serdes.String(), Serdes.Long()));
        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        System.out.println(topology.describe());
        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.cleanUp();
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}

JsonDeserializer.java

package myapps.serializer;

import com.google.gson.Gson;
import org.apache.kafka.common.serialization.Deserializer;
import java.util.Map;

public class JsonDeserializer<T> implements Deserializer<T> {

    private Gson gson = new Gson();
    private Class<T> deserializedClass;

    public JsonDeserializer(Class<T> deserializedClass) {
        this.deserializedClass = deserializedClass;
    }

    public JsonDeserializer() {
    }

    @Override
    @SuppressWarnings("unchecked")
    public void configure(Map<String, ?> map, boolean b) {
        if (deserializedClass == null) {
            deserializedClass = (Class<T>) map.get("serializedClass");
        }
    }

    @Override
    public T deserialize(String s, byte[] bytes) {
        if (bytes == null) {
            return null;
        }

        return gson.fromJson(new String(bytes), deserializedClass);
    }

    @Override
    public void close() {

    }
}

JsonSerializer.java

package myapps.serializer;

import com.google.gson.Gson;
import org.apache.kafka.common.serialization.Serializer;

import java.nio.charset.Charset;
import java.util.Map;

public class JsonSerializer<T> implements Serializer<T> {

    private Gson gson = new Gson();

    @Override
    public void configure(Map<String, ?> map, boolean b) {

    }

    @Override
    public byte[] serialize(String topic, T t) {
        return gson.toJson(t).getBytes(Charset.forName("UTF-8"));
    }

    @Override
    public void close() {

    }
}

As I mentioned, I receive JSON as input, and its structure looks like this:

{
    "RequestID":"1f6b2409",
    "Protocol":"http",
    "Host":"abc.com",
    "Method":"GET",
    "HTTPStatusCode":"200",
    "User-Agent":"curl%2f7.54.0",

The corresponding Request.java file looks like this:

package myapps;

public final class Request {
    private String requestID;
    private String protocol;
    private String host;
    private String method;
    private int httpStatusCode;
    private String userAgent;

    public String getRequestID() {
        return requestID;
    }

    public void setRequestID(String requestID) {
        this.requestID = requestID;
    }

    public String getProtocol() {
        return protocol;
    }

    public void setProtocol(String protocol) {
        this.protocol = protocol;
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public String getMethod() {
        return method;
    }

    public void setMethod(String method) {
        this.method = method;
    }

    public int getHttpStatusCode() {
        return httpStatusCode;
    }

    public void setHttpStatusCode(int httpStatusCode) {
        this.httpStatusCode = httpStatusCode;
    }

    public String getUserAgent() {
        return userAgent;
    }

    public void setUserAgent(String userAgent) {
        this.userAgent = userAgent;
    }
}

Edit: when I exit kafka-console-consumer.sh, it says Processed total of 0 messages

Best answer

As the error indicates, the class Serdes$WrapperSerde lacks a no-argument default constructor:

Could not find a public no-argument constructor 

The problem is this construct:

Serde<Request> requestSerde = Serdes.serdeFrom(requestJsonSerializer, requestJsonDeserializer);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, requestSerde.getClass().getName());

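Serdes.serdeFrom returns a WrapperSerde, which has no no-argument default constructor. Therefore, you cannot pass its class into StreamsConfig. A Serde created this way can only be used when you pass the object itself into the corresponding API call (i.e., to override the default Serde for a particular operator).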
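The failure mode can be reproduced without Kafka. When only a class name is placed in a configuration, the framework has to create the instance itself, which typically means calling a public no-argument constructor through reflection. The following is a minimal sketch of that mechanism, not Kafka's actual code; the classes PairWrapper and SelfContained and the method canInstantiateFromClass are hypothetical stand-ins introduced for illustration.

```java
import java.lang.reflect.Constructor;

public class NoArgConstructorDemo {

    // Hypothetical stand-in for a wrapper that only offers a two-argument constructor.
    public static class PairWrapper {
        private final String serializer;
        private final String deserializer;

        public PairWrapper(String serializer, String deserializer) {
            this.serializer = serializer;
            this.deserializer = deserializer;
        }
    }

    // Hypothetical stand-in for a class that needs nothing from the caller.
    public static class SelfContained {
        public SelfContained() {
        }
    }

    // Tries to create an instance from the class alone, as config-driven code must.
    public static boolean canInstantiateFromClass(Class<?> type) {
        try {
            Constructor<?> ctor = type.getDeclaredConstructor();
            ctor.newInstance();
            return true;
        } catch (ReflectiveOperationException e) {
            // NoSuchMethodException ends up here when there is no no-argument constructor.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("PairWrapper: " + canInstantiateFromClass(PairWrapper.class));
        System.out.println("SelfContained: " + canInstantiateFromClass(SelfContained.class));
    }
}
```

The wrapper cannot be created because nothing in a class name says which serializer and deserializer to hand to its constructor; that information exists only in the already-built object.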

To make it work (i.e., to be able to set the Serde in the config), you need to implement a proper class that implements the Serde interface.
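The shape of such a class can be sketched without the Kafka dependency. The example below is an assumption-laden illustration: MiniSerde is a simplified, hypothetical stand-in for Kafka's Serde interface (it is not the real API), and StatusCodeSerde and loadSerde are likewise made-up names. The point it shows is that a class with a public no-argument constructor, which sets up everything it needs internally, can be instantiated from its class name alone.

```java
import java.nio.charset.StandardCharsets;

public class NoArgSerdeSketch {

    // Simplified, hypothetical stand-in for Kafka's Serde interface (not the real API).
    public interface MiniSerde<T> {
        byte[] serialize(T value);

        T deserialize(byte[] bytes);
    }

    // Concrete serde with a public no-argument constructor: it depends on no
    // constructor arguments, so a class name is enough to create it.
    public static class StatusCodeSerde implements MiniSerde<Integer> {
        public StatusCodeSerde() {
        }

        @Override
        public byte[] serialize(Integer value) {
            return value == null ? null : value.toString().getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public Integer deserialize(byte[] bytes) {
            return bytes == null ? null : Integer.valueOf(new String(bytes, StandardCharsets.UTF_8));
        }
    }

    // Mimics configuration-driven loading, where only the class name is available.
    @SuppressWarnings("unchecked")
    public static <T> MiniSerde<T> loadSerde(String className) {
        try {
            Class<?> type = Class.forName(className);
            return (MiniSerde<T>) type.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        MiniSerde<Integer> serde = loadSerde(StatusCodeSerde.class.getName());
        byte[] bytes = serde.serialize(404);
        System.out.println(serde.deserialize(bytes));
    }
}
```

In the Kafka Streams case, the analogous class would implement org.apache.kafka.common.serialization.Serde<Request>, create its JsonSerializer and JsonDeserializer (with Request.class) inside its own no-argument constructor, and be registered by its own class name in DEFAULT_VALUE_SERDE_CLASS_CONFIG.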

Regarding "java - Kafka Streams exception: Could not find a public no-argument constructor for org.apache.kafka.common.serialization.Serdes$WrapperSerde", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/51063041/
