在多线程程序设计中,经常会遇到一个线程等待一个或多个线程的场景,遇到这样的场景应该如何解决?
如果是一个线程等待一个线程,则可以通过await()和notify()来实现;
如果是一个线程等待多个线程,则就可以使用CountDownLatch和CyclicBarrier来实现比较好的控制。
下面来详细描述下CountDownLatch的应用场景:
例如:百米赛跑:8名运动员同时起跑,由于速度的快慢,肯定有会出现先到终点和晚到终点的情况,而终点有个统计成绩的仪器,当所有选手到达终点时,它会统计所有人的成绩并进行排序,然后把结果发送到汇报成绩的系统。
其实这就是一个CountDownLatch的应用场景:一个线程或多个线程等待其他线程运行达到某一目标后进行自己的下一步工作,而被等待的“其他线程”达到这个目标后继续自己下面的任务。
这个场景中:
1. 被等待的“其他线程”------>8名运动员
2. 等待“其他线程”的这个线程------>终点统计成绩的仪器
那么,如何来通过CountDownLatch来实现上述场景的线程控制和调度呢?
jdk中CountDownLatch类有一个常用的构造方法:CountDownLatch(int count);
两个常用的方法:await()和countdown()
其 中count是一个计数器中的初始化数字,比如初始化的数字是2,当一个线程里调用了countdown(),则这个计数器就减一,当线程调用了 await(),则这个线程就等待这个计数器变为0,当这个计数器变为0时,这个线程继续自己下面的工作。下面是上述CountDownLatch场景的 实现:
Work类(运动员):
import java.util.concurrent.CountDownLatch;
public class Work implements Runnable { private int id; private CountDownLatch beginSignal; private CountDownLatch endSignal; public Work(int id, CountDownLatch begin, CountDownLatch end) { this.id = id; this.beginSignal = begin; this.endSignal = end; }
@Override public void run() { try { beginSignal.await(); System.out.println("起跑..."); System.out.println("work" + id + "到达终点"); endSignal.countDown(); System.out.println("work" + id + "继续干其他事情"); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } }}
Main类(终点统计仪器):
import java.util.concurrent.CountDownLatch;
public class Main { public static void main(String[] args) { CountDownLatch begSignal = new CountDownLatch(1); CountDownLatch endSignal = new CountDownLatch(8); for (int i = 0; i < 8; i++) { new Thread(new Work(i, begSignal, endSignal)).start(); } try { begSignal.countDown(); //统一起跑 endSignal.await(); //等待运动员到达终点 System.out.println("结果发送到汇报成绩的系统"); } catch (InterruptedException e) { e.printStackTrace(); } }}
下面详细描述下CyclicBarrier的应用场景:
有四个游戏玩家玩游戏,游戏有三个关卡,每个关卡必须要所有玩家都到达后才能允许通关。
其 实这个场景里的玩家中如果有玩家A先到了关卡1,他必须等待其他所有玩家都到达关卡1时才能通过,也就是说线程之间需要互相等待,这和 CountDownLatch的应用场景有区别,CountDownLatch里的线程是到了运行的目标后继续干自己的其他事情,而这里的线程需要等待其 他线程后才能继续完成下面的工作。
jdk中CyclicBarrier类有两个常用的构造方法:
1. CyclicBarrier(int parties)
这里的parties也是一个计数器,例如,初始化时parties里的计数是3,于是拥有该CyclicBarrier对象的线程当parties的计数为3时就唤醒,注:这里parties里的计数在运行时当调用CyclicBarrier:await()时,计数就加1,一直加到初始的值
2. CyclicBarrier(int parties, Runnable barrierAction)
这里的parties与上一个构造方法的解释是一样的,这里需要解释的是第二个入参(Runnable barrierAction),这个参数是一个实现Runnable接口的类的对象,也就是说当parties加到初始值时就出发barrierAction的内容。
下面来实现上述的应用场景:
Player类(玩家类)
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class Player implements Runnable {
private CyclicBarrier cyclicBarrier;
private int id;
public Player(int id, CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
this.id = id;
}
@Override
public void run() {
try {
System.out.println("玩家" + id + "正在玩第一关...");
cyclicBarrier.await();
System.out.println("玩家" + id + "进入第二关...");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
GameBarrier类(关卡类,这里控制玩家必须全部到达第一关结束的关口才能进入第二关)
import java.util.concurrent.CyclicBarrier;
public class GameBarrier { public static void main(String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(4, new Runnable() { @Override public void run() { System.out.println("所有玩家进入第二关!"); } }); for (int i = 0; i < 4; i++) { new Thread(new Player(i, cyclicBarrier)).start(); } }}
Semaphore当前在多线程环境下被扩放使用,操作系统的信号量是个很重要的概念,在进程控制方面都有应用。Java 并发库 的Semaphore 可以很轻松完成信号量控制,Semaphore可以控制某个资源可被同时访问的个数,通过 acquire() 获取一个许可,如果没有就等待,而 release() 释放一个许可。比如在Windows下可以设置共享文件的最大客户端访问个数。
Semaphore实现的功能就类似厕所有5个坑,假如有10个人要上厕所,那么同时只能有多少个人去上厕所呢?同时只能有5个人能够占用,当5个人中 的任何一个人让开后,其中等待的另外5个人中又有一个人可以占用了。另外等待的5个人中可以是随机获得优先机会,也可以是按照先来后到的顺序获得机会,这取决于构造Semaphore对象时传入的参数选项。单个信号量的Semaphore对象可以实现互斥锁的功能,并且可以是由一个线程获得了“锁”,再由另一个线程释放“锁”,这可应用于死锁恢复的一些场合。
Semaphore维护了当前访问的个数,提供同步机制,控制同时访问的个数。在数据结构中链表可以保存“无限”的节点,用Semaphore可以实现有限大小的链表。另外重入锁 ReentrantLock 也可以实现该功能,但实现上要复杂些。下面的Demo中申明了一个只有5个许可的Semaphore,而有20个线程要访问这个资源,通过acquire()和release()获取和释放访问许可。
package com.test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class TestSemaphore {
public static void main(String[] args) {
// 线程池
ExecutorService exec = Executors.newCachedThreadPool();
// 只能5个线程同时访问
final Semaphore semp = new Semaphore(5);
// 模拟20个客户端访问
for (int index = 0; index < 20; index++) {
final int NO = index;
Runnable run = new Runnable() {
public void run() {
try {
// 获取许可
semp.acquire();
System.out.println("Accessing: " + NO);
Thread.sleep((long) (Math.random() * 10000));
// 访问完后,释放
semp.release();
System.out.println("-----------------"+semp.availablePermits());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
exec.execute(run);
}
// 退出线程池
exec.shutdown();
}
}
执行结果如下:
Accessing: 0
Accessing: 1
Accessing: 3
Accessing: 4
Accessing: 2
-----------------0
Accessing: 6
-----------------1
Accessing: 7
-----------------1
Accessing: 8
-----------------1
Accessing: 10
-----------------1
Accessing: 9
-----------------1
Accessing: 5
-----------------1
Accessing: 12
-----------------1
Accessing: 11
-----------------1
Accessing: 13
-----------------1
Accessing: 14
-----------------1
Accessing: 15
-----------------1
Accessing: 16
-----------------1
Accessing: 17
-----------------1
Accessing: 18
-----------------1
Accessing: 19