gpt4 book ai didi

java - 使用 Postgres 实现 Spring + Apache Flink 项目

转载 作者:行者123 更新时间:2023-12-01 16:31:46 24 4
gpt4 key购买 nike

我有一个 SpringBoot gradle 项目,使用 apache flink 来处理数据流信号。当新信号通过数据流时,我想使用已创建的 postgres 数据库表中的 ID 查询查找(即 findById() )它的详细信息,以便获取有关信号的附加信息并丰富数据。我想避免使用 spring 依赖项来执行查找(即 Autowire 存储库),并希望坚持使用 flink 实现来进行查找。

我可以在哪里指定如何添加 postgres 连接配置信息,例如端口、数据库、url、用户名、密码等...(为简单起见,可以假设 postgres 数据库位于我的计算机本地)。就像将配置添加到 application.properties 文件一样简单吗?如果是这样,当按非主键值搜索时,我该如何编写查询方法来查找postgres表中的记录?

一些在线资源建议使用此框架代码,但我不确定它如何/id 适合我的用例。 (我创建了一个 EventEntity 模型,其中包含我正在查找的表中的所有参数/列)。

像这样

    public class DatabaseMapper extends RichFlatMapFunction<String, EventEntity> {

// Declare DB connection & query statements

public void open(Configuration parameters) throws Exception {
//Initialize DB connection
//prepare query statements
}

@Override
public void flatMap(String value, Collector<EventEntity> out) throws Exception {

}
}

最佳答案

您的示例代码是正确的。您可以在 open() 方法中设置 PostgreSQL 的所有自定义初始化和准备代码。然后,您可以在 flatMap() 函数中使用预先配置的字段。

这是 Redis 操作的一个示例

  • 我在这里使用了 RichAsyncFunction,我建议您按照最佳实践建议的方式进行操作。阅读此处了解更多信息:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/operators/asyncio.html )
  • 您可以在构造函数方法中传递配置参数并在初始化过程中使用它

        public static class AsyncRedisOperations extends RichAsyncFunction<Object,Object> {

    private JedisPool jedisPool;
    private Configuration redisConf;

    public AsyncRedisOperations(Configuration redisConf) {
    this.redisConf = redisConf;
    }

    @Override
    public void open(Configuration parameters) {

    JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
    jedisPoolConfig.setMaxTotal(this.redisConf.getInteger("pool", 8));
    jedisPoolConfig.setMaxIdle(this.redisConf.getInteger("pool", 8));
    jedisPoolConfig.setMaxWaitMillis(this.redisConf.getInteger("maxWait", 0));

    JedisPool jedisPool = new JedisPool(jedisPoolConfig,
    this.redisConf.getString("host", "192.168.10.10"),
    this.redisConf.getInteger("port", 6379), 5000);

    try {
    this.jedisPool = jedisPool;
    this.logger.info("Redis connected: " + jedisPool.getResource().isConnected());
    } catch (Exception e) {
    this.logger.error(BaseUtil.append("Exception while connecting Redis"));
    }

    }

    @Override
    public void asyncInvoke(Object in, ResultFuture<Object> out) {

    try (Jedis jedis = this.jedisPool.getResource()) {
    String key = jedis.get(key);
    this.logger.info("Redis Key: " + key);
    }

    }
    }

关于java - 使用 Postgres 实现 Spring + Apache Flink 项目,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/62027404/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com