CollScheduleManager.java 9.07 KB
package com.sitech.jmx.manage;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import javax.management.MBeanServer;

import com.sitech.util.Formater;
import com.sitech.util.JSONUtil;
import org.apache.log4j.Logger;
import org.quartz.CronTrigger;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SimpleTrigger;
import org.quartz.TriggerUtils;
import org.quartz.impl.StdSchedulerFactory;

import com.sitech.ismp.coll.busi.LogCollThread;
import com.sitech.ismp.coll.busi.util.LogCollManager;
import com.sitech.ismp.messageObject.AgentSyncObject;
import com.sitech.schedule.SchedulerService;

/**
 * 任务调度管理
 * (对业务采集-日志采集单独处理)
 *
 * @author   linxc
 * @version  
 * @since    Ver 6.0
 * @Date	 2012	May 21, 2012		1:43:57 PM
 */
public class CollScheduleManager {
	public static String GroupName = "Group";
	private static Logger logger = Logger.getLogger("SCHEDULE");

	private Scheduler sched = null;	
	private MBeanServer mbs = null;
	
	private static CollScheduleManager scheduleManager;
	
	private static Map<String, ScheduleDetail> scheduleDetailList = new HashMap<String, ScheduleDetail>();

	private CollScheduleManager() throws Exception {
		sched = new StdSchedulerFactory().getScheduler();
	}

    public boolean scheduleExist(String scheduleId){
        boolean flag = false;
        ScheduleDetail detail = scheduleDetailList.get(scheduleId);
        String json = JSONUtil.toJSON(scheduleDetailList);
        logger.info("#########################################################");
        logger.info(json);
        logger.info("#########################################################");
        if(null!=detail){
            flag = true;
        }
        return flag;
    }
	
	public static CollScheduleManager getInstance() {
		if (scheduleManager == null) {
			try {
				scheduleManager = new CollScheduleManager();
			} catch (Exception e) {
				logger.error("Exception while init coll schedule.", e);
				return null;
			}
		}
		return scheduleManager;
	}
	
	public void startSchedule() {
		try {
			sched.start();
			SchedulerService.getInstance().start();
		} catch (SchedulerException e) {
			logger.error("Exception while start schedule", e);
		}
	}
	
	/**
	 * 增加采集调度
	 * 6.1版本之前使用该方法
	 */
	public void addCollSchedule(AgentSyncObject msg) throws Exception {
		String scheduleId = msg.getScheduleId();
		String objectName = msg.getObjectName();
		String methodName = msg.getFunction();
		scheduleDetailList.put(scheduleId, new ScheduleDetail(msg));
		
		// 若为日志采集,单独处理
		if ("CollByLog:type=MBean".equals(objectName)) {
			LogCollThread thread = new LogCollThread(msg.getParams());
			LogCollManager.add(scheduleId, thread);
			return;
		}
		
		JobDetail job = new JobDetail(scheduleId, GroupName, LaunchJob.class);
		JobDataMap jobDataMap = job.getJobDataMap();
		jobDataMap.put(LaunchJob.SCHEDULE_ID, scheduleId);
		jobDataMap.put(LaunchJob.OBJECTNAME, objectName);
		jobDataMap.put(LaunchJob.OPERATIONNAME, methodName);
		jobDataMap.put(LaunchJob.PARAMS, msg.getParams());
		jobDataMap.put(LaunchJob.EXEOBJECT, "MBEAN");

		logger.info("Loading schedule: [" + scheduleId + "\t" + objectName + "\t" + methodName + "]");

		String crontab = msg.getCrontab();
		if (crontab != null && !crontab.trim().equals("")
				&& !crontab.trim().equals("null")) {
			CronTrigger trigger = new CronTrigger(scheduleId,
					GroupName, scheduleId, GroupName, crontab);
			sched.addJob(job, true);
			sched.scheduleJob(trigger);
			return;
		}
		
		Date beginTime = msg.getBeginTime();
		Date endTime = msg.getEndTime();
		if(beginTime == null){
			beginTime = new Date(TriggerUtils.getNextGivenSecondDate(null, 15).getTime());
			endTime = null;
		}else{
			beginTime = TriggerUtils.getEvenMinuteDate(beginTime);
		}
		
		int repeatCount = msg.getRepeatCount();
		if (repeatCount < 1) {
			repeatCount = SimpleTrigger.REPEAT_INDEFINITELY;
		}
		
		long interval = msg.getInterval();

		SimpleTrigger trigger = new SimpleTrigger(scheduleId, GroupName,
				beginTime, endTime, repeatCount, interval);
		sched.scheduleJob(job, trigger);
	}
	
	/**
	 * 更新采集调度
	 */
	public void updateCollSchedule(AgentSyncObject msg) throws Exception {
		deleteCollSchedule(msg.getScheduleId());
		
		Thread.sleep(5 * 1000L);
		
		addCollSchedule(msg);
	}
	
	/**
	 * 删除采集调度
	 */
	public void deleteCollSchedule(String scheduleId) throws Exception {
		scheduleDetailList.remove(scheduleId);
		logger.info("Unloading schedule: [" + scheduleId + "]");
		if (LogCollManager.contains(scheduleId)) {
			// 若为日志采集,单独处理
			LogCollManager.remove(scheduleId);
		} else {
			sched.deleteJob(scheduleId, GroupName);
		}
	}

	public MBeanServer getMBeanServer() {
		return mbs;
	}

	public void setMBeanServer(MBeanServer mbs) {
		this.mbs = mbs;
	}
	
	public void print(){
		synchronized (scheduleDetailList) {
			String str = "\n";
			str += "MBEAN_ID\tOBJECT_NAME\tCLASS_NAME\tSCHEDULE_ID\tFUNCTION\tINTERVAL\tCRONTAB\n";
			str += "===========================================================================\n";
			for(String scheduleId : scheduleDetailList.keySet()){
				str += scheduleDetailList.get(scheduleId).toString();
			}
			logger.info(str);
		}
	}
	
	
	/**
	 * 增加采集调度
	 * 7.0版本之后使用该方法
	 */
	public Map<String,Object> addCollSchedule(com.sitech.schedule.Scheduler scheduler, AgentSyncObject obj) throws Exception {
		String scheduleId = scheduler.getScheduleId();
		String objectName = scheduler.getObjectName();
		String methodName = scheduler.getFunction();

        Map<String,Object> rtnDatas = new HashMap<String, Object>();

        if(null!=scheduleDetailList.get(scheduleId)){
            throw new RuntimeException("ScheduleExist");
        }

        //放入任务信息
        scheduleDetailList.put(scheduleId, new ScheduleDetail(obj));

        // 若为日志采集,单独处理
		if ("CollByLog:type=MBean".equals(objectName)) {
			LogCollThread thread = new LogCollThread(scheduler.getParams());
			LogCollManager.add(scheduleId, thread);
			return rtnDatas;
		}
		
		JobDetail job = new JobDetail(scheduleId, GroupName, LaunchJob.class);
		JobDataMap jobDataMap = job.getJobDataMap();
		jobDataMap.put(LaunchJob.SCHEDULE_ID, scheduleId);
		jobDataMap.put(LaunchJob.OBJECTNAME, objectName);
		jobDataMap.put(LaunchJob.OPERATIONNAME, methodName);
		jobDataMap.put(LaunchJob.PARAMS, scheduler.getParams());
		jobDataMap.put(LaunchJob.EXEOBJECT, "MBEAN");

		logger.info("Loading schedule: [" + scheduleId + "\t" + objectName + "\t" + methodName + "]");

		String crontab = scheduler.getCrontab();
		if (crontab != null && !crontab.trim().equals("")
				&& !crontab.trim().equals("null")) {
			CronTrigger trigger = new CronTrigger(scheduleId,
					GroupName, scheduleId, GroupName, crontab);

			sched.addJob(job, true);
			sched.scheduleJob(trigger);

            Date nextDate = sched.getTrigger(scheduleId,GroupName).getNextFireTime();
            String nextFireTime = Formater.datetimeToString(nextDate,"yyyy-MM-dd HH:mm:ss");
            logger.info("This Schedule's NEXT_FIRE_TIME is : "+nextFireTime);
            rtnDatas.put("NEXT_FIRE_TIME",nextFireTime);

			return rtnDatas;
		}
		
		Date beginTime = scheduler.getBeginTime();
		Date endTime = scheduler.getEndTime();
		if(beginTime == null){
			beginTime = new Date(TriggerUtils.getNextGivenSecondDate(null, 15).getTime());
			endTime = null;
		}else{
			beginTime = TriggerUtils.getEvenMinuteDate(beginTime);
		}
		
		int repeatCount = scheduler.getRepeatCount();
		if (repeatCount < 1) {
			repeatCount = SimpleTrigger.REPEAT_INDEFINITELY;
		}
		
		long interval = scheduler.getInterval();

		SimpleTrigger trigger = new SimpleTrigger(scheduleId, GroupName,
				beginTime, endTime, repeatCount, interval);
		sched.scheduleJob(job, trigger);
        return rtnDatas;
	}
	
	
	/**
	 * 更新采集调度
	 * 7.0版本之后使用该方法
	 */
	public Map<String,Object> updateCollSchedule(com.sitech.schedule.Scheduler scheduler, AgentSyncObject obj) throws Exception {
        Map<String,Object> scheDatas = null;
		deleteCollSchedule(scheduler.getScheduleId());
		
		Thread.sleep(5 * 1000L);

        scheDatas = addCollSchedule(scheduler, obj);
        return scheDatas;
	}
}

class ScheduleDetail {
	private String mbeanId;

	private String objectName;

	private String className;

	private String scheduleId;

	private String function;

	private long interval;

	private String crontab;

	private HashMap<String, String> params;

	public ScheduleDetail(AgentSyncObject msg) {
		mbeanId = msg.getMbeanId();
		objectName = msg.getObjectName();
		className = msg.getClassName();
		scheduleId = msg.getScheduleId();
		function = msg.getFunction();
		interval = msg.getInterval();
		crontab = msg.getCrontab();
		params = msg.getParams();
	}

	public String toString() {
		String str = mbeanId + "\t" + objectName + "\t" + className + "\t"
				+ scheduleId + "\t" + function + "\t" + interval + "\t"
				+ crontab + "\n";
		for (String key : params.keySet()) {
			str += key + "\t" + params.get(key) + "\n";
		}
		return str;
	}
}