// SqlCollUtil.java (10.5 KB)
package com.sitech.ismp.coll.busi;

import java.sql.*;
import java.util.*;

import net.sf.json.JSONArray;
import net.sf.json.JSONObject;

import org.apache.log4j.Logger;

import com.sitech.base.AgentProperties;
import com.sitech.ismp.coll.CollBase;
import com.sitech.ismp.coll.basic.TblATO_KPIDETAIL;
import com.sitech.util.DES3;
import com.sitech.util.JSONUtil;

/** 
 * ClassName:SqlCollThread
 * Description: SQL采集,采集结果写SWAP文件
 *
 * @author   Linxc
 * @version  
 * @since    Ver 1.1
 * @Date	 2012	Feb 17, 2012		12:25:53 PM
 */
public class SqlCollUtil implements Runnable {
	private static Logger logger = Logger.getLogger("BUSI_COLL");

    private String url = null;
	
	private Connection conn = null;
	
	/** 采集参数 */
	private HashMap<String, String> params;
	
	/** 主键对应的列名 */
	private String pkColumnName = "";
	
	/** 
	 * 数据来源 IP:PORT:SID
	 */
	private String dataOrig;

	public SqlCollUtil(HashMap<String, String> params) {
		this.params = params;
	}

	public void run() {
		logger.info("-- [SQL_COLL] Begin exe collBySql...");

		try {
			init();
			
			if (this.conn == null) {
				return;
			} else {
				logger.info("-- [SQL_COLL] Get JDBC Connection Success!");
			}

			// key: ColumnName, value: KPI_ID
			Map<String, String> kpiMap = getKpiMap();

            // 获取最终采集sql
			String collSql = getCollSql();
			logger.info("-- [SQL_COLL] Collect By Sql, url=" + this.url + ", sql:\n" + collSql);
			collBySql(collSql, kpiMap);
		} catch (Exception e1) {
			logger.error("Exception while excute SqlCollThread :" + JSONUtil.toJSON(this.params), e1);
		} finally {
			if (null != conn) {
				try {
					conn.close();
				} catch (SQLException e) {
                    logger.error("Exception while close jdbc connection:" + this.url, e);
				}
			}
		}		
		logger.info("-- [SQL_COLL] End collBySql...");
	}

	private void init() throws Exception {
        this.url = params.get("URL");

		String className = params.get("CLASSNAME");
		String userName = params.get("USERNAME");
		String password = params.get("PASSWORD");
        logger.info("URL is :" + userName + "/" + password + "@" + this.url);
		password = DES3.decrypt(password);
		
		Class.forName(className);
		this.conn = DriverManager.getConnection(url, userName, password);
		
		this.dataOrig = AgentProperties.AGENT_ID + "#" + url.substring(url.indexOf("@") + 1);
	}

	private void collBySql(String collSql, Map<String, String> kpiMap) {
		CollBase collResult = new CollBase();
		String kbpClass = params.get("KBPCLASS");
		String interval = params.get("COLL_INTERVAL");
		
		Statement stmt = null;
		ResultSet rs = null;
		try {
			stmt = conn.createStatement();	
			rs = stmt.executeQuery(collSql);
			while (rs.next()) {
				String unitId = kbpClass;
				if (pkColumnName != null && !pkColumnName.trim().equals("")) {
					unitId += ":" + rs.getObject(pkColumnName);
				}

				String kpiId = "";
				String kpiValue = "";
                String extInfo = AgentProperties.AGENT_ID+"##"+params.get("SCHEDULE_ID")+"##"+params.get("DSTUNITID");
				
				for(String columnName : kpiMap.keySet()){					
					kpiId = kpiMap.get(columnName);
					kpiValue = rs.getString(columnName);
					collResult.addKPI(unitId, kpiId, kpiValue, interval, extInfo);
					logger.info(unitId + "	" + kpiId + "	" + kpiValue);
				}				
			}
			
			collResult.saveKPI2File();
		} catch (Exception e) {
			logger.error("-- [SQL_COLL]: Exception while collBySql.", e);
		} finally {
			try {
				if (rs != null) {
					rs.close();
				}
				if (stmt != null) {
					stmt.close();
				}
			} catch (SQLException e) {
				logger.error("-- [SQL_COLL]: Exception while collBySql.", e);
			}
		}

	}

    /**
     *获取拼接sql各种情况返回sql集合
     * @param lastList 上一次拼接得到的集合
     * @param valueList 变量执行后结果集
     * @param key 变量名
     * @return 拼接各种情况结果集
     */
    private List getMsql(List<String> lastList,List valueList,String key){
        List newList = new ArrayList();
        for(String sql :lastList){
            for(int i=0;i<valueList.size();i++){
                newList.add(sql.replace(key, valueList.get(i).toString()));
            }
        }
        return newList;
    }
    /**
     * 获取参数运行后结果
     * @return
     */
    private Map<String, List<String>> getValueMap() throws Exception {
        PreparedStatement pstmt = null;
        ResultSet rset = null;
        ResultSetMetaData rsmd = null;
        String _paramsString = params.get("PARAMS");
        List paramsList = (List) JSONUtil.fromJSON(_paramsString);
        Map<String, List<String>> paramsMap = new HashMap<String, List<String>>();
        try {
            for (int i = 0; i < paramsList.size(); i++) {
                Map tempMap = (Map) paramsList.get(i);
                String varSql = tempMap.get("value").toString();
                //指针可移动
                pstmt = conn.prepareStatement(varSql, ResultSet.TYPE_SCROLL_INSENSITIVE, ResultSet.CONCUR_READ_ONLY);
                rset = pstmt.executeQuery();
                rsmd = rset.getMetaData();

                //指针移动到最后一条记录
                rset.last();
                //获取有几条记录
                int rows = rset.getRow();
                //指针移回
                rset.beforeFirst();

                if (rows == 1 && rsmd.getColumnCount() >= 2) {
                    //单行多列
                    for (int j = 1; j < rsmd.getColumnCount() + 1; j++) {
                        //第一次进入时,移动指针到第一条记录
                        if (j == 1) {
                            rset.next();
                        }
                        List<String> tempList = new ArrayList<String>();
                        tempList.add(rset.getString(j));
                        //key:#varName#.columnLabel(大写) value:list
                        paramsMap.put("#" + tempMap.get("varName") + "#." + rsmd.getColumnLabel(j), tempList);
                    }
                } else {
                    //单列单行/单列多行
                    List<String> tempList = new ArrayList<String>();
                    while (rset.next()) {
                        String value = rset.getString(1);
                        tempList.add(value);
                    }
                    //key:#varName# value:list
                    paramsMap.put("#" + tempMap.get("varName") + "#", tempList);
                }
            }
        } catch (Exception e) {
            throw new Exception("获取参数运行结果出错", e);
        }
        return paramsMap;
    }

    /**
	 * 返回最终的采集SQL
	 * @throws Exception 
	 */
	private String getCollSql() throws Exception {
		String collSql = this.params.get("SQL");
		logger.info("collSql1:"+collSql);
        //去掉用户输入时可能会输入的“;”
		collSql = collSql.replace(";", "");
        //1.获得变量sql运行后的结果 map<String,List>  key:varName value:valueList
        Map<String,List<String>> paramsMap = getValueMap();
        //获取拼装前的M_SQL
        String mSql = this.params.get("MSQL");
        // 2. 保存组装好的中间sql
        String USQL = new String();
        if(collSql.contains("{{M_SQL}}") && mSql != null && !"".equals(mSql.trim())){
            //中间sql
            List<String> sqlList = new ArrayList();
            sqlList.add(mSql);
            //若存在中间sql则替换
            for(String varName : paramsMap.keySet()){
                List<String> valueList = paramsMap.get(varName);
                //获取拼sql后的各种情况集合
                sqlList = getMsql(sqlList,valueList,varName);
            }
            //组装中间sql
            for(int i =0;i<sqlList.size();i++){
                if(i==sqlList.size()-1){
                    USQL = USQL+ sqlList.get(i);
                }else{
                    USQL = USQL+ sqlList.get(i) + " union all ";
                }
            }
        }
        // 3. 将最终sql中的M_SQL替换为组装好的中间sql
        collSql = collSql.replace("{{M_SQL}}","("+USQL+")");
        //4.替换采集sql中的变量
        for(String varName:paramsMap.keySet()){
            List<String> valList = paramsMap.get(varName);
            //最终sql中不允许有多个结果的变量,否则报错
            if(collSql.contains(varName) && valList.size()>1){
                logger.warn("-- [SQL_COLL]: param." + varName + " return too many value!");
            }
            collSql = collSql.replace(varName,valList.get(0)==null?"":valList.get(0));
        }
        logger.info("collSql3:"+collSql);
		return collSql;
	}

	/**
	 * 返回每列的列名与其对应的KPI_ID
	 * key: ColumnName,value: KPI_ID
	 */
	private Map<String, String> getKpiMap() {
		Map<String, String> kpiMap = new HashMap<String, String>();
		
		String kpis = params.get("KPIS");
		JSONArray jsonArray = (JSONArray) JSONUtil.fromJSON(kpis);
		
		String logInfo = "KPI_MAP:\nColumn_Name\t\tKPI_ID\t\tKEY";

		for (int i = 0; i < jsonArray.size(); i++) {
			JSONObject temp = (JSONObject) jsonArray.get(i);
			// SQL执行结果中的列名
			String columnName = temp.getString("columnName");

			// 主键列名
			String columnAttr = temp.getString("columnAttr");

			// 指标ID
			String kpiId = temp.getString("kpiId");

			if (columnName.equals(columnAttr)) {
				pkColumnName = columnName;
				logInfo += "\n" + columnName + "\t\t" + columnAttr + "\t\tY";
			} else {
				logInfo += "\n" + columnName + "\t\t" + columnAttr + "\t\tN";
			}

			if (kpiId == null || kpiId.equals("")) {
				continue;
			}
			kpiMap.put(columnName, kpiId);
		}
		logger.info(logInfo);
		return kpiMap;
	}
	
	public static void main(String[] args) {
		String kpis = "[{columnName:'A',columnAttr:'A',kpiName:'测试SQL_KPI',kpiId:'PM-10-00-00-07'},{columnName:'B',columnAttr:'0',kpiName:'323232',kpiId:'PM-01-2332'},{columnName:'DATE1',columnAttr:'0',kpiName:'aa',kpiId:'PM-10-00-99'}]";
		HashMap<String, String> params = new HashMap<String, String>();
		params.put("CLASSNAME", "oracle.jdbc.driver.OracleDriver");
		params.put("KBPCLASS", "11-15-19");
		params.put("KPIS", kpis);
		params.put("PASSWORD", "2b71c7b672499ed7b764792a49aa82819463e1a7a5d1d0ed");

		params.put("SQL", "select 'aaa' a ,'bbb' b,'#DATE#' as DATE1 from dual");
		params.put("URL", "jdbc:oracle:thin:@172.21.0.77:1527:bnms");

		params.put("USERNAME", "bnms15");

		params.put("PARAMS", "[{varName:'DATE',value:'select sysdate from dual'}]");

		Vector<TblATO_KPIDETAIL> rst = new SqlColl().collBySql(params);
		for(TblATO_KPIDETAIL d:  rst){
			System.out.println(d.toString());
		}
	}
}