CollCoherenceWithJMX.java 19.1 KB
package com.sitech.ismp.coll.centercoll;

import com.sitech.ismp.coll.CollBase;
import com.sitech.ismp.coll.centercoll.bean.CoherenceNodeBean;
import com.sitech.ismp.util.CreatFile;
import com.sitech.util.DES3;
import org.apache.log4j.Logger;

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import java.util.HashMap;
import java.util.Map;
import java.util.Vector;

/**
 * Created with IntelliJ IDEA.
 * User: ZhouYou
 * Date: 15-7-14
 * Time: 上午11:14
 * To change this template use File | Settings | File Templates.
 */
public class CollCoherenceWithJMX extends CollBase {

//    private Logger logger = Logger.getLogger(CollStormClusterWithCMD.class);
    private Logger logger = Logger.getLogger("COLL");
    private String KBP_CLASS = "10-32-22";

    private String hostIP;
    private String jmxPort;
    private String userName;
    private String userPass;
    private String devName;
    private String nodePath;

    private CoherenceNodeBean preNode = null;

    private String normal = "正常";
    private String abnormal = "不正常";
    private String use = "在用";
    private String noUse = "不在用";
    MBeanServerConnection mbsc = null;
    JMXConnector connector = null;
    CreatFile<CoherenceNodeBean> creatFile = new CreatFile<CoherenceNodeBean>();

    public void init(HashMap params) {
        hostIP = (String) params.get("HOSTIP");
        jmxPort = (String) params.get("JMXPORT");
        devName = (String) params.get("DEVICENAME");
        userName = (String) params.get("USERNAME");
        userPass = (String) params.get("USERPASS");
        userPass = DES3.decrypt(userPass);
        nodePath = (String) params.get("NODEPATH");

        logger.info("-->hostIP:" + hostIP);
        logger.info("-->jmxPort:" + jmxPort);
        logger.info("-->devName:" + devName);
        logger.info("-->userName:" + userName);
        logger.info("-->userPass:" + userPass);
        logger.info("-->nodePath:" + nodePath);

        String jmxURL = "service:jmx:rmi:///jndi/rmi://" + hostIP + ":" + jmxPort + "/jmxrmi";//openresty jmx url
        try {
            //System.out.println("connect jmx sevice");
            System.out.println("jmxURL=" + jmxURL);
            logger.info("jmxURL=" + jmxURL);
            JMXServiceURL serviceURL = new JMXServiceURL(jmxURL);
            Map map = new HashMap();
            String[] credentials = new String[]{userName, userPass};
            map.put("jmx.remote.credentials", credentials);
            connector = JMXConnectorFactory.connect(serviceURL, map);
            mbsc = connector.getMBeanServerConnection();

        } catch (Exception e) {
            logger.error("coherence连接初始化出错", e);
            e.printStackTrace();
        }
    }

    public Vector getCluster(HashMap params) {
        logger.info(">>>Coherence集群信息采集");
        CollBase collResult = new CollBase();
        String version = "";
        String clusterName = "";
        Integer clusterSize = 0;
        String licenseMode = "未知";
        init(params);
        String unitId = this.KBP_CLASS + "-10:" + devName + "-cluster";
        try {
            ObjectName cluster = new ObjectName("Coherence:type=Cluster");
            Boolean running = (Boolean) mbsc.getAttribute(cluster, "Running");
            if (running) {
                collResult.addKPI(unitId, "PM-10-32-010-01", normal);
            } else {
                collResult.addKPI(unitId, "PM-10-32-010-01", abnormal);
            }

            version = (String) mbsc.getAttribute(cluster, "Version");
            collResult.addKPI(unitId, "PM-10-32-010-02", version);

            clusterName = (String) mbsc.getAttribute(cluster, "ClusterName");
            collResult.addKPI(unitId, "PM-10-32-010-03", clusterName);

            clusterSize = (Integer) mbsc.getAttribute(cluster, "ClusterSize");
            collResult.addKPI(unitId, "PM-10-32-010-04", String.valueOf(clusterSize));

            licenseMode = (String) mbsc.getAttribute(cluster, "LicenseMode");
            collResult.addKPI(unitId, "PM-10-32-010-05", licenseMode);

        } catch (Exception e) {
            collResult.addKPI(unitId, "PM-10-32-010-01", abnormal);
            collResult.addKPI(unitId, "PM-10-32-010-02", version);
            collResult.addKPI(unitId, "PM-10-32-010-03", clusterName);
            collResult.addKPI(unitId, "PM-10-32-010-04", String.valueOf(clusterSize));
            collResult.addKPI(unitId, "PM-10-32-010-05", licenseMode);
            e.printStackTrace();
        } finally {
            try {
                connector.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return collResult.getKPISet();

    }

    public Vector getNotes(HashMap params) {
        logger.info(">>>Coherence节点信息采集");
        CollBase collResult = new CollBase();
        init(params);
        try {
            logger.info("-->创建node序列化文件开始");
            creatFile.creatFile(nodePath);
            logger.info("-->创建node序列化文件结束");
            try {
                String jsonObject = (String) creatFile.deSerializeObj(nodePath);
                preNode = (CoherenceNodeBean) creatFile.readJson2Entity(jsonObject, new CoherenceNodeBean()); //使用jacson转换
                //  preNode = (CoherenceNodeBean)JSON.parseObject(jsonObject,CoherenceNodeBean.class);    //使用fastjson转换
            } catch (Exception e) {
                System.out.println("coherence对象反序列化失败");
            }
            CoherenceNodeBean nodeBean = CoherenceNodeBean.getInstance();
            if (preNode == null) {
                logger.info("-->preNode为null");
                ObjectName cluster = new ObjectName("Coherence:type=Cluster");
                int[] memberIds = (int[]) mbsc.getAttribute(cluster, "MemberIds");
                for (int i = 0; i < memberIds.length; i++) {
                    collResult.addKPI("10-32-22-11" + ":" + devName + "-node" + memberIds[i], "PM-10-32-011-01", String.valueOf(memberIds[i]));

                    ObjectName node = new ObjectName("Coherence:type=Node,nodeId=" + memberIds[i]);
                    String unicastAddress = (String) mbsc.getAttribute(node, "UnicastAddress");
                    collResult.addKPI("10-32-22-11" + ":" + devName + "-node" + memberIds[i], "PM-10-32-011-02", unicastAddress);

                    String processName = (String) mbsc.getAttribute(node, "ProcessName");
                    collResult.addKPI("10-32-22-11" + ":" + devName + "-node" + memberIds[i], "PM-10-32-011-03", processName);

                    String roleName = (String) mbsc.getAttribute(node, "RoleName");
                    collResult.addKPI("10-32-22-11" + ":" + devName + "-node" + memberIds[i], "PM-10-32-011-04", roleName);

                    ObjectName memory = new ObjectName("Coherence:type=Platform,Domain=java.lang,subType=Memory,nodeId=" + memberIds[i]);
                    CompositeData heapMemoryUsage = (CompositeData) mbsc.getAttribute(memory, "HeapMemoryUsage");
                    Long heapMemory = (Long) heapMemoryUsage.get("used");
                    collResult.addKPI("10-32-22-11" + ":" + devName + "-node" + memberIds[i], "PM-10-32-011-06", String.valueOf(heapMemory));

                    ObjectName noMemory = new ObjectName("Coherence:type=Platform,Domain=java.lang,subType=Memory,nodeId=" + memberIds[i]);
                    CompositeData nonHeapMemoryUsage = (CompositeData) mbsc.getAttribute(noMemory, "NonHeapMemoryUsage");
                    Long nonHeapMemory = (Long) nonHeapMemoryUsage.get("used");
                    collResult.addKPI("10-32-22-11" + ":" + devName + "-node" + memberIds[i], "PM-10-32-011-07", String.valueOf(nonHeapMemory));

                    collResult.addKPI("10-32-22-11" + ":" + devName + "-node" + memberIds[i], "PM-10-32-011-05", normal);

                    CoherenceNodeBean coherenceBean = new CoherenceNodeBean();
                    coherenceBean.setNodeId(String.valueOf(memberIds[i]));
                    coherenceBean.setHostIp(unicastAddress);
                    coherenceBean.setProcessNo(processName);
                    coherenceBean.setRole(roleName);
                    coherenceBean.setHeapMemory(heapMemory.intValue());
                    coherenceBean.setHonHeapMemory(nonHeapMemory.intValue());
                    coherenceBean.setState(normal);
                    nodeBean.getNodeBeanM().put(String.valueOf(memberIds[i]), coherenceBean);
                }
                // String jsonString = JSON.toJSONString(nodeBean);
                String jsonString = creatFile.javaToJson(nodeBean);
                creatFile.serializeObj(jsonString, nodePath);

                try {
                    logger.info("-->获取preNode");
                    String jsonObj = (String) creatFile.deSerializeObj(nodePath);
                    preNode = (CoherenceNodeBean) creatFile.readJson2Entity(jsonObj, new CoherenceNodeBean()); //使用jacson转换
                    //preNode = (CoherenceNodeBean)JSON.parseObject(jsonObj,CoherenceNodeBean.class);
                } catch (Exception e) {
                    logger.info("-->CoherenceNodeBean反序列化失败:" + nodePath);
                }

            } else {
                Map<String, CoherenceNodeBean> currentM = new HashMap<String, CoherenceNodeBean>();
                ObjectName cluster = new ObjectName("Coherence:type=Cluster");
                int[] memberIds = (int[]) mbsc.getAttribute(cluster, "MemberIds");
                for (int i = 0; i < memberIds.length; i++) {
                    ObjectName node = new ObjectName("Coherence:type=Node,nodeId=" + memberIds[i]);
                    String unicastAddress = (String) mbsc.getAttribute(node, "UnicastAddress");
                    String processName = (String) mbsc.getAttribute(node, "ProcessName");
                    String roleName = (String) mbsc.getAttribute(node, "RoleName");
                    ObjectName memory = new ObjectName("Coherence:type=Platform,Domain=java.lang,subType=Memory,nodeId=" + memberIds[i]);
                    CompositeData heapMemoryUsage = (CompositeData) mbsc.getAttribute(memory, "HeapMemoryUsage");
                    Long heapMemory = (Long) heapMemoryUsage.get("used");
                    ObjectName noMemory = new ObjectName("Coherence:type=Platform,Domain=java.lang,subType=Memory,nodeId=" + memberIds[i]);
                    CompositeData nonHeapMemoryUsage = (CompositeData) mbsc.getAttribute(noMemory, "NonHeapMemoryUsage");
                    Long nonHeapMemory = (Long) nonHeapMemoryUsage.get("used");
                    CoherenceNodeBean coherenceBean = new CoherenceNodeBean();
                    coherenceBean.setNodeId(String.valueOf(memberIds[i]));
                    coherenceBean.setHostIp(unicastAddress);
                    coherenceBean.setProcessNo(processName);
                    coherenceBean.setRole(roleName);
                    coherenceBean.setHeapMemory(heapMemory.intValue());
                    coherenceBean.setHonHeapMemory(nonHeapMemory.intValue());
                    coherenceBean.setState(normal);
                    currentM.put(String.valueOf(memberIds[i]), coherenceBean);
                }

                for (Map.Entry<String, CoherenceNodeBean> entity : preNode.getNodeBeanM().entrySet()) {    //先判断哪些节点挂掉
                    if (currentM.containsKey(entity.getKey())) { //判断前一次采集的节点,是否在当前采集的节点列表中,如果存在就把当前采集的节点信息入实时表
                        logger.info("-->更新node节点");
                        String nodeId = currentM.get(entity.getKey()).getNodeId();
                        String hostIp = currentM.get(entity.getKey()).getHostIp();
                        String processNo = currentM.get(entity.getKey()).getProcessNo();
                        String roleName = currentM.get(entity.getKey()).getRole();
                        Integer heapMemory = currentM.get(entity.getKey()).getHeapMemory();
                        Integer nonHeapMemory = currentM.get(entity.getKey()).getHonHeapMemory();

                        collResult.addKPI("10-32-22-11" + ":" + devName + "-node" + nodeId, "PM-10-32-011-01", nodeId);

                        collResult.addKPI("10-32-22-11" + ":" + devName + "-node" + nodeId, "PM-10-32-011-02", hostIp);

                        collResult.addKPI("10-32-22-11" + ":" + devName + "-node" + nodeId, "PM-10-32-011-03", processNo);

                        collResult.addKPI("10-32-22-11" + ":" + devName + "-node" + nodeId, "PM-10-32-011-04", roleName);

                        collResult.addKPI("10-32-22-11" + ":" + devName + "-node" + nodeId, "PM-10-32-011-06", String.valueOf(heapMemory));

                        collResult.addKPI("10-32-22-11" + ":" + devName + "-node" + nodeId, "PM-10-32-011-07", String.valueOf(nonHeapMemory));

                        collResult.addKPI("10-32-22-11" + ":" + devName + "-node" + nodeId, "PM-10-32-011-05", normal);

                        CoherenceNodeBean coherenceBean = new CoherenceNodeBean();
                        coherenceBean.setNodeId(nodeId);
                        coherenceBean.setHostIp(hostIp);
                        coherenceBean.setProcessNo(processNo);
                        coherenceBean.setRole(roleName);
                        coherenceBean.setHeapMemory(heapMemory);
                        coherenceBean.setHonHeapMemory(nonHeapMemory);
                        coherenceBean.setState(normal);
                        nodeBean.getNodeBeanM().put(nodeId, coherenceBean);

                    } else { //如果不存在,说明节点挂掉或者移除了,改变前一次采集的状态
                        logger.info("-->node节点挂掉,更新节点状态");
                        String nodeId = entity.getValue().getNodeId();
                        String hostIp = entity.getValue().getHostIp();
                        String processNo = entity.getValue().getProcessNo();
                        String roleName = entity.getValue().getRole();
                        Integer heapMemory = entity.getValue().getHeapMemory();
                        Integer nonHeapMemory = entity.getValue().getHonHeapMemory();

                        collResult.addKPI("10-32-22-11" + ":" + devName + "-node" + nodeId, "PM-10-32-011-01", nodeId);

                        collResult.addKPI("10-32-22-11" + ":" + devName + "-node" + nodeId, "PM-10-32-011-02", hostIp);

                        collResult.addKPI("10-32-22-11" + ":" + devName + "-node" + nodeId, "PM-10-32-011-03", processNo);

                        collResult.addKPI("10-32-22-11" + ":" + devName + "-node" + nodeId, "PM-10-32-011-04", roleName);

                        collResult.addKPI("10-32-22-11" + ":" + devName + "-node" + nodeId, "PM-10-32-011-06", String.valueOf(heapMemory));

                        collResult.addKPI("10-32-22-11" + ":" + devName + "-node" + nodeId, "PM-10-32-011-07", String.valueOf(nonHeapMemory));

                        collResult.addKPI("10-32-22-11" + ":" + devName + "-node" + nodeId, "PM-10-32-011-05", abnormal);
                        CoherenceNodeBean coherenceBean = new CoherenceNodeBean();
                        coherenceBean.setNodeId(nodeId);
                        coherenceBean.setHostIp(hostIp);
                        coherenceBean.setProcessNo(processNo);
                        coherenceBean.setRole(roleName);
                        coherenceBean.setHeapMemory(heapMemory);
                        coherenceBean.setHonHeapMemory(nonHeapMemory);
                        coherenceBean.setState(abnormal);
                        nodeBean.getNodeBeanM().put(nodeId, coherenceBean);

                    }

                }

                for (Map.Entry<String, CoherenceNodeBean> entity : currentM.entrySet()) { //再判断哪些节点新增
                    if (!preNode.getNodeBeanM().containsKey(entity.getKey())) {  //如果是新增节点,就入库,并且更新preSupervisor,重新序列化到文件保存
                        logger.info("-->新增supervisor节点");
                        String nodeId = entity.getValue().getNodeId();
                        String hostIp = entity.getValue().getHostIp();
                        String processNo = entity.getValue().getProcessNo();
                        String roleName = entity.getValue().getRole();
                        Integer heapMemory = entity.getValue().getHeapMemory();
                        Integer nonHeapMemory = entity.getValue().getHonHeapMemory();

                        CoherenceNodeBean coherenceBean = new CoherenceNodeBean();
                        coherenceBean.setNodeId(nodeId);
                        coherenceBean.setHostIp(hostIp);
                        coherenceBean.setProcessNo(processNo);
                        coherenceBean.setRole(roleName);
                        coherenceBean.setHeapMemory(heapMemory);
                        coherenceBean.setHonHeapMemory(nonHeapMemory);
                        coherenceBean.setState(normal);
                        nodeBean.getNodeBeanM().put(nodeId, coherenceBean);

                        collResult.addKPI("10-32-22-11" + ":" + devName + "-node" + nodeId, "PM-10-32-011-01", nodeId);

                        collResult.addKPI("10-32-22-11" + ":" + devName + "-node" + nodeId, "PM-10-32-011-02", hostIp);

                        collResult.addKPI("10-32-22-11" + ":" + devName + "-node" + nodeId, "PM-10-32-011-03", processNo);

                        collResult.addKPI("10-32-22-11" + ":" + devName + "-node" + nodeId, "PM-10-32-011-04", roleName);

                        collResult.addKPI("10-32-22-11" + ":" + devName + "-node" + nodeId, "PM-10-32-011-06", String.valueOf(heapMemory));

                        collResult.addKPI("10-32-22-11" + ":" + devName + "-node" + nodeId, "PM-10-32-011-07", String.valueOf(nonHeapMemory));

                        collResult.addKPI("10-32-22-11" + ":" + devName + "-node" + nodeId, "PM-10-32-011-05", normal);


                    }

                }
                //String jsonString = JSON.toJSONString(nodeBean);
                String jsonString = creatFile.javaToJson(nodeBean);
                creatFile.serializeObj(jsonString, nodePath);
                try {
                    logger.info("-->更新节点,重新获取preNode");
                    String jsonObj = (String) creatFile.deSerializeObj(nodePath);

                    preNode = (CoherenceNodeBean) creatFile.readJson2Entity(jsonObj, new CoherenceNodeBean()); //使用jacson转换
                    // preNode = (CoherenceNodeBean)JSON.parseObject(jsonObj,CoherenceNodeBean.class);
                } catch (Exception e) {
                    logger.info("-->CoherenceNodeBean反序列化失败:" + nodePath);
                }

            }


        } catch (Exception e) {
            e.printStackTrace();

        } finally {
            try {
                connector.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return collResult.getKPISet();

    }

}