Authored by zhangdajun

init project

Showing 73 changed files with 4693 additions and 0 deletions

Too many changes to show.

To preserve performance, only 73 of 73+ files are displayed.

# maven ignore
target/
*.jar
*.war
*.zip
*.tar
*.tar.gz
*.class
*.project
*.factorypath
*.lst
# eclipse ignore
.settings/
.classpath
target/
bin/
pkg-tar/
SrenewSer/
sms-drugstore/
# idea ignore
.idea/
*.ipr
*.iml
*.iws
# temp ignore
*.log
*.cache
*.diff
*.patch
*.tmp
# system ignore
.DS_Store
Thumbs.db
# else
.springBeans
... ...
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.honggorup.workstation.flink</groupId>
<artifactId>workstation-flink</artifactId>
<version>1.15.2</version>
</parent>
<artifactId>common</artifactId>
<version>1.15.2</version>
<dependencies>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus</artifactId>
<version>${mybatis.version}</version>
</dependency>
<dependency>
<groupId>io.swagger</groupId>
<artifactId>swagger-annotations</artifactId>
<version>1.5.20</version>
</dependency>
<dependency>
<groupId>io.swagger</groupId>
<artifactId>swagger-models</artifactId>
<version>1.5.20</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-core</artifactId>
<version>5.8.6</version>
</dependency>
<dependency>
<groupId>cn.hutool</groupId>
<artifactId>hutool-crypto</artifactId>
<version>5.8.6</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
<version>3.6.1</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<!-- Required for Redis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>4.2.2</version>
<type>jar</type>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<!--<scope>provided</scope>-->
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpcore-nio</artifactId>
<version>4.4.12</version>
</dependency>
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.10</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.10</version>
</dependency>
</dependencies>
<build>
<finalName>workstation-common.${flink.version}</finalName>
<resources>
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*.config</include>
<include>**/*.grok</include>
<include>**/*.xml</include>
<include>**/mapper</include>
</includes>
</resource>
</resources>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<encoding>${project.build.sourceEncoding}</encoding>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
... ...
package com.honggroup.wks.flink.common.config;
import cn.hutool.core.util.StrUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;
import java.util.Objects;
import java.util.Properties;
/**
 * @Auther zhangdajun
 * @CreateDate 2022/9/21 09 27
 * @Describe Bootstrap-time configuration file loader.
 * Parameter precedence: values from the config file are overridden by
 * command-line input. Every entry-point class must go through this loader
 * to complete parameter initialization.
 */
public class BootstrapConfigLoader {
    public final static Logger LOG = LoggerFactory.getLogger(BootstrapConfigLoader.class);
    // Serial number stamped once per JVM start, usable to correlate log output.
    public final static long APPLICATION_START_SERIAL_NO = System.currentTimeMillis();
    // System property naming an external directory that holds the config files.
    private static final String SPECIFIED_CONFIG_PATH = "HG_WKS_CONFIG_PATH";
    // The runtime environment may also be forced through this system property.
    public final static String HG_LOG_RUNTIME_ENV = "HG_LOG_RUNTIME_ENV";
    // Default environment descriptor file.
    public final static String ENV_CONFIG_NAME = "env.config";
    // Property key inside env.config that names the active environment.
    public final static String ENV_PROPERTIE_NAME = "env";
    // Per-environment config files are named "<env>-runtime.config".
    public final static String RUNTIME_PARAM_FILE_SUFFIX = "runtime.config";
    // Resolved environment name; empty until loadConfigByRuntimeEnv() runs.
    public static String RUN_ENV_VAL = "";
    // Accumulated runtime configuration, consumed by ConfigInitializer.
    public static Properties configProperties = new Properties();

    /**
     * 1. Determine the runtime environment of this process.
     * 2. Load the configuration data from the file that matches the environment.
     */
    public static void loadConfigByRuntimeEnv() {
        loadConfigByRuntimeEnv(System.getProperty(SPECIFIED_CONFIG_PATH));
    }

    /**
     * Same as {@link #loadConfigByRuntimeEnv()} but with an explicit config directory.
     *
     * @param specialPath external config directory; falls back to the
     *                    HG_WKS_CONFIG_PATH system property when empty
     */
    public static void loadConfigByRuntimeEnv(String specialPath) {
        specialPath = StrUtil.isEmpty(specialPath) ? System.getProperty(SPECIFIED_CONFIG_PATH) : specialPath;
        // The system property takes precedence over env.config.
        RUN_ENV_VAL = System.getProperty(HG_LOG_RUNTIME_ENV);
        if (StrUtil.isEmpty(RUN_ENV_VAL)) {
            // Fall back to the env.config file.
            if (!loadEnvConfig(specialPath)) {
                System.out.println("加载环境配置文件失败,请检查配置文件是否存在!文件名:env.config");
                return;
            }
        } else {
            System.out.println("使用环境变量HG_LOG_RUNTIME_ENV的值作为启动参数,HG_LOG_RUNTIME_ENV=" + RUN_ENV_VAL);
        }
        loadRuntimeConfig(specialPath);
    }

    /**
     * Push the loaded properties into the static fields of the given config class.
     *
     * @param clazz config holder class whose public static fields are populated
     */
    public static void initializeConfig(Class<?> clazz) {
        try {
            // Map each property onto the matching static field of the class tree.
            ConfigInitializer.initialize(configProperties, clazz);
        } catch (IllegalAccessException e) {
            System.out.println("配置参数初始化失败!");
            e.printStackTrace();
        }
    }

    /**
     * Load the per-environment runtime config file, named
     * RUN_ENV_VAL + "-" + RUNTIME_PARAM_FILE_SUFFIX.
     *
     * @param specialPath external config directory, may be null/empty
     * @return true when the file was read into {@link #configProperties}
     */
    public static boolean loadRuntimeConfig(String specialPath) {
        try (InputStreamReader runtimeFileStream = loadConfig(specialPath, RUN_ENV_VAL + "-" + RUNTIME_PARAM_FILE_SUFFIX)) {
            configProperties.load(runtimeFileStream);
            return true;
        } catch (Exception e) {
            System.out.println("加载" + RUN_ENV_VAL + "-" + RUNTIME_PARAM_FILE_SUFFIX + "配置文件失败!");
            e.printStackTrace();
            return false;
        }
    }

    /**
     * Load env.config and resolve {@link #RUN_ENV_VAL} from it.
     *
     * @param specialPath external config directory, may be null/empty
     * @return true when the environment is already known or was resolved
     */
    public static boolean loadEnvConfig(String specialPath) {
        if (StrUtil.isNotEmpty(RUN_ENV_VAL)) {
            return true;
        }
        // try-with-resources also closes the reader when load() throws.
        try (InputStreamReader envInputStreamReader = loadConfig(specialPath, ENV_CONFIG_NAME)) {
            Properties envProp = new Properties();
            envProp.load(envInputStreamReader);
            RUN_ENV_VAL = envProp.getProperty(ENV_PROPERTIE_NAME);
            return true;
        } catch (Exception e) {
            System.out.println("没有发现运行环境配置文件,文件名env.config");
            return false;
        }
    }

    /**
     * Open a config file either from the external directory or from the classpath.
     *
     * @param specialPath external directory; when empty/null or "/" the file is
     *                    read from the classpath root instead
     * @param name        config file name
     * @return reader over the file content
     * @throws Exception when the file cannot be found/opened
     */
    public static InputStreamReader loadConfig(String specialPath, String name) throws Exception {
        if (StrUtil.isNotEmpty(specialPath) && !specialPath.equals("/")) {
            return new InputStreamReader(new FileInputStream(specialPath + File.separator + name));
        } else {
            // Guard against a null specialPath: the original concatenation produced
            // "null<name>" and an empty path produced a package-relative lookup;
            // both are normalized to an absolute classpath-root resource here.
            String resource = (StrUtil.isEmpty(specialPath) ? "/" : specialPath) + name;
            return new InputStreamReader(Objects.requireNonNull(BootstrapConfigLoader.class.getResourceAsStream(resource)));
        }
    }

    /**
     * Wrap a topic name into the regex used for prefix-matching subscriptions.
     */
    public static String wrapPrefixTopic(String topic) throws Exception {
        return "^p_" + topic + ".*";
    }
}
... ...
package com.honggroup.wks.flink.common.config;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import java.util.HashMap;
import java.util.Map;
/**
 * @Auther zhangdajun
 * @CreateDate 2022/9/21 09 28
 * @Describe Central catalogue of Flink {@link ConfigOption} constants for the
 * workstation jobs: system, Redis, MySQL, Kafka, Elasticsearch, Zookeeper,
 * InfluxDB, feature toggles, topic names and sink table/index names.
 * Defaults and descriptions live on each option.
 */
public class Config {
    // System
    public final static ConfigOption<String> SYS_CONFIG = ConfigOptions.key("sys.config").stringType().defaultValue("/").withDescription("配置文件存放路径");
    public final static ConfigOption<Integer> SYS_PARALLELISM = ConfigOptions.key("sys.parallelism").intType().defaultValue(1).withDescription("系统默认并行度");
    public final static ConfigOption<Long> SYS_CFG_TIMEOUT = ConfigOptions.key("sys.cfg_timeout").longType().defaultValue(30L*1000).withDescription("任务超时时间");
    public final static ConfigOption<Boolean> CHECKPOINT_ENABLE = ConfigOptions.key("sys.checkpoint_enable").booleanType().defaultValue(false).withDescription("是否开启checkpoint");
    // Redis
    public final static ConfigOption<String> REDIS_HOSTS = ConfigOptions.key("redis.hosts").stringType().noDefaultValue().withDescription("redis集群地址,无默认值" );
    public final static ConfigOption<Boolean> REDIS_AUTHORITY = ConfigOptions.key("redis.authority").booleanType().defaultValue(false).withDescription("启用redis权限");
    public final static ConfigOption<String> REDIS_PASSWORD = ConfigOptions.key("redis.password").stringType().noDefaultValue().withDescription("redis连接密码");
    public final static ConfigOption<Integer> REDIS_DATA_EXPIRE = ConfigOptions.key("redis.data_expire").intType().noDefaultValue().withDescription("redis数据过期时长,暂未使用");
    // Mysql Constants
    public final static ConfigOption<String> MYSQL_URL = ConfigOptions.key("mysql.url").stringType().noDefaultValue().withDescription("mysql数据库连接地址,无默认值");
    public final static ConfigOption<String> MYSQL_USER = ConfigOptions.key("mysql.user").stringType().noDefaultValue().withDescription("mysql用户名,无默认值");
    public final static ConfigOption<String> MYSQL_PASSWORD = ConfigOptions.key("mysql.password").stringType().noDefaultValue().withDescription("mysql连接密码,无默认值");
    public final static ConfigOption<String> MYSQL_DRIVER = ConfigOptions.key("mysql.driver").stringType().defaultValue("com.mysql.cj.jdbc.Driver").withDescription("mysql连接驱动");
    public final static ConfigOption<Integer> MYSQL_BATCH_SIZE = ConfigOptions.key("mysql.batch_size").intType().defaultValue(100).withDescription("mysql批量入库大小");
    public final static ConfigOption<Integer> MYSQL_BATCH_INTERVAL_MS = ConfigOptions.key("mysql.batch_interval_ms").intType().defaultValue(20000).withDescription("mysql批量入库间隔,单位毫秒");
    public final static ConfigOption<Integer> MYSQL_MAX_RETRIES = ConfigOptions.key("mysql.max_retries").intType().defaultValue(3).withDescription("mysql重连最大次数");
    public final static ConfigOption<Integer> MYSQL_MAX_ACTIVE = ConfigOptions.key("mysql.max_active").intType().defaultValue(5).withDescription("mysql最大活动连接数");
    public final static ConfigOption<Integer> MYSQL_MIN_IDLE = ConfigOptions.key("mysql.min_idle").intType().defaultValue(1).withDescription("mysql空闲连接数");
    public final static ConfigOption<Integer> MYSQL_INITIAL_SIZE = ConfigOptions.key("mysql.initial_size").intType().defaultValue(1).withDescription("mysql初始化连接数");
    public final static ConfigOption<Integer> MYSQL_REAL_DATA_PARALLELISM = ConfigOptions.key("mysql.real_data_parallelism").intType().defaultValue(1).withDescription("实时数据入库最大并行度");
    public final static ConfigOption<Integer> MYSQL_DB_THREAD = ConfigOptions.key("mysql.db_thread").intType().defaultValue(10).withDescription("mysql多线程入库,最大入库线程数");
    // Kafka Configuration
    public final static ConfigOption<String> KAFKA_BROKER = ConfigOptions.key("kafka.broker").stringType().defaultValue("127.0.0.1:19001").withDescription("kafka连接地址");
    public final static ConfigOption<Boolean> KAFKA_FIX_GROUP = ConfigOptions.key("kafka.fix_group").booleanType().defaultValue(true).withDescription("是否启用固定组名称");
    public final static ConfigOption<String> KAFKA_GROUP_ID = ConfigOptions.key("kafka.group_id").stringType().defaultValue("workstation-flink").withDescription("kafka默认组名称");
    public final static ConfigOption<Integer> KAFKA_MAX_REQUEST_SIZE = ConfigOptions.key("kafka.max_request_size").intType().defaultValue(124288000).withDescription("kafka最大发送数据大小");
    public final static ConfigOption<Integer> KAFKA_BUFFER_MEMORY = ConfigOptions.key("kafka.buffer_memory").intType().defaultValue(124288000).withDescription("kafka内存缓冲区大小");
    public final static ConfigOption<Long> KAFKA_MAX_PARTITION_FETCH_BYTES = ConfigOptions.key("kafka.max_partition_fetch_bytes").longType().defaultValue(1048576000L).withDescription("kafka最大拉取单个报文大小");
    public final static ConfigOption<Integer> KAFKA_RETRIES_TIMES = ConfigOptions.key("kafka.retries_times").intType().defaultValue(3).withDescription("kafka重连尝试次数");
    public final static ConfigOption<Integer> KAFKA_REQUEST_TIMEOUT_MS = ConfigOptions.key("kafka.request_timeout_ms").intType().defaultValue(30000).withDescription("kafka请求超时时间");
    public final static ConfigOption<Integer> KAFKA_DEFAULT_API_TIMEOUT_MS = ConfigOptions.key("kafka.default_api_timeout_ms").intType().defaultValue(60000).withDescription("kafka连接超时时间");
    public final static ConfigOption<Integer> KAFKA_PARTITION_DISCOVERY_INTERVAL = ConfigOptions.key("kafka.partition_discovery_interval").intType().defaultValue(1000).withDescription("kafka分区发现间隔,用正则表达式的topicId时必须填写");
    public final static ConfigOption<Integer> KAFKA_AUTO_COMMIT_INTERVAL_MS = ConfigOptions.key("kafka.auto_commit_interval_ms").intType().defaultValue(5000).withDescription("kafka数据自动提交间隔,单位毫秒");
    public final static ConfigOption<Boolean> KAFKA_ENABLE_AUTO_COMMIT = ConfigOptions.key("kafka.enable_auto_commit").booleanType().defaultValue(true).withDescription("kafka是否启用数据自动提交");
    public final static ConfigOption<String> KAFKA_AUTO_OFFSET_RESET = ConfigOptions.key("kafka.auto_offset_reset").stringType().defaultValue("latest").withDescription("kafka拉取数据方式");
    public final static ConfigOption<String> KAFKA_COMPRESSION_TYPE = ConfigOptions.key("kafka.compression_type").stringType().defaultValue("gzip").withDescription("kafka数据压缩方式");
    public final static ConfigOption<Long> KAFKA_MAX_BLOCK_SIZE = ConfigOptions.key("kafka.max_block_size").longType().defaultValue(6000L).withDescription("kafka最大块大小");
    public final static ConfigOption<Integer> KAFKA_BATCH_SIZE = ConfigOptions.key("kafka.batch_size").intType().defaultValue(163840).withDescription("kafka批量发送大小");
    public final static ConfigOption<Integer> KAFKA_LINGER_MS = ConfigOptions.key("kafka.linger_ms").intType().defaultValue(10).withDescription("kafka发送数据延迟时间");
    public final static ConfigOption<Boolean> KAFKA_SECURITY_KERBEROS_ENABLE = ConfigOptions.key("kafka.security_kerberos_enable").booleanType().defaultValue(false).withDescription("kafka kerberos认证启用,只有启用相关参数才能生效");
    public final static ConfigOption<String> KAFKA_SECURITY_KERBEROS_TYPE = ConfigOptions.key("kafka.security_kerberos_type").stringType().defaultValue("PLAIN").withDescription("kafka kerberos认证类型");
    public final static ConfigOption<String> KAFKA_SECURITY_KERBEROS_CONF_USER = ConfigOptions.key("kafka.security_kerberos_conf_user").stringType().defaultValue("PLAIN").withDescription("kafka kerberos认证用户");
    public final static ConfigOption<String> KAFKA_SECURITY_KERBEROS_CONF_PASS = ConfigOptions.key("kafka.security_kerberos_conf_pass").stringType().defaultValue("").withDescription("kafka kerberos认证密码");
    public final static ConfigOption<String> KAFKA_JAVA_SECURITY_KRB5 = ConfigOptions.key("kafka.java_security_krb5").stringType().defaultValue("/etc/krb5.conf").withDescription("kafka认证krb5配置文件路径");
    public final static ConfigOption<String> KAFKA_JAVA_SECURITY_AUTH_LOGIN = ConfigOptions.key("kafka.java_security_auth_login").stringType().defaultValue("/path/to/jaas.conf").withDescription("kafka认证jaas配置文件路径");
    public final static ConfigOption<String> KAFKA_JAVAX_SECURITY_AUTH_USESUBJECTCREDSONLY = ConfigOptions.key("kafka.javax_security_auth_usesubjectcredsonly").stringType().defaultValue("false").withDescription("");
    public final static ConfigOption<String> KAFKA_INPUT_TOPIC = ConfigOptions.key("kafka.input_topic").stringType().defaultValue("default_input").withDescription("kafka数据源接入topicId");
    public final static ConfigOption<String> KAFKA_OUTPUT_TOPIC = ConfigOptions.key("kafka.output_topic").stringType().defaultValue("default_output").withDescription("");
    public final static ConfigOption<Integer> KAFKA_INPUT_PARALLELISM = ConfigOptions.key("kafka.input_parallelism").intType().defaultValue(1).withDescription("kafka 输入并行度");
    // Elasticsearch configuration
    public final static ConfigOption<String> ELASTICSEARCH_HOSTS = ConfigOptions.key("elasticsearch.hosts").stringType().defaultValue("").withDescription("es连接地址");
    public final static ConfigOption<Integer> ELASTICSEARCH_INDEX_NUMBER_OF_SHARDS = ConfigOptions.key("elasticsearch.index_number_of_shards").intType().defaultValue(1).withDescription("es 索引分片数");
    public final static ConfigOption<Integer> ELASTICSEARCH_INDEX_NUMBER_OF_REPLICAS = ConfigOptions.key("elasticsearch.index_number_of_replicas").intType().defaultValue(0).withDescription("es 索引副本数");
    public final static ConfigOption<Integer> ELASTICSEARCH_BULK_FLUSH_MAX_ACTIONS = ConfigOptions.key("elasticsearch.bulk_flush_max_actions").intType().defaultValue(1000).withDescription("es bulk最大刷新条数");
    public final static ConfigOption<Integer> ELASTICSEARCH_BULK_FLUSH_MAX_SIZE_MB = ConfigOptions.key("elasticsearch.bulk_flush_max_size_mb").intType().defaultValue(5).withDescription("es bulk最大刷新数据大小,单位mb");
    public final static ConfigOption<Integer> ELASTICSEARCH_BULK_FLUSH_INTERVAL_MS = ConfigOptions.key("elasticsearch.bulk_flush_interval_ms").intType().defaultValue(10000).withDescription("es bulk刷新间隔,单位毫秒");
    public final static ConfigOption<Boolean> ELASTICSEARCH_AUTHORITY = ConfigOptions.key("elasticsearch.authority").booleanType().defaultValue(false).withDescription("es是否启用认证");
    public final static ConfigOption<String> ELASTICSEARCH_AUTHORITY_USERNAME = ConfigOptions.key("elasticsearch.authority_username").stringType().defaultValue("honggroup").withDescription("es连接认证用户");
    public final static ConfigOption<String> ELASTICSEARCH_AUTHORITY_PASSWORD = ConfigOptions.key("elasticsearch.authority_password").stringType().defaultValue("MXEydzNlNFIhQA==").withDescription("es连接认证密码");
    public final static ConfigOption<String> ELASTICSEARCH_SOURCE_INDEX_NAME = ConfigOptions.key("es.source.index.name").stringType().defaultValue("monitor-performance").withDescription("es性能数据源索引名称");
    public final static ConfigOption<String> ELASTICSEARCH_REST_URL = ConfigOptions.key("elasticsearch.rest.url").stringType().defaultValue("").withDescription("es连接地址");
    public final static ConfigOption<Integer> ELASTICSEARCH_PARALLELISM = ConfigOptions.key("elasticsearch.parallelism").intType().defaultValue(1).withDescription("es入库并行度");
    public final static ConfigOption<Integer> ELASTICSEARCH_SOCKET_TIMEOUT = ConfigOptions.key("elasticsearch.socket_timeout").intType().defaultValue(60000).withDescription("es连接超时时间,单位毫秒");
    public final static ConfigOption<Integer> ELASTICSEARCH_3SIGMA_FREQUENCY = ConfigOptions.key("es.3sigma.frequency").intType().defaultValue(5).withDescription("3sigma统计频率,单位分钟");
    public final static ConfigOption<Integer> ELASTICSEARCH_3SIGMA_INTERVAL = ConfigOptions.key("es.3sigma.interval").intType().defaultValue(5).withDescription("3sigma统计间隔,单位分钟");
    // zookeeper connect client
    public final static ConfigOption<String> ZOOKEEPER_URL = ConfigOptions.key("zookeeper.url").stringType().defaultValue("").withDescription("zookeeper连接地址");
    // InfluxDB config
    public final static ConfigOption<String> INFLUXDB_URL = ConfigOptions.key("influxdb.url").stringType().defaultValue("").withDescription("influxdb连接地址");
    public final static ConfigOption<String> INFLUXDB_DATABASE = ConfigOptions.key("influxdb.database").stringType().defaultValue("").withDescription("influxdb数据库名称");
    public final static ConfigOption<String> INFLUXDB_RETENTION_POLICY = ConfigOptions.key("influxdb.retention_policy").stringType().defaultValue("default").withDescription("");
    public final static ConfigOption<String> INFLUXDB_RETENTION_POLICY_TIME = ConfigOptions.key("influxdb.retention_policy_time").stringType().defaultValue("30d").withDescription("");
    public final static ConfigOption<Integer> INFLUXDB_BATCH_FLUSH_TIME = ConfigOptions.key("influxdb.batch_flush_time").intType().defaultValue(10000).withDescription("influxdb数据刷新间隔,单位毫秒");
    public final static ConfigOption<Integer> INFLUXDB_BATCH_COUNT = ConfigOptions.key("influxdb.batch_count").intType().defaultValue(1000).withDescription("influxdb批量入库大小");
    // Feature toggles
    public final static ConfigOption<Boolean> ENABLE_TINGYUN_KPI_CHECK = ConfigOptions.key("extend.tingyun_kpi_check_enable").booleanType().defaultValue(false).withDescription("启用听云指标检测");
    public final static ConfigOption<Boolean> ENABLE_JS_ACCOUNT = ConfigOptions.key("extend.jinsan_account_enable").booleanType().defaultValue(false).withDescription("启用金三账户数据分析");
    public final static ConfigOption<Boolean> ENABLE_EZSONAR = ConfigOptions.key("extend.ezsonar_enable").booleanType().defaultValue(false).withDescription("启用ezsonar对接数据分析");
    public final static ConfigOption<Boolean> ENABLE_PWD_EXPIRE_CHANGE = ConfigOptions.key("extend.pwd_expire_change").booleanType().defaultValue(true).withDescription("启用密码过期修改功能");
    // Alarm levels. NOTE: the original used double-brace initialization
    // (anonymous HashMap subclass) — replaced by an equivalent plain HashMap
    // with identical contents to avoid the anonymous-class anti-pattern.
    public final static ConfigOption<Map<String,String>> EVENT_LEVEL = ConfigOptions.key("event.level").mapType().defaultValue(new HashMap<>(Map.of(
            "1", "一般告警",
            "2", "重要告警",
            "3", "严重告警"))).withDescription("告警级别,1一般告警,2重要告警,3严重告警");
    // Topic names
    public final static ConfigOption<String> WKS_PRE_SINK_EXT_TOPIC = ConfigOptions.key("topic.sink_pre_ext_alarm").stringType().defaultValue("monitor-external-alarm").withDescription("外部告警输出topic");
    public final static ConfigOption<String> WKS_PRE_SINK_EVENT_TOPIC = ConfigOptions.key("topic.sink_pre_event").stringType().defaultValue("monitor-collector-result-event-new").withDescription("告警处理topic");
    public final static ConfigOption<String> WKS_PRE_SINK_STATE_TOPIC = ConfigOptions.key("topic.sink_pre_state").stringType().defaultValue("monitor-state-kpi").withDescription("状态指标topic");
    public final static ConfigOption<String> WKS_PRE_SPECIAL_TOPIC = ConfigOptions.key("topic.sink_pre_special_kpi").stringType().defaultValue("monitor-collector-result-db-new").withDescription("特殊指标输出topic");
    public final static ConfigOption<String> WKS_PRE_SINK_PREPROCESS_TOPIC = ConfigOptions.key("topic.sink_pre_pre_process").stringType().defaultValue("monitor-wks-preprocess").withDescription("所有预处理完成的指标输出topic");
    public final static ConfigOption<String> WKS_PRE_SINK_ABNORMAL_CHECK_TOPIC = ConfigOptions.key("topic.sink_pre_abnormal_check").stringType().defaultValue("monitor-anomaly-detection").withDescription("异常指标检测输出topic,天津使用");
    public final static ConfigOption<String> WKS_PRE_SINK_PASSWD_EXPIRED_TOPIC = ConfigOptions.key("topic.sink_pre_pwd_expired_data").stringType().defaultValue("monitor-workstation-collector-config").withDescription("密码过期输出topic");
    // Sink table/index names
    public final static ConfigOption<String> WKS_KPI_SINK_TREE_TABLE_NAME = ConfigOptions.key("mysql.sink_tree_table_name").stringType().defaultValue("b_result_realtime_collector_tree").withDescription("父子类指标数据输出的表名称");
    public final static ConfigOption<String> WKS_KPI_SINK_CORE_TABLE_NAME = ConfigOptions.key("mysql.sink_core_table_name").stringType().defaultValue("b_result_realtime_base").withDescription("核心指标数据输出的表名称");
    public final static ConfigOption<String> WKS_KPI_SINK_REAL_TABLE_NAME = ConfigOptions.key("mysql.sink_real_table_name").stringType().defaultValue("b_result_realtime_collector").withDescription("mysql实时数据入库表名称");
    public final static ConfigOption<String> WKS_KPI_SINK_ES_PERFORMANCE_INDEX_PREFIX = ConfigOptions.key("es.sink_index_performance").stringType().defaultValue("monitor-performance_").withDescription("es性能数据存放索引前缀");
    public final static ConfigOption<String> WKS_KPI_SINK_INFLUX_PERFORMANCE = ConfigOptions.key("influx.sink_performance").stringType().defaultValue("performance").withDescription("influxdb实时性能数据存放表名称");

    @Override
    public String toString() {
        return "Config{" +
                "Redis{" +
                "}" +
                "}";
    }
}
... ...
package com.honggroup.wks.flink.common.config;
import cn.hutool.core.util.StrUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.*;
/**
 * Reflectively copies values from a {@link Properties} object onto the public
 * static fields of a configuration class tree (the class itself plus its
 * nested classes, recursively). Property keys are derived from the nested
 * class path and the field name, lower-cased, e.g. "mysql.url".
 */
public class ConfigInitializer {
    //public final static Map<String,String> param = new LinkedHashMap<String,String>(0);
    private static Logger LOG = LoggerFactory.getLogger(ConfigInitializer.class);

    /**
     * Entry point: populate the static fields of {@code rootConfigType}
     * (and its nested classes) from {@code properties}.
     */
    public static void initialize(Properties properties, Class<?> rootConfigType) throws IllegalAccessException {
        initNextLevel(properties, rootConfigType, new ConfigDesc());
    }

    /**
     * Populate one level of the config class tree, then recurse into nested
     * classes. {@code parentDesc} carries the dotted key prefix for this level
     * and is mutated (append/removeLast) around each recursive call.
     */
    private static void initNextLevel(Properties properties, Class<?> recentConfigType,
            ConfigDesc parentDesc) throws IllegalArgumentException, IllegalAccessException {
        for (Field field : recentConfigType.getFields()) {
            // Only public static fields of the config class are assignable targets.
            if (Modifier.isPublic(field.getModifiers()) && Modifier.isStatic(field.getModifiers())) {
                String configKey = (parentDesc + "." + field.getName()).toLowerCase();
                Class<?> type = field.getType();
                if (type.equals(Map.class)) {
                    /*
                     * Map config format is, config_key[map_key]=map_value
                     * Such as plugin.opgroup.resttemplate.rule[abc]=/url/path
                     */
                    // Deduct two generic types of the map
                    ParameterizedType genericType = (ParameterizedType) field.getGenericType();
                    Type[] argumentTypes = genericType.getActualTypeArguments();
                    Type keyType = null;
                    Type valueType = null;
                    if (argumentTypes != null && argumentTypes.length == 2) {
                        // Get key type and value type of the map
                        keyType = argumentTypes[0];
                        valueType = argumentTypes[1];
                    }
                    Map map = (Map) field.get(null);
                    // Set the map from config key and properties
                    setForMapType(configKey, map, properties, keyType, valueType);
                } else {
                    /*
                     * Others typical field type
                     */
                    String value = properties.getProperty(configKey);
                    LOG.info("{}={}",configKey,value);
                    // Convert the raw string to the field's declared type.
                    Object convertedValue = convertToTypicalType(type, value);
                    if (convertedValue != null) {
                        //param.put(configKey,value);
                        field.set(null, convertedValue);
                    }
                }
            }
        }
        // Recurse into each nested class, extending the key prefix with the
        // nested class's simple name for the duration of the call.
        for (Class<?> innerConfiguration : recentConfigType.getClasses()) {
            parentDesc.append(innerConfiguration.getSimpleName());
            initNextLevel(properties, innerConfiguration, parentDesc);
            parentDesc.removeLastDesc();
        }
    }

    /**
     * Convert string value to typical type.
     *
     * @param type type to convert
     * @param value string value to be converted
     * @return converted value or null
     */
    private static Object convertToTypicalType(Type type, String value) {
        if (value == null || type == null) {
            return null;
        }
        Object result = null;
        if (String.class.equals(type)) {
            result = value;
        } else if (int.class.equals(type) || Integer.class.equals(type)) {
            result = Integer.valueOf(value);
        } else if (long.class.equals(type) || Long.class.equals(type)) {
            result = Long.valueOf(value);
        } else if (boolean.class.equals(type) || Boolean.class.equals(type)) {
            result = Boolean.valueOf(value);
        } else if (float.class.equals(type) || Float.class.equals(type)) {
            result = Float.valueOf(value);
        } else if (double.class.equals(type) || Double.class.equals(type)) {
            result = Double.valueOf(value);
        } else if (List.class.equals(type)) {
            result = convert2List(value);
        } else if (type instanceof Class) {
            Class<?> clazz = (Class<?>) type;
            if (clazz.isEnum()) {
                // Enum constants are matched case-insensitively (upper-cased).
                result = Enum.valueOf((Class<Enum>) type, value.toUpperCase());
            }
        }
        return result;
    }

    /**
     * Set map items.
     *
     * @param configKey config key must not be null
     * @param map map to set must not be null
     * @param properties properties must not be null
     * @param keyType key type of the map
     * @param valueType value type of the map
     */
    private static void setForMapType(String configKey, Map<Object, Object> map, Properties properties,
            final Type keyType, final Type valueType) {
        Objects.requireNonNull(configKey);
        Objects.requireNonNull(map);
        Objects.requireNonNull(properties);
        String prefix = configKey + "[";
        String suffix = "]";
        properties.forEach((propertyKey, propertyValue) -> {
            String propertyStringKey = propertyKey.toString();
            if (propertyStringKey.startsWith(prefix) && propertyStringKey.endsWith(suffix)) {
                String itemKey = propertyStringKey.substring(
                        prefix.length(), propertyStringKey.length() - suffix.length());
                Object keyObj;
                Object valueObj;
                keyObj = convertToTypicalType(keyType, itemKey);
                valueObj = convertToTypicalType(valueType, propertyValue.toString());
                // Fall back to the raw string/object when conversion yields null.
                if (keyObj == null) {
                    keyObj = itemKey;
                }
                if (valueObj == null) {
                    valueObj = propertyValue;
                }
                LOG.info("{}={}",keyObj,valueObj);
                //param.put(String.valueOf(keyObj),String.valueOf(valueObj));
                map.put(keyObj, valueObj);
            }
        });
    }

    /**
     * Split a comma-separated string into a list of trimmed, non-empty segments.
     * Returns an empty list for a null/empty input.
     */
    private static List<String> convert2List(String value) {
        if (StrUtil.isEmpty(value)) {
            return Collections.emptyList();
        }
        List<String> result = new LinkedList<>();
        String[] segments = value.split(",");
        for (String segment : segments) {
            String trimmedSegment = segment.trim();
            if (StrUtil.isNotEmpty(trimmedSegment)) {
                result.add(trimmedSegment);
            }
        }
        return result;
    }
}
/**
 * Mutable dotted-path builder used while walking nested config classes:
 * segments are pushed/popped around each recursion level and rendered
 * as "A.B.C" by {@link #toString()}.
 */
class ConfigDesc {
    private final LinkedList<String> segments = new LinkedList<>();

    /** Push one path segment; empty/null segments are silently ignored. */
    void append(String currentDesc) {
        if (StrUtil.isNotEmpty(currentDesc)) {
            segments.addLast(currentDesc);
        }
    }

    /** Pop the most recently pushed segment. */
    void removeLastDesc() {
        segments.removeLast();
    }

    /** Render the current path joined with dots. */
    @Override
    public String toString() {
        return String.join(".", segments);
    }
}
\ No newline at end of file
... ...
package com.honggroup.wks.flink.common.driver;
import cn.hutool.core.codec.Base64;
import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidDataSourceFactory;
import com.honggroup.wks.flink.common.config.Config;
import org.apache.flink.configuration.Configuration;
import javax.sql.DataSource;
import java.util.Properties;
/**
 * @Auther zhangdajun
 * @CreateDate 2023/4/14 11 02
 * @Describe Lazily-created singleton wrapper around a Druid connection pool.
 * The first caller's Configuration determines the pool settings; later calls
 * reuse the same instance.
 */
public class DruidDataSourceManager {
    private static DruidDataSourceManager druidDataSourceManager;
    private DataSource dataSource;

    private DruidDataSourceManager(Configuration configuration) {
        Properties poolConfig = new Properties();
        // Connection settings (password is stored Base64-encoded in config).
        poolConfig.setProperty("driverClassName", configuration.getString(Config.MYSQL_DRIVER));
        poolConfig.setProperty("url", configuration.getString(Config.MYSQL_URL));
        poolConfig.setProperty("username", configuration.getString(Config.MYSQL_USER));
        poolConfig.setProperty("password", Base64.decodeStr(configuration.getString(Config.MYSQL_PASSWORD)));
        // Pool sizing.
        poolConfig.setProperty("maxActive", String.valueOf(configuration.getInteger(Config.MYSQL_MAX_ACTIVE)));
        poolConfig.setProperty("minIdle", String.valueOf(configuration.getInteger(Config.MYSQL_MIN_IDLE)));
        poolConfig.setProperty("initialSize", String.valueOf(configuration.getInteger(Config.MYSQL_INITIAL_SIZE)));
        // Connection liveness checks.
        poolConfig.setProperty("timeBetweenEvictionRunsMillis", "60000");
        poolConfig.setProperty("testWhileIdle", "true");
        poolConfig.setProperty("testOnBorrow", "false");
        poolConfig.setProperty("testOnReturn", "false");
        poolConfig.setProperty("keepAlive", "true");
        poolConfig.setProperty("validationQuery", "SELECT 1 FROM DUAL");
        try {
            dataSource = DruidDataSourceFactory.createDataSource(poolConfig);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Returns the singleton, creating it from {@code configuration} on first use. */
    public synchronized static DruidDataSourceManager instance(Configuration configuration) {
        if (druidDataSourceManager == null) {
            druidDataSourceManager = new DruidDataSourceManager(configuration);
        }
        return druidDataSourceManager;
    }

    /** The pooled data source built from the first caller's configuration. */
    public DataSource getDataSource() {
        return dataSource;
    }
}
... ...
package com.honggroup.wks.flink.common.driver;
import cn.hutool.core.codec.Base64;
import cn.hutool.core.codec.Base64Decoder;
import cn.hutool.core.map.MapUtil;
import com.alibaba.druid.pool.DruidDataSource;
import com.honggroup.wks.flink.common.config.Config;
import org.apache.flink.configuration.Configuration;
import org.apache.ibatis.mapping.Environment;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.session.SqlSessionFactoryBuilder;
import org.apache.ibatis.transaction.TransactionFactory;
import org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.sql.DataSource;
import java.io.InputStream;
import java.util.Properties;
/**
* Mybatis 工厂类
* 获取sqlSession
*/
public class MybatisSessionFactory {
private static final Logger LOG = LoggerFactory.getLogger(MybatisSessionFactory.class);
private static final String MYBATIS_CONFIG_FILE = "mybatis-config.xml";
private static SqlSessionFactory sqlSessionFactory;
    // Private constructor: this is a static factory holder, never instantiated.
    private MybatisSessionFactory() {
        super();
    }
/**
* 创建SqlSessionFactory
* @return
*/
public synchronized static SqlSessionFactory buildAndGet(Configuration config) {
if (null == sqlSessionFactory) {
InputStream inputStream;
Properties properties = new Properties();
properties.putAll(
MapUtil.builder().put("driver", config.getString(Config.MYSQL_DRIVER))
.put("url", config.getString(Config.MYSQL_URL)).put("username", config.getString(Config.MYSQL_USER))
.put("password", Base64Decoder.decodeStr(config.getString(Config.MYSQL_PASSWORD)))
.put("minIdle",config.getInteger(Config.MYSQL_MIN_IDLE)).put("maxTotal",config.getInteger(Config.MYSQL_MAX_ACTIVE))
.build()
);
LOG.info("Mysql数据库连接参数:{}",properties.toString());
try {
TransactionFactory transactionFactory = new JdbcTransactionFactory();
Environment environment = new Environment("development", transactionFactory, loadDataSource(config));
inputStream = MybatisSessionFactory.class.getClassLoader().getResourceAsStream(MYBATIS_CONFIG_FILE);
sqlSessionFactory = new SqlSessionFactoryBuilder().build(inputStream, properties);
sqlSessionFactory.getConfiguration().setEnvironment(environment);
} catch (Exception e) {
LOG.error("create MybatisSessionFactory read mybatis-config.xml cause Exception", e);
}
if (null != sqlSessionFactory) {
LOG.info("get Mybatis sqlsession sucessed....");
} else {
LOG.info("get Mybatis sqlsession failed....");
}
}
return sqlSessionFactory;
}
public static DataSource loadDataSource(Configuration configuration){
DruidDataSource dataSource = new DruidDataSource();
dataSource.setDriverClassName(configuration.getString(Config.MYSQL_DRIVER));
dataSource.setUrl(configuration.getString(Config.MYSQL_URL));
dataSource.setUsername(configuration.getString(Config.MYSQL_USER));
dataSource.setPassword(Base64.decodeStr(configuration.getString(Config.MYSQL_PASSWORD)));
//设置连接池的一些参数
dataSource.setMaxActive(configuration.getInteger(Config.MYSQL_MAX_ACTIVE));
dataSource.setMinIdle(configuration.getInteger(Config.MYSQL_MIN_IDLE));
dataSource.setInitialSize(configuration.getInteger(Config.MYSQL_INITIAL_SIZE));
dataSource.setTimeBetweenEvictionRunsMillis(60000);
return dataSource;
}
}
... ...
package com.honggroup.wks.flink.common.module;
import lombok.Data;
/**
 * @Auther zhangdajun
 * @CreateDate 2022/12/5 11 09
 * @Describe Carrier pairing an Elasticsearch index name with the serialized
 * document payload to be synced into it.
 */
@Data
public class EsSyncObj {
public EsSyncObj() {
}
public EsSyncObj(String indexName, String data) {
this.indexName = indexName;
this.data = data;
}
// target Elasticsearch index name
String indexName;
// serialized document body — presumably JSON; verify against producers
String data;
}
... ...
package com.honggroup.wks.flink.common.module;
import lombok.Data;
/**
 * Five-minute aggregate of one performance KPI for a resource/flag/kpi triple.
 */
@Data
public class Performance5Minute {
    String id,resId,flag,kpiId;
    long startTime; // aggregation window start — presumably epoch millis, TODO confirm
    long endTime;   // aggregation window end
    double maxValue;
    double minValue;
    double avgValue;
    int interval;   // aggregation interval
    @Override
    public String toString() {
        // Fix: previously printed the wrong class name ("Performance5MinData"),
        // omitted id, and emitted a stray comma directly after '{'.
        return "Performance5Minute{" +
                "id=" + id +
                ", startTime=" + startTime +
                ", endTime=" + endTime +
                ", maxValue=" + maxValue +
                ", minValue=" + minValue +
                ", avgValue=" + avgValue +
                ", interval=" + interval +
                ", resId=" + resId +
                ", flag=" + flag +
                ", kpiId=" + kpiId +
                '}';
    }
}
... ...
package com.honggroup.wks.flink.common.module;
import com.alibaba.fastjson.annotation.JSONField;
import lombok.Data;
import java.io.Serializable;
import java.util.Date;
/**
 * @Auther zhangdajun
 * @CreateDate 2022/11/1 10 05
 * @Describe Raw collected performance sample for one resource/flag/kpi triple.
 */
@Data
public class PerformanceCollData implements Serializable {
String id;
// serialized by fastjson with an ISO-8601-like pattern
@JSONField(format = "yyyy-MM-dd'T'HH:mm:ss.SSSZ")
Date dbTime;
double kpiValue;
String kpiId;
String flag;
String resId;
String dbTimeStr;
String resType;
String message;
String ext;
String resName;// resource name
String kpiName;// KPI name
double increment = 0d; // growth rate relative to the previous sample
}
... ...
package com.honggroup.wks.flink.common.module;
import lombok.Data;
import java.io.Serializable;
/**
 * @Auther zhangdajun
 * @CreateDate 2022/10/29 18 34
 * @Describe Aggregated performance data: window statistics and percentiles on
 * top of the raw sample fields inherited from PerformanceCollData.
 */
@Data
public class PerformanceData extends PerformanceCollData implements Serializable {
long collTime;  // collection timestamp
long startTime; // aggregation window start
long endTime;   // aggregation window end
double maxValue;
double minValue;
double avgValue;
double percentage_99; // 99th percentile
double percentage_95; // 95th percentile
double percentage_50; // median
int interval;         // aggregation interval
@Override
public String toString() {
return "PerformanceData{" +
"collTime=" + collTime +
", startTime=" + startTime +
", endTime=" + endTime +
", maxValue=" + maxValue +
", minValue=" + minValue +
", avgValue=" + avgValue +
", percentage_99=" + percentage_99 +
", percentage_95=" + percentage_95 +
", percentage_50=" + percentage_50 +
", interval=" + interval +
", resId=" + resId +
", flag=" + flag +
", kpiId=" + kpiId +
'}';
}
}
... ...
package com.honggroup.wks.flink.common.module;
import lombok.Data;
import java.util.Date;
import java.util.List;
/**
 * @Auther zhangdajun
 * @CreateDate 2022/11/16 08 42
 * @Describe Delta between two consecutive samples of one resource/flag/kpi
 * series, used by growth-rate processing.
 */
@Data
public class PerformanceIncrement {
public PerformanceIncrement() {
}
public PerformanceIncrement(String resId, String flag, String kpiId) {
this.resId = resId;
this.flag = flag;
this.kpiId = kpiId;
}
private String id;
private String resId;
private String flag;
private String kpiId;
private double preKpiValue; // previous KPI value
private double sufKpiValue;// following KPI value
private Date preCollDate; // previous collection time
private Date sufCollDate; // following collection time
private List<Double> incrementValue;// increment values
private String dataMonth;// month the data belongs to
@Override
public String toString() {
return "PerformanceIncrement{" +
"id='" + id + '\'' +
", resId='" + resId + '\'' +
", flag='" + flag + '\'' +
", kpiId='" + kpiId + '\'' +
", preKpiValue=" + preKpiValue +
", sufKpiValue=" + sufKpiValue +
", preCollDate=" + preCollDate +
", sufCollDate=" + sufCollDate +
", incrementValue=" + incrementValue +
'}';
}
}
... ...
package com.honggroup.wks.flink.common.module;
import lombok.Data;
import java.util.Date;
/**
 * Growth-rate alarm: wraps the {@link PerformanceIncrement} that triggered it
 * plus the rendered alarm content.
 */
@Data
public class PerformanceIncrementAlarm {
    public PerformanceIncrementAlarm() {
    }
    public PerformanceIncrementAlarm(PerformanceIncrement increment) {
        this.increment = increment;
    }
    PerformanceIncrement increment;
    private String alarmContent;
    @Override
    public String toString() {
        // Fix: was increment.toString(), which threw NPE when the no-arg
        // constructor was used and increment was never set; string
        // concatenation renders null safely.
        return "PerformanceIncrementAlarm{" +
                "increment=" + increment +
                ", alarmContent='" + alarmContent + '\'' +
                '}';
    }
}
... ...
package com.honggroup.wks.flink.common.module;
import com.honggroup.wks.flink.common.module.threshold.DynamicThresholdAlarm;
import lombok.Data;
import java.io.Serializable;
/**
 * Alarm subscription message pushed to downstream subscribers.
 */
@Data
public class WorkstationAlarmSubscribe implements Serializable {
    int isClear;// whether this is an alarm-clear event
    int isClose;// whether this is an alarm-close event
    String refreshKey;// refresh key
    String operator = "system";// operator: "System" or a concrete username
    SubscribeType type;
    String message;// close reason
    DynamicThresholdAlarm alarmEntity;// the alarm payload
    /** Kind of subscription event. */
    public enum SubscribeType{
        NEW(1,"新生成"),CLOSE(0,"告警关闭"),CLEAR(3,"告警消除"),
        RESET(4,"告警重置"),HANDLE(5,"手动发送"),RESUME(6,"告警恢复"),
        TRAP(7, "TRAP告警");
        // Fix: made final — enum constant attributes must not be reassignable at runtime.
        public final int value;
        public final String name;
        SubscribeType(int val,String n){
            this.value = val;
            this.name = n;
        }
    }
}
... ...
package com.honggroup.wks.flink.common.module.threshold;
import cn.hutool.crypto.digest.MD5;
import lombok.Data;
import java.util.Date;
/**
 * @Auther zhangdajun
 * @CreateDate 2022/11/8 15 09
 * @Describe Dynamic-threshold alarm object.
 */
@Data
public class DynamicThresholdAlarm{
public DynamicThresholdAlarm() {
}
public DynamicThresholdAlarm(String key) {
this.key = key;
}
// Derives alarmId = MD5(resId + flag + kpiId).
public DynamicThresholdAlarm(String resId, String flag, String kpiId) {
this.resId = resId;
this.flag = flag;
this.kpiId = kpiId;
this.alarmId = MD5.create().digestHex(resId + flag + kpiId);
}
private double kpiValue;// KPI value
private double[] threshold = new double[2];// two-element band — presumably [lower, upper], TODO confirm
private double sensitivity; // sensitivity
private Sigma3AggPerformanceData dynamicBaseLine; // dynamic baseline value
private Sigma3AggPerformanceData sameMinuteCfgData; // same-minute comparison value
private String id;
private String key;
private String resId;// resource id
private String flag;// secondary resource flag
private String kpiId;// KPI id
private String alarmId; // alarm id [MD5(resId + flag + kpiId)]
private String resName;// resource name
private String resType;// resource type
private String kpiName;// KPI name
private int alarmLevel;// alarm level [1 normal, 2 important, 3 critical]
private String alarmLevelName;// alarm level name [1 normal, 2 important, 3 critical]
private Date alarmTime; // first alarm time
private String alarmContent;// alarm content
private int alarmRepeatCnt = 1;// alarm repeat count
private Date updateTime;// most recent alarm time
private Date createTime;// time the alarm record was persisted
//private int alarmStatus;// alarm status [0 alarming, 1 closed] default 0
private int alarmStatus = -1; // 1 = new alarm, 0 = cleared, 2 = updated
private String alarmResource = "alarm";// alarm source ["alarm" = monitoring alarm]
private int isUpgrade = 0;// upgraded? [0 not upgraded, 1 upgraded]
private String upgradeContent;// upgrade description (e.g. normal -> important)
private Date upgradeTime;// upgrade time
private String noticeContent;// notification content
private String alarmNo;// alarm number [yyyyMMdd0001]
private String subWay;// [unused] subscription way
private String alarmType;// [unused] alarm type
private String batchNo = "1";// [unused] batch number
private String taskId = "";// [unused] task id
private String taskName = "";// [unused] task name
}
... ...
package com.honggroup.wks.flink.common.module.threshold;
import lombok.Data;
import java.util.List;
/**
 * @Auther zhangdajun
 * @CreateDate 2022/11/11 15 18
 * @Describe Aggregated 3-sigma statistics for one resource/flag/kpi series,
 * including the dynamic baseline band (upperValue / lowerValue).
 */
@Data
public class Sigma3AggPerformanceData {
private String id;
private String resId;
private String flag;
private String kpiId;
private double meanAvg;// arithmetic mean (μ)
private double standardDeviation;// standard deviation (σ)
private double maxValue;
private double minValue;
private double avgValue;
// upper baseline bound
private double upperValue;
// lower baseline bound
private double lowerValue;
private List<Double> dataSet;// backing data set
public Sigma3AggPerformanceData(){}
// Convenience ctor: id is composed as "resId##flag##kpiId".
public Sigma3AggPerformanceData(String resId, String flag, String kpiId) {
this.resId = resId;
this.flag = flag;
this.kpiId = kpiId;
this.id = resId + "##" + flag + "##" + kpiId;
}
}
... ...
package com.honggroup.wks.flink.common.module.threshold;
import lombok.Data;
import java.util.List;
/**
 * @Auther zhangdajun
 * @CreateDate 2022/11/11 14 39
 * @Describe 3-sigma computation result for one resource/flag/kpi series.
 */
@Data
public class Sigma3PerformanceData {
private double meanAvg;// arithmetic mean (μ)
private double standardDeviation;// standard deviation (σ)
private List<Double> sigmaFilterRst;// values produced by the sigma filter
private String resId;
private String flag;
private String kpiId;
@Override
public String toString() {
return "Sigma3PerformanceData{" +
"meanAvg=" + meanAvg +
", standardDeviation=" + standardDeviation +
", sigmaFilterRst=" + sigmaFilterRst +
", resId='" + resId + '\'' +
", flag='" + flag + '\'' +
", kpiId='" + kpiId + '\'' +
'}';
}
}
... ...
package com.honggroup.wks.flink.common.utils;
/**
 * Aggregation type constants: SUM / AVG / MAX / MIN.
 */
public class AggType{
    public final static String SUM = "SUM";
    public final static String AVG = "AVG";
    public final static String MAX = "MAX";
    public final static String MIN = "MIN";

    // Constant holder — not meant to be instantiated.
    private AggType() {
    }
}
\ No newline at end of file
... ...
package com.honggroup.wks.flink.common.utils;
import org.apache.http.HttpResponse;
import org.apache.http.impl.client.DefaultConnectionKeepAliveStrategy;
import org.apache.http.protocol.HttpContext;
import java.util.concurrent.TimeUnit;
/**
 * Keep-alive strategy that caps "no Keep-Alive header" responses at a finite
 * duration instead of keeping connections alive indefinitely.
 */
public class CustomConnectKeepAliveStrategy extends DefaultConnectionKeepAliveStrategy {
    // Fallback keep-alive (minutes) when the server advertises none.
    // Fix: was a per-instance field; a constant belongs in static final.
    private static final long MAX_KEEP_ALIVE_MINUTE = 10;
    // Fix: made final so the shared singleton cannot be swapped out at runtime.
    public static final CustomConnectKeepAliveStrategy INSTANCE = new CustomConnectKeepAliveStrategy();
    private CustomConnectKeepAliveStrategy(){
        super();
    }
    /**
     * @return the server-advertised keep-alive duration in millis, or 10 minutes
     * when the default strategy reports "indefinite" (a negative value)
     */
    @Override
    public long getKeepAliveDuration(HttpResponse response, HttpContext context) {
        long keepAliveDuration = super.getKeepAliveDuration(response, context);
        if(keepAliveDuration < 0){
            return TimeUnit.MINUTES.toMillis(MAX_KEEP_ALIVE_MINUTE);
        }
        return keepAliveDuration;
    }
}
... ...
package com.honggroup.wks.flink.common.utils;
import java.sql.Timestamp;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
 * @Description: Date/time parsing and formatting helpers. Parsing is
 * best-effort: pattern selection compares the raw string's length against the
 * pattern's length, and a fresh SimpleDateFormat is created per call because
 * SimpleDateFormat is not thread-safe.
 * @Author: zhangdajun
 * @CreateDate: 2021-08-01 14:44
 * @UpdateUser: zhangdajun
 * @UpdateDate: 2021-08-01 14:44
 * @UpdateRemark:
 * @Version: 1.0
 */
public class DateUtils {
// Candidate patterns for the auto-detecting parsers. A pattern is tried only
// when its length equals the input string's length, so order and exact
// lengths here matter.
public static String[] formatArry = {
"yyyyMMdd",
"yyyy-MM-dd",
"yyyy/MM/dd",
"yyyy-MM-dd HH",
"yyyyMMdd HHmmss",
"yyyy-MM-dd HH:mm",
"yyyy-MM-dd HH:mm:ss",
"yyyy/MM/dd HH:mm:ss",
"yyyy-MM-ddHH:mm:ss.SSS",
"yyyy-MM-dd HH:mm:ss.SSS",
"yyyy-MM-dd HH:mm:ss,SSS",
"yyyy-MM",
"yyyyMMddHHmm",
"yyyy-MM-dd'T'HH:mm:ss.SSS",
};
// Ad-hoc manual test entry point.
public static void main(String[] args) throws ParseException {
//String str = "20210801";
//String str = "1563984000000";
String str = "2020-12-02 16:58:19,533";
Date s = autoTransTimestamp(str);//strToDate(str);
System.out.println(s);
}
/**
 * Parses {@code str} with the single given pattern.
 * @return the parsed Date, or null when parsing fails
 */
public static Date strToDate(String str, String defFormatter) {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat(defFormatter);
try {
Date date = simpleDateFormat.parse(str);
return date;
} catch (Exception e) {
return null;
}
}
/**
 * Auto-detects the pattern (by string length) and parses.
 * @return the parsed Date, or null when no candidate pattern matches
 */
public static Date strToDate(String str) {
if (str == null || str.length() == 0) {
return null;
}
boolean isFound = false;
Date date = null;
// NOTE(review): the two replaceAll calls appear identical here; presumably one
// originally targeted the full-width comma — verify against the repo source.
String strDt = str.replaceAll(",", ".").replaceAll(",", ".");
for (int i = 0; i < formatArry.length; i++) {
if (strDt.length() != formatArry[i].length()) {
continue;
}
SimpleDateFormat format = new SimpleDateFormat(formatArry[i]);
try {
date = format.parse(strDt);
isFound = true;
break;
} catch (Exception e) {
//e.printStackTrace();
}
}
if (isFound) {
return date;
} else {
return null;
}
}
/** Re-formats {@code str} to "yyyy-MM-dd HH:mm:ss" (input pattern auto-detected). */
public static String strToDateFormat(String str) {
return strToDateFormat(str, "yyyy-MM-dd HH:mm:ss");
}
/**
 * Re-formats {@code str} into {@code defFormatter}; the input pattern is
 * auto-detected by length. Returns the input unchanged when nothing matches.
 */
public static String strToDateFormat(String str, String defFormatter) {
if (str == null || str.length() == 0) {
return null;
}
boolean isFound = false;
SimpleDateFormat formatter = new SimpleDateFormat(defFormatter);
String formatDate = "";
for (int i = 0; i < formatArry.length; i++) {
if (str.length() != formatArry[i].length()) {
continue;
}
SimpleDateFormat format = new SimpleDateFormat(formatArry[i]);
try {
Date date = format.parse(str);
formatDate = formatter.format(date);
isFound = true;
break;
} catch (Exception e) {
//e.printStackTrace();
}
}
if (isFound) {
return formatDate;
} else {
return str;
}
}
/**
 * Normalizes a timestamp string ('.' -> ',' and 'T' -> ' '), pads the
 * millisecond part when shorter than 23 chars, then parses it with
 * "yyyy-MM-dd HH:mm:ss,SSS".
 * @throws ParseException when the normalized string still does not match
 */
public static Timestamp autoTransTimestamp(String str) throws ParseException {
// unify separators
str = str.replace(".",",").replace("T"," ");
// shorter than the full 23-char form: pad the trailing millisecond digits
if(str.length() < 23){
// auto-completes the trailing digits
str = supplementSSS(str);
}
return strToTimestamp(str);
}
/**
 * Pads the millisecond part after ',' with leading zeros to 3 digits,
 * appending ",000" when no millisecond part is present.
 * @return the padded timestamp string
 */
private static String supplementSSS(String timeStr){
String tmp[] = timeStr.split(",");
StringBuilder suffix = new StringBuilder();
if(tmp.length == 2){
suffix = new StringBuilder(tmp[1]);
}
while(suffix.length() < 3){
suffix.insert(0, "0");
}
suffix.insert(0,",");
suffix.insert(0,tmp[0]);
return suffix.toString();
}
/**
 * @return the parsed Timestamp (pattern "yyyy-MM-dd HH:mm:ss,SSS")
 * @throws ParseException when the string does not match
 * @Description String to Timestamp
 * @author zhangdajun
 * @date 2021-09-27 15:27
 */
public static Timestamp strToTimestamp(String str) throws ParseException {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss,SSS");
Date date = simpleDateFormat.parse(str);
Timestamp ts = new Timestamp(date.getTime());
return ts;
}
/**
 * Parses after stripping all spaces (and comma-to-dot normalization), first
 * with millis then without.
 * @throws ParseException when neither pattern matches
 */
public static Date strToDate_yMdHmsS(String str) throws ParseException {
// NOTE(review): same apparent duplicate replaceAll as in strToDate — verify.
String strdt = str.replaceAll(" ", "").replaceAll(",", ".").replaceAll(",", ".");
Date date;
try {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-ddHH:mm:ss.SSS");
date = simpleDateFormat.parse(strdt);
} catch(ParseException e1) {
try{
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-ddHH:mm:ss");
date = simpleDateFormat.parse(strdt);
} catch(ParseException e2) {
throw e2;
}
}
return date;
}
/** Parses "yyyy-MM-dd HH:mm:ss"; propagates ParseException on mismatch. */
public static Date strToDate_yMdHms(String str) throws ParseException {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date date = simpleDateFormat.parse(str);
return date;
}
/**
 * @return the parsed Timestamp using the caller-supplied pattern
 * @throws ParseException when the string does not match
 * @Description String to Timestamp
 * @author zhangdajun
 * @date 2021-09-27 15:27
 */
public static Timestamp strToTimestamp(String str, String format) throws ParseException {
SimpleDateFormat simpleDateFormat = new SimpleDateFormat(format);
Date date = simpleDateFormat.parse(str);
Timestamp ts = new Timestamp(date.getTime());
return ts;
}
/**
 * Date to String.
 *
 * @param date the date to format
 * @param formatStr SimpleDateFormat pattern
 * @return the formatted string
 */
public static String dateToStr(Date date, String formatStr) {
SimpleDateFormat sdf = new SimpleDateFormat(formatStr);
String str = sdf.format(date);
return str;
}
/** Epoch-millis overload of {@link #dateToStr(Date, String)}. */
public static String dateToStr(long timestamp,String formatStr){
return dateToStr(new Date(timestamp),formatStr);
}
}
... ...
package com.honggroup.wks.flink.common.utils;
/**
 * @Auther zhangdajun
 * @CreateDate 2023/2/16 17 05
 * @Describe Dimension-type constants used to tag data categories.
 */
public interface DimensionType {
String RES = "res"; // resource dimension
String KPI = "kpi"; // KPI (indicator) dimension
String BIZ = "biz"; // business dimension
String SCRIPT = "script"; // script/command dimension
String TASK = "task"; // task dimension
String DATA = "data";// collected data
String THRESHOLD = "threshold"; // threshold dimension
}
... ...
package com.honggroup.wks.flink.common.utils;
import org.apache.http.HttpHost;
import java.util.HashSet;
import java.util.Set;
/**
 * Helpers for building Elasticsearch client endpoints.
 */
public class ElasticsearchUtils {
    /**
     * Parses a comma-separated "host:port[,host:port...]" string into an array
     * of {@link HttpHost} endpoints. Entries without an explicit port are
     * skipped, and duplicates collapse (set semantics).
     *
     * @param hosts comma-separated host:port list
     * @return the parsed endpoints
     */
    public static HttpHost[] getHostAndPortList(String hosts) {
        Set<HttpHost> endpoints = new HashSet<>(0);
        for (String candidate : hosts.split(",")) {
            String[] hostAndPort = candidate.split(":", 2);
            if (hostAndPort.length > 1) {
                endpoints.add(new HttpHost(hostAndPort[0], Integer.parseInt(hostAndPort[1])));
            }
        }
        return endpoints.toArray(new HttpHost[0]);
    }
}
... ...
package com.honggroup.wks.flink.common.utils;
import java.util.UUID;
/**
 * Helpers for building deterministic MD5-based ids and random UUIDs.
 */
public class IdUtils {
    /**
     * Alarm id: MD5(resourceId + subResourceId + kpiId + alarmLevel).
     *
     * @return the 32-char uppercase hex id
     */
    public static String generateId(String resourceId,String subResourceId,String kpiId,String alarmLevel){
        return MD5Util.getMD5((resourceId+subResourceId+kpiId+alarmLevel).getBytes());
    }
    /**
     * Snapshot id: MD5(resourceId + subResourceId + kpiId).
     *
     * @return the 32-char uppercase hex id
     */
    public static String generateId(String resourceId,String subResourceId, String kpiId){
        return MD5Util.getMD5((resourceId+subResourceId+kpiId).getBytes());
    }
    /**
     * Random 32-char lowercase UUID (dashes stripped).
     * @return a fresh random id
     */
    public static String getId(){
        return UUID.randomUUID().toString().replace("-", "").toLowerCase();
    }
    /**
     * MD5 of the concatenation of all parts.
     * Fix: String += in a loop replaced with StringBuilder.
     */
    public static String getId(String... strs){
        StringBuilder id = new StringBuilder();
        for (String str : strs) {
            id.append(str);
        }
        return MD5Util.getMD5(id.toString().getBytes());
    }
    /**
     * Notification merge id keyed by recipient and IP address.
     */
    public static String noticeMergeIdByIp(String username, String ip){
        return MD5Util.getMD5((username+ip).getBytes());
    }
    /**
     * Notification merge id keyed by recipient and business id.
     */
    public static String noticeMergeIdByBusId(String username, String busId){
        return MD5Util.getMD5((username+busId).getBytes());
    }
}
... ...
package com.honggroup.wks.flink.common.utils;
/**
 * KPI dimensionality: one- vs two-dimensional indicators.
 */
public enum KpiDimension {
    _1D("1维指标"),_2D("2维指标");
    // Fix: made final; kept package visibility for any existing same-package readers.
    final String name;
    KpiDimension(String name){
        this.name = name;
    }
    /** @return the display (Chinese) label — previously there was no accessor at all */
    public String getName() {
        return name;
    }
}
... ...
package com.honggroup.wks.flink.common.utils;
/**
 * Log-level constants and normalization helpers.
 */
public class LogLevelUtil {
    public static final String OFF = "OFF";
    public static final String FATAL = "FATAL";
    public static final String ERROR = "ERROR";
    public static final String WARN = "WARN";
    public static final String INFO = "INFO";
    public static final String DEBUG = "DEBUG";
    public static final String TRACE = "TRACE";
    public static final String ALL = "ALL";
    public static final String[] LOG_LEVEL_ARR = {OFF, FATAL, ERROR, WARN, INFO, DEBUG, TRACE, ALL};
    /**
     * Normalizes a raw log-level token: strips every non-letter character and,
     * when the remainder exactly matches a known level (case-sensitive),
     * returns that canonical constant. Null/empty input yields "Empty";
     * anything unrecognized is returned unchanged.
     *
     * @param logLevel raw token, e.g. "[ERROR]" or "2023 WARN:"
     * @return the canonical level, "Empty", or the original string
     */
    public static String format(String logLevel) {
        if (logLevel == null || logLevel.length() == 0) {
            return "Empty";
        }
        String letters = logLevel.replaceAll("[^A-Za-z]", "");
        for (int i = 0; i < LOG_LEVEL_ARR.length; i++) {
            if (LOG_LEVEL_ARR[i].equals(letters)) {
                return LOG_LEVEL_ARR[i];
            }
        }
        return logLevel;
    }
    public static void main(String[] args) {
        System.out.println("123_.Abc321".replaceAll("[^A-Za-z]", ""));
    }
}
... ...
package com.honggroup.wks.flink.common.utils;
import java.security.MessageDigest;
/**
 * MD5 hex-digest helpers (uppercase output).
 */
public class MD5Util {
    // kept for source compatibility with callers that instantiate this class
    public MD5Util() {
    }
    private static final char[] HEX_DIGITS = {'0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'};
    /**
     * @param source bytes to digest
     * @return the 32-char UPPERCASE hex MD5 of {@code source}
     * @throws IllegalStateException if the MD5 digest is unavailable (never on a compliant JRE)
     */
    public static String getMD5(byte[] source) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            md.update(source);
            byte[] digest = md.digest();
            char[] out = new char[32];
            int k = 0;
            for (int i = 0; i < 16; ++i) {
                byte b = digest[i];
                out[k++] = HEX_DIGITS[b >>> 4 & 15];
                out[k++] = HEX_DIGITS[b & 15];
            }
            return new String(out).toUpperCase();
        } catch (Exception e) {
            // Fix: the exception was previously swallowed (printStackTrace) and the
            // null result then NPE'd on s.toUpperCase(). Fail loudly, keep the cause.
            throw new IllegalStateException("MD5 digest failed", e);
        }
    }
    /**
     * First 8 hex chars of the MD5 — used for compact ids.
     */
    public static String getMD58(byte[] source) {
        return getMD5(source).substring(0, 8);
    }
    /**
     * KPI id = "KPI" + 8-char MD5 of the KPI name.
     * NOTE(review): getBytes() uses the platform default charset — kept as-is so
     * existing ids stay stable, but confirm all JVMs share one charset.
     */
    public static String makeKpiId(String kpiName) {
        return "KPI" + getMD58(kpiName.getBytes());
    }
    // Ad-hoc manual test entry point.
    public static void main(String[] args) {
        String[] name = new String[]{"海虹列头柜频率", "台达列头柜频率", "精密空调回风湿度", "精密空调回风温度", "精密空调风机状态", "精密空调风机电流", "精密空调电机电流", "精密空调压缩机电流", "精密空调加湿器状态", "精密空调电加热状态"};
        for(int i = 0; i < name.length; ++i) {
            String uuid = "KPI" + getMD58(name[i].getBytes());
            System.out.println(uuid);
        }
    }
}
\ No newline at end of file
... ...
package com.honggroup.wks.flink.common.utils;
/**
 * Pipeline node types for the data-processing flow.
 */
public enum NodeType {
    INPUT("输入节点","input"),
    CLEANFILTER("清洗过滤","cleanFilter"),
    PARSE("数据格式化","parse"),
    CONVERSION("数据转化","conversion"),
    AGGREGATION("数据聚合","aggregation"),
    OUTPUT("数据输出","output")
    ;
    // Fix: made final — enum attributes are fixed at construction.
    private final String name;
    private final String val;
    NodeType(String n,String v){
        this.name = n;
        this.val = v;
    }
    /** @return the display (Chinese) label */
    public String getName() {
        return name;
    }
    /** @return the machine-readable value */
    public String getVal() {
        return val;
    }
    /** Supported parse formats. */
    public enum Parse{
        grok, json, xml, text, cvs, kv, split
    }
    /** Supported conversion kinds. */
    public enum Convert{
        redis,desensitization,processing
    }
}
... ...
package com.honggroup.wks.flink.common.utils;
/**
 * Cleaning/filter node sub-types.
 */
public enum NodeWashingFilterType {
    WASHING("过滤","washing"),
    SCREENING("筛选","screening"),
    REPLACE("替换","replace");
    // Fix: made final — enum attributes are fixed at construction.
    private final String val;
    private final String text;
    NodeWashingFilterType(String n,String v){
        this.text = n;
        this.val = v;
    }
    /** @return the display (Chinese) label */
    public String getText() {
        return text;
    }
    /** @return the machine-readable value */
    public String getVal() {
        return val;
    }
}
... ...
package com.honggroup.wks.flink.common.utils;
import cn.hutool.core.codec.Base64;
import org.apache.commons.lang3.RandomUtils;
import java.nio.charset.StandardCharsets;
/**
 * Random password generation with four complexity levels.
 */
public class PasswordUtils {
    private static final int DEFAULT_LEVEL = 3;
    private static final int DEFAULT_LENGTH = 3;

    // Character pools, one per complexity class.
    private static final String DIGITS = "0123456789";
    private static final String LOWER = "abcdefghijklmnopqrstuvwxyz";
    private static final String UPPER = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
    private static final String SPECIAL = "~!#$%^&*()_+";

    /**
     * Generates a random password.
     * <ul>
     *   <li>level 1: digits only or lowercase only (random choice)</li>
     *   <li>level 2: guarantees one digit and one lowercase letter</li>
     *   <li>level 3: guarantees one digit, one uppercase and one lowercase letter</li>
     *   <li>level 4: level 3 plus one special character</li>
     * </ul>
     * Any other level yields an empty string (legacy behavior preserved).
     *
     * @param level complexity level 1..4
     * @param pwdLength requested length; values below 6 are raised to 6
     * @return the generated password
     */
    public static String generatePwd(int level, int pwdLength) {
        if (pwdLength < 6) {
            pwdLength = 6;
        }
        StringBuilder password = new StringBuilder(pwdLength);
        switch (level) {
            case 1:
                String pool = RandomUtils.nextInt(0, 2) == 0 ? DIGITS : LOWER;
                for (int i = 0; i < pwdLength; i++) {
                    password.append(pick(pool));
                }
                break;
            case 2:
                // fixed prefix guarantees one of each required class
                password.append(pick(DIGITS)).append(pick(LOWER));
                fillRandom(password, pwdLength - 2, DIGITS, LOWER);
                break;
            case 3:
                password.append(pick(DIGITS)).append(pick(UPPER)).append(pick(LOWER));
                fillRandom(password, pwdLength - 3, DIGITS, UPPER, LOWER);
                break;
            case 4:
                password.append(pick(DIGITS)).append(pick(UPPER)).append(pick(LOWER)).append(pick(SPECIAL));
                fillRandom(password, pwdLength - 4, DIGITS, UPPER, LOWER, SPECIAL);
                break;
            default:
                // unknown level: empty result, matching the original switch without a default branch
                break;
        }
        return password.toString();
    }

    /** Default level (3) and default length (clamped to 6). */
    public static String generatePwd() {
        return generatePwd(DEFAULT_LEVEL, DEFAULT_LENGTH);
    }

    /** Given level, default length (clamped to 6). */
    public static String generatePwd(int level) {
        return generatePwd(level, DEFAULT_LENGTH);
    }

    /** Generates a password and Base64-encodes its UTF-8 bytes. */
    public static String generatePwdAndEnCode(int level, int pwdLength) {
        return Base64.encode(generatePwd(level, pwdLength).getBytes(StandardCharsets.UTF_8));
    }

    /** Default-parameter variant of {@link #generatePwdAndEnCode(int, int)}. */
    public static String generatePwdAndEnCode() {
        return generatePwdAndEnCode(DEFAULT_LEVEL, DEFAULT_LENGTH);
    }

    /** One uniformly random character from {@code pool}. */
    private static char pick(String pool) {
        return pool.charAt(RandomUtils.nextInt(0, pool.length()));
    }

    /** Appends {@code count} characters, each drawn from a randomly chosen pool. */
    private static void fillRandom(StringBuilder sb, int count, String... pools) {
        for (int i = 0; i < count; i++) {
            sb.append(pick(pools[RandomUtils.nextInt(0, pools.length)]));
        }
    }
}
... ...
package com.honggroup.wks.flink.common.utils;
import org.apache.commons.lang3.StringUtils;
/**
 * Central registry of Redis key fragments plus a helper to join them with ':'.
 */
public interface RedisKey {
    String KEY_CONCAT_FLAG = ":";
    interface AlarmKey{
        String notice = "notice";
        String sendAlarmTimes = "sendAlarmTimes";
        String lastSendTime = "lastSendTime";
        String level = "level";// alarm level
        String upgrade = "event:upgrade:";// alarm upgrade
        interface Upgrade{
            String create = "event:upgrade:create";
            String processed = "event:upgrade:upgrade:";
            String repeated = "event:upgrade:repeated:"; // repeat count so far
        }
        String clear = "event:clear:"; // alarm cleared
        String close = "event:close:"; // alarm closed
        interface Manual{
            String clear = "event:manual:clear:";
            String close = "event:manual:close:";
            String kpiClose = "event:manual:kpi:close:";
        }
    }
    interface Application{
        String system = "system";
        String subscribe = "system_default_subscribe";
    }
    interface ResourceKey{
        String bizType = "bizType:resource";
        String lastData = "lastData:";
        String alarmFlag = "alarmFlag:";
        String alarmRepeatCnt = "alarmRepeatCnt";
        String health = "resource:health";
        String autoDiscovery = "resource:discovery";
        interface HealthKey{
            String score = "score";
            String health = "health";
        }
    }
    interface FilterKey{
        String filter = "filter:item";
        String filterBizType = "filter:biz";
        String filterResource = "filter:resource";
    }
    /**
     * Joins {@code flag} with every non-empty key using ':'.
     * Null/empty segments are skipped; a null array yields {@code flag} alone.
     * Fixes: StringBuffer (needless synchronization) replaced with
     * StringBuilder, and the commons-lang3 StringUtils.isEmpty call replaced
     * with an equivalent plain null/empty check.
     *
     * @param flag leading key fragment
     * @param keys optional additional segments
     * @return the composed key, e.g. "event:a:b"
     */
    static String buildKey(String flag, String... keys){
        StringBuilder sb = new StringBuilder(flag);
        if(keys != null){
            for (String key : keys) {
                if(key != null && !key.isEmpty()) {
                    sb.append(KEY_CONCAT_FLAG).append(key);
                }
            }
        }
        return sb.toString();
    }
}
... ...
package com.honggroup.wks.flink.common.utils;
import cn.hutool.core.codec.Base64Decoder;
import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.fastjson.JSONObject;
import com.honggroup.wks.flink.common.config.Config;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.*;
import redis.clients.jedis.commands.PipelineCommands;
import redis.clients.jedis.util.JedisClusterCRC16;
import java.io.Serializable;
import java.util.*;
/**
 * Singleton helper around a {@link JedisCluster} connection configured from the
 * Flink {@link Configuration} (Config.REDIS_* options).
 */
public class RedisUtil implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(RedisUtil.class);
    private static final long serialVersionUID = 1L;

    /**
     * Parses "host:port[,host:port...]" into HostAndPort entries.
     * Items without an explicit port are skipped. Never returns null.
     */
    private static Set<HostAndPort> getHostAndPortList(String hosts) {
        Set<HostAndPort> re = new HashSet<>();
        for (String host : hosts.split(",")) {
            String[] parts = host.split(":", 2);
            if (parts.length > 1) {
                re.add(new HostAndPort(parts[0], Integer.parseInt(parts[1])));
            }
        }
        return re;
    }

    JedisCluster jedisCluster = null;
    private static RedisUtil redisUtil;
    Configuration config;
    private final GenericObjectPoolConfig cfg = new GenericObjectPoolConfig();
    private final CommandObjects commandObjects = new CommandObjects();

    private RedisUtil(){ }

    private RedisUtil(Configuration config){
        this.config = config;
    }

    /**
     * Returns the process-wide singleton.
     * Fix: synchronized so concurrent first callers cannot create two instances
     * (now consistent with DruidDataSourceManager.instance).
     */
    public synchronized static RedisUtil instance(Configuration config){
        if(redisUtil == null){
            redisUtil = new RedisUtil(config);
        }
        return redisUtil;
    }

    /**
     * Returns the shared JedisCluster, building the pool configuration and the
     * cluster connection only on the first call.
     * Fix: the original rebuilt the cluster on EVERY call (the build was outside
     * the null check), leaking the previously created connection pools.
     */
    public synchronized JedisCluster getConnect(){
        if(jedisCluster == null){
            cfg.setMaxIdle(100);              // max idle connections
            cfg.setMinIdle(10);               // min idle connections
            cfg.setMaxTotal(1024);            // max total connections
            cfg.setBlockWhenExhausted(true);  // block callers when the pool is exhausted
            cfg.setMaxWaitMillis(30000);      // max wait (ms) when the pool is exhausted
            cfg.setTestOnCreate(false);
            cfg.setTestOnBorrow(false);       // no validity ping on borrow; stale connections are evicted while idle
            cfg.setTestOnReturn(false);
            cfg.setTestWhileIdle(true);
            jedisCluster = buildJedisCluster();
        }
        return jedisCluster;
    }

    /**
     * Runs KEYS {@code pattern} on every cluster node and merges the results.
     * Requires {@link #getConnect()} to have been called first.
     * NOTE: KEYS is O(N) per node — avoid on large keyspaces in hot paths.
     */
    public Set<String> keys(String pattern) {
        Set<String> result = new HashSet<>();
        Map<String, ConnectionPool> clusterNodes = jedisCluster.getClusterNodes();
        for (Map.Entry<String, ConnectionPool> entry : clusterNodes.entrySet()) {
            // Fix: try-with-resources returns the connection to its pool even when the command throws.
            try (Connection resource = entry.getValue().getResource()) {
                Set<String> res = resource.executeCommand(commandObjects.keys(pattern));
                if (!CollectionUtil.isEmpty(res)) {
                    // merge per-node results
                    result.addAll(res);
                }
            }
        }
        return result;
    }

    /**
     * Multi-get by key set. STUB: the real implementation was commented out
     * upstream; this currently always returns an empty map. Behavior preserved —
     * TODO implement via slot-grouped MGET (see {@link #getPoolKeyMap(List)}).
     */
    public <T> Map<String,T> mutiPatternGet(Set<String> keys,Class<T> clazz) {
        return new HashMap<>();
    }

    /**
     * Groups keys by their cluster hash slot (CRC16) so slot-local multi-key
     * commands can be issued per group. Insertion order of slots is preserved.
     */
    public Map<Integer, List<String>> getPoolKeyMap(List<String> keys) {
        Map<Integer, List<String>> poolKeysMap = new LinkedHashMap<Integer, List<String>>();
        try {
            for (String key : keys) {
                int slot = JedisClusterCRC16.getSlot(key);
                poolKeysMap.computeIfAbsent(slot, s -> new ArrayList<String>()).add(key);
            }
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
        return poolKeysMap;
    }

    /** Builds the JedisCluster from the Config.REDIS_* settings. */
    private JedisCluster buildJedisCluster() {
        Set<HostAndPort> redisListHostAndPort = getHostAndPortList(config.getString(Config.REDIS_HOSTS));
        LOG.debug("initParamEnv:redisHosts=" + config.getString(Config.REDIS_HOSTS));
        LOG.debug("initParamEnv:redisAuthority=" + config.getBoolean(Config.REDIS_AUTHORITY));
        // Security fix: the (encoded) password is no longer written to the log.
        if (redisListHostAndPort.isEmpty()) {
            // Fix: the original checked == null, which getHostAndPortList never returns,
            // so a bad/empty host list slipped straight into the JedisCluster constructor.
            LOG.error("Param:Config.REDIS_HOSTS = {}", Config.REDIS_HOSTS);
        } else {
            if (config.getBoolean(Config.REDIS_AUTHORITY)) {
                jedisCluster = new JedisCluster(redisListHostAndPort, 10000, 1000, 10, Base64Decoder.decodeStr(config.getString(Config.REDIS_PASSWORD)), cfg);
            } else {
                jedisCluster = new JedisCluster(redisListHostAndPort, 10000, 1000, 10, cfg);
            }
        }
        return jedisCluster;
    }

    /**
     * Joins parts with ':' — e.g. keyGenerator("a","b") yields "a:b".
     * Fixes: StringBuffer replaced with StringBuilder; an empty varargs call now
     * returns "" instead of throwing StringIndexOutOfBoundsException.
     */
    public static String keyGenerator(String... keys){
        StringBuilder genRst = new StringBuilder();
        for (String key : keys) {
            genRst.append(":").append(key);
        }
        return genRst.length() == 0 ? "" : genRst.substring(1);
    }
}
... ...
package com.honggroup.wks.flink.common.utils;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
/**
 * Best-effort Java-native serialization helpers. Both methods swallow failures
 * and return null, preserving the legacy contract.
 * SECURITY NOTE: never feed {@link #deserialize(byte[])} bytes from an
 * untrusted source — Java deserialization of attacker-controlled data is a
 * well-known remote-code-execution vector.
 */
public class SerializeUtil {
    /**
     * Serializes {@code object} via Java serialization.
     *
     * @param object the object to serialize (must implement Serializable)
     * @return the serialized bytes, or null on any failure (e.g. not serializable)
     */
    public static byte[] serialize(Object object) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        // Fix: try-with-resources guarantees the stream is flushed/closed
        // (the original never closed it and read the buffer before flushing).
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(object);
        } catch (Exception e) {
            // legacy best-effort contract: swallow and signal failure with null
            return null;
        }
        return baos.toByteArray();
    }

    /**
     * Deserializes bytes produced by {@link #serialize(Object)}.
     *
     * @param bytes the serialized form
     * @return the reconstructed object, or null on any failure (including null input)
     */
    @SuppressWarnings("unchecked")
    public static<T> T deserialize(byte[] bytes) {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return (T) ois.readObject();
        } catch (Exception e) {
            // legacy best-effort contract: swallow and signal failure with null
            return null;
        }
    }
}
\ No newline at end of file
... ...
package com.honggroup.wks.flink.common.utils;
import cn.hutool.core.lang.Tuple;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.math3.stat.StatUtils;
import org.apache.commons.math3.stat.descriptive.moment.StandardDeviation;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
* @Auther zhangdajun
* @CreateDate 2021/8/24 13 05
* @Describe
*/
@Slf4j
public class SigmaUtil {

    /**
     * Applies the 3-sigma rule: the mean and standard deviation are computed
     * from {@code moduleData}, and every value of {@code calData} flagged as
     * an outlier is removed from the returned copy.
     */
    public static List<Double> sigma3Filter(List<Double> moduleData, List<Double> calData) {
        double[] baseline = new double[moduleData.size()];
        int idx = 0;
        for (Double v : moduleData) {
            baseline[idx++] = v;
        }
        List<Double> filtered = new ArrayList<>(calData);
        for (double outlier : sigmaFilter(baseline, calData, 3)) {
            Double boxed = outlier;
            if (filtered.contains(boxed)) {
                filtered.remove(boxed);
            }
        }
        return filtered;
    }

    /**
     * x-sigma outlier detection.
     *
     * @param module baseline sample (a sample of >= 30 values is recommended)
     * @param data   values to check
     * @param x      number of sigmas (3 for the classic 3-sigma rule)
     * @return the values of {@code data} classified as outliers
     */
    public static List<Double> sigmaFilter(double[] module, List<Double> data, int x) {
        Tuple tuple = calMeanAndStandardDeviation(module);
        double avg = tuple.get(0);
        double stDev = tuple.get(1);
        log.debug("算数平均值μ:" + avg);
        log.debug("标准差σ为:" + stDev);
        return sigmaFilter(avg, stDev, data, x);
    }

    /** x-sigma outlier detection with a precomputed mean and standard deviation. */
    public static List<Double> sigmaFilter(double avg, double stDev, List<Double> data, int x) {
        List<Double> outliers = new ArrayList<>();
        double limit = x * stDev;
        for (double candidate : data) {
            if (Math.abs(candidate - avg) > limit) {
                outliers.add(candidate);
                log.debug("使用" + x + "σ准则进行过滤,该数组中的" + candidate + "属于异常值!");
            }
        }
        return outliers;
    }

    /** Array-input variant of {@link #sigmaFilter(double, double, List, int)}. */
    public static List<Double> sigma3Filter(double avg, double stDev, double[] data, int x) {
        List<Double> outliers = new ArrayList<>();
        double limit = x * stDev;
        for (double candidate : data) {
            if (Math.abs(candidate - avg) > limit) {
                outliers.add(candidate);
                log.debug("使用" + x + "σ准则进行过滤,该数组中的" + candidate + "属于异常值!");
            }
        }
        return outliers;
    }

    /**
     * Returns the minimum or maximum of the non-outlier values.
     *
     * @param type 0 returns the minimum, anything else the maximum
     */
    public static double matrixMean(double avg, double stDev, double[] data, int x, int type) {
        double max = 0;
        double min = Double.MAX_VALUE;
        for (double v : data) {
            if (Math.abs(v - avg) > (x * stDev)) {
                // Outliers are excluded from the min/max computation.
                log.info("排除值:{}", v);
                continue;
            }
            if (v > max) {
                max = v;
            }
            if (v < min) {
                min = v;
            }
        }
        return type == 0 ? min : max;
    }

    /**
     * Anomaly check for a single value.
     *
     * @return true when {@code data} deviates from the mean by more than x sigmas
     */
    public static boolean anomalyDetection(double avg, double stDev, double data, int x) {
        return Math.abs(data - avg) > (x * stDev);
    }

    /** Returns (mean, standard deviation) of the sample as a Tuple. */
    public static Tuple calMeanAndStandardDeviation(double[] data) {
        return new Tuple(calMean(data), calStandardDeviation(data));
    }

    /** Arithmetic mean of the sample. */
    public static double calMean(double[] data) {
        return StatUtils.mean(data);
    }

    /** (Bias-corrected) standard deviation of the sample. */
    public static double calStandardDeviation(double[] data) {
        return new StandardDeviation().evaluate(data);
    }

    /** x times the standard deviation of the sample. */
    public static double sigma(double[] data, int x) {
        return new StandardDeviation().evaluate(data) * x;
    }

    /**
     * Returns true when n is a power of two greater than 1
     * (1 is deliberately excluded, matching the original behavior).
     */
    public static boolean isTwoPower(int n) {
        if (n <= 1) {
            return false;
        }
        return (n & (n - 1)) == 0;
    }

    /** Rounds n up to the next value accepted by {@link #isTwoPower(int)}. */
    public static int formatTwoPowerData(int n) {
        while (!isTwoPower(n)) {
            n++;
        }
        return n;
    }

    /** Ad-hoc manual check of the sigma filter. */
    public static void main(String[] args) {
        double[] baseData = {0d,0d,0d,0d,0d,0d,0d,0d,0d,0d,0d,0d,1d,0d,0d,0d,0d,0d,0d,0d,0d,0d,0d,0d,0d,0d};
        List<Double> calData = Arrays.asList(0.2,0.3,0.4,0.5,0.6,0.7,0.8,0.9);
        Tuple tuple = calMeanAndStandardDeviation(baseData);
        System.out.println(sigmaFilter(tuple.get(0),tuple.get(1),calData,3));
    }
}
... ...
package com.honggroup.wks.flink.common.utils;
import java.util.Calendar;
import java.util.Date;
/**
* @Auther zhangdajun
* @CreateDate 2020/3/2 14 45
* @Describe
*/
public class TimeHelper {

    /**
     * Checks whether the given instant falls inside any of the configured
     * alarm-notification windows.
     *
     * Fixed: the documented window format "[00:00]-[17:00]" crashed —
     * brackets were no longer stripped (NumberFormatException) and the
     * parser assumed an "HH:mm:ss" token (ArrayIndexOutOfBoundsException
     * on "HH:mm"). Brackets and spaces are now stripped and seconds are
     * optional, so both "[HH:mm]" and "HH:mm:ss" inputs work.
     *
     * @param times       window list, e.g. "[00:00]-[17:00],[09:00]-[19:00]"
     * @param currentTime the instant to test
     * @return true if currentTime lies in at least one window
     */
    public static boolean timeZoneJudge(String times, Date currentTime) {
        boolean flag = false;
        for (String timeStr : times.split(",")) {
            String[] range = timeStr.split("-");
            flag = flag || isEffectiveDate(currentTime,
                    cleanTimeToken(range[0]), cleanTimeToken(range[1]));
        }
        return flag;
    }

    /** Strips the brackets and spaces of a "[HH:mm]"-style window token. */
    private static String cleanTimeToken(String token) {
        return token.replace("[", "").replace("]", "").replace(" ", "");
    }

    /**
     * Returns true when nowTime is strictly between startTime and endTime
     * on nowTime's own calendar day.
     *
     * @param nowTime   instant to test
     * @param startTime window start, "HH:mm" or "HH:mm:ss"
     * @param endTime   window end, "HH:mm" or "HH:mm:ss"
     * @return true when strictly inside the window (boundaries excluded,
     *         as in the original after/before comparison)
     */
    public static boolean isEffectiveDate(Date nowTime, String startTime, String endTime) {
        long timestamp = nowTime.getTime();
        Calendar date = Calendar.getInstance();
        date.setTimeInMillis(timestamp);
        Calendar begin = Calendar.getInstance();
        begin.setTimeInMillis(timestamp);
        applyTime(begin, startTime);
        Calendar end = Calendar.getInstance();
        end.setTimeInMillis(timestamp);
        applyTime(end, endTime);
        return date.after(begin) && date.before(end);
    }

    /**
     * Sets hour/minute/second on cal from an "HH:mm[:ss]" string; seconds
     * default to 0 when absent.
     */
    private static void applyTime(Calendar cal, String time) {
        String[] parts = time.split(":");
        cal.set(Calendar.HOUR_OF_DAY, Integer.parseInt(parts[0]));
        cal.set(Calendar.MINUTE, Integer.parseInt(parts[1]));
        cal.set(Calendar.SECOND, parts.length > 2 ? Integer.parseInt(parts[2]) : 0);
    }
}
... ...
package com.honggroup.wks.flink.common.utils;
/**
* @Auther zhangdajun
* @CreateDate 2022/9/27 16 32
* @Describe
*/
/**
 * Message-category marker used when routing records through the pipeline.
 * The declaration order (MIDDLE, TRANS, AGG) is preserved from the original,
 * so ordinals and serialized names are unchanged.
 */
public enum TransMessageType {
    MIDDLE,
    TRANS,
    AGG
}
... ...
package com.honggroup.wks.flink.common.utils;
import java.util.HashMap;
import java.util.Map;
/**
* @Auther zhangdajun
* @CreateDate 2022/10/30 13 58
* @Describe
*/
/**
 * Shared constant pool for the workstation Flink jobs: Redis key prefixes,
 * Elasticsearch index names, Kafka topics, REST paths and alarm codes.
 * (Constant interface — kept as-is because implementors across the project
 * depend on inheriting these names.)
 */
public interface WorkstationConstants {
// ---- Redis keys: workstation calculation state ----
String RDS_CAL_RECORD_FLAG = "wks:cal:record";
String RDS_DYNAMIC_KPI_LIST = "wks:cal:kpi";
String RDS_WKS_3SIGMA_PREFIX = "wks:3sigma:";
String RDS_WKS_5MIN_INTERVAL_RCD = "wks:5min:";
String RDS_WKS_3SIGMA_DATA = "data:";
String RDS_ORG_SEARCH_RCD_KEY = "search:";
// ---- Redis keys: data dictionary caches ----
String REDIS_PREFIX_DDIC_DDIC = "ddic:ddic";
String REDIS_PREFIX_DDIC_RESTYPT = "ddic:resType";
String REDIS_PREFIX_DDIC_PROTOCOL = "ddic:protocol";
String REDIS_PREFIX_DDIC_PROTOCOL_PLUGINS = "ddic:plugins";
String REDIS_PREFIX_DDIC_KEYTKPI = "ddic:keytKpi";
// ---- Redis keys: base-info caches ----
String REDIS_PREFIX_BASEINFO_COLLECTOR = "baseinfo:collector";
String REDIS_PREFIX_BASEINFO_TASK = "baseinfo:task";
String REDIS_PREFIX_BASEINFO_TEMPLATE = "baseinfo:template";
String REDIS_PREFIX_BASEINFO_DIRECTIVE = "baseinfo:directive";
String REDIS_PREFIX_BASEINFO_RES = "baseinfo:res";
String REDIS_PREFIX_BASEINFO_KPI = "baseinfo:kpi";
String REDIS_PREFIX_BASEINFO_KEYTKPI = "baseinfo:keytkpi";
String REDIS_PREFIX_BASEINFO_BIZ = "baseinfo:biz";
String REDIS_PREFIX_BASEINFO_APP = "baseinfo:app";
String REDIS_PREFIX_BASEINFO_SERVER = "baseinfo:server";
// ---- Resource discovery / state markers ----
String RECORD_WAY_AUTO_DISCOVERY = "auto_discovery";
String RESOURCE_STATE_MONITOR = "monitor";
// ---- Elasticsearch index names / prefixes ----
String PERF_3SIGMA_RCD = "pfm_3sigma_rcd";
String PERF_3SIGMA_5MIN_RCD = "pfm_3sigma_5min_rcd";
String AGG_5MIN_INDEX_PREFIX = "pfm_agg5min_";
String ES_PERFORMANCE_INDEX_ALIAS = "monitor-performance-search-90d";
String ALARM_DYNAMIC_THRESHOLD = "monitor-alarm-dynamic-threshold";
// ---- Kafka topics ----
String KAFKA_MONITOR_PERFORMANCE_TOPIC = "monitor-performance";
String KAFKA_INPUT_SUBSCRIBE = "monitor-subscribe";
String ES_MONITOR_PERFORMANCE_DAY = "pfm_day_";
String ES_MONITOR_PERFORMANCE_INCREMENT = "pfm_increment_";
public String REDIS_CURR_ALARM_RECORD = "event:current";
String OUTPUT_COLLECTOR_RESULT_DB_BASE = "monitor-collector-result-db-base";
String OUTPUT_COLLECTOR_RESULT_EVENT = "monitor-collector-result-event-new";
String OUTPUT_COLLECTOR_RESULT_DB = "monitor-collector-result-db-new";
String OUTPUT_PRPROCESS_RESULT = "monitor-wks-preprocess";
String REDIS_PREFIX_LASTCOLL = "lastcoll";
// ---- Alarm levels: 1 = minor, 2 = major, 3 = critical ----
int SYS_DDIC_ALARM_LEVEL_1 = 1;
int SYS_DDIC_ALARM_LEVEL_2 = 2;
int SYS_DDIC_ALARM_LEVEL_3 = 3;
// ---- REST paths used for alarm/resource/inspection callbacks ----
String MONITOR_ALARM_UPGRADE = "/monitor/alarm/upgrade";
String MONITOR_ALARM_CLEAR = "/monitor/alarm/clear";
String MONITOR_ALARM_AUTO_CLEAR = "/monitor/alarm/autoClear";
String MONITOR_ALARM_CLOSE = "/monitor/alarm/close";
String MONITOR_ALARM_CLOSE_ALARM = "/monitor/alarm/close/alarm";
String MONITOR_ALARM_CLOSE_NOTICE = "/monitor/alarm/close/notice";
String MONITOR_ALARM_CLOSE_ALLKPI_ALARM = "/monitor/alarm/close/allkpi/alarm";
String MONITOR_ALARM_CLOSE_ALLKPI_NOTICE = "/monitor/alarm/close/allkpi/notice";
String MONITOR_RESOURCE_STATE = "/monitor/resource/state";
String MONITOR_RESOURCE_AUTODISCOVERY = "/monitor/resource/autodiscovery";
String INSPECTION_TASK = "/inspection/task";
String INSPECTION_ELEMENT = "/inspection/element";
String MONITOR_FAULT_AUTO_CLEAR = "/monitor/fault/autoClear";
// ---- Alarm type / status codes ----
String SYS_DDIC_ALARM_TYPE_ACTIVITY = "activity";
Integer ALARM_STATUS_0 = 0;
Integer ALARM_STATUS_1 = 1;
String ALARM_RESOURCE_ALARM = "alarm";
int IS_EVENT_UPGRADE = 1;
int EVENT_CLEAR_TYPE_AUTO = 1; // cleared automatically
int EVENT_CLEAR_TYPE_FILTER = 2;// cleared by filter rule
int EVENT_CLEAR_TYPE_HANDLE = 3;// cleared manually
int EVENT_CLEAR_TYPE_NO_POLICY = 4;// cleared because no policy rule matched
}
... ...
package com.honggroup.wks.flink.common.utils;
/**
* @Auther zhangdajun
* @CreateDate 2022/12/9 10 57
* @Describe
*/
public class WorkstationUID {
    /**
     * Builds the composite UID "resId##flag##kpiId" used to key
     * per-resource / per-KPI records.
     *
     * @return the three segments joined by "##"; a null segment renders as
     *         "null", matching the String.format("%s", ...) behavior of the
     *         original implementation
     */
    public static String UIDResIdFlagKpi(String resId, String flag, String kpiId) {
        return resId + "##" + flag + "##" + kpiId;
    }
}
... ...
mysql.url=jdbc:mysql://192.168.0.58:3306/cloud_backend_monitor?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&useSSL=false&serverTimezone=Asia/Shanghai
mysql.user=root
mysql.password=MTIzNDU2
mysql.max_active=50
mysql.min_idle=1
mysql.initial_size=50
mysql.batch_size=10
mysql.batch_interval_ms=2000
mysql.max_retries=3
#Base Kafka Config
kafka.broker=192.168.0.51:19002
kafka.fix_group=true
kafka.group_id=honggroup_log
kafka.max_request_size=524288000
kafka.buffer_memory=524288000
kafka.max_partition_fetch_bytes=1048576000
kafka.retries_times=3
kafka.partition_discovery_interval=1000
kafka.auto_commit_interval_ms=5000
kafka.enable_auto_commit=true
kafka.auto_offset_reset=latest
kafka.compression_type=gzip
kafka.max_block_size=60000
kafka.batch_size=163840
kafka.linger_ms=10
kafka.request_timeout_ms=30000
kafka.default_api_timeout_ms=60000
#Optional Kafka Security Config
kafka.security_kerberos_enable=false
kafka.security_kerberos_type=PLAIN
kafka.security_kerberos_conf_user=
kafka.security_kerberos_conf_pass=
kafka.java_security_krb5=
kafka.java_security_auth_login=
kafka.javax_security_auth_usesubjectcredsonly=false
redis.hosts = 192.168.0.61:7001,192.168.0.61:7002,192.168.0.61:7003,192.168.0.61:7004,192.168.0.61:7005,192.168.0.61:7006
redis.authority = false
redis.password = dWxtcA==
redis.data_expire = 86400
# ES config
#elasticsearch.rest.url= 192.168.0.46:9200,192.168.0.47:9200,192.168.0.48:9200
elasticsearch.rest.url= 192.168.0.46:9200,192.168.0.47:9200,192.168.0.48:9200
elasticsearch.hosts = 192.168.0.46:9200,192.168.0.47:9200,192.168.0.48:9200
elasticsearch.index_number_of_shards = 1
elasticsearch.index_number_of_replicas = 0
elasticsearch.bulk_flush_max_actions = 1000
elasticsearch.bulk_flush_max_size_mb = 5
elasticsearch.bulk_flush_interval_ms = 10000
#\u6743\u9650\u8BA4\u8BC1\u5F00\u5173\uFF0C\u9ED8\u8BA4\u4E0D\u5F00\u542F
elasticsearch.authority = false
#\u6743\u9650\u8BA4\u8BC1\u7528\u6237\u540D
#elasticsearch.authority.username = honggroup
#\u6743\u9650\u8BA4\u8BC1\u5BC6\u7801\uFF0C\u91C7\u7528192.168.45.86\u52A0\u5BC6
#elasticsearch.authority.password = MXEydzNlNFIhQA==
influxdb.url= http://zhjk-influxdb:8086
influxdb.database= monitor
influxdb.retention_policy= autogen
influxdb.retention_policy_time= 30d
influxdb.batch_flush_time=10000
influxdb.batch_count=1000
zookeeper.url=192.168.1.53:19001
... ...
env=dev
\ No newline at end of file
... ...
<?xml version="1.0" encoding="UTF-8" ?>
<!--
status="warn" 日志框架本身的输出日志级别,可以修改为debug
monitorInterval="5" 自动加载配置文件的间隔时间,不低于 5秒;生产环境中修改配置文件,是热更新,无需重启应用
-->
<configuration status="info" monitorInterval="5">
<!--
集中配置属性进行管理
使用时通过:${name}
-->
<properties>
<property name="LOG_HOME">/Users/zhangdajun/codding-space/monitor_v3/workstation-flink/common/target</property>
<property name="LOG_PATTERN">%d{HH:mm:ss.SSS} %X{APP_START_SERIAL} [%t] [%-5level] %M:%L --- %m%n</property>
<property name="LOG_PATTERN_YEAR">%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] [%-5level] %M:%L --- %m%n</property>
</properties>
<!-- 日志处理 -->
<Appenders>
<!-- 控制台输出 appender,SYSTEM_OUT输出黑色,SYSTEM_ERR输出红色 -->
<Console name="Console" target="SYSTEM_OUT" >
<PatternLayout pattern="[%d{yyyy-MM-dd HH:mm:ss:SSS}] [%p] - %l - %m%n"/>
</Console>
<!-- 按照一定规则拆分的日志文件的appender --> <!-- 拆分后的文件 -->
<!-- filePattern="${LOG_HOME}/$${date:yyyy-MM-dd}/myrollog-%d{yyyy-MM-dd-HH-mm}-%i.log"> -->
<RollingFile name="rollingFile" fileName="${LOG_HOME}/wks-biz.log"
filePattern="${LOG_HOME}/$${date:yyyy-MM-dd}/wks-biz-%d{yyyy-MM-dd}-%i.log">
<!-- 日志级别过滤器 -->
<ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY" />
<!-- 日志消息格式 -->
<PatternLayout pattern="${LOG_PATTERN_YEAR}" />
<Policies>
<!-- 在系统启动时,触发拆分规则,生成一个新的日志文件 -->
<OnStartupTriggeringPolicy />
<!-- 按照文件大小拆分,200MB -->
<SizeBasedTriggeringPolicy size="200MB" />
<!-- 按照时间节点拆分,规则根据filePattern定义的 -->
<TimeBasedTriggeringPolicy />
</Policies>
<!-- 在同一个目录下,文件的个数限定为 10 个,超过进行覆盖 -->
<DefaultRolloverStrategy max="10" />
</RollingFile>
</Appenders>
<!-- logger 定义 -->
<Loggers>
<!-- 使用 rootLogger 配置 日志级别 level="trace" -->
<Root level="info">
<!-- 指定日志使用的处理器 -->
<AppenderRef ref="Console" />
<AppenderRef ref="rollingFile" />
</Root>
<!--过滤掉spring和mybatis的一些无用的DEBUG信息-->
<Logger name="org.mybatis" level="INFO"/>
<Logger name="org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline" level="error"/>
<Logger name="org.apache.zookeeper" level="error"/>
<Logger name="org.apache.hadoop" level="error"/>
<Logger name="org.apache.kafka" level="error"/>
<Logger name="akka" level="error"/>
<Logger name="org.apache.flink" level="warn"/>
<!--log4j2 自带过滤日志-->
<Logger name="org.apache.catalina.startup.DigesterFactory" level="error"/>
<Logger name="org.apache.catalina.util.LifecycleBase" level="error"/>
<Logger name="org.apache.coyote.http11.Http11NioProtocol" level="warn"/>
<Logger name="org.apache.sshd.common.util.SecurityUtils" level="warn"/>
<Logger name="org.apache.tomcat.util.net.NioSelectorPool" level="warn"/>
<Logger name="org.crsh.plugin" level="warn"/>
<Logger name="org.crsh.ssh" level="warn"/>
<Logger name="org.eclipse.jetty.util.component.AbstractLifeCycle" level="error"/>
<Logger name="org.hibernate.validator.internal.util.Version" level="warn"/>
<Logger name="org.springframework.boot.actuate.autoconfigure.CrshAutoConfiguration" level="warn"/>
<Logger name="org.springframework.boot.actuate.endpoint.jmx" level="warn"/>
<Logger name="org.thymeleaf" level="warn"/>
</Loggers>
</configuration>
\ No newline at end of file
... ...
# Forked from https://github.com/elasticsearch/logstash/tree/v1.4.0/patterns
USERNAME [a-zA-Z0-9._-]+
USER %{USERNAME:UNWANTED}
INT (?:[+-]?(?:[0-9]+))
BASE10NUM (?<![0-9.+-])(?>[+-]?(?:(?:[0-9]+(?:\.[0-9]+)?)|(?:\.[0-9]+)))
NUMBER (?:%{BASE10NUM:UNWANTED})
BASE16NUM (?<![0-9A-Fa-f])(?:[+-]?(?:0x)?(?:[0-9A-Fa-f]+))
BASE16FLOAT \b(?<![0-9A-Fa-f.])(?:[+-]?(?:0x)?(?:(?:[0-9A-Fa-f]+(?:\.[0-9A-Fa-f]*)?)|(?:\.[0-9A-Fa-f]+)))\b
POSINT \b(?:[1-9][0-9]*)\b
NONNEGINT \b(?:[0-9]+)\b
WORD \b\w+\b
NOTSPACE \S+
SPACE \s*
DATA .*?
GREEDYDATA .*
#QUOTEDSTRING (?:(?<!\\)(?:"(?:\\.|[^\\"])*"|(?:'(?:\\.|[^\\'])*')|(?:`(?:\\.|[^\\`])*`)))
QUOTEDSTRING (?>(?<!\\)(?>"(?>\\.|[^\\"]+)+"|""|(?>'(?>\\.|[^\\']+)+')|''|(?>`(?>\\.|[^\\`]+)+`)|``))
UUID [A-Fa-f0-9]{8}-(?:[A-Fa-f0-9]{4}-){3}[A-Fa-f0-9]{12}
# Networking
MAC (?:%{CISCOMAC:UNWANTED}|%{WINDOWSMAC:UNWANTED}|%{COMMONMAC:UNWANTED})
CISCOMAC (?:(?:[A-Fa-f0-9]{4}\.){2}[A-Fa-f0-9]{4})
WINDOWSMAC (?:(?:[A-Fa-f0-9]{2}-){5}[A-Fa-f0-9]{2})
COMMONMAC (?:(?:[A-Fa-f0-9]{2}:){5}[A-Fa-f0-9]{2})
IPV6 ((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?
IPV4 (?<![0-9])(?:(?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2})[.](?:25[0-5]|2[0-4][0-9]|[0-1]?[0-9]{1,2}))(?![0-9])
IP (?:%{IPV6:UNWANTED}|%{IPV4:UNWANTED})
HOSTNAME \b(?:[0-9A-Za-z][0-9A-Za-z-]{0,62})(?:\.(?:[0-9A-Za-z][0-9A-Za-z-]{0,62}))*(\.?|\b)
HOST %{HOSTNAME:UNWANTED}
IPORHOST (?:%{HOSTNAME:UNWANTED}|%{IP:UNWANTED})
HOSTPORT (?:%{IPORHOST}:%{POSINT:PORT})
# paths
PATH (?:%{UNIXPATH}|%{WINPATH})
UNIXPATH (?>/(?>[\w_%!$@:.,~-]+|\\.)*)+
#UNIXPATH (?<![\w\/])(?:/[^\/\s?*]*)+
TTY (?:/dev/(pts|tty([pq])?)(\w+)?/?(?:[0-9]+))
WINPATH (?>[A-Za-z]+:|\\)(?:\\[^\\?*]*)+
URIPROTO [A-Za-z]+(\+[A-Za-z+]+)?
URIHOST %{IPORHOST}(?::%{POSINT:port})?
# uripath comes loosely from RFC1738, but mostly from what Firefox
# doesn't turn into %XX
URIPATH (?:/[A-Za-z0-9$.+!*'(){},~:;=@#%_\-]*)+
#URIPARAM \?(?:[A-Za-z0-9]+(?:=(?:[^&]*))?(?:&(?:[A-Za-z0-9]+(?:=(?:[^&]*))?)?)*)?
URIPARAM \?[A-Za-z0-9$.+!*'|(){},~@#%&/=:;_?\-\[\]]*
URIPATHPARAM %{URIPATH}(?:%{URIPARAM})?
URI %{URIPROTO}://(?:%{USER}(?::[^@]*)?@)?(?:%{URIHOST})?(?:%{URIPATHPARAM})?
# Months: January, Feb, 3, 03, 12, December
MONTH \b(?:Jan(?:uary)?|Feb(?:ruary)?|Mar(?:ch)?|Apr(?:il)?|May|Jun(?:e)?|Jul(?:y)?|Aug(?:ust)?|Sep(?:tember)?|Oct(?:ober)?|Nov(?:ember)?|Dec(?:ember)?)\b
MONTHNUM (?:0?[1-9]|1[0-2])
MONTHNUM2 (?:0[1-9]|1[0-2])
MONTHDAY (?:(?:0[1-9])|(?:[12][0-9])|(?:3[01])|[1-9])
# Days: Monday, Tue, Thu, etc...
DAY (?:Mon(?:day)?|Tue(?:sday)?|Wed(?:nesday)?|Thu(?:rsday)?|Fri(?:day)?|Sat(?:urday)?|Sun(?:day)?)
# Years?
YEAR (?>\d\d){1,2}
# Time: HH:MM:SS
#TIME \d{2}:\d{2}(?::\d{2}(?:\.\d+)?)?
# I'm still on the fence about using grok to perform the time match,
# since it's probably slower.
# TIME %{POSINT<24}:%{POSINT<60}(?::%{POSINT<60}(?:\.%{POSINT})?)?
HOUR (?:2[0123]|[01]?[0-9])
MINUTE (?:[0-5][0-9])
# '60' is a leap second in most time standards and thus is valid.
SECOND (?:(?:[0-5]?[0-9]|60)(?:[:.,][0-9]+)?)
TIME (?!<[0-9])%{HOUR}:%{MINUTE}(?::%{SECOND})(?![0-9])
# datestamp is YYYY/MM/DD-HH:MM:SS.UUUU (or something like it)
DATE_US %{MONTHNUM}[/-]%{MONTHDAY}[/-]%{YEAR}
DATE_EU %{MONTHDAY}[./-]%{MONTHNUM}[./-]%{YEAR}
ISO8601_TIMEZONE (?:Z|[+-]%{HOUR}(?::?%{MINUTE}))
ISO8601_SECOND (?:%{SECOND}|60)
TIMESTAMP_ISO8601 %{YEAR}-%{MONTHNUM}-%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?%{ISO8601_TIMEZONE}?
DATE %{DATE_US}|%{DATE_EU}
DATESTAMP %{DATE}[- ]%{TIME}
TZ (?:[PMCE][SD]T|UTC)
PM_AM (?:PM|AM)
DATESTAMP_RFC822 %{DAY} %{MONTH} %{MONTHDAY} %{YEAR} %{TIME} %{TZ}
DATESTAMP_RFC2822 %{DAY}, %{MONTHDAY} %{MONTH} %{YEAR} %{TIME} %{ISO8601_TIMEZONE}
DATESTAMP_OTHER %{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{TZ} %{YEAR}
DATESTAMP_EVENTLOG %{YEAR}%{MONTHNUM2}%{MONTHDAY}%{HOUR}%{MINUTE}%{SECOND}
DATESTAMP_HGLOG %{DAY} %{MONTH} %{MONTHDAY} %{TIME} %{YEAR}
#<Jun 7, 2022 5:00:42 PM CST>
DATESTAMP_WEBLOGIC %{MONTH} %{MONTHDAY}, %{YEAR} %{TIME} %{PM_AM} CST
# Syslog Dates: Month Day HH:MM:SS
SYSLOGTIMESTAMP %{MONTH} +%{MONTHDAY} %{TIME}
PROG (?:[\w._/%-]+)
SYSLOGPROG %{PROG:program}(?:\[%{POSINT:pid}\])?
SYSLOGHOST %{IPORHOST}
SYSLOGFACILITY <%{NONNEGINT:facility}.%{NONNEGINT:priority}>
HTTPDATE %{MONTHDAY}/%{MONTH}/%{YEAR}:%{TIME} %{INT}
# Shortcuts
QS %{QUOTEDSTRING:UNWANTED}
# Log formats
SYSLOGBASE %{SYSLOGTIMESTAMP:timestamp} (?:%{SYSLOGFACILITY} )?%{SYSLOGHOST:logsource} %{SYSLOGPROG}:
MESSAGESLOG %{SYSLOGBASE} %{DATA}
COMMONAPACHELOG %{IPORHOST:clientip} %{USER:ident} %{USER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-)
COMBINEDAPACHELOG %{COMMONAPACHELOG} %{QS:referrer} %{QS:agent}
COMMONAPACHELOG_DATATYPED %{IPORHOST:clientip} %{USER:ident;boolean} %{USER:auth} \[%{HTTPDATE:timestamp;date;dd/MMM/yyyy:HH:mm:ss Z}\] "(?:%{WORD:verb;string} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion;float})?|%{DATA:rawrequest})" %{NUMBER:response;int} (?:%{NUMBER:bytes;long}|-)
# Log Levels
LOGLEVEL ([A|a]lert|ALERT|[T|t]race|TRACE|[D|d]ebug|DEBUG|[N|n]otice|NOTICE|[I|i]nfo|INFO|[W|w]arn?(?:ing)?|WARN?(?:ING)?|[E|e]rr?(?:or)?|ERR?(?:OR)?|[C|c]rit?(?:ical)?|CRIT?(?:ICAL)?|[F|f]atal|FATAL|[S|s]evere|SEVERE|EMERG(?:ENCY)?|[Ee]merg(?:ency)?)
LOGDATE (19|20)\d{2}\-[01]\d\-[0123]\d\s\d{2}\:\d{2}\:\d{2}\.\d{3}
LOGJSON (\{.+\})
\ No newline at end of file
... ...
import cn.hutool.core.util.ObjUtil;
import cn.hutool.core.util.StrUtil;
import com.honggroup.wks.flink.common.config.Config;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.description.TextElement;
import java.lang.reflect.Field;
/**
* @Auther zhangdajun
* @CreateDate 2023/4/27 17 30
* @Describe
*/
/**
 * Dev helper that dumps every ConfigOption declared on {@link Config} to
 * stdout as "key: description, default value".
 */
public class PrintConfig {

    /**
     * Reflects over the static ConfigOption fields of {@link Config} and
     * prints each option's key, description text and default value.
     *
     * @throws Exception if reflective field access fails
     */
    public static void main(String[] args) throws Exception {
        Config cfg = new Config();
        for (Field field : cfg.getClass().getDeclaredFields()) {
            field.setAccessible(true);
            // Wildcard type instead of the raw ConfigOption the original used;
            // field.get(null) works because the options are static.
            ConfigOption<?> option = (ConfigOption<?>) field.get(null);
            String defaultValue = ObjUtil.isEmpty(option.defaultValue())
                    ? ",无默认值"
                    : ",默认值:" + option.defaultValue();
            // Description blocks are flink TextElements; print the first one.
            String description =
                    ((TextElement) option.description().getBlocks().get(0)).getFormat();
            System.out.println(option.key() + ": " + description + defaultValue);
        }
    }
}
... ...
import cn.hutool.core.collection.CollUtil;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
/**
* @Auther zhangdajun
* @CreateDate 2022/12/31 12 43
* @Describe
*/
/**
 * Scratch class exercising hutool's CollUtil.intersection on two disjoint
 * KPI-id collections.
 */
public class Test {
    public static void main(String[] args) {
        Set<String> collKpi = new HashSet<>();
        List<String> checkKpi = new ArrayList<>();
        collKpi.add("KPI1");
        collKpi.add("KPI2");
        collKpi.add("KPI3");
        checkKpi.add("KPI4");
        checkKpi.add("KPI5");
        checkKpi.add("KPI6");
        // Disjoint inputs, so the intersection prints empty.
        System.out.println(CollUtil.intersection(collKpi, checkKpi));
        //System.out.println(collKpi.retainAll(checkKpi));
        // collKpi itself is left untouched by intersection().
        System.out.println(collKpi.toString());
    }
}
... ...
import java.util.Date;
/**
* @Auther zhangdajun
* @CreateDate 2022/12/20 16 04
* @Describe
*/
/**
 * Scratch class: prints the local-time rendering of a fixed epoch-millis
 * timestamp.
 */
public class TimeTrans {
    public static void main(String[] args) {
        long epochMillis = 1667663700000L;
        // println(Object) calls toString(), same output as the original.
        System.out.println(new Date(epochMillis));
    }
}
... ...
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.honggorup.workstation.flink</groupId>
<artifactId>workstation-flink</artifactId>
<version>1.15.2</version>
</parent>
<artifactId>configuration</artifactId>
<version>1.15.2</version>
<packaging>jar</packaging>
<description>load config</description>
<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus</artifactId>
<version>${mybatis.version}</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>${zookeeper.client.version}</version>
</dependency>
<!-- Redis所需 -->
<dependency>
<groupId>com.honggorup.workstation.flink</groupId>
<artifactId>common</artifactId>
<version>1.15.2</version>
</dependency>
</dependencies>
<build>
<finalName>workstation-common.${flink.version}</finalName>
<resources>
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*.config</include>
<include>**/*.grok</include>
<include>**/*.xml</include>
<include>**/mapper</include>
</includes>
</resource>
</resources>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<encoding>${project.build.sourceEncoding}</encoding>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
\ No newline at end of file
... ...
package com.honggroup.model;
import java.util.Date;
import java.io.Serializable;
import lombok.Data;
/**
* 活动告警表(BAlarm)实体类
*
* @author zhangdajun
* @since 2023-01-07 19:41:47
*/
@Data
public class BAlarm implements Serializable {
    private static final long serialVersionUID = 742040664080684827L;
    /**
     * Primary key; same value as the alarm ID.
     */
    private String id;
    /**
     * Alarm ID.
     */
    private String alarmId;
    /**
     * Batch number.
     */
    private String batchNo;
    /**
     * Task ID.
     */
    private String taskId;
    /**
     * Task name.
     */
    private String taskName;
    /**
     * Resource ID.
     */
    private String resId;
    /**
     * Secondary resource identifier.
     */
    private String flag;
    /**
     * Resource name.
     */
    private String resName;
    /**
     * Resource type.
     */
    private String resType;
    /**
     * KPI ID.
     */
    private String kpiId;
    /**
     * KPI name.
     */
    private String kpiName;
    /**
     * Alarm type: "activity" for active alarms.
     */
    private String alarmType;
    /**
     * Alarm level: 1 = minor, 2 = major, 3 = critical.
     */
    private String alarmLevel;
    /**
     * Alarm level display name (minor / major / critical).
     */
    private String alarmLevelName;
    /**
     * Time of the first alarm occurrence.
     */
    private Date alarmTime;
    /**
     * Alarm content.
     */
    private String alarmContent;
    /**
     * Number of times the alarm repeated.
     */
    private Integer alarmRepeatCnt;
    /**
     * Subscription way; this column is unused.
     */
    private String subWay;
    /**
     * Last update time.
     */
    private Date updateTime;
    /**
     * Insertion time, i.e. the first alarm time.
     */
    private Date createTime;
    /**
     * Alarm status: 0 = alarming, 1 = closed.
     */
    private Integer alarmStatus;
    /**
     * Alarm source: "alarm" = monitoring alarm, "docking-huawei" = Huawei
     * cloud alarm, "docking-huawei" = Alibaba cloud alarm (duplicate value
     * kept from the original note — TODO confirm), "docking-machineroom" =
     * machine-room alarm. [alarm_resource]
     */
    private String alarmResource;
    /**
     * Whether the alarm was escalated: 0 = not escalated, 1 = escalated.
     */
    private Integer isUpgrade;
    /**
     * Escalation description, e.g. "minor alarm escalated to major alarm".
     */
    private String upgradeContent;
    /**
     * Escalation time.
     */
    private Date upgradeTime;
    /**
     * Notification content.
     */
    private String noticeContent;
    /**
     * Alarm number, format: yyyyMMddNNNN.
     */
    private String alarmNo;
}
... ...
package com.honggroup.model;
import java.io.Serializable;
import com.honggroup.wks.flink.common.utils.WorkstationConstants;
import lombok.Data;
/**
* 告警策略表(BAlarmPolicy)实体类
*
* @author zhangdajun
* @since 2023-02-04 14:37:29
*/
@Data
public class BAlarmPolicy implements Serializable {
    private static final long serialVersionUID = -24307908557906819L;
    /**
     * Alarm policy id.
     */
    private String alarmPolicyId;
    /**
     * Alarm policy name.
     */
    private String alarmPolicyName;
    /**
     * Alarm policy description.
     */
    private String alarmPolicyDesc;
    /**
     * KPI ID.
     */
    private String kpiId;
    /**
     * Critical-alarm policy expression.
     */
    private String seriousPolicy;
    /**
     * Critical-alarm notification expression; maps to BAlarm.noticeContent.
     */
    private String seriousExpr;
    /**
     * Critical-alarm active time windows.
     */
    private String seriousTimes;
    /**
     * Major-alarm policy expression.
     */
    private String importantPolicy;
    /**
     * Major-alarm notification expression; maps to BAlarm.noticeContent.
     */
    private String importantExpr;
    /**
     * Major-alarm active time windows.
     */
    private String importantTimes;
    /**
     * Minor-alarm policy expression.
     */
    private String commonlyPolicy;
    /**
     * Minor-alarm notification expression; maps to BAlarm.noticeContent.
     */
    private String commonlyExpr;
    /**
     * Minor-alarm active time windows.
     */
    private String commonlyTimes;
    /**
     * Alarm filter expression.
     */
    private String filterPolicy;
    /**
     * Alarm-clear notification expression.
     */
    private String cleanPolicy;
    /**
     * Clear way: 0 = automatic clear, 1 = manual clear.
     */
    private String clearWay;
    /**
     * Policy type: 1 = global default policy, 2 = global non-default policy,
     * 3 = template policy.
     */
    private String policyType;
    /**
     * Whether this is a fuzzy (wildcard) policy.
     */
    private String isVague;
    // No original comment; presumably the policy creation time — TODO confirm.
    private String createTime;
    /**
     * Sync flag: 0 = not synchronized, 1 = synchronized.
     */
    private String syncFlag;
    /**
     * Alarm handling duration, in minutes.
     */
    private Integer times;
    /**
     * Automatic alarm-clear duration, in minutes.
     */
    private Integer autoClearTimes;
    /**
     * Trigger policy: 0 = immediately; 1/2/3/5/10/15 = that many minutes.
     */
    private String triggerPolicy;
    /**
     * Alarm content expression; maps to BAlarm.alarmContent.
     */
    private String alarmContentExpr;
    /**
     * Alarm template ID.
     */
    private String alarmTempId;
    /**
     * Alarm template name.
     */
    private String alarmTempName;
    /**
     * Returns the notification-content template for the given alarm level
     * (3 = critical, 2 = major, 1 = minor), or null for any other level.
     */
    public String getNoticeContentTempByLevel(int level){
        if(WorkstationConstants.SYS_DDIC_ALARM_LEVEL_3 ==(level)){
            return this.seriousExpr;
        }
        if(WorkstationConstants.SYS_DDIC_ALARM_LEVEL_2 ==(level)){
            return this.importantExpr;
        }
        if(WorkstationConstants.SYS_DDIC_ALARM_LEVEL_1 ==(level)){
            return this.commonlyExpr;
        }
        return null;
    }
    /**
     * Returns the alarm policy expression for the given alarm level
     * (3 = critical, 2 = major, 1 = minor), or null for any other level.
     */
    public String getAlarmExpression(int level){
        if(WorkstationConstants.SYS_DDIC_ALARM_LEVEL_3 == (level)){
            return this.seriousPolicy;
        }
        if(WorkstationConstants.SYS_DDIC_ALARM_LEVEL_2 == (level)){
            return this.importantPolicy;
        }
        if(WorkstationConstants.SYS_DDIC_ALARM_LEVEL_1 == (level)){
            return this.commonlyPolicy;
        }
        return null;
    }
    @Override
    public String toString() {
        return "BAlarmPolicy{" +
                "alarmPolicyId='" + alarmPolicyId + '\'' +
                ", alarmPolicyName='" + alarmPolicyName + '\'' +
                ", alarmPolicyDesc='" + alarmPolicyDesc + '\'' +
                ", kpiId='" + kpiId + '\'' +
                ", seriousPolicy='" + seriousPolicy + '\'' +
                ", seriousExpr='" + seriousExpr + '\'' +
                ", seriousTimes='" + seriousTimes + '\'' +
                ", importantPolicy='" + importantPolicy + '\'' +
                ", importantExpr='" + importantExpr + '\'' +
                ", importantTimes='" + importantTimes + '\'' +
                ", commonlyPolicy='" + commonlyPolicy + '\'' +
                ", commonlyExpr='" + commonlyExpr + '\'' +
                ", commonlyTimes='" + commonlyTimes + '\'' +
                ", filterPolicy='" + filterPolicy + '\'' +
                ", cleanPolicy='" + cleanPolicy + '\'' +
                ", clearWay='" + clearWay + '\'' +
                ", policyType='" + policyType + '\'' +
                ", isVague='" + isVague + '\'' +
                ", createTime='" + createTime + '\'' +
                ", syncFlag='" + syncFlag + '\'' +
                ", times=" + times +
                ", autoClearTimes=" + autoClearTimes +
                ", triggerPolicy='" + triggerPolicy + '\'' +
                ", alarmContentExpr='" + alarmContentExpr + '\'' +
                ", alarmTempId='" + alarmTempId + '\'' +
                ", alarmTempName='" + alarmTempName + '\'' +
                '}';
    }
}
... ...
package com.honggroup.model;
import java.io.Serializable;
import lombok.Data;
/**
* 告警策略阈值自定义表(BAlarmPolicyThresholdCustom)实体类
*
* @author zhangdajun
* @since 2023-02-23 10:00:19
*/
@Data
public class BAlarmPolicyThresholdCustom implements Serializable {
    private static final long serialVersionUID = 474438211102726165L;
    /**
     * Primary key id.
     */
    private String id;
    /**
     * Resource ID.
     */
    private String resId;
    /**
     * KPI ID.
     */
    private String kpiId;
    /**
     * Secondary resource ID.
     */
    private String flag;
    /**
     * Threshold base value.
     */
    private Long baseVal;
    /**
     * Threshold ratio (percentage).
     */
    private Long percent;
    /**
     * Comparison operator: ">", "<" or "=".
     */
    private String operator;
}
... ...
package com.honggroup.model;
import java.io.Serializable;
import lombok.Data;
/**
* 告警模板与告警策略关联关系表(BAlarmtemplatePolicy)实体类
*
* @author zhangdajun
* @since 2023-02-04 14:38:00
*/
@Data
public class BAlarmtemplatePolicy implements Serializable {
    private static final long serialVersionUID = -39887776161676720L;
    /**
     * Alarm template id.
     */
    private String alarmTempId;
    /**
     * Alarm policy id.
     */
    private String alarmPolicyId;
}
... ...
package com.honggroup.model;
import java.util.Date;
import java.io.Serializable;
import lombok.Data;
/**
 * Entity for the business info table (BBustype).
 * Represents a node in the business (service) hierarchy tree.
 *
 * @author zhangdajun
 * @since 2022-12-30 21:15:36
 */
@Data
public class BBustype implements Serializable {
private static final long serialVersionUID = -65779428817549589L;
/**
 * Business ID
 */
private String busId;
/**
 * Parent ID; "0" for the root node
 */
private String parentId;
/**
 * Business code
 */
private String busTypeCode;
/**
 * Business name
 */
private String busTypeName;
/**
 * Business description
 */
private String busTypeDesc;
/**
 * Department head (owner)
 */
private String admin;
/**
 * Department head contact number
 */
private String adminTel;
/**
 * Creation time
 */
private Date createTime;
/**
 * In-use flag: 1 = in use, 0 = not in use
 */
private Integer isUse;
/**
 * Importance: 1 = core, 2 = important, 3 = normal, 9 = virtual business
 */
private Integer important;
/**
 * Sort order
 */
private Integer sort;
/**
 * Parameter required by the third-party HuaQing RongTian (华青融天) interface
 */
private String streamId;
/**
 * Owning department
 */
private String department;
/**
 * Operations (O&amp;M) owner
 */
private String manager;
/**
 * Operations (O&amp;M) owner contact number
 */
private String managerTel;
/**
 * Leaf-node flag: 0 = no, 1 = yes
 */
private Integer isLeaf;
/**
 * Owner display name
 */
private String nickname;
/**
 * Resource ID
 */
private String resId;
}
... ...
package com.honggroup.model;
import java.util.Date;
import java.io.Serializable;
import lombok.Data;
/**
 * Entity for the command (inspection) template info table (BChecktemplate).
 *
 * @author zhangdajun
 * @since 2022-12-29 15:37:08
 */
@Data
public class BChecktemplate implements Serializable {
private static final long serialVersionUID = 513782505956293736L;
/**
 * Command template ID
 */
private String checkTempId;
/**
 * Command template code
 */
private String tempCode;
/**
 * Command template name
 */
private String tempName;
/**
 * Template type
 */
private String tempType;
/**
 * Scheduling frequency
 */
private String checkFrequency;
/**
 * Creation time
 */
private Date createTime;
// Sync flag; presumably 0 = not synced, 1 = synced like the other entities — TODO confirm
private String syncFlag;
/**
 * Description / remarks
 */
private String remark;
}
... ...
package com.honggroup.model;
import java.util.Date;
import java.io.Serializable;
import lombok.Data;
/**
 * Entity for the collection command info table (BCollDirctive).
 * NOTE(review): class/field names misspell "Directive" as "Dirctive"; renaming would
 * break Lombok-generated accessors used by callers, so the spelling is kept.
 *
 * @author zhangdajun
 * @since 2022-12-29 15:37:26
 */
@Data
public class BCollDirctive implements Serializable {
private static final long serialVersionUID = 555108931438248560L;
/**
 * Command ID
 */
private String dirctiveId;
/**
 * Command code
 */
private String dirctiveCode;
/**
 * Command name
 */
private String dirctiveName;
/**
 * Command protocol (original comment; field name suggests "collection scope" — confirm)
 */
private String collScope;
/**
 * Collection method
 */
private String collWay;
/**
 * Execution frequency
 */
private String exeFrequency;
/**
 * Script type
 */
private String dirctiveType;
/**
 * Result (return value) type
 */
private String resultType;
/**
 * Command content (script body)
 */
private String dirctiveContent;
/**
 * Creation time
 */
private Date createTime;
/**
 * Collection frequency (original comment duplicated "execution frequency" — confirm distinction from exeFrequency)
 */
private String collFrequency;
/**
 * Sync flag: 0 = not synced, 1 = synced
 */
private String syncFlag;
/**
 * System type: 1 = monitoring command, 2 = inspection command
 */
private String systemType;
/**
 * Default flag: 0 = yes, 1 = no (note the inverted convention)
 */
private String isDefault;
/**
 * Creator
 */
private String createBy;
/**
 * Update remarks
 */
private String remark;
}
... ...
package com.honggroup.model;
import java.util.Date;
import java.io.Serializable;
import lombok.Data;
/**
 * Entity for the collector info table (BCollector).
 * Describes a deployed data-collection agent and its status.
 *
 * @author zhangdajun
 * @since 2022-12-30 23:47:31
 */
@Data
public class BCollector implements Serializable {
private static final long serialVersionUID = -42324020132608759L;
/**
 * Collector ID
 */
private String collectorId;
/**
 * Collector name
 */
private String collectorName;
/**
 * Collector type: 1 = intranet, 2 = layer-2 switching, 3 = layer-3 switching, 4 = layer-4 switching
 */
private String collectorType;
/**
 * Collection method
 */
private String collWay;
/**
 * Deployment path
 */
private String deployUrl;
/**
 * Deployment IP and port
 */
private String deployIp;
/**
 * Connection info ID
 */
private String connectId;
/**
 * Deployment status: 1 = not deployed, 2 = deploy failed, 3 = deployed
 */
private String deployStatus;
// Creation time
private Date createTime;
/**
 * Collector status: 1 = normal, 0 = abnormal
 */
private String collectorStatus;
/**
 * Abnormal-status alert threshold in minutes (e.g. 30 means 30 minutes)
 */
private Integer statusTime;
/**
 * Time of the last status update
 */
private Date lastUpdateTime;
/**
 * Abnormality notification flag: 1 = notify, 0 = do not notify
 */
private String noticeStatus;
/**
 * Subscription ID
 */
private String subId;
/**
 * Sync flag: 1 = synced, 0 = not synced
 */
private String syncFlag;
/**
 * Sync time
 */
private Date syncTime;
/**
 * Domain ID
 */
private String domainId;
/**
 * Collection type: collType_other (generic), collType_vc (virtualization),
 * collType_trap (trap), collType_oracle (database)
 */
private String collType;
/**
 * Remarks
 */
private String remark;
/**
 * Kafka broker configuration
 */
private String broker;
/**
 * Business ID list, delimiter-separated (presumably comma — the original comment omits the delimiter; confirm)
 */
private String busIds;
}
... ...
package com.honggroup.model;
import java.util.Date;
import java.io.Serializable;
import lombok.Data;
/**
 * Entity for resource cutover (planned outage) management (BCutoverManage).
 *
 * @author zhangdajun
 * @since 2023-02-04 12:45:35
 */
@Data
public class BCutoverManage implements Serializable {
private static final long serialVersionUID = 639101519486961189L;
/**
 * Primary key
 */
private String cutoverId;
/**
 * Cutover name
 */
private String cutoverName;
/**
 * Cutover start date
 */
private Date startTime;
/**
 * Cutover end date
 */
private Date endTime;
/**
 * Cutover description
 */
private String cutoverDesc;
/**
 * Outage mode: 1 = suspend alarms, 2 = suspend collection, 3 = suspend notifications
 */
private String cutoverMode;
/**
 * Sync flag: 0 = not synced, 1 = synced
 */
private String syncFlag;
/**
 * Creation date
 */
private Date createTime;
/**
 * Recipient display name
 */
private String acceptUserName;
/**
 * Recipient (username)
 */
private String acceptUser;
/**
 * Outage status (not tied to a workflow): 1 = created, 2 = in outage, 3 = paused,
 * 4 = outage finished, 5 = completion reported, 6 = under approval, 7 = approved
 */
private String stopStatus;
/**
 * Relation target ID
 */
private String targetId;
/**
 * Relation target type: res, biz, affectedRes, affectedBiz
 */
private String targetType;
// Resource ID
private String resId;
}
... ...
package com.honggroup.model;
import java.io.Serializable;
import lombok.Data;
/**
 * Auxiliary entity for resource cutover (outage) management (BCutoverRelation).
 * Links a cutover record to the resources/businesses it touches or affects.
 *
 * @author zhangdajun
 * @since 2023-03-07 09:51:31
 */
@Data
public class BCutoverRelation implements Serializable {
private static final long serialVersionUID = -90137176136625399L;
/**
 * Primary key
 */
private String id;
/**
 * Cutover management ID
 */
private String cutoverId;
/**
 * Relation target ID
 */
private String targetId;
/**
 * Relation target type: res, biz, affectedRes, affectedBiz
 */
private String targetType;
}
... ...
package com.honggroup.model;
import java.util.Date;
import java.io.Serializable;
import lombok.Data;
/**
 * Entity for the power-and-environment (动环) configuration table (BDhInfoBase).
 *
 * @author zhangdajun
 * @since 2022-12-30 20:09:07
 */
@Data
public class BDhInfoBase implements Serializable {
private static final long serialVersionUID = -63767005931260854L;
/**
 * Primary key
 */
private String id;
/**
 * Resource ID
 */
private String resId;
/**
 * KPI ID
 */
private String kpiId;
/**
 * KPI name
 */
private String kpiName;
/**
 * KPI unit
 */
private String unit;
/**
 * KPI maximum value
 */
private String max;
/**
 * KPI minimum value
 */
private String min;
/**
 * Remarks
 */
private String remark;
/**
 * Key-KPI flag: 0 = yes, 1 = no (note the inverted convention)
 */
private Integer isKeyKpi;
/**
 * Creation time
 */
private Date createTime;
/**
 * KPI kind: 0 = basic, 1 = performance, 2 = status, 3 = alarm
 */
private String kpiIdent;
/**
 * Region code (e.g. sx, ln, tj)
 */
private String area;
}
... ...
package com.honggroup.model;
import java.io.Serializable;
import lombok.Data;
/**
 * Entity for the business-application / dial-test-scenario relation table (BDialtestConfig).
 *
 * @author zhangdajun
 * @since 2022-12-30 21:18:26
 */
@Data
public class BDialtestConfig implements Serializable {
private static final long serialVersionUID = 752466231025243861L;
/**
 * Primary key ID
 */
private String id;
/**
 * Business ID
 */
private String busId;
/**
 * Business name
 */
private String busName;
/**
 * Parent business ID
 */
private String busParentId;
/**
 * Parent business name
 */
private String busParentName;
/**
 * Application ID
 */
private String appId;
/**
 * Application name
 */
private String appName;
/**
 * Task ID (dial-test scenario)
 */
private String taskId;
/**
 * Task name (dial-test scenario)
 */
private String taskName;
/**
 * Task type
 */
private String taskType;
/**
 * Task step(s)
 */
private String taskItem;
}
... ...
package com.honggroup.model;
import java.io.Serializable;
import lombok.Data;
/**
 * Entity for domain info, used by the Shanxi deployment (BDomainIp).
 * NOTE(review): the original field comments for {@code ip} ("network zone") and
 * {@code domainId} ("address segment") appear to be swapped relative to the field
 * names — confirm against the table definition before relying on them.
 *
 * @author zhangdajun
 * @since 2022-12-29 17:34:22
 */
@Data
public class BDomainIp implements Serializable {
private static final long serialVersionUID = -81810838829550083L;
/**
 * IP address segment (original comment read "network zone" — see class note)
 */
private String ip;
/**
 * Domain (network zone) ID (original comment read "address segment" — see class note)
 */
private String domainId;
}
... ...
package com.honggroup.model;
import java.util.Date;
import java.io.Serializable;
import lombok.Data;
/**
 * Entity for the resource health info table (BHealth).
 *
 * @author zhangdajun
 * @since 2022-12-29 17:49:57
 */
@Data
public class BHealth implements Serializable {
private static final long serialVersionUID = -25138059099377235L;
/**
 * Primary key ID
 */
private String id;
/**
 * Resource ID
 */
private String resId;
/**
 * Resource type
 */
private String resType;
/**
 * Health score
 */
private Integer score;
/**
 * Health status
 */
private String health;
// Last update time
private Date updateTime;
}
... ...
package com.honggroup.model;
import java.util.Date;
import java.io.Serializable;
import lombok.Data;
/**
 * Entity for the "JinSan" (Golden Tax Phase III, 金三) user roster table (BJsUsers).
 *
 * @author zhangdajun
 * @since 2022-12-31 15:24:30
 */
@Data
public class BJsUsers implements Serializable {
private static final long serialVersionUID = 664190178685561863L;
/**
 * Username
 */
private String username;
/**
 * Real name
 */
private String nickname;
/**
 * Hire date
 */
private Date joinTime;
/**
 * Record insertion time
 */
private Date createTime;
}
... ...
package com.honggroup.model;
import java.util.Date;
import java.io.Serializable;
import lombok.Data;
/**
 * Entity for the departed "JinSan" (金三) users table (BJsUsersBack).
 *
 * @author zhangdajun
 * @since 2022-12-31 15:23:59
 */
@Data
public class BJsUsersBack implements Serializable {
private static final long serialVersionUID = 474619095725235848L;
/**
 * Username
 */
private String username;
/**
 * Real name
 */
private String nickname;
/**
 * Hire date
 */
private Date joinTime;
/**
 * Departure date
 */
private Date leaveTime;
/**
 * Record insertion time
 */
private Date createTime;
}
... ...
package com.honggroup.model;
import java.util.Date;
import java.io.Serializable;
import lombok.Data;
/**
 * Entity recording each "JinSan" (金三) user's last login time across all systems (BJsUsersUnlogin).
 *
 * @author zhangdajun
 * @since 2022-12-31 15:26:20
 */
@Data
public class BJsUsersUnlogin implements Serializable {
private static final long serialVersionUID = -68907995202724434L;
/**
 * Primary key ID
 */
private String id;
/**
 * Username
 */
private String username;
/**
 * System name
 */
private String sysName;
/**
 * System alias
 */
private String sysAlias;
/**
 * Last login time
 */
private Date lastLoginTime;
/**
 * Record insertion time
 */
private Date createTime;
}
... ...
package com.honggroup.model;
import java.util.Date;
import java.io.Serializable;
import lombok.Data;
/**
 * Entity for the KPI (metric) info table (BKpi).
 *
 * @author zhangdajun
 * @since 2022-12-29 15:37:52
 */
@Data
public class BKpi implements Serializable {
private static final long serialVersionUID = 641001034995039126L;
/**
 * KPI ID
 */
private String kpiId;
/**
 * KPI name
 */
private String kpiName;
/**
 * KPI type
 */
private String kpiType;
/**
 * KPI result data type
 */
private String kpiDataType;
/**
 * KPI unit
 */
private String kpiUnit;
/**
 * Remarks
 */
private String remark;
/**
 * KPI category
 */
private String kpiCategory;
/**
 * Display mode
 */
private String viewWay;
/**
 * Secondary-resource flag
 */
private String flag;
/**
 * KPI kind: 0 = basic, 1 = performance, 2 = status, 3 = alarm
 */
private String kpiIdent;
/**
 * Creation time
 */
private Date createTime;
/**
 * Warning flag: 0 = not a warning KPI, 1 = warning KPI, 2 = warning max-value KPI
 */
private Integer isWarning;
/**
 * Drill-down level: 0 = none, 1 = level-1 KPI, 2 = level-2 KPI, and so on
 */
private Integer kpiLevel;
/**
 * Alarm suppression count: 0 = no suppression, &gt;0 = number of times to suppress
 */
private Integer ignoreCnt;
/**
 * Trend flag: 0 = non-trend KPI, 1 = trend KPI
 */
private Integer isTrend;
/**
 * Sync flag: 0 = not synced, 1 = synced
 */
private String syncFlag;
/**
 * Sort order
 */
private Integer sort;
/**
 * KPI unit-group ID
 */
private String unitGroupId;
/**
 * KPI scope: "hard" = hardware, "soft" = system; shared (default, i.e. both)
 * is "all", empty, or NULL
 */
private String kpiPower;
}
... ...
package com.honggroup.model;
import java.util.Date;
import java.io.Serializable;
import lombok.Data;
/**
 * Entity for the KPI result translation config table (BKpiTranslate).
 * Maps raw KPI result values/messages to human-readable equivalents.
 *
 * @author zhangdajun
 * @since 2023-01-06 11:30:52
 */
@Data
public class BKpiTranslate implements Serializable {
private static final long serialVersionUID = 232950502815142149L;
/**
 * Primary key ID
 */
private String id;
/**
 * KPI ID
 */
private String kpiId;
/**
 * Secondary-resource flag
 */
private String flag;
/**
 * Resource type
 */
private String resType;
/**
 * Field to translate: kpiValue, message, ext
 */
private String field;
/**
 * Original (pre-translation) value
 */
private String originalVal;
/**
 * Match type: "equal" = exact match; "contain" = fuzzy (substring) match
 */
private String matchType;
/**
 * Translated result
 */
private String result;
/**
 * Remarks
 */
private String remark;
/**
 * Creator
 */
private String createUser;
/**
 * Creation time
 */
private Date createTime;
}
... ...
package com.honggroup.model;
import java.util.Date;
import java.io.Serializable;
import lombok.Data;
/**
 * Entity for the pending-notification table (BNotice).
 *
 * @author zhangdajun
 * @since 2023-01-06 15:46:57
 */
@Data
public class BNotice implements Serializable {
private static final long serialVersionUID = 473641827369867495L;
/**
 * Primary key ID
 */
private String id;
/**
 * Notification type: 10 = alarm, 11 = alarm cleared, 20 = inspection report,
 * 30 = system notice, 99 = test notice
 */
private Integer type;
/**
 * Target ID: alarm ID for alarm notifications, report ID for inspection
 * notifications, system-notice template ID for system notifications
 */
private String targetId;
/**
 * Notification content
 */
private String content;
/**
 * Alarm number, format yyyyMMddNNNN
 */
private String alarmNo;
/**
 * Delivery channel: wechat (WeChat), message (SMS), email
 */
private String way;
/**
 * Recipient usernames, comma-separated
 */
private String usernames;
/**
 * Notification creation time
 */
private Date noticeTime;
/**
 * Timestamp
 */
private Long timestamp;
}
... ...
package com.honggroup.model;
import java.util.Date;
import java.io.Serializable;
import lombok.Data;
/**
 * Entity for the resource info table (BResource).
 * Central record for every monitored resource (host, device, VM, platform, ...).
 *
 * @author zhangdajun
 * @since 2022-12-29 16:43:23
 */
@Data
public class BResource implements Serializable {
private static final long serialVersionUID = -97253930566443978L;
/**
 * Resource ID
 */
private String resId;
/**
 * Resource code
 */
private String resCode;
/**
 * Resource name
 */
private String resName;
/**
 * Alias / short name
 */
private String shortName;
/**
 * IP address
 */
private String ip;
/**
 * Management IP
 */
private String manageIp;
/**
 * Port
 */
private String port;
/**
 * Vendor
 */
private String provider;
/**
 * Model
 */
private String model;
/**
 * Operating system type
 */
private String os;
/**
 * Record method: add = manual, import = batch import, auto_discovery = auto
 * discovery, cmdb_sync = CMDB sync, other_sync = third-party sync [record_way]
 */
private String recordWay;
/**
 * Resource type classification (host type)
 */
private String hostType;
/**
 * Owner
 */
private String admin;
/**
 * Resource state: new = newly added, monitor = monitoring, stop = monitoring
 * paused, alarmIgnore = alarms suppressed [resource_state]
 */
private String state;
/**
 * Resource type
 */
private String resType;
/**
 * Collection protocol
 */
private String collProtocol;
/**
 * Resource location / machine room (field name misspells "position"; kept for Lombok accessor compatibility)
 */
private String resPositon;
/**
 * Resource pool ID (virtualization only), i.e. cluster ID
 */
private String colonyId;
/**
 * Parent resource ID, expressing the resource hierarchy
 */
private String parentId;
/**
 * Collection mode: active = active collection, passive = passive collection
 */
private String collWay;
/**
 * Resource domain; auto-discovered resources resolve the domain by IP,
 * falling back to the owning platform's domain
 */
private String extendCol1;
/**
 * For auto-discovered resources: resource ID of the owning platform
 */
private String extendCol2;
/**
 * For auto-discovered resources: resource name of the owning platform
 */
private String extendCol3;
/**
 * Creator
 */
private String createBy;
/**
 * Creation time
 */
private Date createTime;
/**
 * Update time
 */
private Date updateTime;
/**
 * Collector PING flag: 0 = ping disabled; 1 = ping enabled, connect via the
 * resource protocol; 2 = ping enabled, do not connect via the resource protocol
 */
private String pingEnable;
/**
 * Labels; multiple labels are separated by '#'
 */
private String resLabel;
/**
 * Report-details flag: 0 = no, 1 = yes
 */
private String reportFlag;
/**
 * Category: assets, resources, or share (both)
 */
private String resCategory;
/**
 * Remarks
 */
private String remark;
// Task ID
private String taskId;
// Resource type name
private String resTypeName;
}
... ...
package com.honggroup.model;
import java.io.Serializable;
import lombok.Data;
/**
 * Entity for the resource / administrator relation table (BResourceAdmin).
 *
 * @author zhangdajun
 * @since 2022-12-29 17:33:15
 */
@Data
public class BResourceAdmin implements Serializable {
private static final long serialVersionUID = 430283896632616678L;
/**
 * Primary key ID
 */
private String id;
/**
 * Resource ID
 */
private String resId;
/**
 * Username
 */
private String username;
/**
 * Sort order
 */
private Integer sort;
}
... ...
package com.honggroup.model;
import java.io.Serializable;
import lombok.Data;
/**
 * Entity for the resource / alarm-template relation table (BResourceAlarmtemplate).
 *
 * @author zhangdajun
 * @since 2023-02-04 14:11:27
 */
@Data
public class BResourceAlarmtemplate implements Serializable {
private static final long serialVersionUID = 559358722679390843L;
/**
 * Primary key ID (original comment had a typo: "逐渐ID" for "主键ID")
 */
private String id;
/**
 * Resource ID
 */
private String resId;
/**
 * Alarm template ID
 */
private String alarmTempId;
}
... ...
package com.honggroup.model;
import java.io.Serializable;
import lombok.Data;
/**
 * Entity for the resource / business-type relation table (BResourceBustype).
 *
 * @author zhangdajun
 * @since 2022-12-29 17:52:29
 */
@Data
public class BResourceBustype implements Serializable {
private static final long serialVersionUID = -35894943866579900L;
/**
 * Primary key ID
 */
private String id;
/**
 * Resource ID
 */
private String resId;
/**
 * Business ID
 */
private String busId;
/**
 * Application ID
 */
private String appId;
}
... ...
package com.honggroup.model;
import java.io.Serializable;
import lombok.Data;
/**
 * Entity for per-resource, per-KPI alarm suppression counts (BResourceKpiIgnore).
 *
 * @author zhangdajun
 * @since 2023-02-23 19:36:22
 */
@Data
public class BResourceKpiIgnore implements Serializable {
private static final long serialVersionUID = -85018020942883365L;
/**
 * Primary key ID
 */
private String id;
/**
 * Resource ID
 */
private String resId;
/**
 * KPI ID
 */
private String kpiId;
/**
 * Secondary-resource flag
 */
private String flag;
/**
 * Suppression count
 */
private Integer ignoreCnt;
/**
 * Flag prefix
 */
private String subFlag;
}
... ...
package com.honggroup.model;
import java.io.Serializable;
import lombok.Data;
/**
 * Entity for resource collection-protocol parameter values (BResourceProtocolParamValues).
 *
 * @author zhangdajun
 * @since 2022-12-30 21:57:23
 */
@Data
public class BResourceProtocolParamValues implements Serializable {
private static final long serialVersionUID = 666120941660585319L;
/**
 * Collection-protocol parameter ID
 */
private String id;
/**
 * Parameter code
 */
private String paramCode;
/**
 * Parameter value
 */
private String paramValue;
/**
 * Resource ID
 */
private String resId;
/**
 * Collection protocol
 */
private String protocol;
/**
 * Parameter name
 */
private String paramName;
}
... ...
package com.honggroup.model;
import java.util.Date;
import java.io.Serializable;
import lombok.Data;
/**
 * Entity logging resource monitoring-protocol password changes (BResourcePwdLog).
 * NOTE(review): passwordOld/passwordNew hold credential material — confirm they are
 * stored encrypted rather than in plaintext; cannot be verified from this file.
 *
 * @author zhangdajun
 * @since 2022-12-30 22:45:49
 */
@Data
public class BResourcePwdLog implements Serializable {
private static final long serialVersionUID = 481286900939355774L;
// Primary key ID
private String id;
// Resource ID
private String resId;
// Collection protocol
private String protocol;
// Previous password (see class security note)
private String passwordOld;
// New password (see class security note)
private String passwordNew;
/**
 * Password change state: 0 = about to expire, 1 = change initiated,
 * 2 = change succeeded, 3 = change failed
 */
private Integer state;
/**
 * Creation time
 */
private Date createTime;
/**
 * Update time
 */
private Date updateTime;
}
... ...
package com.honggroup.model;
import java.util.Date;
import java.io.Serializable;
import lombok.Data;
/**
 * Entity for the resource relation binding table (BResourceRelation).
 * Expresses links between resources (e.g. VM to host, VM to resource pool).
 *
 * @author zhangdajun
 * @since 2022-12-29 17:46:25
 */
@Data
public class BResourceRelation implements Serializable {
private static final long serialVersionUID = -75146091033024232L;
/** No-args constructor (required by frameworks/serialization). */
public BResourceRelation() {
}
/** Convenience constructor for a relation without target/direction info. */
public BResourceRelation(String id, String resId, String resType, String reType) {
this.id = id;
this.resId = resId;
this.resType = resType;
this.reType = reType;
}
/** Full convenience constructor including target resource and direction. */
public BResourceRelation(String id, String resId, String resType, String reType, String targetId, String direct) {
this.id = id;
this.resId = resId;
this.resType = resType;
this.reType = reType;
this.targetId = targetId;
this.direct = direct;
}
/**
 * Primary key
 */
private String id;
/**
 * Resource ID
 */
private String resId;
/**
 * Resource type
 */
private String resType;
/**
 * Resource type (TingYun alarm middle-platform enum)
 */
private String fromType;
/**
 * Relation ID (original comment; field name suggests "relation type" — confirm)
 */
private String reType;
/**
 * Related resource ID
 */
private String targetId;
/**
 * Related resource type
 */
private String targetType;
/**
 * Related resource type (TingYun alarm middle-platform enum)
 */
private String toType;
/**
 * Relation kind: 1 = direct (e.g. VM and host), 2 = indirect (e.g. VM and
 * resource pool or virtualization platform)
 */
private String direct;
/**
 * Topology ID
 */
private String jtopoId;
/**
 * Topology generation time
 */
private Date topoCreatetime;
}
... ...