// KafkaAdaptor.java 1.7 KB
package com.sitech.jmx.adaptor;

import com.sitech.ismp.messageObject.AgentSyncObject;
import com.sitech.kafka.KafkaConfig;
import com.sitech.util.JacksonUtil;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import org.apache.log4j.Logger;

import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Consumes messages from the Kafka topic reported by
 * {@link KafkaConfig#getCollConfTopic()} and hands each one to either
 * {@code ManualOprUtil} or {@code SchSyncUtil}.
 *
 * <p>Created by dongyj on 2015/4/1.
 */
public class KafkaAdaptor {

    private static final Logger logger = Logger.getLogger("LOGER");
    private static final KafkaConfig kafkaConfig = new KafkaConfig();

    /**
     * Charset used to decode message payloads. Fixed explicitly so decoding does
     * not depend on the JVM's platform default.
     * NOTE(review): assumes producers encode the JSON payload as UTF-8 -- confirm.
     */
    private static final Charset MESSAGE_CHARSET = Charset.forName("UTF-8");

    /**
     * Opens a single message stream on the collection-config topic and processes
     * every message the stream yields.
     *
     * <p>Each payload is decoded to text, logged, deserialized into an
     * {@link AgentSyncObject}, and then dispatched: to
     * {@code ManualOprUtil.doManualOperation} when
     * {@code ManualOprUtil.isManualTriger} returns {@code true}, otherwise to
     * {@code SchSyncUtil.doSyncOperation}.
     *
     * <p>Nothing is caught here, so any exception raised while reading, parsing or
     * dispatching a message propagates to the caller and ends the loop.
     *
     * <p>NOTE(review): with the old high-level consumer, {@code hasNext()} normally
     * blocks until a message arrives, so this method presumably does not return
     * during normal operation -- confirm against the consumer timeout settings.
     */
    public void receiveCollConf() {
        String topic = kafkaConfig.getCollConfTopic();

        // Request exactly one stream for the topic.
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, Integer.valueOf(1));

        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                kafkaConfig.getConsumerInfo().createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();

        // No separate isEmpty() guard: on this Scala-backed iterator isEmpty() is
        // simply !hasNext(), so the loop condition already covers it.
        while (it.hasNext()) {
            String message = new String(it.next().message(), MESSAGE_CHARSET);
            logger.info("接收消息======" + message);
            AgentSyncObject agentSyncObj =
                    (AgentSyncObject) JacksonUtil.fromJson(message, AgentSyncObject.class);
            if (ManualOprUtil.isManualTriger(agentSyncObj)) {
                // Manually triggered task execution.
                ManualOprUtil.doManualOperation(agentSyncObj);
            } else {
                // Collection-configuration synchronization.
                SchSyncUtil.doSyncOperation(agentSyncObj);
            }
        }
    }

}