andy 1 an în urmă
părinte
comite
484a773ffe

+ 6 - 1
pom.xml

@@ -97,7 +97,12 @@
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-pool2</artifactId>
         </dependency>
-    </dependencies>
+		<dependency>
+			<groupId>com.jayway.jsonpath</groupId>
+			<artifactId>json-path</artifactId>
+		</dependency>
+
+	</dependencies>
 
 	<build>
 		<plugins>

+ 1 - 0
src/main/java/com/scbfkj/uni/library/DataAliasGetUtil.java

@@ -12,6 +12,7 @@ public class DataAliasGetUtil {
 
     public static Optional<String> getValue(String key, Map<String, Object> data) throws Exception {
 
+        if(Objects.isNull(data)) return Optional.empty();
         Optional<String> result = Optional.ofNullable(data.get(key)).map(Object::toString);
         if (result.isPresent()) return result;
         if (Objects.isNull(keyAlias) || keyAlias.isEmpty()) {

+ 2 - 2
src/main/java/com/scbfkj/uni/library/DataFormatUtil.java

@@ -25,8 +25,8 @@ public final class DataFormatUtil {
         objectMapper.setSerializationInclusion(JsonInclude.Include.ALWAYS);
     }
 
-    public static JsonNode stringToJsonNode(String source) throws JsonProcessingException {
-        return objectMapper.readTree(source);
+    public static JsonNode toJsonNode(Object source) throws JsonProcessingException {
+            return objectMapper.readTree(toString(source));
     }
 
     public static <T> T stringToBean(String source, Class<T> clazz) throws JsonProcessingException {

+ 37 - 4
src/main/java/com/scbfkj/uni/process/DataBase.java

@@ -2,11 +2,13 @@ package com.scbfkj.uni.process;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.scbfkj.uni.library.DataFormatUtil;
+import com.scbfkj.uni.system.Config;
 import com.zaxxer.hikari.HikariConfig;
 import com.zaxxer.hikari.pool.HikariPool;
 import jakarta.annotation.Nonnull;
 
 import java.sql.*;
+import java.time.LocalDateTime;
 import java.util.*;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -14,9 +16,29 @@ import java.util.stream.Collectors;
 
 public class DataBase {
 
+    private final static Map<String, Map<String, Object>> cacheDatas = new HashMap<>();
+    private final static List<Map<String, Object>> cacheConfigList = new ArrayList<>();
+
+
     public static List<Map<String, Object>> query(String connectionStr, String sql, List<Object[]> argsList) throws Exception {
+
+//        缓存数据 connectionStr + sql
+
+        String key = "%s;%s;%s".formatted(connectionStr, sql,DataFormatUtil.toString(argsList));
+        if (cacheDatas.containsKey(key)) {
+            Map<String, Object> cacheData = cacheDatas.get(key);
+
+            Object effectiveduration = cacheData.get("effectiveduration");
+            if (Objects.isNull(effectiveduration) || ((LocalDateTime) effectiveduration).isBefore(LocalDateTime.now())) {
+                cacheDatas.remove(key);
+            } else {
+                return ((List<Map<String, Object>>) cacheData.get("data"));
+            }
+
+        }
+
         HikariPool dataSourcePool = getDataSourcePool(connectionStr);
-        return argsList.parallelStream().flatMap(args -> {
+        List<Map<String, Object>> result = argsList.parallelStream().flatMap(args -> {
             try (Connection connection = dataSourcePool.getConnection();
                  PreparedStatement preparedStatement = connection.prepareStatement(sql)
             ) {
@@ -24,13 +46,24 @@ public class DataBase {
                     preparedStatement.setObject(i + 1, args[i]);
                 }
                 ResultSet resultSet = preparedStatement.executeQuery();
-                List<Map<String, Object>> result = getResult(connectionStr, sql, resultSet);
-                return result.stream();
+                return getResult(connectionStr, sql, resultSet).stream();
             } catch (Exception exception) {
                 throw new RuntimeException("数据异常: %s\n sql: %s ;\n args: %s ".formatted(exception.getMessage(), sql, DataFormatUtil.toDate(args)));
             }
         }).toList();
 
+        if (cacheConfigList.isEmpty()) {
+            cacheConfigList.addAll(DataBase.query(Config.centerConnectionStr, "select * from datacache", Collections.emptyList()));
+        }
+        cacheConfigList.stream().filter(it -> Objects.equals(it.get("querysql"), sql) && Objects.equals(it.get("connectset"), connectionStr)).findFirst().ifPresent(it -> {
+            Object o = it.get("effectiveduration");
+            LocalDateTime localDateTime = LocalDateTime.now().plusSeconds(Long.parseLong(o.toString()));
+            cacheDatas.put(key, new HashMap<>() {{
+                put("effectiveduration", localDateTime);
+                put("data", result);
+            }});
+        });
+        return result;
     }
 
 
@@ -142,7 +175,7 @@ public class DataBase {
         if (dataSourcePools.containsKey(connectionStr)) {
             return dataSourcePools.get(connectionStr);
         }
-        JsonNode jsonNode = DataFormatUtil.stringToJsonNode(connectionStr);
+        JsonNode jsonNode = DataFormatUtil.toJsonNode(connectionStr);
 
         JsonNode jdbcUrl = jsonNode.get("jdbcUrl");
         JsonNode username = jsonNode.get("username");

+ 105 - 91
src/main/java/com/scbfkj/uni/service/ControlService.java

@@ -1,91 +1,105 @@
-package com.scbfkj.uni.service;
-
-import com.scbfkj.uni.library.DataFormatUtil;
-import com.scbfkj.uni.library.UniReturnUtil;
-import com.scbfkj.uni.process.DataBase;
-import com.scbfkj.uni.system.Config;
-import com.scbfkj.uni.system.ScheduleTask;
-import com.scbfkj.uni.system.ScheduleUtil;
-import org.springframework.util.StringUtils;
-
-import java.time.LocalDateTime;
-import java.util.*;
-
-public class ControlService {
-
-
-    public static Map<String, ScheduleTask> ScheduleTaskMaps = new HashMap<>();
-    /**
-     * 服务启动
-     *
-     * @param serviceId
-     * @return
-     */
-    public static Map<String,Object> start(String serviceId) throws Exception {
-        //采集数据源及参数配置
-        List<Map<String, Object>> collectServiceList = DataBase.query(Config.centerConnectionStr,"SELECT SI.* FROM  serviceinfo SI WHERE SI.serviceType ='4'  AND serviceID = ?", Collections.singletonList(new Object[]{serviceId}));
-
-        if (Objects.isNull(collectServiceList) || collectServiceList.isEmpty()) {
-            return UniReturnUtil.fail("采集服务不存在");
-        }
-        Map<String, Object> collectServiceMaps = collectServiceList.get(0); //采集服务
-        Integer loopCount = Objects.isNull (collectServiceMaps.get("loopCount")) ? null : Integer.parseInt(collectServiceMaps.get("loopCount").toString());
-        int frequencyCount = Objects.isNull (collectServiceMaps.get("frequencyCount")) ? 1 : Integer.parseInt(collectServiceMaps.get("frequencyCount").toString());
-        int frequencyUnit = Objects.isNull (collectServiceMaps.get("frequencyUnit")) ? 1 : Integer.parseInt(collectServiceMaps.get("frequencyUnit").toString());
-        int taskType = Objects.isNull (collectServiceMaps.get("taskType")) ? 1 : Integer.parseInt(collectServiceMaps.get("taskType").toString());
-        Object cronExpress = collectServiceMaps.get("cronExpress");
-        LocalDateTime taskValidDate = DataFormatUtil.toDateTime(collectServiceMaps.get("taskValid"));
-        LocalDateTime taskInvalidDate = DataFormatUtil.toDateTime(collectServiceMaps.get("taskInvalid"));
-        //创建定时任务:
-        ScheduleTask scheduleTask = ScheduleTaskMaps.get(serviceId);
-        if (Objects.isNull(scheduleTask)) {
-            scheduleTask = new ScheduleTask(serviceId, loopCount, taskValidDate, taskInvalidDate);
-            ScheduleTaskMaps.put(serviceId, scheduleTask);
-        }
-        // todo 记录日志
-        boolean start = ScheduleUtil.start(scheduleTask, (frequencyCount * frequencyUnit), cronExpress, taskType);
-//        LogUtils.log("start: 2", "0", null, (start ? "启动成功" : "启动失败"), serviceId,  null, null, null,null);//无条件记录数据接收日志
-        if (start) {
-            return UniReturnUtil.success("启动成功");
-        }
-        DataBase.updateBatch(Config.centerConnectionStr,"update serviceinfo set runState = 1 where  serviceid =?",Collections.singletonList(new Object[]{serviceId}));
-        ScheduleTaskMaps.remove(serviceId);
-        return UniReturnUtil.fail("启动失败");
-    }
-
-    /**
-     * 服务停止
-     *
-     * @param service_id
-     * @return
-     */
-    public static Map<String,Object> stop(String service_id) throws Exception {
-        ScheduleTask scheduleTask = ScheduleTaskMaps.get(service_id);
-        if (Objects.isNull(scheduleTask)) {
-            return UniReturnUtil.fail("服务: " + service_id + " 已经停止");
-        }
-        boolean reset = ScheduleUtil.cancel(scheduleTask);
-        ScheduleTaskMaps.remove(service_id);
-        DataBase.updateBatch(Config.centerConnectionStr,"update serviceinfo set runState = 0 where  serviceID =?",Collections.singletonList(new Object[]{service_id}));
-//        LogUtils.log("stop: 2", "0", null, (reset ? "停止成功" : "停止失败"), service_id, null, null, null,null);//无条件记录数据接收日志
-        return UniReturnUtil.success("停止" + (reset ? "成功" : "失败"));
-    }
-
-    public static Map<String,Object> startOrStop(Map<String, Object> params, String statue) throws Exception {
-        Object serviceId1 = params.get("serviceId");
-        String serviceId = Objects.isNull(serviceId1) ? null : serviceId1.toString();
-        if (!StringUtils.hasText(serviceId)) {
-            return  UniReturnUtil.fail("服务ID不能为空");
-        }
-        List<Map<String, Object>> serviceTypeData = DataBase.query(Config.centerConnectionStr,"select serviceType from serviceinfo where serviceID=?", Collections.singletonList(new Object[]{serviceId}));
-
-        if (Objects.nonNull(serviceTypeData) && Objects.equals("4",serviceTypeData.get(0).get("serviceType").toString())) {
-            if (statue.equals("0")) {
-                return stop(serviceId);
-            }
-            return start(serviceId);
-        }
-//        ObjectMap.getordropInput(serviceId,false);
-        return UniReturnUtil.success("重启成功");
-    }
-}
+//package com.scbfkj.uni.service;
+//
+//import com.scbfkj.uni.library.DataFormatUtil;
+//import com.scbfkj.uni.library.UniReturnUtil;
+//import com.scbfkj.uni.process.DataBase;
+//import com.scbfkj.uni.system.Config;
+//import com.scbfkj.uni.system.ScheduleTask;
+//import com.scbfkj.uni.system.ScheduleUtil;
+//import org.springframework.util.StringUtils;
+//
+//import java.time.LocalDateTime;
+//import java.util.*;
+//
+//public class ControlService {
+//
+//
+//    public static Map<String, ScheduleTask> ScheduleTaskMaps = new HashMap<>();
+//
+//
+//    public static void startServiceByContainerCode() throws Exception {
+//        String queryServiceInfos = "select * from serviceinfo where containercode=?";
+//        List<Map<String, Object>> serviceInfos = DataBase.query(Config.centerConnectionStr, queryServiceInfos, Collections.singletonList(new Object[]{Config.containerCode}));
+//        for (Map<String, Object> serviceInfo : serviceInfos) {
+//            String serviceId = serviceInfo.get("serviceid").toString();
+//            String serviceType = serviceInfo.get("servicetype").toString();
+//            String taskType = serviceInfo.get("tasktype").toString();
+////
+////            todo 启动服务
+//
+//        }
+//    }
+//    /**
+//     * 服务启动
+//     *
+//     * @param serviceId
+//     * @return
+//     */
+//    public static Map<String,Object> start(String serviceId) throws Exception {
+//        //采集数据源及参数配置
+//        List<Map<String, Object>> collectServiceList = DataBase.query(Config.centerConnectionStr,"SELECT SI.* FROM  serviceinfo SI WHERE SI.serviceType ='4'  AND serviceID = ?", Collections.singletonList(new Object[]{serviceId}));
+//
+//        if (Objects.isNull(collectServiceList) || collectServiceList.isEmpty()) {
+//            return UniReturnUtil.fail("采集服务不存在");
+//        }
+//        Map<String, Object> collectServiceMaps = collectServiceList.get(0); //采集服务
+//        Integer loopCount = Objects.isNull (collectServiceMaps.get("loopCount")) ? null : Integer.parseInt(collectServiceMaps.get("loopCount").toString());
+//        int frequencyCount = Objects.isNull (collectServiceMaps.get("frequencyCount")) ? 1 : Integer.parseInt(collectServiceMaps.get("frequencyCount").toString());
+//        int frequencyUnit = Objects.isNull (collectServiceMaps.get("frequencyUnit")) ? 1 : Integer.parseInt(collectServiceMaps.get("frequencyUnit").toString());
+//        int taskType = Objects.isNull (collectServiceMaps.get("taskType")) ? 1 : Integer.parseInt(collectServiceMaps.get("taskType").toString());
+//        Object cronExpress = collectServiceMaps.get("cronExpress");
+//        LocalDateTime taskValidDate = DataFormatUtil.toDateTime(collectServiceMaps.get("taskValid"));
+//        LocalDateTime taskInvalidDate = DataFormatUtil.toDateTime(collectServiceMaps.get("taskInvalid"));
+//        //创建定时任务:
+//        ScheduleTask scheduleTask = ScheduleTaskMaps.get(serviceId);
+//        if (Objects.isNull(scheduleTask)) {
+//            scheduleTask = new ScheduleTask(serviceId, loopCount, taskValidDate, taskInvalidDate);
+//            ScheduleTaskMaps.put(serviceId, scheduleTask);
+//        }
+//        // todo 记录日志
+//        boolean start = ScheduleUtil.start(scheduleTask, (frequencyCount * frequencyUnit), cronExpress, taskType);
+////        LogUtils.log("start: 2", "0", null, (start ? "启动成功" : "启动失败"), serviceId,  null, null, null,null);//无条件记录数据接收日志
+//        if (start) {
+//            return UniReturnUtil.success("启动成功");
+//        }
+//        DataBase.updateBatch(Config.centerConnectionStr,"update serviceinfo set runState = 1 where  serviceid =?",Collections.singletonList(new Object[]{serviceId}));
+//        ScheduleTaskMaps.remove(serviceId);
+//        return UniReturnUtil.fail("启动失败");
+//    }
+//
+//    /**
+//     * 服务停止
+//     *
+//     * @param service_id
+//     * @return
+//     */
+//    public static Map<String,Object> stop(String service_id) throws Exception {
+//        ScheduleTask scheduleTask = ScheduleTaskMaps.get(service_id);
+//        if (Objects.isNull(scheduleTask)) {
+//            return UniReturnUtil.fail("服务: " + service_id + " 已经停止");
+//        }
+//        boolean reset = ScheduleUtil.cancel(scheduleTask);
+//        ScheduleTaskMaps.remove(service_id);
+//        DataBase.updateBatch(Config.centerConnectionStr,"update serviceinfo set runState = 0 where  serviceID =?",Collections.singletonList(new Object[]{service_id}));
+////        LogUtils.log("stop: 2", "0", null, (reset ? "停止成功" : "停止失败"), service_id, null, null, null,null);//无条件记录数据接收日志
+//        return UniReturnUtil.success("停止" + (reset ? "成功" : "失败"));
+//    }
+//
+//    public static Map<String,Object> startOrStop(Map<String, Object> params, String statue) throws Exception {
+//        Object serviceId1 = params.get("serviceId");
+//        String serviceId = Objects.isNull(serviceId1) ? null : serviceId1.toString();
+//        if (!StringUtils.hasText(serviceId)) {
+//            return  UniReturnUtil.fail("服务ID不能为空");
+//        }
+//        List<Map<String, Object>> serviceTypeData = DataBase.query(Config.centerConnectionStr,"select serviceType from serviceinfo where serviceID=?", Collections.singletonList(new Object[]{serviceId}));
+//
+//        if (Objects.nonNull(serviceTypeData) && Objects.equals("4",serviceTypeData.get(0).get("serviceType").toString())) {
+//            if (statue.equals("0")) {
+//                return stop(serviceId);
+//            }
+//            return start(serviceId);
+//        }
+////        ObjectMap.getordropInput(serviceId,false);
+//        return UniReturnUtil.success("重启成功");
+//    }
+//}

+ 69 - 104
src/main/java/com/scbfkj/uni/service/DataProcessService.java

@@ -1,11 +1,14 @@
 package com.scbfkj.uni.service;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.jayway.jsonpath.JsonPath;
 import com.scbfkj.uni.library.DataAliasGetUtil;
+import com.scbfkj.uni.library.DataFormatUtil;
 import com.scbfkj.uni.library.ScriptEngineUtil;
 import com.scbfkj.uni.library.UniReturnUtil;
 import com.scbfkj.uni.process.DataBase;
+import com.scbfkj.uni.system.Config;
 import jakarta.el.MethodNotFoundException;
-import org.springframework.stereotype.Service;
 
 import java.io.File;
 import java.lang.reflect.Method;
@@ -17,33 +20,64 @@ import java.util.stream.Collectors;
 
 public class DataProcessService {
 
-    private String id;
 
-    public DataProcessService(String id) {
-        this.id = id;
-    }
+    public static Map<String, Object> process(Map<String, Object> inData) throws Exception {
 
-    public Map<String, Object> process(Map<String, Object> inData) throws Exception {
         Optional<String> serviceIdOpt = DataAliasGetUtil.getValue("serviceid", inData);
-        if (serviceIdOpt.isEmpty()) {
+        if (Objects.isNull(inData) || inData.isEmpty() || serviceIdOpt.isEmpty())
             return UniReturnUtil.fail("服务编号不能为空");
-        }
+
+        List<Map<String, Object>> resource = new ArrayList<>();
+        resource.add(inData);
         String serviceId = serviceIdOpt.get();
-//        todo
+
+        List<Map<String, Object>> algorithmLibraries = DataBase.query(Config.centerConnectionStr, "select * from algorithmlibrary where serviceid=?", Collections.singletonList(new Object[]{serviceId}));
+        for (Map<String, Object> algorithmLibrary : algorithmLibraries) {
+            resource.add(processByAlgorithm(algorithmLibrary, resource, null, null));
+        }
         return UniReturnUtil.success(null);
     }
 
-    public Object processByAlgorithm(String type, String targetSource, String className, String event, String expression,List<String> filterColumns, Map<String, Object> filterLines, Object... args) throws Exception {
+    public static Map<String, Object> processByAlgorithm(Map<String, Object> algorithmLibrary, List<Map<String, Object>> args, List<String> filterColumns, Map<String, Object> filterLines) throws Exception {
+
+        Object type = algorithmLibrary.get("algorithmtype");
+        Object expression = algorithmLibrary.get("computingexpression");
+        Object parameterSet = algorithmLibrary.get("parameterset");
+        Object preConditions = algorithmLibrary.get("preconditions");
+        Object executionNumber = algorithmLibrary.get("executionnumber");
+        Object dataSourceId = algorithmLibrary.get("datasourceid");
+        List<Map<String, Object>> datasourceList = DataBase.query(Config.centerConnectionStr, "select * from datasource where datasourceid=?", Collections.singletonList(new Object[]{dataSourceId}));
+        Map<String, Object> datasource = datasourceList.get(0);
+
+        if (Objects.nonNull(preConditions)) {
+            String result = preConditionScript(preConditions.toString(), args);
+            if (Objects.equals("1", result) || Objects.equals("2", result)) {
+                return UniReturnUtil.success(result);
+            }
+        }
+        List<Object> parameters = new ArrayList<>();
+//        获取入参列表
+        if (Objects.nonNull(parameterSet)) {
+            HashMap<String, Object> source = new HashMap<>();
+            source.put("args", args);
+            source.put("algorithm", algorithmLibrary);
+            source.put("datasource", datasource);
+            parameters.add(getParams(parameterSet.toString(), source));
+        }
 
-        switch (type) {
+        switch (type.toString()) {
 //            java反射
             case "1" -> {
-                if (Objects.nonNull(targetSource)) {
-                    File file = new File(System.getProperty("user.dir").concat(File.separator).concat("plugins").concat(File.separator).concat(targetSource));
+                Object connectset = datasource.get("connectset");
+                JsonNode jsonNode = DataFormatUtil.toJsonNode(connectset);
+                JsonNode path = jsonNode.get("path");
+                if (Objects.nonNull(path)) {
+                    File file = new File(System.getProperty("user.dir").concat(File.separator).concat("plugins").concat(File.separator).concat(path.asText()));
                     if (!file.exists()) {
-                        return "外部文件加载不存在:".concat(System.getProperty("user.dir")).concat(File.separator).concat("plugins").concat(File.separator).concat(targetSource);
+                        throw new RuntimeException("外部文件加载不存在:".concat(System.getProperty("user.dir")).concat(File.separator).concat("plugins").concat(File.separator).concat(path.asText()));
                     }
                     Method addURLMethod = null;
+
                     boolean accessible = false;
                     try {
                         addURLMethod = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
@@ -59,121 +93,52 @@ public class DataProcessService {
                         }
                     }
                 }
+                JsonNode className = jsonNode.get("className");
 
-                Class<?> classExample = Class.forName(className); //获取类实例
+                Class<?> classExample = Class.forName(className.asText()); //获取类实例
                 Method javaMethod = null;
+                Method closeMethod = null;
                 Object classInstance = classExample.getConstructor().newInstance();//类实例接口 无参数构造
                 for (Method currentMethod : classExample.getMethods()) {//循环所有方法
                     String methodName = currentMethod.getName();
                     if (methodName.equals(expression)) {
                         javaMethod = currentMethod;
-                        break;
+                    }else if(methodName.equals("close")){
+                        closeMethod=currentMethod;
                     }
                 }
                 if (Objects.isNull(javaMethod)) {
                     throw new MethodNotFoundException(expression + ": 没找到");
                 }
-                return javaMethod.invoke(classInstance, args);
+
+                Object invoke = javaMethod.invoke(classInstance, parameters.toArray());
+//                关闭 因为没有做实例的缓存 所以要关闭算法
+                if(Objects.nonNull(closeMethod)){
+                    closeMethod.invoke(classInstance);
+                }
+                return UniReturnUtil.success(invoke);
             }
 //            JS表达式
             case "2" -> {
-                return ScriptEngineUtil.eval(expression, args);
+                return UniReturnUtil.success(ScriptEngineUtil.eval(expression.toString(), parameters.toArray()));
             }
 //            数据库
             case "3" -> {
-                String eventStr = expression.trim().substring(0, 6);
-                if (Objects.equals(eventStr, "insert") || Objects.equals(eventStr, "update") || Objects.equals(eventStr, "delete") || Objects.equals(eventStr, "select")) {
-                    switch (event) {
-                        case "1", "2", "3" -> {
-                            List<Object[]> argsList = Arrays.stream(args).map(it -> ((Object[]) it)).toList();
-                            return DataBase.updateBatch(targetSource, expression, argsList);
-
-                        }
-                        case "0" -> {
-                            List<Object[]> argsList = Arrays.stream(args).map(it -> ((Object[]) it)).toList();
-                            return DataBase.query(targetSource, expression, argsList);
-                        }
-                        default -> {
-                            throw new RuntimeException("event 不能为空");
-                        }
-                    }
-                } else {
-                    switch (event) {
-                        case "6" -> {
-
-                            Map<String, List<Map<String, Object>>> insertOrUpdate = splitDBData(targetSource, expression, Arrays.stream(args).map(it -> ((Map<String, Object>) it)).toList());
-                            List<Map<String, Object>> insertList = insertOrUpdate.get("1");
-                            Object o = processByAlgorithm(type, targetSource, className, "1", expression, filterColumns,filterLines,insertList.toArray());
-                            List<Map<String, Object>> updateList = insertOrUpdate.get("2");
-                            Object o2 = processByAlgorithm(type, targetSource, className, "2", expression,filterColumns,filterLines, updateList.toArray());
-                            return null;
-                        }
-                        case "0" -> {
-                            expression = """
-                                    select * from %s where 《whereStr》
-                                    """.formatted(expression);
-                            List<Map<String, Object>> result = DataBase.query(targetSource, expression, Arrays.stream(args).map(it -> ((Map<String, Object>) it)).toList(), filterColumns, filterLines);
-
-                        }
-                        case "1" -> {
-                            List<Map<String,Object>> objects = Arrays.stream(args).map(it -> (Map<String,Object>)((Map<String, Object>) it).getOrDefault("value", it)).toList();
-
-                            Map<String,Object> o = objects.get(0);
-                            List<String> names = o.keySet().stream().filter(it->{
-                                if(Objects.isNull(filterColumns) || filterColumns.isEmpty()) return true;
-                                return filterColumns.contains(it);
-                            }).toList();
-                            String sql = "insert into %s(%s) values(%s)".formatted(expression, String.join(",", names), names.stream().map(it->"?").collect(Collectors.joining(",")));
+//                下放到Database中处理数据
 
-                            return DataBase.updateBatch(targetSource, sql, objects.stream().map(value -> names.stream().map(value::get).toArray()).toList());
-                        }
-                        case "2" -> {
-
-                        }
-                        case "3" -> {
-
-                        }
-                        default -> {
-
-                            throw new RuntimeException("event 不能为空");
-                        }
-                    }
-                }
-            }
-        }
-        return null;
-    }
-
-    public Map<String, List<Map<String, Object>>> splitDBData(String connectionStr, String tableName, List<Map<String, Object>> argsList) throws Exception {
-
-        Map<String, List<Map<String, Object>>> result = new HashMap<>() {{
-            put("1", new ArrayList<>());
-            put("2", new ArrayList<>());
-        }};
-        for (Map<String, Object> args : argsList) {
-
-            if (exist(connectionStr, tableName, args)) {
-                result.get("2").add(args);
-            } else {
-                result.get("1").add(args);
             }
         }
-
-        return result;
+        return UniReturnUtil.fail("");
     }
 
-    public boolean exist(String connectionStr, String tableName, Map<String, Object> args) throws Exception {
-
-        List<Map<String, Object>> mapList = DataBase.query(connectionStr, "select count(1) as existscount from %s where 《whereStr》 ".formatted(tableName), List.of(args), null, null);
-
-        return !mapList.get(0).get("existscount").equals("0");
+    private static String preConditionScript(String script, List<Map<String, Object>> args) throws Exception {
+        return ScriptEngineUtil.eval(script, args);
     }
 
-    public void close() {
-
+    //    参数使用标准的jsonPath表达式 $.a.b[0].c,多个参数之间使用;;分隔
+    private static List<Object> getParams(String parameterSet, Object source) {
+        String[] paths = parameterSet.split(";;");
+        return Arrays.stream(paths).map(it -> JsonPath.read(source, it.trim())).toList();
     }
 
-    public Object getErrorMessage() {
-        return null;
-    }
 }

+ 15 - 42
src/main/java/com/scbfkj/uni/system/ScheduleTask.java

@@ -1,74 +1,47 @@
 package com.scbfkj.uni.system;
 
-import com.scbfkj.uni.service.ControlService;
 import com.scbfkj.uni.service.DataProcessService;
 
-import java.time.LocalDateTime;
-import java.util.Date;
 import java.util.HashMap;
-import java.util.Objects;
 
 public class ScheduleTask implements Runnable {
+
+
     private String id; // 服务ID
 
     public String getId() {
         return id;
     }
 
-    Integer loopCount;
+    Integer loopCount = 0;
     int count = 0;
-    public LocalDateTime taskValid;
-    public LocalDateTime taskInvalid;
+
+    private DataProcessService service;
 
 
     /**
      * @param id 任务ID
      */
-    public ScheduleTask(String id, Integer loopCount, LocalDateTime taskValid, LocalDateTime taskInvalid) {
+    public ScheduleTask(String id) {
+        this.id = id;
+    }
+
+    public ScheduleTask(String id, Integer loopCount) {
         this.id = id;
         this.loopCount = loopCount;
-        this.taskValid = taskValid;
-        this.taskInvalid = taskInvalid;
     }
 
     @Override
     public void run() {
-        if (Objects.nonNull(taskInvalid) && taskInvalid.isBefore(LocalDateTime.now())) {
+
+        do {
+            count++;
             try {
-                ControlService.stop(id);
+                service.process(new HashMap<>());
             } catch (Exception e) {
                 throw new RuntimeException(e);
             }
-            return;
-        }
-        if (Objects.isNull(taskValid) || taskValid.isAfter(LocalDateTime.now())) {
-            if (Objects.isNull(loopCount) || count < loopCount) {
-                if (!Objects.isNull(loopCount))
-                    count++;
-                DataProcessService dataProcess =  ScheduleUtil.getordropInput(id);
-                if (Objects.isNull(dataProcess) || Objects.nonNull(dataProcess.getErrorMessage())) {
-//                 todo   LogUtils.log("run: 1", "-1",  null,"启动服务失败,获取采集对象异常:" + (Objects.isNull(dataProcess) ? "" : dataProcess.getErrorMessage()), id,"服务编号:" + id, null,null,null);//无条件记录数据接收日志
-                    return;
-                }
-                try {
-                    dataProcess.process(new HashMap<>() {{
-                        put("dataContent", "1");
-                        put("event", "1");
-                    }});
-
-                } catch (Exception e) {
-//                    todo 日志
-                }
-
-            } else {
-                try {
-                    ControlService.stop(id);
-                    count = 0;
-                } catch (Exception e) {
-//                    todo 日志
-                }
-            }
-        }
+        } while (loopCount > count);
     }
 
 

+ 45 - 54
src/main/java/com/scbfkj/uni/system/ScheduleUtil.java

@@ -1,54 +1,28 @@
 package com.scbfkj.uni.system;
 
+import com.scbfkj.uni.library.UniReturnUtil;
+import com.scbfkj.uni.process.DataBase;
 import com.scbfkj.uni.service.DataProcessService;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
 import org.springframework.scheduling.support.CronTrigger;
 
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.ScheduledFuture;
+import java.util.*;
+import java.util.concurrent.*;
 
 public class ScheduleUtil {
 
 
     private static Map<String, DataProcessService> dataProcessMaps = new HashMap<>(); //数据接收
 
-    private static DataProcessService getordropInput(String serviceId, boolean isGet) {
-        if (isGet) {
-            DataProcessService dataProcess = dataProcessMaps.get(serviceId);
-            if (Objects.nonNull(dataProcess) && Objects.nonNull(dataProcess.getErrorMessage())) {
-                dataProcess.close();
-                dataProcess = null;
-                dataProcessMaps.remove(serviceId);
-            }
-            if (!dataProcessMaps.containsKey(serviceId)) {
-                dataProcessMaps.put(serviceId, new DataProcessService(serviceId));
-            }
-            return dataProcessMaps.get(serviceId);
-        } else {
-            DataProcessService datacollect = dataProcessMaps.get(serviceId);
-            if (Objects.nonNull(datacollect)) {
-                datacollect.close();
-                datacollect = null;
-            }
-            dataProcessMaps.remove(serviceId);
-            return null;
-        }
-    }
-
-    public static DataProcessService getordropInput(String serviceId) {
-        return getordropInput(serviceId, true);
-    }
-
-
     private static ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
 
+
     private static Map<String, ScheduledFuture<?>> scheduledFutureMap = new HashMap<>();
 
-    static {
+    private final static Map<String, Map<String, Object>> serviceInfoesMap = new HashMap<>();
+    public static Map<String, ScheduleTask> scheduleTaskMaps = new HashMap<>();
 
+    static {
         threadPoolTaskScheduler.setPoolSize(8);
         threadPoolTaskScheduler.initialize();
         System.out.println("定时任务线程池启动");
@@ -57,39 +31,56 @@ public class ScheduleUtil {
     /**
      * 启动
      *
-     * @param scheduleTask 定时任务
+     * @param serviceId 服务id
      */
-    public static boolean start(ScheduleTask scheduleTask, int cycleFrequency, Object cronExpress, int taskType) {
+    public static boolean start(String serviceId) throws Exception {
+
+        if (!serviceInfoesMap.containsKey(serviceId)) {
+            //采集数据源及参数配置
+            List<Map<String, Object>> serviceInfos = DataBase.query(Config.centerConnectionStr, "SELECT SI.* FROM  serviceinfo SI WHERE SI.servicetype ='4'  AND serviceid = ?", Collections.singletonList(new Object[]{serviceId}));
+
+            if (Objects.isNull(serviceInfos) || serviceInfos.isEmpty()) {
+                throw new RuntimeException("采集服务不存在");
+            }
+            Map<String, Object> serviceInfo = serviceInfos.get(0);
+            serviceInfoesMap.put(serviceId, serviceInfo);
+        }
+
+        Map<String, Object> serviceInfo = serviceInfoesMap.get(serviceId); //采集服务
+
+        Integer loopCount = Objects.isNull(serviceInfo.get("loopcount")) ? null : Integer.parseInt(serviceInfo.get("loopcount").toString());
+        Object cronExpress = serviceInfo.get("cronexpress");
+
+        if (!scheduleTaskMaps.containsKey(serviceId)) {
+            ScheduleTask scheduleTask = new ScheduleTask(serviceId, loopCount);
+            scheduleTaskMaps.put(serviceId, scheduleTask);
+        }
+        ScheduleTask scheduleTask = scheduleTaskMaps.get(serviceId);
+        //创建定时任务:
         System.out.println("启动定时任务线程 taskId " + scheduleTask.getId());// 设置时间比当前时间晚了
-        ScheduledFuture<?> scheduledFuture;
-        if (taskType == 1) {
-            scheduledFuture = threadPoolTaskScheduler.scheduleWithFixedDelay(scheduleTask, new Date(), cycleFrequency);
-        } else {
-            if (Objects.isNull(cronExpress)) {
+        if (Objects.isNull(cronExpress)) {
 //                todo 记录日志
-                return false;
-            }
-            scheduledFuture = threadPoolTaskScheduler.schedule(scheduleTask, new CronTrigger(cronExpress.toString()));
+            throw new RuntimeException("定时表达式并没有设置");
         }
-        scheduledFutureMap.put(scheduleTask.getId(), scheduledFuture);
+        ScheduledFuture<?> scheduledFuture = threadPoolTaskScheduler.schedule(scheduleTask, new CronTrigger(cronExpress.toString()));
+        scheduledFutureMap.put(serviceId, scheduledFuture);
+        DataBase.updateBatch(Config.centerConnectionStr, "update serviceinfo set runState = 1 where  serviceid =?", Collections.singletonList(new Object[]{serviceId}));
+
         return true;
     }
 
     /**
-     * 取消
+     * 取消,停止定时任务
      *
-     * @param scheduleTask 定时任务
+     * @param serviceId 定时任务
      */
-    public static boolean cancel(ScheduleTask scheduleTask) {
-        System.out.println("关闭定时任务线程 taskId " + scheduleTask.getId());
-        String serviceId = scheduleTask.getId();
-        ScheduledFuture<?> scheduledFuture = scheduledFutureMap.get(serviceId);
+    public static boolean cancel(String serviceId) throws Exception {
+        System.out.println("关闭定时任务线程 taskId " + serviceId);
+        ScheduledFuture<?> scheduledFuture = scheduledFutureMap.remove(serviceId);
         if (scheduledFuture != null && !scheduledFuture.isCancelled()) {
             scheduledFuture.cancel(false);
+            DataBase.updateBatch(Config.centerConnectionStr, "update serviceinfo set runState = 0 where  serviceID =?", Collections.singletonList(new Object[]{serviceId}));
         }
-        scheduledFutureMap.remove(serviceId);
-        //根据服务ID获取所有算法,进行统一关闭
-        getordropInput(serviceId,false);
         return true;
     }
 }

+ 5 - 18
src/main/java/com/scbfkj/uni/system/SystemInit.java

@@ -1,16 +1,11 @@
 package com.scbfkj.uni.system;
 
 import com.scbfkj.uni.library.DataEncryptionUtil;
-import com.scbfkj.uni.library.DataFormatUtil;
-import com.scbfkj.uni.process.DataBase;
-import com.scbfkj.uni.service.DataProcessService;
 import jakarta.annotation.PostConstruct;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
-import java.util.Collections;
 import java.util.List;
-import java.util.Map;
 
 
 @Component
@@ -45,20 +40,12 @@ public class SystemInit {
             String target = DataEncryptionUtil.decryptRSAByPrivateKey(targetStr);
             Config.targets.add(target);
         }
-
-        startService();
+        initializeTheSystemEnvironment();
+//        ControlService.startServiceByContainerCode();
     }
 
-    public void startService() throws Exception {
-        String queryServiceInfos = "select * from serviceinfo where containercode=?";
-        List<Map<String, Object>> serviceInfos = DataBase.query(Config.centerConnectionStr, queryServiceInfos, Collections.singletonList(new Object[]{Config.containerCode}));
-        for (Map<String, Object> serviceInfo : serviceInfos) {
-            String serviceId = serviceInfo.get("serviceid").toString();
-            String serviceType = serviceInfo.get("servicetype").toString();
-            String taskType = serviceInfo.get("tasktype").toString();
-//
-//            todo 启动服务
-
-        }
+    private void initializeTheSystemEnvironment() {
+//        运行环境初始化
     }
+
 }