KafkaConfig.java 5.04 KB
package com.sitech.kafka;

import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.javaapi.producer.Producer;
import kafka.producer.ProducerConfig;

/**
 * KafkaConfig.java
 *
 * 【功 能】:kafka配置类
 * 【类 名】:KafkaConfig
 *
 * 【变更履历】:
 *
 *    日期            版本                作者                   变更内容
 * -------------------------------------------
 *  2015-3-19   v1.0     dongyj            新规做成
 *
 *
 *
 * 【版 权】:
 *  Copyright (c) 2015  : ~SI-TECH~.
 *
 */
public class KafkaConfig {

	public static Producer<Integer, String> producer;
	public static ConsumerConnector _consumerConnector;

	/**
	 *
	 * @Title: createProducerConfig
	 * @author:dongyj
	 * @Description: 获取生产者连接
	 * @param @return    设定文件
	 * @return ConsumerConfig    返回类型
	 * @throws
	 */
	public static Producer<Integer, String> getProducerInfo(){
		if (producer == null) {
			producer = createDefaultProducer();
		}
		return producer;
	}

	public static synchronized Producer createDefaultProducer(){
		Producer<Integer, String> producer_default = null;
		Properties props = createProducerConfig();
		producer_default = new Producer<Integer, String>(new ProducerConfig(props));
		return producer_default;
	}

	public static  ConsumerConnector getConsumerInfo(){
		if (_consumerConnector == null){
			_consumerConnector = createDefaultConsumer();
		}
		return _consumerConnector;
	}

	public static synchronized ConsumerConnector createDefaultConsumer(){
		ConsumerConnector consumer = null;
		ConsumerConfig conf = createConsumerConfig();
		consumer = kafka.consumer.Consumer.createJavaConsumerConnector(conf);
		return consumer;
	}

	/**
	 *
	 * @Title: createProducerConfig
	 * @author:dongyj
	 * @Description: 创建生产者配置
	 * @param @return    设定文件
	 * @return ConsumerConfig    返回类型
	 * @throws
	 */
	public static Properties createProducerConfig(){
		Properties props = new Properties();
		props.put("serializer.class", KafkaProperties.serializer);
        props.put("metadata.broker.list", KafkaProperties.metadataList);
        props.put("partitioner.class", KafkaProperties.partitioner);
		props.put("request.required.acks", KafkaProperties.required);
        return props;
	}

	/**
	 *
	 * @Title: createConsumerConfig
	 * @author:dongyj
	 * @Description: 创建消费者配置
	 * @param @return    设定文件
	 * @return ConsumerConfig    返回类型
	 * @throws
	 */
	public static ConsumerConfig createConsumerConfig() {
		Properties props = new Properties();
		props.put("zookeeper.connect", KafkaProperties.zookeeper);
		props.put("group.id", KafkaProperties.group);
		props.put("zookeeper.connection.timeout.ms", KafkaProperties.connectionout);
		props.put("zookeeper.session.timeout.ms", KafkaProperties.timeout);
		props.put("zookeeper.sync.time.ms", KafkaProperties.sync);
		props.put("auto.commit.interval.ms", KafkaProperties.interval);
		return new ConsumerConfig(props);
	}

	/**
	 *
	 * @Title: getProperties
	 * @author:dongyj
	 * @Description: 获取配置信息
	 * @param @return    设定文件
	 * @return Properties    返回类型
	 * @throws
	 */
	public Properties getProperties(){
		InputStream inputStream = KafkaConfig.class.getResourceAsStream("/kafka.properties");
		Properties pro  = new Properties();
		try {
			pro.load(inputStream);
		} catch (IOException e1) {
			e1.printStackTrace();
		}
		return pro;
	}

	/**
	 *
	 * @Title: getPerfTopic
	 * @author:dongyj
	 * @Description: 获取性能主题
	 * @param @return    设定文件
	 * @return String    返回类型
	 * @throws
	 */
	public String getPerfTopic() {
		String mk_file_perf = KafkaProperties.topic_perf;
		return mk_file_perf;
	}

	/**
	 *
	 * @Title: getAlarmConfTopic
	 * @author:dongyj
	 * @Description: 获取告警配置主体
	 * @param @return    设定文件
	 * @return String    返回类型
	 * @throws
	 */
	public String getAlarmConfTopic() {
		String mk_file_alarmconf = KafkaProperties.topic_alarmConf;
		return mk_file_alarmconf;
	}

	/**
	 *
	 * @Title: getSwitchType
	 * @author:dongyj
	 * @Description: 获取采集性能数据 流向开关 0:all 1:kafka 2:mq
	 * @param @return    设定文件
	 * @return String    返回类型
	 * @throws
	 */
	public String getSwitchType(){
		String switch_type = KafkaProperties.switch_type;
		return switch_type;
	}

	/**
	 *
	 * @Title: getPmAlarmTopic
	 * @author:dongyj
	 * @Description: 获取预警通道
	 * @param @return    设定文件
	 * @return String    返回类型
	 * @throws
	 */
	public String getPmAlarmTopic() {
		String mk_file_pmalarm = KafkaProperties.topic_pmalarm;
		return mk_file_pmalarm;
	}

	/**
	 *
	 * @Title: getPmConfigTopic
	 * @author:dongyj
	 * @Description: 获取采集配置通道
	 * @param @return    设定文件
	 * @return String    返回类型
	 * @throws
	 */
	public String getCollConfTopic() {
		String mk_file_collconf = KafkaProperties.topic_collConf;
		return mk_file_collconf;
	}

}