CronLogScan.java
11.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
package com.sitech.jmx.manage;
import com.sitech.base.AgentProperties;
import com.sitech.ismp.check.mbean.ShellExecutorUtil;
import com.sitech.ismp.coll.cron.CronConstants;
import com.sitech.ismp.messageObject.ScheduleLog;
import com.sitech.util.Formater;
import com.sitech.util.JSONUtil;
import com.sitech.util.mq.DataTunnel;
import com.sitech.util.mq.MQConstants;
import com.sitech.util.mq.TunnelFactory;
import org.apache.log4j.Logger;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.*;
/**
 * Background task that scans the agent's cron log directory and reports script results.
 *
 * <p>Each pass lists {@code AGENT_HOME/cronlogs}, parses every writable file into a
 * {@link ScheduleLog} and then deletes the file. The first log seen for a schedule id is
 * cached; when a second log with the same schedule id arrives, the pair is either handed to
 * {@link ShellExecutorUtil} (file names starting with {@code CCCP_}) or reported through the
 * agent report queue, and the cache entry is dropped. Roughly once every 24 hours, notice
 * files older than one day are removed from the two cron temp directories.
 *
 * <p>NOTE(review): the pairing cache and the last-clean timestamp are static and not
 * synchronized; this presumably relies on a single scanner thread - confirm.
 */
public class CronLogScan implements Runnable {
    private static final Logger logger = Logger.getLogger("COLL");

    /** First log seen per schedule id, waiting for its counterpart. */
    private static final Map<String, ScheduleLog> logDetail = new HashMap<String, ScheduleLog>();

    private static final String LOG_PATH = AgentProperties.AGENT_HOME + "/cronlogs";
    private static final String NOTICE_TMP_PATH = AgentProperties.AGENT_HOME + "/script/cron/cron_tmp/";
    private static final String NOTICE_BUSI_TMP_PATH = AgentProperties.AGENT_HOME + "/script/busi/cron_tmp/";

    /** Key prefix of the attachment status entry inside a log's tab-separated info. */
    private static final String ATTACH_STATUS_KEY = "ATTACH_UPLOAD_FILE_STATUS";

    private static final long ONE_DAY_MILLIS = 24 * 60 * 60 * 1000L;
    private static final long SCAN_INTERVAL_MILLIS = 3000L;

    /** Time of the last notice-file cleanup; 0 means "never cleaned". */
    private static long lastCleanTime = 0L;

    /** Stop flag; volatile so a {@link #setRun(boolean)} from another thread is seen by the loop. */
    private volatile boolean isRun = true;

    /**
     * Polls the log directory until {@link #setRun(boolean) setRun(false)} is called or the
     * running thread is interrupted.
     */
    @Override
    public void run() {
        logger.info("[CronLogScan] Start CronLogScan...");
        File dir = new File(LOG_PATH);
        if (!dir.exists()) {
            // NOTE(review): the message says "exit" but the loop below still runs, so a
            // directory created later is picked up. Behavior kept; confirm which is intended.
            logger.warn("[CronLogScan] " + LOG_PATH + " is not exists, exit");
        }

        while (isRun() && !Thread.currentThread().isInterrupted()) {
            try {
                boolean idle;
                File[] logFile = dir.listFiles();
                if (null == logFile || 0 == logFile.length) {
                    logger.debug("[CronLogScan] 0 file exist, wait 3 seconds...");
                    idle = true;
                } else {
                    // A pass that removed nothing (only unwritable or undeletable files) must
                    // not be retried immediately, otherwise the loop spins at full speed.
                    idle = processFiles(logFile) == 0;
                }

                // Checked on every pass so the daily cleanup also happens on a quiet agent.
                cleanNoticeFileIfDue();

                if (idle) {
                    waitIt(SCAN_INTERVAL_MILLIS);
                }
            } catch (Exception e) {
                logger.error("[CronLogScan] Exception while run CronLogScan", e);
                // Back off so a persistent failure does not turn into a tight error loop.
                waitIt(SCAN_INTERVAL_MILLIS);
            }
        }
        logger.info("[CronLogScan] CronLogScan stopped");
    }

    /**
     * Parses and deletes every writable file of one directory listing.
     *
     * @param logFiles files returned by the directory listing
     * @return number of files that were actually deleted from the directory
     */
    private int processFiles(File[] logFiles) {
        int removed = 0;
        for (File logFile : logFiles) {
            String fileName = logFile.getName();
            logger.info("[CronLogScan] Begin parse file " + fileName + "...");
            if (!logFile.canWrite()) {
                continue;
            }
            try {
                parseFile(logFile);
            } catch (Exception e) {
                logger.error("[CronLogScan] Exception while parse file " + fileName, e);
            } finally {
                // The file is removed whether or not parsing succeeded.
                if (logFile.delete()) {
                    removed++;
                } else {
                    logger.warn("[CronLogScan] Failed to delete file " + logFile.getAbsolutePath());
                }
            }
            logger.info("[CronLogScan] End parse file " + fileName + ", logDetail's size = " + logDetail.size());
        }
        return removed;
    }

    /** Runs the notice-file cleanup if it never ran or last ran more than a day ago. */
    private void cleanNoticeFileIfDue() {
        long now = System.currentTimeMillis();
        if (0L == lastCleanTime || now - lastCleanTime > ONE_DAY_MILLIS) {
            cleanNoticeFile();
            lastCleanTime = System.currentTimeMillis();
        }
    }

    /**
     * Deletes files older than one day from both notice temp directories.
     */
    private void cleanNoticeFile() {
        long cleanTimeFrom = System.currentTimeMillis() - ONE_DAY_MILLIS;
        // Notice files of crontab scheduled tasks.
        cleanDir(NOTICE_TMP_PATH, cleanTimeFrom);
        // Notice files of shell collection.
        cleanDir(NOTICE_BUSI_TMP_PATH, cleanTimeFrom);
    }

    /**
     * Deletes the writable, non-directory entries of {@code path} that were last modified
     * before {@code cleanTimeFrom}. Failures are logged and never propagated.
     *
     * @param path          directory to clean
     * @param cleanTimeFrom epoch millis; files modified earlier than this are deleted
     */
    private void cleanDir(String path, long cleanTimeFrom) {
        try {
            logger.info("[CronLogScan] Begin Clean Dir: " + path);
            File dir = new File(path);
            if (dir.exists() && dir.isDirectory()) {
                // listFiles() returns null when the directory cannot be read.
                File[] tmpFile = dir.listFiles();
                if (null == tmpFile) {
                    logger.warn("[CronLogScan] Can not list dir " + path);
                } else {
                    for (File candidate : tmpFile) {
                        boolean expired = !candidate.isDirectory()
                                && candidate.canWrite()
                                && candidate.lastModified() < cleanTimeFrom;
                        if (!expired) {
                            continue;
                        }
                        if (candidate.delete()) {
                            logger.info("[CronLogScan] Delete " + candidate.getAbsolutePath());
                        } else {
                            logger.warn("[CronLogScan] Failed to delete " + candidate.getAbsolutePath());
                        }
                    }
                }
            }
            logger.info("[CronLogScan] End Clean Dir: " + path);
        } catch (Exception e) {
            logger.error("[CronLogScan] Exception while cleanDir " + path, e);
        }
    }

    /**
     * Parses one log file and either caches the result or, when a log with the same schedule
     * id is already cached, processes the pair and clears the cache entry.
     *
     * @param file log file to parse
     */
    private void parseFile(File file) {
        List<String> tmpList = readFile(file);
        ScheduleLog schLog = parseLog(file, tmpList);
        logger.info("[CronLogScan] New ScheduleLog: " + JSONUtil.toJSON(schLog));

        ScheduleLog befLog = logDetail.get(schLog.getSchId());
        if (null == befLog) {
            // First log of this schedule id: keep it until the counterpart arrives.
            logDetail.put(schLog.getSchId(), schLog);
            return;
        }

        if (file.getName().startsWith("CCCP_")) {
            // Centrally controlled distributed executor.
            new ShellExecutorUtil().parseResult(schLog, befLog);
        } else {
            sendSchRst(schLog, befLog);
        }
        logDetail.remove(schLog.getSchId());
    }

    /**
     * Derives the script status from a pair of logs and reports three messages: a summary
     * log followed by both original logs, all stamped with the status.
     *
     * @param schLog the log that was just parsed (standard output or error output)
     * @param befLog the previously cached log of the same schedule id
     */
    private void sendSchRst(ScheduleLog schLog, ScheduleLog befLog) {
        // Script execution status, 0: failed, 1: succeeded.
        String scrStatus = getScrStatus(schLog, befLog);
        String endInfo = getEndInfo(scrStatus, schLog, befLog);

        ScheduleLog endLog = copy(schLog);
        endLog.setLogInfo(endInfo);
        endLog.setLogType("0");
        endLog.setScrStatus(scrStatus);

        befLog.setScrStatus(scrStatus);
        schLog.setScrStatus(scrStatus);

        sendMsg(endLog);
        sendMsg(befLog);
        sendMsg(schLog);
    }

    /**
     * Writes one log to the agent report queue.
     *
     * @param schLog log to send
     */
    private void sendMsg(ScheduleLog schLog) {
        DataTunnel cronLogTunnel = TunnelFactory.getTunnel(MQConstants.Q_ROPORT_FROM_AGENT);
        cronLogTunnel.writeData(schLog);
    }

    /**
     * Builds the summary log from {@code schLog}: identifying fields are copied, the type is
     * fixed to {@code std_out} and the sequence number is one lower than the source's.
     *
     * @param schLog log to copy the identifying fields from
     * @return a new log; log info and script status are left for the caller to fill in
     * @throws NumberFormatException if the source sequence number is missing or not numeric
     */
    private ScheduleLog copy(ScheduleLog schLog) {
        ScheduleLog log = new ScheduleLog();
        log.setType("std_out");
        log.setSchId(schLog.getSchId());
        log.setRequestId(schLog.getRequestId());
        log.setSeq(String.valueOf(Integer.parseInt(schLog.getSeq()) - 1));
        log.setOperateType(schLog.getOperateType());
        log.setTrigerType(schLog.getTrigerType());
        log.setAgentId(schLog.getAgentId());
        log.setEndTime(schLog.getEndTime());
        log.setLogType(schLog.getLogType());
        return log;
    }

    /**
     * Builds the human-readable summary text for a finished task.
     *
     * <p>For anything but an attachment delivery the text follows {@code scrStatus}. For an
     * attachment delivery it follows the {@code ATTACH_UPLOAD_FILE_STATUS} entry found in the
     * logs; a value found in {@code schLog} takes precedence over one found in {@code befLog},
     * and a missing or valueless entry yields the "unknown" text.
     *
     * @param scrStatus script status as computed by {@link #getScrStatus}
     * @param schLog    the log that was just parsed
     * @param befLog    the previously cached log of the same schedule id
     * @return summary text for the summary log
     */
    private String getEndInfo(String scrStatus, ScheduleLog schLog, ScheduleLog befLog) {
        if (!CronConstants.OPERATE_TYPE_ATTACH_FILE.equals(schLog.getOperateType())) {
            // Not an attachment delivery.
            if (CronConstants.SCRIPT_EXEC_SUCCESS.equals(scrStatus)) {
                return "脚本执行成功!";
            }
            if (CronConstants.SCRIPT_EXEC_FAILED.equals(scrStatus)) {
                return "脚本执行失败!";
            }
            return "脚本执行状态未知!";
        }

        // Attachment delivery: look the result up in the log contents.
        logger.info("Start get ATTACH_UPLOAD_FILE_STATUS ...");
        String downLoadFlag = "";
        String befFlag = findAttachStatus(befLog.getLogInfo());
        if (null != befFlag) {
            downLoadFlag = befFlag;
        }
        String schFlag = findAttachStatus(schLog.getLogInfo());
        if (null != schFlag) {
            downLoadFlag = schFlag;
        }
        logger.info("ATTACH_UPLOAD_FILE_STATUS======" + downLoadFlag);

        if ("1".equals(downLoadFlag)) {
            return "附件下发成功!";
        }
        if ("0".equals(downLoadFlag)) {
            return "附件下发失败!";
        }
        return "附件下发状态未知!";
    }

    /**
     * Finds the value of the first {@code ATTACH_UPLOAD_FILE_STATUS} entry in a tab-separated
     * log info string.
     *
     * @param logInfo tab-separated log info, may be {@code null}
     * @return the text between the first and second {@code =} of that entry, or {@code null}
     *         when there is no such entry or it carries no value
     */
    private static String findAttachStatus(String logInfo) {
        if (null == logInfo) {
            return null;
        }
        for (String info : logInfo.split("\t")) {
            if (info.startsWith(ATTACH_STATUS_KEY)) {
                // split() drops trailing empty strings, so "KEY=" yields a single element.
                String[] keyValue = info.split("=");
                return keyValue.length > 1 ? keyValue[1] : null;
            }
        }
        return null;
    }

    /**
     * Computes the script execution status from a pair of logs: failed when either log is an
     * error-output log whose info is not the empty string, succeeded otherwise.
     *
     * @param schLog the log that was just parsed
     * @param befLog the previously cached log of the same schedule id
     * @return {@code CronConstants.SCRIPT_EXEC_FAILED} or {@code CronConstants.SCRIPT_EXEC_SUCCESS}
     */
    private String getScrStatus(ScheduleLog schLog, ScheduleLog befLog) {
        if (hasErrorOutput(schLog) || hasErrorOutput(befLog)) {
            return CronConstants.SCRIPT_EXEC_FAILED;
        }
        return CronConstants.SCRIPT_EXEC_SUCCESS;
    }

    /** Tells whether {@code log} is an error-output log whose info is not the empty string. */
    private static boolean hasErrorOutput(ScheduleLog log) {
        return "err_out".equals(log.getType()) && !"".equals(log.getLogInfo());
    }

    /**
     * Turns the lines of a log file into a {@link ScheduleLog}.
     *
     * <p>Lines starting with a known key ({@code AGENT_ID}, {@code REQUEST_ID},
     * {@code SCHEDULE_ID}, {@code OPERATE_TYPE}, {@code TRRIGGER_TYPE}, {@code SEQ},
     * {@code CURRENT_TIME}) fill the matching field; {@code AGENT_HOME} and {@code SHELL_NAME}
     * lines are dropped; every other line is appended to the log info, separated by tabs. The
     * type is {@code err_out} for files ending in {@code .err} and {@code std_out} otherwise.
     *
     * @param file    the file the lines were read from (only its name is used)
     * @param tmpList lines of the file
     * @return the populated log
     * @throws IllegalArgumentException if a recognised header line has no value after
     *                                  {@code =}, or the {@code SEQ} value is not an integer
     */
    private ScheduleLog parseLog(File file, List<String> tmpList) {
        ScheduleLog log = new ScheduleLog();
        log.setLogType(CronConstants.SCHEDULE_LOG_SCRIPT);
        log.setEndTime(Formater.datetimeToString(new Date()));
        log.setType(file.getName().endsWith(".err") ? "err_out" : "std_out");

        // Collects the free-text part of the log.
        StringBuilder info = new StringBuilder();
        Map<String, Object> extInfo = new HashMap<String, Object>();
        for (String line : tmpList) {
            if (line.startsWith("AGENT_HOME") || line.startsWith("SHELL_NAME")) {
                continue;
            } else if (line.startsWith("AGENT_ID")) {
                log.setAgentId(headerValue(line));
            } else if (line.startsWith("REQUEST_ID")) {
                log.setRequestId(headerValue(line));
            } else if (line.startsWith("SCHEDULE_ID")) {
                log.setSchId(headerValue(line));
            } else if (line.startsWith("OPERATE_TYPE")) {
                log.setOperateType(headerValue(line));
            } else if (line.startsWith("TRRIGGER_TYPE")) {
                // Key kept exactly as spelled; presumably it matches what the scripts
                // write - verify before "fixing" the spelling.
                log.setTrigerType(headerValue(line));
            } else if (line.startsWith("SEQ")) {
                // The stored sequence number is one higher than the one in the file.
                int seqNo = Integer.parseInt(headerValue(line)) + 1;
                log.setSeq(String.valueOf(seqNo));
            } else if (line.startsWith("CURRENT_TIME")) {
                extInfo.put("SH_EXEC_TIME", headerValue(line));
            } else {
                info.append(line).append("\t");
            }
        }
        log.setLogInfo(info.toString().trim());
        log.setExtInfo(extInfo);
        return log;
    }

    /**
     * Extracts the value of a {@code KEY=VALUE} header line.
     *
     * @param line header line
     * @return the text between the first and second {@code =} of the trimmed line
     * @throws IllegalArgumentException if the line has nothing after the first {@code =}
     */
    private static String headerValue(String line) {
        String[] keyValue = line.trim().split("=");
        if (keyValue.length < 2) {
            throw new IllegalArgumentException("Malformed header line, expected KEY=VALUE: " + line);
        }
        return keyValue[1];
    }

    /**
     * Reads a file line by line.
     *
     * @param file file to read
     * @return the lines of the file, in order
     * @throws RuntimeException if the file cannot be opened or read; the I/O failure is the cause
     */
    private List<String> readFile(File file) {
        List<String> rst = new ArrayList<String>();
        BufferedReader br = null;
        try {
            br = new BufferedReader(new FileReader(file));
            String line;
            while ((line = br.readLine()) != null) {
                rst.add(line);
            }
        } catch (IOException e) {
            throw new RuntimeException("Exception while read file " + file.getName(), e);
        } finally {
            if (br != null) {
                try {
                    br.close();
                } catch (IOException e) {
                    logger.error("Exception while close BufferedReader " + file.getName(), e);
                }
            }
        }
        return rst;
    }

    /**
     * @return {@code true} while the scan loop is allowed to keep running
     */
    public boolean isRun() {
        return isRun;
    }

    /**
     * @param run {@code false} to make the scan loop end after its current pass
     */
    public void setRun(boolean run) {
        isRun = run;
    }

    /**
     * Sleeps for {@code timeout} milliseconds. If the thread is interrupted while sleeping,
     * the interrupt flag is restored so the scan loop can notice it and stop.
     *
     * @param timeout sleep time in milliseconds
     */
    private void waitIt(long timeout) {
        try {
            Thread.sleep(timeout);
        } catch (InterruptedException e) {
            logger.warn("[CronLogScan] Interrupted while waiting", e);
            Thread.currentThread().interrupt();
        }
    }
}