Thread

来了,来了,如约而至,今天说一下Java中的Thread。

在正式开始前,说一点题外话,最近在翻这些基础知识时,感受到如下:

  • 学习知识最好时一次性系统学习,这样最高效,节省时间,后面只需要通过实践深入就好了
  • 我的记忆一向不是很”可靠”,时间一长很多知道东西就开始模糊了,这个确实需要翻一翻回忆一下。其实还是用得少

之前给同事分享过一个presentation关于K8S的,结果这两天和人聊天时其中一个概念就印象不深了,还好后来回忆起来了

昨天晚上(7月2日)一口气写了六个服务的OCP部署脚步,有点晕,没有感到一丝的成就感,纯体力劳动。吐槽一下,跑题了,收~~

Thread的定义我就不再赘述了,大家还是上网自行查找吧,毕竟我自己描述会不太准确,而每个人想了解的深度也有所不同,是否需要深入到计算机原理层面,请自行决定。
这里我所描述的内容,主要是为了帮助自己的记忆和记录一些发现,如果大家想深入还是多看看官方文档和Java源码,然后自己多试试吧

Thread & Runnable

线程的开头就是这两个家伙了,Thread class 和 Runnable 接口
Java多线程的鼻祖,从1.0版本就有了

我们可以继承Thread 类,也可以实现Runnable 接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static class NewThread extends Thread {
@Override
public void run() {
long startTime = System.currentTimeMillis();
int i = 0;
while (true) {
System.out.println(this.getName());
System.out.println(Thread.currentThread().getName() + ": New Thread is running..." + i++);
try {
//Wait for one sec so it doesn't print too fast
Thread.sleep(1000);
if (i >= 10) {
break;
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

如上,你能说出this.getName() 和 Thread.currentThread().getName()的区别吗?
以及在什么操作下可以呈现这个区别,如果你可以回答这个问题,我相信你已经深入理解Thread的部分知识

Executors

1.5开始就引入了Executors,来声明多线程。此时我们开始关注java.util.concurrent
这里我们会关注如下几点

  • atomic 原子类型
  • locks 一堆锁,前一篇文章已经提到
  • Concurrent* 各种并发安全的基础数据结构类型
  • callable 以及各种Future 实现
  • 以及现在所说的 ExecutorService 相关实现
1
2
3
4
ExecutorService executor = Executors.newFixedThreadPool(10);
executor.submit(new NewThread());
executor.submit(new NewThread());
executor.shutdown();

常见的有这三个线程池创建方式

1
2
3
Executors.newFixedThreadPool(10);
Executors.newCachedThreadPool();
Executors.newSingleThreadExecutor();

其实都是ThreadPoolExecutor不同参数化
有一个比较特殊的

1
2
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2);
executorService.schedule(new NewThread(), 1, TimeUnit.SECONDS);

这个来自ScheduledThreadPoolExecutor方法创建,可以周期执行

此处要注意一个小问题,executor默认不会自己释放,所以需要手动shutdown。shutdown相关有两个方法,区别请自行查找

另一个思考题,你能清晰的认识到以下两个写法的区别吗?

1
2
executor.submit(NewThread::new);
executor.submit(new NewThread());

Future

Future 接口实现也很多,最常用,最简单使用的莫过于CompletableFuture吧,只需要简单将要处理的内容包裹一下,就可以启动多线程了
值得注意的是,supplyAsync 方法默认使用的也是 ForkJoinPool.commonPool(),后面也会提到。 当然也是可以指定自己定义的executor的。

1
2
3
4
5
6
CompletableFuture<String> stringCompletableFuture = CompletableFuture.supplyAsync(() -> "Hello");
try {
System.out.println(stringCompletableFuture.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}

CompletableFuture 在Java9之后完善了很多,添加很多新方法,其实针对completableFuture 就可以单独喝一壶的,大家还是自行去深入吧。
这里只是点到为止。CompletableFuture可以将计算步骤形成一个链,合并处理结果,例如:

1
2
3
4
5
CompletableFuture<String> completableFuture 
= CompletableFuture.supplyAsync(() -> "Hello")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));

assertEquals("Hello World", completableFuture.get());

其另一个优势在于,他可以handle异常,例如:

1
2
3
4
5
6
7
8
9
CompletableFuture<String> completableFuture  
= CompletableFuture.supplyAsync(() -> {
if (name == null) {
throw new RuntimeException("Computation error!");
}
return "Hello, " + name;
}).handle((s, t) -> s != null ? s : "Hello, Stranger!");

assertEquals("Hello, Stranger!", completableFuture.get());

关于completableFuture中方法,每个主要分三类:

  • 普通当前线程方法
  • Async结尾使用默认ForkJoinPool.commonPool()线程池的,异步方法
  • Async结尾使用自定义Executor线程的,异步方法

大家自行查看源码吧
我也是看了一些网上的文章,大致了解了一下,以后会在实际工作中合适的场下尝试各个功能。

TimerTask

刚才提到了ScheduledExecutorService,在其之前还有一个TimerTask创建定时线程的方式,只是了解一下,估计后面不会太多使用了

1
2
3
4
5
6
7
8
9
10
TimerTask task = new TimerTask() {
@Override
public void run() {
System.out.println("Task performed on: " + new Date() + "n"
+ "Thread's name: " + Thread.currentThread().getName());
}
};
Timer timer = new Timer("Timer");
long delay = 1000L;
timer.schedule(task, delay);

Callable

与Runnable对应,一个只是执行,一个可以返回结果。现在来看Callable使用更加频繁一些
同时Callable执行后对应的接受类型为Future类型

1
2
3
4
5
6
public class FactorialTask implements Callable<Integer> {}

FactorialTask task = new FactorialTask(5);
Future<Integer> future = executorService.submit(task);

assertEquals(120, future.get().intValue());

ForkJoinPool

它的设计目的就是将问题分段处理,逐一解决然后在合并,这就是Fork/Join两个步骤的意思。
今后应该会关注其实际应用,貌似应用场景很多,功能非常使用
至于Worker是如何管理的,这个我就没有深入研究了,有时间再深入了

默认使用ForkJoinPool.commonPool(),我们也可以自己定义新的ForkJoinPool,例如:

1
ForkJoinPool forkJoinPool = new ForkJoinPool(2);

和Thread相比,下面的代码最大的问题你知道是什么吗?记得Thread setDaemon(true)吗?

1
2
ForkJoinPool forkJoinPool = new ForkJoinPool(2);
forkJoinPool.execute(new NewThread());

如何使用ForkJoinPool,可以创建一个递归任务, RecursiveTask继承自ForkJoinTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.gino.thread;

import java.util.*;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

public class CustomRecursiveTask extends RecursiveTask<Integer> {

private int[] arr;

private static final int THRESHOLD = 5;

public CustomRecursiveTask(int[] arr) {
this.arr = arr;
}

@Override
protected Integer compute() {
if (arr.length > THRESHOLD) {

return ForkJoinTask.invokeAll(createSubtasks()).stream().mapToInt(ForkJoinTask::join).sum();

} else {
return processing(arr);
}
}

private Collection<CustomRecursiveTask> createSubtasks() {
List<CustomRecursiveTask> dividedTasks = new ArrayList<>();
dividedTasks.add(new CustomRecursiveTask(Arrays.copyOfRange(arr, 0, arr.length / 2)));
dividedTasks.add(new CustomRecursiveTask(Arrays.copyOfRange(arr, arr.length / 2, arr.length)));
return dividedTasks;
}

private Integer processing(int[] arr) {
System.out.println(Arrays.toString(arr)+"processed by " + Thread.currentThread().getName());
return Arrays.stream(arr).filter(a -> a > 10 && a < 27).map(a -> a * 10).sum();
}
}

运行代码

1
2
3
4
5
6
7
8
CustomRecursiveTask customRecursiveTask = new CustomRecursiveTask(new int[]{1, 2, 3, 11, 12, 32, 12, 54, 22, 20, 19});
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();

// 这两行分别运行一下,观察他们的区别,你会发现什么?
Integer compute = customRecursiveTask.compute();
Integer compute = forkJoinPool.invoke(customRecursiveTask);

System.out.println(compute);

最近实验了不少,先记录这些吧。写这些内容主要是为了自己的学习总结,并不是为了知识分享,所以内容不会很细致讲解到位。
看到的人,如有疑问可以自己搜索相关细节,深入研究。 代码都是来自网络,自己稍加修改。

题外话

其实这篇文章7月3日就创建了,由于个人时间紧张迟迟没有时间完成,现在是7月11日凌晨1点,终于总结完了。

窗外已经开始下雨,如果按照预报说,白天可能还会有大暴雨。 如果没有,我就继续去游泳,自由泳换气转体还是差很多,继续改进。
鞭腿推进力不足,还是有些吃力,难道这就老了吗?不服~~哈哈

下一个总结目标,各种实用的基础类型,各种list map queue deque等,应该是数据结构方面