Java线程间通讯概述
当前位置:以往代写 > JAVA 教程 >Java线程间通讯概述
2019-06-14

Java线程间通讯概述

Java线程间通讯概述

副标题#e#

这个故事源自一个很简朴的想法:建设一个对开拓人员友好的、简朴轻量的线程间通讯框架,完全不 用锁、同步器、信号量、期待和通知,在Java里开拓一个轻量、无锁的线程内通讯框架;而且也没有行列 、动静、事件或任何其他并发专用的术语或东西。

只用普通的老式Java接话柄现POJO的通讯。

它大概跟Akka的范例化actor雷同,但作为一个必需超等轻量,而且要针对单台多核计较机举办优化的 新框架,谁人大概有点过了。

当actor超过差异JVM实例(在同一台呆板上,或漫衍在网络上的差异呆板上)的历程界线时,Akka框 架很善于处理惩罚历程间的通讯。

但对付那种只需要线程间通讯的小型项目而言,用Akka范例化actor大概有点儿像用牛刀杀鸡,不外类 型化actor仍然是一种抱负的实现方法。

我花了几天时间,用动态署理,阻塞行列缓和存线程池建设了一个办理方案。

图一是这个框架的高条理架构:

Java线程间通讯概述

图一: 框架的高条理架构

SPSC行列是指单一出产者/单一消费者行列。MPSC行列是指多出产者/单一消费者行列。

派发线程认真吸收Actor线程发送的动静,并把它们派发到对应的SPSC行列中去。

吸收到动静的Actor线程用个中的数据挪用相应的actor实例中的要领。借助其他actor的署理,actor 实例可以将动静发送到MPSC行列中,然后动静会被发送给方针actor线程。

我建设了一个简朴的例子来测试,就是下面这个打乒乓球的措施:

public interface PlayerA (
  void pong(long ball); //发完就忘的要领挪用 
}
public interface PlayerB {   
  void ping(PlayerA playerA, long ball); //发完就忘的要领挪用 
}    
public class PlayerAImpl implements PlayerA {    
  @Override    
  public void pong(long ball) {    
  }    
}
public class PlayerBImpl implements PlayerB {   
  @Override    
  public void ping(PlayerA playerA, long ball) {    
    playerA.pong(ball);    
  }    
}
public class PingPongExample {   
  public void testPingPong() {
    // 打点器埋没了线程间通讯的巨大性
    // 节制actor署理,actor实现和线程  
    ActorManager manager = new ActorManager();
    // 在打点器内注册actor实现 
    manager.registerImpl(PlayerAImpl.class);    
    manager.registerImpl(PlayerBImpl.class);
    //建设actor署理。署剖析将要领挪用转换成内部动静。 
    //会在线程间发给特定的actor实例。    
    PlayerA playerA = manager.createActor(PlayerA.class);    
    PlayerB playerB = manager.createActor(PlayerB.class);    
    for(int i = 0; i < 1000000; i++) {    
       playerB.ping(playerA, i);     
   }    
}


#p#副标题#e#

颠末测试,速度约莫在每秒500,000 次乒/乓阁下;还不错吧。然而跟单线程的运行速度比起来,我突 然就感受没那么好了。在 单线程 中运行的代码每秒速度能到达20亿 (2,681,850,373) !

居然差了5,000 多倍。太让我失望了。在大大都环境下,单线程代码的结果都比多线程代码更高效。

我开始找原因,想看看我的乒乓球运带动们为什么这么慢。颠末一番调研和测试,我发明是阻塞行列 的问题,我用来在actor间通报动静的行列影响了机能。

Java线程间通讯概述

图 2: 只有一个出产者和一个消费者的SPSC行列

所以我提倡了一场比赛,要将它换成Java里最快的行列。我发明白Nitsan Wakart的 博客 。他发了几篇文章先容单一出产者/单一消费者 (SPSC)无锁行列的实现。这些文章受到了Martin Thompson的演讲 终极机能的无锁算法的开导 。

跟基于私有锁的行列对比,无锁行列的机能更优。在基于锁的行列中,当一个线程获得锁时,其它线 程就要等着锁被释放。而在无锁的算法中,某个出产者线程出产动静时不会阻塞其它出产者线程,消费者 也不会被其它读取行列的消费者阻塞。

在Martin Thompson的演讲以及在Nitsan的博客中先容的SPSC行列的机能的确令人难以置信 —— 高出了100M ops/sec。比JDK的并刊行列实现还要快10倍 (在4核的 Intel Core i7 上的机能约莫在 8M ops/sec 阁下)。

我怀着极大的期望,将所有actor上毗连的链式阻塞行列都换成了无锁的SPSC行列。惋惜,在吞吐量上 的机能测试并没有像我预期的那样呈现大幅晋升。不外很快我就意识到,瓶颈并不在SPSC行列上,而是在 多个出产者/单一消费者(MPSC)哪里。

用SPSC行列做MPSC行列的任务并不那么简朴;在做put操纵时,多个出产者大概会包围掉互相的值。 SPSC 行列就没有节制多个出产者put操纵的代码。所以即便换成最快的SPSC行列,也办理不了我的问题。

为了处理惩罚多个出产者/单一消费者的环境,我抉择启用LMAX Disruptor ——一个基于环形缓冲区的高机能进 程间动静库。

Java线程间通讯概述

图3: 单一出产者和单一消费者的LMAX Disruptor

#p#分页标题#e#

借助Disruptor,很容易实现低延迟、高吞吐量的线程间动静通讯。它还为出产者和消费者的差异组合 提供了差异的用例。几个线程可以互不阻塞地读取环形缓冲中的动静:

Java线程间通讯概述

图 4: 单一出产者和两个消费者的LMAX Disruptor    

下面是有多个出产者写入环形缓冲区,多个消费者从中读打动静的场景。

Java线程间通讯概述

图 5: 两个出产者和两个消费者的LMAX Disruptor

#p#副标题#e#

颠末对机能测试的快速搜索,我找到了 三个宣布者和一个消费者的吞吐量测试。 这个真是正合我意,它给出了下面这个功效:

Java线程间通讯概述

在3 个出产者/1个 消费者场景下, Disruptor要比LinkedBlockingQueue快两倍多。然而这跟我所期 望的机能上晋升10倍仍有很大差距。

这让我以为很沮丧,而且我的大脑一直在搜寻办理方案。就像掷中注定一样,我最近不在跟人拼车上 下班,而是改乘地铁了。溘然灵光一闪,我的大脑开始将车站跟出产者消费者对应起来。在一个车站里, 既有出产者(车和下车的人),也有消费者(同一辆车和上车的人)。

我建设了 Railway类,并用AtomicLong追踪从一站到下一站的列车。我先从简朴的场景开始,只有一 辆车的铁轨。

public class RailWay {  
 private final Train train = new Train();  
 // stationNo追踪列车并界说哪个车站吸收到了列车
 private final AtomicInteger stationIndex = new AtomicInteger();
// 会有多个线程会见这个要领,并期待特定车站上的列车 
public Train waitTrainOnStation(final int stationNo) {
  
   while (stationIndex.get() % stationCount != stationNo) {
    Thread.yield(); // 为担保高吞吐量的动静通报,这个是必需的。
                   //但在期待列车时它会耗损CPU周期 
   }  
   // 只有站号便是stationIndex.get() % stationCount时,这个忙轮回才会返回

   return train;
 }
// 这个要领通过增加列车的站点索引将这辆列车移到下一站
  public void sendTrain() {
    stationIndex.getAndIncrement();
   }
  }

为了测试,我用的条件跟在Disruptor机能测试顶用的一样,而且也是测的SPSC行列——测 试在线程间通报long值。我建设了下面这个Train类,个中包括了一个long数组:

public class Train {   
  //   
  public static int CAPACITY = 2*1024;
  private final long[] goodsArray; // 传输运输货品的数组

  private int index;

  public Train() {   
      goodsArray = new long[CAPACITY];     
 }

 public int goodsCount() { //返回货品数量    
  return index;    
 }    
 public void addGoods(long i) { // 向列车中添加条目    
  goodsArray[index++] = i;    
 }    
 public long getGoods(int i) { //从列车中移走条目    
  index--;    
  return goodsArray[i];    
 }    
}

#p#副标题#e#

然后我写了一个简朴的测试 :两个线程通过列车相互通报long值。

Java线程间通讯概述

图 6: 利用单辆列车的单一出产者和单一消费者Railway

public void testRailWay() {   
  final Railway railway = new Railway();    
  final long n = 20000000000l;    
  //启动一个消费者历程 
  new Thread() {    
   long lastValue = 0;
   @Override   
   public void run() {    
    while (lastValue < n) {    
      Train train = railway.waitTrainOnStation(1); //在#1站等列车
      int count = train.goodsCount();    
      for (int i = 0; i < count; i++) {    
        lastValue = train.getGoods(i); // 卸货   
      }    
      railway.sendTrain(); //将当前列车送到第一站 
     }    
   }    
 }.start();

final long start = System.nanoTime();
long i = 0;   
while (i < n) {    
 Train train = railway.waitTrainOnStation(0); // 在#0站等列车    
 int capacity = train.getCapacity();    
 for (int j = 0; j < capacity; j++) {    
   train.addGoods((int)i++); // 将货品装到列车上 
 }    
 railway.sendTrain();
 if (i % 100000000 == 0) { //每隔100M个条目丈量一次机能 
    final long duration = System.nanoTime() - start;    
    final long ops = (i * 1000L * 1000L * 1000L) / duration;    
    System.out.format("ops/sec = %,d\n", ops);    
    System.out.format("trains/sec = %,d\n", ops / Train.CAPACITY);    
    System.out.format("latency nanos = %.3f%n\n", 
    duration / (float)(i) * (float)Train.CAPACITY);    
  }    
 }    
}

在差异的列车容量下运行这个测试,功效惊着我了:

Java线程间通讯概述

在列车容量到达32,768时,两个线程传送动静的吞吐量到达了767,028,751 ops/sec。比Nitsan博客中 的SPSC行列快了几倍。

#p#分页标题#e#

继承按铁路列车这个思路思考,我想知道假如有两辆列车会怎么样?我以为应该能提高吞吐量,同时 还能低落延迟。每个车站城市有它本身的列车。当一辆列车在第一个车站装货时 ,第二辆列车会在第二个车站卸货,反之亦然。

Java线程间通讯概述

图 7: 利用两辆列车的单一出产者和单一消费者Railway

#p#副标题#e#

下面是吞吐量的功效:

Java线程间通讯概述

功效是惊人的;比单辆列车的功效快了1.4倍多。列车容量为一时,延迟从192.6纳秒低落到133.5纳秒 ;这显然是一个令人激昂的迹象。

因此我的尝试还没竣事。列车容量为2048的两个线程通报动静的延迟为2,178.4 纳秒,这太高了。我 在想如何低落它,建设一个有许多辆列车 的例子:

Java线程间通讯概述

图 8: 利用多辆列车的单一出产者和单一消费者Railway  

查察本栏目

我还把列车容量降到了1个long值,开始玩起了列车数量。下面是测试功效:

Java线程间通讯概述

用32,768 列车在线程间发送一个long值的延迟低落到了13.9 纳秒。通过调解列车数量和列车容量, 当延时不那么高,吞吐量不那么低时,吞吐量和延时就到达了最佳均衡。

对付单一出产者和单一消费者(SPSC)而言,这些数值很棒;但我们怎么让它在有多个出产者和消费者 时也能生效呢?谜底很简朴,添加更多的车站!

Java线程间通讯概述

图 9:一个出产者和两个消费者的Railway

#p#副标题#e#

每个线程都等着下一趟列车,装货/卸货,然后把列车送到下一站。在出产者往列车上装货时,消费者 在从列车上卸货。列车周而复始地从一个车站转到另一个车站。

为了测试单一出产者/多消费者(SPMC) 的环境,我建设了一个有8个车站的Railway测试。 一个车站属于一个出产者,而另 外7个车站属于消费者。功效是:

列车数量 = 256 ,列车容量 = 32:

 ops/sec = 116,604,397     延迟(纳秒) = 274.4

列车数量= 32,列车容量= 256:

 ops/sec = 432,055,469     延迟(纳秒) = 592.5

如你所见,即便有8个事情线程,测试给出的功效也相当好– 32辆容量为256个long的列车吞吐量为 432,055,469 ops/sec。在测试期间,所有CPU内核的负载都是100%。

Java线程间通讯概述

图 10:在测试有8个车站的Railway 期间的CPU 利用环境

在玩这个Railway算法时,我险些忘了我最初的方针:晋升多出产者/单消费者环境下的机能。

Java线程间通讯概述

图 11:三个出产者和一个消费者的 Railway  

我建设了3个出产者和1个消费者的新测试。 每辆列车一站一站地转圈,而每个出产者只给每辆车装1/3容量的货。消费者取出每辆车上三个出产者给 出的全部三项货品。机能测试给出的平均功效如下所示:

 ops/sec = 162,597,109  列车/秒 = 54,199,036     延迟(纳秒) = 18.5

功效相当棒。出产者和消费者事情的速度高出了160M ops/sec。

#p#副标题#e#

为了填补差别,下面给出沟通环境下的Disruptor功效- 3个出产者和1个消费者:

Run 0, Disruptor=11,467,889 ops/sec
Run 1, Disruptor=11,280,315 ops/sec
Run 2, Disruptor=11,286,681 ops/sec
Run 3, Disruptor=11,254,924 ops/sec

下面是另一个批量动静的Disruptor 3P:1C 测试 (10 条动静每批):

Run 0, Disruptor=116,009,280 ops/sec
Run 1, Disruptor=128,205,128 ops/sec
Run 2, Disruptor=101,317,122 ops/sec
Run 3, Disruptor=98,716,683 ops/sec;

最后是用带LinkedBlockingQueue 实现的Disruptor 在3P:1C场景下的测试功效:

Run 0, BlockingQueue=4,546,281 ops/sec
Run 1, BlockingQueue=4,508,769 ops/sec
Run 2, BlockingQueue=4,101,386 ops/sec
Run 3, BlockingQueue=4,124,561 ops/sec

如你所见,Railway方法的平均吞吐量是162,597,109 ops/sec,而Disruptor在同样的环境下的最好结 果只有128,205,128 ops/sec。至于 LinkedBlockingQueue,最好的功效只有4,546,281 ops/sec。

Railway算法为事件批处理惩罚提供了一种可以显著增加吞吐量的浅易步伐。通过调解列车容量或列车数量 ,很容易告竣想要的吞吐量/延迟。

别的, 当同一个线程可以用来消费动静,处理惩罚它们并向环中返回功效时,通过殽杂出产者和消费者, Railway也能用来处理惩罚巨大的环境:

Java线程间通讯概述

图 12: 殽杂出产者和消费者的Railway

最后,我会提供一个颠末优化的超高吞吐量 单出产者/单消费者测试:

Java线程间通讯概述

图 13:单个出产者和单个消费者的Railway

#p#分页标题#e#

它的平均功效为:吞吐量高出每秒15亿 (1,569,884,271)次操纵,延迟为1.3 微秒。如你所见,本文 开头描写的谁人局限沟通的单线程测试的功效是每秒2,681,850,373。

你本身想想结论是什么吧。

我但愿未来再写一篇文章,阐发如何用Queue和 BlockingQueue接口支持Railway算法,用来处理惩罚差异 的出产者和消费者组合。

    关键字:

在线提交作业