Java多线程高并发系列:(三)线程池和多线程工具

2024-10-24 21:02
405
0

一、线程池

  • 线程不仅是Java中的一个对象,每个线程都有自己的工作内存。
  • 线程创建、销毁需要时间,消耗性能。
  • 线程过多,会占用很多内存。
  • 操作系统频繁切换线程上下文会影响性能。
  • 创建时间+销毁事件>执行任务时间,就不划算。

1.1、相关概念

  • 线程池管理器:用于创建并管理线程池,包括创建线程池,销毁线程池,添加新任务;
  • 工作线程:线程池中线程,可以循环的执行任务,在没有任务时处于等待状态;
  • 任务接口:每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
  • 任务队列:用于存放没有处理的任务。提供一种缓冲机制。

1.2、线程池相关的类

线程池相关类的继承树结构如下:

Executor接口,最上层的接口,只定义了一个接收Runnable对象的execute方法。

ExecutorService接口,继承了Executor接口,拓展了CallableFuture、关闭方法,提供了生命周期管理和更广泛的执行方式。

ScheduledExecutorService接口,继承了ExecutorService,增加了定时任务相关的方法

ThreadPoolExecutor实现类,基础的线程池实现

ScheduledThreadPoolExecutor实现类,继承ThreadPoolExecutor,实现了ScheduledExcutorService中相关定时任务的方法

  • Executor框架是Java 5中引入的,其内部使用了线程池机制,它在java.util.cocurrent 包下,通过该框架来控制线程的启动、执行和关闭,可以简化并发编程的操作。
  • 通过Executor来启动线程比使用Thread的start方法更好,除了更易管理,效率更好(用线程池实现,节约开销)外,还有关键的一点:有助于避免this逃逸问题——如果我们在构造器中启动一个线程,因为另一个任务可能会在构造器结束之前开始执行,此时可能会访问到初始化了一半的对象用Executor在构造器中。

执行器的执行方式主要有2个:

执行单个任务:submit方法用于执行Callable和Runnable,并提供返回值Future<?>

执行批量任务:invokeAll和invokeAny用于执行批量任务。all返回所有结果,any是其中任意一个首先执行成功就返回这个的结果。

1.2.1、ThreadPoolExecutor介绍

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
        5,//核心线程数
        10,//最大线程数
        0L,//超过核心线程数的线程,超过keepAliveTime时间,这个线程就会被销毁。
        TimeUnit.MILLISECONDS,//keepAliveTime 的时间单位
        new LinkedBlockingQueue<Runnable>()  //传入的工作队列,任务了核心线程数会先存储于此队列中
);

这个线程池有一个反常的设计,线程数量大于核心线程数后会把线程放入队列中等待,而不是直到最大线程数之后才放入队列,具体逻辑如下:

1、 是否达到核心线程数量?没达到,创建一个工作线程来执行任务。

2、 工作队列是否已满?没满,则将新提交的任务存储在工作队列里。

3、 是否达到线程池最大数量?没达到,则创建一个新的工作线程来执行任务。 

4、 最后,执行拒绝策略来处理这个任务。

所以超过核心线程数后,传入的队列如果是无界队列,则会一直把任务加入队列中,而不会创建线程。

虽然说这样设计和惯性思维不一样,但存在必有理。

Java之所以这样设计是为了平衡资源使用和性能,具体原因如下:

  • 创建和销毁线程都会带来性能的开销。
  • 核心线程数来保证有一个足够大的核心池以获得较高的处理效率,同时不会因为闲置的线程而浪费资源。
  • 工作队列是为了来不及处理的任务数量超过核心池大小时,用来暂存这些任务(用时间来换资源)。
  • 当阻塞的队列把工作队列也挤满后,那说明积压的任务真的很多了,这才需要新增线程增加处理的并发数量。举例:潮汐车道、水库大坝排水口

1.2.2、Future和FutureTask

Future就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。

在Future接口中声明了5个方法:

  • cancel方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false,即如果取消已经完成的任务会返回false;如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。
  • isCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。
  • isDone方法表示任务是否已经完成,若任务完成,则返回true;
  • get()方法用来获取执行结果,这个方法会产生阻塞,会一直等到任务执行完毕才返回;
  • get(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null

FutureTask是Future的实现

FutureTask还实现了Runable,为什么要这么做?不影响主线程,新开一个线程来对任务进行管理。

看其构造函数,有2个如下图,意思很明显,把线程执行逻辑再封装一层,放入FutureTask中执行。

注意事项:

FutureTask里有状态判断,一个FutureTask对象只能被线程执行一次。

 

1.3、Executors工具类

1.3.1、创建线程池

你可以自己实例化线程池,也可以用Executors 创建线程池的工厂类,常用方法如下:

newFixedThreadPool(int nThreads) :创建一个固定大小、任务队列容量无界的线程池。核心线程数=最大线程数。

newCachedThreadPool(): 创建的是一个大小无界的缓冲线程池。它的任务队列是一个同步队列。任务加入到池中,如果池中有空闲线程,则用空闲线程执行,如无则创建新线程执行。池中的线程空闲超过60秒,将被销毁释放。线程数随任务的多少变化。适用于执行耗时较小的异步任务。池的核心线程数=0 ,最大线程数= Integer.MAX_VALUE

newSingleThreadExecutor(): 只有一个线程来执行无界任务队列的单一线程池。该线程池确保任务按加入的顺序一个一个依次执行。当唯一的线程因任务异常中止时,将创建一个新的线程来继续执行后续的任务。与newFixedThreadPool(1)的区别在于,单一线程池的池大小在newSingleThreadExecutor方法中硬编码,不能再改变的。

newScheduledThreadPool(int corePoolSize) :能定时执行任务的线程池。该池的核心线程数由参数指定,最大线程数= Integer.MAX_VALUE

扩展:

同步队列:SynchronousQueue(同步队列)是一个没有数据缓冲的BlockingQueue,生产者线程对其的插入操作put必须等待消费者的移除操作take,反过来也一样,没等待到就会阻塞。

1.3.2、如何确定合适数量的线程

计算型任务:cpu数量的1-2倍

IO型任务:相对比计算型任务,需多一些线程,要根据具体的IO阻塞时长进行考量决定。也可考虑根据需要在一个最小数量和最大数量间自动增减线程数。

那么这个线程池的数据量是不是可以随便设置呢?当然不是的,请一定要记得,线程上下文切换是有代价的。目前总结了一套公式,对于IO密集型应用:

线程数 = CPU核心数/(1-阻塞系数) 这个阻塞系数一般为0.8~0.9之间,也可以取0.8或者0.9。

套用公式,对于双核CPU来说,它比较理想的线程数就是20,当然这都不是绝对的,需要根据实际情况以及实际业务来调整:final int poolSize = (int)(cpuCore/(1-0.9))

针对于阻塞系数,《Programming Concurrency on the JVM Mastering》即《Java 虚拟机并发编程》中有提到一句话:

对于阻塞系数,我们可以先试着猜测,抑或采用一些性能分析工具或java.lang.management API 来确定线程花在系统/IO操作上的时间与CPU密集任务所耗的时间比值。

1.4、自定义线程池

JDK的线程池在线程数达到corePoolSize之后,先判断队列,再判断maximumPoolSize。如果想反过来,即先判断maximumPoolSize再判断队列,应该怎么实现?

在此参考Dubbo框架中线程池的处理方案:

  • 自定义Queue,改写offer方法逻辑(offer方法:把任务提交到队列中,返回true表示成功,false表示失败)。
  • 自定义线程池类,继承自ThreadPoolExecutor。

具体怎么实现?继续往下看。

1.4.1、实现步骤

1、ThreadPoolExecutor源码逻辑解析

上文已经说了ThreadPoolExecutor的处理逻辑,再通过源码来深入理解下。如下图所示,如果已提交的任务数量大于核心线程数量,则会调用workQueue.offer方法,offer方法返回值决定了是否创建更多线程,true表示入队成功不创建线程,false表示入队失败需要创建线程。

2、自定义队列,修改offer方法逻辑

参考Dubbo中的TaskQueue,如下图所示,创建队列时会把线程池执行器也传入队列中,offer通过执行器获得当前线程配置的数量,来判断当前线程数是否小于最大线程数,小于就返回false不添加到队列中。

3、自定义线程池类

如下所示参考EagerThreadPoolExecutor类,继承ThreadPoolExecutor,把自定义的队列TaskQueue传入直接调用父类的execute方法即可:

二、多线程工具类

常用的并发工具类:

  • 闭锁:CountDownLatch
  • 栅栏:CyclicBarrier
  • 信号量:Semaphore
  • 交换者:Exchanger

2.1、CountDownLatch闭锁

CountDownLatch允许一个或多个线程等待其他线程完成操作。

  • CountDownLatch的构造函数接收一个int类型的参数作为计数器,如果你想等待N个点完成,这里就传入N。
  • 当我们调用CountDownLatch的countDown方法时,N就会减1。
  • CountDownLatch的await方法会阻塞当前线程,直到N变成零。
  • 由于countDown方法可以用在任何地方,所以这里说的N个点,可以是N个线程,也可以是1个线程里的N个执行步骤。
  • 用在多个线程时,只需要把这个CountDownLatch的引用传递到线程里即可。
static AtomicLong num = new AtomicLong(0);

public static void main(String args[]) throws InterruptedException {

    CountDownLatch ctl = new CountDownLatch(11);

    for (int i=0; i< 10; i++){
        new Thread(){
            @Override
            public void run() {
                for (int j=0; j< 10000000; j++){
                    num.getAndIncrement();
                }
                ctl.countDown();
            }
        }.start();
    }

    ctl.await();     //设置开关,设置门栓
    System.out.println(num.get());
}

2.2、Semaphore信号量

Semaphore 信号量用来控制同时访问特定资源的线程数量。(共享锁)

Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。

应用场景:Semaphore可以用于做流量控制,特别是公用资源有限的应用场景,比如数据库连接。假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发地读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这时我们必须控制只有10个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这个时候,就可以使用Semaphore来做流量控制。

Semaphore的构造方法Semaphore(int permits)接受一个整型的数字,表示可用的许可证数量。

Semaphore的用法也很简单,首先线程使用Semaphore的acquire()方法获取一个许可证,使用完之后调用release()方法归还许可证。

还可以用tryAcquire()方法尝试获取许可证。

Semaphore还提供一些其他方法,具体如下。

  • intavailablePermits():返回此信号量中当前可用的许可证数。
  • intgetQueueLength():返回正在等待获取许可证的线程数。
  • booleanhasQueuedThreads():是否有线程正在等待获取许可证。
  • void reducePermits(int reduction):减少reduction个许可证,是个protected方法。
  • Collection getQueuedThreads():返回所有等待获取许可证的线程集合,是个protected方

法。

示例:

public class TestSemaphore {
    static Semaphore semaphore = new Semaphore(3);
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i=0;i<20;i++){
            int count = i+1;
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                        handleDb(count);
                }
            });
        }
    }
    public static void handleDb(int i) {
        try {
            semaphore.acquire();
            System.out.println("thread"+i+":去数据库做某事》》》》》");
            Thread.sleep(3000);
            semaphore.release();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

2.3、CyclicBarrier栅栏

CyclicBarrier的字面意思是可循环使用(Cyclic)的屏障(Barrier)。

它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。

CyclicBarrier默认的构造方法是CyclicBarrier(int parties),其参数表示屏障拦截的线程数量,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞,到达指定数量就唤醒全部线程。

CyclicBarrier还提供一个更高级的构造函数CyclicBarrier(int parties,Runnable barrierAction),用于在线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景。

CyclicBarrier可以用于多线程计算数据,最后合并计算结果的场景。

CyclicBarrier和CountDownLatch的异同:

  • CyclicBarrier和CountDownLatch都可以协同多个线程,让指定数量的线程等待期他所有的线程都满足某些条件之后才继续执行。
  • CountDownLatch的计数器只能使用一次,如果还需要使用,必须重现new一个CountDownLatch对象。
  • CyclicBarrier的计数器可以使用reset()方法重置。
  • CountDownLatch.await一般阻塞主线程,所有的工作线程执行countDown,而CyclicBarrierton通过工作线程调用await从而阻塞工作线程,直到所有工作线程达到屏障。

示例:

public class TestCyclicBarrier {
   private static CyclicBarrier barrier = new CyclicBarrier(3, new Runnable() {
        @Override
        public void run() {
            System.out.println("打开栅栏》》》》》》");
        }
    });
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        for (int i=0;i<21;i++){
            int count = i+1;
            executor.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println(count + "号选手已就位");
                    try {
                        barrier.await();
                        System.out.println(count + "号选手跑起来了");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        executor.shutdown();
    }
}

2.4、Exchanger交换者

Exchanger 交换者用于在两个线程之间传输数据,被调用后等待另一个线程达到交换点,然后相互交互数据。

Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。

示例:

public static void main(String[] args) throws InterruptedException {
    Exchanger<Integer> exchanger = new Exchanger<Integer>();
    ExecutorService executor = Executors.newCachedThreadPool();
    executor.submit(new Callable<Object>() {
        Integer count = new Random().nextInt(100);
        @Override
        public Object call() throws Exception {
            System.out.println("产生的随机数是:"+count);
            count = exchanger.exchange(count);
            System.out.println("交互后的数是:"+count);
            return count;
        }
    });
    exchanger.exchange(0);
    executor.shutdown();
}

三、Fork/Join框架

并行执行任务的框架,把大任务拆分成很多的小任务,汇总每个小任务的结果得到大任务的结果。

Fork/Join采用“工作窃取模式”,当执行新的任务时他可以将其拆分成更小的任务执行,并将小任务加到线程队列中,当线程自己的工作完成后可以从一个其他的线程队列中偷一个任务,并把它加入自己的队列中。

工作窃取算法

工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。

那么,为什么需要使用工作窃取算法呢?假如我们需要做一个比较大的任务,可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。

比如A线程负责处理A队列里的任务。但是,有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

Fork/Join使用两个类来完成以上两件事情:

ForkJoinTask:我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制。通常情况下,我们不需要直接继承ForkJoinTask类,只需要继承它的子类,Fork/Join框架提供了以下两个子类。

  • RecursiveAction:用于没有返回结果的任务。
  • RecursiveTask:用于有返回结果的任务。

ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行。其本质是一个线程池,默认线程数量是CPU核心数。

Fork/Join有同步和异步两种方式。常用方法:

  • invoke:跟主线程同步执行,他不代表ForkJoin内部线程采用同步执行,就是主线程执行到这一步阻塞,必须要等invoke执行完毕再往下面继续执行,有返回结果集
  • execute:不会跟主线程同步执行,就是主线程执行到这一步不会阻塞,可以继续往下面执行,没有返回结果集
  • submit:不会跟主线程同步执行,就是主线程执行到这一步不会阻塞,可以继续往下面执行,有任务返回结果集

扩展:归并排序https://www.cnblogs.com/chengxiao/p/6194356.html

四、流式编程-并行流

Java 8中,Stream提供了顺序流(Sequential Stream)和并行流(Parallel Stream)两种数据流处理方式。

并行流:就是将数据分成多个部分来进行处理,每个部分可以交给不同的线程来并发处理,以达到提高处理速度的效果。在数据量较大且处理操作相对比较耗时的场景下,使用并行流能够显著提高程序运行的效率。

4.1、并行流的适用场景

  • 大规模数据集:当需要处理大规模数据集时,使用并行流可以充分利用多核处理器的优势,提高程序的执行效率。并行流将数据切分成多个小块,并在多个线程上并行处理这些小块,从而缩短了处理时间。
  • 复杂的计算操作:对于复杂的计算操作,使用并行流可以加速计算过程。由于并行流能够将计算操作分配到多个线程上并行执行,因此可以有效地利用多核处理器的计算能力,提高计算的速度。
  • 无状态转换操作:并行流在执行无状态转换操作(如 mapfilter)时表现较好。这类操作不依赖于其他元素的状态,每个元素的处理是相互独立的,可以很容易地进行并行处理。

4.2、并行流的注意事项

  • 线程安全问题:并行流的操作是在多个线程上并行执行的,因此需要注意线程安全问题。如果多个线程同时访问共享的可变状态,可能会导致数据竞争和不确定的结果。在处理并行流时,应避免共享可变状态,或者采用适当的同步措施来确保线程安全。
  • 性能评估和测试:并行流的性能提升并不总是明显的。在选择使用并行流时,应根据具体情况进行评估和测试,以确保获得最佳的性能提升。有时,并行流的开销(如线程的创建和销毁、数据切割和合并等)可能超过了其带来的性能提升。
  • 并发操作限制:某些操作在并行流中的性能表现可能较差,或者可能导致结果出现错误。例如,在并行流中使用有状态转换操作(如 sorted)可能导致性能下降或结果出现错误。在使用并行流时,应注意避免这类操作,或者在需要时采取适当的处理措施。
  • 内存消耗:并行流需要将数据分成多个小块进行并行处理,这可能导致额外的内存消耗。在处理大规模数据集时,应确保系统有足够的内存来支持并行流的执行,以避免内存溢出等问题
  • 底层数据结构:并行流适用于支持分割的数据结构,例如ArrayListLinkedList等。对于不支持分割的数据结构,例如HashSet,由于无法有效地将数据分割到多个线程进行并行处理,可能无法发挥并行流的性能优势。
  • 并行度:并行流的性能还取决于可用的硬件资源,例如CPU核心数和内存带宽。增加并行度可以提高处理速度,但过多的并行度可能导致线程竞争和资源争用,反而降低性能。可以通过调整并行流的并行度来优化性能,例如使用parallelStream().parallel()方法显式设置并行度。

4.3、并行流和顺序流切换

对顺序流调用 parallel 方法并不意味着流本身有任何实际的变化。它在内部实际上就是设了一个 boolean 标志,通过判断这个标志的判断来调用并行流的方法或者顺序流的方法,调用 parallel 之后进行的所有操作都并行执行。类似地,同样调用 sequential 方法就可以把它变成顺序流。

4.4、源码解析

第一步、首先我们随便写几行代码,用于调试:

public static void main(String[] args) {
    List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
    numbers.parallelStream().reduce(Integer::sum).ifPresent(System.out::println);
}

第二步、先看下JDK中是怎样区分并行流和和顺序流的。查看parallelStream方法的调用链如下:

parallelStream中调用的是StreamSupport中的stream方法,注释中已经标明parallel参数为ture表示并行流,false则为串行流,parallelStream方法中传递的parallel参数为true. 最终这个值会调用AbstractPipeline的构造方法,传递到AbstractPipelineparallel属性中。

第三步、再执行调试代码跟踪流式编程的reduce方法,其调用链如下:

isParallel方法里就是对第二步保存的parallel参数进行判断,走并行还是串行。

第四步,F7继续往里调试,跟踪进入并行的方法terminalOp.evaluateParallel中:

其调用的是ReduceOps类种的evaluateParallel方法,查看其中的ReduceTask的继承关系可知(如下图),并行流底层还是用的Fork/Join框架实现

4.5、ForkJoinPool和ParallelStream

ForkJoinPool  ParallelStream 都是 Java 中用于实现并行处理的工具,它们有一些相似之处,但也有一些不同之处。下面我来分别介绍一下这两个工具:

  • ForkJoinPool Java SE 7 引入的一个用于实现任务并行化的框架,通过将大的任务分解成多个子任务,并将这些子任务分配给多个线程来处理,从而实现了任务的并行处理。在分解任务的过程中,ForkJoinPool 使用了分治策略,将大的任务逐步细分成小的子任务,直到无法继续细分或者达到某个预定阈值时停止。
  • ParallelStream Java SE 8 中新增的一个用于并行处理集合数据的 API,其底层还是通过Fork/join框架实现。通过将 Stream 中的元素划分成多个子集,将这些子集分配给多个线程来处理,从而实现了集合数据的并行处理。在分割 Stream 元素时,ParallelStream 采用的是水平分割策略,即将元素均分成多个子集,每个子集由一个线程进行处理,最后再将处理结果合并起来。

ParallelStream 相当于是流式编程运用Fork/Join框架实现集合数据的并行处理方式,使更简单方便去处理一个包含大量元素的集合。

虽然 ForkJoinPool 和 ParallelStream 都可以用于实现任务的并行处理,但它们在任务分解、线程调度等方面有所不同,因此在选择使用哪种工具时需要根据具体的应用场景进行判断。

 

 

全部评论