jparepository在自定义函数中不自动连接

cgfeq70w  于 2021-06-21  发布在  Flink
关注(0)|答案(2)|浏览(365)

我创建了一个自定义的flink richsink函数,并尝试自动连接 JpaRepository 在这个自定义类,但我不断得到一个 NullPointerException . 如果我在构造器中自动连接它,我可以看到jparepo已经找到了——但是当调用invoke方法时,我会收到一个 NullPointerException .

public interface MessageRepo extends JpaRepository<Message, Long> {
}

@Component
public class MessageSink extends RichSinkFunction<Message> {

    private final transient MessageRepo messageRepo; //if i don't make this transient, i get the error message "The implementation of the RichSinkFunction is not serializable"

    @Autowired
    public MessageSink(MessageRepo messageRepo){
        this.messageRepo = messageRepo;
        messageRepo.save(new Message()); //no issues when i do this
    }

    @Override
    public void invoke(Message message, Context context) {
         // the message is not null
         messageRepo.save(message); // NPE
    }

以前有人遇到过这个问题吗?看起来像是 MessageSink invoke方法正在一个单独的线程中调用,这就是 messageRepo 总是 null ? 除了我有自己的自定义接收器之外,我的代码的其他部分还可以使用messagerepo。

snz8szmq

snz8szmq1#

我认为这里的问题是,flink需要在将自定义sink函数分发给其工作人员之前对其进行序列化。
通过标记messagerepo transit,意味着当工作节点反序列化此函数时,该字段将为空。通常,您将在open函数中初始化transit依赖项,该函数将在对象反序列化后调用。

inkz8wg9

inkz8wg92#

我不清楚原因,但我认为springboot在注入bean时优先考虑服务类。我在尝试为实体类编写侦听器时也遇到过类似的问题。我就是这样解决的。创建一个实现applicationcontextaware接口并重写setapplicationcontext方法的组件类。在类中有一个名为getbean的静态方法,它将在第一个请求时自动连接。样本代码---

@Component
public class SpringBeansUtil implements ApplicationContextAware {
private static ApplicationContext context;

    @SuppressWarnings("static-access")
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) 
    throws BeansException {
    this.context = applicationContext;
}

    public static <T> T getBean(Class<T> beanClass) {
        return context.getBean(beanClass);
    }
}

然后只需在代码中获取bean---->>classname referencename=(classname)springbeansutil.getbean(classname.class);

相关问题