我创建了一个自定义的flink richsink函数,并尝试自动连接 JpaRepository
在这个自定义类,但我不断得到一个 NullPointerException
. 如果我在构造器中自动连接它,我可以看到jparepo已经找到了——但是当调用invoke方法时,我会收到一个 NullPointerException
.
public interface MessageRepo extends JpaRepository<Message, Long> {
}
@Component
public class MessageSink extends RichSinkFunction<Message> {
private final transient MessageRepo messageRepo; //if i don't make this transient, i get the error message "The implementation of the RichSinkFunction is not serializable"
@Autowired
public MessageSink(MessageRepo messageRepo){
this.messageRepo = messageRepo;
messageRepo.save(new Message()); //no issues when i do this
}
@Override
public void invoke(Message message, Context context) {
// the message is not null
messageRepo.save(message); // NPE
}
以前有人遇到过这个问题吗?看起来像是 MessageSink
invoke方法正在一个单独的线程中调用,这就是 messageRepo
总是 null
? 除了我有自己的自定义接收器之外,我的代码的其他部分还可以使用messagerepo。
2条答案
按热度按时间snz8szmq1#
我认为这里的问题是,flink需要在将自定义sink函数分发给其工作人员之前对其进行序列化。
通过标记messagerepo transit,意味着当工作节点反序列化此函数时,该字段将为空。通常,您将在open函数中初始化transit依赖项,该函数将在对象反序列化后调用。
inkz8wg92#
我不清楚原因,但我认为springboot在注入bean时优先考虑服务类。我在尝试为实体类编写侦听器时也遇到过类似的问题。我就是这样解决的。创建一个实现applicationcontextaware接口并重写setapplicationcontext方法的组件类。在类中有一个名为getbean的静态方法,它将在第一个请求时自动连接。样本代码---
然后只需在代码中获取bean---->>classname referencename=(classname)springbeansutil.getbean(classname.class);