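Please help me understand and resolve the error I get when I try to fetch the first record out of the many returned by a SELECT statement against an MS SQL Server database using R2DBC.
My Gradle dependencies: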
```
dependencies {
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    implementation 'org.springframework.boot:spring-boot-starter-actuator'
    implementation 'org.springframework.security:spring-security-oauth2-client'
    implementation 'io.springfox:springfox-boot-starter:3.0.0'
    implementation 'org.springframework.data:spring-data-r2dbc'
    implementation 'io.r2dbc:r2dbc-mssql'
    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
    annotationProcessor 'org.projectlombok:lombok'
}
```
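My code (sensitive content masked with x). [Please understand that this is not production-ready code; it is part of a very early-stage PoC, so most of it does not meet production standards. Please bear with that.]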
```
@Configuration
@EnableR2dbcRepositories(basePackages="com.xxxx.admin.db.repo")
public class DatabaseConfiguration extends AbstractR2dbcConfiguration
{
    @Autowired
    private Environment env;

    @Bean
    @Override
    public ConnectionFactory connectionFactory() {
        return new MssqlConnectionFactory(MssqlConnectionConfiguration.builder().host("xxxxxx")
                .database("xxxx").username(env.getProperty("xxxxx")).password(env.getProperty("xxxxxx")).build());
    }
}

@RestController
@RequestMapping("/api/v1/admin")
public class AdminController {
    @GetMapping("/junk")
    public Mono<Map<String, Object>> getUser() {
        return tmoUserRepo.getUser("XXXXXXXXXXX");
    }
}

@Repository
public class TmoUserRepo {
    private static final String GET_USER_SQL = "SELECT xxxxxxx WHERE ID=:userId ORDER BY XXX DESC, YYY ASC, ZZZ ASC;" ;

    @Autowired
    private DatabaseClient dbClient;

    public Mono<Map<String, Object>> getUser(String userId) {
        dbClient.execute(GET_USER_SQL).bind("userId",userId).fetch().first().subscribe(System.out::println);
        return null;
    }
}
```
Scenario 1. This produces an error (dbClient is an instance of DatabaseClient):
```
dbClient.execute(GET_USER_SQL).bind("userId",userId).fetch().first().subscribe(System.out::println);
```
Scenario 2. This succeeds:
```
dbClient.execute(GET_USER_SQL).bind("userId",userId).fetch().first().all(System.out::println);
```
=========================
What happens in scenario 1: the following is printed twice in the log:
2020-12-04 18:37:38.569 DEBUG 18636 --- [actor-tcp-nio-1] o.s.d.r2dbc.core.DefaultDatabaseClient : Executing SQL statement [SELECT usr.name_first AS namefirst,
usr.name_last AS namelast,
<<.... rest of SQL...>>
Error message:
2020-12-04 18:37:40.278 ERROR 18636 --- [actor-tcp-nio-1] reactor.core.publisher.Operators : Operator called default onErrorDropped
io.r2dbc.mssql.client.ReactorNettyClient$MssqlConnectionClosedException: Connection closed
at io.r2dbc.mssql.client.ReactorNettyClient.lambda$static$1(ReactorNettyClient.java:93) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
at io.r2dbc.mssql.client.ReactorNettyClient.drainError(ReactorNettyClient.java:629) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
at io.r2dbc.mssql.client.ReactorNettyClient.handleClose(ReactorNettyClient.java:614) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
at io.r2dbc.mssql.client.ReactorNettyClient.access$600(ReactorNettyClient.java:85) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
at io.r2dbc.mssql.client.ReactorNettyClient$2.onComplete(ReactorNettyClient.java:289) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2016) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:441) ~[reactor-netty-0.9.13.RELEASE.jar:0.9.13.RELEASE]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:238) ~[reactor-netty-0.9.13.RELEASE.jar:0.9.13.RELEASE]
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:362) ~[reactor-netty-0.9.13.RELEASE.jar:0.9.13.RELEASE]
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:358) ~[reactor-netty-0.9.13.RELEASE.jar:0.9.13.RELEASE]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:96) ~[reactor-netty-0.9.13.RELEASE.jar:0.9.13.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.r2dbc.mssql.client.ssl.TdsSslHandler.channelRead(TdsSslHandler.java:380) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.53.Final.jar:4.1.53.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.53.Final.jar:4.1.53.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.53.Final.jar:4.1.53.Final]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
2020-12-04 18:37:40.280 ERROR 18636 --- [actor-tcp-nio-1] r.n.channel.ChannelOperationsHandler : [id: 0x2fb997cb, L:/10.100.26.236:55810 ! R:tmidevsql12/10.211.0.250:1433] Error was received while reading the incoming data. The connection will be closed.
reactor.core.Exceptions$BubblingException: io.r2dbc.mssql.client.ReactorNettyClient$MssqlConnectionClosedException: Connection closed
at reactor.core.Exceptions.bubble(Exceptions.java:173) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at reactor.core.publisher.Operators.onErrorDropped(Operators.java:635) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at io.r2dbc.mssql.util.FluxDiscardOnCancel$FluxDiscardOnCancelSubscriber.onError(FluxDiscardOnCancel.java:95) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:214) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:214) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at reactor.core.publisher.FluxFilter$FilterSubscriber.onError(FluxFilter.java:151) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at reactor.core.publisher.FluxHandle$HandleConditionalSubscriber.onError(FluxHandle.java:406) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at reactor.core.publisher.EmitterProcessor.checkTerminated(EmitterProcessor.java:489) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:356) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at reactor.core.publisher.EmitterProcessor.onError(EmitterProcessor.java:286) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:214) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:214) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:214) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at reactor.core.publisher.FluxHandle$HandleSubscriber.onError(FluxHandle.java:196) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onError(MonoFlatMapMany.java:247) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:214) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at reactor.core.publisher.EmitterProcessor.checkTerminated(EmitterProcessor.java:489) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:356) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at reactor.core.publisher.EmitterProcessor.onError(EmitterProcessor.java:286) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at io.r2dbc.mssql.client.ReactorNettyClient.drainError(ReactorNettyClient.java:629) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
at io.r2dbc.mssql.client.ReactorNettyClient.handleClose(ReactorNettyClient.java:614) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
at io.r2dbc.mssql.client.ReactorNettyClient.access$600(ReactorNettyClient.java:85) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
at io.r2dbc.mssql.client.ReactorNettyClient$2.onComplete(ReactorNettyClient.java:289) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onComplete(Operators.java:2016) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252) ~[reactor-core-3.3.11.RELEASE.jar:3.3.11.RELEASE]
at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:441) ~[reactor-netty-0.9.13.RELEASE.jar:0.9.13.RELEASE]
at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:238) ~[reactor-netty-0.9.13.RELEASE.jar:0.9.13.RELEASE]
at reactor.netty.channel.FluxReceive.onInboundNext(FluxReceive.java:362) ~[reactor-netty-0.9.13.RELEASE.jar:0.9.13.RELEASE]
at reactor.netty.channel.ChannelOperations.onInboundNext(ChannelOperations.java:358) ~[reactor-netty-0.9.13.RELEASE.jar:0.9.13.RELEASE]
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:96) ~[reactor-netty-0.9.13.RELEASE.jar:0.9.13.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.r2dbc.mssql.client.ssl.TdsSslHandler.channelRead(TdsSslHandler.java:380) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.53.Final.jar:4.1.53.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.53.Final.jar:4.1.53.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.53.Final.jar:4.1.53.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.53.Final.jar:4.1.53.Final]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: io.r2dbc.mssql.client.ReactorNettyClient$MssqlConnectionClosedException: Connection closed
at io.r2dbc.mssql.client.ReactorNettyClient.lambda$static$1(ReactorNettyClient.java:93) ~[r2dbc-mssql-0.8.5.RELEASE.jar:0.8.5.RELEASE]
... 32 common frames omitted
2020-12-04 18:37:40.280 WARN 18636 --- [actor-tcp-nio-1] reactor.netty.channel.FluxReceive : [id: 0x2fb997cb, L:/10.100.26.236:55810 ! R:tmidevsql12/10.211.0.250:1433] An exception has been observed post termination, use DEBUG level to see the full stack: reactor.core.Exceptions$BubblingException: io.r2dbc.mssql.client.ReactorNettyClient$MssqlConnectionClosedException: Connection closed
1 answer
Short answer
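Your problem is probably that `getUser` subscribes to the query itself and then returns `null`.
You probably need to change it so that it returns the `Mono` produced by `fetch().first()` and leaves the subscribing to the caller.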
Long answer
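Reactive programming is very different from regular Java programming, and if you try to write it like regular Java you will run into trouble.
First, `null` values are not allowed in reactive programming, so never return `null`.
Reactive programming works with producers and consumers. Your application is the producer, and the calling client (web app, mobile app, Postman, curl, etc.) is the consumer. Whoever initiates the call is usually the consumer. A consumer starts by subscribing to the producer, so the client subscribes to the server.
When that happens, the server traverses your code from the end backwards until it finds something that produces a value (the database). Along the way it assembles a chain of callbacks. This part is called the assembly phase. Once this phase is complete, the application emits one value (for a `Mono`) or many values (for a `Flux`). So nothing happens until someone subscribes.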
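A minimal sketch of the "nothing happens until someone subscribes" rule. It is written against the JDK's `java.util.concurrent.Flow` interfaces (Java 9+) rather than Reactor so that it runs without extra dependencies. `LazySingle`, `LazySingleDemo`, `getUser`, `collect`, and `queryCount` are hypothetical names used only for illustration; this mimics the lazy, subscribe-driven behavior of a `Mono` and is not Reactor's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for a Mono: emits at most one value, and only on demand.
final class LazySingle<T> implements Flow.Publisher<T> {
    private final Supplier<T> source;

    LazySingle(Supplier<T> source) {
        this.source = source;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        AtomicBoolean done = new AtomicBoolean(false);
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                // The source runs only once a subscriber actually asks for data.
                if (n > 0 && done.compareAndSet(false, true)) {
                    T value;
                    try {
                        value = source.get();
                    } catch (RuntimeException e) {
                        subscriber.onError(e);
                        return;
                    }
                    subscriber.onNext(value);
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done.set(true);
            }
        });
    }
}

final class LazySingleDemo {
    // Counts how often the pretend database call has actually run.
    static final AtomicInteger queryCount = new AtomicInteger();

    // Plays the role of the repository method: it returns the publisher
    // and does not subscribe to it or return null.
    static LazySingle<String> getUser(String userId) {
        return new LazySingle<>(() -> {
            queryCount.incrementAndGet(); // pretend this is the database call
            return "user:" + userId;
        });
    }

    // Plays the role of the caller: subscribes and collects what it receives.
    static <T> List<T> collect(Flow.Publisher<T> publisher) {
        List<T> received = new ArrayList<>();
        publisher.subscribe(new Flow.Subscriber<T>() {
            @Override
            public void onSubscribe(Flow.Subscription subscription) {
                subscription.request(1);
            }

            @Override
            public void onNext(T item) {
                received.add(item);
            }

            @Override
            public void onError(Throwable error) {
                throw new IllegalStateException(error);
            }

            @Override
            public void onComplete() {
            }
        });
        return received;
    }

    public static void main(String[] args) {
        LazySingle<String> pipeline = getUser("42"); // assembly only
        System.out.println("queries after assembly: " + queryCount.get());
        System.out.println("received: " + collect(pipeline));
        System.out.println("queries after subscribe: " + queryCount.get());
    }
}
```

The design point is that `getUser` hands the publisher back to its caller. In a WebFlux controller the framework is the one that subscribes, which is why a repository method would normally return its `Mono` rather than calling `subscribe` itself.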
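The same applies to functions that return a publisher: calling the function only assembles the chain, and nothing runs until something subscribes to the result.
But what if you don't want to return anything? In that case you can return `Mono.empty()`, which translates to `Mono<Void>`: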
```
public Mono<Void> getFoobar() {
    return Mono.just("Foobar").flatMap(s -> {
        // Return an empty Mono, which translates to a Mono<Void>
        return Mono.empty();
    });
}

getFoobar().subscribe();
```
For reference, the two calls from the question, reformatted:
```
dbClient.execute(GET_USER_SQL)
    .bind("userId",userId)
    .fetch()
    .first()
    .subscribe(System.out::println);
```
```
dbClient.execute(GET_USER_SQL)
    .bind("userId",userId)
    .fetch()
    .first()
    .all(System.out::println);
```