CollKafkaWithJMX211.java 10.9 KB
package com.sitech.ismp.coll.centercoll;

import com.sitech.ismp.coll.CollBase;
import com.sitech.util.DES3;
import org.apache.log4j.Logger;

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import java.util.HashMap;
import java.util.Map;
import java.util.Vector;

/**
 * Created by diaodx on 2015/8/31.
 * 采集kafka_2.11-0.8.2.1版本
 */
public class CollKafkaWithJMX211 extends CollBase implements CollKafkaI{

    private Logger logger = Logger.getLogger(CollKafkaWithJMX.class);
    private static String KBP_CLASS = "10-41-13";

    //    private static Logger logger = Logger.getLogger("CollKafkaWithJMX");
    private final static String COLLIP = "COLLIP";
    private final static String JMXPORT = "JMXPORT";
    private final static String COLLUSER = "COLLUSER";
    private final static String COLLPASSWORD = "COLLPASSWORD";
    private final static String DEVICENAME = "DEVICENAME";

    String hostName = "";
    MBeanServerConnection mbsc = null;
    JMXConnector connector = null;

    public void init(HashMap params) {

        String collIp = (String) params.get(COLLIP);
        String jmxPort = (String) params.get(JMXPORT);
        String user = (String) params.get(COLLUSER);
        String password = (String) params.get(COLLPASSWORD);
        password = DES3.decrypt(password);
        hostName = (String) params.get(DEVICENAME);

        logger.info("COLLIP=" + collIp);
        logger.info("JMXPORT=" + jmxPort);
        logger.info("COLLUSER=" + user);
        logger.info("COLLPASSWORD=" + password);

        String jmxURL = "service:jmx:rmi:///jndi/rmi://" + collIp + ":" + jmxPort + "/jmxrmi";//openresty jmx url
        try {
            //System.out.println("connect jmx sevice");
            System.out.println("jmxURL=" + jmxURL);
            logger.info("jmxURL=" + jmxURL);
            JMXServiceURL serviceURL = new JMXServiceURL(jmxURL);
            Map map = new HashMap();
            String[] credentials = new String[]{user, password};
            map.put("jmx.remote.credentials", credentials);
            connector = JMXConnectorFactory.connect(serviceURL, map);
            mbsc = connector.getMBeanServerConnection();
        } catch (Exception e) {
            logger.error("初始化出错", e);
            e.printStackTrace();
        }
    }

    public Vector getControll(HashMap params) {
        CollBase collResult = new CollBase();
        try {
            init(params);
            ObjectName activeControllerCount = new ObjectName("kafka.controller:type=KafkaController,name=ActiveControllerCount");
            Integer value = (Integer) mbsc.getAttribute(activeControllerCount, "Value");
            System.out.println("-->ActiveControllerCount-Value:" + value);
            logger.info("-->ActiveControllerCount-Value:" + value);
            String leader = "否";
            if (value == 1) {
                leader = "是";
            }
            collResult.addKPI("10-41-13-10" + ":" + hostName + "-controller", "PM-10-41-010-01", leader);

            ObjectName leaderElectionRateAndTimeMs = new ObjectName("kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs");
            Double oneMinuteRate = (Double) mbsc.getAttribute(leaderElectionRateAndTimeMs, "OneMinuteRate");
            System.out.println("-->LeaderElectionRateAndTimeMs-OneMinuteRate:" + oneMinuteRate);
            logger.info("-->LeaderElectionRateAndTimeMs-OneMinuteRate:" + oneMinuteRate);
            collResult.addKPI("10-41-13-10" + ":" + hostName + "-controller", "PM-10-41-010-02", String.valueOf(oneMinuteRate.longValue()));

            ObjectName uncleanLeaderElectionsPerSec = new ObjectName("kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec");
            Double oneMinuteRate1 = (Double) mbsc.getAttribute(uncleanLeaderElectionsPerSec, "OneMinuteRate");
            System.out.println("-->UncleanLeaderElectionsPerSec-OneMinuteRate:" + oneMinuteRate1);
            logger.info("-->UncleanLeaderElectionsPerSec-OneMinuteRate:" + oneMinuteRate1);
            collResult.addKPI("10-41-13-10" + ":" + hostName + "-controller", "PM-10-41-010-03", String.valueOf(oneMinuteRate1.longValue()));

            collResult.addKPI("10-41-13-10" + ":" + hostName + "-controller", "PM-10-41-010-04", "正常");

        } catch (Exception e) {
            collResult.addKPI("10-41-13-10" + ":" + hostName + "-controller", "PM-10-41-010-04", "不正常");
            e.printStackTrace();
        } finally {
            try {
                connector.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return collResult.getKPISet();
    }

    public Vector getServer(HashMap params) {
        CollBase collResult = new CollBase();
        try {
            init(params);
//            ObjectName allTopicsMessagesInPerSec = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=AllTopicsMessagesInPerSec");
//            Double oneMinuteRate = (Double) mbsc.getAttribute(allTopicsMessagesInPerSec, "OneMinuteRate");
//            System.out.println("-->AllTopicsMessagesInPerSec-OneMinuteRate:" + oneMinuteRate);
//            logger.info("-->AllTopicsMessagesInPerSec-OneMinuteRate:" + oneMinuteRate);
//            collResult.addKPI("10-41-13-12" + ":" + hostName + "-server", "PM-10-41-012-01", String.valueOf(oneMinuteRate.longValue()));
//
//            ObjectName allTopicsBytesInPerSec = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=AllTopicsBytesInPerSec");
//            Double oneMinuteRate1 = (Double) mbsc.getAttribute(allTopicsBytesInPerSec, "OneMinuteRate");
//            System.out.println("-->AllTopicsBytesInPerSec-OneMinuteRate:" + oneMinuteRate1);
//            logger.info("-->AllTopicsBytesInPerSec-OneMinuteRate:" + oneMinuteRate1);
//            collResult.addKPI("10-41-13-12" + ":" + hostName + "-server", "PM-10-41-012-02", String.valueOf(oneMinuteRate1.longValue()));
//
//            ObjectName allTopicsBytesOutPerSec = new ObjectName("kafka.server:type=BrokerTopicMetrics,name=AllTopicsBytesOutPerSec");
//            Double oneMinuteRate2 = (Double) mbsc.getAttribute(allTopicsBytesOutPerSec, "OneMinuteRate");
//            System.out.println("-->AllTopicsBytesOutPerSec-OneMinuteRate:" + oneMinuteRate2);
//            logger.info("-->AllTopicsBytesOutPerSec-OneMinuteRate:" + oneMinuteRate2);
//            collResult.addKPI("10-41-13-12" + ":" + hostName + "-server", "PM-10-41-012-03", String.valueOf(oneMinuteRate2.longValue()));

            ObjectName underReplicatedPartitions = new ObjectName("kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions");
            Integer value = (Integer) mbsc.getAttribute(underReplicatedPartitions, "Value");
            System.out.println("-->UnderReplicatedPartitions-value:" + value);
            logger.info("-->UnderReplicatedPartitions-value:" + value);
            collResult.addKPI("10-41-13-12" + ":" + hostName + "-server", "PM-10-41-012-04", String.valueOf(value));

            ObjectName partitionCount = new ObjectName("kafka.server:type=ReplicaManager,name=PartitionCount");
            Integer value1 = (Integer) mbsc.getAttribute(partitionCount, "Value");
            System.out.println("-->PartitionCount-value:" + value1);
            logger.info("-->PartitionCount-value:" + value1);
            collResult.addKPI("10-41-13-12" + ":" + hostName + "-server", "PM-10-41-012-05", String.valueOf(value1));

            ObjectName leaderCount = new ObjectName("kafka.server:type=ReplicaManager,name=LeaderCount");
            Integer value2 = (Integer) mbsc.getAttribute(leaderCount, "Value");
            System.out.println("-->LeaderCount-value:" + value2);
            logger.info("-->LeaderCount-value:" + value2);
            collResult.addKPI("10-41-13-12" + ":" + hostName + "-server", "PM-10-41-012-06", String.valueOf(value2));

            ObjectName isrExpandsPerSec = new ObjectName("kafka.server:type=ReplicaManager,name=IsrExpandsPerSec");
            Double oneMinuteRate3 = (Double) mbsc.getAttribute(isrExpandsPerSec, "OneMinuteRate");
            System.out.println("-->IsrExpandsPerSec-OneMinuteRate:" + oneMinuteRate3);
            logger.info("-->IsrExpandsPerSec-OneMinuteRate:" + oneMinuteRate3);
            collResult.addKPI("10-41-13-12" + ":" + hostName + "-server", "PM-10-41-012-08", String.valueOf(oneMinuteRate3.longValue()));

            ObjectName isrShrinksPerSec = new ObjectName("kafka.server:type=ReplicaManager,name=IsrShrinksPerSec");
            Double oneMinuteRate4 = (Double) mbsc.getAttribute(isrShrinksPerSec, "OneMinuteRate");
            System.out.println("-->IsrShrinksPerSec-OneMinuteRate:" + oneMinuteRate4);
            logger.info("-->IsrShrinksPerSec-OneMinuteRate:" + oneMinuteRate4);
            collResult.addKPI("10-41-13-12" + ":" + hostName + "-server", "PM-10-41-012-07", String.valueOf(oneMinuteRate4.longValue()));

            ObjectName purgatorySize = new ObjectName("kafka.server:type=FetchRequestPurgatory,name=PurgatorySize");
            Integer value3 = (Integer) mbsc.getAttribute(purgatorySize, "Value");
            System.out.println("-->PurgatorySize-value:" + value3);
            logger.info("-->PurgatorySize-value:" + value3);
            collResult.addKPI("10-41-13-12" + ":" + hostName + "-server", "PM-10-41-012-10", String.valueOf(value3));

            ObjectName purgatorySize2 = new ObjectName("kafka.server:type=ProducerRequestPurgatory,name=PurgatorySize");
            Integer value4 = (Integer) mbsc.getAttribute(purgatorySize2, "Value");
            System.out.println("-->PurgatorySize2-value:" + value4);
            logger.info("-->PurgatorySize2-value:" + value4);
            collResult.addKPI("10-41-13-12" + ":" + hostName + "-server", "PM-10-41-012-09", String.valueOf(value4));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                connector.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return collResult.getKPISet();
    }

    public Vector getNetwork(HashMap params) {
        CollBase collResult = new CollBase();
        try {
            init(params);
            ObjectName requestQueueSize = new ObjectName("kafka.network:type=RequestChannel,name=RequestQueueSize");
            Integer value = (Integer) mbsc.getAttribute(requestQueueSize, "Value");
            System.out.println("-->RequestQueueSize-value:" + value);
            logger.info("-->RequestQueueSize-value:" + value);
            collResult.addKPI("10-41-13-10" + ":" + hostName + "-network", "PM-10-41-011-01", String.valueOf(value));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                connector.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return collResult.getKPISet();
    }

}