引言
开发设计中难免会碰到必须全部子线程执行结束通知主线程解决一些逻辑性的情景。
或是是线程 A 在执行到某一标准通知线程 B 执行某一实际操作。
可以利用下面几类方式完成:
等待通知体制
等待通知方式是 Java 中较为传统的线程通讯方式。
2个线程根据对同一目标启用等待 wait() 和通知 notify() 方式来开展通信。
如2个线程更替打印出奇数偶数:
public class TwoThreadWaitNotify {
private int start = 1;
private boolean flag = false;
public static void main(String[] args) {
TwoThreadWaitNotify twoThread = new TwoThreadWaitNotify();
Thread t1 = new Thread(new OuNum(twoThread));
t1.setName(\"A\");
Thread t2 = new Thread(new JiNum(twoThread));
t2.setName(\"B\");
t1.start();
t2.start();
}
/**
* 双数线程
*/
public static class OuNum implements Runnable {
private TwoThreadWaitNotify number;
public OuNum(TwoThreadWaitNotify number) {
this.number = number;
}
@Override
public void run() {
while (number.start <= 100) {
synchronized (TwoThreadWaitNotify.class) {
System.out.println(\"偶数线程抢到锁了\");
if (number.flag) {
System.out.println(Thread.currentThread().getName() \" - 双数\" number.start);
number.start ;
number.flag = false;
TwoThreadWaitNotify.class.notify();
}else {
try {
TwoThreadWaitNotify.class.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
/**
* 单数线程
*/
public static class JiNum implements Runnable {
private TwoThreadWaitNotify number;
public JiNum(TwoThreadWaitNotify number) {
this.number = number;
}
@Override
public void run() {
while (number.start <= 100) {
synchronized (TwoThreadWaitNotify.class) {
System.out.println(\"单数线程抢到锁了\");
if (!number.flag) {
System.out.println(Thread.currentThread().getName() \" - 奇数\" number.start);
number.start ;
number.flag = true;
TwoThreadWaitNotify.class.notify();
}else {
try {
TwoThreadWaitNotify.class.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
}
}
輸出結果:
t2 - 单数93
t1 - 双数94
t2 - 单数95
t1 - 双数96
t2 - 单数97
t1 - 双数98
t2 - 单数99
t1 - 双数100
这儿的线程 A 和线程 B 都对同一个目标 TwoThreadWaitNotify.class 获得锁,A 线程启用了同歩目标的 wait() 方式释放出来了锁并进到 WAITING 情况。
B 线程启用了 notify() 方式,那样 A 线程接到通知以后就可以从 wait() 方式中回到。
这儿运用了 TwoThreadWaitNotify.class 目标完成了通讯。
有一些必须留意:
- wait() 、notify()、notifyAll() 启用的前提条件全是得到了另一半的锁(也可称之为目标监控器)。
- 启用 wait() 方式后线程会释放出来锁,进到 WAITING 情况,该线程也会被运动到等待序列中。
- 启用 notify() 方式会将等待序列中的线程挪动到同歩序列中,线程情况也会升级为 BLOCKED
- 从 wait() 方式回到的先决条件是启用 notify() 方式的线程释放出来锁,wait() 方式的线程得到锁。
等待通知拥有一个經典现代性:
线程 A 做为顾客:
- 获得目标的锁。
- 进到 while(分辨标准),并启用 wait() 方式。
- 当标准达到跳出循环执行实际解决逻辑性。
线程 B 做为经营者:
- 获得目标锁。
- 变更与线程 A 同用的判定标准。
- 启用 notify() 方式。
伪代码如下所示:
//Thread A
synchronized(Object){
while(标准){
Object.wait();
}
//do something
}
//Thread B
synchronized(Object){
条件=false;//更改标准
Object.notify();
}
join() 方式
private static void join() throws InterruptedException {
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
LOGGER.info(\"running\");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}) ;
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
LOGGER.info(\"running2\");
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}) ;
t1.start();
t2.start();
//等待线程1停止
t1.join();
//等待线程2终止
t2.join();
LOGGER.info(\"main over\");
}
输出結果:
2018-03-16 20:21:30.967 [Thread-1] INFO c.c.actual.ThreadCommunication - running2
2018-03-16 20:21:30.967 [Thread-0] INFO c.c.actual.ThreadCommunication - running
2018-03-16 20:21:34.972 [main] INFO c.c.actual.ThreadCommunication - main over
在 t1.join() 的时候会一直堵塞到 t1 实行结束,因此最后主线程会等待 t1 和 t2 线程实行结束。
实际上从源代码可以看得出,join() 也是运用的等待通告体制:
关键逻辑性:
while (isAlive()) {
wait(0);
}
在 join 线程进行后会启用 notifyAll() 方式,是在 JVM 完成中启用,因此这儿看不出。
volatile 共享内存
由于 Java 是选用共享内存的形式开展线程通讯的,因此可以采取下列方法用主线程关掉 A 线程:
public class Volatile implements Runnable{
private static volatile boolean flag = true ;
@Override
public void run() {
while (flag){
System.out.println(Thread.currentThread().getName() \"已经运作。。。\");
}
System.out.println(Thread.currentThread().getName() \"实行结束\");
}
public static void main(String[] args) throws InterruptedException {
Volatile aVolatile = new Volatile();
new Thread(aVolatile,\"thread A\").start();
System.out.println(\"main 线程已经运作\") ;
TimeUnit.MILLISECONDS.sleep(100) ;
aVolatile.stopThread();
}
private void stopThread(){
flag = false ;
}
}
输出結果:
thread A已经运作。。。
thread A已经运作。。。
thread A已经运作。。。
thread A已经运作。。。
thread A实行结束
这儿的 flag 储放于主运行内存中,因此主线程和线程 A 都能够见到。
flag 选用 volatile 装饰主要是为了更好地运行内存由此可见性,大量內容可以查询这儿。
CountDownLatch 高并发专用工具
CountDownLatch 可以完成 join 同样的作用,可是更为的灵便。
private static void countDownLatch() throws Exception{
int thread = 3 ;
long start = System.currentTimeMillis();
final CountDownLatch countDown = new CountDownLatch(thread);
for (int i= 0 ;i<thread ; i ){
new Thread(new Runnable() {
@Override
public void run() {
LOGGER.info(\"thread run\");
try {
Thread.sleep(2000);
countDown.countDown();
LOGGER.info(\"thread end\");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
countDown.await();
long stop = System.currentTimeMillis();
LOGGER.info(\"main over total time={}\",stop-start);
}
输出結果:
2018-03-16 20:19:44.126 [Thread-0] INFO c.c.actual.ThreadCommunication - thread run
2018-03-16 20:19:44.126 [Thread-2] INFO c.c.actual.ThreadCommunication - thread run
2018-03-16 20:19:44.126 [Thread-1] INFO c.c.actual.ThreadCommunication - thread run
2018-03-16 20:19:46.136 [Thread-2] INFO c.c.actual.ThreadCommunication - thread end
2018-03-16 20:19:46.136 [Thread-1] INFO c.c.actual.ThreadCommunication - thread end
2018-03-16 20:19:46.136 [Thread-0] INFO c.c.actual.ThreadCommunication - thread end
2018-03-16 20:19:46.136 [main] INFO c.c.actual.ThreadCommunication - main over total time=2012
CountDownLatch 也是根据 AQS(AbstractQueuedSynchronizer) 完成的,大量完成参照 ReentrantLock 完成基本原理
- 复位一个 CountDownLatch 时告知高并发的线程,随后在每一个线程处理完毕以后启用 countDown() 方式。
- 该方法会将 AQS 内嵌的一个 state 情况 -1 。
- 最后在主线程启用 await() 方式,它会堵塞直到 state == 0 的情况下回到。
CyclicBarrier 高并发专用工具
private static void cyclicBarrier() throws Exception {
CyclicBarrier cyclicBarrier = new CyclicBarrier(3) ;
new Thread(new Runnable() {
@Override
public void run() {
LOGGER.info(\"thread run\");
try {
cyclicBarrier.await() ;
} catch (Exception e) {
e.printStackTrace();
}
LOGGER.info(\"thread end do something\");
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
LOGGER.info(\"thread run\");
try {
cyclicBarrier.await() ;
} catch (Exception e) {
e.printStackTrace();
}
LOGGER.info(\"thread end do something\");
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
LOGGER.info(\"thread run\");
try {
Thread.sleep(5000);
cyclicBarrier.await() ;
} catch (Exception e) {
e.printStackTrace();
}
LOGGER.info(\"thread end do something\");
} }).start();
LOGGER.info(\"main thread\");
}
CyclicBarrier 中文名字称为天然屏障或是是护栏,还可以用以线程间通信。
它可以等待 N 个线程都做到某一情况后再次运行的实际效果。
- 最先复位线程参与者。
- 调用 await() 可能在全部参与者线程都调用以前等待。
- 直到全部参与者都调用了 await() 后,所有线程从 await() 回到再次后面逻辑性。
运行結果:
2018-03-18 22:40:00.731 [Thread-0] INFO c.c.actual.ThreadCommunication - thread run
2018-03-18 22:40:00.731 [Thread-1] INFO c.c.actual.ThreadCommunication - thread run
2018-03-18 22:40:00.731 [Thread-2] INFO c.c.actual.ThreadCommunication - thread run
2018-03-18 22:40:00.731 [main] INFO c.c.actual.ThreadCommunication - main thread
2018-03-18 22:40:05.741 [Thread-0] INFO c.c.actual.ThreadCommunication - thread end do something
2018-03-18 22:40:05.741 [Thread-1] INFO c.c.actual.ThreadCommunication - thread end do something
2018-03-18 22:40:05.741 [Thread-2] INFO c.c.actual.ThreadCommunication - thread end do something
可以看到因为在其中一个线程休眠状态了五秒,全部其他全部的线程都得等待这一线程调用 await() 。
该专用工具可以完成 CountDownLatch 一样的作用,可是要更为灵便。乃至可以调用 reset() 方式重设 CyclicBarrier (必须自主捕获 BrokenBarrierException 解决) 随后再次实行。
线程回应终断
public class StopThread implements Runnable {
@Override
public void run() {
while ( !Thread.currentThread().isInterrupted()) {
// 线程实行实际逻辑性
System.out.println(Thread.currentThread().getName() \"运行中。。\");
}
System.out.println(Thread.currentThread().getName() \"撤出。。\");
}
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new StopThread(), \"thread A\");
thread.start();
System.out.println(\"main 线程已经运行\") ;
TimeUnit.MILLISECONDS.sleep(10) ;
thread.interrupt();
}
}
輸出結果:
thread A运行中。。
thread A运行中。。
thread A撤出。。
可以选用终断线程的方法来通讯,调用了 thread.interrupt() 方式实际上便是将 thread 中的一个标示特性置为了更好地 true。
并不是说调用了该方式就可以终断线程,如果不对这一标示开展回应实际上是没什么功效(这儿对这一标示开展了分辨)。
可是假如抛出去了 InterruptedException 出现异常,该标示便会被 JVM 重设为 false。
线程池 awaitTermination() 方式
如果是用线程池来管理方法线程,可以应用下列方法来让主线程等待线程池里全部每日任务实行结束:
private static void executorService() throws Exception{
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(10) ;
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5,5,1, TimeUnit.MILLISECONDS,queue) ;
poolExecutor.execute(new Runnable() {
@Override
public void run() {
LOGGER.info(\"running\");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
poolExecutor.execute(new Runnable() {
@Override
public void run() {
LOGGER.info(\"running2\");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
poolExecutor.shutdown();
while (!poolExecutor.awaitTermination(1,TimeUnit.SECONDS)){
LOGGER.info(\"线程仍在实行。。。\");
}
LOGGER.info(\"main over\");
}
輸出結果:
2018-03-16 20:18:01.273 [pool-1-thread-2] INFO c.c.actual.ThreadCommunication - running2
2018-03-16 20:18:01.273 [pool-1-thread-1] INFO c.c.actual.ThreadCommunication - running
2018-03-16 20:18:02.273 [main] INFO c.c.actual.ThreadCommunication - 线程仍在实行。。。
2018-03-16 20:18:03.278 [main] INFO c.c.actual.ThreadCommunication - 线程仍在实行。。。
2018-03-16 20:18:04.278 [main] INFO c.c.actual.ThreadCommunication - main over
应用这一 awaitTermination() 方式的前提条件必须关掉线程池,如调用了 shutdown() 方式。
调用了 shutdown() 以后线程池会终止接纳新每日任务,而且会光滑的关掉线程池里目前的每日任务。
管路通讯
public static void piped() throws IOException {
//面对于标识符 PipedInputStream 面对于字节数
PipedWriter writer = new PipedWriter();
PipedReader reader = new PipedReader();
//I/O流创建联接
writer.connect(reader);
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
LOGGER.info(\"running\");
try {
for (int i = 0; i < 10; i ) {
writer.write(i \"\");
Thread.sleep(10);
}
} catch (Exception e) {
} finally {
try {
writer.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
});
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
LOGGER.info(\"running2\");
int msg = 0;
try {
while ((msg = reader.read()) != -1) {
LOGGER.info(\"msg={}\", (char) msg);
}
} catch (Exception e) {
}
}
});
t1.start();
t2.start();
}
输出結果:
2018-03-16 19:56:43.014 [Thread-0] INFO c.c.actual.ThreadCommunication - running
2018-03-16 19:56:43.014 [Thread-1] INFO c.c.actual.ThreadCommunication - running2
2018-03-16 19:56:43.130 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=0
2018-03-16 19:56:43.132 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=1
2018-03-16 19:56:43.132 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=2
2018-03-16 19:56:43.133 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=3
2018-03-16 19:56:43.133 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=4
2018-03-16 19:56:43.133 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=5
2018-03-16 19:56:43.133 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=6
2018-03-16 19:56:43.134 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=7
2018-03-16 19:56:43.134 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=8
2018-03-16 19:56:43.134 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=9
Java 虽然是根据运行内存通讯的,但还可以应用管路通讯。
必须留意的是,键入流和输出流必须最先创建联接。那样进程 B 就可以接到进程 A 传出的最新消息了。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。