`

自定义异步线程池,实现异步请求以及监控当前线程运行情况

阅读更多

由于工作的需要,写了一份异步调用的小框架,分享出来。。。

 

启动类:

 

/**
 * 线程启动
 * @author yfguopeng
 */
public class ThreadExecutorListener  implements ServletContextListener{
	private final static Log log = LogFactory.getLog(ThreadExecutorListener.class);

	@SuppressWarnings("unchecked")
	public void contextInitialized(ServletContextEvent sce) {
		ServletContext servletContext = sce.getServletContext();
		WebApplicationContext wac = WebApplicationContextUtils.getRequiredWebApplicationContext(servletContext);
		List<ThreadConfigBean> worders = (List<ThreadConfigBean>) wac.getBean("workers"); 
		log.info("=====================初始化线程池========================");
		//创建线程组
		SecurityManager s = System.getSecurityManager();
		ThreadGroup  father = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
		ThreadGroup group = new ThreadGroup(father, "root-threadgroup");
		
		for (ThreadConfigBean configBean : worders) {
			//设置排队队列大小
			ArrayBlockingQueue<Runnable> taskQueue = new ArrayBlockingQueue<Runnable>(configBean.getQueueCapacity());
			//设置线程工厂
			ThreadFactory threadFactory = new DecorateThreadFactory(new ThreadGroup(group,configBean.getBusinessId()),configBean.getBusinessId());
			
			ThreadPoolExecutor worker = new ThreadPoolExecutor(configBean.getBusinessId(),configBean.getMin(), configBean.getMax(), configBean.getKeepAliveTime(),
                    TimeUnit.SECONDS, taskQueue, threadFactory, configBean.getRejectHandler());
			
			ThreadGroupUtil.addThreadWorker(configBean.getBusinessId(), worker);
		}
		
		log.info("=====================线程池初始化完毕========================");
		log.info("=====================初始化监控线程========================");
		ThreadGroupUtil.monitorThreadStart(group,2000l);
		log.info("=====================监控线程初始化完毕========================");
	}

	public void contextDestroyed(ServletContextEvent sce) {
	}

	
}

 

 

业务监控配置:

 

<bean id="xxxIndex" class="xxx.xxx.xxx.xxx.web.thread.ThreadConfigBean">
    	<property name="businessId" value="xxxIndex"></property><!-- 业务ID,唯一 -->
    	<property name="max" value="40"></property><!-- 最好为请求线程的倍数 -->
    	<property name="min" value="10"></property><!-- 最好为请求线程的倍数 -->
    	<property name="queueCapacity" value="80"></property><!-- 最好为请求线程的倍数 -->
    	<property name="keepAliveTime" value="600"></property><!-- 线程空闲保存时间 -->
    	<property name="rejectHandler" ><!--任务拒绝处理策略 -->
	    	<bean class="com.jd.m.pay.web.thread.RejectedPolicyHandler" >
	    		<property name="bizName" value="pay-index"></property><!-- 业务ID,唯一 -->
	    	</bean>
    	</property>
    </bean>

	<bean id="workers" class="java.util.ArrayList">
		<constructor-arg >
			<list> 
				<ref bean="xxxIndex"/>
			</list>
		</constructor-arg>
	</bean>

 线程工厂:

 

 

/**
 *  线程工厂, 加入了线程名的业务描述
 * 
 * @User: guopeng
 * @Date: 2013-02-28
 */
public class DecorateThreadFactory implements ThreadFactory {
    static final AtomicInteger poolNumber = new AtomicInteger(1);
    final ThreadGroup group;
    final AtomicInteger threadNumber = new AtomicInteger(1);
    final String namePrefix;

    public DecorateThreadFactory(final ThreadGroup group,final String bizName) {
        this.group = group;
        namePrefix = bizName + "-pool-" + poolNumber.getAndIncrement() + "-thread-";
    }

    public Thread newThread(Runnable r) {
        Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
        if (t.isDaemon())
            t.setDaemon(false);
        if (t.getPriority() != Thread.NORM_PRIORITY)
            t.setPriority(Thread.NORM_PRIORITY);
        return t;
    }
}

 监控线程:

/**
 * 监控业务线程池运行情况
 * @author yfguopeng
 * @Date 2013-02-28
 */
public class MonitorThread implements Runnable {
	private final static Log log = LogFactory.getLog(MonitorThread.class);
	private final ThreadGroup group;
	
	public MonitorThread(ThreadGroup group) {
		this.group = group;
	}
	
	public void run() {
		Map<String, ThreadPoolExecutor>workers =  ThreadGroupUtil.getThreadWorkers();
		Iterator<String> iterator = workers.keySet().iterator();
		
		log.info("total threadpools:[ "+workers.size()+" ],total threads:[ "+group.activeCount()+" ]");
		while(iterator.hasNext()) {
			ThreadPoolExecutor worker = ThreadGroupUtil.getThreadWorker(iterator.next());
			
			RejectedExecutionHandler handler = worker.getRejectedExecutionHandler();
			String rejectedSize = "";
			if (RejectedPolicyHandlerInteface.class.isAssignableFrom(handler.getClass())) {
				rejectedSize = " ],rejected threads:[ "+((RejectedPolicyHandlerInteface) handler).getRejectedSize();
			}
			
			log.info("business name:[ "+worker.getBizName()+" ]" +
					", core threads:[ "+worker.getCorePoolSize()+" ], max threads:[ "+worker.getMaximumPoolSize()+" ]" +
							", queue capacitys:[ "+worker.getQueue().size()+" ], running threads:[ "+worker.getActiveCount()+"] " +
									", reject threads:[ "+rejectedSize+" ],  largest threads:[ "+worker.getLargestPoolSize()+" ], complete threads:[ "+worker.getCompletedTaskCount()+" ]");
		}
	}

}

 线程拒绝处理器:

/**
 * 线程拒绝执行控制球
 * @author yfguopeng
 * @Date 2013-02-28
 */
public class RejectedPolicyHandler extends ThreadPoolExecutor.AbortPolicy implements RejectedPolicyHandlerInteface{
	private final static Log log = LogFactory.getLog(RejectedPolicyHandler.class);
	private static AtomicLong totals = new AtomicLong(0l);
	
	private String bizName;
	
	public RejectedPolicyHandler(){}
	
	@Override
	public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
		String tip = "["+bizName+"] 线程忙,请求被拒绝.max: "+executor.getMaximumPoolSize()+", queue: "+executor.getQueue().size();
		log.info(tip);
		//业务报警 TODO
		totals.addAndGet(1);
		super.rejectedExecution(r, executor);
	}

	public String getBizName() {
		return bizName;
	}

	public void setBizName(String bizName) {
		this.bizName = bizName;
	}
	
	public long getRejectedSize() {
		return totals.get();
	}
}

 

import java.util.concurrent.RejectedExecutionHandler;

public interface RejectedPolicyHandlerInteface extends RejectedExecutionHandler{
	public long getRejectedSize() ;
}

 线程配置bean:

@SuppressWarnings("serial")
public class ThreadConfigBean implements Serializable{
	/**
	 * 业务ID
	 */
	private String businessId;
	/**
	 * 任务队列最大线程数
	 * 默认:80
	 */
	private Integer max = 160;
	/**
	 * 任务队列最小线程数
	 * 默认:40
	 */
	private Integer min = 80;
	/**
	 * 等待队列请求数
	 * 默认:300
	 */
	private Integer queueCapacity = 300;
	/**
	 * 空闲线程存活时间
	 * 默认:3分钟
	 */
	private Long keepAliveTime = 3 * 60l;
	/**
	 * 线程拒绝策略
	 */
	private RejectedExecutionHandler rejectHandler = new ThreadPoolExecutor.AbortPolicy();
	
	public ThreadConfigBean() {
		super();
	}
	
	public Integer getMax() {
		return max;
	}
	public void setMax(Integer max) {
		this.max = max;
	}
	public Integer getMin() {
		return min;
	}
	public void setMin(Integer min) {
		this.min = min;
	}
	public Integer getQueueCapacity() {
		return queueCapacity;
	}
	public void setQueueCapacity(Integer queueCapacity) {
		this.queueCapacity = queueCapacity;
	}
	public Long getKeepAliveTime() {
		return keepAliveTime;
	}
	public void setKeepAliveTime(Long keepAliveTime) {
		this.keepAliveTime = keepAliveTime;
	}
	public RejectedExecutionHandler getRejectHandler() {
		return rejectHandler;
	}
	public void setRejectHandler(RejectedExecutionHandler rejectHandler) {
		this.rejectHandler = rejectHandler;
	}
	public String getBusinessId() {
		return businessId;
	}
	public void setBusinessId(String businessId) {
		this.businessId = businessId;
	}
}

 线程组:

/**
 * 各个业务获取响应线程池
 * @author yfguopeng
 */
public class ThreadGroupUtil {
	private static Map<String, ThreadPoolExecutor> threadworkers;
	private static ScheduledExecutorService monitorThread;//监视线程
	private static final long delay = 200l;
	private static long cycle_default = 5000l;
	
	static {
		threadworkers = new ConcurrentHashMap<String, ThreadPoolExecutor>();
		monitorThread = Executors.newScheduledThreadPool(1);
	}
	
	public static void addThreadWorker(String bizName,ThreadPoolExecutor executor){
		threadworkers.put(bizName, executor);
	}
	
	public static ThreadPoolExecutor getThreadWorker(String bizName) {
		return threadworkers.get(bizName);
	}
	
	public static Map<String, ThreadPoolExecutor> getThreadWorkers(){
		return threadworkers;
	}

	public static ScheduledExecutorService getMonitorThread() {
		return monitorThread;
	}

	public static void setMonitorThread(ScheduledExecutorService monitorThread) {
		ThreadGroupUtil.monitorThread = monitorThread;
	}
	
	public static void monitorThreadClosed(){
		if (monitorThread != null)
			if (!monitorThread.isTerminated()) 
				monitorThread.shutdown();
	}
	
	public static void monitorThreadStart(ThreadGroup group,Long cycle){
		MonitorThread monitor = new MonitorThread(group);
		if (cycle > 0l) {
			try {
				cycle_default = cycle;
			} catch (Exception e) {
			}
		}
		monitorThread.scheduleWithFixedDelay(monitor, delay, cycle_default, TimeUnit.MILLISECONDS);
	}
}

 线程池实现类:

/**
 * 线程池
 * @author yfguopeng
 *
 */
public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {

	private String bizName;
	
	public ThreadPoolExecutor(String bizName,int corePoolSize, int maximumPoolSize,
			long keepAliveTime, TimeUnit unit,
			BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory,
			RejectedExecutionHandler handler) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
				threadFactory, handler);
		this.bizName = bizName;
	}

	public String getBizName() {
		return bizName;
	}

	public void setBizName(String bizName) {
		this.bizName = bizName;
	}
	
}

 

web.xml配置:

  <listener>
		<listener-class>xxx.xx.xx.xxx.web.thread.ThreadExecutorListener</listener-class>
	</listener> 

调用:

		ThreadPoolExecutor exc = ThreadGroupUtil.getThreadWorker("xxxIndex");
		
		String payOrgInfo = null;
		String cards = null;
		Future<String> xxxFuture = null;
		Future<String> yyyFuture = null;
		long start = System.currentTimeMillis();
		xxxTask xxxTask = new xxxTask(//参数);
		yyyTask yyyTask = new yyyTask(//参数);
		System.out.println("开始......");
		xxxFuture = exc.submit(xxxTask );
		yyyFuture = exc.submit(yyyTask );
		
		try {
			xxx= xxxFuture .get();
			yyy= yyyFuture .get();
			System.out.println(xxx);
			System.out.println(yyy);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (ExecutionException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		long end = System.currentTimeMillis();
		System.out.println("结束......   "+(end-start));
		
		return "";
	

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics