Spring Boot: JPA interaction from a reactive AMQP background thread

Posted by qni6mghb on 2022-11-05 in Spring

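We have a regular, blocking Spring Boot (2.5.4) application that serves a REST interface. We use Hibernate Envers for audit logging. The service sends messages to our service bus asynchronously through the ServiceBusSenderAsyncClient from the Azure SDK. Now the service also needs to receive messages from that service bus and write the updates to the database, again asynchronously. For this we use the ServiceBusReceiverAsyncClient with reactive streams. I wrapped the whole database interaction in a @Transactional method, but I run into a problem with Envers: "Unable to perform beforeTransactionCompletion callback: null" (when I deactivate Envers, it works). As far as I understand, the problem is not specific to Envers; it is really about accessing JPA repositories from a background thread.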

Service bus receive controller

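import com.azure.messaging.servicebus.ServiceBusReceiverAsyncClient;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

@Component
public class SubscriptionReceiveController {
  @Autowired
  private MyService myService;

  @Autowired
  private ServiceBusReceiverAsyncClient receiver;

  @Autowired
  private ObjectMapper mapper;

  @PostConstruct
  public void run() {
    // continuously receive messages in the background
    receiver.receiveMessages().flatMap(message -> {
      MyObject object;
      try {
        // deserialize into the same type as the declared variable
        object = mapper.readValue(message.getBody().toString(), MyObject.class);
      } catch (JsonProcessingException e) {
        throw new JsonProcessingRuntimeException("Failed to convert message to object", e);
      }

      // wrap the blocking JPA interaction
      Mono<MyObject> blockingWrapper =
          Mono.fromCallable(() -> myService.updateMyObject(object.getId()));

      // When the service returns, this is where it crashes
      return blockingWrapper.subscribeOn(Schedulers.boundedElastic());
    }).flatMap(response -> {
      // More processing steps
      // placeholder return: a flatMap lambda has to return a Publisher
      return Mono.just(response);
    }).subscribe();
  }
}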

My service

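import java.util.UUID;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class MyService {

  @Autowired
  private MyRepository myRepository;

  // runs in one transaction; Envers hooks in just before the commit
  @Transactional
  public MyObject updateMyObject(UUID objectId) {
    var object = myRepository.getById(objectId);
    object.setSomeProperty("myPropertyValue");

    return myRepository.save(object);
  }
}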

This results in the following error message:

org.springframework.orm.jpa.JpaSystemException: Unable to perform beforeTransactionCompletion callback: null; nested exception is org.hibernate.HibernateException: Unable to perform beforeTransactionCompletion callback: null
Caused by: org.hibernate.HibernateException: Unable to perform beforeTransactionCompletion callback: null
        at org.hibernate.engine.spi.ActionQueue$BeforeTransactionCompletionProcessQueue.beforeTransactionCompletion(ActionQueue.java:960) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final]
        at org.hibernate.engine.spi.ActionQueue.beforeTransactionCompletion(ActionQueue.java:525) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final]
        at org.hibernate.internal.SessionImpl.beforeTransactionCompletion(SessionImpl.java:2381) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final]
        at org.hibernate.engine.jdbc.internal.JdbcCoordinatorImpl.beforeTransactionCompletion(JdbcCoordinatorImpl.java:448) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final]
        at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl.beforeCompletionCallback(JdbcResourceLocalTransactionCoordinatorImpl.java:183) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final]
        at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl.access$300(JdbcResourceLocalTransactionCoordinatorImpl.java:40) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final]
        at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl$TransactionDriverControlImpl.commit(JdbcResourceLocalTransactionCoordinatorImpl.java:281) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final]
        at org.hibernate.engine.transaction.internal.TransactionImpl.commit(TransactionImpl.java:101) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final]
        at org.springframework.orm.jpa.JpaTransactionManager.doCommit(JpaTransactionManager.java:562) ~[spring-orm-5.3.9.jar:5.3.9]
        ... 20 common frames omitted

...

Caused by: java.lang.NullPointerException: null
        at com.my.package.revision.AuditRevisionListener.newRevision(AuditRevisionListener.java:10) ~[classes/:na]
        at org.hibernate.envers.internal.revisioninfo.DefaultRevisionInfoGenerator.generate(DefaultRevisionInfoGenerator.java:88) ~[hibernate-envers-5.4.28.Final.jar:5.4.28.Final]
        at org.hibernate.envers.internal.synchronization.AuditProcess.getCurrentRevisionData(AuditProcess.java:133) ~[hibernate-envers-5.4.28.Final.jar:5.4.28.Final]
        at org.hibernate.envers.internal.synchronization.AuditProcess.executeInSession(AuditProcess.java:115) ~[hibernate-envers-5.4.28.Final.jar:5.4.28.Final]
        at org.hibernate.envers.internal.synchronization.AuditProcess.doBeforeTransactionCompletion(AuditProcess.java:174) ~[hibernate-envers-5.4.28.Final.jar:5.4.28.Final]
        at org.hibernate.envers.internal.synchronization.AuditProcessManager$1.doBeforeTransactionCompletion(AuditProcessManager.java:47) ~[hibernate-envers-5.4.28.Final.jar:5.4.28.Final]
        at org.hibernate.engine.spi.ActionQueue$BeforeTransactionCompletionProcessQueue.beforeTransactionCompletion(ActionQueue.java:954) ~[hibernate-core-5.4.32.Final.jar:5.4.32.Final]
        ... 28 common frames omitted

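What am I missing here, and why does the transaction not complete inside the service implementation? Is there a better way or pattern for having a background thread write to the database through JPA?
Thanks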

Answer 1 (ac1kyiln)

The NullPointerException is thrown at com.my.package.revision.AuditRevisionListener.newRevision(AuditRevisionListener.java:10). Isn't that unrelated to the transaction?
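The source of AuditRevisionListener is not shown in the question, so the following is only a guess at the mechanism. A common cause of a NullPointerException in such a listener is that it reads something bound to the request thread (Spring Security's SecurityContextHolder, for example, uses a thread-local strategy by default). A worker thread from Schedulers.boundedElastic() would then see nothing. The sketch below models this with a plain ThreadLocal; the names ThreadBoundContextDemo and CURRENT_USER are hypothetical and stand in for whatever holder the real listener reads.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class ThreadBoundContextDemo {

    // Hypothetical stand-in for a thread-bound holder such as a security context.
    static final ThreadLocal<String> CURRENT_USER = new ThreadLocal<>();

    // Reads the holder on a separate worker thread and returns what that thread sees.
    static String readOnWorkerThread() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = CURRENT_USER::get;
            return worker.submit(task).get();
        } catch (Exception e) {
            throw new IllegalStateException("worker thread failed", e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        CURRENT_USER.set("alice");
        System.out.println("caller thread: " + CURRENT_USER.get());    // alice
        System.out.println("worker thread: " + readOnWorkerThread());  // null
    }
}
```

If the real listener dereferences such a value without a null check, the commit fails exactly where the stack trace points, even though the transaction handling itself is fine.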

Answer 2 (ymdaylpp)

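I had the same problem with my RevisionListener. It looks like there is a lack of integration between the Spring framework and Hibernate Envers, as described in this issue: https://github.com/spring-projects/spring-data-envers/issues/249
Even after annotating my CustomRevisionListener with @Service, Spring does not find this bean, and a separate instance of it is created.
I tried all the solutions listed here: How to inject spring beans into the hibernate envers RevisionListener
but only the one using SecurityContextHolder (solution 3) worked for me. I just had to make sure that the entity representing the logged-in user has an Id property and populate it with the user id retrieved from the database.
Here is my code: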

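import java.util.Date;
import java.util.Optional;
import org.hibernate.envers.RevisionListener;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.context.SecurityContext;
import org.springframework.security.core.context.SecurityContextHolder;

public class MyRevisionListener implements RevisionListener {

   @Override
   public void newRevision(Object entity) {
      MyRevisionEntity revisionEntity = (MyRevisionEntity) entity;

      // null when there is no authenticated JwtUser, e.g. on a background thread
      Long loggedUserId = Optional.ofNullable(SecurityContextHolder.getContext())
            .map(SecurityContext::getAuthentication)
            .filter(Authentication::isAuthenticated)
            .map(Authentication::getPrincipal)
            // skip principals of another type, such as the "anonymousUser" string
            .filter(JwtUser.class::isInstance)
            .map(JwtUser.class::cast)
            .map(JwtUser::getId)
            .orElse(null);

      revisionEntity.setUserId(loggedUserId);
      revisionEntity.setDate(new Date());
   }
}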
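For revisions written from a background thread there is usually no logged-in user, so the lookup above ends in null. If the revision table should still record who made the change, one option is to fall back to a fixed "system" id. This is a minimal sketch in plain Java, not taken from either post. UserPrincipal stands in for JwtUser, and SYSTEM_USER_ID and AuditorResolver are hypothetical names.

```java
import java.util.Optional;

final class AuditorResolver {

    // Hypothetical principal type standing in for the JwtUser class used above.
    static final class UserPrincipal {
        private final Long id;

        UserPrincipal(Long id) {
            this.id = id;
        }

        Long getId() {
            return id;
        }
    }

    // Hypothetical id recorded for changes made without a logged-in user.
    static final Long SYSTEM_USER_ID = 0L;

    // Returns the principal's id, or SYSTEM_USER_ID when the principal is
    // missing, has another type, or carries no id.
    static Long resolveUserId(Object principal) {
        return Optional.ofNullable(principal)
                .filter(UserPrincipal.class::isInstance)
                .map(UserPrincipal.class::cast)
                .map(UserPrincipal::getId)
                .orElse(SYSTEM_USER_ID);
    }
}
```

In the listener, the value returned by getPrincipal() (or null when there is no authentication) would be passed to resolveUserId. Whether a fixed id or a null column is the better audit record depends on how the revision table is queried.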
