系列文章:
生产者消费者问题(第一部分)
生产者消费者问题也称为有限缓冲问题,是线程同步的一个经典问题:生产者线程和消费者线程共享一块固定大小的缓存,生产者负责生成产品然后存入共享缓冲区中,消费者负责从共享缓冲区中取出产品进行消费。该问题的关键在于生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区空时消耗数据。
要解决这个问题就必须:让生产者在缓冲区满时休眠,等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。同样地,让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者。
解决生产者消费者问题的方法有很多,这里先介绍最简单的一种,后续的文章中会陆续给出其他的解决方案
1、Object的wait
和notify
方法
Object.wait()
和Thread.sleep()
方法在功能上很相似,它们都会导致线程挂起。
但是Thread.sleep()
可以指定线程被挂起的时间,当然Object.wait()
也有一个重载的方法也可以指定被挂起的时间。
可Thread.sleep()
挂起时不会释放线程占有的资源(不会释放锁),而Object.wait()
会暂时释放线程所占有的资源(会释放锁)。
因此Object.wait()
调用后其他线程就可以进入synchronized
同步代码块执行了。
而Object.notify()
就是用来唤醒因调用Object.wait()
而挂起的一个线程,另外还有一个Object.notifyAll()
方法用来唤醒所有因调用Object.wait()
而挂起的线程。
使用Object.wait()
方法和notifyAll()
方法来实现线程的休眠和唤醒。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
| import java.util.Random;
public class ProducerConsumer { private static final int BUFFER_SIZE = 100; static int[] buffer = new int[BUFFER_SIZE]; static int head, tail = 0; static int count = 0;
static class Producer implements Runnable { Random random = new Random();
public void run() { while (true) { synchronized (buffer) { while (count >= buffer.length) { try { buffer.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } int product = random.nextInt(10); System.out.println("Producer: 我生产了一个随机数" + product); buffer[tail] = product; tail = (tail + 1) % buffer.length; count++; buffer.notifyAll(); } } } }
static class Consumer implements Runnable { public void run() { while (true) { synchronized (buffer) { while (count <= 0) { try { buffer.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } int product = buffer[head]; head = (head + 1) % buffer.length; count--; System.out.println("Consumer: 我消费了一个随机数" + product); buffer.notifyAll(); } } } }
public static void main(String[] args) { new Thread(new Producer()).start(); new Thread(new Consumer()).start(); } }
|
2、对共享缓冲区进行封装
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72
| import java.util.Random;
public class ProducerConsumer { static class Buffer { private int[] buffer; private int head = 0, tail = 0; private int count = 0;
public Buffer(int size) { buffer = new int[size]; }
public synchronized void put(int data) { while (count >= buffer.length) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } buffer[tail] = data; tail = (tail + 1) % buffer.length; count++; notifyAll(); }
public synchronized int take() { while (count <= 0) { try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } int data = buffer[head]; head = (head + 1) % buffer.length; count--; notifyAll(); return data; } }
static Buffer buffer = new Buffer(10);
static class Producer implements Runnable { Random random = new Random();
public void run() { while (true) { int product = random.nextInt(100); buffer.put(product); System.out.println("Producer: 我生产了一个随机数" + product); } } }
static class Consumer implements Runnable { public void run() { while (true) { int product = buffer.take(); System.out.println("Consumer: 我消费了一个随机数" + product); } } }
public static void main(String[] args) { new Thread(new Producer()).start(); new Thread(new Consumer()).start(); } }
|
3、BlockingQueue
在java.util.concurrent
包中有很多类似于上面的Buffer的数据结构,不同的是它们大都使用并发库中的ReentrantLock
实现线程的互斥访问。通常它们都是BlockingQueue
接口的实现类:
BlockingQueue.put()
和BlockingQueue.take()
方法和上面的我写的例子中的Buffer.put()
和Buffer.take()
方法基本类似,不同之处是BlockingQueue.put()
和BlockingQueue.take()
把InterruptedException
抛出来交给外部处理。
在后续的文章中我会对ReentrantLock和这一系列BlockingQueue进行简单的使用和原理分析。
关于java.util.concurrent包中的集合类页可以参考我的这篇文章Java 集合框架总结与巩固
本作品采用 知识共享署名 4.0 国际许可协议 进行许可。
转载时请注明原文链接:https://blog.hufeifei.cn/2017/06/Java/multithread/05-Provider-Consumer/