package com.sitech.schedule;

import java.util.*;

import javax.management.MBeanServer;
import javax.management.ObjectName;

import com.sitech.base.Config;
import com.sitech.domain.MonitorBase;
import com.sitech.ismp.coll.centercoll.CollLinuxHost;
//import com.sitech.kafka.KafkaConfig;
//import com.sitech.kafka.KafkaProperties;
import com.sitech.schedule.bean.Job;
import com.sitech.util.*;
import com.sitech.util.mq.TunnelFactory;
import kafka.producer.KeyedMessage;
import org.apache.log4j.Logger;

import com.sitech.database.dao.TbCfgEventDao;
import com.sitech.database.domain.TbCfgEvent;
import com.sitech.ismp.app.event.KPI2Event;
import com.sitech.ismp.coll.basic.TblATO_KPIDETAIL;
import com.sitech.ismp.messageObject.PerformanceObject;
import com.sitech.jmx.manage.MBeanManager;

/**
 * Execution of the collection MBeans has been made asynchronous; Quartz is only
 * responsible for sending the trigger notifications.
 *
 * @author LINXC
 */
public class CollJobThreadTest implements Runnable {
	private Logger logger = Logger.getLogger("PF_TEST");

	private Job job;
	//private static KafkaConfig kafkaConfig = new KafkaConfig();
	//private static kafka.javaapi.producer.Producer<Integer, String> producer;
	//private static Properties props;

	public CollJobThreadTest(Job job) {
		this.job = job;
	}

	public void run() {
        long beginTime = System.currentTimeMillis();
		try {
//            logger.info("-- Begin excute job, objectName["
//                    + job.getObjectName() + "], methodName["
//                    + job.getOperation() + "], params=" + JSONUtil.toJSON(job.getParams()));

//			Vector<TblATO_KPIDETAIL> kpiList = executeJob();
			System.out.println("***************Linux Center Collect Test Begin*********************");
//		String ipAddr = "10.191.143.36";
//		String username = "cloud";
//		String password = "cloud!@1234";
//		String protocal = "ssh";
//		String hostName = "cloud-test-web";
			String ipAddr = "172.21.0.31";
			String username = "bnms";
			String password = "bnms";
			String protocal = "ssh";
			String hostName = "172_21_0_31";
			String processName = "unreachable poller #1 ,poller #17 ";
			String listeningPort = "22,48781";
			String correctListening = "1,1";
			String pointIPs = "172.21.0.31,172.21.0.252";
			String methodStr = "2";
//		String ipAddr = read("IP_ADDR(remote host ip):\n");
//		String username = read("USERNAME:\n");
//		String password = read("PASSWORD:\n");
//		String protocal = read("PROTOCAL(ssh/telent):\n");
//		String hostName = read("HOST_NAME(remote host name):\n");
//        String processName = read("PROCESS_NAME:\n");
//        String listeningPort = read("LISTENING_PORT:\n");
//        String correctListening = read("CORRECT_LISTENING:\n");
//		String methodStr = read("METHOD(0:getAll, 1:getConfig, 2:getCpu, 3:getMemory, 4:getDisk, " +
//                "5:getFileSystem, 6:getPortListenings, 7:getBusiProcess, 8:getProcessTop5Mem, 9:getProcessTop5Cpu):\n");

			System.out.println("IP_ADDR=" + ipAddr + ", USERNAME=" + username
					+ ", PASSWORD=" + password + ", PROTOTAL=" + protocal
					+ ", HOST_NAME=" + hostName + ", METHOD=" + methodStr);

			HashMap<String, String> params = new HashMap<String, String>();
			params.put("IP_ADDR", ipAddr);
			params.put("USERNAME", username);
			params.put("PASSWORD", DES3.encrypt(password));
			params.put("PROTOCOL", protocal);
			params.put("HOSTNAME", hostName);
			params.put("PROCESS_NAME", processName);
			params.put("LISTENING_PORT", listeningPort);
			params.put("CORRECT_LISTENING", correctListening);
			params.put("POINT_IPS", pointIPs);

			int method = Integer.parseInt(methodStr);
			Vector<TblATO_KPIDETAIL> result = new Vector<TblATO_KPIDETAIL>();

			CollLinuxHost collector = new CollLinuxHost();
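			// Each CollLinuxHost method returns a collection of TblATO_KPIDETAIL rows;
			// the switch below picks which routines run based on METHOD (0 = run all).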

			switch (method) {
				case 0:
					result.addAll(collector.getConfig(params));
					result.addAll(collector.getCpu(params));
					result.addAll(collector.getMemory(params));
					result.addAll(collector.getDisk(params));
					result.addAll(collector.getFileSystem(params));
					result.addAll(collector.getPortListenings(params));
					result.addAll(collector.getBusiProcess(params));
					result.addAll(collector.getProcessTop5Mem(params));
					result.addAll(collector.getProcessTop5Cpu(params));
					break;
				case 1:
					result.addAll(collector.getConfig(params));
					break;
				case 2:
					result.addAll(collector.getCpu(params));
					break;
				case 3:
					result.addAll(collector.getMemory(params));
					break;
				case 4:
					result.addAll(collector.getDisk(params));
					break;
				case 5:
					result.addAll(collector.getFileSystem(params));
					break;
				case 6:
					result.addAll(collector.getPortListenings(params));
					break;
				case 7:
					result.addAll(collector.getBusiProcess(params));
					break;
				case 8:
					result.addAll(collector.getProcessTop5Mem(params));
					break;
				case 9:
					result.addAll(collector.getProcessTop5Cpu(params));
					break;
				default:
					break;
			}
//            long timeSpan = System.currentTimeMillis() - beginTime;
//            logger.info("-- End excute job, used [" + timeSpan + "/1000]s, objectName[" + job.getObjectName()
//                    + "], methodName[" + job.getOperation()
//                    + "], params=" + JSONUtil.toJSON(job.getParams()));
//			if (kpiList == null || kpiList.size() <= 0) {
//				return;
//			}

			// Pre-process the collection results
//			doPreProcess(kpiList);

			// Send the collection results to Performance for persistence
			send2Performance(result);

			// Alarm analysis; send generated alarms to Workstation
//			doAlarmAnalysis(kpiList);

            // Decide whether to forward the results to the cloud platform's RabbitMQ
//            if (AgentProperties.IS_SEND_TO_CLOUD_RABBITMQ) {
//                send2CloudRabbitMQ(kpiList);
//            }
		} catch (Exception e) {
			logger.error("Exception while excute job, objectName["
					+ job.getObjectName() + "], methodName["
					+ job.getOperation() + "], params=" + JSONUtil.toJSON(job.getParams()), e);
		}finally {
            long timeSpan = System.currentTimeMillis() - beginTime;
            logger.info("-- End excute job, used [" + timeSpan + "/1000]s, objectName[" + job.getObjectName()
                    + "], methodName[" + job.getOperation()
                    + "], params=" + JSONUtil.toJSON(job.getParams()));
        }

	}

    /**
     * Send the collection results to the cloud platform's RabbitMQ.
     *
     * @param kpiList collected KPI rows to publish
     */
    private void send2CloudRabbitMQ(Vector<TblATO_KPIDETAIL> kpiList) {
        logger.info("Send to Cloud RabbitMQ[" + new Date() + "], kpiList's size() is: " + kpiList.size());
        for (TblATO_KPIDETAIL kpiDetail : kpiList) {
            MonitorBase base = new MonitorBase();
            base.setBeginTime(new Date());
            base.setCollTime(new Date());
            base.setEndTime(new Date());
            base.setEntityId(kpiDetail.UNIT_ID);
            base.setKpiIds(new String[]{kpiDetail.KPI_ID});
            base.setKpiValues(new String[]{kpiDetail.KPI_VALUE});
            base.setType(MonitorBase.VEntity.HOST);
            logger.info("Send to Cloud [" + kpiDetail.UNIT_ID + ", " + kpiDetail.KPI_ID + ", " + kpiDetail.KPI_VALUE + "] " + base.toString());
            try {
                RabbitmqUtil.sentMessage(base);
            } catch (Exception e) {
                logger.error("please check your Rabbit MQ config", e);
            }
        }
    }

    /**
	 * Alarm analysis; generated alarms are sent to Workstation.
	 *
	 * @param kpiList collected KPI rows to analyse
	 */
	private void doAlarmAnalysis(Vector<TblATO_KPIDETAIL> kpiList) {
		if (null == kpiList || kpiList.isEmpty()) {
			return;
		}

		// Fetch all alarm configurations related to the current unit_id
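		// Note: only the first row's UNIT_ID is used for the query, which assumes
		// that all rows in kpiList belong to the same unit.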
		List<TbCfgEvent> eventConfigList = null;
		try {
			TbCfgEventDao eventConfigdao = new TbCfgEventDao();
			String unitId = kpiList.get(0).UNIT_ID;
			eventConfigList = eventConfigdao.queryEventCfgByUnitId(unitId);

		} catch (Exception e) {
			logger.error("Exception while doAlarmAnalysis: ", e);
			return;
		}

		if (eventConfigList != null && eventConfigList.size() > 0) {
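			// Run each KPI row through KPI2Event against the configured alarm thresholds.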
			KPI2Event kpi2event = new KPI2Event();
			for (TblATO_KPIDETAIL kpidetail : kpiList) {
				try {
					kpidetail.setCLL_TIME_STR(kpidetail.CLL_TIME);

					kpi2event.generation(kpidetail.UNIT_ID, kpidetail.KPI_ID,
							kpidetail.CLL_TIME, kpidetail.KPI_VALUE,
							eventConfigList);
				} catch (Exception e) {
					logger.error("Exception while kpi2event.generation("
							+ kpidetail.UNIT_ID + "," + kpidetail.KPI_ID + ","
							+ kpidetail.CLL_TIME + "," + kpidetail.KPI_VALUE
							+ ")", e);
				}
			}
		}
	}

	/**
	 * Send the collection results to Performance for persistence.
	 *
	 * @param kpiList collected KPI rows to persist
	 */
	private void send2Performance(Vector<TblATO_KPIDETAIL> kpiList) {
		//props = kafkaConfig.createProducerConfig();
		//producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
		try {
            List<KeyedMessage<String, String>> messageList = new ArrayList<KeyedMessage<String, String>>();
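            // messageList would batch KeyedMessage records for the Kafka producer
            // path, which is currently disabled (see the commented-out sends below).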
			for (TblATO_KPIDETAIL kpiDetail : kpiList) {
				kpiDetail.display();

				PerformanceObject pmObj = new PerformanceObject();
				pmObj.setUnitId(kpiDetail.UNIT_ID);
				pmObj.setKpiId(kpiDetail.KPI_ID);
				pmObj.setKpiValue(kpiDetail.KPI_VALUE);
				pmObj.setCllTime(kpiDetail.CLL_TIME);
				pmObj.setCllTimeStr(Formater.datetimeToString(kpiDetail.CLL_TIME));
				pmObj.setInterval(job.getInterval());
				pmObj.setExtUnitId(getExtUnitId(kpiDetail.UNIT_ID));
                pmObj.setKpiName("");
                pmObj.setKpiDetail("");
                pmObj.setUnitName("");

				//TunnelFactory.getTunnel(Config.BMC_Q_PERFORMANCE).writeData(pmObj);
				//kafka producer
				//String messageStr = JacksonUtil.toJson(pmObj);
				//producer.send(new KeyedMessage<Integer, String>(kafkaConfig.getPerfTopic(), messageStr));
                //messageList.add(new KeyedMessage<String, String>(kafkaConfig.getPerfTopic(), "35", messageStr));
			}
            //kafkaConfig.getProducerInfo().send(messageList);
		} catch (Exception e) {
			logger.error("Exception while sending collection results to Performance.", e);
		} finally {
			//producer.close();
		}


	}

	/**
	 * Pre-process the collection results.
	 *
	 * @param kpiList collected KPI rows; entries with empty values are removed
	 */
	private void doPreProcess(Vector<TblATO_KPIDETAIL> kpiList) {
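		// Iterate backwards so that removing an element does not shift the
		// indices of entries that have not been visited yet.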
		for (int i = kpiList.size() - 1; i >= 0; i--) {
			TblATO_KPIDETAIL kpi = kpiList.get(i);
			if (kpi.KPI_VALUE == null || kpi.KPI_VALUE.trim().equals("")) {
				// Filter out entries whose collected value is empty
				kpiList.remove(i);
			}
		}
	}

	/**
	 * Execute the collection task by invoking the configured MBean operation.
	 */
	@SuppressWarnings("unchecked")
	private Vector<TblATO_KPIDETAIL> executeJob() throws Exception {

		Vector<TblATO_KPIDETAIL> kpiList = null;
		try {
			// Collection parameters
			Object[] params = new Object[] { job.getParams() };
			// MBean instance name
			ObjectName jmxobjectname = new ObjectName(job.getObjectName());

			String[] signature = new String[] { "java.util.HashMap" };
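			// The MBean operation takes a single java.util.HashMap of collection
			// parameters, hence the one-element signature array.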

			MBeanServer mbs = MBeanManager.getInstance().getMBeanServer();

			kpiList = (Vector<TblATO_KPIDETAIL>) mbs.invoke(jmxobjectname,
					job.getOperation(), params, signature);
		} catch (Exception e) {
			logger.error("Exception while excute job, objectName["
					+ job.getObjectName() + "], methodName["
					+ job.getOperation() + "], collInterval["
					+ job.getInterval() + "].", e);
		}

		return kpiList;
	}

	/**
	 * Build the extended UNIT_ID, e.g. 10-10-20:deviceId.
	 */
	private String getExtUnitId(String unitId) {
		if(null == unitId || "".equals(unitId)){
			return "";
		}

		String extUnitId = "";
		if (unitId.startsWith("10")) {
			// For platform units, EXT_UNIT_ID has the form 10-XX-YY:DEVICE_ID
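			// e.g. "10-20-30:dev01-2" -> "10-20-30:dev01"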
			String[] elem = unitId.split(":");
			if (elem.length >= 2) {
				String[] kbp = elem[0].split("-");
				if (kbp.length >= 3) {
					extUnitId = kbp[0] + "-" + kbp[1] + "-" + kbp[2];
				} else {
					extUnitId = elem[0];
				}

				String deviceId = elem[1].split("-")[0];

				extUnitId += ":" + deviceId;
			} else {
				extUnitId = elem[0];
			}
		} else {
			// For business units, EXT_UNIT_ID equals the UNIT_ID (up to the first ':')
			extUnitId = unitId.split(":")[0];
		}
		return extUnitId;
	}
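
	/**
	 * Entry point for running the collection test manually; uses an empty Job
	 * and the hard-coded host parameters defined in run().
	 */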
	public static void main(String[] args) {
		new CollJobThreadTest(new Job()).run();
	}
}