kafka-quarkus反压管理

uoifb46i  于 2021-06-04  发布在  Kafka
关注(0)|答案(0)|浏览(436)

我使用quarkus与kafka的React式消息传递获得以下stacktrace:

at org.jboss.threads.ContextClassLoaderSavingRunnable.run(ContextClassLoaderSavingRunnable.java:35)
        at org.jboss.threads.EnhancedQueueExecutor.safeRun(EnhancedQueueExecutor.java:2046)
        at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.doRunTask(EnhancedQueueExecutor.java:1578)
        at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1452)
        at org.jboss.threads.DelegatingRunnable.run(DelegatingRunnable.java:29)
        at org.jboss.threads.ThreadLocalResettingRunnable.run(ThreadLocalResettingRunnable.java:29)
        at java.lang.Thread.run(Thread.java:748)
        at org.jboss.threads.JBossThread.run(JBossThread.java:479)

//very very long sql function call

2020-08-04 12:05:35,739 ERROR [io.sma.rea.mes.kafka] (executor-thread-78) SRMSG18207: Unable to dispatch message to Kafka: io.smallrye.mutiny.subscription.BackPressureFailure: Buffer is full due to lack of downstream consumption
        at io.smallrye.mutiny.operators.multi.overflow.MultiOnOverflowBufferOp$OnOverflowBufferProcessor.onItem(MultiOnOverflowBufferOp.java:80)
        at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
        at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.lambda$onNext$1(ContextPropagationMultiInterceptor.java:36)
        at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
        at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.onNext(ContextPropagationMultiInterceptor.java:36)
        at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.lambda$onNext$1(ContextPropagationMultiInterceptor.java:36)
        at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
        at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.onNext(ContextPropagationMultiInterceptor.java:36)
        at io.smallrye.mutiny.helpers.HalfSerializer.onNext(HalfSerializer.java:31)
        at io.smallrye.mutiny.helpers.StrictMultiSubscriber.onItem(StrictMultiSubscriber.java:81)
        at io.smallrye.mutiny.operators.multi.builders.IntervalMulti$IntervalRunnable.run(IntervalMulti.java:77)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at org.jboss.threads.ContextClassLoaderSavingRunnable.run(ContextClassLoaderSavingRunnable.java:35)
        at org.jboss.threads.EnhancedQueueExecutor.safeRun(EnhancedQueueExecutor.java:2046)
        at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.doRunTask(EnhancedQueueExecutor.java:1578)
        at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1452)
        at org.jboss.threads.DelegatingRunnable.run(DelegatingRunnable.java:29)
        at org.jboss.threads.ThreadLocalResettingRunnable.run(ThreadLocalResettingRunnable.java:29)
        at java.lang.Thread.run(Thread.java:748)
        at org.jboss.threads.JBossThread.run(JBossThread.java:479)

这是我的密码:

@Outgoing( "eqs-crossing-mid" )
    public Multi< EQSAlert > eqsCrossingTGV_MID(){

        final String series = CrossingEnum.TGV_MID.getSeries();
        final String equipment = CrossingEnum.TGV_MID.getEquipment();

        final String vehicleRegex = vehicleRegexService.getRegexBySeries( series );

        log.info( "Incoming request for {} - {}",  series, equipment);
        log.info( "Vehicle regex : {}", vehicleRegex );

        return Multi
                .createFrom()
                .ticks()
                .every(
                        Duration.ofSeconds( poolingInterval )
                )
                .onOverflow()
                .buffer(10)
                .concatMap(i -> {
                    final Multi<CrossingState> crossingStateBySeriesAndEquipment = CrossingState.getCrossingStateBySeriesAndEquipment(client, series, equipment);

                    return crossingStateBySeriesAndEquipment.flatMap(crossingState ->
                            crossingState.isActive() ?
                                    EQSAlert.getEQSAlertBySeriesAndEquipment(
                                            client,
                                            series,
                                            vehicleRegex,
                                            equipment
                                    )
                                    :
                                    Multi.createFrom().empty()
                    );
                });
    }

正如您所看到的,我每隔5秒(池间隔)调用一次sql函数,我注意到在提交sql函数的错误版本后出现了这个错误。通常情况下,该调用发出的时间少于1秒,并且函数最多需要2分钟才能发送响应。
所以我对我的sql函数做了修正,现在一切都好了。
我这篇文章的目的是了解申请表上发生了什么?是因为我有太多的sql调用吗?
谢谢您

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题