这篇文章主要介绍“java并发ThreadPoolExecutor如何使用”的相关知识,小编通过实际案例向大家展示操作过程,操作方法简单快捷,实用性强,希望这篇“java并发ThreadPoolExecutor如何使用”文章能帮助大家解决问题。
一. 线程池的简单原理
当一个任务提交到线程池ThreadPoolExecutor时,该任务的执行如下图所示。
如果当前运行的线程数小于corePoolSzie(核心线程数),则创建新线程来执行任务(需要获取全局锁);
如果当前运行的线程数等于或大于corePoolSzie,则将任务加入BlockingQueue(任务阻塞队列);
如果BlockingQueue已满,则创建新的线程来执行任务(需要获取全局锁);
如果创建新线程会使当前线程数大于maximumPoolSize(最大线程数),则拒绝任务并调用RejectedExecutionHandler的rejectedExecution() 方法。
由于ThreadPoolExecutor存储工作线程使用的集合是HashSet,因此执行上述步骤1和步骤3时需要获取全局锁来保证线程安全,而获取全局锁会导致线程池性能瓶颈,因此通常情况下,线程池完成预热后(当前线程数大于等于corePoolSize),线程池的execute() 方法都是执行步骤2。
二. 线程池的创建
通过ThreadPoolExecutor能够创建一个线程池,ThreadPoolExecutor的构造函数签名如下。
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
通过ThreadPoolExecutor创建线程池时,需要指定线程池的核心线程数,最大线程数,线程保活时间,线程保活时间单位和任务阻塞队列,并按需指定线程工厂和饱和拒绝策略,如果不指定线程工厂和饱和拒绝策略,则ThreadPoolExecutor会使用默认的线程工厂和饱和拒绝策略。下面分别介绍这些参数的含义。
参数 | 含义 |
---|---|
corePoolSize | 核心线程数,即线程池的基本大小。当一个任务被提交到线程池时,如果线程池的线程数小于corePoolSize,那么无论其余线程是否空闲,也需创建一个新线程来执行任务。 |
maximumPoolSize | 最大线程数。当线程池中线程数大于等于corePoolSize时,新提交的任务会加入任务阻塞队列,但是如果任务阻塞队列已满且线程数小于maximumPoolSize,此时会继续创建新的线程来执行任务。该参数规定了线程池允许创建的最大线程数 |
keepAliveTime | 线程保活时间。当线程池的线程数大于核心线程数时,多余的空闲线程会最大存活keepAliveTime的时间,如果超过这个时间且空闲线程还没有获取到任务来执行,则该空闲线程会被回收掉。 |
unit | 线程保活时间单位。通过TimeUnit指定线程保活时间的时间单位,可选单位有DAYS(天),HOURS(时),MINUTES(分),SECONDS(秒),MILLISECONDS(毫秒),MICROSECONDS(微秒)和NANOSECONDS(纳秒),但无论指定什么时间单位,ThreadPoolExecutor统一会将其转换为NANOSECONDS。 |
workQueue | 任务阻塞队列。线程池的线程数大于等于corePoolSize时,新提交的任务会添加到workQueue中,所有线程执行完上一个任务后,会循环从workQueue中获取任务来执行。 |
threadFactory | 创建线程的工厂。可以通过线程工厂给每个创建出来的线程设置更有意义的名字。 |
handler | 饱和拒绝策略。如果任务阻塞队列已满且线程池中的线程数等于maximumPoolSize,说明线程池此时处于饱和状态,应该执行一种拒绝策略来处理新提交的任务。 |
三. 线程池执行任务
1. 执行无返回值任务
通过ThreadPoolExecutor的execute() 方法,能执行Runnable任务,示例如下。
public class ThreadPoolExecutorTest {
@Test
public void ThreadPoolExecutor执行简单无返回值任务() throws Exception {
// 创建一个线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4,
60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300));
// 创建两个任务
Runnable firstRunnable = new Runnable() {
@Override
public void run() {
System.out.println("第一个任务执行");
}
};
Runnable secondRunnable = new Runnable() {
@Override
public void run() {
System.out.println("第二个任务执行");
}
};
// 让线程池执行任务
threadPoolExecutor.execute(firstRunnable);
threadPoolExecutor.execute(secondRunnable);
// 让主线程睡眠1秒,等待线程池中的任务被执行完毕
Thread.sleep(1000);
}
}
运行测试程序,结果如下。
2. 执行有返回值任务
通过ThreadPoolExecutor的submit() 方法,能够执行Callable任务,通过submit() 方法返回的RunnableFuture能够拿到异步执行的结果。示例如下。
public class ThreadPoolExecutorTest {
@Test
public void ThreadPoolExecutor执行简单有返回值任务() throws Exception {
// 创建一个线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4,
60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300));
// 创建两个任务,任务执行完有返回值
Callable<String> firstCallable = new Callable<String>() {
@Override
public String call() throws Exception {
return "第一个任务返回值";
}
};
Callable<String> secondCallable = new Callable<String>() {
@Override
public String call() throws Exception {
return "第二个任务返回值";
}
};
// 让线程池执行任务
Future<String> firstFuture = threadPoolExecutor.submit(firstCallable);
Future<String> secondFuture = threadPoolExecutor.submit(secondCallable);
// 获取执行结果,拿不到结果会阻塞在get()方法上
System.out.println(firstFuture.get());
System.out.println(secondFuture.get());
}
}
运行测试程序,结果如下。
3. 执行有返回值任务时抛出错误
如果ThreadPoolExecutor在执行Callable任务时,在Callable任务中抛出了异常并且没有捕获,那么这个异常是可以通过Future的get() 方法感知到的。示例如下。
public class ThreadPoolExecutorTest {
@Test
public void ThreadPoolExecutor执行简单有返回值任务时抛出错误() {
// 创建一个线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4,
60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300));
// 创建一个任务,任务有返回值,但是执行过程中抛出异常
Callable<String> exceptionCallable = new Callable<String>() {
@Override
public String call() throws Exception {
throw new RuntimeException("发生了异常");
}
};
// 让线程池执行任务
Future<String> exceptionFuture = threadPoolExecutor.submit(exceptionCallable);
try {
System.out.println(exceptionFuture.get());
} catch (Exception e) {
System.out.println(e.getMessage());
}
}
}
运行测试程序,结果如下。
4. ThreadPoolExecutor通过submit方式执行Runnable
ThreadPoolExecutor可以通过submit() 方法来运行Runnable任务,并且还可以异步获取执行结果。示例如下。
public class ThreadPoolExecutorTest {
@Test
public void ThreadPoolExecutor通过submit的方式来提交并执行Runnable() throws Exception {
// 创建一个线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4,
60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300));
// 创建结果对象
MyResult myResult = new MyResult();
// 创建Runnable对象
Runnable runnable = new Runnable() {
@Override
public void run() {
myResult.setResult("任务执行了");
}
};
// 通过ThreadPoolExecutor的submit()方法提交Runnable
Future<MyResult> resultFuture = threadPoolExecutor.submit(runnable, myResult);
// 获取执行结果
MyResult finalResult = resultFuture.get();
// myResult和finalResult的地址实际相同
Assert.assertEquals(myResult, finalResult);
// 打印执行结果
System.out.println(resultFuture.get().getResult());
}
private static class MyResult {
String result;
public MyResult() {}
public MyResult(String result) {
this.result = result;
}
public String getResult() {
return result;
}
public void setResult(String result) {
this.result = result;
}
}
}
运行测试程序,结果如下。
实际上ThreadPoolExecutor的submit() 方法无论是提交Runnable任务还是Callable任务,都是将任务封装成了RunnableFuture接口的子类FutureTask,然后调用ThreadPoolExecutor的execute() 方法来执行FutureTask。
四. 关闭线程池
关闭线程池可以通过ThreadPoolExecutor的shutdown() 方法,但是shutdown() 方法不会去中断正在执行任务的线程,所以如果线程池里有Worker正在执行一个永远不会结束的任务,那么shutdown() 方法是无法关闭线程池的。示例如下。
public class ThreadPoolExecutorTest {
@Test
public void 通过shutdown关闭线程池() {
// 创建一个线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4,
60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300));
// 创建Runnable对象
Runnable runnable = new Runnable() {
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
LockSupport.parkNanos(1000 * 1000 * 1000);
}
System.out.println(Thread.currentThread().getName() + " 被中断");
}
};
// 让线程池执行任务
threadPoolExecutor.execute(runnable);
threadPoolExecutor.execute(runnable);
// 调用shutdown方法关闭线程池
threadPoolExecutor.shutdown();
// 等待3秒观察现象
LockSupport.parkNanos(1000 * 1000 * 1000 * 3L);
}
}
运行测试程序,会发现在主线程中等待3秒后,也没有得到预期的打印结果。如果上述测试程序中使用shutdownNow,则是可以得到预期打印结果的,示例如下。
public class ThreadPoolExecutorTest {
@Test
public void 通过shutdownNow关闭线程池() {
// 创建一个线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 4,
60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(300));
// 创建Runnable对象
Runnable runnable = new Runnable() {
@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
LockSupport.parkNanos(1000 * 1000 * 1000);
}
System.out.println(Thread.currentThread().getName() + " 被中断");
}
};
// 让线程池执行任务
threadPoolExecutor.execute(runnable);
threadPoolExecutor.execute(runnable);
// 调用shutdown方法关闭线程池
threadPoolExecutor.shutdownNow();
// 等待3秒观察现象
LockSupport.parkNanos(1000 * 1000 * 1000 * 3L);
}
}
运行测试程序,打印如下。
因为测试程序中的任务是响应中断的,而ThreadPoolExecutor的shutdownNow() 方法会中断所有Worker,所以执行shutdownNow() 方法后,正在运行的任务会响应中断并结束运行,最终线程池关闭。
假如线程池中运行着一个永远不会结束的任务,且这个任务不响应中断,那么无论是shutdown() 方法还是shutdownNow() 方法,都是无法关闭线程池的。