gpt4 book ai didi

java - 如何在 Apache Beam 中使用流输入 PCollection 请求 Redis 服务器?

转载 作者:行者123 更新时间:2023-12-01 18:50:53 26 4
gpt4 key购买 nike

使用RedisIO ,我正在尝试向 Redis 服务器查询集合。

仅当批处理管道(无流式传输)时,Redis 服务器才正常且响应良好。

但是,使用流输入数据(来自文件),如下所示:

  PCollection<String> stream = pipeline.apply("ReadMyFile", TextIO.read().from("/home/out/**")
.watchForNewFiles(Duration.standardSeconds(60), Watch.Growth.<String>never()))
.apply("ParseFn", ParDo.of(new ParseFn()))
.apply("GlobalString", GlobalString.get(Duration.ZERO, Duration.standardSeconds(60)));

然后,应用redisIO read()函数:

 PCollection<KV<String, String>> redis = stream.apply(RedisIO.readAll().withEndpoint("127.0.0.1", 6379));

最后想使用结果集合,所以:

 PCollection<String> result = redis.apply("Compose Final Object", ParDo.of(new DoFn<KV<String, String>, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
System.out.println(c.element().getKey());
c.output(c.element().getKey());
}
}));

据我测试,文件正在根据需要加载和处理。

最佳答案

对于进一步的开发,这是一个错误:

https://github.com/apache/beam/pull/10624/commits

https://issues.apache.org/jira/browse/BEAM-9134

很快就会修复。

关于java - 如何在 Apache Beam 中使用流输入 PCollection 请求 Redis 服务器?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59739788/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com