Java中使用线程池进行多线程执行结果汇总Demo

发布时间:

当我们在执行很多个同一个任务时,例如查询同一条SQL只不过条件不一样,或者对一个数据进行处理等操作时,它们返回的类型或者对象是相同的情况下可以考虑使用多线程执行这些任务,然后将结果进行汇总返回,这样可以提高执行的效率。下面通过一个简单的示例来演示这个合并的过程。

Java中使用线程池进行多线程执行结果汇总Demo

实现过程

创建线程池

首先创建一个线程池,这一步骤大同小异

/**
 * 创建线程池
 * @author huhailong
 *
 */
public class MyThreadPool {
	
	private static final int CORE_POOL_SIZE = 10;	//核型线程数,最小可以同时运行的线程数量
	private static final int MAX_POOL_SIZE = 10;	//最大线程数,当队列中存放的任务到达队列容量时,当前可以同时运行的线程数量变为最大线程数
	private static final int QUEUE_CAPACITY = 100;	//当新任务来的时候会先判断当前运行的线程数是否到达量核心线程数,如果达到量,就将新任务存放到队列中
	private static final long KEEP_ALIVE_TIME = 1L;	//当线程池中的线程数量大于核心线程池数量的时候,如果这时候没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,知道等待的时间超了设定的时间才会销毁
	
	/*
	 * 线程池中的饱和错略
	 * 1. ThreadPoolExecutor.AbortPolicy: 抛出RejectedExecutoionExpection来拒绝新任务的处理
	 * 2. ThreadPoolExecutor.CallerRunsPolicy: 调用执行自己的线程运行任务,也就是直接调用execute方法线程中运行被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交的速度,
	 * 影响程序的整体性能。
	 * 3. ThreadPoolExecutor.DiscardPolicy: 不处理新任务,直接丢弃
	 * 4. ThreadPoolExecutor.DiscardOldestPolicy: 此策略将丢弃最早的未处理的任务请求
	 * 5. 
	 */
	
	public static ThreadPoolExecutor createThreadPool() {
		ThreadPoolExecutor executor = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
				new ArrayBlockingQueue<>(QUEUE_CAPACITY),new ThreadPoolExecutor.CallerRunsPolicy());
		return executor;
	}
}

上面的参数可以根据自己电脑的情况进行更改,我这里设置了10个核心线程

创建一个执行任务的类

本次测试是对一个整型集合进行加1操作,因此这个类主要就是负责给每一个整数加1的操作,用来模拟实际中的任务。这里实现了Callable接口,因为它是有返回值的,如果使用Runnable是没有返回值的。


import java.util.concurrent.Callable;

public class MyTask implements Callable<Integer> {
	
	private Integer num;
	
	
	public MyRunnable(Integer num) {
		this.num = num;
	}

	
	private Integer process() {
		try {
			Thread.sleep(200);	//模拟处理延时
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		return this.num+1;	//整数进行加1操作
	}

	@Override
	public Integer call() throws Exception {
		return process();
	}
	
	

}

编写测试用例

测试用例为了方便就写到了创建线程池的那个类里,在它里面添加一个主方法用来测试,下面是完成线程池类代码


import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* 创建线程池
* @author huhailong
*
*/
public class MyThreadPool {
private static final int CORE_POOL_SIZE = 10;	//核型线程数,最小可以同时运行的线程数量
private static final int MAX_POOL_SIZE = 10;	//最大线程数,当队列中存放的任务到达队列容量时,当前可以同时运行的线程数量变为最大线程数
private static final int QUEUE_CAPACITY = 100;	//当新任务来的时候会先判断当前运行的线程数是否到达量核心线程数,如果达到量,就将新任务存放到队列中
private static final long KEEP_ALIVE_TIME = 1L;	//当线程池中的线程数量大于核心线程池数量的时候,如果这时候没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,知道等待的时间超了设定的时间才会销毁
/*
* 线程池中的饱和错略
* 1. ThreadPoolExecutor.AbortPolicy: 抛出RejectedExecutoionExpection来拒绝新任务的处理
* 2. ThreadPoolExecutor.CallerRunsPolicy: 调用执行自己的线程运行任务,也就是直接调用execute方法线程中运行被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交的速度,
* 影响程序的整体性能。
* 3. ThreadPoolExecutor.DiscardPolicy: 不处理新任务,直接丢弃
* 4. ThreadPoolExecutor.DiscardOldestPolicy: 此策略将丢弃最早的未处理的任务请求
* 5. 
*/
public static ThreadPoolExecutor createThreadPool() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(QUEUE_CAPACITY),new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
public static void main(String[] args) throws InterruptedException, ExecutionException {
long start = System.currentTimeMillis();
ThreadPoolExecutor executor = createThreadPool();
List<Future<Integer>> list = new LinkedList<>();
List<Integer> tempList = new ArrayList<>();
//创建测试用例
for(int i=0; i<100; i++) {
tempList.add(100*(i+1));
}
for(int i=0; i<tempList.size(); i++) {
MyTask worker = new MyTask(tempList.get(i));	//执行任务
//下面两行代码是将执行的返回结果进行汇总
Future<Integer> submit = executor.submit(worker);
list.add(submit);
}
List<Integer> result = new LinkedList<>();
for(Future<Integer> f : list) {
//将汇总好的结果进行轮询,判断任务是否执行完成,确保每个任务执行完成后将结果添加到结果集中
while(true) {
if(f.isDone() && !f.isCancelled()) {
Integer object = f.get();
result.add(object);
break;
}
}
}
//		List<Integer> result = new LinkedList<>();
//		for(int i=0; i<tempList.size(); i++) {
//			Thread.sleep(200);
//			Integer integer = tempList.get(i);
//			result.add(integer+1);
//		}
System.out.println("最终汇总结果:");
System.out.println(result);
long end = System.currentTimeMillis();
System.out.println("work time:"+(end - start)+"ms");
//终止线程
executor.shutdown();
while(!executor.isTerminated()) {}
System.out.println("Finished all threads");
}
}

这里我们主要看一下主方法中的代码,代码中注释调的是使用for循环进行的测试对比,同样里面加了200毫秒的延时用来模拟处理耗时。

最终运行结果

使用for循环的结果

最终汇总结果:
[101, 201, 301, 401, 501, 601, 701, 801, 901, 1001, 1101, 1201, 1301, 1401, 1501, 1601, 1701, 1801, 1901, 2001, 2101, 2201, 2301, 2401, 2501, 2601, 2701, 2801, 2901, 3001, 3101, 3201, 3301, 3401, 3501, 3601, 3701, 3801, 3901, 4001, 4101, 4201, 4301, 4401, 4501, 4601, 4701, 4801, 4901, 5001, 5101, 5201, 5301, 5401, 5501, 5601, 5701, 5801, 5901, 6001, 6101, 6201, 6301, 6401, 6501, 6601, 6701, 6801, 6901, 7001, 7101, 7201, 7301, 7401, 7501, 7601, 7701, 7801, 7901, 8001, 8101, 8201, 8301, 8401, 8501, 8601, 8701, 8801, 8901, 9001, 9101, 9201, 9301, 9401, 9501, 9601, 9701, 9801, 9901, 10001]
work time:20381ms
Finished all threads

使用for循环的操作使用了20多秒

使用线程池汇总的结果

最终汇总结果:
[101, 201, 301, 401, 501, 601, 701, 801, 901, 1001, 1101, 1201, 1301, 1401, 1501, 1601, 1701, 1801, 1901, 2001, 2101, 2201, 2301, 2401, 2501, 2601, 2701, 2801, 2901, 3001, 3101, 3201, 3301, 3401, 3501, 3601, 3701, 3801, 3901, 4001, 4101, 4201, 4301, 4401, 4501, 4601, 4701, 4801, 4901, 5001, 5101, 5201, 5301, 5401, 5501, 5601, 5701, 5801, 5901, 6001, 6101, 6201, 6301, 6401, 6501, 6601, 6701, 6801, 6901, 7001, 7101, 7201, 7301, 7401, 7501, 7601, 7701, 7801, 7901, 8001, 8101, 8201, 8301, 8401, 8501, 8601, 8701, 8801, 8901, 9001, 9101, 9201, 9301, 9401, 9501, 9601, 9701, 9801, 9901, 10001]
work time:2050ms
Finished all threads

使用线程池汇总结果的时间是2秒左右,速度提升了10倍