andy 1 year ago
parent
commit
317537b7ff

+ 2 - 1
src/main/java/com/scbfkj/uni/UniApplication.java

@@ -2,10 +2,11 @@ package com.scbfkj.uni;
 
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchRestClientAutoConfiguration;
 import org.springframework.context.annotation.EnableAspectJAutoProxy;
 import org.springframework.scheduling.annotation.EnableScheduling;
 
-@SpringBootApplication
+@SpringBootApplication(exclude = {ElasticsearchRestClientAutoConfiguration.class})
 @EnableAspectJAutoProxy
 @EnableScheduling
 public class UniApplication {

+ 23 - 1
src/main/java/com/scbfkj/uni/api/ControlApi.java

@@ -1,7 +1,29 @@
 package com.scbfkj.uni.api;
 
-import org.springframework.web.bind.annotation.RestController;
+import com.scbfkj.uni.service.ControlService;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.*;
+
+import java.util.Map;
 
 @RestController
+@RequestMapping("controlApi")
 public class ControlApi {
+    @PostMapping("start")
+    public ResponseEntity<Map<String,Object>> start(@RequestHeader Map<String, String> headers, @RequestBody Map<String, Object> body) throws Exception {
+        Map<String, Object> started = ControlService.startOrStop(body, "1");
+        return ResponseEntity.ok(started);
+    }
+
+    /**
+     * 主动采集与被动采集的停止
+     *
+     * @param body 请求内容
+     * @return
+     */
+    @PostMapping("stop")
+    public ResponseEntity<Map<String,Object>> stop(@RequestBody Map<String, Object> body) throws Exception {
+        Map<String, Object> stoped = ControlService.startOrStop(body, "0");
+        return ResponseEntity.ok(stoped);
+    }
 }

+ 4 - 0
src/main/java/com/scbfkj/uni/api/DataProcessApi.java

@@ -4,4 +4,8 @@ import org.springframework.web.bind.annotation.RestController;
 
 @RestController
 public class DataProcessApi {
+
+
+
+
 }

+ 53 - 1
src/main/java/com/scbfkj/uni/api/GenericApi.java

@@ -1,7 +1,59 @@
 package com.scbfkj.uni.api;
 
-import org.springframework.web.bind.annotation.RestController;
+import com.fasterxml.jackson.databind.ser.std.MapSerializer;
+import com.scbfkj.uni.library.RequestUtil;
+import com.scbfkj.uni.process.DataBase;
+import com.scbfkj.uni.service.DataProcessService;
+import com.scbfkj.uni.system.Config;
+import org.springframework.http.HttpStatusCode;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.*;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 
 @RestController
+@RequestMapping("openApi")
 public class GenericApi {
+
+    @PostMapping({"newdata", "modifydata", "movedata", "query"})
+    public ResponseEntity<Map<String,Object>> base(@RequestBody Map<String, Object> body) throws Exception {
+        String uri = RequestUtil.getUri();
+        String event = "0";
+
+        if (uri.endsWith("newdata")) {
+            event = "1";
+        } else if (uri.endsWith("modifydata")) {
+            event = "2";
+        } else if (uri.endsWith("movedata")) {
+            event = "3";
+        } else if (uri.endsWith("query")) {
+            event = "9";
+        }
+        body.put("event", event);
+
+        Map<String, Object> process = DataProcessService.process(body);
+        return ResponseEntity.ok(process);
+    }
+
+    /**
+     * 匹配服务的暴露的接口
+     *
+     * @param body
+     * @return
+     * @throws Exception
+     */
+
+    @PostMapping("{path}")
+    public ResponseEntity<Map<String,Object>> matchService(@RequestBody Map<String, Object> body) throws Exception {
+        String uri = RequestUtil.getUri();
+        List<Map<String, Object>> serviceinfoList = DataBase.query(Config.centerConnectionStr, "select * from serviceinfo where urilist like concat('%',?,'%')", Collections.singletonList(new Object[]{uri}));
+
+        if (serviceinfoList.isEmpty()) {
+            return ResponseEntity.status(HttpStatusCode.valueOf(404)).build();
+        }
+        Map<String, Object> process = DataProcessService.process(body);
+        return ResponseEntity.ok(process);
+    }
 }

+ 4 - 0
src/main/java/com/scbfkj/uni/library/RequestUtil.java

@@ -109,4 +109,8 @@ public class RequestUtil {
         return RequestContextHolder.currentRequestAttributes().getSessionId();
     }
 
+    public static String getUri() {
+        ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.currentRequestAttributes();
+        return requestAttributes.getRequest().getRequestURI();
+    }
 }

+ 77 - 36
src/main/java/com/scbfkj/uni/process/DataBase.java

@@ -83,7 +83,7 @@ public class DataBase {
         try (Connection connection = dataSourcePool.getConnection();
              PreparedStatement preparedStatement = connection.prepareStatement(sql)
         ) {
-            int[] result = null;
+            int[] result;
             int index = 0;
             try {
                 while (argsList.size() > index) {
@@ -121,7 +121,7 @@ public class DataBase {
         }
     }
 
-    public static Map<String, Object> exec(String connectionStr, String expression, List<Map<String, Object>> args, Object executionNumber, List<String> filterColumns, Map<String, Object> filterLines) throws Exception {
+    public static Map<String, Object> exec(String connectionStr, String expression, List<Map<String, Object>> args, Object executionNumber, List<String> filterColumns, List<Map<String, Object>> filterLines) throws Exception {
 
         if (Objects.isNull(executionNumber) || !StringUtils.hasText(executionNumber.toString())) {
             throw new RuntimeException("执行编号不能为空");
@@ -130,7 +130,21 @@ public class DataBase {
         boolean isTableName = !expression.contains(" ");
 
         if (Objects.isNull(filterColumns)) filterColumns = new ArrayList<>();
-        if (Objects.isNull(filterLines)) filterLines = new HashMap<>();
+        if (Objects.isNull(filterLines)) filterLines = new ArrayList<>();
+        String filterLineWhereStr =null;
+        for (Object f : filterLines) {
+            Map<String, Object> it = ((Map<String, Object>) f);
+            filterLineWhereStr = " %s %s %s %s %s %s %s %s ".formatted(
+                    filterLineWhereStr,
+                    it.getOrDefault("left", ""),
+                    it.get("column"),
+                    it.get("comparator"),
+                    Objects.equals(it.get("comparator"), " is null ") ? "" : "?",
+                    !Objects.equals(it.get("comparator"), " is null ")?" ":it.get("value"),
+                    it.getOrDefault("right", ""),
+                    it.getOrDefault("connector", "")
+            );
+        }
 
 //        只有表名
         if (isTableName) {
@@ -141,12 +155,26 @@ public class DataBase {
                 Map<String, Object> map = args.get(0);
                 Map<String, Object> filter = ((Map<String, Object>) map.getOrDefault("filter", map));
                 filterNames = filter.keySet().stream().toList();
+                String whereStr =null;
+                for (Object f : filterLines) {
+                    Map<String, Object> it = ((Map<String, Object>) f);
+                    whereStr = " %s %s %s %s %s %s %s %s ".formatted(
+                            whereStr,
+                            it.getOrDefault("left", ""),
+                            it.get("column"),
+                            it.get("comparator"),
+                            Objects.equals(it.get("comparator"), " is null ") ? "" : "?",
+                            !Objects.equals(it.get("comparator"), " is null ")?" ":it.get("value"),
+                            it.getOrDefault("right", ""),
+                            it.getOrDefault("connector", "")
+                    );
+                }
                 expression = "select * from %s where %s and  %s".formatted(
                         expression,
                         filterNames.stream().map("%s = ?"::formatted).collect(Collectors.joining(" and ")),
-                        Objects.isNull(filterLines) || filterLines.isEmpty() ? " 1=1 " : filterLines.entrySet().stream().map(entry -> "%s = '%s'".formatted(entry.getKey(), entry.getValue())).collect(Collectors.joining(" and ")));
+                         Objects.isNull(filterLineWhereStr) ? " 1=1 " : filterLineWhereStr);
 
-                if(Objects.nonNull(filterColumns) && !filterColumns.isEmpty()){
+                if( !filterColumns.isEmpty()){
                     expression= "select %s from (%s) as T".formatted(String.join(",", filterColumns),expression);
                 }
 
@@ -208,9 +236,10 @@ public class DataBase {
                     valueNames = value.keySet().stream().toList();
                     Map<String, Object> filter = ((Map<String, Object>) map.get("filter"));
                     filterNames = filter.keySet().stream().toList();
+
                     expression = "update %s set %s where %s and %s".formatted(expression, valueNames.stream().map("%s = ?"::formatted).collect(Collectors.joining(",")),
                             filterNames.stream().map("%s = ?"::formatted).collect(Collectors.joining(" and ")),
-                            Objects.isNull(filterLines) || filterLines.isEmpty() ? " 1=1 " : filterLines.entrySet().stream().map(entry -> "%s = '%s'".formatted(entry.getKey(), entry.getValue())).collect(Collectors.joining(" and ")));
+                            Objects.isNull(filterLineWhereStr)  ? " 1=1 " : filterLineWhereStr);
 //                    删除
                 } else if (Objects.equals("3", executionNumber)) {
                     Map<String, Object> map = args.get(0);
@@ -218,7 +247,7 @@ public class DataBase {
                     filterNames = filter.keySet().stream().toList();
                     expression = "delete from %s where %s and  %s".formatted(expression,
                             filterNames.stream().map("%s = ?"::formatted).collect(Collectors.joining(" and ")),
-                            Objects.isNull(filterLines) || filterLines.isEmpty() ? " 1=1 " : filterLines.entrySet().stream().map(entry -> "%s = '%s'".formatted(entry.getKey(), entry.getValue())).collect(Collectors.joining(" and ")));
+                            Objects.isNull(filterLineWhereStr)  ? " 1=1 " : filterLineWhereStr);
                 }
                 List<Object[]> values = new ArrayList<>();
 //                按照名称 和过滤条件取值
@@ -230,7 +259,7 @@ public class DataBase {
                         objects.addAll(valueNames.stream().map(it -> {
 
 //                            新增 更新 对列过滤
-                            if (finalFilterColumns.contains(it)) {
+                            if (finalFilterColumns.contains(it) || finalFilterColumns.isEmpty()) {
                                 return o1.get(it);
                             } else {
                                 return null;
@@ -266,7 +295,7 @@ public class DataBase {
                                 it.get("comparator"),
                                 Objects.equals(it.get("comparator"), " is null ") ? "" : "?",
                                 it.getOrDefault("right", ""),
-                                it.getOrDefault("connector", " and ")
+                                it.getOrDefault("connector", "")
                         );
                         if (!Objects.equals(it.get("comparator"), " is null ")) {
                             dbFilter.add(it.get("value"));
@@ -282,10 +311,10 @@ public class DataBase {
                     whereStr += list.stream().map(it->"%s = ?".formatted(it.getKey())).collect(Collectors.joining(" and "));
                     dbFilter.addAll(list.stream().map(Map.Entry::getValue).toList());
             }
-            whereStr = " %s and  %s".formatted(whereStr, Objects.isNull(filterLines) || filterLines.isEmpty() ? " 1=1 " : filterLines.entrySet().stream().map(entry -> "%s = '%s'".formatted(entry.getKey(), entry.getValue())).collect(Collectors.joining(" and ")));
+            whereStr = " %s and  %s".formatted(whereStr, Objects.isNull(filterLineWhereStr)  ? " 1=1 " : filterLineWhereStr);
             String sql = expression.replaceAll("《whereStr》", " " + whereStr);
 
-            if(Objects.nonNull(filterColumns) && !filterColumns.isEmpty()){
+            if( !filterColumns.isEmpty()){
                 sql= "select %s from (%s) as T".formatted(String.join(",", filterColumns),sql);
             }
 
@@ -309,7 +338,7 @@ public class DataBase {
                     getSQLVarList(expression);
                 }
                 List<String> names = sqlStrVarList.get(expression);
-                String sql = " %s %s".formatted(sqlStrNewSQL.get(expression), Objects.isNull(filterLines) || filterLines.isEmpty() ? "" : " and " + filterLines.entrySet().stream().map(entry -> "%s = '%s'".formatted(entry.getKey(), entry.getValue())).collect(Collectors.joining(" and ")));
+                String sql = " %s  and %s".formatted(sqlStrNewSQL.get(expression), Objects.isNull(filterLineWhereStr)  ? " 1=1 " : filterLineWhereStr);
                 int[] updateResult = updateBatch(connectionStr, sql, args.stream().map(it -> names.stream().map(it::get).toArray()).toList());
                 return UniReturnUtil.success(updateResult);
             }
@@ -317,8 +346,8 @@ public class DataBase {
     }
 
     private static Pattern regExpression = Pattern.compile("(?<=《)([^》]+)?(?=》)");//提取书名号变量的正则表达式
-    public static Map<String, List<String>> sqlStrVarList = new HashMap<>();//SQL语句的书名号变量列表
-    public static Map<String, String> sqlStrNewSQL = new HashMap<>();//SQL语句更换书名号变量后的可执行SQL
+    private static Map<String, List<String>> sqlStrVarList = new HashMap<>();//SQL语句的书名号变量列表
+    private static Map<String, String> sqlStrNewSQL = new HashMap<>();//SQL语句更换书名号变量后的可执行SQL
 
     private static List<String> getSQLVarList(String sqlStr) {
         String newSqlStr = sqlStr;//不破坏之前SQL语句
@@ -342,30 +371,44 @@ public class DataBase {
         return !Objects.equals(mapList.get(0).get("existscount").toString(), "0");
     }
 
-    public static List<Map<String, Object>> query(String connectionStr, String sql, List<Map<String, Object>> argsList, List<String> filterColumns, Map<String, Object> filterLines) throws Exception {
+    public static List<Map<String, Object>> query(String connectionStr, String sql, List<Map<String, Object>> argsList, List<String> filterColumns, List<Map<String, Object>> filterLines) throws Exception {
 
         sql = sql.replaceAll("(\\r)?\\n", " ");
-
-        if (Objects.nonNull(filterLines) && !filterLines.isEmpty()) {
-            sql = " (%s %s %s) ".formatted(sql, sql.contains(" where ") ? " " : " where ", filterLines.entrySet().stream().map(it -> "%s = %s".formatted(it.getKey(), it.getValue())).collect(Collectors.joining(" and ")));
+        String filterLineWhereStr =null;
+        for (Object f : filterLines) {
+            Map<String, Object> it = ((Map<String, Object>) f);
+            filterLineWhereStr = " %s %s %s %s %s %s %s %s ".formatted(
+                    filterLineWhereStr,
+                    it.getOrDefault("left", ""),
+                    it.get("column"),
+                    it.get("comparator"),
+                    Objects.equals(it.get("comparator"), " is null ") ? "" : "?",
+                    !Objects.equals(it.get("comparator"), " is null ")?" ":it.get("value"),
+                    it.getOrDefault("right", ""),
+                    it.getOrDefault("connector", "")
+            );
+        }
+        if (Objects.nonNull(filterLineWhereStr) ) {
+            sql = " %s %s and %s ".formatted(sql, sql.contains(" where ") ? " 1 = 1" : " where ", Objects.isNull(filterLineWhereStr)?" 1=1 ":filterLineWhereStr);
         }
         if (Objects.nonNull(filterColumns) && !filterColumns.isEmpty()) {
-            sql = "select %s from %s".formatted(filterColumns.stream().collect(Collectors.joining(",")), sql);
-        } else {
-            sql = "select * from %s".formatted(sql);
+            sql = "select %s from ( %s ) T".formatted(String.join(",", filterColumns), sql);
         }
 
         List<List<Object>> result = new ArrayList<>();
-        List<String> names = new ArrayList<>();
-        if (sql.matches("《.+?》")) {
-            Pattern compile = Pattern.compile("(?=《).*(?=》)");
-            Matcher matcher = compile.matcher(sql);
-            int index = 0;
-            while (matcher.find()) {
-                String name = matcher.group(index);
-                names.add(name);
-            }
-        }
+//        List<String> names = new ArrayList<>();
+//        Matcher matcher = regExpression.matcher(sql);
+//        if (matcher.find()) {
+//            int index = 0;
+//            while (matcher.find()) {
+//                String name = matcher.group(index);
+//                names.add(name);
+//            }
+//        }
+
+        List<String> names = getSQLVarList(sql);
+        String newSql = sqlStrNewSQL.get(sql);
+
         if (names.size() == 1 && names.contains("whereStr")) {
             List<Map<String, Object>> list = new ArrayList<>();
             for (Map<String, Object> stringObjectMap : argsList) {
@@ -385,17 +428,15 @@ public class DataBase {
             }
             return list;
         } else {
-            for (String it : names) {
-                sql = sql.replace("《%s》".formatted(it), " ? ");
-            }
             for (Map<String, Object> map : argsList) {
                 List<Object> args = new ArrayList<>();
+                map= ((Map<String,Object>) map.getOrDefault("filter", map));
                 for (String name : names) {
                     args.add(map.get(name));
                 }
                 result.add(args);
             }
-            return query(connectionStr, sql, result.stream().map(List::toArray).toList());
+            return query(connectionStr, newSql, result.stream().map(List::toArray).toList());
         }
     }
 
@@ -453,7 +494,7 @@ public class DataBase {
 
     private static final Map<String, List<String>> columns = new HashMap<>();
 
-    public static List<String> getColumns(String connection, String sql, ResultSet resultSet) throws SQLException {
+    private static List<String> getColumns(String connection, String sql, ResultSet resultSet) throws SQLException {
         if (columns.containsKey(connection + sql)) {
             return columns.get(connection + sql);
         }

+ 96 - 5
src/main/java/com/scbfkj/uni/process/Kafka.java

@@ -1,16 +1,107 @@
 package com.scbfkj.uni.process;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.scbfkj.uni.library.DataFormatUtil;
 import com.scbfkj.uni.library.UniReturnUtil;
+import org.apache.commons.pool2.BaseKeyedPooledObjectFactory;
+import org.apache.commons.pool2.KeyedObjectPool;
+import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.impl.DefaultPooledObject;
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.springframework.util.StringUtils;
 
-import java.util.Map;
-import java.util.Objects;
+import java.time.Duration;
+import java.util.*;
 
 public class Kafka {
-    public static Map<String, Object> send(String connection, String topic, Object data) {
 
-        if(Objects.isNull(data) ){
+    private static final KeyedObjectPool<String, Producer<String, String>> producerPool = new GenericKeyedObjectPool<>(new BaseKeyedPooledObjectFactory<String, Producer<String, String>>() {
+        @Override
+        public Producer<String, String> create(String key) throws Exception {
+            Map<String, Object> connectConfigMaps = (Map<String, Object>) DataFormatUtil.stringToMap(key);
+            connectConfigMaps.put("request.timeout.ms", 60000);
+            connectConfigMaps.put("max.request.size", 10240000);
+            connectConfigMaps.put("compression.type", "gzip");
+            connectConfigMaps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+            connectConfigMaps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+
+            return new KafkaProducer<>(connectConfigMaps);
+        }
+
+        @Override
+        public PooledObject<Producer<String, String>> wrap(Producer<String, String> value) {
+            return new DefaultPooledObject<>(value);
+        }
+    });
+    private static final KeyedObjectPool<String, Consumer<String, String>> consumerPool = new GenericKeyedObjectPool<>(new BaseKeyedPooledObjectFactory<String, Consumer<String, String>>() {
+        @Override
+        public Consumer<String, String> create(String key) throws Exception {
+            Map<String, Object> connectConfigMaps = (Map<String, Object>) DataFormatUtil.stringToMap(key);
+            if (!connectConfigMaps.containsKey("max.poll.records"))
+                connectConfigMaps.put("max.poll.records", 1);
+            connectConfigMaps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+            connectConfigMaps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
+            return new KafkaConsumer<>(connectConfigMaps);
+        }
+
+        @Override
+        public PooledObject<Consumer<String, String>> wrap(Consumer<String, String> value) {
+            return new DefaultPooledObject<>(value);
+        }
+    });
+
+    public static Map<String, Object> sendMessage(String connection, String topic, Object data) throws Exception {
+
+        if (Objects.isNull(data)) {
             return UniReturnUtil.fail("数据为空");
         }
-        return null;
+
+        List<Object> sendResult = new ArrayList<>();
+        if (data instanceof List<?> datas) {
+            for (Object o : datas) {
+                sendResult.add(sendMessage(connection, topic, DataFormatUtil.toString(o)));
+            }
+            return UniReturnUtil.success(sendResult);
+        } else {
+            Producer<String, String> producer = producerPool.borrowObject(connection);
+            sendResult.add(producer.send(new ProducerRecord<>(topic, DataFormatUtil.toString(data))));
+            producerPool.returnObject(connection, producer);
+        }
+        return UniReturnUtil.success(sendResult);
+    }
+
+    public static Map<String, Object> reception(String connectConfig, String sourceObjectName, String libraryId, String pollNumber, String fetch) throws Exception {
+        Map<String, Object> connectConfigMaps = (Map<String, Object>) DataFormatUtil.stringToMap(connectConfig);
+        if (!connectConfigMaps.containsKey("group.id")) {
+            connectConfigMaps.put("group.id", "groupid" + (String.format("%s", libraryId)));
+        }
+        connectConfigMaps.put("max.poll.records", pollNumber == null ? 1 : Integer.parseInt(pollNumber));
+        if (StringUtils.hasText(fetch)) {
+            connectConfigMaps.put("max.partition.fetch.bytes", 11534336);
+        }
+        String key = DataFormatUtil.toString(connectConfig);
+        Consumer<String, String> consumer = consumerPool.borrowObject(key);
+        consumer.subscribe(Collections.singleton(sourceObjectName));
+        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
+        List<String> messageList = new ArrayList<>();
+        if (records.count() > 0) {
+            for (ConsumerRecord<String, String> record : records.records(sourceObjectName)) {
+                String readValue = record.value().trim();
+                if (!StringUtils.hasLength(readValue) || readValue.equals("{}")) {
+                    continue;
+                }
+                messageList.add(readValue);
+            }
+        }
+        consumerPool.returnObject(key, consumer);
+        return UniReturnUtil.success(messageList);
+
     }
 }

+ 102 - 105
src/main/java/com/scbfkj/uni/service/ControlService.java

@@ -1,105 +1,102 @@
-//package com.scbfkj.uni.service;
-//
-//import com.scbfkj.uni.library.DataFormatUtil;
-//import com.scbfkj.uni.library.UniReturnUtil;
-//import com.scbfkj.uni.process.DataBase;
-//import com.scbfkj.uni.system.Config;
-//import com.scbfkj.uni.system.ScheduleTask;
-//import com.scbfkj.uni.system.ScheduleUtil;
-//import org.springframework.util.StringUtils;
-//
-//import java.time.LocalDateTime;
-//import java.util.*;
-//
-//public class ControlService {
-//
-//
-//    public static Map<String, ScheduleTask> ScheduleTaskMaps = new HashMap<>();
-//
-//
-//    public static void startServiceByContainerCode() throws Exception {
-//        String queryServiceInfos = "select * from serviceinfo where containercode=?";
-//        List<Map<String, Object>> serviceInfos = DataBase.query(Config.centerConnectionStr, queryServiceInfos, Collections.singletonList(new Object[]{Config.containerCode}));
-//        for (Map<String, Object> serviceInfo : serviceInfos) {
-//            String serviceId = serviceInfo.get("serviceid").toString();
-//            String serviceType = serviceInfo.get("servicetype").toString();
-//            String taskType = serviceInfo.get("tasktype").toString();
-////
-////            todo 启动服务
-//
-//        }
-//    }
-//    /**
-//     * 服务启动
-//     *
-//     * @param serviceId
-//     * @return
-//     */
-//    public static Map<String,Object> start(String serviceId) throws Exception {
-//        //采集数据源及参数配置
-//        List<Map<String, Object>> collectServiceList = DataBase.query(Config.centerConnectionStr,"SELECT SI.* FROM  serviceinfo SI WHERE SI.serviceType ='4'  AND serviceID = ?", Collections.singletonList(new Object[]{serviceId}));
-//
-//        if (Objects.isNull(collectServiceList) || collectServiceList.isEmpty()) {
-//            return UniReturnUtil.fail("采集服务不存在");
-//        }
-//        Map<String, Object> collectServiceMaps = collectServiceList.get(0); //采集服务
-//        Integer loopCount = Objects.isNull (collectServiceMaps.get("loopCount")) ? null : Integer.parseInt(collectServiceMaps.get("loopCount").toString());
-//        int frequencyCount = Objects.isNull (collectServiceMaps.get("frequencyCount")) ? 1 : Integer.parseInt(collectServiceMaps.get("frequencyCount").toString());
-//        int frequencyUnit = Objects.isNull (collectServiceMaps.get("frequencyUnit")) ? 1 : Integer.parseInt(collectServiceMaps.get("frequencyUnit").toString());
-//        int taskType = Objects.isNull (collectServiceMaps.get("taskType")) ? 1 : Integer.parseInt(collectServiceMaps.get("taskType").toString());
-//        Object cronExpress = collectServiceMaps.get("cronExpress");
-//        LocalDateTime taskValidDate = DataFormatUtil.toDateTime(collectServiceMaps.get("taskValid"));
-//        LocalDateTime taskInvalidDate = DataFormatUtil.toDateTime(collectServiceMaps.get("taskInvalid"));
-//        //创建定时任务:
-//        ScheduleTask scheduleTask = ScheduleTaskMaps.get(serviceId);
-//        if (Objects.isNull(scheduleTask)) {
-//            scheduleTask = new ScheduleTask(serviceId, loopCount, taskValidDate, taskInvalidDate);
-//            ScheduleTaskMaps.put(serviceId, scheduleTask);
-//        }
-//        // todo 记录日志
-//        boolean start = ScheduleUtil.start(scheduleTask, (frequencyCount * frequencyUnit), cronExpress, taskType);
-////        LogUtils.log("start: 2", "0", null, (start ? "启动成功" : "启动失败"), serviceId,  null, null, null,null);//无条件记录数据接收日志
-//        if (start) {
-//            return UniReturnUtil.success("启动成功");
-//        }
-//        DataBase.updateBatch(Config.centerConnectionStr,"update serviceinfo set runState = 1 where  serviceid =?",Collections.singletonList(new Object[]{serviceId}));
-//        ScheduleTaskMaps.remove(serviceId);
-//        return UniReturnUtil.fail("启动失败");
-//    }
-//
-//    /**
-//     * 服务停止
-//     *
-//     * @param service_id
-//     * @return
-//     */
-//    public static Map<String,Object> stop(String service_id) throws Exception {
-//        ScheduleTask scheduleTask = ScheduleTaskMaps.get(service_id);
-//        if (Objects.isNull(scheduleTask)) {
-//            return UniReturnUtil.fail("服务: " + service_id + " 已经停止");
-//        }
-//        boolean reset = ScheduleUtil.cancel(scheduleTask);
-//        ScheduleTaskMaps.remove(service_id);
-//        DataBase.updateBatch(Config.centerConnectionStr,"update serviceinfo set runState = 0 where  serviceID =?",Collections.singletonList(new Object[]{service_id}));
-////        LogUtils.log("stop: 2", "0", null, (reset ? "停止成功" : "停止失败"), service_id, null, null, null,null);//无条件记录数据接收日志
-//        return UniReturnUtil.success("停止" + (reset ? "成功" : "失败"));
-//    }
-//
-//    public static Map<String,Object> startOrStop(Map<String, Object> params, String statue) throws Exception {
-//        Object serviceId1 = params.get("serviceId");
-//        String serviceId = Objects.isNull(serviceId1) ? null : serviceId1.toString();
-//        if (!StringUtils.hasText(serviceId)) {
-//            return  UniReturnUtil.fail("服务ID不能为空");
-//        }
-//        List<Map<String, Object>> serviceTypeData = DataBase.query(Config.centerConnectionStr,"select serviceType from serviceinfo where serviceID=?", Collections.singletonList(new Object[]{serviceId}));
-//
-//        if (Objects.nonNull(serviceTypeData) && Objects.equals("4",serviceTypeData.get(0).get("serviceType").toString())) {
-//            if (statue.equals("0")) {
-//                return stop(serviceId);
-//            }
-//            return start(serviceId);
-//        }
-////        ObjectMap.getordropInput(serviceId,false);
-//        return UniReturnUtil.success("重启成功");
-//    }
-//}
+package com.scbfkj.uni.service;
+
+import com.scbfkj.uni.library.DataAliasGetUtil;
+import com.scbfkj.uni.library.UniReturnUtil;
+import com.scbfkj.uni.process.DataBase;
+import com.scbfkj.uni.system.Config;
+import com.scbfkj.uni.system.ScheduleUtil;
+
+import java.util.*;
+
+public class ControlService {
+
+
+    public static void startServiceByContainerCode() throws Exception {
+
+//        查询服务类型为主动采集的服务
+        String queryServiceInfos = "select * from serviceinfo where containercode=? and servicetype=4";
+        List<Map<String, Object>> serviceInfos = DataBase.query(Config.centerConnectionStr, queryServiceInfos, Collections.singletonList(new Object[]{Config.containerCode}));
+//        循环启动
+        for (Map<String, Object> serviceInfo : serviceInfos) {
+            start(serviceInfo);
+        }
+    }
+
+    /**
+     * 服务启动 主要用于前端或者第三方请求
+     *
+     * @param inData
+     * @return
+     */
+    public static Map<String, Object> start(Map<String, Object> inData) throws Exception {
+        Optional<String> serviceidOpt = DataAliasGetUtil.getValue("serviceid", inData);
+        if (serviceidOpt.isEmpty()) {
+            throw new RuntimeException("服务编号不能为空");
+        }
+        String serviceId = serviceidOpt.get();
+        List<Map<String, Object>> serviceInfoList = null;
+        if (serviceId.matches("^\\d+$")) {
+            serviceInfoList = DataBase.query(Config.centerConnectionStr, "select * from servideinfo where serviceid=?", Collections.singletonList(new Object[]{serviceId}));
+        } else {
+            serviceInfoList = DataBase.query(Config.localCenterConnectionStr, "select * from servideinfo where serviceid=?", Collections.singletonList(new Object[]{serviceId}));
+        }
+        if (serviceInfoList.isEmpty()) {
+            throw new RuntimeException("服务%s没有配置".formatted(serviceId));
+        }
+        Map<String, Object> serviceInfo = serviceInfoList.get(0);
+        Object cronexpress = serviceInfo.get("cronexpress");
+        if (Objects.isNull(cronexpress)) {
+            return DataProcessService.process(inData);
+
+        } else {
+            //启动定时任务:
+            boolean start = ScheduleUtil.start(serviceId);
+            if (start) {
+                DataBase.updateBatch(Config.centerConnectionStr, "update serviceinfo set runState = 1 where  serviceid =?", Collections.singletonList(new Object[]{serviceId}));
+                return UniReturnUtil.success("启动成功");
+            } else {
+                return UniReturnUtil.fail("启动失败");
+            }
+        }
+    }
+
+
+    /**
+     * 服务停止
+     *
+     * @param serviceId
+     * @return
+     */
+    public static Map<String, Object> stop(String serviceId) throws Exception {
+        boolean cancel = ScheduleUtil.cancel(serviceId);
+        DataBase.updateBatch(Config.centerConnectionStr, "update serviceinfo set runState = 0 where  serviceID =?", Collections.singletonList(new Object[]{serviceId}));
+//        LogUtils.log("stop: 2", "0", null, (reset ? "停止成功" : "停止失败"), serviceId, null, null, null,null);//无条件记录数据接收日志
+        return UniReturnUtil.success("停止" + (cancel ? "成功" : "失败"));
+    }
+
+    /**
+     * 用用户启停定时任务 服务类型为4的服务
+     *
+     * @param params
+     * @param statue
+     * @return
+     * @throws Exception
+     */
+
+    public static Map<String, Object> startOrStop(Map<String, Object> params, String statue) throws Exception {
+        Optional<String> serviceIdOpt = DataAliasGetUtil.getValue("serviceid", params);
+        if (serviceIdOpt.isEmpty()) return UniReturnUtil.fail("服务ID不能为空");
+        String serviceId = serviceIdOpt.get();
+        Map<String, Object> stop = stop(serviceId);
+        if (statue.equals("0")) {
+            return stop;
+        }
+        List<Map<String, Object>> serviceTypeData = DataBase.query(Config.centerConnectionStr, "select serviceType from serviceinfo where serviceID=? and containercode=? and servicetype='4'", Collections.singletonList(new Object[]{serviceId, Config.containerCode}));
+        if (serviceTypeData.isEmpty()) {
+            throw new RuntimeException("没有找到服务配置");
+        }
+        return start(new HashMap<>() {{
+            put("service", serviceId);
+        }});
+    }
+}

+ 27 - 17
src/main/java/com/scbfkj/uni/service/DataProcessService.java

@@ -1,5 +1,6 @@
 package com.scbfkj.uni.service;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.jayway.jsonpath.JsonPath;
 import com.scbfkj.uni.library.DataAliasGetUtil;
@@ -31,28 +32,33 @@ public class DataProcessService {
         resource.add(inData);
         String serviceId = serviceIdOpt.get();
 
-        List<Map<String, Object>> algorithmLibraries = DataBase.query(Config.centerConnectionStr, "select * from algorithmlibrary where serviceid=?", Collections.singletonList(new Object[]{serviceId}));
+        List<Map<String, Object>> algorithmLibraries;
+        if(serviceId.matches("^\\d+$")){
+            algorithmLibraries  = DataBase.query(Config.centerConnectionStr, "select * from algorithmlibrary where serviceid=?", Collections.singletonList(new Object[]{serviceId}));
+        }else{
+            algorithmLibraries  = DataBase.query(Config.localCenterConnectionStr, "select * from algorithmlibrary where serviceid=?", Collections.singletonList(new Object[]{serviceId}));
+        }
+
         for (Map<String, Object> algorithmLibrary : algorithmLibraries) {
             resource.add(processByAlgorithm(algorithmLibrary, resource, null, null));
         }
-        return UniReturnUtil.success(null);
+        return resource.get(resource.size() - 1);
     }
 
     /**
      * @param algorithmLibrary 算法配置
-     * @param args 算法参数
-     * @param filterColumns 数据库执行的列权限
-     * @param filterLines 数据库执行的行权限
+     * @param args             算法参数
+     * @param filterColumns    数据库执行的列权限
+     * @param filterLines      数据库执行的行权限
      * @return
      * @throws Exception
      */
-    public static Map<String, Object> processByAlgorithm(Map<String, Object> algorithmLibrary, List<Map<String, Object>> args, List<String> filterColumns, Map<String, Object> filterLines) throws Exception {
+    public static Map<String, Object> processByAlgorithm(Map<String, Object> algorithmLibrary, List<Map<String, Object>> args, List<String> filterColumns, List<Map<String,Object>> filterLines) throws Exception {
 
         Object type = algorithmLibrary.get("algorithmtype");
         Object expression = algorithmLibrary.get("computingexpression");
         Object parameterSet = algorithmLibrary.get("parameterset");
         Object preConditions = algorithmLibrary.get("preconditions");
-        Object executionNumber = algorithmLibrary.get("executionnumber");
         Object dataSourceId = algorithmLibrary.get("datasourceid");
         List<Map<String, Object>> datasourceList = DataBase.query(Config.centerConnectionStr, "select * from datasource where datasourceid=?", Collections.singletonList(new Object[]{dataSourceId}));
         Map<String, Object> datasource = datasourceList.get(0);
@@ -69,7 +75,7 @@ public class DataProcessService {
             source.put("args", args);
             source.put("algorithm", algorithmLibrary);
             source.put("datasource", datasource);
-            parameters.add(getParams(parameterSet.toString(), source));
+            parameters.addAll(getParams(parameterSet.toString(), source));
         }
 
         switch (type.toString()) {
@@ -110,8 +116,8 @@ public class DataProcessService {
                     String methodName = currentMethod.getName();
                     if (methodName.equals(expression)) {
                         javaMethod = currentMethod;
-                    }else if(methodName.equals("close")){
-                        closeMethod=currentMethod;
+                    } else if (methodName.equals("close")) {
+                        closeMethod = currentMethod;
                     }
                 }
                 if (Objects.isNull(javaMethod)) {
@@ -120,18 +126,21 @@ public class DataProcessService {
 
                 Object invoke = javaMethod.invoke(classInstance, parameters.toArray());
 //                关闭 因为没有做实例的缓存 所以要关闭算法
-                if(Objects.nonNull(closeMethod)){
+                if (Objects.nonNull(closeMethod)) {
                     closeMethod.invoke(classInstance);
                 }
                 return UniReturnUtil.success(invoke);
             }
 //            JS表达式
             case "2" -> {
-                return UniReturnUtil.success(ScriptEngineUtil.eval(expression.toString(), parameters.toArray()));
+                return UniReturnUtil.success(ScriptEngineUtil.eval(expression.toString(), parameters));
             }
 //            数据库
             case "3" -> {
 //                下放到Database中处理数据
+//                参数表达式顺序是 数据源连接字符串($.datasource.connectset),sql表达式($.algorithm.computingexpression),需要操作的值($.args[1].returnData),执行编号($.algorithm.executionnumber)
+                return DataBase.exec(parameters.get(0).toString(), parameters.get(1).toString(), ((List<Map<String, Object>>) parameters.get(2)), parameters.get(3), filterColumns, filterLines);
+
 
             }
         }
@@ -139,8 +148,8 @@ public class DataProcessService {
     }
 
     /**
-     * @param script  前置表达式 不能为空
-     * @param args 表达式参数
+     * @param script 前置表达式 不能为空
+     * @param args   表达式参数
      * @return
      * @throws Exception
      */
@@ -150,12 +159,13 @@ public class DataProcessService {
 
     /**
      * @param parameterSet 使用标准的jsonPath表达式 $.a.b[0].c,多个参数之间使用;;分隔
-     * @param source jsonpath需要取值的数据源
+     * @param source       jsonpath需要取值的数据源
      * @return
      */
-    public static List<Object> getParams(String parameterSet, Object source) {
+    public static List<Object> getParams(String parameterSet, Object source) throws JsonProcessingException {
         String[] paths = parameterSet.split(";;");
-        return Arrays.stream(paths).map(it -> JsonPath.read(source, it.trim())).toList();
+        String json = DataFormatUtil.toString(source);
+        return Arrays.stream(paths).map(it -> JsonPath.read(json, it.trim())).toList();
     }
 
 }

+ 324 - 2
src/main/java/com/scbfkj/uni/service/LoggerService.java

@@ -1,14 +1,336 @@
 package com.scbfkj.uni.service;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.jayway.jsonpath.JsonPath;
+import com.scbfkj.uni.library.DataFormatUtil;
+import com.scbfkj.uni.library.UniReturnUtil;
+import com.scbfkj.uni.process.DataBase;
+import com.scbfkj.uni.process.Elasticsearch;
+import com.scbfkj.uni.process.Kafka;
+import com.scbfkj.uni.system.Config;
+import org.springframework.stereotype.Component;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.time.LocalDateTime;
+import java.util.*;
+
+@Component
 public class LoggerService {
 
+    private static String logConnection = """
+            {"jdbcUrl":"jdbc:sqlite:logs/log_","driverClassName":"org.sqlite.JDBC"}""";
+
+    private static Map<String, Integer> flag = new HashMap<>();
+
+
+    //    应用
+    public static void logInterface(String appId, String ip, String requestPath, String sessionId, String requestData, String returnData, String returnCode, String returnMessage, String lifecycleId) {
+        try {
+            String tableName = "interfacelog";
+            String connectionStr = getCurrentThreadConnection(tableName);
+
+            HashMap<String, Object> data = new HashMap<>();
+            data.put("applicationid", appId);
+            data.put("requesttime", LocalDateTime.now());
+            data.put("requestip", ip);
+            data.put("requestpath", requestPath);
+            data.put("sessionid", sessionId);
+            data.put("requestdata", requestData);
+            data.put("returndata", returnData);
+            data.put("returncode", returnCode);
+            data.put("returnmessage", returnMessage);
+            data.put("lifecycleid", lifecycleId);
+            DataBase.exec(connectionStr, tableName, Collections.singletonList(data), "1", null, null);
+
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static void logService(String ip, String requestPath, String sessionId, String requestData, String returnData, String returnCode, String returnMessage, String lifecycleId) {
+        try {
+            String tableName = "servicelog";
+            String connectionStr = getCurrentThreadConnection(tableName);
+
+            HashMap<String, Object> data = new HashMap<>();
+            data.put("requesttime", LocalDateTime.now());
+            data.put("requestip", ip);
+            data.put("requestpath", requestPath);
+            data.put("sessionid", sessionId);
+            data.put("requestdata", requestData);
+            data.put("returndata", returnData);
+            data.put("returncode", returnCode);
+            data.put("returnmessage", returnMessage);
+            data.put("lifecycleid", lifecycleId);
+            DataBase.exec(connectionStr, tableName, Collections.singletonList(data), "1", null, null);
+
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+    }
+
+    public static void logServiceError(String ip, String requestPath, String sessionId, String requestData, String returnData, String returnCode, String returnMessage, String lifecycleId) {
+        try {
+            String tableName = "serviceerrlog";
+            String connectionStr = getCurrentThreadConnection(tableName);
+
+            HashMap<String, Object> data = new HashMap<>();
+            data.put("requesttime", LocalDateTime.now());
+            data.put("requestip", ip);
+            data.put("requestpath", requestPath);
+            data.put("sessionid", sessionId);
+            data.put("requestdata", requestData);
+            data.put("returndata", returnData);
+            data.put("returncode", returnCode);
+            data.put("returnmessage", returnMessage);
+            data.put("lifecycleid", lifecycleId);
+            DataBase.exec(connectionStr, tableName, Collections.singletonList(data), "1", null, null);
+
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+    }
+
+    public static void logSystemError(String ip, String requestPath, String sessionId, String requestData, String returnData, String returnCode, String returnMessage) {
+        try {
+            String tableName = "systemerrlog";
+            String connectionStr = getCurrentThreadConnection(tableName);
+
+            HashMap<String, Object> data = new HashMap<>();
+            data.put("requesttime", LocalDateTime.now());
+            data.put("requestip", ip);
+            data.put("requestpath", requestPath);
+            data.put("sessionid", sessionId);
+            data.put("requestdata", requestData);
+            data.put("returndata", returnData);
+            data.put("returncode", returnCode);
+            data.put("returnmessage", returnMessage);
+            DataBase.exec(connectionStr, tableName, Collections.singletonList(data), "1", null, null);
 
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+    }
+
+    public static void logUser(String userId, String appId, String ip, String requestPath, String sessionId, String requestData, String returnData, String returnCode, String returnMessage, String lifecycleId) {
+
+        try {
+            String tableName = "userlog";
+            String connectionStr = getCurrentThreadConnection(tableName);
+
+            HashMap<String, Object> data = new HashMap<>();
+            data.put("applicationid", appId);
+            data.put("requesttime", LocalDateTime.now());
+            data.put("requestip", ip);
+            data.put("requestpath", requestPath);
+            data.put("sessionid", sessionId);
+            data.put("requestdata", requestData);
+            data.put("returndata", returnData);
+            data.put("returncode", returnCode);
+            data.put("returnmessage", returnMessage);
+            data.put("lifecycleid", lifecycleId);
+            data.put("userid", userId);
+            DataBase.exec(connectionStr, tableName, Collections.singletonList(data), "1", null, null);
+
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static String getCurrentThreadConnection(String targetName) throws Exception {
+        long id = Thread.currentThread().getId();
+
+        String fileName = "logs" + File.separator + "log_" + id + "_" + targetName;
+        File file = new File(fileName);
+        HashMap<String, Object> connectionMap = new HashMap<>();
+
+        connectionMap.put("jdbcUrl", "jdbc:sqlite:" + file.getPath());
+        connectionMap.put("driverClassName", "org.sqlite.JDBC");
+        String newConnection = DataFormatUtil.toString(connectionMap);
+        if (!file.exists()) {
+            String sql = null;
+            if (targetName.equals("interfacelog")) {
+                sql = """
+                        CREATE TABLE interfacelog (
+                             
+                            logid         Integer
+                                primary key autoincrement,
+                             requesttime DATETIME,
+                             requestip VARCHAR(255),
+                             requestpath VARCHAR(255),
+                             requestdata TEXT,
+                             sessionid VARCHAR(255),
+                             returndata TEXT,
+                             returncode INT,
+                             returnmessage TEXT,
+                             applicationid INT,
+                             lifecycleid INT
+                         )
+                                                """;
+            } else if (targetName.equals("servicelog")) {
+                sql = """
+                        CREATE TABLE servicelog (
+                             
+                            logid         Integer
+                                primary key autoincrement,
+                             requesttime DATETIME,
+                             requestip VARCHAR(255),
+                             requestpath VARCHAR(255),
+                             requestdata TEXT,
+                             sessionid VARCHAR(255),
+                             returndata TEXT,
+                             returncode INT,
+                             returnmessage TEXT,
+                             lifecycleid INT
+                         )
+                                                                        """;
+            } else if (targetName.equals("serviceerrlog")) {
+                sql = """
+                        CREATE TABLE serviceerrlog (
+                             
+                            logid         Integer
+                                primary key autoincrement,
+                             requesttime DATETIME,
+                             requestip VARCHAR(255),
+                             requestpath VARCHAR(255),
+                             requestdata TEXT,
+                             sessionid VARCHAR(255),
+                             returndata TEXT,
+                             returncode INT,
+                             returnmessage TEXT,
+                             lifecycleid INT
+                         )
+                        """;
+            } else if (targetName.equals("systemerrlog")) {
+                sql = """
+                        create table systemerrlog
+                         (
+                             logid         Integer
+                                 primary key autoincrement,
+                             requesttime   DATETIME,
+                             requestip     VARCHAR(255),
+                             requestpath   VARCHAR(255),
+                             requestdata   TEXT,
+                             sessionid     VARCHAR(255),
+                             returndata    TEXT,
+                             returncode    INT,
+                             returnmessage TEXT,
+                             lifecycleid   INT
+                         );
 
-//    应用
-    public static void log(String connection,String sourceName,String appId,String appName,String ip,String sessionId,String input,String output) {
 
+                                                 """;
+            } else if (targetName.equals("userlog")) {
+                sql = """
+                        CREATE TABLE userdolog (
+                              
+                            logid         Integer
+                                primary key autoincrement,
+                              requesttime DATETIME,
+                              requestip VARCHAR(255),
+                              requestpath VARCHAR(255),
+                              requestdata TEXT,
+                              sessionid VARCHAR(255),
+                              returndata TEXT,
+                              returncode INT,
+                              returnmessage TEXT,
+                              applicationid INT,
+                              userid INT,
+                              lifecycleid INT
+                          )
+                        """;
+            }
+//            创建表结构
+            DataBase.exec(newConnection, sql);
+        }
+        return newConnection;
     }
 
+    /**
+     * 读取日志并发送到目标存储介质,最后删除已经发送成功的日志
+     *
+     * @param fileName
+     * @return
+     */
+    private static Map<String, Object> apply(String fileName) {
+        String newConnection = logConnection.replace("log_", fileName);
+        String tableName = fileName.substring(fileName.lastIndexOf("_") + 1);
+        try {
+            List<Map<String, Object>> result = DataBase.query(newConnection, "select * from %s where 1=?".formatted(tableName), Collections.singletonList(new Object[]{1}));
+            if (result.isEmpty()) {
+                if (!flag.containsKey(fileName)) {
+                    flag.put(fileName, 0);
+                }
+                int value = flag.get(fileName) + 1;
+                if (value > 10) {
+                    new File("logs" + File.separator + fileName).delete();
+                    flag.remove(fileName);
+                    return UniReturnUtil.success(null);
+                } else {
+                    flag.put(fileName, value);
+
+                }
+
+            }
+            List<String> targets = Config.targets;
+
+//           删除成功的日志
+            if (!result.isEmpty()) {
+                for (String target : targets) {
+                    Object type = JsonPath.read(target, "$.type");
+                    if (Objects.isNull(type)) throw new RuntimeException("日志输出目标类型没有配置");
+                    switch (type.toString()) {
+                        case "DB" -> {
+                            DataBase.exec(target, tableName, result, "1", null, null);
+                        }
+                        case "KAFKA" -> {
+                            Kafka.sendMessage(target, tableName, result);
+                        }
+                        case "ES" -> {
+                            Elasticsearch.send(target, tableName, result);
+                        }
+                    }
+                }
+                DataBase.exec(newConnection, tableName, result.stream().map(it -> (Map<String, Object>) new HashMap<String, Object>() {{
+                    put("logid", it.get("logid"));
+                }}).toList(), "3", null, null);
+            }
+            return UniReturnUtil.success(null);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+
+    }
+
+    public static void sendLog() throws FileNotFoundException {
+        File logDir = new File("logs");
+        if (logDir.exists()) {
+            if (logDir.isDirectory()) {
+                File[] files = logDir.listFiles();
+
+
+                List<File> logs = new ArrayList<>();
+                if (files != null) {
+                    logs = Arrays.stream(files).toList();
+                }
+                logs.parallelStream().forEach(it -> {
+                    String fileName = it.getName();
+                    apply(fileName);
+                });
+
+            } else {
+                throw new FileNotFoundException("logs 不是文件夹");
+            }
+        } else {
+            throw new FileNotFoundException("日志目录没有找到");
+        }
+    }
 
 
 }

+ 3 - 0
src/main/java/com/scbfkj/uni/system/Config.java

@@ -22,6 +22,9 @@ public class Config {
     public static String centerConnectionStr;
     public static String securityConnectionStr;
 
+    public static String localCenterConnectionStr= """
+            {"jdbcUrl":"jdbc:sqlite:systemset.sqlite","driverClassName":"org.sqlite.JDBC"}""";
+
     //    是否启动安全验证
     public static boolean enable;
 

+ 8 - 4
src/main/java/com/scbfkj/uni/system/ScheduleUtil.java

@@ -12,8 +12,6 @@ import java.util.concurrent.*;
 public class ScheduleUtil {
 
 
-    private static Map<String, DataProcessService> dataProcessMaps = new HashMap<>(); //数据接收
-
     private static ThreadPoolTaskScheduler threadPoolTaskScheduler = new ThreadPoolTaskScheduler();
 
 
@@ -50,7 +48,9 @@ public class ScheduleUtil {
 
         Integer loopCount = Objects.isNull(serviceInfo.get("loopcount")) ? null : Integer.parseInt(serviceInfo.get("loopcount").toString());
         Object cronExpress = serviceInfo.get("cronexpress");
-
+        if (Objects.isNull(cronExpress)) {
+            throw new RuntimeException("服务:%s,不是定时任务");
+        }
         if (!scheduleTaskMaps.containsKey(serviceId)) {
             ScheduleTask scheduleTask = new ScheduleTask(serviceId, loopCount);
             scheduleTaskMaps.put(serviceId, scheduleTask);
@@ -65,10 +65,14 @@ public class ScheduleUtil {
         ScheduledFuture<?> scheduledFuture = threadPoolTaskScheduler.schedule(scheduleTask, new CronTrigger(cronExpress.toString()));
         scheduledFutureMap.put(serviceId, scheduledFuture);
         DataBase.updateBatch(Config.centerConnectionStr, "update serviceinfo set runState = 1 where  serviceid =?", Collections.singletonList(new Object[]{serviceId}));
-
         return true;
     }
 
+
+    public static boolean start(Runnable runnable,String cron){
+        threadPoolTaskScheduler.schedule(runnable, new CronTrigger(cron));
+        return true;
+    }
     /**
      * 取消,停止定时任务
      *

+ 21 - 10
src/main/java/com/scbfkj/uni/system/SystemInit.java

@@ -1,16 +1,17 @@
 package com.scbfkj.uni.system;
 
 import com.scbfkj.uni.library.DataEncryptionUtil;
+import com.scbfkj.uni.service.ControlService;
 import jakarta.annotation.PostConstruct;
 import org.springframework.beans.factory.annotation.Value;
-import org.springframework.core.io.ClassPathResource;
-import org.springframework.core.io.Resource;
 import org.springframework.stereotype.Component;
 
 import java.io.File;
-import java.io.IOException;
+import java.io.FileNotFoundException;
 import java.util.List;
 
+import com.scbfkj.uni.service.LoggerService;
+
 
 @Component
 public class SystemInit {
@@ -44,19 +45,29 @@ public class SystemInit {
             String target = DataEncryptionUtil.decryptRSAByPrivateKey(targetStr);
             Config.targets.add(target);
         }
-        initializeTheSystemEnvironment();
-//        ControlService.startServiceByContainerCode();
+        initializeSystemEnvironment();
+        ControlService.startServiceByContainerCode();
+//        日志任务
+        ScheduleUtil.start(() -> {
+            try {
+                LoggerService.sendLog();
+            } catch (FileNotFoundException exception) {
+                System.out.println(exception.getMessage());
+            }
+        }, "* * * * * *");
+
+        LoggerService.logSystemError(null, null, null, null, null, "0", "启动应用程序成功");
     }
 
-    private void initializeTheSystemEnvironment() {
+    private void initializeSystemEnvironment() {
 //        运行环境初始化
         File resource = new File("logs");
         boolean exists = resource.exists();
-        if (!exists){
-            if(!resource.mkdirs()){
+        if (!exists) {
+            if (!resource.mkdirs()) {
                 throw new RuntimeException("创建日志目录失败");
-            }else {
-                System.out.println("创建目录成功:"+resource.getAbsolutePath());
+            } else {
+                System.out.println("创建目录成功:" + resource.getAbsolutePath());
             }
         }
     }

+ 1 - 1
src/main/resources/application.properties

@@ -3,4 +3,4 @@ db.security.config=Jnj84d14EmSgKEXyAbSH+bratWGkpV89/VA5Er4yQOt7qlnKtGYJzBVJNNYMB
 log.target=B7xSbq4imA5zapX8kEO42mU/5sA2TyF/Ba2Y/++F3z9Np7iT4ywDUkbRC4w/Xrxv1kMSR8PQMJ4dfYwc3mYj0SJJivN5A5/6hI+ZSQBabfZZrYwaIIRdM1XIk4wo1SIrSCXKzef8X6YUH70R2tnh+Uq6KNNp08KaZ2ZXM8vX5Ss=
 server.port=8085
 app.container.code=001
-#app.debug=true
+#app.debug=true

+ 35 - 4
src/test/java/com/scbfkj/uni/process/DataBaseTest.java

@@ -63,9 +63,15 @@ class DataBaseTest {
         add("id");
         add("serviceid");
     }};
-   static HashMap<String, Object> filterLines = new HashMap<>() {{
-        put("id", 15);
-    }};
+   static List<Map<String, Object>> filterLines = new ArrayList<Map<String,Object>>(){{
+       add(new HashMap<>(){{
+           put("left","(");
+           put("column","id");
+           put("comparator","=");
+           put("value",15);
+           put("right",")");
+       }});
+   }};
 
     @Test
     void exec() throws Exception {
@@ -89,8 +95,33 @@ class DataBaseTest {
     }
 
     @Test
-    public void test() throws Exception {
+    public void testWhereStr() throws Exception {
         Map<String, Object> exec = DataBase.exec(connectionStr, "select * from %s where 《whereStr》".formatted(tableName), insert, "0", filterColumns, filterLines);
         System.out.println(DataFormatUtil.toString(exec));
     }
+    @Test
+    public void testBookTitleSymbol() throws Exception {
+        Map<String, Object> exec = DataBase.exec(connectionStr, "select * from %s where id=《id》".formatted(tableName), insert, "0", filterColumns, filterLines);
+        System.out.println(DataFormatUtil.toString(exec));
+    }
+    @Test
+    public void testBookTitleSymbol2() throws Exception {
+        Map<String, Object> exec = DataBase.exec(connectionStr, "select * from %s where id=《id》".formatted(tableName), insert.stream().map(it-> ((Map<String,Object>) it.get("filter"))).toList(), "0", filterColumns, filterLines);
+        System.out.println(DataFormatUtil.toString(exec));
+    }
+    @Test
+    public void testFilter() throws Exception {
+        Map<String, Object> exec = DataBase.exec(connectionStr, "select * from %s where 《whereStr》".formatted(tableName), List.of(new HashMap<>(){{
+            put("filter",new ArrayList<Map<String,Object>>(){{
+                add(new HashMap<>(){{
+                    put("left","(");
+                    put("column","id");
+                    put("comparator","=");
+                    put("value",15);
+                    put("right",")");
+                }});
+            }});
+        }}), "0", filterColumns, filterLines);
+        System.out.println(DataFormatUtil.toString(exec));
+    }
 }

+ 139 - 0
src/test/java/com/scbfkj/uni/service/DataProcessServiceTest.java

@@ -0,0 +1,139 @@
+package com.scbfkj.uni.service;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.scbfkj.uni.library.DataFormatUtil;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+@SpringBootTest
+class DataProcessServiceTest {
+
+    @Test
+    void getParams() throws JsonProcessingException {
+        List<Object> params = DataProcessService.getParams("$.a[0];;$.c", """
+                {"a":[{"b":10}],"c":{"d":"20"}}""");
+        System.out.println(params);
+    }
+
+    @Test
+    void processByAlgorithmByDB0() throws Exception {
+
+        Map<String, Object> map = DataProcessService.processByAlgorithm(new HashMap<>() {{
+            put("algorithmlibraryid", 1);
+            put("serviceid", 1);
+            put("algorithmtype", 3);
+            put("computingexpression", "log_error");
+            put("parameterset", "$.datasource.connectset;;$.algorithm.computingexpression;;$.args[1].returnData;;$.algorithm.executionnumber");
+            put("preconditions", null);
+            put("executionnumber", "0");
+            put("datasourceid", "1");
+        }}, new ArrayList<>() {{
+            add(null);
+            add(new HashMap<>() {{
+                put("returnData", new ArrayList<>() {{
+                    add(new HashMap() {{
+                        put("id", 15);
+                    }});
+                }});
+            }});
+        }}, null, null);
+
+        System.out.println(map);
+
+    }
+    @Test
+    void processByAlgorithmByDB1() throws Exception {
+
+        Map<String, Object> map = DataProcessService.processByAlgorithm(new HashMap<>() {{
+            put("algorithmlibraryid", 1);
+            put("serviceid", 1);
+            put("algorithmtype", 3);
+            put("computingexpression", "log_error");
+            put("parameterset", "$.datasource.connectset;;$.algorithm.computingexpression;;$.args[1].returnData;;$.algorithm.executionnumber");
+            put("preconditions", null);
+            put("executionnumber", "1");
+            put("datasourceid", "1");
+        }}, new ArrayList<>() {{
+            add(null);
+            add(new HashMap<>() {{
+                put("returnData", new ArrayList<>() {{
+                    add(new HashMap() {{
+                        put("id", 17);
+                    }});
+                }});
+            }});
+        }}, null, null);
+
+        System.out.println(map);
+
+    }
+    @Test
+    void processByAlgorithmByDB2() throws Exception {
+
+        Map<String, Object> map = DataProcessService.processByAlgorithm(new HashMap<>() {{
+            put("algorithmlibraryid", 1);
+            put("serviceid", 1);
+            put("algorithmtype", 3);
+            put("computingexpression", "log_error");
+            put("parameterset", "$.datasource.connectset;;$.algorithm.computingexpression;;$.args[1].returnData;;$.algorithm.executionnumber");
+            put("preconditions", null);
+            put("executionnumber", "2");
+            put("datasourceid", "1");
+        }}, new ArrayList<>() {{
+            add(null);
+            add(new HashMap<>() {{
+                put("returnData", new ArrayList<>() {{
+                    add(new HashMap() {{
+                        put("value", new HashMap<String,Object>(){{
+                            put("serviceId",1001);
+                        }});
+                        put("filter", new HashMap<String,Object>(){{
+                            put("id",17);
+                        }});
+                    }});
+                }});
+            }});
+        }}, null, null);
+
+        System.out.println(DataFormatUtil.toString(map));
+
+    }
+    @Test
+    void processByAlgorithmByDB3() throws Exception {
+
+        Map<String, Object> map = DataProcessService.processByAlgorithm(new HashMap<>() {{
+            put("algorithmlibraryid", 1);
+            put("serviceid", 1);
+            put("algorithmtype", 3);
+            put("computingexpression", "log_error");
+            put("parameterset", "$.datasource.connectset;;$.algorithm.computingexpression;;$.args[1].returnData;;$.algorithm.executionnumber");
+            put("preconditions", null);
+            put("executionnumber", "3");
+            put("datasourceid", "1");
+        }}, new ArrayList<>() {{
+            add(null);
+            add(new HashMap<>() {{
+                put("returnData", new ArrayList<>() {{
+                    add(new HashMap() {{
+                        put("value", new HashMap<String,Object>(){{
+                            put("serviceId",1001);
+                        }});
+                        put("filter", new HashMap<String,Object>(){{
+                            put("id",17);
+                        }});
+                    }});
+                }});
+            }});
+        }}, null, null);
+
+        System.out.println(DataFormatUtil.toString(map));
+
+    }
+}