文章写于 2025/9/15 ,发布于语雀个人空间,今转载至个人博客
事情的是这样的,在我练习 ThreadPoolExecutor 时写了如下的代码:
public class MyRunnable implements Runnable{
@Override
public void run() {
// 非重点
}
}
并且自己写了一个简单的线程工厂(错就错在了这个工厂中):
public class MyThreadFactory implements ThreadFactory {
private AtomicInteger index = new AtomicInteger();
@Override
public Thread newThread(Runnable r) {
if(Objects.nonNull(r)){
Thread thread = new Thread();
thread.setName("线程" + index.incrementAndGet());
return thread;
}
return null;
}
}
可以看到,我在 new Thead 的时候忘记给他绑定 Runnable 了!!
然后又写了一个简单的线程池实例:
public class ThreadPoolDemo {
public static void main(String[] args) {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5,
10,
1000l,
TimeUnit.MICROSECONDS,
new ArrayBlockingQueue<>(100),
new MyThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());
for (int i = 0; i < 10; i++) {
Runnable runnable = new MyRunnable(Integer.toString(i));
threadPoolExecutor.submit(runnable);
}
threadPoolExecutor.shutdown();
while (!threadPoolExecutor.isTerminated()) {
}
System.out.println("Finished all threads");
}
启动线程就发现线程一直处于阻塞状态。
2. 初步解决问题
在发现问题后,我很快就找到了问题的来源,也就是线程工厂中创建线程时的错误。
但是令我不解的是,如果我们直接执行一个空的线程(如下代码),线程会立即返回,为什么在线程池中会进入阻塞状态呢?
@Test
public void simpleThreadTest(){
new Thread().start();
}
3. 源码剖析
进行剖析,我发现问题所在:
首先让我们查看 submit(Runnable) 方法的实现(下方的源码均为简化版,没有考虑加锁、核心线程池满的情况):
public Future<?> submit(Runnable task) {
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
在 submit 中,其实还是调用了 execute 方法,并通过 Future 封装达到了可查看运行状态的效果,让我们进入 execute 方法中:
public void execute(Runnable command) {
if (command == null) throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) // 🔴 关键
return;
}
...
}
继续查看 addWorker 方法:
private boolean addWorker(Runnable firstTask, boolean core) {
Worker w = new Worker(firstTask); // 🔴 Worker 包含任务队列消费逻辑
final Thread t = threadFactory.newThread(w);
if (t != null) {
workers.add(w);
t.start();
return true;
}
return false;
}
可以看到这里将任务封装给了一个 Worker 对象并作为了创建 Thread 的参数。同时通过下面的 Worker 对象的定义也可以看到 Worker 类实现了 Runnable 接口:
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable{
……
}
回到 addWorker 方法可以看到这个方法做了两件事:
- 将 w 将入到工作区
- 启动 t 线程
理想情况下,t 线程中是有一个 Runnable 对象的,如果启动线程,就会调用 w.run();所以先让我看看 w.run() 是如何执行的:
public void run() {
runWorker(this);
}
final void runWorker(Worker w) {
Runnable task = w.firstTask;
while (task != null || (task = getTask()) != null) {
task.run();
task = null;
}
}
可以看到在方法 runWorker 中不仅会执行任务还会不断地将任务取出并最终将该任务从工作区删除,为新的任务腾出空间。
而且我们的错误实现 Thread thread = new Thread()中只会执行将任务添加到任队列的方法,执行不到将任务从队列中取出的方法,于是就造成了工作区的阻塞。
4. 总结
线程池执行代码的过程,不单单是执行 Thread.start() ,他还维护了一个工作区,工作区添加任务是在创建线程时发生的,这个代码一定会执行,而工作区的清除工作是在线程执行过程(也是 Worker.run())中发生,我们的错误就是没有正常的执行 Worker.run() ,然后导致了线程阻塞。
Comments NOTHING