KafkaAdaptor.java
1.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.sitech.jmx.adaptor;
import com.sitech.ismp.messageObject.AgentSyncObject;
import com.sitech.kafka.KafkaConfig;
import com.sitech.util.JacksonUtil;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import org.apache.log4j.Logger;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Created by dongyj on 2015/4/1.
*/
public class KafkaAdaptor {
private static Logger logger = Logger.getLogger("LOGER");
private static KafkaConfig kafkaConfig = new KafkaConfig();
public void receiveCollConf(){
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
String mk_file_perf = kafkaConfig.getCollConfTopic();
topicCountMap.put(mk_file_perf, new Integer(1));
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = kafkaConfig.getConsumerInfo()
.createMessageStreams(topicCountMap);
KafkaStream<byte[], byte[]> stream = consumerMap.get(mk_file_perf).get(0);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
if(!it.isEmpty()){
while (it.hasNext()) {
String message = new String(it.next().message());
logger.info("接收消息======" + message);
AgentSyncObject agentSyncObj = (AgentSyncObject)JacksonUtil.fromJson(message,AgentSyncObject.class);
if (ManualOprUtil.isManualTriger(agentSyncObj)) {
// 手动触发执行任务
ManualOprUtil.doManualOperation(agentSyncObj);
} else {
// 采集配置同步
SchSyncUtil.doSyncOperation(agentSyncObj);
}
}
}
}
}