Spring Data R2DBC with the @Query annotation

k7fdbhmy  posted on 2023-02-28 in Spring

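I am exploring Spring Data with R2DBC and trying to declare a custom repository method in my interface, which extends ReactiveCrudRepository. The method takes a Flux of objects as its parameter and returns a Flux of the same type.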

@Query("SELECT id, name  FROM Person WHERE id = ?")
    Flux<Person> myMethod(Flux<Person> person);

I expect it to behave the same way as the Flux<T> findAllById(Publisher<ID> idStream) method.
Here is my repository interface definition:

public interface PersonRepository extends ReactiveCrudRepository<Person, Person> {

    @Query("SELECT id, name  FROM Person WHERE id = ?")
    Flux<Person> myMethod(Flux<Person> person);

}

Here is my entity:

@Getter
@Setter
@ToString
@Data
@NoArgsConstructor
@AllArgsConstructor
@Table(value = "Person", schema = "mySchema")
public class Person {
    @Id
    @Column("id")
    Long id;

    @Column("name")
    String name;

}

Here is my database configuration class:

@Configuration
@EnableR2dbcRepositories
@EnableR2dbcAuditing
@EnableTransactionManagement

public class DatabaseConfiguration extends AbstractR2dbcConfiguration {

......

}

Here is my main application:

@SpringBootApplication
@EnableR2dbcAuditing
@EnableConfigurationProperties({ApplicationProperties.class})
public class MyApplication {

    private static final Logger logger = LoggerFactory.getLogger(MyApplication.class);

    @Bean
    public CommandLineRunner consume(PersonRepository personRepository) {

        LongSupplier randomLong = () -> {
            return RandomUtils.nextLong(10L,20L);
        };

        Flux<Long> personIds = Flux.fromStream(LongStream.generate(randomLong).boxed());
        Flux<Person> persons = personIds.map( p -> {
            Person person = new Person();
            person.setId(p);
            return person;
        });

        personRepository.myMethod(persons.take(3)).subscribe( id -> logger.info("Processed Person Id: " + id));
        
        return null;
    }   

    .......
}

Using the interface method findById(Publisher id) works, but when I try the custom method myMethod(Flux id) annotated with @Query, I get the following exception:

reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IllegalArgumentException: Value must not be null
Caused by: java.lang.IllegalArgumentException: Value must not be null
    at org.springframework.util.Assert.notNull(Assert.java:201)
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.MonoMapFuseable] :
    reactor.core.publisher.Mono.map(Mono.java:3411)
    org.springframework.data.r2dbc.repository.query.StringBasedR2dbcQuery.createQuery(StringBasedR2dbcQuery.java:156)
Error has been observed at the following site(s):
    *__________Mono.map ⇢ at org.springframework.data.r2dbc.repository.query.StringBasedR2dbcQuery.createQuery(StringBasedR2dbcQuery.java:156)
    |_ Mono.flatMapMany ⇢ at org.springframework.data.r2dbc.repository.query.AbstractR2dbcQuery.execute(AbstractR2dbcQuery.java:88)
    *____Flux.usingWhen ⇢ at org.springframework.data.repository.core.support.RepositoryMethodInvoker$ReactiveInvocationListenerDecorator.decorate(RepositoryMethodInvoker.java:242)
Original Stack Trace:
        at org.springframework.util.Assert.notNull(Assert.java:201)
        at org.springframework.r2dbc.core.Parameter.from(Parameter.java:54)
        at org.springframework.data.r2dbc.query.QueryMapper.getBindValue(QueryMapper.java:426)
        at org.springframework.data.r2dbc.core.DefaultReactiveDataAccessStrategy.getBindValue(DefaultReactiveDataAccessStrategy.java:288)
        at org.springframework.data.r2dbc.repository.query.ExpressionEvaluatingParameterBinder.getBindValue(ExpressionEvaluatingParameterBinder.java:134)
        at org.springframework.data.r2dbc.repository.query.ExpressionEvaluatingParameterBinder.bindExpressions(ExpressionEvaluatingParameterBinder.java:82)
        at org.springframework.data.r2dbc.repository.query.ExpressionEvaluatingParameterBinder.bind(ExpressionEvaluatingParameterBinder.java:72)
        at org.springframework.data.r2dbc.repository.query.StringBasedR2dbcQuery$ExpandedQuery.<init>(StringBasedR2dbcQuery.java:197)
        at org.springframework.data.r2dbc.repository.query.StringBasedR2dbcQuery.lambda$createQuery$0(StringBasedR2dbcQuery.java:156)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
        at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:101)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
        at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
        at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onComplete(MonoCollectList.java:128)
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:368)
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onComplete(FluxConcatMap.java:276)
        at reactor.core.publisher.Operators.complete(Operators.java:137)
        at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:148)
        at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:87)
        at reactor.core.publisher.Flux.subscribe(Flux.java:8469)
        at reactor.core.publisher.FluxUsingWhen.subscribe(FluxUsingWhen.java:94)
        at reactor.core.publisher.Flux.subscribe(Flux.java:8469)
        at reactor.core.publisher.Flux.subscribeWith(Flux.java:8642)
        at reactor.core.publisher.Flux.subscribe(Flux.java:8439)
        at reactor.core.publisher.Flux.subscribe(Flux.java:8363)
        at reactor.core.publisher.Flux.subscribe(Flux.java:8306)

I could not find a similar use case in the Spring reference documentation, and I would like to know whether this is supported.

lmvvr0a8  #1

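I tested the @Query annotation with an incoming and an outgoing Flux and found that @Query indeed does not support a Publisher as an incoming parameter.
The test code below also gave me Value must not be null: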

Flux<Integer> ids = Flux.range(1, 10);

userInfoRepository.findAllById(ids)
    .subscribeOn(Schedulers.boundedElastic())
    .subscribe(v1 -> log.info("User {}", v1.getUsername()));

@Query("select * from user.user_infos t where t.id = :id")
Flux<UserInfo> findAllById(Flux<Integer> id);

But if you remove @Query, it actually works:

Flux<UserInfo> findAllById(Flux<Integer> id);

The same holds for other fields:

Flux<UserInfo> findAllByUsername(Flux<String> username);

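As far as I can tell, the @Query annotation does not currently subscribe to an incoming Publisher parameter (that is, it does not internally do the equivalent of a .flatMap() over it).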
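If @Query only binds plain values, one possible workaround is to keep the annotated method scalar and do the subscription yourself in a default method on the repository. The following is a minimal sketch under several assumptions, not a confirmed recommendation from the Spring documentation: the method names selectById, selectByIds and myMethodBatched are hypothetical, the repository ID type is changed to Long to match the entity's @Id field, Person exposes getId() through its Lombok annotations, and binding a collection to IN (:ids) is assumed to be expanded by the driver and Spring's named-parameter support.

```java
import java.util.Collection;

import org.springframework.data.r2dbc.repository.Query;
import org.springframework.data.repository.query.Param;
import org.springframework.data.repository.reactive.ReactiveCrudRepository;

import reactor.core.publisher.Flux;

// Sketch only: Person is the entity from the question; the ID type is assumed to be Long.
public interface PersonRepository extends ReactiveCrudRepository<Person, Long> {

    // Hypothetical method: @Query receives a scalar value to bind, not a Publisher.
    @Query("SELECT id, name FROM Person WHERE id = :id")
    Flux<Person> selectById(@Param("id") Long id);

    // Hypothetical method: a collection bound to an IN clause (assumed to be expanded).
    @Query("SELECT id, name FROM Person WHERE id IN (:ids)")
    Flux<Person> selectByIds(@Param("ids") Collection<Long> ids);

    // Subscribes to the incoming Flux and runs one query per emitted Person.
    default Flux<Person> myMethod(Flux<Person> persons) {
        return persons
                .map(Person::getId)
                .flatMap(this::selectById);
    }

    // Collects all ids first, then issues a single query; skips the query when no ids arrive.
    default Flux<Person> myMethodBatched(Flux<Person> persons) {
        return persons
                .map(Person::getId)
                .collectList()
                .filter(ids -> !ids.isEmpty())
                .flatMapMany(this::selectByIds);
    }
}
```

With this shape, personRepository.myMethod(persons.take(3)) from the question would issue three single-row queries. flatMap does not guarantee that results arrive in the order of the input ids; concatMap can be substituted if ordering matters. The batched variant requires the input Flux to complete, so it is not suitable for an unbounded stream.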
