Эх сурвалжийг харах

1.报文解析丢失换行之前的内容
2.新增单一竞争模式的执行

liu 1 сар өмнө
parent
commit
554512d002

+ 0 - 2
db/service_info.sql

@@ -30,8 +30,6 @@ CREATE TABLE `service_info`  (
   `log_enable` int NULL DEFAULT NULL COMMENT '是否启用日志',
   `result_type` int NULL DEFAULT NULL COMMENT '返回结果类型',
   `run_state` int NULL DEFAULT NULL COMMENT '运行状态',
-  `heartbeat_time` datetime NULL DEFAULT NULL COMMENT '心跳时间',
-  `heartbeat_container_code` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '心跳容器',
   `last_time` datetime NULL DEFAULT NULL COMMENT '最后活跃时间',
   `last_container_code` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '最后活跃容器',
   `is_enabled` int NULL DEFAULT NULL COMMENT '是否启用',

BIN
db/unidia206.db


+ 183 - 18
src/main/java/com/bfkj/unidia/Core/ServiceController.java

@@ -4,6 +4,7 @@ import com.bfkj.unidia.Result;
 import com.bfkj.unidia.SystemEnvCache;
 import com.bfkj.unidia.DataBaseUtils.DbExecutor;
 import com.bfkj.unidia.IOUtils.HttpHelper;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -11,6 +12,8 @@ import org.springframework.scheduling.TaskScheduler;
 import org.springframework.scheduling.support.CronTrigger;
 import org.springframework.stereotype.Service;
 
+import java.io.Serializable;
+import java.text.MessageFormat;
 import java.time.Duration;
 import java.time.LocalDateTime;
 import java.util.*;
@@ -88,9 +91,10 @@ public class ServiceController {
             }
             // 获取当前容器的代码
             String currentContainer = systemEnvCache.get("container_code", String.class);
+
             // 如果是当前容器
             if (currentContainer.equals(containerCode)) {// 调用启动本地服务的方法
-                return controllerLocalService(serviceCode,type);
+                return controllerLocalService(serviceCode,type,currentContainer);
             } else {// 调用发送远程命令的方法来启动服务
                 return sendRemoteCommand(containerCode, serviceCode, type);
             }
@@ -121,6 +125,8 @@ public class ServiceController {
             String serviceCode = service.get("service_code").toString();
             // 控制单个服务的启动或停止
             Result<Integer> controlleredService = controllerService(containerCode, serviceCode,type);
+
+
             if(controlleredService.isSuccess()) {
                 // 如果操作成功,累加成功计数
                 successCount = successCount + controlleredService.getData();
@@ -183,7 +189,7 @@ public class ServiceController {
      * @param controllerType 服务类型,表示服务的启动方式或状态
      * @return 返回一个Result对象,包含操作结果如果操作失败,会包含失败信息
      */
-    private Result<Integer> controllerLocalService(String serviceCode,int controllerType) {
+    private Result<Integer> controllerLocalService(String serviceCode,int controllerType,String containerCode) {
         try {
             // 获取服务信息,如果获取失败或结果为空,则返回失败结果
             Result<List<Map<String, Object>>> serviceResult = getServiceInfo(serviceCode, 0,false);
@@ -193,7 +199,6 @@ public class ServiceController {
             // 解析服务信息,获取服务类型
             Map<String, Object> service = serviceResult.getData().get(0);
             Integer serviceType = (Integer) service.get("service_type");
-
             // 根据服务类型处理不同的启动逻辑
             if (serviceType == 0) { // WebApi接口类型,接收数据类型
                 // 更新数据库中的服务状态为运行中
@@ -210,7 +215,7 @@ public class ServiceController {
                 Integer operatingMode = (Integer) service.get("operating_mode");
                 String cronExpress = (String) service.get("cron_express");
                 // 启动定时任务
-                return controllerScheduledTask(serviceCode, operatingMode, cronExpress,serviceType,controllerType);
+                return controllerScheduledTask(serviceCode, operatingMode, cronExpress,serviceType,controllerType,service,containerCode);
             }
         } catch (Exception e) {
             // 捕获异常,返回失败结果,包含异常信息
@@ -219,17 +224,69 @@ public class ServiceController {
         }
     }
 
+    /**
+     * 黑白名单检测
+     * 在服务信息里配置:
+     * 1. 只有黑名单:  在此黑名单的容器都不可以运行
+     * 2: 只有白名单:  只有在白名单内的容器
+     * 3: 黑名单,白名单都没有: 所有容器都可以运行
+     * 4: 如果黑名单和白名单配置了同一个容器
+     * @param blackListStr
+     * @param whiteListStr
+     * @param containerCode
+     * @return
+     */
+    public Result<Integer> isBlackList(String  serviceCode,String blackListStr, String whiteListStr, String containerCode,String serviceName){
+        String message = "";
+        if (StringUtils.isNotBlank(blackListStr)){
+            String[] split = StringUtils.split(blackListStr, ","); // 获取黑名单
+            if (split!=null&&split.length>0){
+                  message = MessageFormat.format(
+                        "当前容器[{0}]被:[{1}({2})]服务设置为黑名单,不执行!",
+                        containerCode, serviceName, serviceCode
+                );
+                if (Arrays.asList(split).contains(containerCode)) return Result.fail( message); // 如果当前容器是此服务的黑名单,那么跳过不执行
+            }
+        }
+
+        if (StringUtils.isNotBlank(whiteListStr)){
+            String[] split = StringUtils.split(whiteListStr, ","); // 获取白名单
+            if (split!=null&&split.length>0){
+                message = MessageFormat.format(
+                        "当前容器[{0}]不在[{1}({2})]服务白名单之内,不执行!",
+                        containerCode, serviceName, serviceCode
+                );
+                if (!Arrays.asList(split).contains(containerCode)) return Result.fail(message); // 如果当前容器不是此服务的白名单,那么跳过不执行
+            }
+        }
+        return null;
+    }
+
     /**
      * 控制器调度任务方法
      * 根据类型启动或停止调度任务
      *
-     * @param serviceCode 服务代码,用于标识特定的服务
-     * @param operatingMode 操作模式,控制任务执行的方式
-     * @param cron_express cron表达式,定义任务执行的时间规则
+     * @param serviceCode    服务代码,用于标识特定的服务
+     * @param operatingMode  操作模式,控制任务执行的方式
+     * @param cron_express   cron表达式,定义任务执行的时间规则
      * @param controllerType 任务控制类型,1表示启动任务,非1表示停止任务
+     * @param serviceInfo 服务信息
+     * @param containerCode 容器代码
      * @return 返回一个包含整型结果的Result对象,表示操作的结果
      */
-    private Result<Integer> controllerScheduledTask(String serviceCode, int operatingMode,String cron_express,int serviceType,int controllerType) {
+    private Result<Integer> controllerScheduledTask(String serviceCode, int operatingMode, String cron_express,
+                                                    int serviceType, int controllerType, Map<String, Object> serviceInfo, String containerCode) {
+
+        // 判断黑白名单
+        Result<Integer> result = isBlackList(
+                serviceCode,
+                (String) serviceInfo.getOrDefault("black_list",""),
+                (String) serviceInfo.getOrDefault("white_list",""),
+                containerCode, String.valueOf(serviceInfo.get("service_name"))
+        );
+        if (Objects.nonNull( result)){
+            return result;
+        }
         // 根据类型判断是启动还是停止调度任务
         if(controllerType == 1){
             // 启动调度任务
@@ -250,6 +307,7 @@ public class ServiceController {
      */
     private Result<Integer> sendRemoteCommand(String containerCode, String serviceCode, int type) {
         try {
+
             // 构造请求URL,根据容器编号和命令类型生成目标地址
             String url = containerCode + "/openapi/services/" + (type == 0?"start":"stop") + "Service";
             // 构造请求体,包含服务编号和容器编号,用于验证和执行命令
@@ -283,25 +341,21 @@ public class ServiceController {
      */
     private Result<Integer> startScheduledTask(String serviceCode, int operatingMode,int serviceType,String cron_express) {
         try {
-            // 判断是否为单一竞争模式
-            if (operatingMode == 1) {// 单一竞争模式由定时器控制,此处不控制,直接返回成功状态
-                return Result.success(1);
-            }
             ScheduledFuture<?> future;
             // 根据不同的运行模式,选择相应的任务调度方式
             switch (serviceType) {
                 case 1 -> // 固定频率模式
                         future = taskScheduler.scheduleAtFixedRate(
-                                () -> executeServiceTask(serviceCode),
+                                () -> executeServiceTask(serviceCode,operatingMode,serviceType),
                                 Duration.ofMillis(Long.parseLong(cron_express)));
                 case 2 -> // 固定延迟模式
                         future = taskScheduler.scheduleWithFixedDelay(
-                                () -> executeServiceTask(serviceCode),
+                                () -> executeServiceTask(serviceCode,operatingMode,serviceType),
                                 Duration.ofMillis(Long.parseLong(cron_express))
                         );
                 case 3 -> // CRON表达式模式
                         future = taskScheduler.schedule(
-                                () -> executeServiceTask(serviceCode),
+                                () -> executeServiceTask(serviceCode,operatingMode,serviceType),
                                 new CronTrigger(cron_express));
                 default -> {
                     // 当运行模式不被支持时,返回错误信息
@@ -435,18 +489,33 @@ public class ServiceController {
      * 本方法负责调用数据服务来处理特定的服务代码它封装了对数据服务的调用逻辑,
      * 并且处理在执行过程中可能发生的异常
      *
-     * @param serviceCode 服务代码,标识了需要执行的具体服务任务
+     * @param serviceCode   服务代码,标识了需要执行的具体服务任务
+     * @param operatingMode 0单一竞争、1并发运行
+     * @param serviceType
      */
-    private void executeServiceTask(String serviceCode) {
+    private synchronized void executeServiceTask(String serviceCode, int operatingMode, int serviceType) {
         try {
             // 调用数据服务处理逻辑
             logger.info("执行服务任务:{}", serviceCode);
-            dataServices.process(Map.of("service_code", serviceCode));
+                if (operatingMode == 0){
+                    if (singleContentionHandling(serviceCode, operatingMode, serviceType)){
+                        logger.info("单一竞争成功!");
+                        dataServices.process(Map.of("service_code", serviceCode));
+                    }else {
+                        logger.info("单一竞争失败!");
+                    }
+                }else {
+                    // 非单一竞争模式那么,正常执行
+                  dataServices.process(Map.of("service_code", serviceCode));
+                }
         } catch (Exception e) {
             // 记录处理服务代码过程中发生的错误
             logger.error("执行服务任务{}失败",serviceCode, e);
         }
     }
+
+
+
     /**
      * 获取所有容器信息
      * 本方法通过数据库查询来获取所有启用且运行中的容器信息,并将这些信息以列表的形式返回
@@ -507,6 +576,11 @@ public class ServiceController {
                             serviceList.put("operating_mode",rs.getInt("operating_mode"));
                             serviceList.put("service_type",rs.getInt("service_type"));
                             serviceList.put("cron_express",rs.getString("cron_express"));
+                            serviceList.put("black_list",rs.getString("black_list"));
+                            serviceList.put("service_name",rs.getString("service_name"));
+                            serviceList.put("white_list",rs.getString("white_list"));
+                            serviceList.put("last_container_code",rs.getString("last_container_code"));
+                            serviceList.put("last_time",rs.getString("last_time"));
                             return serviceList;
                         }
                         catch (Exception e){
@@ -521,5 +595,96 @@ public class ServiceController {
             return Result.fail("获取服务信息失败: " + e.getMessage());
         }
     }
+
+
+    private boolean singleContentionHandling(String serviceCode, int operatingMode, int serviceType){
+        String containerCode = systemEnvCache.get("container_code", String.class); // 获取容器编码
+        Result<List<Map<String, Object>>> result = getServiceInfo(serviceCode, operatingMode, false);
+        if (result.isSuccess()&&result.getData()!=null&&result.getData().size()>0) {
+            Map<String, Object> map = result.getData().get(0);
+            Object time = map.get("last_time");
+            Long last_time = Objects.nonNull(time) ? Long.valueOf(time.toString()): null; // 上一次服务执行时间
+            Object last_container_code =  map.get("last_container_code");
+            String lastContainerCode = last_container_code != null ? (String) last_container_code : null; // 上一次服务执行时间
+
+            if (StringUtils.isBlank(lastContainerCode)){
+                // 执行抢占
+               int num = preemptive(containerCode,serviceCode,last_time,lastContainerCode);
+               if (num>0){
+                   return true;
+               }
+            }
+//            否则如果当前时间减最后活跃时间超过8秒
+            if (Objects.nonNull(last_time)&& new Date().getTime()/1000 - last_time/1000 > 8) {
+                // 执行抢占
+                int num = preemptive(containerCode, serviceCode, last_time, lastContainerCode);
+                if (num > 0) {
+                    return true;
+                }
+            }
+        }
+
+
+
+        return false;
+    }
+
+    /**
+     * 执行抢占
+     *
+     * @param containerCode
+     * @param serviceCode
+     * @param last_time
+     * @param lastContainerCode
+     * @return
+     */
+    private int preemptive(String containerCode, String serviceCode, Long last_time, String lastContainerCode) {
+
+//        使用update service_info set last_container_code = 当前容器编号,last_time = 当前时间
+// where 服务代码 = serviceCode and last_container_code = 最后活跃容器编号 and last_time = 查询出的最后活跃时间
+            List<Map<String, ?>> conditions = null;
+            Map<String, Object> conditionsMap =  null;
+            if (lastContainerCode==null){
+                HashMap<String, Object> map = new HashMap<>();
+                map.put("left", "(");
+                map.put("value", null);
+                map.put("column", "last_container_code");
+                map.put("comparison", "IS NULL");
+                map.put("connection", " AND");
+                map.put("right", "");
+                conditions =List.of(
+                        map,
+                        Map.of(
+                                "left", "",
+                                "column", "service_code",
+                                "comparison", "=",
+                                "value", serviceCode,
+                                "connection", "AND",
+                                "right", ")"
+                        )
+                );
+            }else {
+                conditionsMap = Map.of(
+                        "last_time", last_time,
+                        "last_container_code", lastContainerCode,
+                        "service_code", serviceCode
+                );
+            }
+//        lastContainerCode == null  ? conditions : conditionsMap
+
+        Result<Integer> result = dbExecutor.dbUpdate(systemEnvCache.getDbInfo(), serviceTableName,
+                Map.of("dataContent", Map.of(
+                        "data", Map.of(
+                                "last_time", new Date().getTime(),
+                                "last_container_code", containerCode
+                        ),
+                        "conditions", lastContainerCode == null  ? conditions : conditionsMap
+                ), "event", "UPDATE")
+        );
+        if (result.isSuccess()&&result.getData()!=null){
+            return result.getData();
+        }
+        return 0;
+    }
 }
 

+ 14 - 2
src/main/java/com/bfkj/unidia/DataBaseUtils/DbParamsService.java

@@ -293,12 +293,24 @@ public class DbParamsService {
                 queryCriteria.append(conditionMap.containsKey("right") ? conditionMap.get("right") : "");
             }
         }
+        String string = queryCriteria.toString().trim();
+        replaceAll(queryCriteria, "AND )", ")");
+        replaceAll(queryCriteria, "OR )", ")");;
         // 清理末尾连接符
-        if (queryCriteria.length() > 0 && (queryCriteria.toString().endsWith("AND") ||
-                queryCriteria.toString().endsWith("OR"))) {
+        if (queryCriteria.length() > 0 && (string.endsWith("AND") ||
+                string.endsWith("OR"))) {
             queryCriteria.delete(queryCriteria.length() - 4, queryCriteria.length());
         }
     }
+    public static void replaceAll(StringBuilder builder, String from, String to) {
+        int index = builder.indexOf(from);
+        while (index != -1) {
+            builder.replace(index, index + from.length(), to);
+            index = builder.indexOf(from, index + to.length());
+        }
+    }
+
+
     //行数据结构更新字段数据ConcurrentHashMap、查询条件String、查询参数Object[]
     private record RowParams(Map<String, Object> updateValue,StringBuilder queryCriteria,List<Object> queryParam) {}
     /**

+ 32 - 3
src/main/java/com/bfkj/unidia/DataUtils/DataFormatConverter.java

@@ -111,10 +111,10 @@ public class DataFormatConverter {
         Map<String, Object> result = new HashMap<>();
         Deque<String> path = new ArrayDeque<>();
         Deque<Map<String, Object>> mapStack = new ArrayDeque<>();
-
+        StringBuffer buffer  = new StringBuffer();
         while (eventReader.hasNext()) {
+            if (buffer==null) buffer  = new StringBuffer();
             XMLEvent event = eventReader.nextEvent();
-
             if (event.isStartElement()) {
                 StartElement startElement = event.asStartElement();
                 String qualifiedName = startElement.getName().getLocalPart();
@@ -139,16 +139,25 @@ public class DataFormatConverter {
                 Characters characters = event.asCharacters();
                 if (!characters.isWhiteSpace()) {
                     String text = characters.getData();
+                    if (text.equals(".J/G/PEKPORT//30JUN/205440L///XI/CL")){
+                        System.out.println(text);
+                    }
                     if (!mapStack.isEmpty() && mapStack.peek() != null) {
                         if (characters.isCData()) {
                             mapStack.peek().put("#cdata", text);
                         } else {
-                            mapStack.peek().put("#text", text);
+                            buffer.append(text);
+                            mapStack.peek().put("#text", buffer.toString());
                         }
                     }
                 }
             }
             else if (event.isEndElement()) {
+                if (buffer!=null){
+                    System.out.println("------------------------------buffer");
+                    System.out.println(buffer.toString());
+                }
+                buffer = null;
                 if (!mapStack.isEmpty()) {
                     Map<String, Object> current = mapStack.pop();
                     String key = path.pop();
@@ -164,6 +173,26 @@ public class DataFormatConverter {
         return mapToXml(map, "UTF-8", "1.0");
     }
 
+    /**
+     * 集合xml批量转json字符串
+     * @param content
+     * @return
+     * @throws XMLStreamException
+     */
+    public static List<String> xmlsToMap(List<String> content) throws XMLStreamException {
+        List<String> maps = new ArrayList<>();
+        if (content!=null&&content.size()>0){
+            for (String str : content) {
+                System.out.println("接收到需要转换Map的数据");
+
+                Map<String, Object> map = xmlToMap(str);
+                String jsonStr = mapToJson(map);
+                maps.add(jsonStr);
+            }
+        }
+        return maps;
+    }
+
     public static String mapToXml(Map<String, Object> map, String encoding, String version) {
         StringWriter writer = new StringWriter();
         try {