LinkCollThread.java
18.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
package com.sitech.ismp.coll.link;
import java.io.*;
import java.util.*;
import com.sitech.base.AgentProperties;
import com.sitech.util.FileUtils;
import com.sitech.util.Formater;
import net.sf.json.JSONArray;
import net.sf.json.JSONObject;
import org.apache.log4j.Logger;
import com.sitech.ismp.coll.CollBase;
import com.sitech.ismp.coll.basic.TblATO_KPIDETAIL;
import com.sitech.util.JSONUtil;
public class LinkCollThread extends CollBase {
private String linkId;
private String interval;
private String osType;
private String shellName;
private String pingCount;
private String resultFile;
private Map<Integer, Map<String, String>> nodes = new HashMap<Integer, Map<String, String>>();
public Vector<TblATO_KPIDETAIL> getLinkPM(HashMap<String, String> params) {
// 保存采集结果,并返回值
CollBase collResult = new CollBase();
try {
init(params);
String shellContent = createPingShell(nodes);
logger.info("[LINK_COLL " + linkId + " ] Create Ping Shell: \n" + shellContent);
writeShellFile(shellContent);
//通知cron执行生成的脚本
writeNoticeFile();
List<String> result = getResult(resultFile);
if (null == result || result.isEmpty()) {
return collResult.KPISet;
}
List<List<String>> list = parseResult(result, nodes.size());
Map<String, String> dealResultMap = null;
double linkTotalTime = 0.0;
int brkNodeIndex = 0;
//循环处理各节点信息
for (int i = 0; i < nodes.size(); i++) {
int actIdx = i;
int nextIdx = actIdx + 1;
if (i == nodes.size() - 1) {
nextIdx = -1;
}
logger.info("Start deal node" + (i + 1) + " and node" + (nextIdx + 1) + " data!");
dealResultMap = dealResult(actIdx, nextIdx, list);
//链路单个节点时延 PM-11-47-01-03 ping节点n次的最小时延
collResult.addKPI(dealResultMap.get("ACT_NODE_UNIT_ID"), "PM-11-47-01-03", dealResultMap.get("ACT_NODE_MIN_DELAY_TIME"));
String linkUnitId = dealResultMap.get("LINK_UNIT_ID");
if (!isNull(linkUnitId)) {//如果当前节点不是最后一个节点
String linkNodeDelayTime = dealResultMap.get("LINK_NODE_DELAY_TIME");
//链路时延 PM-11-47-01-01 链路节点间时延
collResult.addKPI(linkUnitId, "PM-11-47-01-01", linkNodeDelayTime);
if ("-1".equals(linkNodeDelayTime)) {
linkTotalTime = -1;
brkNodeIndex = nextIdx;
logger.info("[Link Coll] Link " + linkId + " is broken, broken node index is :" + (nextIdx + 1));
break;
}
linkTotalTime += Double.parseDouble(linkNodeDelayTime);
}
}
//链路总时延 PM-11-47-01-02
collResult.addKPI(linkId, "PM-11-47-01-02", "" + (linkTotalTime==-1?"-1":(double)Math.round(linkTotalTime*1000)/1000));
//如果链路从中间断开,处理后续节点间时延数据
if (0 != brkNodeIndex && -1 != brkNodeIndex) {
collResult.addKPISet(dealBrokenNodesData(brkNodeIndex, list).getKPISet());
}
} catch (Exception e) {
logger.error("Exception while coll link:" + linkId, e);
}
return collResult.getKPISet();
}
/**
* 处理从断掉的节点开始的数据
* @param brkNodeIdx
* @param list
* @return
*/
private CollBase dealBrokenNodesData(int brkNodeIdx, List<List<String>> list){
CollBase collResult = new CollBase();
logger.info("[Link Coll] Start deal broken link!");
for (int i = brkNodeIdx; i < nodes.size(); i++) {
int nextIdx = i + 1;
if (i == nodes.size() - 1) {
nextIdx = -1;
}
Map<String,String> dealResultMap = null;
dealResultMap = dealResult(i, nextIdx, list);
String actDeviceId = dealResultMap.get("ACT_NODE_UNIT_ID");
logger.info("[Link Coll] Node:"+actDeviceId);
collResult.addKPI(actDeviceId, "PM-11-47-01-03", "-1");//当前节点间时延
Map<String, String> brkNodesData = dealResult(i, nextIdx, list);
if(-1 != nextIdx){
String linkUnitId = brkNodesData.get("LINK_UNIT_ID");
logger.info("[Link Coll] Broken Node:"+linkUnitId);
//链路时延 PM-11-47-01-01
collResult.addKPI(linkUnitId, "PM-11-47-01-01", "-1");//链路节点间时延
}
}
logger.info("[Link Coll] End deal broken link!");
return collResult;
}
/**
* 解析每个节点的ping结果
*
* @param nodeResList
* @return
*/
private Map<String, String> parseNodeResult(int idx, List<String> nodeResList) {
Map<String, String> node = new HashMap<String, String>();
try {
for (String str : nodeResList) {
//SEQ
if (str.indexOf("SEQ") >= 0) {//获得序号
String[] seqArr = str.trim().split("=");
if (seqArr.length < 2 || null == seqArr[1] || "".equals(seqArr[1])) {
node.put("SEQ", "");
continue;
}
node.put("SEQ", seqArr[1]);
} else if (str.indexOf("DEVICE_ID") >= 0) {//获得设备deviceId
String[] devArr = str.trim().split("=");
if (devArr.length < 2 || null == devArr[1] || "".equals(devArr[1])) {
throw new RuntimeException("Exception while parse link_coll[" + linkId + "] the " + idx + " node!");
}
node.put("DEVICE_ID", devArr[1]);
} else if (str.indexOf("packets transmitted") >= 0) {//获得丢包率
String[] lostPacketsArr = str.trim().split(",");
for (String part : lostPacketsArr) {
if (part.trim().indexOf("packet loss") > 0) {
String[] lostArr = part.trim().split("%");
node.put("PACKET_LOST", lostArr[0].trim());
}
}
} else if (str.indexOf("min/avg/max") >= 0) {//获得ping的时延
String tmp = str.substring(str.indexOf("min"), str.indexOf("ms"));
String[] tmpArr = tmp.trim().split("=");
String[] paramArr = tmpArr[0].trim().split("/");
String[] paramValueArr = tmpArr[1].trim().split("/");
if (paramArr.length != paramValueArr.length) {
throw new RuntimeException("ParamArray length != ParamValueArray length");
}
for (int i = 0; i < paramArr.length; i++) {
String paramName = paramArr[i];
if ("min".equals(paramName)) {
String paramValue = paramValueArr[i].trim();
if (isNull(paramValue)) {
throw new RuntimeException("ping node " + idx + " min is null");
}
node.put("MIN_TIME", paramValue);
} else if ("max".equals(paramName)) {
String paramValue = paramValueArr[i].trim();
if (isNull(paramValue)) {
throw new RuntimeException("ping node " + idx + " max is null");
}
node.put("MAX_TIME", paramValue);
}
}
}
}
} catch (Exception e) {
logger.error("Exception while parseNodeResult[node " + idx + "]", e);
}
return node;
}
private boolean isNull(String str) {
boolean flag = false;
if (null == str || "".equals(str.trim())) {
flag = true;
}
return flag;
}
/**
* 处理各节点的数据
*
* @param actIdx
* @param nextIdx
* @param list
* @return
*/
private Map<String, String> dealResult(int actIdx, int nextIdx, List<List<String>> list) {
Map<String, String> map = new HashMap<String, String>();
List<String> actResList = list.get(actIdx);
List<String> nextResList = null;
Map<String, String> actNode = parseNodeResult(actIdx, actResList);
Map<String, String> nextNode = null;
//当前节点
double actMinTime = Double.parseDouble(actNode.get("MIN_TIME") == null ? "-1" : actNode.get("MIN_TIME"));
double actMaxTime = Double.parseDouble(actNode.get("MAX_TIME") == null ? "-1" : actNode.get("MAX_TIME"));
String actDeviceId = actNode.get("DEVICE_ID");
String packetLost = actNode.get("PACKET_LOST");
map.put("ACT_NODE_UNIT_ID", linkId + ":" + actDeviceId);
map.put("ACT_NODE_MIN_DELAY_TIME", "" + (actMinTime==-1?"-1":actMinTime));
if (nextIdx == -1) {
return map;
}
//下一节点
nextResList = list.get(nextIdx);
nextNode = parseNodeResult(nextIdx, nextResList);
double nextMinTime = Double.parseDouble(nextNode.get("MIN_TIME") == null ? "-1" : nextNode.get("MIN_TIME"));
double nextMaxTime = Double.parseDouble(nextNode.get("MAX_TIME") == null ? "-1" : nextNode.get("MAX_TIME"));
String nextDeviceId = nextNode.get("DEVICE_ID");
String nextPacketLost = nextNode.get("PACKET_LOST");
//组装链路unit_id
String linkUnitId = linkId + ":" + actDeviceId + ">" + nextDeviceId;
String linkNodeDelayTime = "";
map.put("LINK_UNIT_ID", linkUnitId);
//计算节点间时延
double tmpTime = (nextMaxTime > actMaxTime ? nextMaxTime - actMaxTime : actMaxTime - nextMaxTime);
linkNodeDelayTime = "" + (double) Math.round(tmpTime*1000)/1000;
if ("100".equals(packetLost) || "100".equals(nextPacketLost)) {
linkNodeDelayTime = "-1";
}
map.put("LINK_NODE_DELAY_TIME", linkNodeDelayTime);
return map;
}
/**
* 生成notice文件,通知crontab执行脚本
*/
private void writeNoticeFile() {
String noticePath = AgentProperties.AGENT_HOME + "/notice/ibm/";
String noticeFileName = noticePath + shellName;
logger.info("Start create notice file[" + noticeFileName + "]");
PrintWriter kpiFileStream = null;
try {
kpiFileStream = new PrintWriter(new FileWriter(Formater.replaceSpace(noticeFileName), false), true);
kpiFileStream.println(shellName);
} catch (IOException e) {
logger.error("[Link COLL] Exception while exec(" + shellName + ")", e);
} finally {
if (kpiFileStream != null) {
kpiFileStream.flush();
kpiFileStream.close();
}
}
logger.info("End create notice file[" + noticeFileName + "]");
}
/**
* 写入ping脚本文件
*
* @param shellContent
*/
private void writeShellFile(String shellContent) {
writeFile(shellName, shellContent);
}
private void writeFile(String shellName, String content) {
try {
String shellPath = LinkCollConst.SCRIPT_PATH;
// String shellPath = "E:\\";
FileUtils fu = new FileUtils();
if (fu.isExists(shellPath, shellName)) {
fu.delFile(shellPath, shellName);
}
FileUtils.writeFile(shellPath, shellName, content, false);
} catch (Exception e) {
logger.error("Exception while writeFile[" + linkId + "]", e);
}
}
/**
* 根据不同的操作系统,获取不同的ping命令
*
* @param osType
* @return
*/
private String getPingCMDByOS(String osType) {
String pingCMD = "";
if (osType.indexOf("SUN") > 0) {
pingCMD = LinkCollConst.PING_SUN;
} else if (osType.indexOf("HP") > 0) {
pingCMD = LinkCollConst.PING_HP;
} else {
pingCMD = LinkCollConst.PING_LINUX;
}
return pingCMD;
}
/**
* @param hops
* @return
*/
private String createPingShell(Map<Integer, Map<String, String>> hops) {
String shellContent = "";
StringBuffer sb = new StringBuffer("");
try {
sb.append(LinkCollConst.SH_TOP);
String pingCMD = getPingCMDByOS(osType);
for (int i = 0; i < hops.size(); i++) {
Map<String, String> actNode = hops.get(i + 1);
String actSeqNo = String.valueOf(i + 1);
String actDeviceId = actNode.get("DEVICE_ID");
String actIpAddr = actNode.get("IP_ADDR");
sb.append(LinkCollConst.ECHO_STR_TO_RESFILE.replaceFirst("#STR_CONTENT#", "SEQ=" + actSeqNo).replaceFirst("#LINK_ID#", linkId)).append("\n");
sb.append(LinkCollConst.ECHO_STR_TO_RESFILE.replaceFirst("#STR_CONTENT#", "DEVICE_ID=" + actDeviceId).replaceFirst("#LINK_ID#", linkId)).append("\n");
sb.append(pingCMD.replaceFirst("#IP_ADDR#", actIpAddr).replaceFirst("#PING_COUNT#", pingCount).replaceFirst("#LINK_ID#", linkId)).append("\n");
sb.append(LinkCollConst.ECHO_STR_TO_RESFILE.replaceFirst("#STR_CONTENT#", LinkCollConst.SPLIT_LINE).replaceFirst("#LINK_ID#", linkId)).append("\n");
sb.append("\n");
}
sb.append(LinkCollConst.MV_RESULT.replaceFirst("#RESULT_TEMP_PATH#", LinkCollConst.RESULT_TEMP_PATH + "/" + resultFile).replaceFirst("#RESULT_PATH#", LinkCollConst.RESULT_PATH));
} catch (Exception e) {
e.printStackTrace();
logger.error("Exception while createPingShell[" + linkId + "]");
}
return sb.toString();
}
/**
* 初始化链路参数
*
* @param params
*/
private void init(HashMap<String, String> params) {
try {
linkId = params.get("LINK_ID");
osType = System.getProperty("os.name").toUpperCase();
interval = params.get("COLL_INTERVAL");
//默认ping5次
pingCount = params.get("PING_COUNT") == null ? "5" : params.get("PING_COUNT");
resultFile = linkId + ".txt";
shellName = "getPing" + linkId + ".sh";
String json = params.get("JSON_STR");
logger.info("Coll JsonString : " + json);
JSONArray arr = (JSONArray) JSONUtil.fromJSON(json);
if (null == arr || arr.size() < 1) {
throw new Exception("JSONArray size is 0");
}
for (int i = 0; i < arr.size(); i++) {
JSONObject obj = (JSONObject) arr.get(i);
Integer seqNo = Integer.valueOf(String.valueOf(obj.get("SEQ_NO")));
nodes.put(seqNo, obj);
}
} catch (Exception e) {
logger.error("Exception while init link task:" + JSONUtil.toJSON(params), e);
}
}
/**
* 获得脚本执行结果
*
* @throws
* @since Ver 1.1
*/
private List<String> getResult(String resultFileName) {
logger.info("Start parse result file[" + resultFileName + "]");
try {
Thread.sleep(20 * 1000L);
} catch (InterruptedException e) {
logger.warn("[Link COLL] InterruptedException.");
}
StringBuffer sb = new StringBuffer();
// 超时时间:5分钟
long timeout = 5 * 60 * 1000L;
List<String> result = new ArrayList<String>();
long beginTime = new Date().getTime();
File file = new File(LinkCollConst.RESULT_PATH);
while (true) {
if (new Date().getTime() - beginTime > timeout) {
// 超时退出
logger.warn("[Link COLL] Get file[" + resultFileName + "] timeout, exit.");
break;
}
File[] filelist = file.listFiles();
if (filelist == null || filelist.length == 0) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
logger.error("[Link COLL] InterruptedException", e);
}
continue;
}
File resultFile = null;
// 根据文件名找到采集文件
for (int i = 0; i < filelist.length; i++) {
if (filelist[i].getName().startsWith(resultFileName)) {
resultFile = filelist[i];
break;
}
}
if (resultFile != null) {
BufferedReader br = null;
try {
br = new BufferedReader(new FileReader(resultFile));
String line = "";
while ((line = br.readLine()) != null) {
result.add(line);
sb.append(line + "\n");
}
break;
} catch (Exception e) {
logger.error("[Link COLL] Exception while readFile:" + resultFileName, e);
} finally {
if (br != null) {
try {
br.close();
} catch (IOException e) {
logger.error("[Link COLL] IOException:", e);
}
}
logger.info("[Link COLL] Delete coll result file : " + resultFile.getName());
logger.info(LinkCollConst.RESULT_PATH + ":\n" + sb.toString());
resultFile.delete();
}
}
}// end while
logger.info("End parse result file[" + resultFileName + "]");
return result;
}
/**
* 解析每个命令的执行结果
*
* @throws
* @since Ver 1.1
*/
private List<List<String>> parseResult(List<String> result, int num) {
List<List<String>> list = new ArrayList<List<String>>();
logger.info("[Link Coll:" + linkId + "] Start parseResult...");
for (int i = 0; i < num; i++) {
list.add(new ArrayList<String>());
}
int i = 0;
for (String line : result) {
if (line.trim().equals(LinkCollConst.SPLIT_LINE)) {
i++;
continue;
} else if (line.trim().endsWith(LinkCollConst.SPLIT_LINE)) {
line = line.replace(LinkCollConst.SPLIT_LINE, "");
list.get(i).add(line);
i++;
} else {
list.get(i).add(line);
}
}
logger.info("[Link Coll:" + linkId + "] End parseResult...");
return list;
}
}