带你玩转CompletableFuture异步编程
package com.zto.lbd;p>import org.slf4j.Logger;import org.slf4j.LoggerFactory;</p><p>import java.util.Date;import java.util.List;import java.util.Objects;import java.util.concurrent.*;import java.util.concurrent.atomic.AtomicInteger;</p><p>/** * 线程池监控类 * * @author wangtongzhou 18635604249 * @since 2022-02-23 07:27 */public class ThreadPoolMonitor extends ThreadPoolExecutor {</pp> private static final Logger LOGGER = LoggerFactory.getLogger(ThreadPoolMonitor.class);</p><p> /** * 保存任务开始执行的时间,当任务结束时,用任务结束时间减去开始时间计算任务执行时间 */ private ConcurrentHashMap<String, Date> startTimes;</p><p> /** * 线程池名称,一般以业务名称命名,方便区分 */ private String poolName;</p><p> /** * 调用父类的构造方法,并初始化HashMap和线程池名称 * * @param corePoolSize 线程池核心线程数 * @param maximumPoolSize 线程池最大线程数 * @param keepAliveTime 线程的最大空闲时间 * @param unit 空闲时间的单位 * @param workQueue 保存被提交任务的队列 * @param poolName 线程池名称 */ public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable workQueue, String poolName) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), poolName); }/p><p> /** * 调用父类的构造方法,并初始化HashMap和线程池名称 * * @param corePoolSize 线程池核心线程数 * @param maximumPoolSize 线程池最大线程数 * @param keepAliveTime 线程的最大空闲时间 * @param unit 空闲时间的单位 * @param workQueue 保存被提交任务的队列 * @param threadFactory 线程工厂 * @param poolName 线程池名称 */ public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable workQueue, ThreadFactory threadFactory, String poolName) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); this.startTimes = new ConcurrentHashMap>(); this.poolName = poolName; }</p><p> /** * 线程池延迟关闭时(等待线程池里的任务都执行完毕),统计线程池情况 */ @Override public void shutdown() { // 统计已执行任务、正在执行任务、未执行任务数量 LOGGER.info("{} 关闭线程池, 已执行任务: {}, 正在执行任务: {}, 未执行任务数量: {}", this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size()); super.shutdown(); }</pp> /** * 线程池立即关闭时,统计线程池情况 */ @Override public List<Runnable shutdownNow() { // 统计已执行任务、正在执行任务、未执行任务数量 LOGGER.info("{} 立即关闭线程池,已执行任务: {}, 正在执行任务: {}, 未执行任务数量: {}", this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size()); return super.shutdownNow(); }/p><p> /** * 任务执行之前,记录任务开始时间 */ @Override protected void beforeExecute(Thread t, Runnable r) { startTimes.put(String.valueOf(r.hashCode()), new Date()); }</pp> /** * 任务执行之后,计算任务结束时间 */ @Override protected void afterExecute(Runnable r, Throwable t) { Date startDate = startTimes.remove(String.valueOf(r.hashCode())); Date finishDate = new Date(); long diff = finishDate.getTime() - startDate.getTime(); // 统计任务耗时、初始线程数、核心线程数、正在执行的任务数量、 // 已完成任务数量、任务总数、队列里缓存的任务数量、池中存在的最大线程数、 // 最大允许的线程数、线程空闲时间、线程池是否关闭、线程池是否终止 LOGGER.info("{}-pool-monitor: " + "任务耗时: {} ms, 初始线程数: {}, 核心线程数: {}, 正在执行的任务数量: {}, " + "已完成任务数量: {}, 任务总数: {}, 队列里任务数量: {}, 池中存在的最大线程数: {}, " + "最大线程数: {}, 线程空闲时间: {}, 线程池是否关闭: {}, 线程池是否终止: {}", this.poolName, diff, this.getPoolSize(), this.getCorePoolSize(), this.getActiveCount(), this.getCompletedTaskCount(), this.getTaskCount(), this.getQueue().size(), this.getLargestPoolSize(), this.getMaximumPoolSize(), this.getKeepAliveTime(TimeUnit.MILLISECONDS), this.isShutdown(), this.isTerminated()); }</pp> /** * 生成线程池所用的线程,改写了线程池默认的线程工厂,传入线程池名称,便于问题追踪 */ static class MonitorThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix;</p><p> /** * 初始化线程工厂 * * @param poolName 线程池名称 */ MonitorThreadFactory(String poolName) { SecurityManager s = System.getSecurityManager(); group = Objects.nonNull(s) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = poolName + "-pool-" + poolNumber.getAndIncrement() + "-thread-"; }</pp> @Override public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) { t.setDaemon(false); } if (t.getPriority() != Thread.NORM_PRIORITY) { t.setPriority(Thread.NORM_PRIORITY); } return t; } }}</p