java并行反序列化步骤

o7jaxewo  于 2021-07-08  发布在  Java
关注(0)|答案(2)|浏览(339)

有以下管道:
生产项目(生产商在管道外部);
项被反序列化(json到java对象);
项目已处理;
目前,这一切都在一个线程中同步发生:

while(producer.next()) {
   var item = gson.deserialize(producer.item());

   processItem(item);
}

或示意图:

PRODUCER -> DESERIALIZATION -> CONSUMER

(sync)      (sync)            (sync)

问题是反序列化步骤没有副作用,可以并行化以节省一些时间。
总体代码应如下所示:

var pipeline = new Pipeline<Item>();
pipeline.setProducer(producer);
pipeline.setDeserialization(gson::deserialize);
pipeline.setConsumer(item -> {
    ...
});
pipeline.run();

或示意图:

-> DESERIALIZATION
         -> DESERIALIZATION
         -> DESERIALIZATION
PRODUCER -> ...             -> CONSUMER
         -> DESERIALIZATION
         -> DESERIALIZATION
         -> DESERIALIZATION

(sync)      (parallel)         (sync)

重要提示:应生成反序列化项:
同步地;
以相同的顺序,原始生产者产生编码的项目。
问。有没有一种标准化的方法来编码这样的管道?

baubqpgj

baubqpgj1#

实现模式的一种方法是:
构造一个多线程执行器来处理解码请求
拥有消费者队列;每次提交要解码的项时,还将相应的future对象添加到使用者队列中
让使用者线程等待从队列中取出项目[因此按照发布的顺序使用它们],调用相应的get()方法[等待项目被解码]
所以“消费者”看起来是这样的:

BlockingQueue<Future<Item>> consumerQueue = new LinkedBlockingDeque<>();
Thread consumerThread = new Thread(() -> {
        try {
            while (true) {
                Future<Item> item = consumerQueue.take();
                try {
                    // Get the next decoded item that's ready
                    Item decodedItem = item.get();
                    // 'Consume' the item
                    ...
                } catch (ExecutionException ex) {
                }
            }
        } catch (InterruptedException irr) {

        }
    });
    consumerThread.start()

同时,“producer”端及其多线程“decoder”将如下所示:

ExecutorService decoder = Executors.newFixedThreadPool(4);
while (!producer.hasNext()) {
        Item item = producer.next()

        // Submit the decode job for asynchronous processing
        Future<Item> p = decoder.submit(() -> {
            item.decode();
        }, item);

        // Also queue this decode job for future consumption once complete
        consumerQueue.add(p);
    }

作为一个单独的问题,我想知道你是否真的会在实践中看到很多好处,因为坚持以同样的顺序消费,你就内在地在这个过程中引入了一个连续的条件。但从技术上讲,这是一种方法,你可以实现你所追求的。
p、 如果您不需要一个单独的使用者线程,那么同一个“生产者”线程可以轮询队列中已完成的项目并按行执行。

vatpfxk5

vatpfxk52#

尝试

while(producer.next()) {
   CompletableFuture.supplyAsync(()-> gson.deserialize(producer.item()))
    .thenRunAsync(item->processItem(item));
}

相关问题