Worker-Thread 模式实现

x33g5p2x  于2022-05-05 转载在 其他  
字(5.4k)|赞(0)|评价(0)|浏览(276)

一 点睛

Worker-Thread 的设计模式有如下几个角色。

流水线工人:流水线工人主要用来对传送带上的产品进行加工。

流水线传送带:用于传送来自上线的产品。

产品组装说明书:用来说明该产品如何组装。

Worker-Thread 模式关键角色的关系图。

左侧的线程,也就是传送带上游的线程,不断地往传送带(Queue)中生产数据,而当 Channel 被启动,就会同时创建并启动若干数量的 Worker 线程,因此,可以看出,Worker 于 Channel 来说并不是单纯的依赖关系,而是聚合关系,Channel 必须知道 Worker 的存在。

二 实战

1 产品及组装说明书

package concurrent.workrtthread;

/**
* @className: InstructionBook
* @description: 产品及组装说明书,在流水线上需要加工的产品,create 作为一个模板方法,提供了用工产品的说明书
* @date: 2022/5/3
* @author: cakin
*/
public abstract class InstructionBook {
    /**
     * 功能描述:加工产品的第1个步骤
     *
     * @author cakin
     * @date 2022/5/3
     */
    protected abstract void firstProcess();

    /**
     * 功能描述:加工产品的第2个步骤
     *
     * @author cakin
     * @date 2022/5/3
     */
    protected abstract void secondProcess();

    /**
     * 功能描述:经过流水线传送带的产品通过该方法进行加工
     *
     * @author cakin
     * @date 2022/5/3
     */
    public final void create() {
        this.firstProcess();
        this.secondProcess();
    }
}

2 产品

package concurrent.workrtthread;

/**
* @className: Production
* @description: 产品
* @date: 2022/5/3
* @author: cakin
*/
public class Production extends InstructionBook {
    // 产品编号
    private final int prodID;

    public Production(int prodID) {
        this.prodID = prodID;
    }

    @Override
    protected void firstProcess() {
        System.out.println("execute the " + prodID + " first process");
    }

    @Override
    protected void secondProcess() {
        System.out.println("execute the " + prodID + " second process");
    }
}

3 流水线传送带

package concurrent.workrtthread;

/**
* @className: ProductionChannel
* @description: 产品传送带,在传送带上除了负责产品加工的工人之外,还有在传送带上等待加工的产品
* @date: 2022/5/3
* @author: cakin
*/
public class ProductionChannel {
    // 传送带上最多可以有多少个待加工的产品
    private final static int MAX_PROD = 100;
    // 用于存放待加工的产品,也就是传送带
    private final Production[] productionQueue;
    // 队列尾
    private int tail;
    // 队列头
    private int head;
    // 当前在流水线上有多少个待加工的产品
    private int total;
    // 流水线工人
    private final Worker[] workers;

    public ProductionChannel(int workerSize) {
        this.workers = new Worker[workerSize];
        this.productionQueue = new Production[MAX_PROD];
        // 实例化每一个工人并且启动
        for (int i = 0; i < workerSize; i++) {
            workers[i] = new Worker("Worker-" + i, this);
            workers[i].start();
        }
    }

    /**
     * 功能描述:接受来自上游的半成品
     *
     * @author cakin
     * @date 2022/5/3
     */
    public void offerProduction(Production production) {
        synchronized (this) {
            // 当传送带上加工的产品超过了最大值时需要阻塞上游再次传送产品
            while (total > productionQueue.length) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 将产品放到传送带,并且通知工人线程工作
            productionQueue[tail] = production;
            tail = (tail + 1) % productionQueue.length;
            total++;
            this.notifyAll();
        }
    }

    /**
     * 功能描述:工人线程从传送带上获取产品,并且进行加工
     *
     * @author 贝医
     * @date 2022/5/3
     */
    public Production takeProduction() {
        synchronized (this) {
            // 当传送带上没有产品,工人等待着从上游输送代传送带上
            while (total <= 0) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            // 获取产品
            Production prod = productionQueue[head];
            head = (head + 1) % productionQueue.length;
            total--;
            this.notifyAll();
            return prod;
        }
    }
}

4 工人

package concurrent.workrtthread;

import java.util.Random;
import java.util.concurrent.TimeUnit;

/**
* @className: Worker
* @description: 流水线工人
* @date: 2022/5/3
* @author: cakin24
*/
public class Worker extends Thread {
    private final ProductionChannel channel;

    private final static Random random = new Random(System.currentTimeMillis());

    public Worker(String workerName, ProductionChannel channel) {
        super(workerName);
        this.channel = channel;
    }

    @Override
    public void run() {
        while (true) {
            try {
                // 从传送带上获取产品
                Production production = channel.takeProduction();
                System.out.println(getName() + " process the " + production);
                // 对产品进行加工
                production.create();
                TimeUnit.SECONDS.sleep(random.nextInt(10));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

5 测试代码

package concurrent.workrtthread;

import java.util.Random;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class Test {
    public static void main(String[] args) {
        // 流水线上有 5 个工人
        final ProductionChannel channel = new ProductionChannel(5);
        AtomicInteger prouctionNo = new AtomicInteger();
        // 流水线上有8个工作人员往传送带上不断地放置等待加工的半成品
        IntStream.range(1, 8).forEach(i ->
                new Thread(() -> {
                    while (true) {
                        channel.offerProduction(new Production(prouctionNo.getAndIncrement()));

                        try {
                            TimeUnit.SECONDS.sleep(new Random().nextInt(10));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }).start()
        );
    }
}

三 测试

Worker-4 process the concurrent.workrtthread.Production@57a380d2

Worker-1 process the concurrent.workrtthread.Production@3d33b971

execute the 0 first process

execute the 0 second process

execute the 1 first process

execute the 1 second process

Worker-3 process the concurrent.workrtthread.Production@71f66139

execute the 2 first process

execute the 2 second process

Worker-0 process the concurrent.workrtthread.Production@414fec4b

execute the 3 first process

execute the 3 second process

Worker-2 process the concurrent.workrtthread.Production@3a237724

execute the 4 first process

execute the 4 second process

Worker-4 process the concurrent.workrtthread.Production@59647456

execute the 5 first process

execute the 5 second process

Worker-1 process the concurrent.workrtthread.Production@33f440a7

Worker-2 process the concurrent.workrtthread.Production@34e49e10

execute the 6 first process

execute the 6 second process

execute the 7 first process

Worker-2 process the concurrent.workrtthread.Production@30db1c54

execute the 7 second process

execute the 8 first process

execute the 8 second process

Worker-1 process the concurrent.workrtthread.Production@37cbd201

Worker-0 process the concurrent.workrtthread.Production@4f881a12

execute the 10 first process

execute the 9 first process

execute the 9 second process

execute the 10 second process

Worker-0 process the concurrent.workrtthread.Production@4bb7873b

Worker-3 process the concurrent.workrtthread.Production@7f319177

execute the 11 first process

execute the 11 second process

execute the 15 first process

execute the 15 second process

在测试中,假设上游的流水线上有8个工人将产品放到传送带上,我们的传送带上定义了5个工人,运行上面的程序,Worker 将根据产品的使用说明书对产品进行再次加工。

相关文章