gpt4 book ai didi

java - JpaRepository 不在自定义 RichSinkFunction 中 Autowiring

转载 作者:搜寻专家 更新时间:2023-11-01 03:15:55 25 4
gpt4 key购买 nike

我已经创建了一个自定义的 Flink RichSinkFunction 并尝试在这个自定义类中 Autowiring 一个 JpaRepository,但我不断收到一个 NullPointerException。如果我在构造函数中 Autowiring 它,我可以看到已找到 JpaRepo - 但是当调用 invoke 方法时,我收到一个 NullPointerException

public interface MessageRepo extends JpaRepository<Message, Long> {
}


@Component
public class MessageSink extends RichSinkFunction<Message> {

private final transient MessageRepo messageRepo; //if i don't make this transient, i get the error message "The implementation of the RichSinkFunction is not serializable"

@Autowired
public MessageSink(MessageRepo messageRepo){
this.messageRepo = messageRepo;
messageRepo.save(new Message()); //no issues when i do this
}

@Override
public void invoke(Message message, Context context) {
// the message is not null
messageRepo.save(message); // NPE
}

有没有人遇到过这个问题?看起来 MessageSink 调用方法是在一个单独的线程中调用的,这就是为什么 messageRepo 总是 null 的原因?我的代码的其他部分可以使用 MessageRepo,除非我有自己的自定义接收器。

最佳答案

我认为这里的问题是 flink 需要在将自定义接收器功能分发给其工作人员之前对其进行序列化。

通过标记MessageRepo中转,意味着当worker节点反序列化该函数时,该字段将为空。通常,您会在 open 函数中初始化传输依赖,该函数将在对象反序列化后调用。

关于java - JpaRepository 不在自定义 RichSinkFunction 中 Autowiring ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52750376/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com