Selaa lähdekoodia

数据质量监控配置

andy 1 vuosi sitten
vanhempi
commit
8bb3b488c1

+ 1 - 1
src/main/java/com/scbfkj/uni/service/LoggerService.java

@@ -252,7 +252,7 @@ public class LoggerService {
 
     public enum LogType {
 
-        USER("userlog"), INTERFACE("interfacelog"), SYSTEM("systemerrlog"), SERVICE("servicelog"), SERVICE_ERR("serviceerrlog");
+        USER("userlog"), INTERFACE("interfacelog"), SYSTEM("systemerrlog"), SERVICE("servicelog"), SERVICE_ERR("serviceerrlog"), QUALITY_CONTROL("qualitycontrollog");
 
         private final String name;
 

+ 237 - 2
src/main/java/com/scbfkj/uni/system/SystemInit.java

@@ -1,8 +1,15 @@
 package com.scbfkj.uni.system;
 
 import com.scbfkj.uni.library.DataEncryptionUtil;
+import com.scbfkj.uni.library.DataFormatUtil;
+import com.scbfkj.uni.library.EmailUtil;
 import com.scbfkj.uni.library.UniReturnUtil;
+import com.scbfkj.uni.library.script.DatabaseScriptUtil;
+import com.scbfkj.uni.library.script.JsScriptEngineUtil;
+import com.scbfkj.uni.library.script.KafkaScriptUtil;
 import com.scbfkj.uni.process.DataBase;
+import com.scbfkj.uni.process.Elasticsearch;
+import com.scbfkj.uni.process.Kafka;
 import com.scbfkj.uni.service.ControlService;
 import com.scbfkj.uni.service.LoggerService;
 import jakarta.annotation.PostConstruct;
@@ -14,13 +21,17 @@ import org.springframework.stereotype.Component;
 import java.io.*;
 import java.nio.charset.StandardCharsets;
 import java.time.LocalDateTime;
-import java.util.HashMap;
-import java.util.Scanner;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.stream.Collectors;
 
 
 @Component
 public class SystemInit {
 
+    private static final ExecutorService executor = Executors.newCachedThreadPool();
+    private static final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+
     private static final DataBase DATA_BASE = new DataBase();
     private final ResourcePatternResolver resourcePatternResolver;
     @Value("${db.center.config}")
@@ -41,6 +52,8 @@ public class SystemInit {
 
     @Value("${app.enable-reset-config:false}")
     private boolean enableResetConfig = false;
+    List<String> qualityControlIds = new ArrayList<>();
+    Set<String> qualityControlIds2 = new HashSet<>();
 
     public SystemInit(ResourcePatternResolver resourcePatternResolver) {
         this.resourcePatternResolver = resourcePatternResolver;
@@ -114,6 +127,7 @@ public class SystemInit {
             }
         }, 1000);
         ScheduleUtil.startFrequencyTask(SystemInit::containerHeartbeat, 60000);
+        scheduledExecutorService.scheduleWithFixedDelay(this::createQualityControlTask, 0, 60000, TimeUnit.MILLISECONDS);
     }
 
     private void initializeSystemEnvironment() throws Exception {
@@ -188,4 +202,225 @@ public class SystemInit {
         }
 
     }
+
+    /**
+     * 创建质量控制任务
+     *
+     * @throws Exception
+     */
+    private void createQualityControlTask() {
+        try {
+            // 查询质量控制列表
+            qualityControlIds.clear();
+            qualityControlIds.addAll(DATA_BASE.query(Config.getCenterConnectionStr(), "select * from qualitycontrol").stream().map(it -> it.get("id").toString()).toList());
+//            判断数据是否有新增或者删除
+            if (qualityControlIds.containsAll(qualityControlIds2) && qualityControlIds2.containsAll(qualityControlIds)) {
+                return;
+            }
+            executor.shutdown();
+            executor.awaitTermination(1, TimeUnit.MINUTES);
+            qualityControlIds2.clear();
+            for (String id : qualityControlIds) {// 获取质量控制实体的id
+                qualityControlIds2.add(id);
+
+                // 查询指定id的质量控制实体
+                List<Map<String, Object>> qualityControlEntities2 = DATA_BASE.query(Config.getCenterConnectionStr(), "select * from qualitycontrol where id =?", id);
+
+                if (qualityControlEntities2.isEmpty()) {
+                    continue;
+                }
+                Map<String, Object> it2 = qualityControlEntities2.get(0);
+                Object frequency2 = it2.get("frequency");
+                if (frequency2!=null){
+                    // 提交任务到定时线程池
+                    scheduledExecutorService.scheduleWithFixedDelay(()->{
+
+                        try {
+
+
+                            // 查询指定id的质量控制实体
+                            List<Map<String, Object>> qualityControlEntities = DATA_BASE.query(Config.getCenterConnectionStr(), "select * from qualitycontrol where id =?", id);
+
+                            if (qualityControlEntities.isEmpty()) {
+                                return;
+                            }
+
+                            // 获取质量控制实体的各个属性
+                            Map<String, Object> it = qualityControlEntities.get(0);
+                            Object frequency = it.get("frequency");
+                            Object sourceId = it.get("sourceid");
+                            Object sourceName = it.get("sourcename");
+                            Object target = it.get("target");
+                            Object rules = it.get("rules");
+                            Object messageTemplate = it.get("messagetemplate");
+                            Object lastRunContainerCode = it.get("lastruncontainercode");
+                            Object single = it.get("single");
+                            Object lastRunTimeObj = it.get("lastruntime");
+                            Object enablelog = it.get("enablelog");
+
+                            // 如果频率不为空
+                            if (frequency != null) {
+                                // 根据频率设定睡眠时长
+                                Thread.sleep(Long.parseLong(frequency.toString()));
+
+                                // 如果是单例运行
+                                if (single != null && "1".equals(single.toString())) {
+                                    // 如果从未运行过
+                                    if (lastRunTimeObj == null) {
+                                        // 尝试抢占任务,成功则继续执行
+                                        int update = DATA_BASE.update(Config.getCenterConnectionStr(), "update qualitycontrol set lastruncontainercode=? ,lastruntime =? where lastruncontainercode is null and lastruntime is null and id =?", Config.getContainerCode(), LocalDateTime.now(), id);
+                                        if (update == 0) {
+                                            return;
+                                        }
+                                    } else if (!Config.getContainerCode().equals(lastRunContainerCode)) {
+                                        LocalDateTime checkDateTime = LocalDateTime.now().plusSeconds(-5);
+                                        LocalDateTime lastDateTime = DataFormatUtil.toDateTime(lastRunTimeObj);
+
+                                        // 如果最后运行时间在5秒前,则启动运行
+                                        if (lastDateTime.isBefore(checkDateTime)) {
+                                            int update = DATA_BASE.update(Config.getCenterConnectionStr(), "update qualitycontrol set lastruncontainercode=? ,lastruntime =? where  lastruntime = ? and id =?", Config.getContainerCode(), LocalDateTime.now(), lastRunTimeObj, id);
+                                            if (update == 0) {
+                                                return;
+                                            }
+                                        }
+                                    }
+                                }
+
+                                Object outData = null;
+                                List<Map<String, Object>> result = new ArrayList<>();
+                                try {
+                                    // 查询数据源信息
+                                    List<Map<String, Object>> dataSources = DATA_BASE.query(Config.getCenterConnectionStr(), "select * from datasource where datasourceid = ?", sourceId);
+
+                                    if (!dataSources.isEmpty()) {
+                                        Map<String, Object> dataSourceMap = dataSources.get(0);
+                                        Object datasourceType = dataSourceMap.get("datasourcetype");
+
+                                        // 如果数据源类型为数据库
+                                        if ("DB".equalsIgnoreCase(datasourceType.toString())) {
+                                            String sql = rules.toString().trim();
+                                            if (!sql.toLowerCase().startsWith("select ")) {
+                                                throw new RuntimeException("数据库规则必须是查询语句:" + sql);
+                                            }
+                                            result.addAll(DATA_BASE.query(DATA_BASE.queryConnectionStr(sourceId.toString()), sql));
+                                        } else if ("KAFKA".equalsIgnoreCase(datasourceType.toString()) || "ES".equalsIgnoreCase(datasourceType.toString())) {
+                                            Map<String, Object> stringObjectMap;
+                                            if ("ES".equalsIgnoreCase(datasourceType.toString())) {
+                                                stringObjectMap = Elasticsearch.execByDataSourceId(sourceId.toString(), sourceName.toString(), "SEARCH", Collections.singletonList(rules.toString()));
+                                            } else {
+                                                stringObjectMap = KafkaScriptUtil.receptionMessage(sourceId.toString(), sourceName.toString(), "QualityControlTask", 100L);
+                                            }
+                                            if ("0".equals(stringObjectMap.get("code"))) {
+                                                List<Object> data = (List<Object>) stringObjectMap.get("returnData");
+                                                Map<String, Object> exec = JsScriptEngineUtil.eval(rules.toString(), data);
+                                                result.add(exec);
+                                            }
+                                        }
+                                    }
+                                    boolean send = true;
+                                    Object filter = it.get("resultfilter");
+                                    if (Objects.nonNull(filter)) {
+                                        String filterString = filter.toString();
+
+                                        outData = JsScriptEngineUtil.exec(filterString, result);
+                                        if (outData == null || (!"true".equalsIgnoreCase(outData.toString()) && !"1".equalsIgnoreCase(outData.toString()))) {
+                                            send = false;
+                                        }
+                                    }
+
+                                    if (send) {
+                                        // 发送消息
+                                        sendMessage(target == null ? null : target.toString(), messageTemplate == null ? null : messageTemplate.toString(), result);
+                                        DATA_BASE.update(Config.getCenterConnectionStr(), "update qualitycontrol set lastruntime = ? , lastruncontainercode= ? where id =?", LocalDateTime.now(), Config.getContainerCode(), id);
+                                    }
+//                                    判断日志记录
+
+                                } catch (Exception e) {
+                                    if (Config.isDebug()) {
+                                        e.printStackTrace();
+                                    }
+                                } finally {
+                                    if (enablelog != null && "1".equalsIgnoreCase(enablelog.toString())) {
+                                        LoggerService.LogType qualityControl = LoggerService.LogType.QUALITY_CONTROL;
+                                        Object finalExec = outData;
+                                        LoggerService.log(qualityControl, new HashMap<>() {{
+                                            put("logtime", LocalDateTime.now());
+                                            put("qualitycontrolid", id);
+                                            put("prepesource", DataFormatUtil.toString(finalExec));
+                                            put("inputdata", DataFormatUtil.toString(result));
+                                            put("containercode", Config.getContainerCode());
+                                        }});
+                                    }
+                                }
+                            }
+                        } catch (Exception e) {
+                            if (Config.isDebug()) {
+                                e.printStackTrace();
+                            }
+                        }
+                    },0,1,TimeUnit.MILLISECONDS);
+                }
+
+
+
+            }
+        } catch (Exception e) {
+            if (Config.isDebug()) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    // 替换模板中的占位符
+    public static String replaceTemplate(String template, Map<String, Object> replacements) {
+        // 如果替换映射为空,则直接返回原始模板
+        if (replacements == null) {
+            return template;
+        }
+        // 遍历替换映射中的每个键值对
+        for (Map.Entry<String, Object> entry : replacements.entrySet()) {
+            String key = entry.getKey();
+            Object value = entry.getValue();
+            // 使用值替换模板中的占位符
+            template = template.replace("{" + key + "}", value.toString());
+        }
+        // 返回替换后的模板
+        return template;
+    }
+
+    private void sendMessage(String target, String message, List<Map<String, Object>> data) throws Exception {
+
+        if (data != null && !data.isEmpty())
+            for (String targetName : target.split(",")) {
+                String[] split = targetName.split(":");
+                String datasourceId = split[0];
+                String targetTopic = split[1];
+                List<Map<String, Object>> dataSources = DATA_BASE.query(Config.getCenterConnectionStr(), "select * from datasource where datasourceid = ?", datasourceId);
+                if (!dataSources.isEmpty()) {
+                    Map<String, Object> dataSourceMap = dataSources.get(0);
+                    Object datasourceType = dataSourceMap.get("datasourcetype");
+                    if ("KAFKA".equalsIgnoreCase(datasourceType.toString())) {
+                        List<String> messages = new ArrayList<>();
+                        for (Map<String, Object> it : data) {
+                            if (message != null) {
+                                messages.add(replaceTemplate(message, it));
+                            } else {
+                                messages.add(DataFormatUtil.toString(it));
+                            }
+                        }
+                        KafkaScriptUtil.sendMessage(datasourceId, targetTopic, messages);
+                    } else if ("DB".equalsIgnoreCase(datasourceType.toString())) {
+                        List<String> keys = data.get(0).keySet().stream().toList();
+
+                        String sql = "insert into %s (%s)values(%s)".formatted(targetTopic, String.join(",", keys), keys.stream().map(it -> "?").collect(Collectors.joining(",")));
+                        List<Object[]> values = data.stream().map(d -> keys.stream().map(d::get).toArray()).toList();
+
+                        DATA_BASE.update(DATA_BASE.queryConnectionStr(datasourceId), sql, values);
+                    }
+
+                }
+
+            }
+
+    }
 }