明天你会感谢今天奋力拼搏的你。
ヾ(o◕∀◕)ノヾ
线程池相关类的继承树结构如下:

Executor:接口,最上层的接口,只定义了一个接收Runnable对象的execute方法。
ExecutorService:接口,继承了Executor接口,拓展了Callable、Future、关闭方法,提供了生命周期管理和更广泛的执行方式。
ScheduledExecutorService:接口,继承了ExecutorService,增加了定时任务相关的方法
ThreadPoolExecutor:实现类,基础的线程池实现
ScheduledThreadPoolExecutor:实现类,继承ThreadPoolExecutor,实现了ScheduledExcutorService中相关定时任务的方法
执行器的执行方式主要有2个:
执行单个任务:submit方法用于执行Callable和Runnable,并提供返回值Future<?>
执行批量任务:invokeAll和invokeAny用于执行批量任务。all返回所有结果,any是其中任意一个首先执行成功就返回这个的结果。
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
5,//核心线程数
10,//最大线程数
0L,//超过核心线程数的线程,超过keepAliveTime时间,这个线程就会被销毁。
TimeUnit.MILLISECONDS,//keepAliveTime 的时间单位
new LinkedBlockingQueue<Runnable>() //传入的工作队列,任务了核心线程数会先存储于此队列中
);
这个线程池有一个反常的设计,线程数量大于核心线程数后会把线程放入队列中等待,而不是直到最大线程数之后才放入队列,具体逻辑如下:
1、 是否达到核心线程数量?没达到,创建一个工作线程来执行任务。
2、 工作队列是否已满?没满,则将新提交的任务存储在工作队列里。
3、 是否达到线程池最大数量?没达到,则创建一个新的工作线程来执行任务。
4、 最后,执行拒绝策略来处理这个任务。

所以超过核心线程数后,传入的队列如果是无界队列,则会一直把任务加入队列中,而不会创建线程。
虽然说这样设计和惯性思维不一样,但存在必有理。
Java之所以这样设计是为了平衡资源使用和性能,具体原因如下:
Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。
在Future接口中声明了5个方法:
FutureTask是Future的实现

FutureTask还实现了Runable,为什么要这么做?不影响主线程,新开一个线程来对任务进行管理。
看其构造函数,有2个如下图,意思很明显,把线程执行逻辑再封装一层,放入FutureTask中执行。

注意事项:
FutureTask里有状态判断,一个FutureTask对象只能被线程执行一次。
你可以自己实例化线程池,也可以用Executors 创建线程池的工厂类,常用方法如下:
newFixedThreadPool(int nThreads) :创建一个固定大小、任务队列容量无界的线程池。核心线程数=最大线程数。
newCachedThreadPool(): 创建的是一个大小无界的缓冲线程池。它的任务队列是一个同步队列。任务加入到池中,如果池中有空闲线程,则用空闲线程执行,如无则创建新线程执行。池中的线程空闲超过60秒,将被销毁释放。线程数随任务的多少变化。适用于执行耗时较小的异步任务。池的核心线程数=0 ,最大线程数= Integer.MAX_VALUE
newSingleThreadExecutor(): 只有一个线程来执行无界任务队列的单一线程池。该线程池确保任务按加入的顺序一个一个依次执行。当唯一的线程因任务异常中止时,将创建一个新的线程来继续执行后续的任务。与newFixedThreadPool(1)的区别在于,单一线程池的池大小在newSingleThreadExecutor方法中硬编码,不能再改变的。
newScheduledThreadPool(int corePoolSize) :能定时执行任务的线程池。该池的核心线程数由参数指定,最大线程数= Integer.MAX_VALUE
扩展:
同步队列:SynchronousQueue(同步队列)是一个没有数据缓冲的BlockingQueue,生产者线程对其的插入操作put必须等待消费者的移除操作take,反过来也一样,没等待到就会阻塞。
计算型任务:cpu数量的1-2倍
IO型任务:相对比计算型任务,需多一些线程,要根据具体的IO阻塞时长进行考量决定。也可考虑根据需要在一个最小数量和最大数量间自动增减线程数。
那么这个线程池的数据量是不是可以随便设置呢?当然不是的,请一定要记得,线程上下文切换是有代价的。目前总结了一套公式,对于IO密集型应用:
线程数 = CPU核心数/(1-阻塞系数) 这个阻塞系数一般为0.8~0.9之间,也可以取0.8或者0.9。
套用公式,对于双核CPU来说,它比较理想的线程数就是20,当然这都不是绝对的,需要根据实际情况以及实际业务来调整:final int poolSize = (int)(cpuCore/(1-0.9))
针对于阻塞系数,《Programming Concurrency on the JVM Mastering》即《Java 虚拟机并发编程》中有提到一句话:
对于阻塞系数,我们可以先试着猜测,抑或采用一些性能分析工具或java.lang.management API 来确定线程花在系统/IO操作上的时间与CPU密集任务所耗的时间比值。
JDK的线程池在线程数达到corePoolSize之后,先判断队列,再判断maximumPoolSize。如果想反过来,即先判断maximumPoolSize再判断队列,应该怎么实现?
在此参考Dubbo框架中线程池的处理方案:
具体怎么实现?继续往下看。
1、ThreadPoolExecutor源码逻辑解析
上文已经说了ThreadPoolExecutor的处理逻辑,再通过源码来深入理解下。如下图所示,如果已提交的任务数量大于核心线程数量,则会调用workQueue.offer方法,offer方法返回值决定了是否创建更多线程,true表示入队成功不创建线程,false表示入队失败需要创建线程。

2、自定义队列,修改offer方法逻辑
参考Dubbo中的TaskQueue,如下图所示,创建队列时会把线程池执行器也传入队列中,offer通过执行器获得当前线程配置的数量,来判断当前线程数是否小于最大线程数,小于就返回false不添加到队列中。

3、自定义线程池类
如下所示参考EagerThreadPoolExecutor类,继承ThreadPoolExecutor,把自定义的队列TaskQueue传入直接调用父类的execute方法即可:

常用的并发工具类:
CountDownLatch允许一个或多个线程等待其他线程完成操作。
static AtomicLong num = new AtomicLong(0);
public static void main(String args[]) throws InterruptedException {
CountDownLatch ctl = new CountDownLatch(11);
for (int i=0; i< 10; i++){
new Thread(){
@Override
public void run() {
for (int j=0; j< 10000000; j++){
num.getAndIncrement();
}
ctl.countDown();
}
}.start();
}
ctl.await(); //设置开关,设置门栓
System.out.println(num.get());
}
Semaphore 信号量用来控制同时访问特定资源的线程数量。(共享锁)
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
应用场景:Semaphore可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接。假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发地读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时我们必须控制只有10个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,就可以使用Semaphore来做流量控制。
Semaphore的构造方法Semaphore(int permits)接受一个整型的数字,表示可用的许可证数量。
Semaphore的用法也很简单,首先线程使用Semaphore的acquire()方法获取一个许可证,使用完之后调用release()方法归还许可证。
还可以用tryAcquire()方法尝试获取许可证。
Semaphore还提供一些其他方法,具体如下。
法。
示例:
public class TestSemaphore {
static Semaphore semaphore = new Semaphore(3);
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i=0;i<20;i++){
int count = i+1;
executorService.submit(new Runnable() {
@Override
public void run() {
handleDb(count);
}
});
}
}
public static void handleDb(int i) {
try {
semaphore.acquire();
System.out.println("thread"+i+":去数据库做某事》》》》》");
Thread.sleep(3000);
semaphore.release();
} catch (Exception e) {
e.printStackTrace();
}
}
}
CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。
它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。
CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞,到达指定数量就唤醒全部线程。
CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties,Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。
CyclicBarrier可以用于多线程计算数据,最后合并计算结果的场景。
CyclicBarrier和CountDownLatch的异同:
示例:
public class TestCyclicBarrier {
private static CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
@Override
public void run() {
System.out.println("打开栅栏》》》》》》");
}
});
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
for (int i=0;i<21;i++){
int count = i+1;
executor.submit(new Runnable() {
@Override
public void run() {
System.out.println(count + "号选手已就位");
try {
barrier.await();
System.out.println(count + "号选手跑起来了");
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
executor.shutdown();
}
}
Exchanger 交换者用于在两个线程之间传输数据,被调用后等待另一个线程达到交换点,然后相互交互数据。
Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。
示例:
public static void main(String[] args) throws InterruptedException {
Exchanger<Integer> exchanger = new Exchanger<Integer>();
ExecutorService executor = Executors.newCachedThreadPool();
executor.submit(new Callable<Object>() {
Integer count = new Random().nextInt(100);
@Override
public Object call() throws Exception {
System.out.println("产生的随机数是:"+count);
count = exchanger.exchange(count);
System.out.println("交互后的数是:"+count);
return count;
}
});
exchanger.exchange(0);
executor.shutdown();
}
并行执行任务的框架,把大任务拆分成很多的小任务,汇总每个小任务的结果得到大任务的结果。
Fork/Join采用“工作窃取模式”,当执行新的任务时他可以将其拆分成更小的任务执行,并将小任务加到线程队列中,当线程自己的工作完成后可以从一个其他的线程队列中偷一个任务,并把它加入自己的队列中。
工作窃取算法
工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。
那么,为什么需要使用工作窃取算法呢?假如我们需要做一个比较大的任务,可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。
比如A线程负责处理A队列里的任务。但是,有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
Fork/Join使用两个类来完成以上两件事情:
ForkJoinTask:我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制。通常情况下,我们不需要直接继承ForkJoinTask类,只需要继承它的子类,Fork/Join框架提供了以下两个子类。
ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行。其本质是一个线程池,默认线程数量是CPU核心数。
Fork/Join有同步和异步两种方式。常用方法:
扩展:归并排序https://www.cnblogs.com/chengxiao/p/6194356.html
在Java 8中,Stream提供了顺序流(Sequential Stream)和并行流(Parallel Stream)两种数据流处理方式。
并行流:就是将数据分成多个部分来进行处理,每个部分可以交给不同的线程来并发处理,以达到提高处理速度的效果。在数据量较大且处理操作相对比较耗时的场景下,使用并行流能够显著提高程序运行的效率。
对顺序流调用 parallel 方法并不意味着流本身有任何实际的变化。它在内部实际上就是设了一个 boolean 标志,通过判断这个标志的判断来调用并行流的方法或者顺序流的方法,调用 parallel 之后进行的所有操作都并行执行。类似地,同样调用 sequential 方法就可以把它变成顺序流。
第一步、首先我们随便写几行代码,用于调试:
public static void main(String[] args) {
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
numbers.parallelStream().reduce(Integer::sum).ifPresent(System.out::println);
}
第二步、先看下JDK中是怎样区分并行流和和顺序流的。查看parallelStream方法的调用链如下:

parallelStream中调用的是StreamSupport中的stream方法,注释中已经标明parallel参数为ture表示并行流,false则为串行流,parallelStream方法中传递的parallel参数为true. 最终这个值会调用AbstractPipeline的构造方法,传递到AbstractPipeline的parallel属性中。

第三步、再执行调试代码跟踪流式编程的reduce方法,其调用链如下:

isParallel方法里就是对第二步保存的parallel参数进行判断,走并行还是串行。
第四步,F7继续往里调试,跟踪进入并行的方法terminalOp.evaluateParallel中:

其调用的是ReduceOps类种的evaluateParallel方法,查看其中的ReduceTask的继承关系可知(如下图),并行流底层还是用的Fork/Join框架实现。

ForkJoinPool 和 ParallelStream 都是 Java 中用于实现并行处理的工具,它们有一些相似之处,但也有一些不同之处。下面我来分别介绍一下这两个工具:
ParallelStream 相当于是流式编程运用Fork/Join框架实现集合数据的并行处理方式,使更简单方便去处理一个包含大量元素的集合。
虽然 ForkJoinPool 和 ParallelStream 都可以用于实现任务的并行处理,但它们在任务分解、线程调度等方面有所不同,因此在选择使用哪种工具时需要根据具体的应用场景进行判断。
全部评论