Java线程池大揭秘:揭开并发编程的神秘面纱,让你的程序快到起飞

本文最后更新于:8 个月前

每一次的选择,都是对未来的期许,让我们在不确定中寻找到属于自己的方向。破冰

  • 🥇 推荐阅读:(2023/10/22 晚)
🔥 线程池:
💥 异步编程:
🍖 锁:

线程池
线程池的创建(参数)
线程池的运行机制(理论 + 演示)
线程池的使用(实战 submit + CompatableFuture 异步)

精髓所在

线程基础

🔥 推荐阅读:Java 开发 - 不知道算不算详细的 JUC 详解_java juc_CodingFire 的博客-CSDN 博客

线程状态总结

Java 创建线程的十种方式

🌮 推荐阅读:大家都说 Java 有三种创建线程的方式!并发编程中的惊天骗局! - 掘金 (juejin.cn)

继承 Thread 类

  • 代码示例:(2023/11/11 早)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* @author 邓哈哈
* 2023/11/11 9:30
* Function: 继承 Thread 类
* Version 1.0
*/

public class ExtendsThread extends Thread {
@Override
public void run() {
System.out.println("1......");
}

public static void main(String[] args) {
new ExtendsThread().start();
}
}
  • 运行结果:

image-20231111101903672

实现 Runnable 接口

  • 代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* @author 邓哈哈
* 2023/11/11 9:32
* Function: 实现 Runnable 接口
* Version 1.0
*/

public class ImplementsRunnable implements Runnable {
@Override
public void run() {
System.out.println("2.....");
}

public static void main(String[] args) {
new Thread(new ImplementsRunnable()).start();
}
}
  • 运行结果:

image-20231111102014242

实现 Callable 接口

  • 代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* @author 邓哈哈
* 2023/11/11 9:33
* Function: 实现 Callable 接口
* Version 1.0
*/

public class ImplementsCallable implements Callable {
@Override
public Object call() {
System.out.println("3....");
return "hhh";
}

public static void main(String[] args) {
FutureTask<String> futureTask = new FutureTask<>(new ImplementsCallable());
new Thread(futureTask).start();
}
}
  • 运行结果:

image-20231111102052773

使用 ExecutorService 线程池

  • 代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/**
* @author 邓哈哈
* 2023/11/11 9:37
* Function: 使用 ExecutorService 线程池
* Version 1.0
*/

public class UseExecutorsService {
public static void main(String[] args) {
// 1.自定义线程池
ThreadPoolExecutor threadPoolB =
new ThreadPoolExecutor(3, 5,
0, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(4),
Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());

threadPoolB.submit(() -> {
System.out.println("4B......");
});
threadPoolB.shutdown();

// 2.Executors 线程池
ExecutorService threadPoolA = Executors.newFixedThreadPool(5);

while (true) {
threadPoolA.execute(() -> {
System.out.println("4.1...." + Thread.currentThread().getName());
});
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
  • 运行结果:

image-20231111102231708

使用 CompletableFuture 类

  • 代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* @author 邓哈哈
* 2023/11/11 9:53
* Function: 使用 CompletableFuture 类
* Version 1.0
*/

public class UseCompletableFuture {
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> {
System.out.println("5.1....");
return "hhh";
});

CompletableFuture.runAsync(() -> {
System.out.println("5.2....");
});
}
}
  • 运行结果:

image-20231111102428953

基于 ThreadGroup 线程组

  • 代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* @author 邓哈哈
* 2023/11/11 9:59
* Function: 基于 ThreadGroup 线程组
* Version 1.0
*/

public class UseThreadGroup {
public static void main(String[] args) {
ThreadGroup threadGroup = new ThreadGroup("groupName");

new Thread(threadGroup, () -> {
System.out.println("6...." + Thread.currentThread().getName());
}).start();

new Thread(threadGroup, () -> {
System.out.println("6...." + Thread.currentThread().getName());
},"T2").start();

new Thread(threadGroup, () -> {
System.out.println("6...." + Thread.currentThread().getName());
}).start();
}
}
  • 运行结果:

image-20231111102525557

使用 FutureTask 类

  • 代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/**
* @author 邓哈哈
* 2023/11/11 10:02
* Function: 使用 FutureTask 类
* Version 1.0
*/

public class UseFutureTask {
public static void main(String[] args) {
FutureTask<String> futureTask = new FutureTask<>(() -> {
System.out.println("7....");
return "hhh";
});

new Thread(futureTask).start();
}
}
  • 运行结果:

image-20231111102608295

使用匿名内部类或 Lambda 表达式

  • 代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
* @author 邓哈哈
* 2023/11/11 10:05
* Function: 使用匿名内部类或 Lambda 表达式
* Version 1.0
*/

public class UseAnonymousClass {
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("8....");
}
}).start();

new Thread(() -> {
System.out.println("8....");
}).start();
}
}
  • 运行结果:

image-20231111102656656

使用 Timer 定时器类

  • 代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* @author 邓哈哈
* 2023/11/11 10:07
* Function: 使用 Timer 定时器类
* Version 1.0
*/

public class UseTimer {
public static void main(String[] args) {
Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("9....");
}
}, 2000, 1000);
}
}
  • 《打造流畅编码之旅:IntelliJ IDEA 的高效使用技巧与工作流程优化》 一文中 定时任务实现 栏目下有过详细的扩展讲解

  • 运行结果:

image-20231111101645358

使用 ForkJoin 线程池或 Stream 并行流

  • 代码示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/**
* @author 邓哈哈
* 2023/11/11 10:10
* Function: 使用 ForkJoin 线程池或 Stream 并行流
* Version 1.0
*/

public class UseForkJoinPool {
public static void main(String[] args) {
ForkJoinPool joinPool = new ForkJoinPool();

joinPool.execute(() -> {
System.out.println("10....");
});

List<String> list = Arrays.asList("10B......");
list.parallelStream().forEach(System.out::println);
}
}

线程池

使用线程池的优势

构造线程池

核心参数介绍

  • corePoolSize(核心线程数)(2023/10/22 晚)
  • maximumPoolSize(最大线程数)
  • keepAliveTime(闲置超时时长)
  • unit(闲置超时单位)
  • workQueue(任务队列)
  • threadFactory(线程工厂)
  • handler(拒绝策略)

线程池的工作原理

图文介绍

实操测试

  • 这里我们简单演示下线程池的工作原理,按如下步骤依次进行

  • 自定义线程工厂(2023/10/22 晚)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* @author 邓哈哈
* 2023/10/16
* Function: 自定义线程池
* Version 1.0
*/
@Configuration
public class ThreadPoolExecutorConfig {
@Bean
public ThreadPoolExecutor threadPoolExecutor() {
ThreadFactory threadFactory = new ThreadFactory() {
// 记录线程数
private int count = 1;

@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("线程" + count);
count++;
return thread;
}
};

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor
(2, 4, 100,
TimeUnit.MINUTES, new ArrayBlockingQueue<>(4), threadFactory);
return threadPoolExecutor;
}
}
  • 注入自定义线程工厂依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* @author 邓哈哈
* Function: 任务队列测试 提交任务到线程池
* Version 1.0
*/

@RestController
@RequestMapping("/queue")
@Slf4j
public class QueueController {
@Resource
private ThreadPoolExecutor threadPoolExecutor;
...............................
}
  • 编写接口:查看当前线程池状态、新增线程
1
2
3
4
5
6
7
8
9
10
11
12
13
@GetMapping("/get")
public String get() {
HashMap<String, Object> map = new HashMap<>();
int size = threadPoolExecutor.getQueue().size();
map.put("队列长度", size);
long taskCount = threadPoolExecutor.getTaskCount();
map.put("任务总数", taskCount);
long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();
map.put("已完成任务数", completedTaskCount);
int activeCount = threadPoolExecutor.getActiveCount();
map.put("正在工作的线程数", activeCount);
return JSONUtil.toJsonStr(map);
}
1
2
3
4
5
6
7
8
9
10
11
@GetMapping("/add")
public void add(String name) {
CompletableFuture.runAsync(() -> {
log.info(Thread.currentThread().getName() + "正在执行中");
try {
Thread.sleep(600000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, threadPoolExecutor);
}
  • 开启测试(2023/10/22 晚)
  • 详细的流程介绍可以在《简单测试,理解线程池的工作流程》一栏中学习了解(2023/12/22 早)

其他参数介绍

阻塞队列

线程工厂与自定义工厂

拒绝策略与自定义策略

线程池的工作机制

  • 这部分内容我计划在新的博文中介绍,可以看这篇文章学习:

🥣 推荐阅读:

简单测试,理解线程池工作流程

自定义线程池

  • 自定义线程池:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* 自定义线程池
*/
@Configuration
public class ThreadPoolExecutorConfig {
@Bean
public ThreadPoolExecutor threadPoolExecutor() {
ThreadFactory threadFactory = new ThreadFactory() {
// 记录线程数
private int count = 1;
@Override
public Thread newThread(@NotNull Runnable r) {
Thread thread = new Thread(r);
thread.setName("线程" + count);
count++;
return thread;
}
};
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor
(2, 4, 100,
TimeUnit.MINUTES, new ArrayBlockingQueue<>(4), threadFactory);
return threadPoolExecutor;
}
}
  • 在代码中,首先定义了一个ThreadFactory 接口的实现类,用于创建线程(2023/10/15 晚)
  • newThread 方法中,通过创建线程并设置名称的方式来实现线程的创建。每次创建线程时,都会使用一个计数器 count,用于记录线程的数量,保证线程名称的唯一性
  • 然后,使用 ThreadPoolExecutor 类创建一个线程池对象。括号内的参数依次为:核心线程数、最大线程数、线程空闲时间、时间单位、任务队列以及线程工厂。这些参数分别表示线程池的基本配置,如最小/最大线程数、线程空闲时间等。其中,任务队列使用了 ArrayBlockingQueue,表示使用有界队列来存储线程任务
  • 最后,将创建好的线程池对象返回,供其他地方进行调用和使用。

提交任务到线程池

  • 提交任务到线程池:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* 任务队列测试 提交任务到线程池
*/
@RestController
@RequestMapping("/queue")
@Slf4j
public class QueueController {
@Resource
private ThreadPoolExecutor threadPoolExecutor;

@GetMapping("/get")
public String get() {
HashMap<String, Object> map = new HashMap<>();
int size = threadPoolExecutor.getQueue().size();
map.put("队列长度", size);
long taskCount = threadPoolExecutor.getTaskCount();
map.put("任务总数", taskCount);
long completedTaskCount = threadPoolExecutor.getCompletedTaskCount();
map.put("已完成任务数", completedTaskCount);
int activeCount = threadPoolExecutor.getActiveCount();
map.put("正在工作的线程数", activeCount);
return JSONUtil.toJsonStr(map);
}

@GetMapping("/add")
public void add(String name) {
CompletableFuture.runAsync(() -> {
log.info(Thread.currentThread().getName() + "正在执行中");
try {
Thread.sleep(600000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, threadPoolExecutor);
}
}
  • 这段代码是一个简单的示例,在Spring Boot 中使用自定义的线程池。首先通过@Resource 注解将之前配置好的线程池对象 threadPoolExecutor 注入进来,然后定义了两个接口方法:get 和 add

  • 在 get 方法中,通过调用线程池对象的不同方法,获取了线程池的一些状态信息,如队列长度、任务总数、已完成任务数和正在工作的线程数,并将这些信息封装进一个HashMap中,最后使用 JSONUtil.toJsonStr 方法将其转换成 JSON 格式的字符串返回。

  • 在 add 方法中,使用 CompletableFuture.runAsync 方法来在线程池中执行一个任务。这里的任务是一个简单的代码块,通过 log 输出当前线程的名称,并休眠 10 分钟。通过将线程池对象 threadPoolExecutor 作为参数传递给 runAsync 方法,使得任务在该线程池中执行。

  • 通过这段代码,我们可以在 Spring Boot 项目中方便地使用自定义的线程池,并对其进行状态监控和管理(2023/10/15 晚)

  • 由 ThreadPoolExecutorConfig 配置可知,我们自定义的线程池参数设置如下:

    • 核心线程数:2
    • 最大线程数:4
    • 任务队列数:4
    • 超时等待时间:未设置,默认拒绝
    • 时间单位:秒(SECEND)
  • 我们开始测试,依次添加任务,观察执行的线程数和任务队列数,详情如下:

image-20231015192743819

image-20231015192927650

image-20231015193033848

image-20231015193106483

image-20231015193134223

  • 简单描述一下测试情况吧:

    • 正在运行的线程数未达到核心线程数阈值时,优先添加线程处理新任务
    • 正在运行的线程数达到核心线程数阈值,但任务队列未满时,优先将任务放入任务队列中
    • 任务队列放满后,但正在运行的线程数未达到最大线程数阈值时,优先添加线程处理新任务
    • 正在运行的线程数达到最大线程数阈值后,采用合适的拒绝策略(这里我们采用默认的拒绝策略:直接扔掉这个任务)
  • 测试完成(2023/10/15 晚)

真实业务场景下的实战

批量插入用户记录

  • 这个需求场景是我在做 Memory 伙伴匹配系统 时遇到的:批量插入用户记录

  • 新建 StopWatch 对象,计时开始:(2024/01/09 晚)

1
2
3
4
// new一个StopWatch对象
StopWatch stopWatch = new StopWatch();
// 计时开始
stopWatch.start();
  • 限制可创建的最大线程数量,限制每个线程最大可插入的用户记录条数:
1
2
3
4
// 每条线程插入1000条
int batchSize = 1000;
// 最大线程数为 10 条
int maxSize = 100;
  • 异步条件下,执行批量插入,并使用异步任务集合 futureList 维护所有正在执行中的异步任务:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 开10条线程
for (int i = 0, j = 0; i < maxSize; i++) {
// 每条线程下new一个userList
ArrayList<User> userList = new ArrayList<>();
while (true) {
j++;
User user = new User();
user.setUserAccount("memory" + "_" + (UUID.randomUUID() + "").substring(0, 8));
String password = DigestUtils.md5DigestAsHex((SALT + 12345678).getBytes());
user.setUserPassword(password);
user.setAvatarUrl("https://fastly.jsdelivr.net/npm/@vant/assets/ipad.jpeg");
user.setGender("0");
user.setPhone("18535854763");
user.setEmail("3348407547@qq.com");
user.setUserStatus(0);
user.setUserRole(0);
user.setPlanetCode("17625");
user.setTags("[\"女\",\"Vue\",\"Python\",\"在校本科\",\"发呆\",\"emo中\"]");
userList.add(user);
// 当该线程插满1000条数据,便退出该线程循环
if (j % batchSize == 0) {
break;
}
}
// 异步条件下, 执行批量插入
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
System.out.println("threadName: " + Thread.currentThread().getName());
userService.saveBatch(userList, batchSize);
});
// 将该任务存储到异步任务集合当中
futureList.add(future);
}
  • 结束所有异步任务,计时结束,计算批量插入数据所消耗的时间:
1
2
3
4
5
6
7
// 结束所有异步任务
CompletableFuture<Void> future = CompletableFuture.allOf(futureList.toArray(new CompletableFuture[]{}));
future.join();
// 计时结束
stopWatch.stop();
// 计算插入所用总时间
System.out.println(stopWatch.getTotalTimeMillis());

异步查询图片数据

  • 这个需求场景是我在做 Memory 聚合搜索平台时遇到的:服务端聚合搜索不同来源的数据后返回

  • 为每个搜索请求开启异步任务,并发执行诗词搜索博文搜索图片搜索三个任务:(2024/01/09 晚)

1
2
3
4
5
6
7
8
9
10
11
// 诗词搜索
CompletableFuture<Page<PostVO>> postTask = CompletableFuture.supplyAsync(() ->
postDataSource.search(searchText, pageSize, current));

// 博文搜索
CompletableFuture<Page<ArticleVO>> articleTask = CompletableFuture.supplyAsync(() ->
articleDataSource.search(searchText, pageSize, current));

// 图片搜索
CompletableFuture<Page<Picture>> pictureTask = CompletableFuture.supplyAsync(() ->
pictureDataSource.search(searchText, pageSize, current));
  • 结束所有异步任务,获取搜索结果:
1
CompletableFuture.allOf(postTask, pictureTask, articleTask).join();
  • 整合不同搜索来源的搜索结果,返回聚合搜索结果:
1
2
3
4
5
6
7
8
9
10
11
12
13
try {
Page<PostVO> postVOPage = postTask.get();
Page<Picture> picturePage = pictureTask.get();
Page<ArticleVO> articlePage = articleTask.get();

searchVO = new SearchVO();
searchVO.setPostVOList(postVOPage.getRecords());
searchVO.setPictureList(picturePage.getRecords());
searchVO.setArticleList(articlePage.getRecords());

} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}

AI 服务处理请求任务

  • 这个需求场景是我在做 Memory BI 智能分析平台 时遇到的:调用第三方 AI 服务处理用户请求
  • 自定义线程池:(2024/01/09 晚)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Configuration
public class ThreadPoolExecutorConfig {
@Bean
public ThreadPoolExecutor threadPoolExecutor() {
ThreadFactory threadFactory = new ThreadFactory() {
// 记录线程数
private int count = 1;

@Override
public Thread newThread(@NotNull Runnable r) {
Thread thread = new Thread(r);
thread.setName("线程" + count);
count++;
return thread;
}
};

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor
(2, 4, 100,
TimeUnit.MINUTES, new ArrayBlockingQueue<>(4), threadFactory);
return threadPoolExecutor;
}
}
  • 请求图表信息(图表名, 生成目标, 图表类型):
1
2
3
String name = genChartByAiRequest.getName();
String goal = genChartByAiRequest.getGoal();
String chartType = genChartByAiRequest.getChartType();
  • 校验用户登录:
1
2
User loginUser = userService.getLoginUser(request);
ThrowUtils.throwIf(loginUser == null, ErrorCode.NOT_LOGIN_ERROR, "请先登录后再尝试调用接口");
  • 限流(限制用户的调用次数,以用户 id 为 key,区分各个限流器):
1
2
// 3.限流(限制用户的调用次数,以用户id为key,区分各个限流器)
redisLimiterManager.doRateLimit("genCharByAi_" + loginUser.getId());
  • 提取图表名信息、分析需求(分析目标 图表类型),做好参数校验:
1
2
3
4
5
6
// 4.1.校验图表名信息
ThrowUtils.throwIf(StringUtils.isNotBlank(name) && name.length() > 100, ErrorCode.PARAMS_ERROR, "名称过长");
// 4.2.校验分析目标
ThrowUtils.throwIf(StringUtils.isBlank(goal), ErrorCode.PARAMS_ERROR, "分析目标为空");
// 4.3.校验图表类型
ThrowUtils.throwIf(StringUtils.isBlank(chartType), ErrorCode.PARAMS_ERROR, "分析图表类型为空");
  • 分析 Excel 图表,获取原始数据:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 5.1.校验文件
// 5.1.1.校验文件大小
long size = multipartFile.getSize();
ThrowUtils.throwIf(size > ONE_MB, ErrorCode.PARAMS_ERROR, "文件超过 1M");

// 5.1.2.校验文件后缀
String originalFilename = multipartFile.getOriginalFilename();
String suffix = FileUtil.getSuffix(originalFilename);
ThrowUtils.throwIf(!VALID_FILE_SUFFIX_LIST.contains(suffix), ErrorCode.PARAMS_ERROR, "文件后缀非法");

// 5.2.分析文件,获取csv数据
StringBuilder userInput = new StringBuilder();
String excelToCsv = ExcelUtils.excelToCsv(multipartFile);
userInput.append("\n")
.append("分析需求:").append("\n")
.append(goal).append(", ").append("请生成一张").append(chartType).append("\n")
.append("原始数据:").append("\n")
.append(excelToCsv);
  • 处理任务前, 先插入图表信息到数据库:
1
2
3
4
5
6
7
8
9
Chart chart = new Chart();
chart.setName(name);
chart.setGoal(goal);
chart.setChartType(chartType);
chart.setChartData(excelToCsv);
chart.setUserId(loginUser.getId());
chart.setStatus(ChartStatusEnum.WAIT.getValue());
boolean save = save(chart);
ThrowUtils.throwIf(!save, ErrorCode.OPERATION_DATABASE_ERROR, "插入图表信息失败");
  • 使用任务队列处理任务:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// todo 建议处理任务队列满了之后, 抛出异常的情况
CompletableFuture.runAsync(() -> {
// 7.1.更新图表状态执行中(running)
Chart updateChart = new Chart();
updateChart.setId(chart.getId());
updateChart.setStatus(ChartStatusEnum.RUNNING.getValue());

boolean updateById = updateById(updateChart);
ThrowUtils.throwIf(!updateById, ErrorCode.OPERATION_DATABASE_ERROR, "更新图表执行中状态失败");

// 7.2.AI 执行, 智能生成图表
String result = aiManager.doChat(AiConstant.BI_MODEL_ID, userInput.toString());

// 7.3.校验图表生成失败情况
// todo 校验图表生成失败情况
if (result == null) {
// 更新图表信息(图表生成失败)
Chart updateChartResult = new Chart();
updateChartResult.setId(chart.getId());
updateChartResult.setStatus(ChartStatusEnum.FAILED.getValue());

boolean updateResultById = updateById(updateChartResult);
ThrowUtils.throwIf(!updateResultById, ErrorCode.OPERATION_DATABASE_ERROR, "更新图表失败状态失败");
}

// 7.4.处理AI响应的对话信息
String[] split = result.split("【【【【【");
String genChart = split[1];
String genResult = split[2];

// 7.5.更新图表信息(图表成功状态、图表生成情况、图表分析结果)
Chart updateChartResult = new Chart();
updateChartResult.setId(chart.getId());
updateChartResult.setStatus(ChartStatusEnum.SUCCEED.getValue());
updateChartResult.setGenChart(genChart);
updateChartResult.setGenResult(genResult);

boolean updateResultById = updateById(updateChartResult);
ThrowUtils.throwIf(!updateResultById, ErrorCode.OPERATION_DATABASE_ERROR, "更新图表成功状态失败");
}, threadPoolExecutor);
  • 封装分析结果并返回:(2024/01/09 晚)
1
2
3
4
BiResponse biResponse = new BiResponse();
biResponse.setUserId(loginUser.getId());

return biResponse;

异步编程

CompletableFuture 简介

概述

优势与特点

CompletableFuture 基本用法

小试牛刀

  • 我们使用入门级案例,快速使用 CompletableFuture 对象执行异步编程:

  • 首先自定义线程:(2023/10/22 晚)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class ThreadTask implements Runnable {
public static int index = 0;

public ThreadTask() {
}

public void run() {
Thread.currentThread().setName("MyThread: " + index++);
System.out.println(Thread.currentThread().getName());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
  • 创建线程池,新增线程并使用 CompletableFuture 对象异步执行线程,实现并发:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class ThreadPool3 {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4,
1000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(4),
Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
// for (int i = 0; i < 8; i++) {
// threadPoolExecutor.execute(new ThreadTask());
// }
//
// for (int i = 0; i < 8; i++) {
// Future<?> submit = threadPoolExecutor.submit(new ThreadTask());
// }

for (int i = 0; i < 8; i++) {
CompletableFuture.runAsync(new ThreadTask(), threadPoolExecutor);
}
}
}
  • 在上面的代码中,我们还使用了三种方法处理异步执行任务
  • threadPoolExecutor.execute(new ThreadTask()): 这个方法将新的任务提交给线程池来执行。新的任务将创建一个新的线程来执行。如果线程池已满,新提交的任务将会等待现有任务完成后再开始执行。
  • Future<?> submit = threadPoolExecutor.submit(new ThreadTask()): 这个方法将新的任务提交给线程池,但不立即执行,而是返回一个 Future 对象。Future 对象表示任务的结果。在等待任务结果时,你还可以对其他任务进行调度,这是使用 Future 的优点。如果线程池已满,新提交的任务也会等待现有任务完成后再开始执行。
  • CompletableFuture.runAsync(new ThreadTask(), threadPoolExecutor): 这个方法用于提交一个异步任务到线程池中执行。这个方法创建了一个 CompletableFuture 对象,该对象允许你使用回调或 future 对象来等待任务的完成并获取结果。这种方法的优点是可以并发执行多个任务,无需等待上一个任务完成就开始下一个任务。
  • 先简单了解一下吧,稍后我们会详细说明的(2023/10/22 晚)
  • 运行程序,得到执行结果如下:

image-20231022221903932

创建 CompletableFuture 对象

获取任务结果

异步回调

多任务组合回调

异常处理与错误处理

自定义线程池与资源管理

并发任务的调度与控制

IO 操作与网络请求

Java 锁机制

🔥 推荐阅读:(2023/10/24 晚)

总结

  • 悲观锁:悲观锁总是假设最坏的情况,认为在高并发情况下,共享变量每次被访问都会出现问题,所以每个线程在获取资源时都会加锁。激烈的锁竞争会导致线程阻塞和频繁的上下文切换,大大增加系统的性能开销,还可能引发死锁问题
  • 乐观锁:乐观锁总是假设最好的情况,认为共享资源被访问时不会出现问题,线程无需加锁也无需等待,不存在锁竞争和线程阻塞。只需在提交修改时确认该资源是否被其他线程修改了,这就是验证线程之间是否发生冲突了,一般采用版本号机制CAS 算法实现。如果线程提交修改频繁失败、频繁重试,同样会大大增加系统性能开销。
  • 乐观锁还存在 ABA 问题,即错误地判断要修改的资源没有被其他线程修改,可以通过追加版本号引入时间戳来解决
  • 总体来说,悲观锁适用于写比较多的场景,避免频繁失败和频繁重试影响性能;乐观锁适用于写比较少的情况,避免频繁加锁影响性能(2023/10/27 晚)

亮点集锦


Java线程池大揭秘:揭开并发编程的神秘面纱,让你的程序快到起飞
https://test.atomgit.net/blog/2023/10/22/Java线程池大揭秘:揭开并发编程的神秘面纱,让你的程序快到起飞/
作者
Memory
发布于
2023年10月22日
更新于
2024年1月9日
许可协议