// CollKafkaWithJMX211.java (10.9 KB)
package com.sitech.ismp.coll.centercoll;
import com.sitech.ismp.coll.CollBase;
import com.sitech.util.DES3;
import org.apache.log4j.Logger;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import java.util.HashMap;
import java.util.Map;
import java.util.Vector;
/**
 * Created by diaodx on 2015/8/31.
 * Collects metrics from Kafka (kafka_2.11-0.8.2.1) over a remote JMX connection.
 *
 * <p>Every {@code getXxx} method opens its own JMX connection through
 * {@link #init(HashMap)}, reads a fixed list of MBean attributes, and closes the
 * connection again before returning. The connection handles live in
 * unsynchronised instance fields, so a single instance should not be used by
 * several threads at the same time.
 */
public class CollKafkaWithJMX211 extends CollBase implements CollKafkaI {
    // The logger is named after this class (it used to be created for CollKafkaWithJMX).
    private static final Logger logger = Logger.getLogger(CollKafkaWithJMX211.class);

    /** Common prefix of every unit id emitted by this collector. */
    private static final String KBP_CLASS = "10-41-13";
    private static final String KBP_CONTROLLER = KBP_CLASS + "-10";
    private static final String KBP_SERVER = KBP_CLASS + "-12";
    // NOTE(review): the network unit reuses the "-10" suffix of the controller unit
    // although its KPI ids are PM-10-41-011-xx; it may have been meant to be "-11".
    // Left unchanged because consumers of the KPI set may key on the current value.
    private static final String KBP_NETWORK = KBP_CLASS + "-10";

    // Keys looked up in the parameter map handed to init().
    private static final String COLLIP = "COLLIP";
    private static final String JMXPORT = "JMXPORT";
    private static final String COLLUSER = "COLLUSER";
    private static final String COLLPASSWORD = "COLLPASSWORD";
    private static final String DEVICENAME = "DEVICENAME";

    /** Device name taken from the parameters; becomes part of every unit id. */
    String hostName = "";
    /** MBean server connection of the current call; {@code null} when not connected. */
    MBeanServerConnection mbsc = null;
    /** JMX connector of the current call; {@code null} when not connected. */
    JMXConnector connector = null;

    /**
     * Opens the JMX connection described by {@code params}.
     *
     * <p>Reads {@code COLLIP}, {@code JMXPORT}, {@code COLLUSER}, {@code COLLPASSWORD}
     * (decrypted with {@code DES3.decrypt}) and {@code DEVICENAME} from the map.
     * A connection failure is logged and not rethrown; in that case {@link #mbsc}
     * and {@link #connector} stay {@code null}.
     *
     * @param params collection parameters keyed by the names listed above
     */
    public void init(HashMap params) {
        // Release and forget anything left over from an earlier call, so that a failed
        // connect can never leave the getters working on a stale, closed connection.
        closeConnector();

        String collIp = (String) params.get(COLLIP);
        String jmxPort = (String) params.get(JMXPORT);
        String user = (String) params.get(COLLUSER);
        String password = DES3.decrypt((String) params.get(COLLPASSWORD));
        hostName = (String) params.get(DEVICENAME);

        logger.info("COLLIP=" + collIp);
        logger.info("JMXPORT=" + jmxPort);
        logger.info("COLLUSER=" + user);
        // The decrypted password is deliberately NOT logged.

        // Standard RMI-based JMX service URL of the Kafka broker.
        String jmxURL = "service:jmx:rmi:///jndi/rmi://" + collIp + ":" + jmxPort + "/jmxrmi";
        try {
            System.out.println("jmxURL=" + jmxURL);
            logger.info("jmxURL=" + jmxURL);
            JMXServiceURL serviceURL = new JMXServiceURL(jmxURL);
            Map<String, Object> environment = new HashMap<String, Object>();
            environment.put(JMXConnector.CREDENTIALS, new String[]{user, password});
            connector = JMXConnectorFactory.connect(serviceURL, environment);
            mbsc = connector.getMBeanServerConnection();
        } catch (Exception e) {
            logger.error("初始化出错", e);
        }
    }

    /**
     * Collects the controller KPIs (PM-10-41-010-01 .. -04).
     *
     * <p>KPI -04 is the collection status: "正常" when every attribute was read,
     * "不正常" as soon as anything fails. KPIs added before a failure are kept.
     *
     * @param params collection parameters, see {@link #init(HashMap)}
     * @return the KPI set accumulated for this call
     */
    public Vector getControll(HashMap params) {
        CollBase collResult = new CollBase();
        try {
            init(params);
            String unit = unitId(KBP_CONTROLLER, "-controller");

            Integer activeControllers = readIntValue(
                    "kafka.controller:type=KafkaController,name=ActiveControllerCount",
                    "-->ActiveControllerCount-Value:");
            // A count of 1 is reported as "是" (yes), anything else as "否" (no).
            String leader = "否";
            if (activeControllers == 1) {
                leader = "是";
            }
            collResult.addKPI(unit, "PM-10-41-010-01", leader);

            collResult.addKPI(unit, "PM-10-41-010-02", readOneMinuteRate(
                    "kafka.controller:type=ControllerStats,name=LeaderElectionRateAndTimeMs",
                    "-->LeaderElectionRateAndTimeMs-OneMinuteRate:"));
            collResult.addKPI(unit, "PM-10-41-010-03", readOneMinuteRate(
                    "kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec",
                    "-->UncleanLeaderElectionsPerSec-OneMinuteRate:"));
            collResult.addKPI(unit, "PM-10-41-010-04", "正常");
        } catch (Exception e) {
            collResult.addKPI(unitId(KBP_CONTROLLER, "-controller"), "PM-10-41-010-04", "不正常");
            logger.error("Failed to collect Kafka controller metrics for " + hostName, e);
        } finally {
            closeConnector();
        }
        return collResult.getKPISet();
    }

    /**
     * Collects the broker/server KPIs (PM-10-41-012-04 .. -10).
     *
     * <p>The AllTopicsMessagesInPerSec / AllTopicsBytesInPerSec / AllTopicsBytesOutPerSec
     * rates (KPIs PM-10-41-012-01 .. -03) were already disabled in this collector and
     * are not collected. On a failure the KPIs gathered so far are returned.
     *
     * @param params collection parameters, see {@link #init(HashMap)}
     * @return the KPI set accumulated for this call
     */
    public Vector getServer(HashMap params) {
        CollBase collResult = new CollBase();
        try {
            init(params);
            String unit = unitId(KBP_SERVER, "-server");

            collResult.addKPI(unit, "PM-10-41-012-04", String.valueOf(readIntValue(
                    "kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions",
                    "-->UnderReplicatedPartitions-value:")));
            collResult.addKPI(unit, "PM-10-41-012-05", String.valueOf(readIntValue(
                    "kafka.server:type=ReplicaManager,name=PartitionCount",
                    "-->PartitionCount-value:")));
            collResult.addKPI(unit, "PM-10-41-012-06", String.valueOf(readIntValue(
                    "kafka.server:type=ReplicaManager,name=LeaderCount",
                    "-->LeaderCount-value:")));
            collResult.addKPI(unit, "PM-10-41-012-08", readOneMinuteRate(
                    "kafka.server:type=ReplicaManager,name=IsrExpandsPerSec",
                    "-->IsrExpandsPerSec-OneMinuteRate:"));
            collResult.addKPI(unit, "PM-10-41-012-07", readOneMinuteRate(
                    "kafka.server:type=ReplicaManager,name=IsrShrinksPerSec",
                    "-->IsrShrinksPerSec-OneMinuteRate:"));
            collResult.addKPI(unit, "PM-10-41-012-10", String.valueOf(readIntValue(
                    "kafka.server:type=FetchRequestPurgatory,name=PurgatorySize",
                    "-->PurgatorySize-value:")));
            collResult.addKPI(unit, "PM-10-41-012-09", String.valueOf(readIntValue(
                    "kafka.server:type=ProducerRequestPurgatory,name=PurgatorySize",
                    "-->PurgatorySize2-value:")));
        } catch (Exception e) {
            logger.error("Failed to collect Kafka server metrics for " + hostName, e);
        } finally {
            closeConnector();
        }
        return collResult.getKPISet();
    }

    /**
     * Collects the network KPI PM-10-41-011-01 (request queue size).
     *
     * <p>On a failure an empty KPI set is returned.
     *
     * @param params collection parameters, see {@link #init(HashMap)}
     * @return the KPI set accumulated for this call
     */
    public Vector getNetwork(HashMap params) {
        CollBase collResult = new CollBase();
        try {
            init(params);
            collResult.addKPI(unitId(KBP_NETWORK, "-network"), "PM-10-41-011-01",
                    String.valueOf(readIntValue(
                            "kafka.network:type=RequestChannel,name=RequestQueueSize",
                            "-->RequestQueueSize-value:")));
        } catch (Exception e) {
            logger.error("Failed to collect Kafka network metrics for " + hostName, e);
        } finally {
            closeConnector();
        }
        return collResult.getKPISet();
    }

    /** Builds a unit id of the form {@code <kbp>:<hostName><suffix>}. */
    private String unitId(String kbp, String suffix) {
        return kbp + ":" + hostName + suffix;
    }

    /** Returns the live MBean connection or fails with a clear message when init() did not connect. */
    private MBeanServerConnection connection() {
        if (mbsc == null) {
            throw new IllegalStateException(
                    "No JMX connection available; see the initialisation error logged earlier");
        }
        return mbsc;
    }

    /** Reads the Integer "Value" attribute of an MBean and echoes it to stdout and the log. */
    private Integer readIntValue(String objectName, String logPrefix) throws Exception {
        Integer value = (Integer) connection().getAttribute(new ObjectName(objectName), "Value");
        System.out.println(logPrefix + value);
        logger.info(logPrefix + value);
        return value;
    }

    /** Reads the "OneMinuteRate" attribute of an MBean and returns it truncated to a whole number. */
    private String readOneMinuteRate(String objectName, String logPrefix) throws Exception {
        Double rate = (Double) connection().getAttribute(new ObjectName(objectName), "OneMinuteRate");
        System.out.println(logPrefix + rate);
        logger.info(logPrefix + rate);
        return String.valueOf(rate.longValue());
    }

    /** Closes the current connector, if any, and clears both connection fields. */
    private void closeConnector() {
        JMXConnector open = connector;
        connector = null;
        mbsc = null;
        if (open == null) {
            return; // init() never connected; nothing to release
        }
        try {
            open.close();
        } catch (Exception e) {
            logger.warn("Failed to close JMX connector", e);
        }
    }
}