// CronLogScan.java 11.7 KB
package com.sitech.jmx.manage;

import com.sitech.base.AgentProperties;
import com.sitech.ismp.check.mbean.ShellExecutorUtil;
import com.sitech.ismp.coll.cron.CronConstants;
import com.sitech.ismp.messageObject.ScheduleLog;
import com.sitech.util.Formater;
import com.sitech.util.JSONUtil;
import com.sitech.util.mq.DataTunnel;
import com.sitech.util.mq.MQConstants;
import com.sitech.util.mq.TunnelFactory;
import org.apache.log4j.Logger;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.*;

/**
 * Background task that consumes the output files written under
 * {@code AGENT_HOME/cronlogs} and reports them through the agent's data tunnel.
 *
 * <p>Each file is parsed into a {@link ScheduleLog}. A file whose name ends with
 * {@code .err} is typed {@code err_out}, any other file {@code std_out}. The first log
 * seen for a schedule id is parked in an in-memory map; when a second log with the same
 * schedule id arrives the two are reported together (or, for files whose name starts
 * with {@code CCCP_}, handed to {@link ShellExecutorUtil#parseResult}). Every file is
 * deleted after it has been handled, whether or not parsing succeeded.
 *
 * <p>Roughly once a day the task also removes notice files older than one day from the
 * two {@code cron_tmp} directories.
 *
 * <p>NOTE(review): {@code logDetail} and {@code lastCleanTime} are static and not
 * synchronized; this presumably relies on a single scanner thread per JVM - confirm
 * against the code that starts this task.
 */
public class CronLogScan implements Runnable{
    private static final Logger logger = Logger.getLogger("COLL");

    /** Pause between polls when a pass over the log directory had nothing to do. */
    private static final long IDLE_WAIT_MILLIS = 3000L;

    /** One day, used both as the cleanup interval and as the notice-file age limit. */
    private static final long ONE_DAY_MILLIS = 24 * 60 * 60 * 1000L;

    /** Prefix of the log entry that carries the attachment upload result. */
    private static final String ATTACH_STATUS_KEY = "ATTACH_UPLOAD_FILE_STATUS";

    /** Logs waiting for their partner file, keyed by schedule id. */
    private static final Map<String,ScheduleLog> logDetail = new HashMap<String, ScheduleLog>();

    private static final String LOG_PATH = AgentProperties.AGENT_HOME+ "/cronlogs";

    private static final String NOTICE_TMP_PATH = AgentProperties.AGENT_HOME + "/script/cron/cron_tmp/";

    private static final String NOTICE_BUSI_TMP_PATH = AgentProperties.AGENT_HOME + "/script/busi/cron_tmp/";

    /** Time of the last notice-file cleanup in epoch millis; 0 means "never". */
    private static long lastCleanTime = 0L;

    /** Stop flag; volatile because {@link #setRun} is called from a different thread than {@link #run}. */
    private volatile boolean isRun = true;

    /**
     * Polls the log directory until {@link #setRun setRun(false)} is called or the
     * thread is interrupted. A missing directory is not fatal: the loop keeps polling
     * so the scanner picks up the directory if it is created later.
     */
    @Override
    public void run() {
        logger.info("[CronLogScan] Start CronLogScan...");
        File dir = new File(LOG_PATH);
        if(!dir.exists()){
            logger.warn("[CronLogScan] " + LOG_PATH + " does not exist yet, will keep polling for it");
        }

        while(isRun() && !Thread.currentThread().isInterrupted()){
            try {
                // Runs at most once a day; done before the empty-directory check so that
                // cleanup still happens while no log files are arriving.
                cleanNoticeFileIfDue();

                File[] logFile = dir.listFiles();

                if (null == logFile || 0 == logFile.length) {
                    logger.debug("[CronLogScan] 0 file exist, wait 3 seconds...");
                    waitIt(IDLE_WAIT_MILLIS);
                    continue;
                }

                int handled = 0;
                for (int i = 0; i < logFile.length; i++) {
                    if (processLogFile(logFile[i])) {
                        handled++;
                    }
                }

                // Only skipped entries were present: back off instead of spinning on them.
                if (0 == handled) {
                    logger.debug("[CronLogScan] no processable file, wait 3 seconds...");
                    waitIt(IDLE_WAIT_MILLIS);
                }
            }catch (Exception e){
                logger.error("[CronLogScan] Exception while run CronLogScan", e);
                // Avoid a hot loop if the same failure repeats on every pass.
                waitIt(IDLE_WAIT_MILLIS);
            }
        }
        logger.info("[CronLogScan] CronLogScan stopped");
    }

    /**
     * Parses one entry of the log directory and deletes it afterwards.
     *
     * @param file entry returned by listing the log directory
     * @return {@code true} if a parse was attempted, {@code false} if the entry was
     *         skipped (not a regular file, or not writable)
     */
    private boolean processLogFile(File file) {
        if (!file.isFile() || !file.canWrite()) {
            return false;
        }

        logger.info("[CronLogScan] Begin parse file " + file.getName() + "...");
        try {
            parseFile(file);
        }catch (Exception e){
            logger.error("[CronLogScan] Exception while parse file " + file.getName(), e);
        }finally {
            // The file is removed even when parsing failed, so a bad file is not retried forever.
            if (!file.delete()) {
                logger.warn("[CronLogScan] Failed to delete " + file.getAbsolutePath() + ", it may be parsed again");
            }
        }
        logger.info("[CronLogScan] End parse file " + file.getName() + ", logDetail's size = " + logDetail.size());
        return true;
    }

    /**
     * Runs {@link #cleanNoticeFile()} if it has never run or last ran more than a day ago.
     */
    private void cleanNoticeFileIfDue() {
        if(0L == lastCleanTime || System.currentTimeMillis() - lastCleanTime > ONE_DAY_MILLIS){
            cleanNoticeFile();
            lastCleanTime = System.currentTimeMillis();
        }
    }

    /**
     * Deletes notice files that were last modified more than one day ago.
     */
    private void cleanNoticeFile() {
        long cleanTimeFrom = System.currentTimeMillis() - ONE_DAY_MILLIS;

        // Notice files of crontab-scheduled tasks
        cleanDir(NOTICE_TMP_PATH, cleanTimeFrom);
        // Notice files of shell collection
        cleanDir(NOTICE_BUSI_TMP_PATH, cleanTimeFrom);
    }

    /**
     * Deletes the writable regular files directly inside {@code path} whose
     * last-modified time is before {@code cleanTimeFrom}. Sub-directories are left
     * untouched. Any exception is logged and not propagated.
     *
     * @param path          directory to clean
     * @param cleanTimeFrom cut-off in epoch millis; older files are deleted
     */
    private void cleanDir(String path, long cleanTimeFrom) {
        try {
            logger.info("[CronLogScan] Begin Clean Dir: " + path);
            File dir = new File(path);
            // listFiles() returns null on an I/O error even for an existing directory.
            File[] tmpFile = dir.isDirectory() ? dir.listFiles() : null;
            if (null != tmpFile) {
                for (int i = 0; i < tmpFile.length; i++) {
                    File candidate = tmpFile[i];
                    if (candidate.isDirectory() || !candidate.canWrite() || candidate.lastModified() >= cleanTimeFrom) {
                        continue;
                    }
                    if (candidate.delete()) {
                        logger.info("[CronLogScan] Delete " + candidate.getAbsolutePath());
                    } else {
                        logger.warn("[CronLogScan] Failed to delete " + candidate.getAbsolutePath());
                    }
                }
            }
            logger.info("[CronLogScan] End Clean Dir: " + path);
        } catch (Exception e) {
            logger.error("[CronLogScan] Exception while cleanDir " + path, e);
        }
    }


    /**
     * Parses a log file and either parks the result until its partner arrives or,
     * if the partner is already waiting, reports the pair.
     *
     * @param file log file to parse
     */
    private void parseFile(File file) {
        List<String> tmpList = readFile(file);

        ScheduleLog schLog = parseLog(file, tmpList);

        logger.info("[CronLogScan] New ScheduleLog: " + JSONUtil.toJSON(schLog));

        // remove() returns the waiting partner (if any) and clears its slot in one step,
        // so a failure while reporting cannot leave a stale entry in the map.
        ScheduleLog befLog = logDetail.remove(schLog.getSchId());

        if (null == befLog) {
            // First log of the pair: keep it until the other one shows up
            logDetail.put(schLog.getSchId(), schLog);
            return;
        }

        if (file.getName().startsWith("CCCP_")) {
            // Centrally controlled distributed executor
            new ShellExecutorUtil().parseResult(schLog, befLog);
        } else {
            sendSchRst(schLog, befLog);
        }
    }

    /**
     * Derives the execution status from a pair of logs and reports three messages:
     * a summary log, then {@code befLog}, then {@code schLog}.
     *
     * @param schLog the log that was just parsed (standard or error output)
     * @param befLog the previously parked log with the same schedule id
     */
    private void sendSchRst(ScheduleLog schLog, ScheduleLog befLog) {
        // Script execution status as returned by getScrStatus
        String scrStatus = getScrStatus(schLog, befLog);

        String endInfo = getEndInfo(scrStatus, schLog, befLog);

        ScheduleLog endLog = copy(schLog);
        endLog.setLogInfo(endInfo);
        endLog.setLogType("0");
        endLog.setScrStatus(scrStatus);
        befLog.setScrStatus(scrStatus);
        schLog.setScrStatus(scrStatus);

        sendMsg(endLog);
        sendMsg(befLog);
        sendMsg(schLog);
    }

    /**
     * Writes one log to the agent report tunnel.
     *
     * @param schLog log to send
     */
    private void sendMsg(ScheduleLog schLog) {
        DataTunnel cronLogTunnel = TunnelFactory.getTunnel(MQConstants.Q_ROPORT_FROM_AGENT);
        cronLogTunnel.writeData(schLog);
    }

    /**
     * Builds the summary log from {@code schLog}: same identifying fields, type
     * {@code std_out}, and a sequence number one lower than {@code schLog}'s.
     *
     * @param schLog log to copy the identifying fields from
     * @return the new summary log
     * @throws NumberFormatException if {@code schLog} has no numeric sequence number
     */
    private ScheduleLog copy(ScheduleLog schLog) {
        ScheduleLog log = new ScheduleLog();
        log.setType("std_out");
        log.setSchId(schLog.getSchId());
        log.setRequestId(schLog.getRequestId());
        log.setSeq(String.valueOf(Integer.parseInt(schLog.getSeq()) - 1));
        log.setOperateType(schLog.getOperateType());
        log.setTrigerType(schLog.getTrigerType());
        log.setAgentId(schLog.getAgentId());
        log.setEndTime(schLog.getEndTime());
        log.setLogType(schLog.getLogType());
        return log;
    }

    /**
     * Builds the human-readable summary text for a pair of logs.
     *
     * <p>For anything but an attachment delivery the text depends only on
     * {@code scrStatus}. For an attachment delivery it depends on the
     * {@code ATTACH_UPLOAD_FILE_STATUS} entry found in the logs; a value found in
     * {@code schLog} takes precedence over one found in {@code befLog}.
     *
     * @param scrStatus script status as returned by {@link #getScrStatus}
     * @param schLog    the log that was just parsed
     * @param befLog    the previously parked log
     * @return summary text
     */
    private String getEndInfo(String scrStatus, ScheduleLog schLog, ScheduleLog befLog) {
        if(!CronConstants.OPERATE_TYPE_ATTACH_FILE.equals(schLog.getOperateType())){
            // Not an attachment delivery
            if(CronConstants.SCRIPT_EXEC_SUCCESS.equals(scrStatus)){
                return "脚本执行成功!";
            } else if(CronConstants.SCRIPT_EXEC_FAILED.equals(scrStatus)){
                return "脚本执行失败!";
            } else {
                return "脚本执行状态未知!";
            }
        }

        // Work out the result of the attachment delivery
        logger.info("Start get ATTACH_UPLOAD_FILE_STATUS ...");
        String downLoadFlag = "";

        String befFlag = findAttachStatus(befLog.getLogInfo());
        if (null != befFlag) {
            downLoadFlag = befFlag;
        }

        String curFlag = findAttachStatus(schLog.getLogInfo());
        if (null != curFlag) {
            downLoadFlag = curFlag;
        }
        logger.info("ATTACH_UPLOAD_FILE_STATUS======"+downLoadFlag);

        if("1".equals(downLoadFlag)){
            return "附件下发成功!";
        }else if("0".equals(downLoadFlag)){
            return "附件下发失败!";
        } else {
            return "附件下发状态未知!";
        }
    }

    /**
     * Looks up the first {@code ATTACH_UPLOAD_FILE_STATUS} entry in a tab-separated
     * log text.
     *
     * @param logInfo tab-separated log text, may be {@code null}
     * @return the entry's value (empty if the entry has none), or {@code null} if
     *         {@code logInfo} is {@code null} or contains no such entry
     */
    private static String findAttachStatus(String logInfo) {
        if (null == logInfo) {
            return null;
        }
        String[] entries = logInfo.split("\t");
        for (int i = 0; i < entries.length; i++) {
            if (entries[i].startsWith(ATTACH_STATUS_KEY)) {
                return valueAfterEquals(entries[i]);
            }
        }
        return null;
    }

    /**
     * Returns the text after the first {@code '='} of a {@code KEY=VALUE} entry.
     * Unlike {@code split("=")[1]} this does not throw when the value is empty or the
     * {@code '='} is missing, and it keeps values that themselves contain {@code '='}.
     *
     * @param entry a {@code KEY=VALUE} line
     * @return the trimmed value, or an empty string if there is none
     */
    private static String valueAfterEquals(String entry) {
        String[] parts = entry.trim().split("=", 2);
        return parts.length > 1 ? parts[1].trim() : "";
    }


    /**
     * Determines the script execution status of a pair of logs: failed if either log
     * is of type {@code err_out} and its log text is not the empty string, otherwise
     * successful.
     *
     * @param schLog the log that was just parsed
     * @param befLog the previously parked log
     * @return {@link CronConstants#SCRIPT_EXEC_FAILED} or {@link CronConstants#SCRIPT_EXEC_SUCCESS}
     */
    private String getScrStatus(ScheduleLog schLog, ScheduleLog befLog) {
        if ("err_out".equals(schLog.getType()) && !"".equals(schLog.getLogInfo())) {
            return CronConstants.SCRIPT_EXEC_FAILED;
        } else if ("err_out".equals(befLog.getType()) && !"".equals(befLog.getLogInfo())) {
            return CronConstants.SCRIPT_EXEC_FAILED;
        } else {
            return CronConstants.SCRIPT_EXEC_SUCCESS;
        }
    }

    /**
     * Converts the lines of a log file into a {@link ScheduleLog}.
     *
     * <p>Lines starting with one of the known header keys populate the matching field;
     * {@code AGENT_HOME} and {@code SHELL_NAME} lines are dropped; every other line is
     * appended, tab-separated, to the log text. The stored sequence number is the
     * file's {@code SEQ} value plus one, and the end time is the moment of parsing.
     *
     * <p>NOTE(review): header lines are recognised by prefix only, so an ordinary
     * output line that happens to start with e.g. {@code SEQ} is treated as a header -
     * confirm this matches the format the scripts emit.
     *
     * @param file    the file the lines came from; only its name is used
     * @param tmpList lines of the file
     * @return the populated log
     * @throws NumberFormatException if a {@code SEQ} line does not carry an integer
     */
    private ScheduleLog parseLog(File file, List<String> tmpList) {
        ScheduleLog log = new ScheduleLog();
        log.setLogType(CronConstants.SCHEDULE_LOG_SCRIPT);
        log.setEndTime(Formater.datetimeToString(new Date()));
        if (file.getName().endsWith(".err")) {
            log.setType("err_out");
        } else {
            log.setType("std_out");
        }

        // Collect the free-form output and the extra attributes
        StringBuilder sb = new StringBuilder();
        Map<String,Object> extInfo = new HashMap<String, Object>();
        for(String line : tmpList){
            if(line.startsWith("AGENT_HOME") || line.startsWith("SHELL_NAME")){
                continue;
            } else if(line.startsWith("AGENT_ID")){
                log.setAgentId(valueAfterEquals(line));
            } else if(line.startsWith("REQUEST_ID")){
                log.setRequestId(valueAfterEquals(line));
            } else if(line.startsWith("SCHEDULE_ID")){
                log.setSchId(valueAfterEquals(line));
            } else if(line.startsWith("OPERATE_TYPE")){
                log.setOperateType(valueAfterEquals(line));
            } else if(line.startsWith("TRRIGGER_TYPE")){
                log.setTrigerType(valueAfterEquals(line));
            } else if(line.startsWith("SEQ")){
                int seqNo = Integer.parseInt(valueAfterEquals(line)) + 1;
                log.setSeq(String.valueOf(seqNo));
            } else if(line.startsWith("CURRENT_TIME")){
                extInfo.put("SH_EXEC_TIME", valueAfterEquals(line));
            } else {
                sb.append(line).append("\t");
            }
        }
        log.setLogInfo(sb.toString().trim());
        log.setExtInfo(extInfo);
        return log;
    }

    /**
     * Reads all lines of a file using the platform default charset.
     *
     * @param file file to read
     * @return the lines in file order
     * @throws RuntimeException if the file cannot be read; the original exception is
     *                          attached as the cause
     */
    private List<String> readFile(File file) {
        List<String> rst = new ArrayList<String>();
        BufferedReader br = null;
        try {
            br = new BufferedReader(new FileReader(file));

            String line = null;
            while ((line = br.readLine()) != null) {
                rst.add(line);
            }
        } catch (Exception e) {
            throw new RuntimeException("Exception while read file " + file.getName(), e);
        }finally {
            if(br != null){
                try {
                    br.close();
                } catch (IOException e) {
                    logger.error("Exception while close BufferedReader " + file.getName(), e);
                }
            }
        }
        return rst;
    }

    /**
     * @return {@code true} while the scan loop is allowed to keep running
     */
    public boolean isRun() {
        return isRun;
    }

    /**
     * Requests the scan loop to continue or stop; the loop checks the flag once per pass.
     *
     * @param run {@code false} to stop the loop
     */
    public void setRun(boolean run) {
        isRun = run;
    }

    /**
     * Sleeps for the given time. If the thread is interrupted the interrupt flag is
     * restored so that the scan loop can see it and terminate.
     *
     * @param timeout time to sleep in milliseconds
     */
    private void waitIt(long timeout) {
        try {
            Thread.sleep(timeout);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.warn("[CronLogScan] Interrupted while waiting, stopping scan loop");
        }
    }
}