本文由 发布,转载请注明出处,如有问题请联系我们! 发布时间: 2021-08-01实现java线程通信的几种方式-讲解java多线程共享数据
序
在开发设计中,难以避免会碰到这种的情景:全部的子进程在完成后都必须通告主线任务程来解决一些逻辑性。
或是进程a已经实行,直至某一标准通告进程b实行某一实际操作。
能够利用下列方法完成:
等候通告体制
等候方式是Java中精选的线程通信方式。
2个进程根据启用同一目标上的wait()和notify()方式开展通讯。
比如,2个进程更替打印出奇偶校验码:
public class TwoThreadWaitNotify { private int start = 1; private boolean flag = false; public static void main(String[] args) { TwoThreadWaitNotify twoThread = new TwoThreadWaitNotify(); Thread t1 = new Thread(new OuNum(twoThread)); t1.setName("A"); Thread t2 = new Thread(new JiNum(twoThread)); t2.setName("B"); t1.start(); t2.start(); } /** * 双数进程 */ public static class OuNum implements Runnable { private TwoThreadWaitNotify number; public OuNum(TwoThreadWaitNotify number) { this.number = number; } @Override public void run() { while (number.start輸出結果:
t2 - 单数93t1 - 双数94t2 - 单数95t1 - 双数96t2 - 单数97t1 - 双数98t2 - 单数99t1 - 双数100这儿,进程A和进程B都为同一个目标TwoThreadWaitNotify.class获得锁,进程A启用同歩目标的wait()方式释放出来锁并进到WAITING情况。
进程B启用了notify()方式,那样进程A接到通告后就可以从wait()方式回到。
TwoThreadWaitNotify.class目标用以进行在这里的通讯。
有一些事儿需要留意:
wait() ,notify(),notifyAll() 启用的条件全是得到了目标的锁(也可称之为目标监控器)。启用 wait() 方式后进程会释放出来锁,进到 WAITING 情况,该进程也会被挪动到等候序列中。启用 notify() 方式会将等候序列中的进程挪动到同歩序列中,线程状态也会升级为 BLOCKED从 wait() 方式回到的先决条件是启用 notify() 方式的进程释放出来锁,wait() 方式的进程得到锁。等候通告有一个有趣的案例:
做为顾客的进程:
获得目标的锁。进到 while(分辨标准),并启用 wait() 方式。当标准达到跳出循环实行实际解决逻辑性。b .进程做为经营者:
获得目标锁。变更与进程 A 同用的判定标准。启用 notify() 方式。伪代码如下所示:
//Thread Asynchronized(Object){ while(标准){ Object.wait(); } //do something}//Thread Bsynchronized(Object){ 标准=false;//更改标准 Object.notify();}Join()方式
private static void join() throws InterruptedException { Thread t1 = new Thread(new Runnable() { @Override public void run() { LOGGER.info("running"); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } }) ; Thread t2 = new Thread(new Runnable() { @Override public void run() { LOGGER.info("running2"); try { Thread.sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } } }) ; t1.start(); t2.start(); //等候进程1停止 t1.join(); //等候进程2停止 t2.join(); LOGGER.info("main over"); }輸出結果:
2018-03-16 20:21:30.967 [Thread-1] INFO c.c.actual.ThreadCommunication - running22018-03-16 20:21:30.967 [Thread-0] INFO c.c.actual.ThreadCommunication - running2018-03-16 20:21:34.972 [main] INFO c.c.actual.ThreadCommunication - main over在t1.join()处,它将堵塞直至t1强制执行,因而主线任务程将等候t1和t2进程实行。
实际上从源码能够看得出join()也是一种等候通告体制:
关键逻辑性:
while (isAlive()) { wait(0); }电极连接线程进行后,将启用notifyAll()方式,这也是在JVM完成中启用的,因此这儿看不见。
易失性共享内存。
由于Java应用共享内存开展线程通信,因此主线任务程能够利用下列方法关掉A进程:
public class Volatile implements Runnable{ private static volatile boolean flag = true ; @Override public void run() { while (flag){ System.out.println(Thread.currentThread().getName() "已经运作。。。"); } System.out.println(Thread.currentThread().getName() "实行结束"); } public static void main(String[] args) throws InterruptedException { Volatile aVolatile = new Volatile(); new Thread(aVolatile,"thread A").start(); System.out.println("main 进程已经运作") ; TimeUnit.MILLISECONDS.sleep(100) ; aVolatile.stopThread(); } private void stopThread(){ flag = false ; }}輸出結果:
thread A已经运作。。。thread A已经运作。。。thread A已经运作。。。thread A已经运作。。。thread A实行结束这儿的标示储存在主运行内存中,因此主线任务程和进程a都能够看见它。
标示用volatile装饰设计,主要是为了更好地运行内存由此可见性,其他信息能够在这儿寻找。
CountDownLatch高并发专用工具。
CountDownLatch能够完成和join一样的作用,可是更为灵便。
private static void countDownLatch() throws Exception{ int thread = 3 ; long start = System.currentTimeMillis(); final CountDownLatch countDown = new CountDownLatch(thread); for (int i= 0 ;i輸出結果:
2018-03-16 20:19:44.126 [Thread-0] INFO c.c.actual.ThreadCommunication - thread run2018-03-16 20:19:44.126 [Thread-2] INFO c.c.actual.ThreadCommunication - thread run2018-03-16 20:19:44.126 [Thread-1] INFO c.c.actual.ThreadCommunication - thread run2018-03-16 20:19:46.136 [Thread-2] INFO c.c.actual.ThreadCommunication - thread end2018-03-16 20:19:46.136 [Thread-1] INFO c.c.actual.ThreadCommunication - thread end2018-03-16 20:19:46.136 [Thread-0] INFO c.c.actual.ThreadCommunication - thread end2018-03-16 20:19:46.136 [main] INFO c.c.actual.ThreadCommunication - main over total time=2012CountDownLatch也是根据AQS(抽象性序列同步控制器)完成的,大量的完成参照了可重入锁的运用基本原理。
复位一个 CountDownLatch 时告知高并发的进程,随后在每一个进程重新处理以后启用 countDown() 方式。该方式会将 AQS 内嵌的一个 state 情况 -1 。最后在主线任务程启用 await() 方式,它会堵塞直至 state == 0 的情况下回到。CyclicBarrier高并发专用工具
private static void cyclicBarrier() throws Exception { CyclicBarrier cyclicBarrier = new CyclicBarrier(3) ; new Thread(new Runnable() { @Override public void run() { LOGGER.info("thread run"); try { cyclicBarrier.await() ; } catch (Exception e) { e.printStackTrace(); } LOGGER.info("thread end do something"); } }).start(); new Thread(new Runnable() { @Override public void run() { LOGGER.info("thread run"); try { cyclicBarrier.await() ; } catch (Exception e) { e.printStackTrace(); } LOGGER.info("thread end do something"); } }).start(); new Thread(new Runnable() { @Override public void run() { LOGGER.info("thread run"); try { Thread.sleep(5000); cyclicBarrier.await() ; } catch (Exception e) { e.printStackTrace(); } LOGGER.info("thread end do something"); } }).start(); LOGGER.info("main thread"); }CyclicBarrier的中文名称是Barrier或fence,还可以用以线程间通信。
它能够等候n个进程做到某一情况,随后再次运作。
最先复位进程参加者。启用 await() 可能在全部参加者进程都启用以前等候。直至全部参加者都启用了 await() 后,全部进程从 await() 回到再次事后逻辑性。运作結果:
2018-03-18 22:40:00.731 [Thread-0] INFO c.c.actual.ThreadCommunication - thread run2018-03-18 22:40:00.731 [Thread-1] INFO c.c.actual.ThreadCommunication - thread run2018-03-18 22:40:00.731 [Thread-2] INFO c.c.actual.ThreadCommunication - thread run2018-03-18 22:40:00.731 [main] INFO c.c.actual.ThreadCommunication - main thread2018-03-18 22:40:05.741 [Thread-0] INFO c.c.actual.ThreadCommunication - thread end do something2018-03-18 22:40:05.741 [Thread-1] INFO c.c.actual.ThreadCommunication - thread end do something2018-03-18 22:40:05.741 [Thread-2] INFO c.c.actual.ThreadCommunication - thread end do something能够见到,因为在其中一个进程早已休眠状态了五秒左右,全部别的进程都务必等候这一进程启用await()。
这一专用工具能够完成和CountDownLatch一样的作用,可是更为灵便。您乃至能够启用reset()方式来重设CyclicBarrier(您必须自身捕捉BrokenBarrierException解决),随后再度实行它。
进程回应终断。
public class StopThread implements Runnable { @Override public void run() { while ( !Thread.currentThread().isInterrupted()) { // 进程实行实际逻辑性 System.out.println(Thread.currentThread().getName() "运作中。。"); } System.out.println(Thread.currentThread().getName() "撤出。。"); } public static void main(String[] args) throws InterruptedException { Thread thread = new Thread(new StopThread(), "thread A"); thread.start(); System.out.println("main 进程已经运作") ; TimeUnit.MILLISECONDS.sleep(10) ; thread.interrupt(); }}輸出結果:
thread A运作中。。thread A运作中。。thread A撤出。。您能够根据终断进程来开展通讯。启用thread.interrupt()方式事实上将进程中的标示特性设定为true。
这并不代表着启用这一方式能够终断进程。如果不回应这一标示,就没实际效果(这儿分辨这一标示)。
可是,假如引起了终断出现异常,JVM会将该标示重设为false。
线程池的AwaitTermination()方式。
假如线程池用以管理方法进程,主线任务程能够利用下列方法等候线程池中的全部每日任务实行:
private static void executorService() throws Exception{ BlockingQueue queue = new LinkedBlockingQueue(10) ; ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5,5,1, TimeUnit.MILLISECONDS,queue) ; poolExecutor.execute(new Runnable() { @Override public void run() { LOGGER.info("running"); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } } }); poolExecutor.execute(new Runnable() { @Override public void run() { LOGGER.info("running2"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } }); poolExecutor.shutdown(); while (!poolExecutor.awaitTermination(1,TimeUnit.SECONDS)){ LOGGER.info("进程仍在实行。。。"); } LOGGER.info("main over"); }輸出結果:
2018-03-16 20:18:01.273 [pool-1-thread-2] INFO c.c.actual.ThreadCommunication - running22018-03-16 20:18:01.273 [pool-1-thread-1] INFO c.c.actual.ThreadCommunication - running2018-03-16 20:18:02.273 [main] INFO c.c.actual.ThreadCommunication - 进程仍在实行。。。2018-03-16 20:18:03.278 [main] INFO c.c.actual.ThreadCommunication - 进程仍在实行。。。2018-03-16 20:18:04.278 [main] INFO c.c.actual.ThreadCommunication - main over应用此awaitTermination()方式的前提条件是关掉线程池,比如启用shutdown()方式。
启用shutdown()后,线程池将暂停接纳新每日任务,线程池中已经有的目标将成功关掉。
通讯管路
public static void piped() throws IOException { //朝向于标识符 PipedInputStream 朝向于字节数 PipedWriter writer = new PipedWriter(); PipedReader reader = new PipedReader(); //I/O流创建联接 writer.connect(reader); Thread t1 = new Thread(new Runnable() { @Override public void run() { LOGGER.info("running"); try { for (int i = 0; i < 10; i ) { writer.write(i "); Thread.sleep(10); } } catch (Exception e) { } finally { try { writer.close(); } catch (IOException e) { e.printStackTrace(); } } } }); Thread t2 = new Thread(new Runnable() { @Override public void run() { LOGGER.info("running2"); int msg = 0; try { while ((msg = reader.read()) != -1) { LOGGER.info("msg={}", (char) msg); } } catch (Exception e) { } } }); t1.start(); t2.start(); }輸出結果:
2018-03-16 19:56:43.014 [Thread-0] INFO c.c.actual.ThreadCommunication - running2018-03-16 19:56:43.014 [Thread-1] INFO c.c.actual.ThreadCommunication - running22018-03-16 19:56:43.130 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=02018-03-16 19:56:43.132 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=12018-03-16 19:56:43.132 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=22018-03-16 19:56:43.133 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=32018-03-16 19:56:43.133 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=42018-03-16 19:56:43.133 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=52018-03-16 19:56:43.133 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=62018-03-16 19:56:43.134 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=72018-03-16 19:56:43.134 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=82018-03-16 19:56:43.134 [Thread-1] INFO c.c.actual.ThreadCommunication - msg=9Java尽管是根据运行内存通讯,可是还可以应用生产流水线通讯。
必须特别注意的是,必须先联接键入流和輸出流。那样,进程b能够从进程a接受信息..
在具体开发设计中,能够按照要求灵便挑选最适用的线程通信方法。








