// E2ECollThread.java
package com.sitech.ismp.coll.busi.e2e;
import java.io.File;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
import com.sitech.base.AgentProperties;
import com.sitech.ismp.coll.busi.e2e.dao.TbE2eFile2dbFinishTagDao;
import com.sitech.ismp.coll.busi.e2e.domain.TbE2eFile2dbFinishTag;
import com.sitech.ismp.coll.busi.e2e.util.SqlldrHandler;
import com.sitech.util.DES3;
import com.sitech.util.Formater;
import com.sitech.util.RandomGUID;
import com.sitech.util.SysHelper;
import com.sitech.util.upload.AFtpRemoteFile;
import com.sitech.util.upload.FTPSrv;
/**
 * Collects end-to-end service monitoring data for Shanghai Telecom.
 * <p>
 * Steps:
 * <ol>
 * <li>Log in to the remote host over FTP and scan the configured directory
 * for data files.</li>
 * <li>Download the files to the local $AGENT_HOME/data/ directory.</li>
 * <li>Create the SQL Loader control file for each data file and run the
 * load.</li>
 * <li>Once a file has been loaded, save its "load finished" tag to the
 * database.</li>
 * </ol>
 *
 * @author linxc
 * @version
 * @since Ver 6.1
 * @Date 2012 May 17, 2012 9:20:16 AM
 */
public class E2ECollThread implements Runnable {

    /** Maximum time run() waits for the data files to be loaded: 20 minutes. */
    private static final long FINISH_TAG_TIMEOUT_MILLIS = 20 * 60 * 1000L;

    /** Pause between two scans of the local data directory, in milliseconds. */
    private static final int POLL_INTERVAL_MILLIS = 500;

    private static final Logger logger = Logger.getLogger("BUSI_COLL");

    /** Local directory the data files are downloaded to. */
    private final String localDataPath = AgentProperties.AGENT_HOME + "/data/";

    /**
     * Collection parameters. Keys read by this class: HOSTIP, HOSTUSER,
     * HOSTPASSWD, FILEPATH, FILENAMEKEY, FILECOLLTYPE, JDBC.Username and
     * JDBC.Password. The map is also handed to the finish-tag DAO.
     */
    private final Map<String, String> params;

    /**
     * @param params collection parameters, see {@link #params}
     */
    public E2ECollThread(Map<String, String> params) {
        this.params = params;
    }

    /**
     * Downloads the data files, starts the load of each one and then waits
     * (at most 20 minutes) to record a finish tag for every loaded file.
     * Returns immediately when no file was downloaded.
     */
    public void run() {
        List<String> fileList = ftpGetDataFile();
        if (fileList == null || fileList.isEmpty()) {
            return;
        }

        for (String fileName : fileList) {
            parseToDb(fileName);
        }

        saveDataFileTag(fileList, FINISH_TAG_TIMEOUT_MILLIS);
    }

    /**
     * Parses one data file into the database through SqlldrHandler: creates
     * the control file and the SQL Loader execution shell. Any exception is
     * logged and not propagated.
     *
     * @param fileName name of the downloaded data file
     */
    private void parseToDb(String fileName) {
        String fileType = params.get("FILECOLLTYPE");
        String username = params.get("JDBC.Username");
        String password = params.get("JDBC.Password");

        try {
            SqlldrHandler handler = new SqlldrHandler(fileType, fileName,
                    username, password);
            handler.createDataCtl();
            handler.createSqlldrExecShell();
        } catch (Exception e) {
            logger.error("Exception while execute SQL Loader.", e);
        }
    }

    /**
     * Polls the local data directory until every file in {@code fileList} has
     * disappeared or the timeout has elapsed. A file that no longer exists is
     * considered loaded; its finish tag is saved to the database and its name
     * is removed from {@code fileList}.
     * <p>
     * Failures that can never succeed on a retry (an invalid FILECOLLTYPE or
     * a file name without a parsable time stamp) are logged once instead of
     * on every poll. A failed database insert is retried on the next poll.
     *
     * @param fileList names of the files being loaded; entries are removed
     *                 from this list as they are handled
     * @param timeout  maximum time to keep polling, in milliseconds
     */
    private void saveDataFileTag(List<String> fileList, long timeout) {
        if (fileList == null || fileList.isEmpty()) {
            return;
        }

        // The file type is the same for every file: parse it once. If it is
        // not a number no tag can ever be built, so polling would be pointless.
        String fileTypeText = params.get("FILECOLLTYPE");
        int fileType;
        try {
            fileType = Integer.parseInt(fileTypeText);
        } catch (NumberFormatException e) {
            logger.error("Exception while saveDataFileTag, invalid FILE_TYPE["
                    + fileTypeText + "]", e);
            return;
        }

        TbE2eFile2dbFinishTagDao dao = new TbE2eFile2dbFinishTagDao(params);
        long startTime = System.currentTimeMillis();

        while (true) {
            SysHelper.waitIt(this, POLL_INTERVAL_MILLIS);

            if (fileList.isEmpty()
                    || System.currentTimeMillis() - startTime > timeout) {
                break;
            }

            // Iterate backwards so that removing entry i does not shift the
            // entries that are still to be visited.
            for (int i = fileList.size() - 1; i >= 0; i--) {
                String fileName = fileList.get(i);
                if (new File(localDataPath + fileName).exists()) {
                    // The file is still there: not loaded yet.
                    continue;
                }

                Date fileTime;
                try {
                    fileTime = getDataFileTime(fileName);
                } catch (Exception e) {
                    // The name will not parse on a later poll either: report
                    // it once and stop tracking this file.
                    logger.error(
                            "Exception while saveDataFileTag, FILE_NAME["
                                    + fileName + "],FILE_TYPE["
                                    + fileTypeText + "]", e);
                    fileList.remove(i);
                    continue;
                }

                try {
                    TbE2eFile2dbFinishTag tag = new TbE2eFile2dbFinishTag();
                    tag.setID(RandomGUID.getRandomGUID());
                    tag.setFILE_NAME(fileName);
                    tag.setFILE_TYPE(fileType);
                    tag.setFILE_TIME(fileTime);
                    dao.insertTbE2eFile2dbFinishTag(tag);
                    fileList.remove(i);
                } catch (Exception e) {
                    // The file stays in the list, so the insert is attempted
                    // again on the next poll.
                    logger.error(
                            "Exception while saveDataFileTag, FILE_NAME["
                                    + fileName + "],FILE_TYPE["
                                    + fileTypeText + "]", e);
                }
            }
        }
    }

    /**
     * Extracts the time stamp of a data file from its name: the text between
     * the last '_' and the last '.' is parsed with the pattern yyyyMMddHH.
     *
     * @param fileName data file name
     * @return the time parsed by Formater.stringToDate
     * @throws Exception if the name has no such section or it cannot be parsed
     */
    private Date getDataFileTime(String fileName) throws Exception {
        int begin = fileName.lastIndexOf('_') + 1;
        int end = fileName.lastIndexOf('.');
        if (end < begin) {
            throw new IllegalArgumentException(
                    "Can not find time stamp in file name[" + fileName + "]");
        }
        return Formater.stringToDate(fileName.substring(begin, end),
                "yyyyMMddHH");
    }

    /**
     * Downloads the data files over FTP.
     * <p>
     * Every regular file in the remote directory whose name contains
     * FILENAMEKEY and ends with ".txt" is downloaded to the local data
     * directory and then renamed into the remote "his_data" sub directory.
     * Errors are logged; a failure on one file does not stop the others.
     *
     * @return names of the files that were downloaded, never {@code null}
     */
    private List<String> ftpGetDataFile() {
        // Names of the collected files.
        List<String> result = new ArrayList<String>();

        String ipAddr = params.get("HOSTIP");
        String username = params.get("HOSTUSER");
        String password = params.get("HOSTPASSWD");
        String remotePath = params.get("FILEPATH");
        String fileFilter = params.get("FILENAMEKEY");

        if (fileFilter == null) {
            // Without the key no file name can match; say so once instead of
            // failing on every remote file.
            logger.error("Exception while ftpGetDataFile, FILENAMEKEY is not configured, ip["
                    + ipAddr + "],username[" + username + "]");
            return result;
        }

        FTPSrv ftpSrv = new FTPSrv();
        try {
            logger.info("ftp " + ipAddr + " " + username);
            password = DES3.decrypt(password);
            ftpSrv.login(ipAddr, username, password);

            logger.info("cd " + remotePath);
            ftpSrv.chdir(remotePath);

            logger.info("ls " + remotePath);
            AFtpRemoteFile[] remoteFiles = ftpSrv.list();
            if (remoteFiles == null || remoteFiles.length == 0) {
                logger.info("No data file create, return!");
                return result;
            }

            for (AFtpRemoteFile file : remoteFiles) {
                String fileName = file.getFileName();
                try {
                    if (file.isDirectory() || fileName.indexOf(fileFilter) < 0
                            || !fileName.endsWith(".txt")) {
                        continue;
                    }
                    String localFileName = localDataPath + fileName;
                    logger.info("Begin get file[" + fileName + "]...");
                    ftpSrv.retrive(fileName, localFileName);
                    logger.info("Get file[" + fileName + "] success!");
                    result.add(fileName);
                } catch (Exception e) {
                    logger.error("Exception while Get file[" + fileName + "]",
                            e);
                    continue;
                }

                // The download succeeded; a failure here only means the remote
                // file was not moved, so it is reported separately.
                try {
                    ftpSrv.rename(fileName, remotePath + "/his_data/" + fileName);
                } catch (Exception e) {
                    logger.error("Exception while move file[" + fileName
                            + "] to his_data", e);
                }
            }
        } catch (Exception e) {
            logger.error("Exception while ftpGetDataFile, ip[" + ipAddr
                    + "],username[" + username + "]", e);
        } finally {
            try {
                ftpSrv.logout();
                logger.info("Logout!");
            } catch (Exception e) {
                // Best effort: nothing more can be done if logout fails, but
                // keep a trace of it.
                logger.debug("Exception while ftp logout, ip[" + ipAddr + "]",
                        e);
            }
        }
        return result;
    }
}