groovy脚本中的多线程

ni65a41a  于 2022-11-21  发布在  其他
关注(0)|答案(1)|浏览(349)

我有一个生产者消费者问题,我有一个单一的生产者推入阻塞队列和单一的消费者从队列消费。一旦一个消息被消费,我正在做几个操作,该批消息。我如何并行化逻辑处理每批消息。下面是代码片段。还建议我是否应该考虑多个消费者来做这项任务。

ThreadX = Thread.start('producer') {
//data retrieve from DB
while(row){
   queue.put(message)
  }
   queue.put("KILL")
}
ThreadY = Thread.start('Consumer') {
    while(true){
        sleep(200)
      //  print(Thread.currentThread().name)
        def jsonSlurper = new JsonSlurper()
        def var = jsonSlurper.parseText(queue.take().toString())
        if(var.getAt(0).equals("KILL"))
            return
        var.each { fileExists(it) } // **need  parallelize this part**
    }

boolean fileExists(key){
    if(key) {
    //some logic
        sleep 1000
    }
}
}

更新:尝试以下代码,但不知何故,它只处理消费者使用的第一批10条消息

ExecutorService exeSvc = Executors.newFixedThreadPool(5)

ThreadY = Thread.start('Consumer') {
    while(true){
        sleep(200)
      //  print(Thread.currentThread().name)
        def jsonSlurper = new JsonSlurper()
        def var = jsonSlurper.parseText(queue.take().toString())
        if(var.getAt(0).equals("KILL"))
            return
        var.each { exeSvc.execute({-> fileExists(it)
            sleep(200)
        }) }
    }
}

请帮帮忙

ntjbwcob

ntjbwcob1#

var.each { exeSvc.execute({-> fileExists(it)行中有一个bug,其中来自外部闭包的隐式变量it被用于内部闭包中。应该是类似var.each { fileName -> exeSvc.execute { fileExists(filName)的东西。除此之外,我只是添加了一些日志用于故障排除和确认执行流。脚本的工作版本如下:

import groovy.json.JsonSlurper

import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

Queue queue = new LinkedBlockingQueue<String>()
def messages = [
        '["key1", "key2", "key3", "key4", "key5"]',
        '["key6", "key7", "key8", "key9", "key10"]',
        '["key11", "key12", "key13", "key14", "key15"]',
        '["key16", "key17", "key18", "key19", "key20"]',
        '["key21", "key22", "key23", "key24", "key25"]',
]

Thread.start('producer') {
    messages.each { message ->
        println("Producing message $message")
        queue.put(message)
    }
    queue.put('KILL')
}

ExecutorService exeSvc = Executors.newFixedThreadPool(5)
def jsonSlurper = new JsonSlurper()

Thread.start('Consumer') {
    while (true) {
        sleep(200)

        String message = queue.take()
        println("Consumed message $message")
        if (message == 'KILL') {
            exeSvc.shutdown()
            exeSvc.awaitTermination(10, TimeUnit.SECONDS)
            return
        }

        def var = jsonSlurper.parseText(message)
        var.each { fileName ->
            exeSvc.execute {
                fileExists(fileName)
                sleep(200)
            }
        }
    }
}

boolean fileExists(key) {
    println("Key: $key")
    if (key) {
        sleep 1000
    }
}

相关问题