CollJobThread.java 7.39 KB
package com.sitech.schedule;

import java.text.ParseException;
import java.util.Date;
import java.util.List;
import java.util.Vector;

import javax.management.MBeanServer;
import javax.management.ObjectName;

import com.sitech.base.AgentProperties;
import com.sitech.domain.MonitorBase;
import com.sitech.util.JSONUtil;
import com.sitech.util.RabbitmqUtil;
import org.apache.log4j.Logger;

import com.sitech.base.Config;
import com.sitech.database.dao.TbCfgEventDao;
import com.sitech.database.domain.TbCfgEvent;
import com.sitech.ismp.app.event.KPI2Event;
import com.sitech.ismp.coll.basic.TblATO_KPIDETAIL;
import com.sitech.ismp.messageObject.PerformanceObject;
import com.sitech.jmx.manage.MBeanManager;
import com.sitech.schedule.bean.Job;
import com.sitech.util.Formater;
import com.sitech.util.mq.TunnelFactory;

/**
 * 采集MBean的执行改为异步,quartz只负责发送通知
 *
 * @author LINXC
 */
public class CollJobThread implements Runnable {
	private Logger logger = Logger.getLogger("PF_TEST");

	private Job job;

	public CollJobThread(Job job) {
		this.job = job;
	}

	public void run() {
		long beginTime = System.currentTimeMillis();
		try {
			logger.info("-- Begin excute job, objectName["
					+ job.getObjectName() + "], methodName["
					+ job.getOperation() + "], params=" + JSONUtil.toJSON(job.getParams()));

			Vector<TblATO_KPIDETAIL> kpiList = excuteJob();

			long timeSpan = System.currentTimeMillis() - beginTime;
			logger.info("-- End excute job, used [" + timeSpan + "/1000]s, objectName[" + job.getObjectName()
					+ "], methodName[" + job.getOperation()
					+ "], params=" + JSONUtil.toJSON(job.getParams()));
			if (kpiList == null || kpiList.size() <= 0) {
				logger.info("-- Coll objectName[" + job.getObjectName() +
						"], methodName[" + job.getOperation()+"],result size : 0,no result!!!!" );
				return;
			}else{
				logger.info("-- Coll objectName[" + job.getObjectName() +
						"], methodName[" + job.getOperation()+"],result size : " + kpiList.size());
			}

			// 对采集结果进行预处理
			doPreProcess(kpiList);

			// 发送采集结果到Performance进行入库
			send2Performance(kpiList);

			// 告警分析,并将告警发送到Workstation
			doAlarmAnalysis(kpiList);

			// 判断是否发送给云平台的rabbitMQ
			if (AgentProperties.IS_SEND_TO_CLOUD_RABBITMQ) {
				send2CloudRabbitMQ(kpiList);
			}
		} catch (Exception e) {
			logger.error("Exception while excute job, objectName["
					+ job.getObjectName() + "], methodName["
					+ job.getOperation() + "], params=" + JSONUtil.toJSON(job.getParams()), e);
		}finally {
			long timeSpan = System.currentTimeMillis() - beginTime;
			logger.info("-- End excute job, used [" + timeSpan + "/1000]s, objectName[" + job.getObjectName()
					+ "], methodName[" + job.getOperation()
					+ "], params=" + JSONUtil.toJSON(job.getParams()));
		}

	}

	/***
	 * 发送给云平台的rabbitMQ
	 * @param kpiList
	 */
	private void send2CloudRabbitMQ(Vector<TblATO_KPIDETAIL> kpiList) {
		logger.info("Send to Cloud RabbitMQ[" + new Date() + "], kpiList's size() is: " + kpiList.size());
		for (TblATO_KPIDETAIL kpiDetail : kpiList) {
			MonitorBase base = new MonitorBase();
			base.setBeginTime(new Date());
			base.setCollTime(new Date());
			base.setEndTime(new Date());
			base.setEntityId(kpiDetail.UNIT_ID);
			base.setKpiIds(new String[]{kpiDetail.KPI_ID});
			base.setKpiValues(new String[]{kpiDetail.KPI_VALUE});
			base.setType(MonitorBase.VEntity.HOST);
			logger.info("Send to Cloud [" + kpiDetail.UNIT_ID + ", " + kpiDetail.KPI_ID + ", " + kpiDetail.KPI_VALUE + "] " + base.toString());
			try {
				RabbitmqUtil.sentMessage(base);
			} catch (Exception e) {
				logger.error("please check your Rabbit MQ config", e);
			}
		}
	}

	/**
	 * 告警分析,并将告警发送到Workstation
	 *
	 * @param kpiList
	 */
	private void doAlarmAnalysis(Vector<TblATO_KPIDETAIL> kpiList) {
		if (null == kpiList || kpiList.isEmpty()) {
			return;
		}

		// 得到与当前unit_id相关的所有告警配置
		List<TbCfgEvent> eventConfigList = null;
		try {
			TbCfgEventDao eventConfigdao = new TbCfgEventDao();
			String unitId = kpiList.get(0).UNIT_ID;
			eventConfigList = eventConfigdao.queryEventCfgByUnitId(unitId);

		} catch (Exception e) {
			logger.error("Exception while doAlarmAnalysis: ", e);
			return;
		}

		if (eventConfigList != null && eventConfigList.size() > 0) {
			KPI2Event kpi2event = new KPI2Event();
			for (TblATO_KPIDETAIL kpidetail : kpiList) {
				try {
					kpidetail.setCLL_TIME_STR(kpidetail.CLL_TIME);

					kpi2event.generation(kpidetail.UNIT_ID, kpidetail.KPI_ID,
							kpidetail.CLL_TIME, kpidetail.KPI_VALUE,
							eventConfigList);
				} catch (Exception e) {
					logger.error("Exception while kpi2event.generation("
							+ kpidetail.UNIT_ID + "," + kpidetail.KPI_ID + ","
							+ kpidetail.CLL_TIME + "," + kpidetail.KPI_VALUE
							+ ")", e);
				}
			}
		}
	}

	/**
	 * 发送采集结果到Performance进行入库
	 *
	 * @param kpiList
	 */
	private void send2Performance(Vector<TblATO_KPIDETAIL> kpiList) {
		for (TblATO_KPIDETAIL kpiDetail : kpiList) {
			kpiDetail.display();

			PerformanceObject pmObj = new PerformanceObject();
			pmObj.setUnitId(kpiDetail.UNIT_ID);
			pmObj.setKpiId(kpiDetail.KPI_ID);
			pmObj.setKpiValue(kpiDetail.KPI_VALUE);
			pmObj.setCllTime(kpiDetail.CLL_TIME);
			pmObj.setCllTimeStr(Formater.datetimeToString(kpiDetail.CLL_TIME));
			pmObj.setInterval(job.getInterval());
			pmObj.setExtUnitId(getExtUnitId(kpiDetail.UNIT_ID));

			TunnelFactory.getTunnel(Config.Q_PERFORMANCE).writeData(pmObj);
		}
	}

	/**
	 * 对采集结果进行预处理
	 *
	 * @param kpiList
	 */
	private void doPreProcess(Vector<TblATO_KPIDETAIL> kpiList) {
		for (int i = kpiList.size() - 1; i >= 0; i--) {
			TblATO_KPIDETAIL kpi = kpiList.get(i);
			if (kpi.KPI_VALUE == null || kpi.KPI_VALUE.trim().equals("")) {
				// 采集值为空的过滤掉
				kpiList.remove(i);
			}
		}
	}

	/**
	 * 执行采集任务
	 */
	@SuppressWarnings("unchecked")
	private Vector<TblATO_KPIDETAIL> excuteJob() throws Exception {

		Vector<TblATO_KPIDETAIL> kpiList = null;
		try {
			// 采集参数
			Object[] params = new Object[] { job.getParams() };
			// MBean实例名
			ObjectName jmxobjectname = new ObjectName(job.getObjectName());

			String[] signature = new String[] { "java.util.HashMap" };

			MBeanServer mbs = MBeanManager.getInstance().getMBeanServer();

			kpiList = (Vector<TblATO_KPIDETAIL>) mbs.invoke(jmxobjectname, job
					.getOperation(), params, signature);
		} catch (Exception e) {
			logger.error("Exception while excute job, objectName["
					+ job.getObjectName() + "], methodName["
					+ job.getOperation() + "], collInterval["
					+ job.getInterval() + "].", e);
		}

		return kpiList;
	}

	/**
	 * 获得扩展UNIT_ID,如:10-10-20:deviceId
	 */
	private String getExtUnitId(String unitId) {
		if(null == unitId || "".equals(unitId)){
			return "";
		}

		String extUnitId = "";
		if (unitId.startsWith("10")) {
			// 平台类EXT_UNIT_ID为:10-XX-YY:DEVICE_ID
			String[] elem = unitId.split(":");
			if (elem.length >= 2) {
				String[] kbp = elem[0].split("-");
				if (kbp.length >= 3) {
					extUnitId = kbp[0] + "-" + kbp[1] + "-" + kbp[2];
				} else {
					extUnitId = elem[0];
				}

				String deviceId = elem[1].split("-")[0];

				extUnitId += ":" + deviceId;
			} else {
				extUnitId = elem[0];
			}
		} else {
			// 业务类的EXT_UNIT_ID与UNIT_ID相等
			extUnitId = unitId.split(":")[0];
		}
		return extUnitId;
	}
}