我有一个生产者消费者问题,我有一个单一的生产者推入阻塞队列和单一的消费者从队列消费。一旦一个消息被消费,我正在做几个操作,该批消息。我如何并行化逻辑处理每批消息。下面是代码片段。还建议我是否应该考虑多个消费者来做这项任务。
ThreadX = Thread.start('producer') {
//data retrieve from DB
while(row){
queue.put(message)
}
queue.put("KILL")
}
ThreadY = Thread.start('Consumer') {
while(true){
sleep(200)
// print(Thread.currentThread().name)
def jsonSlurper = new JsonSlurper()
def var = jsonSlurper.parseText(queue.take().toString())
if(var.getAt(0).equals("KILL"))
return
var.each { fileExists(it) } // **need parallelize this part**
}
boolean fileExists(key){
if(key) {
//some logic
sleep 1000
}
}
}
更新:尝试以下代码,但不知何故,它只处理消费者使用的第一批10条消息
ExecutorService exeSvc = Executors.newFixedThreadPool(5)
ThreadY = Thread.start('Consumer') {
while(true){
sleep(200)
// print(Thread.currentThread().name)
def jsonSlurper = new JsonSlurper()
def var = jsonSlurper.parseText(queue.take().toString())
if(var.getAt(0).equals("KILL"))
return
var.each { exeSvc.execute({-> fileExists(it)
sleep(200)
}) }
}
}
请帮帮忙
1条答案
按热度按时间ntjbwcob1#
在
var.each { exeSvc.execute({-> fileExists(it)
行中有一个bug,其中来自外部闭包的隐式变量it
被用于内部闭包中。应该是类似var.each { fileName -> exeSvc.execute { fileExists(filName)
的东西。除此之外,我只是添加了一些日志用于故障排除和确认执行流。脚本的工作版本如下: