Spring Integration JDBC分布式锁:多线程和事务

nfs0ujit  于 12个月前  发布在  Spring
关注(0)|答案(1)|浏览(122)

重现问题:

docker run -e POSTGRES_USER=localtest -e POSTGRES_PASSWORD=localtest -e POSTGRES_DB=orders -p 5432:5432 -d postgres:15.3

字符串

然后运行以下代码:

https://github.com/cuipengfei/Spikes/tree/master/jpa/lock-transaction-threads

@Controller
public class TestLockController {
    private Logger logger = LoggerFactory.getLogger(TestLockController.class);

    @Autowired
    private TestLockService service;

    @GetMapping("test-lock")
    public ResponseEntity<String> testLock() {
        logger.info("start");

        Arrays.asList("a", "b", "c").parallelStream().forEach(key -> {
            logger.info("going to call test lock method with key: {}", key);
            service.testLock(key);
        });

        logger.info("end");

        return ResponseEntity.ok("done");
    }
}


当调用/test-lock API时,它将使用并行流启动新线程来运行testLock方法3次。

@Service
public class TestLockService {
    private Logger logger = LoggerFactory.getLogger(TestLockService.class);

    @Autowired
    private JdbcLockRegistry jdbcLockRegistry;

    @Transactional
    public void testLock(String key) {
        logger.info("{} start with transaction {}", key,
                TransactionAspectSupport.currentTransactionStatus().hashCode());

        boolean isLocked = false;
        Lock lock = jdbcLockRegistry.obtain(key);
        try {
            pretendToDoWork(key);
            // ↑ comment this out to see errors

            isLocked = lock.tryLock(10, TimeUnit.SECONDS);
            if (isLocked) {
                logger.info("{} lock ok", key);
            } else {
                logger.info("{} lock failed", key);
            }
        } catch (Throwable t) {
            logger.error("{} lock failed with exception: {}", key, t.getMessage());
        } finally {
            if (isLocked) {
                logger.info("{} unlock", key);
                lock.unlock();
            } else {
                logger.info("{} skip unlock since it was never locked", key);
            }
        }

        logger.info("{} end with transaction {}", key,
                TransactionAspectSupport.currentTransactionStatus().hashCode());
    }

    private void pretendToDoWork(String key) throws InterruptedException {
        Random random = new Random();
        int randomNumber = random.nextInt(3000);
        logger.info("{} pretend to work for {} before try to lock", key, randomNumber);
        Thread.sleep(randomNumber);
    }
}


然后在服务的testLock方法中,它将尝试获取锁。
当我触发这个API时,它会打印以下日志:

2023-12-22 21:49:26.188  INFO 20180 --- [nio-8080-exec-1] c.g.s.e.controllers.TestLockController   : start
2023-12-22 21:49:26.189  INFO 20180 --- [nio-8080-exec-1] c.g.s.e.controllers.TestLockController   : going to call test lock method with key: b
2023-12-22 21:49:26.189  INFO 20180 --- [nPool-worker-37] c.g.s.e.controllers.TestLockController   : going to call test lock method with key: c
2023-12-22 21:49:26.190  INFO 20180 --- [nPool-worker-51] c.g.s.e.controllers.TestLockController   : going to call test lock method with key: a
2023-12-22 21:49:26.208  INFO 20180 --- [nPool-worker-51] c.g.s.example.service.TestLockService    : a start with transaction 1915798295
2023-12-22 21:49:26.208  INFO 20180 --- [nio-8080-exec-1] c.g.s.example.service.TestLockService    : b start with transaction 353593802
2023-12-22 21:49:26.208  INFO 20180 --- [nPool-worker-37] c.g.s.example.service.TestLockService    : c start with transaction 1079367566
2023-12-22 21:49:26.209  INFO 20180 --- [nPool-worker-51] c.g.s.example.service.TestLockService    : a pretend to work for 2609 before try to lock
2023-12-22 21:49:26.209  INFO 20180 --- [nio-8080-exec-1] c.g.s.example.service.TestLockService    : b pretend to work for 2217 before try to lock
2023-12-22 21:49:26.209  INFO 20180 --- [nPool-worker-37] c.g.s.example.service.TestLockService    : c pretend to work for 918 before try to lock
2023-12-22 21:49:27.174  INFO 20180 --- [nPool-worker-37] c.g.s.example.service.TestLockService    : c lock ok
2023-12-22 21:49:27.174  INFO 20180 --- [nPool-worker-37] c.g.s.example.service.TestLockService    : c unlock
2023-12-22 21:49:27.182  INFO 20180 --- [nPool-worker-37] c.g.s.example.service.TestLockService    : c end with transaction 1079367566
2023-12-22 21:49:28.444  INFO 20180 --- [nio-8080-exec-1] c.g.s.example.service.TestLockService    : b lock ok
2023-12-22 21:49:28.444  INFO 20180 --- [nio-8080-exec-1] c.g.s.example.service.TestLockService    : b unlock
2023-12-22 21:49:28.449  INFO 20180 --- [nio-8080-exec-1] c.g.s.example.service.TestLockService    : b end with transaction 353593802
2023-12-22 21:49:28.834  INFO 20180 --- [nPool-worker-51] c.g.s.example.service.TestLockService    : a lock ok
2023-12-22 21:49:28.834  INFO 20180 --- [nPool-worker-51] c.g.s.example.service.TestLockService    : a unlock
2023-12-22 21:49:28.838  INFO 20180 --- [nPool-worker-51] c.g.s.example.service.TestLockService    : a end with transaction 1915798295
2023-12-22 21:49:28.838  INFO 20180 --- [nio-8080-exec-1] c.g.s.e.controllers.TestLockController   : end


如果我把这一行注解掉:

pretendToDoWork(key);
// ↑ Comment this out to see errors!


然后再次运行它,它将打印以下日志:

2023-12-22 21:36:30.192  INFO 39588 --- [nio-8080-exec-1] c.g.s.e.controllers.TestLockController   : start
2023-12-22 21:36:30.192  INFO 39588 --- [nio-8080-exec-1] c.g.s.e.controllers.TestLockController   : going to call test lock method with key: b
2023-12-22 21:36:30.193  INFO 39588 --- [nPool-worker-51] c.g.s.e.controllers.TestLockController   : going to call test lock method with key: a
2023-12-22 21:36:30.193  INFO 39588 --- [nPool-worker-37] c.g.s.e.controllers.TestLockController   : going to call test lock method with key: c
2023-12-22 21:36:30.210  INFO 39588 --- [nPool-worker-37] c.g.s.example.service.TestLockService    : c start with transaction 344273392
2023-12-22 21:36:30.211  INFO 39588 --- [nio-8080-exec-1] c.g.s.example.service.TestLockService    : b start with transaction 2109875677
2023-12-22 21:36:30.210  INFO 39588 --- [nPool-worker-51] c.g.s.example.service.TestLockService    : a start with transaction 1083949093
2023-12-22 21:36:30.241  INFO 39588 --- [nio-8080-exec-1] c.g.s.example.service.TestLockService    : b lock ok
2023-12-22 21:36:30.241  INFO 39588 --- [nio-8080-exec-1] c.g.s.example.service.TestLockService    : b unlock
2023-12-22 21:36:30.247  INFO 39588 --- [nio-8080-exec-1] c.g.s.example.service.TestLockService    : b end with transaction 2109875677
2023-12-22 21:36:30.247 ERROR 39588 --- [nPool-worker-51] c.g.s.example.service.TestLockService    : a lock failed with exception: Failed to lock mutex at 0cc175b9-c0f1-36a8-b1c3-99e269772661; nested exception is org.springframework.orm.jpa.JpaSystemException: Unable to commit against JDBC Connection; nested exception is org.hibernate.TransactionException: Unable to commit against JDBC Connection
2023-12-22 21:36:30.247 ERROR 39588 --- [nPool-worker-37] c.g.s.example.service.TestLockService    : c lock failed with exception: Failed to lock mutex at 4a8a08f0-9d37-3737-9564-9038408b5f33; nested exception is org.springframework.orm.jpa.JpaSystemException: Unable to commit against JDBC Connection; nested exception is org.hibernate.TransactionException: Unable to commit against JDBC Connection
2023-12-22 21:36:30.247  INFO 39588 --- [nPool-worker-51] c.g.s.example.service.TestLockService    : a skip unlock since it was never locked
2023-12-22 21:36:30.247  INFO 39588 --- [nPool-worker-37] c.g.s.example.service.TestLockService    : c skip unlock since it was never locked
2023-12-22 21:36:30.247  INFO 39588 --- [nPool-worker-51] c.g.s.example.service.TestLockService    : a end with transaction 1083949093
2023-12-22 21:36:30.247  INFO 39588 --- [nPool-worker-37] c.g.s.example.service.TestLockService    : c end with transaction 344273392
2023-12-22 21:36:30.247  INFO 39588 --- [nio-8080-exec-1] c.g.s.e.controllers.TestLockController   : end


现在我得到错误。

**问题是:有3个线程,每个线程使用不同的transaction,每个线程尝试使用不同的key来获得lock,它们看起来是相互独立的,为什么在删除pretendToDoWork**方法后,它们会相互冲突?

如果上面的代码是错误的,那么在使用多个事务的多线程中使用jdbc分布式锁的正确方法是什么?

-更新2023-12-25-

代码更新:https://github.com/cuipengfei/Spikes/tree/master/jpa/lock-transaction-threads/src/main/java/com/github/spring/example/service

*Problem 1 Service.java:上述问题
*Problem 1FixService.java:引入一个自定义的锁存储库,它使用自己独立的transmanager
*Problem 2Service.java:基于问题1修复,在尝试获取锁之前运行一些SQL,由于在同一事务中运行biz sql和锁,将获得事务隔离级别更改错误
*Problem 2BadFixService.java:尝试通过使并行流不使用当前线程来修复问题2,但此修复导致在transm外部运行biz sql
*Problem 2GoodFixService.java:尝试通过缩小transs的范围来修复问题2,并且不在同一transs中混合使用biz sql和lock
Problem 2GoodFixService看起来像是在多线程和@ translator中使用jdbc锁的正确方法吗?

6bc51xsx

6bc51xsx1#

删除pretendToDoWork方法后为什么会冲突?

首先,当pretendToDoWork(key);方法在注解行之外时,锁访问发生在不同的时间,但是当方法被注解掉时:锁访问同时发生,你会得到一个错误。

要理解这里发生了什么;您可以将t.printStackTrace();添加到catch块,或者通过将logging.level.org.springframework.orm.jpa=DEBUG添加到application.properties文件来激活DEBUG模式。
这里需要注意的错误是**org.postgresql.util.PSQLException: ERROR: could not serialize access due to read/write dependencies among transactions.**

org.springframework.orm.jpa.JpaSystemException: Unable to commit against JDBC Connection; nested exception is org.hibernate.TransactionException: Unable to commit against JDBC Connection
    at org.springframework.orm.jpa.vendor.HibernateJpaDialect.convertHibernateAccessException(HibernateJpaDialect.java:331) ~[spring-orm-5.3.28.jar:5.3.28]
...
org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) ~[tomcat-embed-core-9.0.76.jar:9.0.76]
    at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
Caused by: org.hibernate.TransactionException: Unable to commit against JDBC Connection
    at org.hibernate.resource.jdbc.internal.AbstractLogicalConnectionImplementor.commit(AbstractLogicalConnectionImplementor.java:92) ~[hibernate-core-5.6.15.Final.jar:5.6.15.Final]
    at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl$TransactionDriverControlImpl.commit(JdbcResourceLocalTransactionCoordinatorImpl.java:282) ~[hibernate-core-5.6.15.Final.jar:5.6.15.Final]
    at org.hibernate.engine.transaction.internal.TransactionImpl.commit(TransactionImpl.java:101) ~[hibernate-core-5.6.15.Final.jar:5.6.15.Final]
    at org.springframework.orm.jpa.JpaTransactionManager.doCommit(JpaTransactionManager.java:562) ~[spring-orm-5.3.28.jar:5.3.28]
    ... 89 common frames omitted
Caused by: org.postgresql.util.PSQLException: ERROR: could not serialize access due to read/write dependencies among transactions
  Detail: Reason code: Canceled on identification as a pivot, during commit attempt.
  Hint: The transaction might succeed if retried.
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2676) ~[postgresql-42.5.4.jar:42.5.4]
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2366) ~[postgresql-42.5.4.jar:42.5.4]
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:356) ~[postgresql-42.5.4.jar:42.5.4]
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:316) ~[postgresql-42.5.4.jar:42.5.4]
    at org.postgresql.jdbc.PgConnection.executeTransactionCommand(PgConnection.java:928) ~[postgresql-42.5.4.jar:42.5.4]
    at org.postgresql.jdbc.PgConnection.commit(PgConnection.java:950) ~[postgresql-42.5.4.jar:42.5.4]
    at com.zaxxer.hikari.pool.ProxyConnection.commit(ProxyConnection.java:387) ~[HikariCP-4.0.3.jar:na]
    at com.zaxxer.hikari.pool.HikariProxyConnection.commit(HikariProxyConnection.java) ~[HikariCP-4.0.3.jar:na]
    at org.hibernate.resource.jdbc.internal.AbstractLogicalConnectionImplementor.commit(AbstractLogicalConnectionImplementor.java:86) ~[hibernate-core-5.6.15.Final.jar:5.6.15.Final]
    ... 92 common frames omitted

字符串

解决方案

https://github.com/spring-projects/spring-integration/issues/3733有关于错误及其发生原因的解释。
这个问题在几个版本之后通过PR https://github.com/spring-projects/spring-integration/pull/3782解决了。*(例如,您可以将pom.xml文件中的spring-integration-jdbc升级到6.0.9,或者升级您的Spring Boot版本。
如果你想在当前版本中手动解决这个问题;标题中的解决方案:How to retry a PostgreSQL serializable transaction with Spring?将工作。

相关问题