AsyncCassandra操作示例

xytpbqjk  于 2021-06-15  发布在  Cassandra
关注(0)|答案(1)|浏览(466)

基于这里的另一篇文章,我正在阅读asynccassandraoperations来执行异步插入以提高性能。但我在google或spring数据文档上找不到很多帮助。
以前,我使用cassandra存储库进行所有数据提取和插入/更新,我发现这非常慢。根据建议,我现在将asynccassandraoperations单独用于insert操作,但它不允许我这样做。我遇到“org.springframework.data.cassandra.core.asynccassandraoperations”类型的bean错误。
请问使用AsyncCassandra操作的正确方法是什么?

@Autowired private MyRepository repository_name;
@Autowired private AsyncCassandraOperations acops;
public void persist(List<POJO> l_POJO)
{
        System.out.println("Enter Persist: "+new java.util.Date());

        List<l_POJO> l_POJO_stale = repository_name.findBycol1AndStale("sample",false);

        l_POJO_stale.forEach(s -> s.setStale(true));

        l_POJO_stale.forEach(s -> acops.update(s));

        try 
        {
            acops.insert(l_POJO);
        } 
        catch (Exception e) 
        {
            System.out.println("Error in persisting new data");
        }
}
lstz6jyr

lstz6jyr1#

不知道是否使用了spring引导,如果是这样的话,应该自动创建asynccassandraoperations(asynccassandratemplate是实现类)。如果错误显示您需要asynccassandraoperations bean,那么直接的方法是创建一个,如下所示。

@Bean
AsyncCassandraTemplate asyncCassandraTemplate(Session session) {
    return new AsyncCassandraTemplate(session);
}

因为您使用的是spring数据存储库接口,所以还可以使用 ReactiveCrudRepository 接口来更新或插入实体对象到cassandra,这在这个spring数据示例项目中显示,作为使用 AsyncCassandraTemplate 班级。
如果使用 ReactiveCrudRepository 关于您想做什么,您的代码需要做以下更改。
更改的返回类型 WRRepository.findByCol1AndCol2AndCol3(String, boolean, String)List<WRpojo>Flux<WRpojo> ,以充分利用React功能。
更改的返回类型 persist(List<WRpojo>) 从布尔值到 Mono<Void> ,使结果也非阻塞。
改变你的想法 persist(List<WRpojo>) 到下面。

public Mono<Void> persist(List<WRpojo> l_wr) {
    Flux<WRpojo> l_old_wr = objWRRepository.findByCol1AndCol2AndCol3("1", false, "2").doOnNext(s -> s.setStale(true));
    return objWRRepository.saveAll(l_old_wr).thenMany(objWRRepository.saveAll(l_wr)).then();
  }

在React式编程中,基本上我们不阻塞任何代码,这意味着 Mono<Void> 应该在下游某个地方订阅,如果确实要阻止并等待所有操作完成,可以调用 block()Mono<Void> ,这是不推荐的。

相关问题