Эх сурвалжийг харах

完善了并发和单一竞争服务的执行

liu 1 сар өмнө
parent
commit
72ba8974d7

+ 11 - 0
src/main/java/com/bfkj/unidia/Core/DataServices.java

@@ -5,6 +5,8 @@ import com.bfkj.unidia.DataBaseUtils.DbExecutor;
 import com.bfkj.unidia.DataUtils.DataFormatConverter;
 import com.bfkj.unidia.DataUtils.ScriptExecutor;
 import com.bfkj.unidia.Result;
+import com.fasterxml.jackson.databind.JsonSerializable;
+import com.fasterxml.jackson.databind.util.JSONPObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -271,6 +273,9 @@ public class DataServices {
 
         // 获取算法代码
         String algorithmCode = parts[0];
+        if (algorithmCode.equals("5")){
+            logger.info("---------------------------");
+        }
         // 构造键路径,从路径中排除算法代码部分
         String keyPath = String.join(".", Arrays.asList(parts).subList(1, parts.length));
 
@@ -372,6 +377,11 @@ public class DataServices {
         try {
             // 检查是否提供了Java类和方法名,以决定使用Java反射还是JS脚本执行
             if (config.className() != null && config.methodName() != null) {
+
+                if (config.algorithmCode.equals("4")||config.algorithmCode.equals("5")){
+                    System.out.println("********************");
+                }
+
                 // 准备参数值数组,用于Java反射调用
                 Object[] paramValues = new Object[inputParams.size()];
                 int i = 0;
@@ -404,6 +414,7 @@ public class DataServices {
                 return Result.success(Collections.singletonMap("returnData", jsResult.getData()));
             }
         } catch (Exception e) {
+            logger.error("算法执行异常:1,{}2,{}3,{}", config.algorithmCode,config.methodName,config.inputDataTemplate);
             return Result.fail("算法执行异常: " + e.getMessage());
         }
     }

+ 22 - 16
src/main/java/com/bfkj/unidia/Core/ServiceController.java

@@ -108,6 +108,7 @@ public class ServiceController {
      * 控制所有服务的启动或停止
      *
      * @param containerCode 容器代码,标识一组服务
+     *                      --1单一竞争、0并发运行
      * @param type 操作类型,1表示启动服务,其他值表示停止服务
      * @return 返回操作成功的服务数量
      */
@@ -187,6 +188,7 @@ public class ServiceController {
      *
      * @param serviceCode 服务代码,用于识别特定的服务
      * @param controllerType 服务类型,表示服务的启动方式或状态
+     * operating_mode	int				运行模式--1单一竞争、0并发运行
      * @return 返回一个Result对象,包含操作结果如果操作失败,会包含失败信息
      */
     private Result<Integer> controllerLocalService(String serviceCode,int controllerType,String containerCode) {
@@ -202,13 +204,13 @@ public class ServiceController {
             // 根据服务类型处理不同的启动逻辑
             if (serviceType == 0) { // WebApi接口类型,接收数据类型
                 // 更新数据库中的服务状态为运行中
-                return dbExecutor.update(
+                return dbExecutor.dbUpdate(
                         systemEnvCache.getDbInfo(),
                         serviceTableName,
                         Map.of("dataContent", Map.of(
                                 "data", Map.of("run_state", controllerType),
                                 "conditions", Map.of("service_code", serviceCode)
-                        ))
+                        ), "event", "UPDATE")
                 );
             } else { // 非WebAPI接收数据类型
                 // 获取服务的运行模式和cron表达式
@@ -346,16 +348,16 @@ public class ServiceController {
             switch (serviceType) {
                 case 1 -> // 固定频率模式
                         future = taskScheduler.scheduleAtFixedRate(
-                                () -> executeServiceTask(serviceCode,operatingMode,serviceType),
+                                () -> executeServiceTask(serviceCode,operatingMode),
                                 Duration.ofMillis(Long.parseLong(cron_express)));
                 case 2 -> // 固定延迟模式
                         future = taskScheduler.scheduleWithFixedDelay(
-                                () -> executeServiceTask(serviceCode,operatingMode,serviceType),
+                                () -> executeServiceTask(serviceCode,operatingMode),
                                 Duration.ofMillis(Long.parseLong(cron_express))
                         );
                 case 3 -> // CRON表达式模式
                         future = taskScheduler.schedule(
-                                () -> executeServiceTask(serviceCode,operatingMode,serviceType),
+                                () -> executeServiceTask(serviceCode,operatingMode),
                                 new CronTrigger(cron_express));
                 default -> {
                     // 当运行模式不被支持时,返回错误信息
@@ -410,12 +412,17 @@ public class ServiceController {
                     for (Map<String, Object> service : result.getData()) {
                         // 检查服务的运行模式,只有单一竞争模式的服务才被处理
                         Integer mode = (Integer) service.get("operating_mode");
-                        if (mode != 0) continue;
+                        if (mode != 1) continue;
 
                         String serviceCode = (String) service.get("service_code");
                         logger.info("单一竞争模式服务处理:{}",serviceCode);
-
-                        // 获取服务的心跳时间和服务所在的容器代码
+                        Integer operatingMode = (Integer) service.get("operating_mode");
+                        String cronExpress = (String) service.get("cron_express");
+                        Integer serviceType = (Integer) service.get("service_type");
+                        String currentContainer = systemEnvCache.get("container_code", String.class);
+                        controllerScheduledTask(serviceCode, operatingMode,cronExpress,serviceType,1,service, currentContainer);
+                        startService(systemEnvCache.get("container_code", String.class),serviceCode);
+/*                        // 获取服务的心跳时间和服务所在的容器代码
                         LocalDateTime heartbeatTime = (LocalDateTime) service.get("heartbeat_time");
                         String heartbeat_container_code = (String) service.get("heartbeat_container_code");
 
@@ -433,7 +440,7 @@ public class ServiceController {
                                     startService(systemEnvCache.get("container_code", String.class),serviceCode);
                                 }
                             }
-                        }
+                        }*/
                     }
                 }
             } catch (Exception e) {
@@ -491,14 +498,13 @@ public class ServiceController {
      *
      * @param serviceCode   服务代码,标识了需要执行的具体服务任务
     * @param operatingMode 1单一竞争、0并发运行
-     * @param serviceType
      */
-    private synchronized void executeServiceTask(String serviceCode, int operatingMode, int serviceType) {
+    private synchronized void executeServiceTask(String serviceCode, int operatingMode) {
         try {
             // 调用数据服务处理逻辑
             logger.info("执行服务任务:{}", serviceCode);
-                if (operatingMode == 0){
-                    if (singleContentionHandling(serviceCode, operatingMode, serviceType)){
+                if (operatingMode == 1){
+                    if (singleContentionHandling(serviceCode, operatingMode)){
                         logger.info("单一竞争成功!");
                         dataServices.process(Map.of("service_code", serviceCode));
                     }else {
@@ -552,7 +558,7 @@ public class ServiceController {
     }
     /**
      * 获取服务信息
-     *
+     * operatingMode 运行模式--1单一竞争、0并发运行
      * @param serviceCode 服务代码,如果为null,则获取所有启用的服务信息
      * @param useCache 是否使用缓存,true表示使用缓存,false表示不使用缓存
      * @return 返回服务信息的结果对象,包含服务代码、运行模式、服务类型和cron表达式等信息
@@ -597,7 +603,7 @@ public class ServiceController {
     }
 
 
-    private boolean singleContentionHandling(String serviceCode, int operatingMode, int serviceType){
+    private boolean singleContentionHandling(String serviceCode, int operatingMode){
         String containerCode = systemEnvCache.get("container_code", String.class); // 获取容器编码
         Result<List<Map<String, Object>>> result = getServiceInfo(serviceCode, operatingMode, false);
         if (result.isSuccess()&&result.getData()!=null&&result.getData().size()>0) {
@@ -615,7 +621,7 @@ public class ServiceController {
                }
             }
 //            否则如果当前时间减最后活跃时间超过8秒,或最后活跃容器就是当前容器
-            if (Objects.nonNull(last_time)&& new Date().getTime()/1000 - last_time/1000 > 8) {
+            if ((Objects.nonNull(last_time)&& new Date().getTime()/1000 - last_time/1000 > 8)||lastContainerCode.equals(containerCode)) {
                 // 执行抢占
                 int num = preemptive(containerCode, serviceCode, last_time, lastContainerCode);
                 if (num > 0) {

+ 0 - 1
src/main/java/com/bfkj/unidia/DataBaseUtils/TableStructureManager.java

@@ -63,7 +63,6 @@ public class TableStructureManager {
     private static final Set<String> GOLDEN_DB_TYPES = Set.of("goldendb");
     /**
      * 获取表结构信息(线程安全、高性能)
-     *
      * @param dbConfig 数据库配置
      * @param tableName 表名
      * @return 表结构结果

+ 17 - 4
src/main/java/com/bfkj/unidia/IOUtils/KafkaHelper.java

@@ -19,6 +19,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import com.bfkj.unidia.Result;
 import com.bfkj.unidia.DataUtils.DataFormatConverter;
 import org.springframework.stereotype.Service;
+import org.springframework.util.CollectionUtils;
 
 import javax.annotation.PreDestroy;
 
@@ -43,6 +44,8 @@ public class KafkaHelper {
      * @return            操作结果封装对象,成功时返回发送的消息数量,失败时包含错误信息
      */
     public static Result<List<?>> send(Map<String, String> kafkaConfig, String topic, Object data) {
+        logger.info("Kafka消费主题:{}",topic);
+        logger.info("配置:{}",DataFormatConverter.mapToJson(kafkaConfig));
         //判断消息是否为List类型,直接处理批量消息场景 非批量消息统一转换为单元素列表进行发送处理
         return batchSendMessage(kafkaConfig, topic,
                 data instanceof List<?> dataList ? dataList : Collections.singletonList(data));
@@ -62,6 +65,13 @@ public class KafkaHelper {
                                              List<?> dataList) {
         String cacheKey = null;
         Producer<String, String> producer = null;
+
+        if (CollectionUtils.isEmpty(dataList)||dataList.size()==0){
+            logger.warn("主题:{} kafka集合数据为空",topic);
+        }else {
+            logger.info("主题:{} 发送数据大小:{}",topic,dataList.size());
+        }
+
         try {
             // 校验基础参数有效性
             Result<String> checkBaseParamResult = checkBaseParam(kafkaConfig,"bfkj", topic, dataList);
@@ -110,6 +120,8 @@ public class KafkaHelper {
                     Thread.currentThread().interrupt();
                     producer.close(); // 主动关闭生产者释放资源
                     return Result.fail("批量发送超时超过30秒");
+                }else {
+                    logger.info("主题{},Kafka批量发送成功,成功发送消息数量:{}",topic,successCount.get());
                 }
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
@@ -229,6 +241,8 @@ public class KafkaHelper {
      *     - 失败状态:包含具体错误原因(参数校验失败/连接异常/消费异常等)
      */
     public static Result<List<String>> consume(Map<String, String> kafkaConfig, String topic, String groupId, Integer maxRecords) {
+        logger.info("Kafka消费主题:{}", topic);
+
         // 参数校验
         Result<String> checkBaseParamResult = checkBaseParam(kafkaConfig, groupId,topic, "data");
         if(!checkBaseParamResult.isSuccess()){
@@ -240,6 +254,7 @@ public class KafkaHelper {
         if(!consumerResult.isSuccess()){
             return Result.fail(consumerResult.getError());
         }
+        logger.info("主题:{}成功获取Kafka消费者:{}",topic,cacheKey);
         Consumer<String, String> consumer = consumerResult.getData();
         try {
             consumer.subscribe(Collections.singletonList(topic));
@@ -249,8 +264,9 @@ public class KafkaHelper {
             List<String> messages = new ArrayList<>();
             for (ConsumerRecord<String, String> record : records) {
                 messages.add(record.value());
-                logger.info("Kafka消费数据: {}", record.value());
+//                logger.info("Kafka消费数据: {}", record.value());
             }
+            if (messages.size()>0) logger.info("主题:{},Kafka消费数量:{}",topic,messages.size());
 
             if (!messages.isEmpty()) {
                 consumer.commitSync(); // 同步提交偏移量
@@ -260,9 +276,6 @@ public class KafkaHelper {
                     return Result.fail("Kafka消费者{".concat(cacheKey).concat("}异常"));
                 }
             }
-            logger.info("消费数量:");
-            logger.info(String.valueOf(messages.size()));
-            if (messages.size()>0) logger.info(messages.get(0));
             return Result.success(messages);
         }catch (Exception e) {
             closeConsumer(cacheKey,consumer);

+ 2 - 2
src/main/resources/application.properties

@@ -25,8 +25,8 @@ app.directories.temp=tmp
 #spring.datasource.password=system
 
 # SQLite ?????
-#spring.datasource.url=jdbc:sqlite:file:D:\\UNIDIA206\\UNIDIA\\UNIDIA\\db\\unidia206.db   # ????????????????????
-spring.datasource.url=jdbc:sqlite:file:G:\\WorkSpace\\UNIDIA\\db\\unidia206.db   # ????????????????????
+#spring.datasource.url=jdbc:sqlite:file:G:\\WorkSpace\\UNIDIA\\db\\unidia206.db   # ????????????????????
+spring.datasource.url=jdbc:sqlite:file:G:\\sqlliteDB\\localDb\\unidia206.db   # ????????????????????
 #spring.datasource.url=jdbc:sqlite:/app/service/db/unidia206.db
 spring.datasource.username=
 spring.datasource.password=