多线程_线程csdn-CSDN博客 万字图解Java多线程

线程概述

程序

为完成特定任务,用某种语言编写的一组指令的集合。

进程

进程是程序的一次执行过程,或是正在运行的一个程序,是一个动态的过程,有自己的产生,存在,消亡的过程

  1. 当一个程序被运行,就开启了一个进程,如微信;
  2. 程序由指令和数据组成,指令要运行,数据要加载,指令被CPU加载运行,数据加载到内存,执行运行时可由CPU调度硬盘、网络等设备;

线程

  1. 一个进程内可分为多个线程;
  2. 一个线程就是一个指令流,CPU调度的最小单位,由CPU一条一条执行指令。

多线程的基本概念:

线程:线程是程序执行流的最小单元,是操作系统调度的基本单位。一个进程至少包含一个线程,也可以包含多个线程。

进程:进程是资源分配的基本单位,每个进程都有自己的独立内存空间。一个程序运行时会创建一个或多个进程。

多线程:在一个进程中创建多个线程,这些线程可以并发执行,共享进程的内存空间和资源。

优点:

  • 提高应用程序的响应。对图形化界面更有意义,可增强用户体验。
  • 提高计算机系统CPU的利用率。
  • 改善程序结构,将既长又复杂的进程分为多个线程,独立运行,有利于理解和修改。

并发与并行

并发:单核CPU运行多线程,一个cpu同时执行多个任务,比如:秒杀、多个人做同一件事;

并行:多核CPU运行多线程,多个cpu同时执行多个任务,比如:多个人同时做不同的事。

线程创建

继承Thread类

创建一个继承Thread类的子类;重写Thread类的run方法;创建子类对象;子类对象调用start方法执行。

public class demo1 {
    @Test
    public void demo1() {
        Thread1 thread1 = new Thread1();
        thread1.start();
    }
 
    class Thread1 extends Thread{
        @Override
        public void run() {
            System.out.println("线程代码:thread demo1");
        }
    }
}

实现Runnable接口

创建Runnable实现类,实现run方法,创建实现类对象,调用start方法执行。

public class Demo2 {
    @Test
    public void test1() {
        new Thread(new Thread2()).start();
    }
 
    class Thread2 implements Runnable {
        @Override
        public void run() {
            System.out.println("线程代码:Thread2");
        }
    }
}

匿名内部类重写run方法,并调用start方法执行线程代码。

public class Demo3 {
    @Test
    public void test3(){
        new Thread(() -> System.out.println("thread3")).start();
        /*new Thread(){
            @Override
            public void run() {
                System.out.println("thread3");
            }
        }.start();*/
    }
}

实现Callable接口

创建Callable实现类,重写call方法,创建FutureTask对象并将Callbale实现类作为参数,调用Thread类的start方法,调用FutureTask的get方法获取线程返回值。

  1. call方法的有返回值且可以抛出异常;
  2. call方法的返回值需要等待线程代码直接结束获取;
  3. Callable支持泛型。
public class Demo9 {
    @Test
    public void test1() throws ExecutionException, InterruptedException {
        CallableDemo callableDemo = new CallableDemo();
        FutureTask<Integer> futureTask = new FutureTask<>(callableDemo);
        new Thread(futureTask).start();
        //获取运算结果是同步过程,即call方法执行完成,才能获取值
        Integer num = futureTask.get();
        System.out.println(num);
    }
 
    class CallableDemo implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            System.out.println("callable");
            Integer num = 0;
            for (int i = 0; i < 20; i++) {
                num= num+i;
            }
            Thread.sleep(1000);
            return num;
        }
    }
}

使用线程池实现

ExecutorService 待补充

Thread使用方法

常见构造方法

方法说明
Thread();创建线程对象;
Thread(Runnable target);使用Runnable对象创建线程;
Thread(String name);创建线程对象并命名;
Thread(Runnable target, String name);使用Runnable对象创建线程并命名;
Thread(ThreadGroup group, Runnable target);分组创建线程;

常见属性

属性获取方法
线程idgetId();
线程名称getName();
线程状态getState();
优先级getPriority();
是否后台线程isDeamon();
是否存活isAlive();
是否被中断isInterrupted();

后台线程:有成为守望线程,不参与程序的业务逻辑,执行的是支持性功能,如垃圾回收、网络监控等。当所有非后台进程结束时会自动结束(无论守望线程任务是否结束);

是否存活,即run方法是否运行结束了。

线程优先级 Thread.MIN_PRIORITY(最小优先级);Thread.NORM_PRIORITY(默认优先级);Thread.MAX_PRIORITY(最大优先级)。

说明:高优先级的线程要抢占低优先级线程cpu的执行权,但是只是从概率上将,高优先级的线程高概率的情况下被执行,并不意味着只有当高优先级的线程执行完以后,低优先级的线程才执行。

常用方法

方法描述
currentThread();返回当前线程的引用
sleep(long millis);睡眠当前线程n毫秒
sleep(long millis, int nanos);睡眠当前线程n毫秒,n1指定的纳秒数
start()启动当前线程,调用当前线程的run()方法
run()通常需要重写Thread类中的此方法,将创建的线程要执行的操作声明在此方法中
currentThread()静态方法,返回执行当前当前代码的线程
yield()释放当前cpu的执行权
stop()强制线程生命周期结束,已过时

线程状态

五种状态

初始状态:创建线程对象时的状态

可运行状态(就绪状态):调用start()方法后进入就绪状态,也就是准备好被cpu调度执行

运行状态:线程获取到cpu的时间片,执行run()方法的逻辑

阻塞状态: 线程被阻塞,放弃cpu的时间片,等待解除阻塞重新回到就绪状态争抢时间片

终止状态: 线程执行完成或抛出异常后的状态

六种状态

  1. NEW 线程对象被创建
  2. Runnable 线程调用了start()方法后进入该状态,该状态包含了三种情况
    1. 就绪状态:等待cpu分配时间片
    2. 运行状态:进入Runnable方法执行任务
    3. 阻塞状态:BIO 执行阻塞式io流时的状态
  3. BLOCKED 没获取到锁时的阻塞状态(同步锁章节会细说)
  4. WAITING 调用wait()、join()等方法后的状态
  5. TIMED_WAITING 调用 sleep(time)、wait(time)、join(time)等方法后的状态
  6. TERMINATED 线程执行完成或抛出异常后的状态

线程的状态和转移

示例1:NEW、RUNNABLE、TERMINATED状态转换

public class Demo7 {
    @Test
    public void test1() {
        Thread thread = new Thread(() -> {
            for (int i = 0; i < 1000_000; i++) {
 
            }
        },"李四");
        System.out.println(thread.getName()+":"+thread.getState());
        thread.start();
        while (thread.isAlive()){
            System.out.println(thread.getName()+":"+thread.getState());
        }
        System.out.println(thread.getName()+":"+thread.getState());
    }
}

示例2:WAITING、BLOCKED、TIMED_WAITING状态转换

public class Demo8 {
    @Test
    public void test() {
        final Object object = new Object();
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (object) {
                    while (true) {
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }
        }, "t1");
        t1.start();
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (object) {
                    System.out.println("hehe");
                }
            }
        }, "t2");
        t2.start();
        try {
            Thread.sleep(2000000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
结论:
     BLOCKED表示等待获取锁,WAITING和TIMED_WAITING表示等待其他线程发来通知.
     TIMED_WAITING线程在等待唤醒,但设置了时限;WAITING线程在无限等待唤醒

线程安全

public class Demo11 {
    @Test
    public void test() throws InterruptedException {
        User user = new User();
        Thread thread = new Thread(() -> {
            for (int i = 0; i < 5000; i++) {
                user.getCount();
            }
        });
        Thread thread1 = new Thread(() -> {
            for (int i = 0; i < 5000; i++) {
                user.getCount();
            }
        });
        thread.start();
        thread1.start();
        thread.join();
        thread1.join();
        System.out.println(user.getAge());
    }
}
 
@Data
public class User {
    private Integer age=0;
    public void getCount(){
        age++;
    }
}
 
//最终打印内容  5639

上述代码中,age属性被两个线程抢占执行,导致引发线程安全问题。

本质原因:线程在系统中的调度是无序的/随机的(抢占式执行的)

线程安全定义

如果多线程环境下代码运行的结果是符合我们预期的,即在单线程环境应该的结果,则说这个程序是线程安全的。

线程不安全原因

线程调度是随机的,随机调度使⼀个程序在多线程环境下;多个线程修改同一个变量。

一个线程修改一个变量-》安全

多个线程读取一个变量-》安全

多个线程修改不同变量-》安全

线程特性

原子性

我们把一段代码想象成一个房间,每个线程就是要进入这个房间的人。如果没有任何机制保证,A进入房间之后,还没有出来;B 是不是也可以进入房间,打断 A 在房间里的隐私。这个就是不具备原子性的。

那我们应该如何解决这个问题呢?是不是只要给房间加一把锁,A 进去就把门锁上,其他人是不是就进 不来了。这样就保证了这段代码的原子性了。 有时也把这个现象叫做同步互斥,表示操作是互相排斥的。

一条 java 语句不一定是原子的,也不一定只是一条指令 比如刚才我们看到的 n++,其实是由三步操作组成的: 1. 从内存把数据读到 CPU 2. 进行数据更新 3. 把数据写回到 CPU

定义:原子性指的是操作不可被中断,要么全部执行完成,要么完全不执行。

特性:

  1. 原子性操作在执行时不会被其他线程干扰;
  2. 如果多个线程同时访问共享资源,原子性可以防止数据不一致。

java中的原子性:

原子性

1. 读取和写入基本数据类型是原子性的;
2. 对<font style="color:rgb(192, 52, 29);background-color:rgb(251, 229, 225);">volatile</font><font style="background-color:rgba(255, 255, 255, 0);">变量的读和写是原子性的(不适用于复合操作);</font>

非原子性

1. 复合操作如 a++ 或 a=a+b 是非原子性的,这些擦操作实际包括三步:读取变量、修改值、写会变量。

解决方案

  1. 使用同步机制(如synchronized代码块);
  2. 使用原子类(如AtomicInteger)。
可见性

定义:可见性指的是一个线程对共享变量的修改对其他线程是可见的。

特性:

  1. 多线程环境,如果没有同步机制,一个线程对变量的修改可能不会立刻被其他线程看到;
  2. 线程可能会一直使用自己CPU缓存中的值,看不到更新后数据。

java中的可见性

public class Demo12 {
    boolean flag = false;
 
    @Test
    public void demo1() {
        new Thread(() -> {
            while (!flag) {
                
            }
            System.out.println("hello");
        }).start();
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        flag = true;//主线程修改了flag的值,但是子线程中的flag值还是false,所以子线程会一直循环,直到flag的值为true,子线程才会执行。
    }
}

解决方案:

  1. 使用volatile 关键字修改变量,确保变量的修改对所有线程可见;
  2. 使用同步机制(如 synchronized 或显式锁 Lock)。
有序性

定义:有序性指的是程序代码的执行顺序,通常来说,程序会按照代码编写的顺序执行,但编译器和处理器会为了优化性能进行 指令重排

特性:

  1. 在单线程环境,指令重拍不会影响程序的正确性;
  2. 多线程环境,指令重排可能导致意向不到的结果,因为线程之间的执行顺序无法预测。

解决方案:volatilesynchronized

总结
特性描述解决方法
原子性操作不可中断,要么全部执行成功,要么完全不执行。使用 <font style="color:rgb(192, 52, 29);background-color:rgb(251, 229, 225);">synchronized</font> 或原子类如 <font style="color:rgb(192, 52, 29);background-color:rgb(251, 229, 225);">AtomicInteger</font>
可见性一个线程的修改对其他线程立刻可见。使用 <font style="color:rgb(192, 52, 29);background-color:rgb(251, 229, 225);">volatile</font> 或同步机制如 <font style="color:rgb(192, 52, 29);background-color:rgb(251, 229, 225);">synchronized</font>
有序性程序执行顺序符合预期,避免指令重排导致问题。使用 <font style="color:rgb(192, 52, 29);background-color:rgb(251, 229, 225);">volatile</font>、同步机制(<font style="color:rgb(192, 52, 29);background-color:rgb(251, 229, 225);">synchronized</font> 或锁)。

ThreadLocal

简介

  1. ThreadLocal是本地线程变量,线程自带的变量副本(实现每一个线程拥有自己专属的本地变量,主要解决的就是让每一个线程绑定自己的值,自己用自己的,不与其他线程共享资源),可以通过get和set方法,获取默认值或将其值更改为当前线程所存的副本的值避免线程安全问题。
  2. synchronized或者lock,对资源进行加锁,其他线程在锁内资源被占用时,其他线程都处于等待状态。

synchronized或lockThreadLocal

api介绍

方法名描述
get()返回当前线程的此线程局部变量的副本中的值。
set(T value)将当前线程的此线程局部变量的副本设置为指定的值。
remove()清除线程副本中存储的数据。
initialValue()返回此线程局部变量的当前线程的“初始值”。
withInitial(Supplier<? extends S> supplier)创建线程局部变量
/**
 * @author ZD
 * @date 2022/7/4 10:10
 * @describe ThreadLocal案例
 */
public class House {
    //对ThreadLocal进行初始化
    public ThreadLocal<Integer> threadLocal = ThreadLocal.withInitial(() -> 0);
 
    public void saleHouse() {
        Integer i = threadLocal.get();
        i++;
        threadLocal.set(i);
    }
 
}
 
 
/**
 * @author ZD
 * @date 2022/7/4 10:14
 * @describe ThreadLocal模拟卖房子业务
 */
public class ThreadLocalHouseDemo {
    public static void main(String[] args) {
        House house = new House();
        new Thread(() -> {
            try {
                for (int i = 1; i <= 3; i++) {
                    house.saleHouse();
                }
                System.out.println(Thread.currentThread().getName() + "\t" + "卖出" + house.threadLocal.get());
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                house.threadLocal.remove();
                System.out.println("获取ThreadLocal内容:" + house.threadLocal.get());
            }
        }, "t1").start();
        new Thread(() -> {
            try {
                for (int i = 1; i <= 5; i++) {
                    house.saleHouse();
                }
                System.out.println(Thread.currentThread().getName() + "\t" + "卖出" + house.threadLocal.get());
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                house.threadLocal.remove();
                System.out.println("获取ThreadLocal内容:" + house.threadLocal.get());
            }
        }, "t2").start();
        new Thread(() -> {
            try {
                for (int i = 1; i <= 8; i++) {
                    house.saleHouse();
                }
                System.out.println(Thread.currentThread().getName() + "\t" + "卖出" + house.threadLocal.get());
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                house.threadLocal.remove();
                System.out.println("获取ThreadLocal内容:" + house.threadLocal.get());
            }
        }, "t3").start();
    }
}
  • 因为每个Thread内有自己的实例副本且该副本只由当前线程自己使用
  • 既然其他Thread不可访问,那就不存在多线程共享的问题
  • 统一设置初始值,但是每个线程对这个值的修改都是各自线程互相独立的
  • 加入synchronized或者lock控制线程的访问顺序,而ThreadLocal人手一份,大家各自安好,没必要抢夺

ThreadLocal阿里规范

非线程安全的SimpleDataFormat

public class DateUtils{
    public static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    /**
     * 模拟并发环境下使用SimpleDateFormat的parse方法将字符串转换成Date对象
     * @param stringDate
     * @return
     * @throws Exception
     */
    public static Date parseDate(String stringDate)throws Exception{
        return sdf.parse(stringDate);
    }
    
    public static void main(String[] args) throws Exception{
        for (int i = 1; i <=30; i++) {
            new Thread(() -> {
                try {
                    System.out.println(DateUtils.parseDate("2020-11-11 11:11:11"));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            },String.valueOf(i)).start();
        }
    }
}
public class DateUtils{
    public static final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    /**
     * 模拟并发环境下使用SimpleDateFormat的parse方法将字符串转换成Date对象
     * @param stringDate
     * @return
     * @throws Exception
     */
    public static Date parseDate(String stringDate)throws Exception{
        return sdf.parse(stringDate);
    }
 
    public static void main(String[] args) throws Exception{
        for (int i = 1; i <=30; i++) {
            new Thread(() -> {
                try {
                    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                    System.out.println(sdf.parse("2020-11-11 11:11:11"));
                    sdf = null;
                } catch (Exception e) {
                    e.printStackTrace();
                }
            },String.valueOf(i)).start();
        }
    }
}
package com.sercive.impl;
 
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
 
/**
 * 在对一些业务日志写入数据库的时候,日期调用了sdf的静态,导致了会报错或者日期乱了
 */
public class ThreadLocalSimpDataFormatDemo {
 
    public static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 
    /**
     解决方案一:加入synchronized,用时间换空间,效率低
     */
    /**
     * 如果不加会导致线程安全问题,SimpleDateFormat类内部有一个Calendar对象引用,
     * SimpleDateFormat相关的日期信息,例如sdf.parse(dateStr),sdf.format(date)
     * 诸如此类的方法参数传入的日期相关String,Date等等, 都是交由Calendar引用来储存的.
     * 这样就会导致一个问题如果你的SimpleDateFormat是个static的,那么多个thread之间
     * 就会共享这个SimpleDateFormat,同时也是共享这个Calendar引用(相当于买票案列)
     */
    //public static synchronized Date parse(String stringDate) throws ParseException {
    public static Date parse(String stringDate) throws ParseException {
        System.out.println(sdf.parse(stringDate));
        return sdf.parse(stringDate);
    }
 
    /***
     * 解决方案二:使用ThreadLocal,用空间换时间,效率高
     * ThreadLocal中变量副本会人手一份,每次使用完了threadLocal后都要将资源进行释放的处理
     */
    public static final ThreadLocal<SimpleDateFormat> sdfThreadLocal =
            ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
 
    public static Date parseByThreadLocal(String stringDate) throws ParseException {
        return sdfThreadLocal.get().parse(stringDate);
    }
 
    //3 DateTimeFormatter 代替 SimpleDateFormat
    public static final DateTimeFormatter DATE_TIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
 
    public static String formatForDateTime(LocalDateTime localDateTime) {
        return DATE_TIME_FORMAT.format(localDateTime);
    }
 
    public static LocalDateTime parseForDateTime(String dateString) {
        return LocalDateTime.parse(dateString, DATE_TIME_FORMAT);
    }
 
    public static void main(String[] args) throws Exception {
        for (int i = 1; i <= 3; i++) {
            new Thread(() -> {
                try {
                    //ThreadLocalDataUtils.parse("2021-03-30 11:20:30");
                    //System.out.println(ThreadLocalDataUtils.parseByThreadLocal("2021-03-30 11:20:30"));
                    System.out.println(ThreadLocalSimpDataFormatDemo.parseForDateTime("2021-03-30 11:20:30"));
                    // System.out.println(ThreadLocalDataUtils.formatForDateTime(LocalDateTime.now()));
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    ThreadLocalSimpDataFormatDemo.sdfThreadLocal.remove();
                }
            }, String.valueOf(i)).start();
        }
    }
}

Thread | ThreadLocal | ThreadLocalMap关系

  1. Thread类中有一个ThreadLocal.ThreadLocalMap threadLocals = null的变量,这个ThreadLocal相当于是Thread类和ThreadLocalMap的桥梁,在ThreadLocal中有静态内部类ThreadLocalMap,ThreadLocalMap中有Entry数组;
  2. 当我们为threadLocal变量赋值,实际上就是以当前threadLocal实例为key,值为value的Entry往这个threadLocalMap中存放;
  3. t.threadLocals = new ThreadLocalMap(this, firstValue) 如下这行代码,可以知道每个线程都会创建一个ThreadLocalMap对象,每个线程都有自己的变量副本;

ThreadLocal内存泄露问题

线程池

什么是线程池

线程池是线程处理多线程的一种方式,处理过程中可以将任务添加到队列中,然后在创建线程后自动启动这些任务。使用线程池可以根据系统需求和硬件灵活控制线程数量,且对所有线程进行管理和控制,从而提高系统的运行效率,降低系统运行运行压力。

使用线程池的优点

  • 降低资源消耗。通过重复利⽤已创建的线程降低线程创建和销毁造成的消耗。
  • 提⾼响应速度。当任务到达时,任务可以不需要的等到线程创建就能⽴即执⾏。
  • 提⾼线程的可管理性。线程是稀缺资源,如果⽆限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使⽤线程池可以进⾏统⼀的分配,调优和监控。

JAVA内置线程池

ThreadPoolExecutor

/**
使用给定的初始参数创建一个新的ThreadPoolExecutor 。
抛出:
IllegalArgumentException – 如果以下条件之一成立: 
        corePoolSize < 0 
        keepAliveTime < 0 
        maximumPoolSize <= 0 
        maximumPoolSize < corePoolSize
NullPointerException – 如果workQueue或threadFactory或handler为 null
*/
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)
参数设置
  1. 核心线程数(corePoolSize):核⼼线程数线程数定义了最⼩可以同时运⾏的线程数量;
  2. 最大线程数(maximumPoolSize):当队列中存放的任务达到队列容量的时候,当前可以同时运⾏的线程数量变为最⼤线程数;
  3. 最大空闲时间(keepAliveTime):当线程池中的线程数量⼤于 corePoolSize 的时候,如果这时没有新的任务提交,核⼼线程外的线程不会⽴即销毁,⽽是会等待,直到等待的时间超过了keepAliveTime 才会被回收销毁;
  4. 任务队列长度(workQueue):当新任务来的时候会先判断当前运⾏的线程数量是否达到核⼼线程数,如果达到的话,新任务就会被存放在队列中;
  5. 最大空闲时间的时间单位(unit):keepAliveTime 参数的时间单位;
  6. 饱和策略(handler):饱和策略。关于饱和策略下⾯单独介绍⼀下;
  7. 执行器创建新线程时使用的工厂(threadFactory):executor 创建新线程的时候会⽤到。
ThreadPoolExecutor(饱和策略)

若当前同时运行的线程数达到最大线程数并且队列也已经被放满,ThreadPoolExecutor定义了一些策略:

  • ThreadPoolExecutor.AbortPolicy :抛出 RejectedExecutionException 来拒绝新任务的处理,默认策略
  • ThreadPoolExecutor.CallerRunsPolicy :调⽤执⾏⾃⼰的线程运⾏任务。您不会任务请求。但是这种策略会降低对于新任务提交速度,影响程序的整体性能。另外,这个策略喜欢增加队列容量;
  • ThreadPoolExecutor.DiscardPolicy :不处理新任务,直接丢弃掉,不抛出异常;
  • ThreadPoolExecutor.DiscardOldestPolicy : 丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)

画板

流程:调用submit方法后,先判断核心线程数是否空余,若核心线程数未满,则先放入核心线程数中,若核心线程数满了则判断队列是否放满,若队列不满则放入队列,若队列满了判断最大线程数+队列数是否大于核心线程数+队列,若大于等于则创建新的线程,若小于则执行饱和策略。

ExecutorService

获取ExecutorService

方法描述
public static ExecutorService newCachedThreadPool()创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)线程池中的所有线程都使用ThreadFactory来创建,这样的线程无需手动启动,自动执行
public static ExecutorService newFixedThreadPool(int nThreads)创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)创建一个可重用固定线程数的线程池且线程池中的所有线程都使用ThreadFactory来创建。
public static ExecutorService newSingleThreadExecutor()创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,所有线程都使用ThreadFactory来创建
@Test
public void demo2() {
    ExecutorService executorService = Executors.newCachedThreadPool();
    for (int i = 0; i < 10; i++) {
        executorService.submit(new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName()+"-->开始执行!");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));
    }
    try {
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
 
@Test
public void demo3() {
    ExecutorService executorService = Executors.newCachedThreadPool(new ThreadFactory() {
        int n =1;
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r,"自定义线程名"+n++);
        }
    });
    for (int i = 0; i < 10; i++) {
        executorService.submit(new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName()+"-->开始执行!");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));
    }
    try {
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
 
@Test
public void demo4() {
    ExecutorService executorService = Executors.newFixedThreadPool(1);
    for (int i = 0; i < 4; i++) {
        executorService.submit(new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName()+"-->开始执行!");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));
    }
    try {
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
 
@Test
public void demo5() {
    ExecutorService executorService = Executors.newFixedThreadPool(2, new ThreadFactory() {
        int n=1;
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r,"自定义线程名"+n++);
        }
    });
    for (int i = 0; i < 4; i++) {
        executorService.submit(new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName()+"-->开始执行!");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));
    }
    try {
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
 
@Test
public void demo6() {
    ExecutorService executorService = Executors.newSingleThreadExecutor(new ThreadFactory() {
        int n = 1;
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r,"自定义线程名"+n++);
        }
    });
    for (int i = 0; i < 4; i++) {
        executorService.submit(new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName()+"-->开始执行!");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));
    }
    try {
        Thread.sleep(10000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

常用方法

方法描述
void shutdown();启动一次顺序关闭,执行以前提交的任务,但不接受新任务。
List_<Runnable> shutdownNow()_停止所有正在执行的任务,暂停处理正在等待的任务,并返回等待执行的任务列表。
<T> Future<T> submit(Callable<T> task);执行带返回值的任务,返回一个Future对象。
<T> Future<T> submit(Runnable task, T result);执行 Runnable 任务,并返回一个表示该任务的 Future。
Future_<?> submit(Runnable task)_;执行 Runnable 任务,并返回一个表示该任务的 Future。
@Test
public void demo7() {
    ExecutorService service = Executors.newSingleThreadExecutor();
    for (int i = 0; i < 4; i++) {
        service.submit(new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + "执行线程");
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() + "执行线程结束");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));
    }
    try {
        Thread.sleep(2300);
        service.shutdown();
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
 
@Test
public void demo8() {
    ExecutorService service = Executors.newFixedThreadPool(4);
    for (int i = 0; i < 4; i++) {
        service.submit(new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName() + "执行线程");
                Thread.sleep(1000);
                System.out.println(Thread.currentThread().getName() + "执行线程结束");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }));
    }
    service.shutdownNow();
}

ScheduledExecutorService

ScheduledExecutorService是ExecutorService的子接口,具备了延迟运行或定期执行任务的能力。

创建线程池

方法描述
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)创建一个可重用固定线程数的线程池且允许延迟运行或定期执行任务;
public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) **创建一个可重用固定线程数的线程池且线程池中的所有线程都使用ThreadFactory来创建,且允许延迟运行或定期执行任务; **
public static ScheduledExecutorService newSingleThreadScheduledExecutor()**创建一个单线程执行程序,它允许在给定延迟后运行命令或者定期地执行。 **
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory)创建一个单线程执行程序,它可安排在给定延迟后运行命令或者定期地执行。
@Test
public void demo9(){
    ScheduledExecutorService service = Executors.newScheduledThreadPool(3);
    for (int i = 0; i < 4; i++) {
        service.schedule(new Thread(() -> System.out.println(Thread.currentThread().getName()+"执行线程")),2,TimeUnit.SECONDS);
    }
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
 
@Test
public void demo10(){
    ScheduledExecutorService service = Executors.newScheduledThreadPool(3, new ThreadFactory() {
        int i = 1;
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r,"线程名"+i++);
        }
    });
    for (int i = 0; i < 4; i++) {
        service.schedule(new Thread(() -> System.out.println(Thread.currentThread().getName()+"执行线程")),2,TimeUnit.SECONDS);
    }
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
 
@Test
public void demo11(){
    ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
    for (int i = 0; i < 4; i++) {
        service.schedule(new Thread(() -> System.out.println(Thread.currentThread().getName()+"执行线程")),2,TimeUnit.SECONDS);
    }
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
 
@Test
public void demo12(){
    ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
        int i = 1;
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r,"线程名"+i++);
        }
    });
    for (int i = 0; i < 4; i++) {
        service.schedule(new Thread(() -> System.out.println(Thread.currentThread().getName()+"执行线程")),2,TimeUnit.SECONDS);
    }
    try {
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

常用方法

方法描述
public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);**延迟时间单位是unit,数量是delay的时间后执行callable。 **
public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);延迟时间单位是unit,数量是delay的时间后执行command。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit);延迟时间单位是unit,数量是initialDelay的时间后,每间隔period时间重复执行一次command。
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay,TimeUnit unit);创建并执行一个在给定初始延迟后首次启用的定期操作,随后,在每一次执行终止和下一次执行开始之间都存在给定的延迟。

异步计算结果(Future)

获取每一个线程的结果集;

方法描述
boolean cancel(boolean mayInterruptIfRunning);尝试取消此任务的执行。如果任务已完成、已被取消或由于某些其他原因无法取消,则此尝试将失败。如果成功,并且在调用cancel时此任务尚未启动,则此任务不应该运行。如果任务已经开始,则mayInterruptIfRunning参数确定是否应该中断执行该任务的线程以尝试停止该任务。
V get(long timeout, TimeUnit unit)如有必要,最多等待给定时间以完成计算,然后检索其结果(如果可用)。
boolean isCancelled();如果此任务在正常完成之前被取消,则返回true 。
boolean isDone();如果此任务完成,则返回true 。完成可能是由于正常终止、异常或取消——在所有这些情况下,此方法都将返回true 。
@Test
public void demo13() {
    ExecutorService executorService = Executors.newFixedThreadPool(3);
    Future<String> submit = executorService.submit(new Callable<String>() {
        @Override
        public String call() throws Exception {
            System.out.println(Thread.currentThread().getName() + "call线程开始执行!");
            Thread.sleep(1000);
            return new BigDecimal(2).add(new BigDecimal(3)).toString();
        }
    });
    try {
        while (true) {
            if (submit.isDone()) {
                System.out.println(submit.get());
                return;
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

异步实现方式

Thread和Runnable

/**
 * 继承Thread类实现多线程
 */
public class ThreadDemo1 {
    public static void main(String[] args) {
        Demo1Thread thread1 = new Demo1Thread();
        thread1.start();
    }
 
    public static class Demo1Thread extends Thread {
        @Override
        public void run() {
            System.out.println("Thread1");
        }
    }
}
 
/**
 * 实现Runnable接口实现多线程
 */
public class RunnableDemo2 {
    public static void main(String[] args) {
        Demo2Thread demo2Thread = new Demo2Thread();
        demo2Thread.run();
    }
 
    public static class Demo2Thread implements Runnable {
 
        @Override
        public void run() {
            System.out.println("runnable2");
        }
    }
}

缺点:

  1. 资源消耗大:每次创建新线程会消耗系统资源,频繁创建和销毁线程会导致性能下降。
  2. 难以管理:手动管理线程的生命周期、异常处理、任务调度等非常复杂。
  3. 缺乏扩展性:无法轻松控制并发线程的数量,容易导致系统资源耗尽。
  4. 线程复用问题:每次任务都创建新线程,无法复用已有的线程,效率低下。

使用Executors提供线程池

优点

  1. 它帮我们管理线程,避免增加创建线程和销毁线程的资源损耗。因为线程其实也是一个对象,创建一个对象,需要经过类加载过程,销毁一个对象,需要走GC垃圾回收流程,都是需要资源开销的。
  2. 提高响应速度。如果任务到达了,相对于从线程池拿线程,重新去创建一条线程执行,速度肯定慢很多。
  3. 重复利用。线程用完,再放回池子,可以达到重复利用的效果,节省资源。
public class ExecutorsDemo {
    public static void main(String[] args) {
        System.out.println("main thread"+Thread.currentThread().getName());
        ExecutorService threadPool = Executors.newFixedThreadPool(3);
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("threadPool thread"+Thread.currentThread().getName());
            }
        });
    }
}

newFixedThreadPool默认使用LinkedBlockingQueue作为任务队列,而LinkedBlockingQueue是一个无界队列(默认容量为Integer.MAX_VALUE)。如果任务提交速度远大于线程池处理速度,队列会不断堆积任务,最终可能导致内存耗尽.。

使用自定义线程池

/**
 * 自定义线程池
 */
public class ThreadPoolExecutorDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                2,//核心线程数
                4,//最大线程数
                50,//线程空闲时间
                TimeUnit.SECONDS,//时间单位
                new ArrayBlockingQueue<>(3),//任务队列
                Executors.defaultThreadFactory(),//线程工厂
                new ThreadPoolExecutor.DiscardPolicy()//拒绝策略
        );
 
        System.out.println("main thread:"+Thread.currentThread().getName());
 
        threadPoolExecutor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    //模拟耗时操作
                    Thread.sleep(500);
                    System.out.println("threadPoolExecutor thread:"+Thread.currentThread().getName());
                } catch (InterruptedException e) {
                    System.out.println(e.getMessage());
                }
            }
        });
    }
}

使用Future和Callable

组合使用返回异步结果

public class CallableDemo3 {
    public static void main(String[] args) {
        Demo3Thread demo3Thread = new Demo3Thread();
        FutureTask<Integer> futureTask = new FutureTask<>(demo3Thread);
        new Thread(futureTask).start();
        try {
            Integer num = futureTask.get();
            System.out.println(num);
        } catch (Exception e) {
            System.out.println("发生异常" + e);
        }
    }
 
    public static class Demo3Thread implements Callable<Integer> {
        @Override
        public Integer call() {
            return 123;
        }
    }
}
 
public class CallableThreadPoolDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //自定义线程池
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                2,//核心线程数
                4,//最大线程数
                50,//线程空闲时间
                TimeUnit.SECONDS,//时间单位
                new ArrayBlockingQueue<>(3),//任务队列
                Executors.defaultThreadFactory(),//线程工厂
                new ThreadPoolExecutor.DiscardPolicy()//拒绝策略
        );
        System.out.println("main thread:" + Thread.currentThread().getName());
        Callable<String> callable = new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep(1000);
                System.out.println("threadPoolExecutor thread:" + Thread.currentThread().getName());
                return "hello";
            }
        };
        Future<String> future = threadPoolExecutor.submit(callable);
        String result = future.get();//阻塞至任务完成
        System.out.println(result);
    }
}

CompletableFuture实现

CompletableFuture是Java 8引入的,提供了更强大的异步编程能力,支持链式调用、异常处理、组合多个异步任务等。

public class CompletableFutureDemo {
    public static void main(String[] args) {
        ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4, 50,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(3),
                Executors.defaultThreadFactory(),
                new ThreadPoolExecutor.DiscardPolicy());
 
        System.out.println("main thread:" + Thread.currentThread().getName());
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                //模拟耗时操作
                Thread.sleep(500);
                System.out.println("threadPoolExecutor thread:" + Thread.currentThread().getName());
                return "Hello";
            } catch (InterruptedException e) {
                System.out.println(e.getMessage());
            }
            return null;
        }, threadPoolExecutor);
        future.thenAccept(result -> {
            System.out.println("异步结果:" + result);
        });
        future.join();
    }
}

使用ForkJoinPool

有些时候,我们希望开启异步,将一个大任务拆分成多个小任务(Fork),然后将这些小任务的结果合并(Join)。这时候,我们就可以使用ForkJoinPool。

ForkJoinPool 是 Java 7 引入的一个线程池实现,专门用于处理分治任务。

  1. 它的特点就是任务拆分(Fork)和结果合并(Join),以及工作窃取(Work-Stealing)。
  2. ForkJoinPool 特别适合处理递归任务或可以分解的并行任务。
public class ForkJoinPoolDemo {
    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool();
        int result = pool.invoke(new SumTask(1, 100)); // 提交任务并获取结果
        System.out.println("1 到 100 的和为: " + result);
    }
 
    static class SumTask extends RecursiveTask<Integer> {
        private final int start;
        private final int end;
 
        SumTask(int start, int end) {
            this.start = start;
            this.end = end;
        }
 
        @Override
        protected Integer compute() {
            if (end - start <= 10) { // 直接计算小任务
                int sum = 0;
                for (int i = start; i <= end; i++) sum += i;
                return sum;
            } else { // 拆分任务
                int mid = (start + end) / 2;
                SumTask left = new SumTask(start, mid);
                SumTask right = new SumTask(mid + 1, end);
                left.fork(); // 异步执行左任务
                return right.compute() + left.join(); // 等待左任务完成并合并结果
            }
        }
    }
}

spring的@Async异步

Spring 提供了 @Async 注解来实现异步方法调用,非常方便。使用 @Async 可以让方法在单独的线程中执行,而不会阻塞主线程。

Spring @Async 的使用步骤其实很简单:

  • 启用异步支持:在 Spring Boot项目中,需要在配置类或主应用类上添加 @EnableAsync 注解。
  • 标记异步方法:在需要异步执行的方法上添加 @Async 注解。
  • 配置线程池:默认情况下,Spring 使用一个简单的线程池。如果需要自定义线程池,可以通过配置实现。
@SpringBootApplication
@EnableAsync // 启用异步支持
public class AsyncDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(AsyncDemoApplication.class, args);
    }
}
 
@Service
public class TianLuoAsyncService {
 
    @Async // 标记为异步方法
    public void asyncTianLuoTask() {
        try {
            Thread.sleep(2000); // 模拟耗时操作
            System.out.println("异步任务完成,线程: " + Thread.currentThread().getName());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

默认情况下,Spring 使用一个简单的线程池(SimpleAsyncTaskExecutor),每次调用都会创建一个新线程。因此,在使用Spring的@Async进行异步时,推荐使用自定义线程池。

@Configuration
public class AsyncConfig {
 
    @Bean(name = "taskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10); // 核心线程数
        executor.setMaxPoolSize(20);  // 最大线程数
        executor.setQueueCapacity(50); // 队列容量
        executor.setThreadNamePrefix("AsyncThread-"); // 线程名前缀
        executor.initialize();
        return executor;
    }
}
 
@Async("taskExecutor") // 指定使用自定义的线程池
public void asyncTianLuoTask() {
    // 方法逻辑
}

MQ实现异步

// 用户注册方法
public void registerUser(String username, String email, String phoneNumber) {
  // 保存用户信息(简化版)
  userService.add(buildUser(username,email,phoneNumber))
  // 发送消息
  String registrationMessage = "User " + username + " has registered successfully.";
  // 发送消息到队列
  rabbitTemplate.convertAndSend("registrationQueue", registrationMessage);
}
 
@Service
public class NotificationService {
 
    // 监听消息队列中的消息并发送短信/邮件
    @RabbitListener(queues = "registrationQueue")
    public void handleRegistrationNotification(String message) {
        // 这里可以进行短信或邮件的发送操作
        System.out.println("Sending registration notification: " + message);
    
        // 假设这里是发送短信的操作
        sendSms(message);
    
        // 也可以做其他通知(比如发邮件等)
        sendEmail(message);
    }
}

使用Hutool工具库的ThreadUtil

可以使用的是类似 Hutool 工具库中的 ThreadUtil,它提供了丰富的线程池管理和异步任务调度功能。

<dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.8.11</version> <!-- 请使用最新版本 -->
</dependency>
public class Test {
    public static void main(String[] args) {
        System.out.println("田螺主线程");
        ThreadUtil.execAsync(
                () -> {
                    System.out.println("田螺异步测试:" + Thread.currentThread().getName());
                }
        );
    }
}