KafkaConsumerTest.java 1.8 KB
package com.sitech.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;


import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumerTest extends Thread{

	private static KafkaConfig kafkaConfig = new KafkaConfig();
	private static ConsumerConnector consumer;

	public static void main(String args[]) {
		try {
			consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
					kafkaConfig.createConsumerConfig());
			Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
	        topicCountMap.put(kafkaConfig.getPerfTopic(), new Integer(1));
	        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
	        KafkaStream<byte[], byte[]> stream = consumerMap.get(kafkaConfig.getPerfTopic()).get(0);
	        ConsumerIterator<byte[], byte[]> it = stream.iterator();
	        while (it.hasNext()) {
	            System.out.println("receive:" + new String(it.next().message()));
	            String message = new String(it.next().message());
	            if (!(null == message) && !("".equals(message))) {
//	    			try{
//	    				JSONObject messageObject = JSONObject.parseObject(message);
//	    				System.out.println("messageObject===="+messageObject);
//	    				String CONTENT =  messageObject.getString("content");
//	    				System.out.println("CONTENT===="+CONTENT);
//	    			}catch(JSONException ex){
//	    				ex.printStackTrace();
//	    			}
	    		}
	            try {
	                sleep(3000);
	            } catch (InterruptedException e) {
	                e.printStackTrace();
	            }
	        }
		} catch (Exception e) {
			System.out.println("�����쳣");
			e.printStackTrace();
		} finally {
		}
	}
}