一. Producer-Consumer介绍

        producer 是 "生产者" 的意思, 指的是生产线程的数据, Consumer 是 "消费者" 的意思, 指的是使用数据的线程

        生产者安全的将数据交给消费者, 虽然仅是这样看似简单的操作, 但当生产者和消费者以不同的线程运行时, 两者之间的处理数据的速度差异便会引起问题, 例如, 消费者想要想要获取数据, 但是数据还没生成, 或者生产者想要交付数据, 但是消费者的状态还无法接受数据等

        Producer-Consumer 模式在生产者和消费者之间加入了一个 "桥梁角色", 该桥梁角色用于消除线程间处理数据速度的差异

二. 示例程序

类名说明Main测试主程序MakerThread表示糕点师的类EaterThread表示客人的类Table表示桌子的类

2.1 Main类

        Main 类会创建一个桌子的实例, 并启动糕点师和顾客线程, MakerThread 和 EaterThread 的构造函数中传入的数字只是用来作为随机数的种子, 数字本身没有特别含义

public class Main {

public static void main(String[] args) {

// 代表一个最多能放3个蛋糕的桌子

Table table = new Table(3);

// 3个糕点师线程

new MakerThread("Maker-1", table, 32415).start();

new MakerThread("Maker-2", table, 32415).start();

new MakerThread("Maker-3", table, 32415).start();

// 3个顾客线程

new EaterThread("Enter-1", table, 32415).start();

new EaterThread("Enter-2", table, 32415).start();

new EaterThread("Enter-3", table, 32415).start();

}

}

2.2 MakerThread类

        MakerThread用于制作蛋糕, 并将其放置到桌子上, 以流水号(AtomicInteger), 和制作蛋糕的线程名称来表示蛋糕, 无限循环的执行 "制作蛋糕->放置到桌子上", 是蛋糕的生产者

public class MakerThread extends Thread {

private final Random random;

private final Table table;

private static final AtomicInteger ID = new AtomicInteger(1);

public MakerThread(String name, Table table, long seed) {

super(name);

this.random = new Random(seed);

this.table = table;

}

@Override

public void run() {

try {

while (true) {

Thread.sleep(random.nextInt(1000));

String cake = "[Cake No." + ID.getAndIncrement() + " by " + getName() + " ]";

table.put(cake);

}

} catch (InterruptedException e) {

//

}

}

}

      

2.3 EaterThraed类

        EaterThread 类用于表示从桌子上取出蛋糕吃的客人, 客人通过 Table 类的 take 方法取出桌子上的蛋糕, 然后, 与 MakerThread 类一样, EaterThread 也会 sleep, 模拟吃掉蛋糕花费的时间

        EaterThread 无限循环执行 "从桌子上取出蛋糕 -> 吃蛋糕", 是蛋糕的消费者

public class EaterThread extends Thread {

private final Random random;

private final Table table;

public EaterThread(String name, Table table, long seed) {

super(name);

this.random = new Random(seed);

this.table = table;

}

@Override

public void run() {

try {

while (true) {

String take = table.take();

Thread.sleep(random.nextInt(1000));

}

} catch (InterruptedException e) {

//

}

}

}

2.4 Table类

        Table 类用于表示放置蛋糕的桌子, 可放置的蛋糕个数通过构造函数指定, 在示例程序中, 蛋糕以 String 实例表示, Table 类声明一个 String 类型数据的 buffer 字段, 用于作为蛋糕实际放置的位置, 为了正确的放置 put, 和取出 take 蛋糕, Table 类还声明了 int 类型的字段 tail, head, count, 含义如下:

        tail 字段 : 表示下一次放置蛋糕的位置

        head 字段 : 表示下一次取出蛋糕的位置

        count 字段 : 表示当前桌子放置的蛋糕的个数

public class Table {

private final String[] buffer;

private int tail;

private int head;

private int count;

public Table(int count) {

this.buffer = new String[count];

this.tail = 0;

this.head = 0;

this.count = 0;

}

// 放置蛋糕

public synchronized void put(String cake) throws InterruptedException {

System.out.println(Thread.currentThread().getName() + ", puts" + cake);

// 桌子上的蛋糕过多, 等待客人消费

while (count >= buffer.length) {

wait();

}

buffer[tail] = cake;

tail = (tail + 1) % buffer.length;

count++;

// 唤醒客人线程吃蛋糕

notifyAll();

}

// 拿取蛋糕

public synchronized String take() throws InterruptedException {

// 桌子上无蛋糕, 等待糕点师制作

while (count <= 0) {

wait();

}

String cake = buffer[head];

head = (head + 1) % buffer.length;

count--;

notifyAll();

System.out.println(Thread.currentThread().getName() + ", takes" + cake);

return cake;

}

}

2.5 运行结果

        运行结果如图, 你会发现 EaterThread 是按照 MakerThread 放置蛋糕的顺序取蛋糕的

Maker-1, puts[Cake No.1 by Maker-1 ] 制作1号蛋糕

Enter-3, takes[Cake No.1 by Maker-1 ] 消费1号蛋糕

Maker-3, puts[Cake No.2 by Maker-3 ]

Enter-1, takes[Cake No.2 by Maker-3 ]

Maker-2, puts[Cake No.3 by Maker-2 ]

Enter-2, takes[Cake No.3 by Maker-2 ]

三. Producer-Consumer模式中的登场的角色

3.1 Data数据角色

        Data 角色有 Producer 角色产生, 供 Consuemr 角色使用, 在示例程序中, 由 String 类(蛋糕)扮演此角色

3.2 Producer生产者角色

        Producer 角色生成的 Data 角色, 并将其传递给 Channel 角色, 在示例程序中, 由 MakerThread 扮演此角色

3.3 Consumer消费者角色

        Consumer 角色从 Channel 角色获取 Data 角色并使用, 在示例程序中, 有 EaterThread 扮演此角色

3.4 Channel通道角色

        Channel 角色保管从 Producer 角色获取的 Data 角色, 还会响应 Consumer 角色的请求, 传递 Data 角色, 为了确保程序的安全性, Channel 角色会对 Producer 角色和 Consumer 角色的访问执行互斥处理

        当 Producer 角色将 Data 角色传递给 Channle 角色时, 如果 Channel 角色的状态不适合接受 Data 角色, 那么 Producer 角色会一直等待, 直到 Channle 角色的状态变为可接收为止

        当 Consumer 角色从 Channle 角色获取 Data 角色时, 如果 Channle 角色中没有可以传递的 Data 角色, 那么 Consumer 角色也会一直等待, 直到 Channle 角色的状态变为可以传递 Data 角色为止

        当存在多个Producer-Consumer角色时, 为了避免各处理的互相影响, Channel 角色需要执行互斥处理

        这样来看, Channle 角色位于 Producer 角色和 Consumer 角色之间, 承担用于传递 Data 角色的通道任务

        模式类图如下:

        

3.5 为什么不可以直接传递

         可能会有疑问, 为什么 Producer 角色不可以直接调用 Consumer 角色的方法?

        Consumer 角色想要获取 Data 角色, 通常都是因为想使用这些 Data 角色来执行某些处理, 如果 Producer 角色直接调用 Consumer 角色的方法, 那么执行处理的就不再是 Consumer 角色的线程, 而是 Producer 角色的线程, 这样一来, 执行处理花费的时间就必须由 Producer 角色的线程来承担, 准备下一次数据的处理也会相应的发生延迟, 这样会让程序响应性变得很差, 耦合太高

        正因为有 Channle 角色的存在, Producer 角色和 Consumer 角色的这些线程才能够保持协调运行

        RocketMq, RabbitMq 以及 线程池 使用的都是这种模式

        说简单点, 直接调用, 就好比是糕点师做好蛋糕, 直接交给客人, 在等待客人吃完后, 再生产下一个蛋糕, 那这样糕点店可以关门大吉了

四. 使用juc包下的工具类简化Table类

        在理解上面的 Producer-Consumer 模式后, 我们可以使用 juc 包下的类简化 Channel 角色的编写

public class Table extends ArrayBlockingQueue {

public Table(int count) {

super(count);

}

// 放置蛋糕

@Override

public void put(String cake) throws InterruptedException {

System.out.println(Thread.currentThread().getName() + ", puts" + cake);

super.put(cake);

}

// 拿取蛋糕

@Override

public String take() throws InterruptedException {

String cake = super.take();

System.out.println(Thread.currentThread().getName() + ", takes" + cake);

return cake;

}

}

精彩内容

评论可见,请评论后查看内容,谢谢!!!
 您阅读本篇文章共花了: