// CollKafkaWithJMX.java 11.1 KB
package com.sitech.ismp.coll.centercoll;

import com.sitech.ismp.coll.CollBase;
import com.sitech.util.DES3;
import org.apache.log4j.Logger;

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import java.util.HashMap;
import java.util.Map;
import java.util.Vector;

/**
 * Collects Kafka broker metrics over a remote JMX/RMI connection and reports them as KPIs.
 *
 * <p>Each public {@code getXxx} method opens its own JMX connection through
 * {@link #init(HashMap)}, reads a fixed list of MBean attributes, records them in a fresh
 * {@link CollBase} result object and closes the connection again before returning.
 *
 * <p>The connection is kept in instance fields, so concurrent calls on the same instance
 * would share (and close) each other's connection.
 *
 * @author ZhouYou (created 15-7-6)
 */
public class CollKafkaWithJMX extends CollBase implements CollKafkaI{

    private static final Logger logger = Logger.getLogger(CollKafkaWithJMX.class);

    /** Common prefix of every KBP class id emitted by this collector. */
    private static final String KBP_CLASS = "10-41-13";

    // Keys looked up in the parameter map handed to the collector methods.
    private final static String COLLIP = "COLL_IP";
    private final static String JMXPORT = "JMX_PORT";
    private final static String COLLUSER = "COLL_USER";
    private final static String COLLPASSWORD = "COLL_PASSWORD";
    private final static String DEVICENAME = "DEVICENAME";

    /** Device name taken from the parameters; becomes part of every KPI unit id. */
    String hostName = "";
    /** MBean server view of the current connection, or {@code null} when not connected. */
    MBeanServerConnection mbsc = null;
    /** Current JMX connection, or {@code null} when not connected. */
    JMXConnector connector = null;

    /**
     * Opens a JMX connection to the broker described by {@code params}.
     *
     * <p>Reads {@code COLL_IP}, {@code JMX_PORT}, {@code COLL_USER}, {@code COLL_PASSWORD}
     * (passed through {@code DES3.decrypt}) and {@code DEVICENAME} from the map. Connection
     * failures are logged and not rethrown; in that case {@code connector} and {@code mbsc}
     * are left {@code null}.
     *
     * @param params collector parameters keyed by the names listed above
     */
    public void init(HashMap params) {
        // Drop whatever an earlier call left behind so that a failed connect below
        // cannot leave a stale, already-closed connection in the fields.
        closeConnection();

        String collIp = (String) params.get(COLLIP);
        String jmxPort = (String) params.get(JMXPORT);
        String user = (String) params.get(COLLUSER);
        String password = DES3.decrypt((String) params.get(COLLPASSWORD));
        hostName = (String) params.get(DEVICENAME);

        logger.info("COLLIP=" + collIp);
        logger.info("JMXPORT=" + jmxPort);
        logger.info("COLLUSER=" + user);
        // The decrypted password is deliberately NOT logged.

        // Standard JMX-over-RMI service URL of the broker's JMX agent.
        String jmxURL = "service:jmx:rmi:///jndi/rmi://" + collIp + ":" + jmxPort + "/jmxrmi";
        logger.info("jmxURL=" + jmxURL);
        try {
            Map<String, Object> environment = new HashMap<String, Object>();
            environment.put(JMXConnector.CREDENTIALS, new String[]{user, password});
            connector = JMXConnectorFactory.connect(new JMXServiceURL(jmxURL), environment);
            mbsc = connector.getMBeanServerConnection();
        } catch (Exception e) {
            logger.error("初始化出错", e);
            // Release a connector that opened but could not hand out an MBean connection.
            closeConnection();
        }
    }

    /**
     * Collects the controller KPIs: whether this broker is the active controller, the
     * one-minute leader-election rate and the one-minute unclean-leader-election rate.
     *
     * <p>KPI {@code PM-10-41-010-04} is set to "正常" when every metric was read and to
     * "不正常" when anything failed; KPIs gathered before the failure are kept.
     *
     * @param params collector parameters, see {@link #init(HashMap)}
     * @return the KPI set accumulated by the result object
     */
    public Vector getControll(HashMap params) {
        CollBase collResult = new CollBase();
        try {
            connect(params);
            String unitId = unitId("-10", "-controller");

            Object activeControllers = readValue(
                    mbeanName("kafka.controller", "KafkaController", "ActiveControllerCount"),
                    "ActiveControllerCount-Value");
            // A count of exactly 1 marks this broker as the active controller.
            String leader = ((Number) activeControllers).intValue() == 1 ? "是" : "否";
            collResult.addKPI(unitId, "PM-10-41-010-01", leader);

            collResult.addKPI(unitId, "PM-10-41-010-02", readRate(
                    mbeanName("kafka.controller", "ControllerStats", "LeaderElectionRateAndTimeMs"),
                    "LeaderElectionRateAndTimeMs"));

            collResult.addKPI(unitId, "PM-10-41-010-03", readRate(
                    mbeanName("kafka.controller", "ControllerStats", "UncleanLeaderElectionsPerSec"),
                    "UncleanLeaderElectionsPerSec"));

            collResult.addKPI(unitId, "PM-10-41-010-04", "正常");
        } catch (Exception e) {
            logger.error("Failed to collect Kafka controller metrics", e);
            collResult.addKPI(unitId("-10", "-controller"), "PM-10-41-010-04", "不正常");
        } finally {
            closeConnection();
        }
        return collResult.getKPISet();
    }

    /**
     * Collects the broker-level KPIs (topic throughput rates, replica-manager gauges and
     * ISR rates, request-purgatory sizes).
     *
     * <p>A failure is logged and ends the collection early; KPIs gathered up to that point
     * are still returned.
     *
     * @param params collector parameters, see {@link #init(HashMap)}
     * @return the KPI set accumulated by the result object
     */
    public Vector getServer(HashMap params) {
        CollBase collResult = new CollBase();
        try {
            connect(params);
            String unitId = unitId("-12", "-server");

            collResult.addKPI(unitId, "PM-10-41-012-01", readRate(
                    mbeanName("kafka.server", "BrokerTopicMetrics", "AllTopicsMessagesInPerSec"),
                    "AllTopicsMessagesInPerSec"));
            collResult.addKPI(unitId, "PM-10-41-012-02", readRate(
                    mbeanName("kafka.server", "BrokerTopicMetrics", "AllTopicsBytesInPerSec"),
                    "AllTopicsBytesInPerSec"));
            collResult.addKPI(unitId, "PM-10-41-012-03", readRate(
                    mbeanName("kafka.server", "BrokerTopicMetrics", "AllTopicsBytesOutPerSec"),
                    "AllTopicsBytesOutPerSec"));

            collResult.addKPI(unitId, "PM-10-41-012-04", String.valueOf(readValue(
                    mbeanName("kafka.server", "ReplicaManager", "UnderReplicatedPartitions"),
                    "UnderReplicatedPartitions-value")));
            collResult.addKPI(unitId, "PM-10-41-012-05", String.valueOf(readValue(
                    mbeanName("kafka.server", "ReplicaManager", "PartitionCount"),
                    "PartitionCount-value")));
            collResult.addKPI(unitId, "PM-10-41-012-06", String.valueOf(readValue(
                    mbeanName("kafka.server", "ReplicaManager", "LeaderCount"),
                    "LeaderCount-value")));

            // KPI ids 08/07 and 10/09 are intentionally emitted in this (original) order.
            collResult.addKPI(unitId, "PM-10-41-012-08", readRate(
                    mbeanName("kafka.server", "ReplicaManager", "IsrExpandsPerSec"),
                    "IsrExpandsPerSec"));
            collResult.addKPI(unitId, "PM-10-41-012-07", readRate(
                    mbeanName("kafka.server", "ReplicaManager", "IsrShrinksPerSec"),
                    "IsrShrinksPerSec"));

            collResult.addKPI(unitId, "PM-10-41-012-10", String.valueOf(readValue(
                    mbeanName("kafka.server", "FetchRequestPurgatory", "PurgatorySize"),
                    "PurgatorySize-value")));
            collResult.addKPI(unitId, "PM-10-41-012-09", String.valueOf(readValue(
                    mbeanName("kafka.server", "ProducerRequestPurgatory", "PurgatorySize"),
                    "PurgatorySize2-value")));
        } catch (Exception e) {
            logger.error("Failed to collect Kafka server metrics", e);
        } finally {
            closeConnection();
        }
        return collResult.getKPISet();
    }

    /**
     * Collects the network KPI (size of the broker's request queue).
     *
     * <p>A failure is logged and results in an empty KPI set.
     *
     * @param params collector parameters, see {@link #init(HashMap)}
     * @return the KPI set accumulated by the result object
     */
    public Vector getNetwork(HashMap params) {
        CollBase collResult = new CollBase();
        try {
            connect(params);
            // NOTE(review): the unit id reuses KBP suffix "-10" (same as the controller)
            // although the KPI id is in the 011 series - kept as-is; confirm whether
            // "-11" was intended.
            collResult.addKPI(unitId("-10", "-network"), "PM-10-41-011-01", String.valueOf(readValue(
                    mbeanName("kafka.network", "RequestChannel", "RequestQueueSize"),
                    "RequestQueueSize-value")));
        } catch (Exception e) {
            logger.error("Failed to collect Kafka network metrics", e);
        } finally {
            closeConnection();
        }
        return collResult.getKPISet();
    }

    /** Runs {@link #init(HashMap)} and fails fast when no MBean connection was obtained. */
    private void connect(HashMap params) {
        init(params);
        if (mbsc == null) {
            throw new IllegalStateException("JMX connection to the Kafka broker is not available");
        }
    }

    /** Closes the current connection, if any, and clears both connection fields. */
    private void closeConnection() {
        try {
            if (connector != null) {
                connector.close();
            }
        } catch (Exception e) {
            // Best effort: a failed close must not hide the collected KPIs.
            logger.warn("Failed to close JMX connection", e);
        } finally {
            connector = null;
            mbsc = null;
        }
    }

    /** Builds a KPI unit id of the form {@code <KBP_CLASS><kbpSuffix>:<hostName><roleSuffix>}. */
    private String unitId(String kbpSuffix, String roleSuffix) {
        return KBP_CLASS + kbpSuffix + ":" + hostName + roleSuffix;
    }

    /** Builds an MBean name with quoted domain, type and name, e.g. {@code "d":type="t",name="n"}. */
    private static String mbeanName(String domain, String type, String name) {
        return "\"" + domain + "\":type=\"" + type + "\",name=\"" + name + "\"";
    }

    /** Reads the {@code Value} attribute of an MBean, logs it under {@code logLabel} and returns it. */
    private Object readValue(String mbean, String logLabel) throws Exception {
        Object value = mbsc.getAttribute(new ObjectName(mbean), "Value");
        logger.info("-->" + logLabel + ":" + value);
        return value;
    }

    /** Reads the {@code OneMinuteRate} attribute of an MBean and returns it truncated to a whole number. */
    private String readRate(String mbean, String logName) throws Exception {
        Object rate = mbsc.getAttribute(new ObjectName(mbean), "OneMinuteRate");
        logger.info("-->" + logName + "-OneMinuteRate:" + rate);
        return String.valueOf(((Number) rate).longValue());
    }

}