gpt4 book ai didi

redis - 如何在flink map()中使用Jedis

转载 作者:可可西里 更新时间:2023-11-01 11:12:53 24 4
gpt4 key购买 nike

我的代码是这样的:

  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.enableCheckpointing(500);

DataStream<String> stream = env.addSource(getConsumer(TOPIC_1));

Jedis jedis = new Jedis("master1");
stream.map(new RichMapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
String result = jedis.hget("rtc", value);
return result;
}
});

我想在 map() 中从 Redis 获取一些数据,但是它无法运行,因为 Jedis.class 是不可序列化的。

如何在map()中使用ZkClient、Jedis等不可序列化的类?

最佳答案

所有 rich 函数(如 RichMapFunction)都有一个 open(Configuration)close 调用,您可以覆盖它们.一旦将函数部署到执行函数的 TaskManager 中,就会调用这些生命周期方法。

class MyMapFunction extends RichMapFunction<String, String> {

private transient Jedis jedis;

@Override
public void open(Configuration parameters) {
// open connection to Redis, for example
jedis = new Jedis("master1");
}

@Override
public void close() {
// close connection to Redis
jedis.close();
}
}

关于redis - 如何在flink map()中使用Jedis,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48151728/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com