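I run out of memory when processing events with Drools in stream mode. I insert about 2-3 million Measurement facts and then get an out-of-memory exception. I assume @expiration is not working. I am using stream events, and as far as I understand, the session should expire each Measurement after 1 second.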
Library versions:
compile("org.drools:drools-core:7.0.0.Final")
compile("org.kie:kie-spring:7.0.0.Final")
Any idea why they are not expiring?
My rules are:
import hello.measurements.Measurement
declare Measurement
    @role( event )
    @expiration( 1s )
end
rule "Monitor average measurement"
when
    $dv: Number( doubleValue > 0 || doubleValue < 0 || doubleValue == 0)
        from accumulate(
            Measurement( $value : value ) over window:time( 1s ),
            average( $value )
        )
then
    System.out.println("zzZZzz: " + $dv);
end
Here is the Java code:
public class Measurement {
    private static final Logger LOGGER = LoggerFactory.getLogger(Measurement.class);
    int value;
    public int getValue() {
        return value;
    }
    public void setValue(int value) {
        this.value = value;
    }
}
Here is the measurement service:
@Service
public class AverageMeasurementService {
    private static final Logger LOGGER = LoggerFactory.getLogger(AverageMeasurementService.class);
    private AverageMeasurementServiceThread averageMeasurementServiceThread;
    private final KieContainer kieContainer;
    private KieSession kieSession;
    private KieBaseConfiguration kieBaseConfiguration;
    Thread t;
    @Autowired
    public AverageMeasurementService(KieContainer kc, KieBaseConfiguration kbc) {
        kieContainer = kc;
        kieBaseConfiguration = kbc;
    }
    @PostConstruct
    public void postConstruct() {
        KieBase kieBase = kieContainer.newKieBase(kieBaseConfiguration);
        kieSession = kieBase.newKieSession();
        averageMeasurementServiceThread = new AverageMeasurementServiceThread(kieSession);
        t = new Thread(averageMeasurementServiceThread);
        t.start();
        LOGGER.info("Thread started...");
    }
    public void addMeasurement(Measurement m) {
        kieSession.insert(m);
    }
    @PreDestroy
    public void stop() {
        LOGGER.info("Halting");
        kieSession.halt();
        LOGGER.info("Halted");
    }
}
The thread:
public class AverageMeasurementServiceThread implements Runnable {
    private static final Logger LOGGER = LoggerFactory.getLogger(AverageMeasurementServiceThread.class);
    KieSession kieSession;
    public AverageMeasurementServiceThread(KieSession sessionToRun) {
        kieSession = sessionToRun;
    }
    @Override
    public void run() {
        kieSession.fireUntilHalt();
    }
}
The KIE beans:
@Bean
public KieBaseConfiguration kieBaseConfiguration() {
    KieServices kieServices = KieServices.Factory.get();
    KieBaseConfiguration config = kieServices.newKieBaseConfiguration();
    config.setOption( EventProcessingOption.STREAM );
    return config;
}
@Bean
public KieContainer kieContainer() {
    KieServices kieServices = KieServices.Factory.get();
    KieFileSystem kieFileSystem = kieServices.newKieFileSystem();
    kieFileSystem.write(ResourceFactory.newClassPathResource(drlFile));
    KieBuilder kieBuilder = kieServices.newKieBuilder(kieFileSystem);
    kieBuilder.buildAll();
    KieModule kieModule = kieBuilder.getKieModule();
    return kieServices.newKieContainer(kieModule.getReleaseId());
}
Here is my failing unit test:
@RunWith(SpringRunner.class)
@SpringBootTest
public class AverageMeasurementServiceTest {
    @Autowired
    AverageMeasurementService averageMeasurementService;
    @Test
    public void test1() throws InterruptedException {
        Measurement m = new Measurement();
        m.setValue(5);
        System.err.println("Inserting and looping");
        averageMeasurementService.addMeasurement(m);
        Random random = new Random();
        for (int i = 0; i < 10000000; i++) {
            m = new Measurement();
            m.setValue(random.nextInt());
            averageMeasurementService.addMeasurement(m);
            if (i % 100000 == 0) {
                System.out.println(i);
            }
        }
    }
}
Here is the exception I get:
... snipped output ...
zzZZzz: -5.3260380032E7
zzZZzz: -3.78710975323741E7
2700000
Exception in thread "SimplePauseDetectorThread_0" java.lang.OutOfMemoryError: GC overhead limit exceeded
at org.LatencyUtils.PauseDetector.notifyListeners(PauseDetector.java:37)
at org.LatencyUtils.SimplePauseDetector$SimplePauseDetectorThread.run(SimplePauseDetector.java:144)
Exception in thread "http-nio-auto-1-50063-AsyncTimeout" java.lang.OutOfMemoryError: GC overhead limit exceeded
Exception in thread "http-nio-auto-1-ClientPoller-1" java.lang.OutOfMemoryError: GC overhead limit exceeded
Exception in thread "Thread-5" java.lang.OutOfMemoryError: GC overhead limit exceeded
2018-03-20 19:19:17.602 WARN 978 --- [ Test worker] o.s.test.context.TestContextManager : Caught exception while invoking 'afterTestMethod' callback on TestExecutionListener [org.springframework.boot.test.mock.mockito.ResetMocksTestExecutionListener@5f0ccac8] for test method [public void hello.measurements.AverageMeasurementServiceTest.test1() throws java.lang.InterruptedException] and test instance [hello.measurements.AverageMeasurementServiceTest@243bb02e]
java.lang.OutOfMemoryError: GC overhead limit exceeded
1 Answer
You should use
@expires( 1s )
instead of
@expiration( 1s )
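To illustrate why expiry matters for memory in a case like this, here is a minimal plain-Java sketch of a 1-second window that evicts old samples. It is not Drools code and makes no claim about how Drools implements `@expires` or `window:time` internally. The names `SlidingWindowAverage`, `Sample`, and `evictExpired` are hypothetical, timestamps are assumed to be non-decreasing, and treating a sample as expired once its age reaches the window length is an arbitrary choice for this sketch.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Plain-Java illustration of a time window with expiry; hypothetical, not Drools. */
final class SlidingWindowAverage {

    /** One timestamped reading (a stand-in for the Measurement event). */
    private static final class Sample {
        final long timestampMillis;
        final int value;

        Sample(long timestampMillis, int value) {
            this.timestampMillis = timestampMillis;
            this.value = value;
        }
    }

    private final long windowMillis;
    private final Deque<Sample> samples = new ArrayDeque<>();
    private long sum; // running sum of the values currently held in the window

    SlidingWindowAverage(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Adds a sample, then drops everything that has fallen out of the window. */
    void add(long nowMillis, int value) {
        samples.addLast(new Sample(nowMillis, value));
        sum += value;
        evictExpired(nowMillis);
    }

    /** Removes samples whose age is at least the window length; this bounds memory. */
    void evictExpired(long nowMillis) {
        while (!samples.isEmpty()
                && nowMillis - samples.peekFirst().timestampMillis >= windowMillis) {
            sum -= samples.removeFirst().value;
        }
    }

    /** Average of the samples still in the window, or NaN if the window is empty. */
    double average() {
        return samples.isEmpty() ? Double.NaN : (double) sum / samples.size();
    }

    /** Number of samples currently retained. */
    int size() {
        return samples.size();
    }
}
```

With eviction in place, the number of retained samples depends on the insert rate within one window rather than on the total number of inserts. Without it, the deque would grow with every insert, which is consistent with the symptom described in the question when the expiration annotation is not being applied.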