gpt4 book ai didi

java - Flink : java. io.NotSerializedException : redis.clients.jedis.JedisCluster

转载 作者:行者123 更新时间:2023-12-02 09:55:11 25 4
gpt4 key购买 nike

当我提交新的 flink 作业时,它会抛出

Caused by: java.io.NotSerializableException: redis.clients.jedis.JedisCluster
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:512)
at org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
... 24 more

这是我的代码:

    JedisCluster jedisCluster = JedisClusterBuilder.getInstance(JedisClusterEnum.THIRD);


DataStream<MobileClickEvent> clickEventDataStream = environment.addSource(clickConsumer);


clickEventDataStream
.filter(Objects::nonNull)
.keyBy(new KeySelector<MobileClickEvent, String>() {
@Override
public String getKey(MobileClickEvent value) throws Exception {
return value.getItemId() + "_" + value.getItemType();
}
})
.process(new KeyedProcessFunction<String, MobileClickEvent, Object>() {
@Override
public void processElement(MobileClickEvent value, Context ctx, Collector<Object> out) throws Exception {
String key = ctx.getCurrentKey();
jedisCluster.hincrBy("{item_feature}" + key, "click", 1);
jedisCluster.expire("{item_feature}" + key, 60 * 10);
}
});

最佳答案

In OP's answer , jedisCluster 将为每个元素进行初始化。

考虑覆盖open以及并在那里初始化。

Initialization method for the function. It is called before the actual working methods (like map or join) and thus suitable for one time setup work.

.process(new KeyedProcessFunction<String, MobileClickEvent, Object>() {
private JedisCluster jedisCluster;

@Override
public void open(Configuration parameters) {
jedisCluster = JedisClusterBuilder.getInstance(JedisClusterEnum.THIRD);
}

@Override
public void processElement(MobileClickEvent value, Context ctx, Collector<Object> out) throws Exception {
String key = ctx.getCurrentKey();
jedisCluster.hincrBy(REDIS_PREFIX + key, "click", 1);
jedisCluster.expire(REDIS_PREFIX + key, 60 * 10);
}
});

关于java - Flink : java. io.NotSerializedException : redis.clients.jedis.JedisCluster,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56057346/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com