KafkaConsumerTest.java
1.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.sitech.kafka;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
/**
 * Command-line smoke test that reads messages from a single Kafka topic through
 * the high-level consumer connector and prints each payload to standard output.
 *
 * <p>The consumer configuration and the topic name are both obtained from
 * {@code KafkaConfig}.
 */
public class KafkaConsumerTest extends Thread {
    /** Pause applied after every consumed message, in milliseconds. */
    private static final long PAUSE_AFTER_MESSAGE_MILLIS = 3000L;

    private static final KafkaConfig kafkaConfig = new KafkaConfig();
    private static ConsumerConnector consumer;

    /**
     * Creates the consumer connector, opens one stream on the configured topic
     * and prints every message received on it, pausing between messages.
     *
     * <p>The loop ends when the stream iterator reports no further message, when
     * the thread is interrupted during the pause, or when any exception is
     * thrown. The connector is shut down in every case.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String args[]) {
        try {
            consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                    kafkaConfig.createConsumerConfig());

            String topic = kafkaConfig.getPerfTopic();

            // Request exactly one stream for the topic.
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, Integer.valueOf(1));

            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                    consumer.createMessageStreams(topicCountMap);
            KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
            ConsumerIterator<byte[], byte[]> it = stream.iterator();

            while (it.hasNext()) {
                // Advance the iterator exactly once per pass: calling next() a
                // second time would consume (and silently drop) another message.
                byte[] payload = it.next().message();

                // NOTE(review): decoded with the platform default charset, as
                // before; confirm which encoding the producer actually uses.
                String message = new String(payload);
                System.out.println("receive:" + message);

                try {
                    Thread.sleep(PAUSE_AFTER_MESSAGE_MILLIS);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop consuming instead of
                    // ignoring the request to stop.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } catch (Exception e) {
            // The previous literal here was mis-encoded and unreadable.
            System.out.println("Kafka consumer failed");
            e.printStackTrace();
        } finally {
            // Release the connector's resources on every exit path.
            if (consumer != null) {
                consumer.shutdown();
            }
        }
    }
}