Drools window-over-time events not expiring: out of memory

Posted by rkue9o1l on 2023-01-12 in Spring

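I am trying to use stream-mode rule processing with Drools and I run out of memory. After about 2-3 million Measurement inserts I get an out-of-memory exception. I assume @expiration is not working. I am using stream events, and as far as I know the session should expire each Measurement after 1 second.
Library versions: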

compile("org.drools:drools-core:7.0.0.Final")
compile("org.kie:kie-spring:7.0.0.Final")

Do you know why the events do not expire?
My rules are:

import hello.measurements.Measurement

declare Measurement
    @role( event )
    @expiration( 1s )
end

rule "Monitor average measurement"
    when
        $dv: Number( doubleValue > 0 || doubleValue < 0 || doubleValue == 0)
             from accumulate(
                Measurement( $value : value ) over window:time( 1s ),
                average( $value )
            )
    then
        System.out.println("zzZZzz: " + $dv);
end

Here is the Java code:

public class Measurement {
    private static final Logger LOGGER = LoggerFactory.getLogger(Measurement.class);
    int value;

    public int getValue() {
        return value;
    }

    public void setValue(int value) {
        this.value = value;
    }
}

Here is the measurement service:

@Service
public class AverageMeasurementService {
    private static final Logger LOGGER = LoggerFactory.getLogger(AverageMeasurementService.class);
    private AverageMeasurementServiceThread averageMeasurementServiceThread;
    private final KieContainer kieContainer;
    private KieSession kieSession;
    private KieBaseConfiguration kieBaseConfiguration;
    Thread t;

    @Autowired
    public AverageMeasurementService(KieContainer kc, KieBaseConfiguration kbc) {

        kieContainer = kc;
        kieBaseConfiguration = kbc;
    }

    @PostConstruct
    public void postConstruct() {
        KieBase kieBase = kieContainer.newKieBase(kieBaseConfiguration);
        kieSession = kieBase.newKieSession();
        averageMeasurementServiceThread = new AverageMeasurementServiceThread(kieSession);
        t = new Thread(averageMeasurementServiceThread);
        t.start();
        LOGGER.info("Thread started...");
    }

    public void addMeasurement(Measurement m){
        kieSession.insert(m);
    }

    @PreDestroy
    public void stop(){
        LOGGER.info("Halting");
        kieSession.halt();
        LOGGER.info("Halted");
    }
}

The thread:

public class AverageMeasurementServiceThread implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(AverageMeasurementServiceThread.class);
    KieSession kieSession;

    public AverageMeasurementServiceThread(KieSession sessionToRun){
        kieSession = sessionToRun;
    }

    @Override
    public void run() {
        kieSession.fireUntilHalt();
    }
}

The KIE beans:

@Bean
public KieBaseConfiguration kieBaseConfiguration() {
    KieServices kieServices = KieServices.Factory.get();
    KieBaseConfiguration config = kieServices.newKieBaseConfiguration();
    config.setOption( EventProcessingOption.STREAM );
    return config;
}

@Bean
public KieContainer kieContainer() {
    KieServices kieServices = KieServices.Factory.get();

    KieFileSystem kieFileSystem = kieServices.newKieFileSystem();
    kieFileSystem.write(ResourceFactory.newClassPathResource(drlFile));
    KieBuilder kieBuilder = kieServices.newKieBuilder(kieFileSystem);
    kieBuilder.buildAll();
    KieModule kieModule = kieBuilder.getKieModule();

    return kieServices.newKieContainer(kieModule.getReleaseId());
}

Here is my failing unit test:

@RunWith(SpringRunner.class)
@SpringBootTest
public class AverageMeasurementServiceTest {
    @Autowired
    AverageMeasurementService averageMeasurementService;

    @Test
    public void test1() throws InterruptedException {
        Measurement m = new Measurement();
        m.setValue(5);
        System.err.println("Inserting and looping");
        averageMeasurementService.addMeasurement(m);
        Random random = new Random();
        for(int i = 0 ; i < 10000000 ; i++){
            m = new Measurement();
            m.setValue(random.nextInt());
            averageMeasurementService.addMeasurement(m);
            if(i % 100000 == 0) {
                System.out.println(i);
            }
        }
    }
}

Here is the exception I get:

... snipped output ...
zzZZzz: -5.3260380032E7
zzZZzz: -3.78710975323741E7
2700000
Exception in thread "SimplePauseDetectorThread_0" java.lang.OutOfMemoryError: GC overhead limit exceeded
    at org.LatencyUtils.PauseDetector.notifyListeners(PauseDetector.java:37)
    at org.LatencyUtils.SimplePauseDetector$SimplePauseDetectorThread.run(SimplePauseDetector.java:144)
Exception in thread "http-nio-auto-1-50063-AsyncTimeout" java.lang.OutOfMemoryError: GC overhead limit exceeded
Exception in thread "http-nio-auto-1-ClientPoller-1" java.lang.OutOfMemoryError: GC overhead limit exceeded
Exception in thread "Thread-5" java.lang.OutOfMemoryError: GC overhead limit exceeded
2018-03-20 19:19:17.602  WARN 978 --- [    Test worker] o.s.test.context.TestContextManager      : Caught exception while invoking 'afterTestMethod' callback on TestExecutionListener [org.springframework.boot.test.mock.mockito.ResetMocksTestExecutionListener@5f0ccac8] for test method [public void hello.measurements.AverageMeasurementServiceTest.test1() throws java.lang.InterruptedException] and test instance [hello.measurements.AverageMeasurementServiceTest@243bb02e]

java.lang.OutOfMemoryError: GC overhead limit exceeded

Answer #1 (11dmarpk)

You should use @expires( 1s ):

declare Measurement
    @role( event )
    @expires( 1s )
end

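instead of @expiration( 1s ). @expires is the metadata tag that Drools documents for setting an event's expiration offset.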
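To illustrate why expiry matters for memory, here is a minimal plain-Java sketch of a 1-second sliding window that evicts old samples as new ones arrive. It is not Drools code and does not claim to show how Drools implements window:time or @expires internally; the class SlidingWindowAverage and its methods are hypothetical names invented for this example. Timestamps are passed in explicitly so the behaviour does not depend on the wall clock.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Illustrative stand-in for a time window with expiry; not part of Drools. */
public class SlidingWindowAverage {
    private final long windowMillis;
    // Each entry is {timestampMillis, value}, oldest first.
    private final Deque<long[]> samples = new ArrayDeque<>();
    private long sum; // running sum of the values currently retained

    public SlidingWindowAverage(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Adds a sample, then drops every sample that is at least windowMillis old. */
    public void add(long nowMillis, int value) {
        samples.addLast(new long[] {nowMillis, value});
        sum += value;
        long cutoffMillis = nowMillis - windowMillis;
        while (!samples.isEmpty() && samples.peekFirst()[0] <= cutoffMillis) {
            sum -= samples.removeFirst()[1];
        }
    }

    /** Number of samples still held in memory. */
    public int size() {
        return samples.size();
    }

    /** Average of the retained samples, or 0.0 if none are retained. */
    public double average() {
        return samples.isEmpty() ? 0.0 : (double) sum / samples.size();
    }

    public static void main(String[] args) {
        SlidingWindowAverage window = new SlidingWindowAverage(1000);
        // One sample per simulated millisecond, 10,000 samples in total.
        for (int t = 0; t < 10_000; t++) {
            window.add(t, t % 10);
        }
        System.out.println("retained=" + window.size() + " average=" + window.average());
    }
}
```

With eviction in place, the sketch holds at most 1000 samples no matter how many are added (the run above ends with retained=1000 average=4.5). If the eviction loop were removed, all 10,000 samples would stay referenced, which is the same kind of unbounded growth the test in the question runs into when events never expire.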
