// E2ECollThread.java 5.17 KB
package com.sitech.ismp.coll.busi.e2e;

import java.io.File;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;

import org.apache.log4j.Logger;

import com.sitech.base.AgentProperties;
import com.sitech.ismp.coll.busi.e2e.dao.TbE2eFile2dbFinishTagDao;
import com.sitech.ismp.coll.busi.e2e.domain.TbE2eFile2dbFinishTag;
import com.sitech.ismp.coll.busi.e2e.util.SqlldrHandler;
import com.sitech.util.DES3;
import com.sitech.util.Formater;
import com.sitech.util.RandomGUID;
import com.sitech.util.SysHelper;
import com.sitech.util.upload.AFtpRemoteFile;
import com.sitech.util.upload.FTPSrv;

/**
 * Data collection for Shanghai Telecom end-to-end (E2E) business monitoring.
 * <p>
 * Steps:
 * <ol>
 * <li>Log in to the remote host over FTP and scan the configured directory
 * for data files.</li>
 * <li>Download the files to the local $AGENT_HOME/data/ directory.</li>
 * <li>Create the SQL Loader control file for each data file and run the
 * load.</li>
 * <li>Once a file has been loaded, save its "load finished" tag to the
 * database.</li>
 * </ol>
 * Created: May 17, 2012 9:20:16 AM
 *
 * @author linxc
 * @since Ver 6.1
 */
public class E2ECollThread implements Runnable {
	private static final Logger logger = Logger.getLogger("BUSI_COLL");

	/** How long to wait for the downloaded files to be loaded: 20 minutes. */
	private static final long LOAD_TIMEOUT_MILLIS = 1200000L;

	/** Pause between two scans of the local data directory. */
	private static final int POLL_INTERVAL_MILLIS = 500;

	/** Local directory the remote data files are downloaded into. */
	private final String localDataPath = AgentProperties.AGENT_HOME + "/data/";

	/**
	 * Collection parameters. Keys read by this class: HOSTIP, HOSTUSER,
	 * HOSTPASSWD, FILEPATH, FILENAMEKEY, FILECOLLTYPE, JDBC.Username and
	 * JDBC.Password. The map is also handed to the finish-tag DAO.
	 */
	private final Map<String, String> params;

	/**
	 * @param params collection parameters, see {@link #params}
	 */
	public E2ECollThread(Map<String, String> params) {
		this.params = params;
	}

	/**
	 * Runs one collection cycle: download the data files, start the load of
	 * each of them, then wait (up to 20 minutes) for the loads to finish and
	 * record a finish tag per loaded file.
	 */
	public void run() {
		List<String> fileList = ftpGetDataFile();

		if (fileList.isEmpty()) {
			return;
		}

		for (String fileName : fileList) {
			parseToDb(fileName);
		}

		// NOTE(review): files whose load could not be started are still
		// watched below, exactly as before; whether they can be skipped
		// depends on SqlldrHandler semantics not visible here.
		saveDataFileTag(fileList, LOAD_TIMEOUT_MILLIS);
	}

	/**
	 * Loads one downloaded data file into the database: builds the SQL Loader
	 * control file and the exec shell through {@link SqlldrHandler}. Failures
	 * are logged and not propagated.
	 *
	 * @param fileName name of the data file (relative to the local data
	 *            directory)
	 */
	private void parseToDb(String fileName) {
		String fileType = params.get("FILECOLLTYPE");
		String username = params.get("JDBC.Username");
		String password = params.get("JDBC.Password");
		try {
			SqlldrHandler handler = new SqlldrHandler(fileType, fileName,
					username, password);
			handler.createDataCtl();
			handler.createSqlldrExecShell();
		} catch (Exception e) {
			logger.error("Exception while execute SQL Loader.", e);
		}
	}

	/**
	 * Polls the local data directory until every file in {@code fileList} has
	 * disappeared or {@code timeout} has elapsed. A file that no longer exists
	 * is taken to be completely loaded, and its finish tag is saved to the
	 * database.
	 * <p>
	 * Entries are removed from {@code fileList} as they are handled. A failed
	 * database insert is retried on the next scan. A file whose name carries
	 * no parseable time is reported once and dropped, because retrying cannot
	 * succeed.
	 *
	 * @param fileList names of the files being loaded; modified by this method
	 * @param timeout maximum time to wait, in milliseconds
	 */
	private void saveDataFileTag(List<String> fileList, long timeout) {
		if (fileList == null || fileList.isEmpty()) {
			return;
		}

		// Parse the file type once: if it is not a number no tag can ever be
		// built, so fail fast instead of logging the same error on every scan.
		String fileTypeText = params.get("FILECOLLTYPE");
		int fileType;
		try {
			fileType = Integer.parseInt(fileTypeText);
		} catch (NumberFormatException e) {
			logger.error("Invalid FILECOLLTYPE[" + fileTypeText
					+ "], can not save finish tag for files " + fileList, e);
			return;
		}

		TbE2eFile2dbFinishTagDao dao = new TbE2eFile2dbFinishTagDao(params);
		long startTime = System.currentTimeMillis();
		while (!fileList.isEmpty()) {
			SysHelper.waitIt(this, POLL_INTERVAL_MILLIS);
			if (System.currentTimeMillis() - startTime > timeout) {
				logger.warn("Timeout(" + timeout
						+ "ms) while waiting for data files to be loaded, "
						+ "no finish tag saved for " + fileList);
				break;
			}
			// Walk backwards so that removing entry i does not shift the
			// entries that are still to be visited.
			for (int i = fileList.size() - 1; i >= 0; i--) {
				String fileName = fileList.get(i);
				if (new File(localDataPath + fileName).exists()) {
					// Still being loaded.
					continue;
				}

				Date fileTime;
				try {
					fileTime = getDataFileTime(fileName);
				} catch (Exception e) {
					// Permanent failure: the name will not change, so drop
					// the file rather than retrying until the timeout.
					logger.error("Can not get data time from FILE_NAME["
							+ fileName + "], finish tag is not saved.", e);
					fileList.remove(i);
					continue;
				}

				try {
					TbE2eFile2dbFinishTag tag = new TbE2eFile2dbFinishTag();
					tag.setID(RandomGUID.getRandomGUID());
					tag.setFILE_NAME(fileName);
					tag.setFILE_TYPE(fileType);
					tag.setFILE_TIME(fileTime);
					dao.insertTbE2eFile2dbFinishTag(tag);
					fileList.remove(i);
				} catch (Exception e) {
					// The entry stays in the list and is retried next scan.
					logger.error(
							"Exception while saveDataFileTag, FILE_NAME["
									+ fileName + "],FILE_TYPE["
									+ fileTypeText + "]", e);
				}
			}
		}
	}

	/**
	 * Extracts the data time from a file name: the text between the last
	 * '_' and the last '.' parsed with the pattern {@code yyyyMMddHH}.
	 *
	 * @param fileName data file name
	 * @return the parsed data time
	 * @throws Exception if the name has no such segment or it cannot be parsed
	 */
	private Date getDataFileTime(String fileName) throws Exception {
		int start = fileName.lastIndexOf("_") + 1;
		int end = fileName.lastIndexOf(".");
		if (end < start) {
			throw new IllegalArgumentException(
					"No data time found in file name[" + fileName + "]");
		}
		return Formater.stringToDate(fileName.substring(start, end),
				"yyyyMMddHH");
	}

	/**
	 * Downloads the data files over FTP.
	 * <p>
	 * Every regular remote file whose name contains FILENAMEKEY and ends with
	 * ".txt" is downloaded into the local data directory and then moved to
	 * the remote {@code his_data} sub-directory. Errors are logged; a failure
	 * on one file does not stop the remaining files.
	 *
	 * @return names of the files that were downloaded; never {@code null},
	 *         empty when nothing was collected
	 */
	private List<String> ftpGetDataFile() {
		// Names of the collected files.
		List<String> result = new ArrayList<String>();

		String ipAddr = params.get("HOSTIP");
		String username = params.get("HOSTUSER");
		String password = params.get("HOSTPASSWD");
		String remotePath = params.get("FILEPATH");
		String fileFilter = params.get("FILENAMEKEY");

		if (fileFilter == null) {
			// Without a filter no file name can match; report it once
			// instead of failing on every remote file.
			logger.error("FILENAMEKEY is not configured, ip[" + ipAddr
					+ "],username[" + username + "]");
			return result;
		}

		FTPSrv ftpSrv = new FTPSrv();
		try {
			logger.info("ftp " + ipAddr + " " + username);
			password = DES3.decrypt(password);
			ftpSrv.login(ipAddr, username, password);

			logger.info("cd " + remotePath);
			ftpSrv.chdir(remotePath);

			logger.info("ls " + remotePath);
			AFtpRemoteFile[] remoteFile = ftpSrv.list();
			if (remoteFile == null || remoteFile.length == 0) {
				logger.info("No data file create, return!");
				return result;
			}

			for (AFtpRemoteFile file : remoteFile) {
				String fileName = file.getFileName();
				try {
					if (fileName == null || file.isDirectory()
							|| fileName.indexOf(fileFilter) < 0
							|| !fileName.endsWith(".txt")) {
						continue;
					}

					String localFileName = localDataPath + fileName;
					logger.info("Begin get file[" + fileName + "]...");
					ftpSrv.retrive(fileName, localFileName);
					logger.info("Get file[" + fileName + "] success!");
					result.add(fileName);

					// The download succeeded, so the file is loaded even if
					// archiving it on the remote side fails.
					try {
						ftpSrv.rename(fileName, remotePath + "/his_data/"
								+ fileName);
					} catch (Exception e) {
						logger.error("Exception while move file[" + fileName
								+ "] to his_data", e);
					}
				} catch (Exception e) {
					logger.error("Exception while Get file[" + fileName + "]",
							e);
				}

			}

		} catch (Exception e) {
			logger.error("Exception while ftpGetDataFile, ip[" + ipAddr
					+ "],username[" + username + "]", e);
		} finally {
			try {
				ftpSrv.logout();
				logger.info("Logout!");
			} catch (Exception e) {
				// Best effort: the collection result is not affected.
				logger.warn("Exception while ftp logout, ip[" + ipAddr + "]",
						e);
			}
		}
		return result;
	}
}