Эх сурвалжийг харах

1.IOUtils 增强批量部分成功的情况会返回失败列表
2.DataBaseUtils 增强批量insert update补充返回失败列表的方法
3.其他缓存以及注解代码优化

mother_fuck 1 сар өмнө
parent
commit
e9ac94a497

+ 0 - 2
db/service_info.sql

@@ -30,8 +30,6 @@ CREATE TABLE `service_info`  (
   `log_enable` int NULL DEFAULT NULL COMMENT '是否启用日志',
   `result_type` int NULL DEFAULT NULL COMMENT '返回结果类型',
   `run_state` int NULL DEFAULT NULL COMMENT '运行状态',
-  `heartbeat_time` datetime NULL DEFAULT NULL COMMENT '心跳时间',
-  `heartbeat_container_code` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '心跳容器',
   `last_time` datetime NULL DEFAULT NULL COMMENT '最后活跃时间',
   `last_container_code` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NULL DEFAULT NULL COMMENT '最后活跃容器',
   `is_enabled` int NULL DEFAULT NULL COMMENT '是否启用',

+ 183 - 18
src/main/java/com/bfkj/unidia/Core/ServiceController.java

@@ -4,6 +4,7 @@ import com.bfkj.unidia.Result;
 import com.bfkj.unidia.SystemEnvCache;
 import com.bfkj.unidia.DataBaseUtils.DbExecutor;
 import com.bfkj.unidia.IOUtils.HttpHelper;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -11,6 +12,8 @@ import org.springframework.scheduling.TaskScheduler;
 import org.springframework.scheduling.support.CronTrigger;
 import org.springframework.stereotype.Service;
 
+import java.io.Serializable;
+import java.text.MessageFormat;
 import java.time.Duration;
 import java.time.LocalDateTime;
 import java.util.*;
@@ -88,9 +91,10 @@ public class ServiceController {
             }
             // 获取当前容器的代码
             String currentContainer = systemEnvCache.get("container_code", String.class);
+
             // 如果是当前容器
             if (currentContainer.equals(containerCode)) {// 调用启动本地服务的方法
-                return controllerLocalService(serviceCode,type);
+                return controllerLocalService(serviceCode,type,currentContainer);
             } else {// 调用发送远程命令的方法来启动服务
                 return sendRemoteCommand(containerCode, serviceCode, type);
             }
@@ -121,6 +125,8 @@ public class ServiceController {
             String serviceCode = service.get("service_code").toString();
             // 控制单个服务的启动或停止
             Result<Integer> controlleredService = controllerService(containerCode, serviceCode,type);
+
+
             if(controlleredService.isSuccess()) {
                 // 如果操作成功,累加成功计数
                 successCount = successCount + controlleredService.getData();
@@ -183,7 +189,7 @@ public class ServiceController {
      * @param controllerType 服务类型,表示服务的启动方式或状态
      * @return 返回一个Result对象,包含操作结果如果操作失败,会包含失败信息
      */
-    private Result<Integer> controllerLocalService(String serviceCode,int controllerType) {
+    private Result<Integer> controllerLocalService(String serviceCode,int controllerType,String containerCode) {
         try {
             // 获取服务信息,如果获取失败或结果为空,则返回失败结果
             Result<List<Map<String, Object>>> serviceResult = getServiceInfo(serviceCode, 0,false);
@@ -193,7 +199,6 @@ public class ServiceController {
             // 解析服务信息,获取服务类型
             Map<String, Object> service = serviceResult.getData().get(0);
             Integer serviceType = (Integer) service.get("service_type");
-
             // 根据服务类型处理不同的启动逻辑
             if (serviceType == 0) { // WebApi接口类型,接收数据类型
                 // 更新数据库中的服务状态为运行中
@@ -210,7 +215,7 @@ public class ServiceController {
                 Integer operatingMode = (Integer) service.get("operating_mode");
                 String cronExpress = (String) service.get("cron_express");
                 // 启动定时任务
-                return controllerScheduledTask(serviceCode, operatingMode, cronExpress,serviceType,controllerType);
+                return controllerScheduledTask(serviceCode, operatingMode, cronExpress,serviceType,controllerType,service,containerCode);
             }
         } catch (Exception e) {
             // 捕获异常,返回失败结果,包含异常信息
@@ -219,17 +224,69 @@ public class ServiceController {
         }
     }
 
+    /**
+     * 黑白名单检测
+     * 在服务信息里配置:
+     * 1. 只有黑名单:  在此黑名单的容器都不可以运行
+     * 2: 只有白名单:  只有在白名单内的容器
+     * 3: 黑名单,白名单都没有: 所有容器都可以运行
+     * 4: 如果黑名单和白名单配置了同一个容器
+     * @param blackListStr
+     * @param whiteListStr
+     * @param containerCode
+     * @return
+     */
+    public Result<Integer> isBlackList(String  serviceCode,String blackListStr, String whiteListStr, String containerCode,String serviceName){
+        String message = "";
+        if (StringUtils.isNotBlank(blackListStr)){
+            String[] split = StringUtils.split(blackListStr, ","); // 获取黑名单
+            if (split!=null&&split.length>0){
+                  message = MessageFormat.format(
+                        "当前容器[{0}]被:[{1}({2})]服务设置为黑名单,不执行!",
+                        containerCode, serviceName, serviceCode
+                );
+                if (Arrays.asList(split).contains(containerCode)) return Result.fail( message); // 如果当前容器是此服务的黑名单,那么跳过不执行
+            }
+        }
+
+        if (StringUtils.isNotBlank(whiteListStr)){
+            String[] split = StringUtils.split(whiteListStr, ","); // 获取白名单
+            if (split!=null&&split.length>0){
+                message = MessageFormat.format(
+                        "当前容器[{0}]不在[{1}({2})]服务白名单之内,不执行!",
+                        containerCode, serviceName, serviceCode
+                );
+                if (!Arrays.asList(split).contains(containerCode)) return Result.fail(message); // 如果当前容器不是此服务的白名单,那么跳过不执行
+            }
+        }
+        return null;
+    }
+
     /**
      * 控制器调度任务方法
      * 根据类型启动或停止调度任务
      *
-     * @param serviceCode 服务代码,用于标识特定的服务
-     * @param operatingMode 操作模式,控制任务执行的方式
-     * @param cron_express cron表达式,定义任务执行的时间规则
+     * @param serviceCode    服务代码,用于标识特定的服务
+     * @param operatingMode  操作模式,控制任务执行的方式
+     * @param cron_express   cron表达式,定义任务执行的时间规则
      * @param controllerType 任务控制类型,1表示启动任务,非1表示停止任务
+     * @param serviceInfo 服务信息
+     * @param containerCode 容器代码
      * @return 返回一个包含整型结果的Result对象,表示操作的结果
      */
-    private Result<Integer> controllerScheduledTask(String serviceCode, int operatingMode,String cron_express,int serviceType,int controllerType) {
+    private Result<Integer> controllerScheduledTask(String serviceCode, int operatingMode, String cron_express,
+                                                    int serviceType, int controllerType, Map<String, Object> serviceInfo, String containerCode) {
+
+        // 判断黑白名单
+        Result<Integer> result = isBlackList(
+                serviceCode,
+                (String) serviceInfo.getOrDefault("black_list",""),
+                (String) serviceInfo.getOrDefault("white_list",""),
+                containerCode, String.valueOf(serviceInfo.get("service_name"))
+        );
+        if (Objects.nonNull( result)){
+            return result;
+        }
         // 根据类型判断是启动还是停止调度任务
         if(controllerType == 1){
             // 启动调度任务
@@ -250,6 +307,7 @@ public class ServiceController {
      */
     private Result<Integer> sendRemoteCommand(String containerCode, String serviceCode, int type) {
         try {
+
             // 构造请求URL,根据容器编号和命令类型生成目标地址
             String url = containerCode + "/openapi/services/" + (type == 0?"start":"stop") + "Service";
             // 构造请求体,包含服务编号和容器编号,用于验证和执行命令
@@ -283,25 +341,21 @@ public class ServiceController {
      */
     private Result<Integer> startScheduledTask(String serviceCode, int operatingMode,int serviceType,String cron_express) {
         try {
-            // 判断是否为单一竞争模式
-            if (operatingMode == 1) {// 单一竞争模式由定时器控制,此处不控制,直接返回成功状态
-                return Result.success(1);
-            }
             ScheduledFuture<?> future;
             // 根据不同的运行模式,选择相应的任务调度方式
             switch (serviceType) {
                 case 1 -> // 固定频率模式
                         future = taskScheduler.scheduleAtFixedRate(
-                                () -> executeServiceTask(serviceCode),
+                                () -> executeServiceTask(serviceCode,operatingMode,serviceType),
                                 Duration.ofMillis(Long.parseLong(cron_express)));
                 case 2 -> // 固定延迟模式
                         future = taskScheduler.scheduleWithFixedDelay(
-                                () -> executeServiceTask(serviceCode),
+                                () -> executeServiceTask(serviceCode,operatingMode,serviceType),
                                 Duration.ofMillis(Long.parseLong(cron_express))
                         );
                 case 3 -> // CRON表达式模式
                         future = taskScheduler.schedule(
-                                () -> executeServiceTask(serviceCode),
+                                () -> executeServiceTask(serviceCode,operatingMode,serviceType),
                                 new CronTrigger(cron_express));
                 default -> {
                     // 当运行模式不被支持时,返回错误信息
@@ -435,18 +489,33 @@ public class ServiceController {
      * 本方法负责调用数据服务来处理特定的服务代码它封装了对数据服务的调用逻辑,
      * 并且处理在执行过程中可能发生的异常
      *
-     * @param serviceCode 服务代码,标识了需要执行的具体服务任务
+     * @param serviceCode   服务代码,标识了需要执行的具体服务任务
+     * @param operatingMode 0单一竞争、1并发运行
+     * @param serviceType
      */
-    private void executeServiceTask(String serviceCode) {
+    private synchronized void executeServiceTask(String serviceCode, int operatingMode, int serviceType) {
         try {
             // 调用数据服务处理逻辑
             logger.info("执行服务任务:{}", serviceCode);
-            dataServices.process(Map.of("service_code", serviceCode));
+                if (operatingMode == 0){
+                    if (singleContentionHandling(serviceCode, operatingMode, serviceType)){
+                        logger.info("单一竞争成功!");
+                        dataServices.process(Map.of("service_code", serviceCode));
+                    }else {
+                        logger.info("单一竞争失败!");
+                    }
+                }else {
+                    // 非单一竞争模式那么,正常执行
+                  dataServices.process(Map.of("service_code", serviceCode));
+                }
         } catch (Exception e) {
             // 记录处理服务代码过程中发生的错误
             logger.error("执行服务任务{}失败",serviceCode, e);
         }
     }
+
+
+
     /**
      * 获取所有容器信息
      * 本方法通过数据库查询来获取所有启用且运行中的容器信息,并将这些信息以列表的形式返回
@@ -507,6 +576,11 @@ public class ServiceController {
                             serviceList.put("operating_mode",rs.getInt("operating_mode"));
                             serviceList.put("service_type",rs.getInt("service_type"));
                             serviceList.put("cron_express",rs.getString("cron_express"));
+                            serviceList.put("black_list",rs.getString("black_list"));
+                            serviceList.put("service_name",rs.getString("service_name"));
+                            serviceList.put("white_list",rs.getString("white_list"));
+                            serviceList.put("last_container_code",rs.getString("last_container_code"));
+                            serviceList.put("last_time",rs.getString("last_time"));
                             return serviceList;
                         }
                         catch (Exception e){
@@ -521,5 +595,96 @@ public class ServiceController {
             return Result.fail("获取服务信息失败: " + e.getMessage());
         }
     }
+
+
+    private boolean singleContentionHandling(String serviceCode, int operatingMode, int serviceType){
+        String containerCode = systemEnvCache.get("container_code", String.class); // 获取容器编码
+        Result<List<Map<String, Object>>> result = getServiceInfo(serviceCode, operatingMode, false);
+        if (result.isSuccess()&&result.getData()!=null&&result.getData().size()>0) {
+            Map<String, Object> map = result.getData().get(0);
+            Object time = map.get("last_time");
+            Long last_time = Objects.nonNull(time) ? Long.valueOf(time.toString()): null; // 上一次服务执行时间
+            Object last_container_code =  map.get("last_container_code");
+            String lastContainerCode = last_container_code != null ? (String) last_container_code : null; // 上一次服务执行时间
+
+            if (StringUtils.isBlank(lastContainerCode)){
+                // 执行抢占
+               int num = preemptive(containerCode,serviceCode,last_time,lastContainerCode);
+               if (num>0){
+                   return true;
+               }
+            }
+//            否则如果当前时间减最后活跃时间超过8秒
+            if (Objects.nonNull(last_time)&& new Date().getTime()/1000 - last_time/1000 > 8) {
+                // 执行抢占
+                int num = preemptive(containerCode, serviceCode, last_time, lastContainerCode);
+                if (num > 0) {
+                    return true;
+                }
+            }
+        }
+
+
+
+        return false;
+    }
+
+    /**
+     * 执行抢占
+     *
+     * @param containerCode
+     * @param serviceCode
+     * @param last_time
+     * @param lastContainerCode
+     * @return
+     */
+    private int preemptive(String containerCode, String serviceCode, Long last_time, String lastContainerCode) {
+
+//        使用update service_info set last_container_code = 当前容器编号,last_time = 当前时间
+// where 服务代码 = serviceCode and last_container_code = 最后活跃容器编号 and last_time = 查询出的最后活跃时间
+            List<Map<String, ?>> conditions = null;
+            Map<String, Object> conditionsMap =  null;
+            if (lastContainerCode==null){
+                HashMap<String, Object> map = new HashMap<>();
+                map.put("left", "(");
+                map.put("value", null);
+                map.put("column", "last_container_code");
+                map.put("comparison", "IS NULL");
+                map.put("connection", " AND");
+                map.put("right", "");
+                conditions =List.of(
+                        map,
+                        Map.of(
+                                "left", "",
+                                "column", "service_code",
+                                "comparison", "=",
+                                "value", serviceCode,
+                                "connection", "AND",
+                                "right", ")"
+                        )
+                );
+            }else {
+                conditionsMap = Map.of(
+                        "last_time", last_time,
+                        "last_container_code", lastContainerCode,
+                        "service_code", serviceCode
+                );
+            }
+//        lastContainerCode == null  ? conditions : conditionsMap
+
+        Result<Integer> result = dbExecutor.dbUpdate(systemEnvCache.getDbInfo(), serviceTableName,
+                Map.of("dataContent", Map.of(
+                        "data", Map.of(
+                                "last_time", new Date().getTime(),
+                                "last_container_code", containerCode
+                        ),
+                        "conditions", lastContainerCode == null  ? conditions : conditionsMap
+                ), "event", "UPDATE")
+        );
+        if (result.isSuccess()&&result.getData()!=null){
+            return result.getData();
+        }
+        return 0;
+    }
 }
 

+ 6 - 10
src/main/java/com/bfkj/unidia/DataBaseUtils/ConnectionPoolManager.java

@@ -12,6 +12,7 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.stereotype.Component;
+import org.springframework.stereotype.Service;
 
 import javax.annotation.PreDestroy;
 import java.io.File;
@@ -20,24 +21,19 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static com.bfkj.unidia.cacheUtils.CacheUtil.buildCaffeineCache;
+
 /**
  * 高性能连接池管理器(支持 SQLite 无需用户名密码)
  */
-@Component
+@Service
 public class ConnectionPoolManager {
     private static final Logger logger = LoggerFactory.getLogger(ConnectionPoolManager.class);
     // 数据源名称生成器
     private final AtomicInteger dataSourceCounter = new AtomicInteger(1);
     // 连接池缓存:数据库配置标识符 -> 数据源
-    private final Cache<String, HikariDataSource> dataSourceCache = Caffeine.newBuilder()
-            .maximumSize(100)
-            .expireAfterAccess(30, TimeUnit.MINUTES)
-            .removalListener((key, value, cause) -> closeDataSource((String) key, (HikariDataSource) value))
-            .build();
-    private final Cache<String, JdbcTemplate> jdbcTemplateCache = Caffeine.newBuilder()
-            .maximumSize(100)
-            .expireAfterAccess(30, TimeUnit.MINUTES)
-            .build();
+    private final Cache<String, HikariDataSource> dataSourceCache = buildCaffeineCache();
+    private final Cache<String, JdbcTemplate> jdbcTemplateCache = buildCaffeineCache();
 
     //系统环境缓存
     private final SystemEnvCache envCache;

+ 158 - 8
src/main/java/com/bfkj/unidia/DataBaseUtils/DbExecutor.java

@@ -14,6 +14,8 @@ import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
+import static com.bfkj.unidia.cacheUtils.CacheUtil.buildCaffeineCache;
+
 @Service
 public class DbExecutor {
     private static final Logger logger = LoggerFactory.getLogger(DbExecutor.class);
@@ -32,10 +34,7 @@ public class DbExecutor {
         this.dbUtils = dbUtils;
         this.dbParamsService = dbParamsService;
     }
-    private final Cache<String, List<?>> queryCache = Caffeine.newBuilder()
-            .maximumSize(1000) // 设置最大缓存条目数
-            .expireAfterAccess(5, TimeUnit.MINUTES) // 5分钟无访问
-            .build();
+    private final Cache<String, List<?>> queryCache = buildCaffeineCache();
     /**
      * 更新数据库记录
      * 该方法根据提供的参数和数据库配置,执行相应的数据库操作(插入、更新、删除)
@@ -68,6 +67,123 @@ public class DbExecutor {
         //如果事件类型不支持,返回错误信息
         return Result.fail("不支持的事件类型");
     }
+    /**
+     * 更新数据库记录
+     * 该方法根据提供的参数和数据库配置,执行相应的数据库操作(插入、更新、删除)
+     *
+     * @param dbConfig 数据库配置信息,包括连接字符串、用户名、密码等
+     * @param tableName 需要操作的数据库表名
+     * @param params 操作所需的参数,包括操作类型(INSERT、UPDATE、DELETE)及相应的数据
+     * @return 返回一个Result对象,包含操作结果,主要是影响的行数
+     */
+    public Result<List<List<Object>>> dbUpdateWithFailures(Map<String, String> dbConfig, String tableName, Map<String, Object> params) {
+        //参数预处理,包括参数校验 (会拼接成完整的sql)
+        Result<DbParamsService.ProcessedResult> processDataContextResult = dbParamsService.processExecutorParams(params, dbConfig, tableName);
+        //如果参数不合法,返回错误信息
+        if (!processDataContextResult.isSuccess()) {
+            return Result.fail(processDataContextResult.getError());
+        }
+        DbParamsService.ProcessedResult dbParams = processDataContextResult.getData();
+        //依据不同事件进行操作
+        String event = dbParams.getEvent().toUpperCase();
+        //根据事件类型执行相应的数据库操作
+        if(event.equals("INSERT")){//新增
+            return execInsertWithFailures(dbConfig, tableName, dbParams);
+        }
+        if(event.equals("UPDATE")){//更新
+            return execUpdateWithFailures(dbConfig,dbParams);
+        }
+        //如果事件类型不支持,返回错误信息
+        return Result.fail("不支持的事件类型");
+    }
+    /**
+     * 执行插入操作
+     *
+     * @param dbConfig 数据库配置信息
+     * @param tableName 表名
+     * @param dbParams 数据库参数服务处理结果,包含插入所需的字段和值
+     * @return 返回插入操作的结果,包括成功或失败及受影响的行数
+     */
+    private Result<List<List<Object>>> execInsertWithFailures(Map<String, String> dbConfig, String tableName,
+                                       DbParamsService.ProcessedResult dbParams){
+        try  {
+            // 检查是否存在新增字段,若存在则创建表结构
+            Map<String, Object> addColumn = dbParams.getAddColumns(); // "c" : value
+
+            if (!addColumn.isEmpty()) { // 不存在的列数量是否大于0
+                // 创建表 或者创建字段
+                Result<Integer> createTableResult = tableStructureManager.createTable(dbConfig, tableName, addColumn);
+                if (!createTableResult.isSuccess()) {
+                    return Result.fail("表创建失败");
+                }
+            }
+
+            // 获取插入的列名和对应的值,执行批量插入操作
+            String insertColumns = dbParams.getInsertColumns();
+            List<List<Object>> insertParam = dbParams.getInsertValues(); // 占位符对应的value集合
+            if (!insertParam.isEmpty()) { //  新增
+                Result<List<List<Object>>> batchInsert = jdbcExecutor.batchUpdateWithFailures(dbConfig, insertColumns, insertParam);
+                if(!batchInsert.isSuccess()){
+                    return Result.fail(batchInsert.getError());
+                }
+                return Result.success(batchInsert.getData());
+            }
+
+            // 检查是否存在更新条件,若存在则执行更新操作
+            Map<String,List<List<Object>>> queryConditionList = dbParams.getQueryConditions();
+            if(!queryConditionList.isEmpty()){
+                Result<List<List<Object>>> updated = execUpdateWithFailures(dbConfig,dbParams);
+                if(!updated.isSuccess()){
+                    return Result.fail(updated.getError());
+                }
+                return Result.success(updated.getData());
+            }
+
+            // 返回插入操作的结果,包括受影响的行数
+            return Result.success(null);
+        } catch (Exception e) {
+            logger.error("新增错误", e);
+            return Result.fail("新增错误", e);
+        }
+    }
+    /**
+     * 执行数据库更新操作
+     *
+     * @param dbConfig 数据库配置信息,包含连接数据库所需的参数
+     * @param dbParams 处理后的数据库参数对象,包含更新所需的列和查询条件
+     * @return 返回一个Result对象,包含受影响的行数如果更新操作失败,返回失败的Result对象
+     */
+    private Result<List<List<Object>>> execUpdateWithFailures(Map<String, String> dbConfig,
+                                       DbParamsService.ProcessedResult dbParams){
+        try {
+            // 获取需要更新的列字符串
+            String updateColumns = dbParams.getUpdateColumns();
+            List<List<Object>> failedList = null;
+            // 初始化更新计数器
+            // 遍历查询条件,执行更新操作
+            for(String queryCondition : dbParams.getQueryConditions().keySet()){
+                // 获取当前查询条件的参数
+                List<List<Object>> queryParam = dbParams.getQueryConditions().get(queryCondition);
+                // 拼接更新SQL语句 , sql拼接完成
+                String updateSql = updateColumns.concat(" where ").concat(queryCondition);
+                // 执行批量更新操作
+                Result<List<List<Object>>> batchedUpdate = jdbcExecutor.batchUpdateWithFailures(dbConfig, updateSql, queryParam);
+                // 如果更新失败,返回错误信息
+                if(!batchedUpdate.isSuccess()){
+                    return Result.fail(batchedUpdate.getError());
+                }else {
+                    failedList.addAll(batchedUpdate.getData());
+                }
+            }
+            // 更新操作成功,返回成功信息和更新的总行数
+            return Result.success(failedList);
+        } catch (Exception e) {
+            // 记录错误日志
+            logger.error("动态更新失败", e);
+            // 返回更新失败的Result对象,包含异常信息
+            return Result.fail("动态更新失败: " + e.getMessage());
+        }
+    }
 
     /**
      * 执行插入操作
@@ -83,7 +199,7 @@ public class DbExecutor {
             // 检查是否存在新增字段,若存在则创建表结构
             Map<String, Object> addColumn = dbParams.getAddColumns(); // "c" : value
 
-            if (addColumn.size() > 0) { // 不存在的列数量是否大于0
+            if (!addColumn.isEmpty()) { // 不存在的列数量是否大于0
                 // 创建表 或者创建字段
                 Result<Integer> createTableResult = tableStructureManager.createTable(dbConfig, tableName, addColumn);
                 if (!createTableResult.isSuccess()) {
@@ -95,7 +211,7 @@ public class DbExecutor {
             String insertColumns = dbParams.getInsertColumns();
             List<List<Object>> insertParam = dbParams.getInsertValues(); // 占位符对应的value集合
             int insertCount = 0;
-            if (insertParam.size() > 0) { //  新增
+            if (!insertParam.isEmpty()) { //  新增
                 Result<Integer> batchInsert = jdbcExecutor.batchUpdate(dbConfig, insertColumns, insertParam);
                 if(!batchInsert.isSuccess()){
                     return Result.fail(batchInsert.getError());
@@ -105,7 +221,7 @@ public class DbExecutor {
 
             // 检查是否存在更新条件,若存在则执行更新操作
             Map<String,List<List<Object>>> queryConditionList = dbParams.getQueryConditions();
-            if(queryConditionList.size() > 0){
+            if(!queryConditionList.isEmpty()){
                 Result<Integer> updated = execUpdate(dbConfig,dbParams);
                 if(!updated.isSuccess()){
                     return Result.fail(updated.getError());
@@ -184,6 +300,40 @@ public class DbExecutor {
             return Result.fail("动态删除失败: " + e.getMessage());
         }
     }
+
+    /**
+     * 单条插入/更新(带条件判断)
+     * 此方法用于在数据库中插入或更新一条记录
+     * 它通过判断参数中的"event"字段来决定是插入还是更新操作
+     * 注意:此方法不会直接执行插入或更新操作,而是将操作类型和数据传递给dbUpdate方法进行处理
+     *
+     * @param dbConfig 数据库配置信息,包括数据库URL、用户名、密码等
+     * @param tableName 要操作的表名
+     * @param params 插入或更新的数据,以及操作类型(通过"event"字段指定)
+     * @return 返回一个Result对象,包含受影响的行数
+     */
+    public Result<List<List<Object>>> insertWithFailures(Map<String, String> dbConfig, String tableName,Map<String, Object> params) {
+        // 设置操作类型为插入
+        params.put("event", "INSERT");
+        // 调用dbUpdate方法执行插入操作
+        return dbUpdateWithFailures( dbConfig,  tableName, params);
+    }
+
+    /**
+     * 统一更新入口,自动判断数据格式类型并执行更新操作
+     *
+     * @param dbConfig  数据库连接信息
+     * @param tableName 表名
+     * @param params    更新参数
+     * @return 更新结果
+     */
+    public Result<List<List<Object>>> updateWithFailures(Map<String, String> dbConfig, String tableName,Map<String, Object> params) {
+        // 在参数中添加事件标识,表明这是一个更新操作
+        params.put("event", "UPDATE");
+        // 调用数据库更新方法,执行实际的更新操作
+        return dbUpdateWithFailures(dbConfig, tableName, params);
+    }
+
     /**
      * 单条插入/更新(带条件判断)
      * 此方法用于在数据库中插入或更新一条记录
@@ -419,7 +569,7 @@ public class DbExecutor {
         // 获取查询条件及查询参数
         Map<String, List<List<Object>>> queryConditionsList = dbParams.getQueryConditions();
         // 如果存在查询条件,则处理这些条件
-        if(queryConditionsList.size() > 0){
+        if(!queryConditionsList.isEmpty()){
             // 存储所有的查询条件
             List<String> queryConditions = new ArrayList<>();
             // 遍历每个查询条件,将其用括号包裹后添加到查询条件列表中,并合并参数到whereParam中

+ 21 - 8
src/main/java/com/bfkj/unidia/DataBaseUtils/DbParamsService.java

@@ -6,6 +6,7 @@ import com.github.benmanes.caffeine.cache.Caffeine;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
+import org.springframework.stereotype.Service;
 
 import java.time.Instant;
 import java.time.LocalDateTime;
@@ -17,10 +18,12 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.sql.Timestamp;
 
+import static com.bfkj.unidia.cacheUtils.CacheUtil.buildCaffeineCache;
+
 /**
  * 把前端传入的参数转换为sql语句
  */
-@Component
+@Service
 public class DbParamsService {
     private static final Logger logger = LoggerFactory.getLogger(DbParamsService.class);
     private final DbUtils dbUtil;
@@ -32,9 +35,7 @@ public class DbParamsService {
         this.jdbcExecutor = jdbcExecutor;
     }
     private static final List<String> EVENTS = List.of("SELECT", "INSERT", "UPDATE", "DELETE");
-    private final Cache<String, Boolean> oldDataCache = Caffeine.newBuilder()
-            .maximumSize(10000)
-            .build();
+    private final Cache<String, Boolean> oldDataCache = buildCaffeineCache();
     /**
      * 处理执行器参数,生成包含处理结果的对象。
      *
@@ -293,12 +294,24 @@ public class DbParamsService {
                 queryCriteria.append(conditionMap.containsKey("right") ? conditionMap.get("right") : "");
             }
         }
+        String string = queryCriteria.toString().trim();
+        replaceAll(queryCriteria, "AND )", ")");
+        replaceAll(queryCriteria, "OR )", ")");;
         // 清理末尾连接符
-        if (queryCriteria.length() > 0 && (queryCriteria.toString().endsWith("AND") ||
-                queryCriteria.toString().endsWith("OR"))) {
+        if (!queryCriteria.isEmpty() && (string.endsWith("AND") ||
+                string.endsWith("OR"))) {
             queryCriteria.delete(queryCriteria.length() - 4, queryCriteria.length());
         }
     }
+    public static void replaceAll(StringBuilder builder, String from, String to) {
+        int index = builder.indexOf(from);
+        while (index != -1) {
+            builder.replace(index, index + from.length(), to);
+            index = builder.indexOf(from, index + to.length());
+        }
+    }
+
+
     //行数据结构更新字段数据ConcurrentHashMap、查询条件String、查询参数Object[]
     private record RowParams(Map<String, Object> updateValue,StringBuilder queryCriteria,List<Object> queryParam) {}
     /**
@@ -327,12 +340,12 @@ public class DbParamsService {
                     .append(" = ? ,");
         }
         // 清理末尾连接符
-        if (insertColumns.length() > 0 && insertColumns.toString().endsWith(",")) {
+        if (!insertColumns.isEmpty() && insertColumns.toString().endsWith(",")) {
             insertColumns.delete(insertColumns.length() - 1, insertColumns.length());
             insertParams.delete(insertParams.length() - 1, insertParams.length());
         }
         insertColumns.append(") VALUES(").append(insertParams).append(")");//添加参数占位符
-        if (updateColumns.length() > 0 && updateColumns.toString().endsWith(",")) {
+        if (!updateColumns.isEmpty() && updateColumns.toString().endsWith(",")) {
             updateColumns.delete(updateColumns.length() - 1, updateColumns.length());
         }
     }

+ 2 - 1
src/main/java/com/bfkj/unidia/DataBaseUtils/DbUtils.java

@@ -4,6 +4,7 @@ import com.bfkj.unidia.Result;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
+import org.springframework.stereotype.Service;
 
 import java.util.*;
 import java.util.regex.Matcher;
@@ -18,7 +19,7 @@ import java.util.regex.Pattern;
  * 4. 字段结构验证
  * 5. 条件语句构建
  */
-@Component
+@Service
 public class DbUtils {
     private static final Logger logger = LoggerFactory.getLogger(DbUtils.class);
     // 安全 SQL 缓存(避免重复检测)

+ 37 - 5
src/main/java/com/bfkj/unidia/DataBaseUtils/JdbcExecutor.java

@@ -22,6 +22,7 @@ import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
 import org.springframework.jdbc.core.namedparam.SqlParameterSource;
 import org.springframework.stereotype.Component;
+import org.springframework.stereotype.Service;
 
 import java.sql.*;
 import java.util.*;
@@ -29,6 +30,9 @@ import java.util.concurrent.*;
 import java.util.function.Function;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+import static com.bfkj.unidia.cacheUtils.CacheUtil.buildCaffeineCache;
 
 /**
  * 高性能 JDBC 执行器,支持:
@@ -37,7 +41,7 @@ import java.util.regex.Pattern;
  * - 分批次插入
  * - 自动事务控制
  */
-@Component
+@Service
 @Slf4j
 public class JdbcExecutor {
     private static final Logger logger = LoggerFactory.getLogger(JdbcExecutor.class);
@@ -46,10 +50,7 @@ public class JdbcExecutor {
     // 线程池
     private final ExecutorService executor = ForkJoinPool.commonPool();
     // 安全 SQL 缓存(避免重复检测)
-    private final Cache<String, Boolean> safeSqlCache = Caffeine.newBuilder()
-            .maximumSize(10_000) // 设置最大缓存数量
-            .expireAfterWrite(5, TimeUnit.MINUTES) // 过期时间
-            .build();
+    private final Cache<String, Boolean> safeSqlCache = buildCaffeineCache();
     @Autowired
     public JdbcExecutor(ConnectionPoolManager poolManager) {
         this.poolManager = poolManager;
@@ -368,6 +369,37 @@ public class JdbcExecutor {
             return Result.success(totalSuccess);
         });
     }
+
+
+    /**
+     * 批量执行更新操作,并返回失败的记录
+     *
+     * @param dbConfig 数据库配置信息
+     * @param sql SQL 插入/更新语句
+     * @param batchParams 每一组参数代表一条记录
+     * @return Result<List<List<Object>>> 包含执行失败的记录参数列表(全部成功时为空)
+     */
+    public Result<List<List<Object>>> batchUpdateWithFailures(Map<String, String> dbConfig,String sql,List<List<Object>> batchParams) {
+        return validateParams(dbConfig,sql, batchParams,"update",dataSource -> {
+            // 根据负载情况返回相应的批处理大小
+            int dynamicBatchSize =  calculateDynamicBatchSize(dataSource);
+            //参数分批次
+            List<List<List<Object>>> batches = splitBatch(batchParams, dynamicBatchSize);
+            List<List<Object>> failedParams = null;
+            //循环批次
+            for (List<List<Object>> batch : batches) {
+                Result<Integer> result = executeBatchWithRetry(dataSource, sql, batch);
+                if (!result.isSuccess()) {
+                    logger.warn("整体批量插入失败,尝试单条插入");
+                    // 单条插入并统计成功数量
+                    failedParams = batch.parallelStream()
+                            .filter(params -> !update(dbConfig, sql, params).isSuccess())
+                            .toList();
+                }
+            }
+            return Result.success(failedParams);
+        });
+    }
     /**
      * 验证参数并执行数据库操作
      * 此方法主要用于验证传入的数据库配置和SQL语句是否符合规范,以防止SQL注入等安全问题

+ 5 - 5
src/main/java/com/bfkj/unidia/DataBaseUtils/TableStructureManager.java

@@ -8,6 +8,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
+import org.springframework.stereotype.Service;
 
 import java.math.BigDecimal;
 import java.sql.*;
@@ -17,17 +18,16 @@ import java.util.*;
 import java.util.concurrent.*;
 import java.util.concurrent.locks.ReentrantLock;
 
+import static com.bfkj.unidia.cacheUtils.CacheUtil.buildCaffeineCache;
+
 /**
  * 表结构管理器,用于获取和缓存数据库表结构信息(多数据库兼容版)
  */
-@Component
+@Service
 public class TableStructureManager {
     private static final Logger logger = LoggerFactory.getLogger(TableStructureManager.class);
     // 表结构缓存:数据源标识 -> 表名 -> 表结构信息
-    private final Cache<String, ConcurrentMap<String, TableSchema>> schemaCache = Caffeine.newBuilder()
-            .maximumSize(1000)
-            .expireAfterAccess(30, TimeUnit.MINUTES)
-            .build();
+    private final Cache<String, ConcurrentMap<String, TableSchema>> schemaCache = buildCaffeineCache();
     // 数据库连接池管理器
     private final ConnectionPoolManager poolManager;
     // 数据库工具类

+ 32 - 3
src/main/java/com/bfkj/unidia/DataUtils/DataFormatConverter.java

@@ -111,10 +111,10 @@ public class DataFormatConverter {
         Map<String, Object> result = new HashMap<>();
         Deque<String> path = new ArrayDeque<>();
         Deque<Map<String, Object>> mapStack = new ArrayDeque<>();
-
+        StringBuffer buffer  = new StringBuffer();
         while (eventReader.hasNext()) {
+            if (buffer==null) buffer  = new StringBuffer();
             XMLEvent event = eventReader.nextEvent();
-
             if (event.isStartElement()) {
                 StartElement startElement = event.asStartElement();
                 String qualifiedName = startElement.getName().getLocalPart();
@@ -139,16 +139,25 @@ public class DataFormatConverter {
                 Characters characters = event.asCharacters();
                 if (!characters.isWhiteSpace()) {
                     String text = characters.getData();
+                    if (text.equals(".J/G/PEKPORT//30JUN/205440L///XI/CL")){
+                        System.out.println(text);
+                    }
                     if (!mapStack.isEmpty() && mapStack.peek() != null) {
                         if (characters.isCData()) {
                             mapStack.peek().put("#cdata", text);
                         } else {
-                            mapStack.peek().put("#text", text);
+                            buffer.append(text);
+                            mapStack.peek().put("#text", buffer.toString());
                         }
                     }
                 }
             }
             else if (event.isEndElement()) {
+                if (buffer!=null){
+                    System.out.println("------------------------------buffer");
+                    System.out.println(buffer.toString());
+                }
+                buffer = null;
                 if (!mapStack.isEmpty()) {
                     Map<String, Object> current = mapStack.pop();
                     String key = path.pop();
@@ -164,6 +173,26 @@ public class DataFormatConverter {
         return mapToXml(map, "UTF-8", "1.0");
     }
 
+    /**
+     * 集合xml批量转json字符串
+     * @param content 待转换的 xml 字符串集合
+     * @return 转换后的 json 字符串集合
+     * @throws XMLStreamException xml 解析失败时抛出
+     */
+    public static List<String> xmlsToMap(List<String> content) throws XMLStreamException {
+        List<String> maps = new ArrayList<>();
+        if (content!=null&&content.size()>0){
+            for (String str : content) {
+                System.out.println("接收到需要转换Map的数据");
+
+                Map<String, Object> map = xmlToMap(str);
+                String jsonStr = mapToJson(map);
+                maps.add(jsonStr);
+            }
+        }
+        return maps;
+    }
+
     public static String mapToXml(Map<String, Object> map, String encoding, String version) {
         StringWriter writer = new StringWriter();
         try {

+ 14 - 42
src/main/java/com/bfkj/unidia/IOUtils/ActiveMQHelper.java

@@ -5,24 +5,20 @@ import com.bfkj.unidia.Result;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
 
 import javax.annotation.PreDestroy;
 import javax.jms.*;
-import java.time.Duration;
 import java.util.*;
 
 import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
+import org.springframework.stereotype.Service;
 
-@Component
+import static com.bfkj.unidia.cacheUtils.CacheUtil.buildCaffeineCache;
+
+@Service
 public class ActiveMQHelper {
     private static final Logger logger = LoggerFactory.getLogger(ActiveMQHelper.class);
 
-    // 缓存配置
-    private static final int MAX_CACHE_SIZE = 50;
-    private static final Duration CACHE_EXPIRE_TIME = Duration.ofMinutes(16);
-
     // 缓存实例
     private static final Cache<String, ActiveMQConnectionFactory> connectionFactoryCache = buildCaffeineCache();
     private static final Cache<String, Connection> connectionCache = buildCaffeineCache();
@@ -44,20 +40,17 @@ public class ActiveMQHelper {
      *
      * @param config      配置信息,可能包含连接参数、超时设置等
      * @param destination 目标地址,如队列名称或主题
-     * @param message     待发送的消息对象,可以是单个对象或对象列表
+     * @param data     待发送的消息对象,可以是单个对象或对象列表
      * @return Result<Integer> 操作结果,包含发送成功的消息数量
      * 该方法通过判断消息类型决定调用批量发送接口的不同参数形式,
      * 若为列表类型则直接传递,否则将单条消息包装为列表形式发送。
      */
-    public static Result<Integer> sendMessage(Map<String, String> config,
+    public static Result<List<?>> send(Map<String, String> config,
                                               String destination,
-                                              Object message) {
-        //判断消息是否为List类型,直接处理批量消息场景
-        if(message instanceof List<?> messageList){
-            return batchSendMessage(config, destination, messageList);
-        }
-        //非批量消息统一转换为单元素列表进行发送处理
-        return batchSendMessage(config, destination, Collections.singletonList(message));
+                                              Object data) {
+        //判断消息是否为List类型,直接处理批量消息场景 非批量消息统一转换为单元素列表进行发送处理
+        return batchSendMessage(config, destination,
+                data instanceof List<?> dataList ? dataList : Collections.singletonList(data));
     }
 
     /**
@@ -68,10 +61,11 @@ public class ActiveMQHelper {
      * @param messages 待发送的消息列表
      * @return 成功发送的消息数量
      */
-    private static Result<Integer> batchSendMessage(Map<String, String> config,
+    private static Result<List<?>> batchSendMessage(Map<String, String> config,
                                                    String destination,
                                                    List<?> messages) {
         Session session = null;
+        List<String> failList = new ArrayList<>();
         try {
             // 校验配置参数有效性
             Result<String> validateResult = validateConfig(config, destination, messages);
@@ -88,23 +82,22 @@ public class ActiveMQHelper {
             if (producer == null) {
                 return Result.fail("获取生产者失败");
             }
-            int successCount = 0;
             for (Object messageObj : messages) {//遍历消息列表
                 String message = null;
                 try {
                     message = DataFormatConverter.convertObjectToString(messageObj);//转换为字符串
                     TextMessage textMessage = session.createTextMessage(message);//创建文本消息
                     producer.send(textMessage);//发送消息
-                    successCount++;
                 } catch (JMSException e) {
                     logger.error("发送消息失败: {}", message, e);
+                    failList.add(message);
                 }
             }
             // 事务控制
             if (session.getTransacted()) {
                 session.commit(); // 批量提交
             }
-            return Result.success(successCount);
+            return Result.success(failList);
         } catch (Exception e) {
             rollback(session,"ActiveMQHelper的batchSendMessage回滚事务失败");
             logger.error("ActiveMQ 批量发送异常", e);
@@ -467,27 +460,6 @@ public class ActiveMQHelper {
             return false;
         }
     }
-
-    /**
-     * 构建并配置一个Caffeine缓存实例。
-     *
-     * <p>该方法设置缓存的最大条目数、访问后过期时间,并注册移除监听器以处理缓存项被移除时的清理操作。
-     * 缓存配置参数来源于静态常量,适用于通用缓存场景。
-     *
-     * @param <K> 缓存键的类型
-     * @param <V> 缓存值的类型
-     * @return 配置好的Caffeine缓存实例
-     * @see Caffeine#newBuilder()
-     * @see #MAX_CACHE_SIZE
-     * @see #CACHE_EXPIRE_TIME
-     */
-    private static <K, V> Cache<K, V> buildCaffeineCache() {
-        return Caffeine.newBuilder()
-                .maximumSize(MAX_CACHE_SIZE)
-                .expireAfterAccess(CACHE_EXPIRE_TIME)
-                .removalListener((key, value, cause) -> closeAutoCloseable(value, key))
-                .build();
-    }
     /**
      * 回滚指定的JMS会话并处理可能的异常。
      *

+ 2 - 1
src/main/java/com/bfkj/unidia/IOUtils/HttpHelper.java

@@ -21,6 +21,7 @@ import org.apache.hc.core5.util.Timeout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
+import org.springframework.stereotype.Service;
 
 import java.io.File;
 import java.io.FileOutputStream;
@@ -29,7 +30,7 @@ import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
 
-@Component
+@Service
 public class HttpHelper {
     private static final Logger logger = LoggerFactory.getLogger(HttpHelper.class);
     // 连接池管理(支持高并发)

+ 30 - 46
src/main/java/com/bfkj/unidia/IOUtils/IBMMQHelper.java

@@ -3,47 +3,25 @@ package com.bfkj.unidia.IOUtils;
 import com.bfkj.unidia.DataUtils.DataFormatConverter;
 import com.bfkj.unidia.Result;
 import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
 import com.ibm.mq.*;
 import com.ibm.mq.constants.CMQC;
 import com.ibm.mq.constants.MQConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
+import org.springframework.stereotype.Service;
 
 import javax.annotation.PreDestroy;
-import java.time.Duration;
 import java.util.*;
 
-@Component
+import static com.bfkj.unidia.cacheUtils.CacheUtil.buildCaffeineCache;
+
+@Service
 public class IBMMQHelper {
     private static final Logger logger = LoggerFactory.getLogger(IBMMQHelper.class);
     // 缓存队列管理器实例
-    private static final int MAX_CACHE_SIZE = 50;
-    private static final Cache<String, MQQueueManager> queueManagerCache;
-    static {
-        queueManagerCache = Caffeine.newBuilder()
-                .maximumSize(MAX_CACHE_SIZE)
-                .expireAfterAccess(Duration.ofMinutes(16))
-                .removalListener((key, value, cause) -> logger.info("QueueManager缓存移除: {} 原因: {}", key, cause))
-                .build();
-    }
-    private static final Cache<String, MQQueue> producerQueueCache;
-    static {
-        producerQueueCache = Caffeine.newBuilder()
-                .maximumSize(MAX_CACHE_SIZE)
-                .expireAfterAccess(Duration.ofMinutes(16))
-                .removalListener((key, value, cause) -> logger.info("Queue缓存移除: {} 原因: {}", key, cause))
-                .build();
-    }
-    private static final Cache<String, MQQueue> consumerQueueCache;
-    static {
-        consumerQueueCache = Caffeine.newBuilder()
-                .maximumSize(MAX_CACHE_SIZE)
-                .expireAfterAccess(Duration.ofMinutes(16))
-                .removalListener((key, value, cause) -> logger.info("Queue缓存移除: {} 原因: {}", key, cause))
-                .build();
-    }
+    private static final Cache<String, MQQueueManager> queueManagerCache = buildCaffeineCache();
+    private static final Cache<String, MQQueue> producerQueueCache = buildCaffeineCache();
+    private static final Cache<String, MQQueue> consumerQueueCache = buildCaffeineCache();
     private static final Object connectionLock = new Object();
 
     private IBMMQHelper() {}
@@ -61,7 +39,7 @@ public class IBMMQHelper {
      *                  - queueManager: 队列管理器名称
      *                  可选参数包含字符集设置(characterSet/ccsid)
      * @param queueName 目标队列名称,必须为非空字符串
-     * @param message   待发送的消息内容,必须为非空字符串
+     * @param data   待发送的消息内容,可以是单个对象或对象列表
      * 返回值:
      * Result<Integer> 操作结果对象,包含以下状态:
      * - 成功:返回1表示消息发送成功
@@ -69,11 +47,22 @@ public class IBMMQHelper {
      * 异常处理:
      * 捕获所有运行时异常,通过日志记录详细错误信息并返回封装的错误结果
      */
-    public static Result<Integer> sendMessage(Map<String, String> mqConfig, String queueName, Object message) {
+    public static Result<List<?>> send(Map<String, String> mqConfig,
+                                              String queueName,
+                                              Object data) {
+        //判断消息是否为List类型,直接处理批量消息场景 非批量消息统一转换为单元素列表进行发送处理
+        return batchSendMessage(mqConfig, queueName,
+                data instanceof List<?> dataList ? dataList : Collections.singletonList(data));
+    }
+
+
+    public static Result<List<?>> batchSendMessage(Map<String, String> mqConfig, String queueName, List<?> messageList) {
         MQQueueManager queueManager = null;
         String cacheKey = null;
+        List<String> failList = new ArrayList<>();
+
         try {
-            Result<String> validateResult = validateConfig(mqConfig, queueName, message);
+            Result<String> validateResult = validateConfig(mqConfig, queueName, messageList);
             if (!validateResult.isSuccess()) {
                 logger.error("MQ配置验证失败: {}", validateResult.getError());
                 return Result.fail(validateResult.getError());
@@ -88,17 +77,11 @@ public class IBMMQHelper {
             try {
                 queue = getproducerMQQueue(queueManager,cacheKey,queueName);
                 logger.info("IBMMQ发送,生产者获取成功,准备发送……");
-                if(message instanceof List<?>){
-                    int successCount = 0;
-                    for (Object messageItem : (List<?>) message) {
-                        Result<Integer> sendSingleMessage = sendSingleMessage(queue,mqConfig, DataFormatConverter.convertObjectToString(messageItem));
-                        if(sendSingleMessage.isSuccess()){
-                            successCount++;
-                        }
+                for (Object messageItem : messageList) {
+                    String msg = sendSingleMessage(queue,mqConfig, DataFormatConverter.convertObjectToString(messageItem));
+                    if(msg != null){
+                        failList.add(msg);
                     }
-                    return Result.success(successCount);
-                }else {
-                    return sendSingleMessage(queue,mqConfig, DataFormatConverter.convertObjectToString(message));
                 }
             }catch (Exception e){
                 closeQueue(cacheKey,queue,producerQueueCache);
@@ -109,6 +92,7 @@ public class IBMMQHelper {
             closeQueueManager(cacheKey,queueManager);
             return Result.fail("IBM MQ 发送异常: " + e.getMessage());
         }
+        return Result.success(failList);
     }
     /**
      * 发送单条消息到指定的IBM MQ队列
@@ -118,7 +102,7 @@ public class IBMMQHelper {
      * @param message    待发送的字符串消息体
      * @return           操作结果封装对象,成功返回1,失败返回错误信息
      */
-    private static Result<Integer> sendSingleMessage(MQQueue queue,
+    private static String sendSingleMessage(MQQueue queue,
                                                      Map<String, String> mqConfig,
                                                      String message) {
         try {
@@ -146,12 +130,12 @@ public class IBMMQHelper {
             MQPutMessageOptions pmo = new MQPutMessageOptions();
             queue.put(mqMessage, pmo);
 
-            // 返回成功结果(消息计数1)
-            return Result.success(1);
+            return null;
         } catch (Exception e) {
             // 记录异常日志并返回失败结果
             logger.error("发送 IBM MQ 消息失败", e);
-            return Result.fail("发送消息失败: " + e.getMessage());
+//            return Result.fail("发送消息失败: " + e.getMessage());
+            return message;
         }
     }
 

+ 35 - 102
src/main/java/com/bfkj/unidia/IOUtils/KafkaHelper.java

@@ -1,7 +1,6 @@
 package com.bfkj.unidia.IOUtils;
 
 import com.github.benmanes.caffeine.cache.Cache;
-import com.github.benmanes.caffeine.cache.Caffeine;
 import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.clients.producer.*;
 import org.apache.kafka.common.TopicPartition;
@@ -19,31 +18,18 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import com.bfkj.unidia.Result;
 import com.bfkj.unidia.DataUtils.DataFormatConverter;
-import org.springframework.stereotype.Component;
+import org.springframework.stereotype.Service;
 
 import javax.annotation.PreDestroy;
 
-@Component
+import static com.bfkj.unidia.cacheUtils.CacheUtil.buildCaffeineCache;
+
+@Service
 public class KafkaHelper {
     private static final Logger logger = LoggerFactory.getLogger(KafkaHelper.class);
-    private static final int MAX_CACHE_SIZE = 100;
     private static final int MAX_POLL_RECORDS = 500;
-    private static final Cache<String, Producer<String, String>> producerCache;
-    static {
-        producerCache = Caffeine.newBuilder()
-                .maximumSize(MAX_CACHE_SIZE)
-                .expireAfterAccess(Duration.ofMinutes(16))
-                .removalListener((key, value, cause) -> logger.info("Producer缓存移除: {} 原因: {}", key, cause))
-                .build();
-    }
-    private static final Cache<String, Consumer<String, String>> consumerCache;
-    static {
-        consumerCache = Caffeine.newBuilder()
-                .maximumSize(100)
-                .expireAfterAccess(Duration.ofMinutes(16))
-                .removalListener((key, value, cause) -> logger.info("Consumer缓存移除: {} 原因: {}", key, cause))
-                .build();
-    }
+    private static final Cache<String, Producer<String, String>> producerCache = buildCaffeineCache();
+    private static final Cache<String, Consumer<String, String>> consumerCache = buildCaffeineCache();
     private static final Object producerLock = new Object();
     private static final Object consumerLock = new Object(); // 新增消费者锁
       // 私有构造函数
@@ -56,102 +42,45 @@ public class KafkaHelper {
      * @param data        待发送的消息数据,支持单条对象或对象列表形式
      * @return            操作结果封装对象,成功时返回发送的消息数量,失败时包含错误信息
      */
-    public static Result<Integer> send(Map<String, String> kafkaConfig, String topic, Object data) {
-        try {
-            // 校验基础参数有效性
-            Result<String> checkBaseParamResult = checkBaseParam(kafkaConfig,"bfkj", topic, data);
-            if(!checkBaseParamResult.isSuccess()){
-                return Result.fail(checkBaseParamResult.getError());
-            }
-            String cacheKey = checkBaseParamResult.getData();
-            // 获取或创建Kafka生产者实例
-            Result<Producer<String, String>> producerResult = getOrCreateProducer(kafkaConfig, cacheKey);
-            if(!producerResult.isSuccess()){
-                return Result.fail(producerResult.getError());
-            }
-            Producer<String, String> producer = producerResult.getData();
-            Result<Integer> integerResult = null;
-            // 处理不同数据类型的消息发送
-            if (data instanceof List<?> dataList) {
-                integerResult = batchSend(producer, topic, dataList, cacheKey);
-            } else {
-                integerResult = sendSingleMessage(producer, topic, data, cacheKey);
-            }
-            logger.info("Kafka发送消息返回结果:");
-            logger.info(integerResult.toString());
-            return integerResult;
-        } catch (Exception e) {
-            // 记录发送异常日志并返回错误结果
-            logger.error("Kafka发送异常", e);
-            return Result.fail("Kafka发送异常: " + e.getMessage());
-        }
-    }
-    /**
-     * 向指定Kafka主题发送单条消息,并通过CountDownLatch同步等待发送结果
-     *
-     * @param producer Kafka生产者实例,负责实际的消息发送
-     * @param topic    目标Kafka主题名称
-     * @param data     待发送的消息数据对象(将转换为JSON字符串)
-     * @return Result<Integer> 包含发送结果的包装对象,成功返回1,失败返回异常信息
-     */
-    private static Result<Integer> sendSingleMessage(Producer<String, String> producer,
-                                                     String topic,
-                                                     Object data,
-                                                     String cacheKey) {
-
-        String value = DataFormatConverter.convertObjectToString(data);
-        ProducerRecord<String, String> record = new ProducerRecord<>(topic, value);
-        AtomicReference<Result<Integer>> resultRef = new AtomicReference<>();
-        CountDownLatch latch = new CountDownLatch(1);
-
-        // 异步发送Kafka消息并处理回调结果
-        producer.send(record, (metadata, exception) -> {
-            if (exception != null) {
-                if (exception instanceof InterruptException) {
-                    logger.warn("生产者操作被中断,准备关闭资源");
-                    Thread.currentThread().interrupt(); // 保留中断状态
-                    //producerCache.invalidate(generateProducerCacheKey(kafkaConfig, topic));
-                    producer.close(); // 安全关闭生产者
-                }
-                logger.error("Kafka消息发送失败", exception);
-                resultRef.set(Result.fail(exception.getMessage()));
-            } else {
-                resultRef.set(Result.success(1));
-            }
-            latch.countDown();
-        });
-
-        // 等待消息发送完成并获取执行结果
-        try {
-            latch.await(); // 等待发送完成
-            return resultRef.get(); // 通过原子引用获取结果
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            producerCache.invalidate(cacheKey);
-            producer.close(); // 安全关闭生产者
-            return Result.fail("发送线程被中断" + e.getMessage());
-        }
+    public static Result<List<?>> send(Map<String, String> kafkaConfig, String topic, Object data) {
+        //判断消息是否为List类型,直接处理批量消息场景 非批量消息统一转换为单元素列表进行发送处理
+        return batchSendMessage(kafkaConfig, topic,
+                data instanceof List<?> dataList ? dataList : Collections.singletonList(data));
     }
 
     /**
      * 批量发送消息到Kafka主题,支持异步发送并处理结果。
      *
-     * @param producer  Kafka生产者实例,用于发送消息
      * @param topic     目标Kafka主题名称
      * @param dataList  待发送的数据列表,元素类型为可序列化对象
      * @return          返回包含发送结果的Result对象:
      *                  - 成功时返回成功发送的消息数量
      *                  - 失败时返回错误信息(超时/部分失败/异常)
      */
-    private static Result<Integer> batchSend(Producer<String, String> producer,
+    private static Result<List<?>> batchSendMessage(Map<String, String> kafkaConfig,
                                              String topic,
-                                             List<?> dataList,
-                                             String cacheKey) {
+                                             List<?> dataList) {
+        String cacheKey = null;
+        Producer<String, String> producer = null;
         try {
+            // 校验基础参数有效性
+            Result<String> checkBaseParamResult = checkBaseParam(kafkaConfig,"bfkj", topic, dataList);
+            if(!checkBaseParamResult.isSuccess()){
+                return Result.fail(checkBaseParamResult.getError());
+            }
+            cacheKey = checkBaseParamResult.getData();
+            // 获取或创建Kafka生产者实例
+            Result<Producer<String, String>> producerResult = getOrCreateProducer(kafkaConfig, cacheKey);
+            if(!producerResult.isSuccess()){
+                return Result.fail(producerResult.getError());
+            }
+            producer = producerResult.getData();
+
             //数据预处理阶段:  将输入数据列表转换为Kafka ProducerRecord对象集合
             List<ProducerRecord<String, String>> records = dataList.stream()
                     .map(item -> new ProducerRecord<String, String>(topic, DataFormatConverter.convertObjectToString(item)))
                     .toList();
+            List<String> failList = new ArrayList<>();
 
             //异步发送初始化
             AtomicInteger successCount = new AtomicInteger(0);
@@ -159,11 +88,13 @@ public class KafkaHelper {
             CountDownLatch latch = new CountDownLatch(records.size());
 
             //异步发送执行
-            records.forEach(record -> producer.send(record, (metadata, exception) -> {
+            Producer<String, String> finalProducer = producer;
+            records.forEach(record -> finalProducer.send(record, (metadata, exception) -> {
                 if (exception != null) {
                     if (firstError.get() == null) {
                         firstError.set(exception);
                         logger.error("批量发送失败", exception);
+                        failList.add(record.value());
                     }
                 } else {
                     successCount.incrementAndGet();
@@ -190,9 +121,11 @@ public class KafkaHelper {
             if (firstError.get() != null) {
                 return Result.fail("部分消息发送失败: " + firstError.get().getMessage());
             }
-            return Result.success(successCount.get());
+            return Result.success(failList);
         } catch (Exception e) {
-            closeProducer(cacheKey,producer);
+            if(producer != null && cacheKey != null) {
+                closeProducer(cacheKey,producer);
+            }
             logger.error("批量处理异常", e);
             return Result.fail("批量处理失败: " + e.getMessage());
         }

+ 36 - 30
src/main/java/com/bfkj/unidia/IOUtils/RabbitMQHelper.java

@@ -1,48 +1,51 @@
 package com.bfkj.unidia.IOUtils;
 
+import com.bfkj.unidia.DataUtils.DataFormatConverter;
 import com.bfkj.unidia.Result;
 import com.github.benmanes.caffeine.cache.Cache;
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.rabbitmq.client.*;
+import org.apache.kafka.clients.producer.Producer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Component;
+import org.springframework.stereotype.Service;
 
 import javax.annotation.PreDestroy;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-@Component
+import static com.bfkj.unidia.cacheUtils.CacheUtil.buildCaffeineCache;
+
+@Service
 public class RabbitMQHelper {
     private static final Logger logger = LoggerFactory.getLogger(RabbitMQHelper.class);
-    private static final int MAX_CACHE_SIZE = 100;
-    private static final Cache<String, Connection> connectionCache = Caffeine.newBuilder()
-            .maximumSize(MAX_CACHE_SIZE)
-            .expireAfterAccess(10, TimeUnit.MINUTES)
-            .build();
-    private static final Cache<String, Channel> channelCache = Caffeine.newBuilder()
-            .maximumSize(MAX_CACHE_SIZE)
-            .expireAfterAccess(10, TimeUnit.MINUTES)
-            .build();
+    private static final Cache<String, Connection> connectionCache = buildCaffeineCache();
+    private static final Cache<String, Channel> channelCache = buildCaffeineCache();
     private static final Object connectionLock = new Object();
 
     public RabbitMQHelper() {}
 
-    /**
-     * 发送消息到RabbitMQ交换机
-     *
-     * @param rabbitMQConfig RabbitMQ基础配置参数,需包含host、port等必填项
-     * @param exchange       目标交换机名称
-     * @param routingKey     路由键
-     * @param message        待发送的消息内容(UTF-8编码)
-     * @return Result<Integer> 操作结果封装对象,成功返回1,失败返回错误信息
-     */
-    public static Result<Integer> sendMessage(Map<String, String> rabbitMQConfig, String exchange, String routingKey, String message) {
+    public static Result<List<?>> send(Map<String, String> rabbitMQConfig, String exchange, String routingKey, Object data) {
+        // 处理不同数据类型的消息发送
+        return batchSendMessage(rabbitMQConfig, exchange, routingKey,
+                data instanceof List<?> dataList ? dataList : Collections.singletonList(data));
+    }
+
+    public static Result<List<?>> batchSendMessage(Map<String, String> rabbitMQConfig, String exchange, String routingKey, List<?> messages) {
+        List<String> failList = new ArrayList<>();
+
         try {
             // 参数校验阶段
-            Result<Object> checkBaseParamResult = checkBaseParams(rabbitMQConfig, exchange, message);
+            if (messages == null || messages.isEmpty()) {
+                return Result.fail("消息列表不能为空");
+            }
+            Result<Object> checkBaseParamResult = checkBaseParams(rabbitMQConfig, exchange, "");
             if (!checkBaseParamResult.isSuccess()) {
                 return Result.fail(checkBaseParamResult.getError());
             }
@@ -51,18 +54,21 @@ public class RabbitMQHelper {
             if (channel == null) {
                 return Result.fail("获取通道失败");
             }
-            try {
-                // 发送消息到指定交换机
-                channel.basicPublish(exchange, routingKey, null, message.getBytes());
-                return Result.success(1);
-            } catch (IOException e) {
-                logger.error("RabbitMQ消息发送失败", e);
-                return Result.fail("消息发送失败: " + e.getMessage());
+            // 批量发送消息
+            for (Object message : messages) {
+                String msg = DataFormatConverter.convertObjectToString(message);
+                try {
+                    channel.basicPublish(exchange, routingKey, null, msg.getBytes());
+                } catch (IOException e) {
+                    logger.error("RabbitMQ批量消息发送失败", e);
+                    failList.add(msg);
+                }
             }
         } catch (Exception e) {
-            logger.error("RabbitMQ发送异常", e);
-            return Result.fail("RabbitMQ发送异常: " + e.getMessage());
+            logger.error("RabbitMQ批量发送异常", e);
+            return Result.fail("RabbitMQ批量发送异常: " + e.getMessage());
         }
+        return Result.success(failList);
     }
 
     /**

+ 229 - 202
src/main/java/com/bfkj/unidia/IOUtils/RocketMQHelper.java

@@ -1,273 +1,300 @@
 package com.bfkj.unidia.IOUtils;
 
+import com.bfkj.unidia.DataUtils.DataFormatConverter;
+import com.bfkj.unidia.Result;
+import com.github.benmanes.caffeine.cache.Cache;
 import org.apache.rocketmq.client.apis.ClientConfiguration;
 import org.apache.rocketmq.client.apis.ClientException;
 import org.apache.rocketmq.client.apis.ClientServiceProvider;
-import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
 import org.apache.rocketmq.client.apis.consumer.FilterExpression;
 import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
 import org.apache.rocketmq.client.apis.consumer.PushConsumer;
 import org.apache.rocketmq.client.apis.message.Message;
-import org.apache.rocketmq.client.apis.message.MessageBuilder;
 import org.apache.rocketmq.client.apis.message.MessageView;
 import org.apache.rocketmq.client.apis.producer.Producer;
 import org.apache.rocketmq.client.apis.producer.SendReceipt;
-import org.apache.rocketmq.client.apis.producer.Transaction;
-import org.apache.rocketmq.client.apis.producer.TransactionResolution;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
 
-import java.io.Closeable;
-import java.time.Duration;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import java.util.*;
 import java.util.function.Consumer;
 
-public class RocketMQHelper implements Closeable {
+import static com.bfkj.unidia.cacheUtils.CacheUtil.buildCaffeineCache;
+
+@Service
+public class RocketMQHelper {
     private static final Logger logger = LoggerFactory.getLogger(RocketMQHelper.class);
-    // RocketMQ 客户端服务提供者
     private static final ClientServiceProvider PROVIDER = ClientServiceProvider.loadService();
 
-    // 生产者缓存 (producerGroup -> Producer)
-    private static final Map<String, Producer> PRODUCER_MAP = new ConcurrentHashMap<>();
+    // 缓存实例
+    private static final Cache<String, Producer> producerCache = buildCaffeineCache();
+    private static final Cache<String, PushConsumer> consumerCache = buildCaffeineCache();
 
-    // 消费者缓存 (consumerGroup -> PushConsumer)
-    private static final Map<String, PushConsumer> CONSUMER_MAP = new ConcurrentHashMap<>();
+    // 锁对象
+    private static final Object producerLock = new Object();
+    private static final Object consumerLock = new Object();
 
-    // 线程池用于异步操作
-    private static final ExecutorService ASYNC_EXECUTOR = Executors.newFixedThreadPool(
-            Runtime.getRuntime().availableProcessors() * 4
-    );
-    private RocketMQHelper(){}
-    // 默认客户端配置
-    private static ClientConfiguration defaultClientConfig(String endpoints) {
-        return ClientConfiguration.newBuilder()
-                .setEndpoints(endpoints)
-                .setRequestTimeout(Duration.ofSeconds(3))
-                .build();
-    }
-    // 获取或创建生产者
-    private static synchronized Producer getOrCreateProducer(String producerGroup, String endpoints) {
-        return PRODUCER_MAP.computeIfAbsent(producerGroup, key -> {
-            try {
-                ClientConfiguration config = defaultClientConfig(endpoints);
-                return PROVIDER.newProducerBuilder()
-                        .setClientConfiguration(config)
-                        .build();
-            } catch (ClientException e) {
-                throw new RocketMQException("Failed to create producer: " + producerGroup, e);
-            }
-        });
-    }
-    // 获取或创建消费者
-    private static synchronized PushConsumer getOrCreateConsumer(String consumerGroup, String endpoints,
-                                                                 String topic, String tag, Consumer<MessageView> messageHandler) throws ClientException {
-        String cacheKey = consumerGroup + "@" + topic + "#" + tag;
-        return CONSUMER_MAP.computeIfAbsent(cacheKey, key -> {
-            try {
-                ClientConfiguration config = defaultClientConfig(endpoints);
-                FilterExpression filter = new FilterExpression(tag, FilterExpressionType.TAG);
+    private RocketMQHelper() {}
 
-                return PROVIDER.newPushConsumerBuilder()
-                        .setClientConfiguration(config)
-                        .setConsumerGroup(consumerGroup)
-                        .setSubscriptionExpressions(Map.of(topic, filter))
-                        .setMessageListener(messageView -> {
-                            messageHandler.accept(messageView);
-                            return ConsumeResult.SUCCESS;
-                        })
-                        .build();
-            } catch (ClientException e) {
-                throw new RocketMQException("Failed to create consumer: " + consumerGroup, e);
-            }
-        });
+    /**
+     * 发送消息到指定主题,支持单条或批量发送。
+     *
+     * @param config      配置信息,包含端点、生产者组等
+     * @param topic       主题名称
+     * @param tag         标签
+     * @param data        待发送的消息对象,可以是单个对象或对象列表
+     * @return Result<List<?>> 操作结果,成功时携带发送失败的消息列表(全部发送成功时为空列表)
+     */
+    public static Result<List<?>> send(Map<String, String> config,
+                                       String topic,
+                                       String tag,
+                                       Object data) {
+        return batchSendMessage(config, topic, tag,
+                data instanceof List<?> dataList ? dataList : Collections.singletonList(data));
     }
 
     /**
-     * 同步发送消息
+     * 批量发送消息到 RocketMQ
      *
-     * @param producerGroup 生产者组
-     * @param endpoints     RocketMQ 端点
-     * @param topic         主题
-     * @param tag           标签
-     * @param body          消息体
-     * @param properties    消息属性
-     * @return 发送回执
+     * @param config  RocketMQ 连接配置(必须包含 endpoints 和 producerGroup)
+     * @param topic   主题名称
+     * @param tag     标签
+     * @param messages 待发送的消息列表
+     * @return Result<List<?>> 发送结果
      */
-    public static SendReceipt sendSync(String producerGroup, String endpoints,
-                                       String topic, String tag, byte[] body, Map<String, String> properties)
-            throws ClientException {
-        Producer producer = getOrCreateProducer(producerGroup, endpoints);
-
-        MessageBuilder builder = PROVIDER.newMessageBuilder()
-                .setTopic(topic)
-                .setTag(tag)
-                .setBody(body);
-
-        if (properties != null) {
-            properties.forEach(builder::addProperty);
+    private static Result<List<?>> batchSendMessage(Map<String, String> config,
+                                                    String topic,
+                                                    String tag,
+                                                    List<?> messages) {
+        Producer producer = null;
+        List<String> failList = new ArrayList<>();
+        try {
+            // 校验配置参数有效性
+            Result<String> validateResult = validateConfig(config, topic, tag, messages);
+            if (!validateResult.isSuccess()) {
+                return Result.fail(validateResult.getError());
+            }
+            // 获取生产者
+            producer = getProducer(config);
+            if (producer == null) {
+                return Result.fail("获取生产者失败");
+            }
+            for (Object messageObj : messages) {
+                try {
+                    byte[] body = DataFormatConverter.convertObjectToString(messageObj).getBytes(); // 将消息对象转换为字节数组
+                    SendReceipt receipt = producer.send(buildMessage(topic, tag, body));
+                    logger.info("消息发送成功: {}", receipt.getMessageId());
+                } catch (ClientException e) {
+                    logger.error("发送消息失败", e);
+                    failList.add(messageObj.toString());
+                }
+            }
+            return Result.success(failList);
+        } catch (Exception e) {
+            logger.error("RocketMQ 批量发送异常", e);
+            return Result.fail("RocketMQ 批量发送异常: " + e.getMessage());
+        } finally {
+            if (producer != null) {
+                try {
+                    producer.close();
+                } catch (Exception e) {
+                    logger.error("关闭生产者失败", e);
+                }
+            }
         }
-
-        Message message = builder.build();
-        return producer.send(message);
     }
 
     /**
-     * 异步发送消息
+     * 启动消费者
      *
-     * @param producerGroup 生产者组
-     * @param endpoints     RocketMQ 端点
-     * @param topic         主题
-     * @param tag           标签
-     * @param body          消息体
-     * @param properties    消息属性
-     * @return CompletableFuture 发送结果
+     * @param config          配置信息,包含端点、消费者组等
+     * @param topic           主题名称
+     * @param tag             标签
+     * @param messageHandler  消息处理函数
+     * @return Result<String> 操作结果
      */
-    public static CompletableFuture<SendReceipt> sendAsync(String producerGroup, String endpoints,
-                                                           String topic, String tag, byte[] body, Map<String, String> properties) {
-        return CompletableFuture.supplyAsync(() -> {
-            try {
-                return sendSync(producerGroup, endpoints, topic, tag, body, properties);
-            } catch (ClientException e) {
-                throw new RocketMQException("Async send failed", e);
+    public static Result<String> startConsumer(Map<String, String> config,
+                                               String topic,
+                                               String tag,
+                                               Consumer<MessageView> messageHandler) {
+        try {
+            // 校验配置参数有效性
+            Result<String> validateResult = validateConfig(config, topic, tag, null);
+            if (!validateResult.isSuccess()) {
+                return Result.fail(validateResult.getError());
             }
-        }, ASYNC_EXECUTOR);
+            // 获取消费者
+            PushConsumer consumer = getConsumer(config, topic, tag, messageHandler);
+            if (consumer == null) {
+                return Result.fail("获取消费者失败");
+            }
+            return Result.success("消费者启动成功");
+        } catch (Exception e) {
+            logger.error("启动消费者失败", e);
+            return Result.fail("启动消费者失败: " + e.getMessage());
+        }
     }
 
     /**
-     * 发送单向消息(不关心发送结果)
+     * 获取或创建生产者
      *
-     * @param producerGroup 生产者组
-     * @param endpoints     RocketMQ 端点
-     * @param topic         主题
-     * @param tag           标签
-     * @param body          消息体
-     * @param properties    消息属性
+     * @param config RocketMQ 配置信息
+     * @return Producer 生产者实例
      */
-    public static void sendOneway(String producerGroup, String endpoints,
-                                  String topic, String tag, byte[] body, Map<String, String> properties) {
-        ASYNC_EXECUTOR.execute(() -> {
+    private static Producer getProducer(Map<String, String> config) {
+        String cacheKey = generateProducerKey(config);
+        // 尝试从缓存中获取已存在的生产者
+        Producer cached = producerCache.getIfPresent(cacheKey);
+        if (cached != null) {
+            return cached;
+        }
+        // 使用双重检查锁定确保线程安全
+        synchronized (producerLock) {
+            // 再次检查缓存避免重复创建
+            cached = producerCache.getIfPresent(cacheKey);
+            if (cached != null) {
+                return cached;
+            }
             try {
-                sendSync(producerGroup, endpoints, topic, tag, body, properties);
+                ClientConfiguration clientConfig = defaultClientConfig(config.get("endpoints"));
+                Producer producer = PROVIDER.newProducerBuilder()
+                        .setClientConfiguration(clientConfig)
+                        .build();
+                producerCache.put(cacheKey, producer);
+                return producer;
             } catch (ClientException e) {
-                // 记录日志但不抛出异常
-                logger.error("无确认机制消息发送异常: " + e.getMessage());
+                logger.error("创建生产者失败", e);
+                return null;
             }
-        });
+        }
     }
 
     /**
-     * 发送事务消息
+     * 获取或创建消费者
      *
-     * @param producerGroup 生产者组
-     * @param endpoints     RocketMQ 端点
-     * @param topic         主题
-     * @param tag           标签
-     * @param body          消息体
-     * @param properties    消息属性
-     * @param checker       事务检查器
-     * @return 发送回执
+     * @param config          RocketMQ 配置信息
+     * @param topic           主题名称
+     * @param tag             标签
+     * @param messageHandler  消息处理函数
+     * @return PushConsumer 消费者实例
      */
-    public static SendReceipt sendTransaction(String producerGroup, String endpoints,
-                                              String topic, String tag, byte[] body, Map<String, String> properties,
-                                              TransactionChecker checker) throws ClientException {
-        Producer producer = getOrCreateProducer(producerGroup, endpoints);
-
-        MessageBuilder builder = PROVIDER.newMessageBuilder()
-                .setTopic(topic)
-                .setTag(tag)
-                .setBody(body);
-
-        if (properties != null) {
-            properties.forEach(builder::addProperty);
+    private static PushConsumer getConsumer(Map<String, String> config,
+                                            String topic,
+                                            String tag,
+                                            Consumer<MessageView> messageHandler) throws ClientException {
+        String cacheKey = generateConsumerKey(config, topic, tag);
+        // 尝试从缓存中获取已存在的消费者
+        PushConsumer cached = consumerCache.getIfPresent(cacheKey);
+        if (cached != null) {
+            return cached;
         }
-
-        Message message = builder.build();
-
-        // 开始事务
-        Transaction tx = producer.beginTransaction();
-        try {
-            SendReceipt receipt = producer.send(message, tx);
-            // 执行本地事务逻辑 (通过checker实现)
-            TransactionResolution resolution = checker.check(message);
-
-            if (resolution == TransactionResolution.COMMIT) {
-                tx.commit();
-            } else {
-                tx.rollback();
+        synchronized (consumerLock) {
+            // 再次检查缓存避免重复创建
+            cached = consumerCache.getIfPresent(cacheKey);
+            if (cached != null) {
+                return cached;
             }
-            return receipt;
-        } catch (Exception e) {
-            tx.rollback();
-            throw new RocketMQException("事务消息失败", e);
+            ClientConfiguration clientConfig = defaultClientConfig(config.get("endpoints"));
+            FilterExpression filter = new FilterExpression(tag, FilterExpressionType.TAG);
+            PushConsumer consumer = PROVIDER.newPushConsumerBuilder()
+                    .setClientConfiguration(clientConfig)
+                    .setConsumerGroup(config.get("consumerGroup"))
+                    .setSubscriptionExpressions(Map.of(topic, filter))
+                    .setMessageListener(messageView -> {
+                        messageHandler.accept(messageView);
+                        return org.apache.rocketmq.client.apis.consumer.ConsumeResult.SUCCESS;
+                    })
+                    .build();
+            // 不需要显式调用start(),PushConsumerBuilder.build()方法会自动启动消费者
+            consumerCache.put(cacheKey, consumer);
+            return consumer;
         }
     }
 
     /**
-     * 启动消费者
+     * 配置验证
      *
-     * @param consumerGroup  消费者组
-     * @param endpoints      RocketMQ 端点
-     * @param topic          主题
-     * @param tag            标签
-     * @param messageHandler 消息处理函数
+     * @param config      RocketMQ 配置信息
+     * @param topic       主题名称
+     * @param tag         标签
+     * @param messages    待发送的消息列表
+     * @return Result<String> 验证结果
      */
-    public static void startConsumer(String consumerGroup, String endpoints,
-                                     String topic, String tag, Consumer<MessageView> messageHandler)
-            throws ClientException {
-        getOrCreateConsumer(consumerGroup, endpoints, topic, tag, messageHandler);
+    private static Result<String> validateConfig(Map<String, String> config,
+                                                 String topic,
+                                                 String tag,
+                                                 List<?> messages) {
+        if (messages != null && messages.isEmpty()) {
+            return Result.fail("消息列表为空");
+        }
+        if (topic == null || topic.isEmpty()) {
+            return Result.fail("主题名称不能为空");
+        }
+        if (tag == null || tag.isEmpty()) {
+            return Result.fail("标签不能为空");
+        }
+        if (config == null || !config.containsKey("endpoints") || config.get("endpoints").isEmpty()) {
+            return Result.fail("配置中缺少 endpoints");
+        }
+        if (!config.containsKey("producerGroup") || config.get("producerGroup").isEmpty()) {
+            return Result.fail("配置中缺少 producerGroup");
+        }
+        return Result.success("验证通过");
     }
 
     /**
-     * 关闭资源
+     * 构建消息
+     *
+     * @param topic   主题名称
+     * @param tag     标签
+     * @param body    消息体
+     * @return 构建好的消息
      */
-    @Override
-    public void close() {
-        // 关闭所有生产者
-        PRODUCER_MAP.forEach((group, producer) -> {
-            try {
-                producer.close();
-            } catch (Exception e) {
-                logger.error("生产者关闭失败 " + group + ": " + e.getMessage());
-            }
-        });
-        PRODUCER_MAP.clear();
-
-        // 关闭所有消费者
-        CONSUMER_MAP.forEach((key, consumer) -> {
-            try {
-                consumer.close();
-            } catch (Exception e) {
-                logger.error("消费者关闭失败 " + key + ": " + e.getMessage());
-            }
-        });
-        CONSUMER_MAP.clear();
-
-        // 关闭线程池
-        ASYNC_EXECUTOR.shutdown();
+    private static Message buildMessage(String topic,
+                                        String tag,
+                                        byte[] body) {
+        return PROVIDER.newMessageBuilder()
+                .setTopic(topic)
+                .setTag(tag)
+                .setBody(body)
+                .build();
     }
 
     /**
-     * 自定义 RocketMQ 异常
+     * 生成生产者缓存键
+     *
+     * @param config RocketMQ 配置信息
+     * @return 缓存键
      */
-    public static class RocketMQException extends RuntimeException {
-        public RocketMQException(String message) {
-            super(message);
-        }
+    private static String generateProducerKey(Map<String, String> config) {
+        return String.join(":", config.get("endpoints"), config.get("producerGroup"));
+    }
 
-        public RocketMQException(String message, Throwable cause) {
-            super(message, cause);
-        }
+    /**
+     * 生成消费者缓存键
+     *
+     * @param config RocketMQ 配置信息
+     * @param topic  主题名称
+     * @param tag    标签
+     * @return 缓存键
+     */
+    private static String generateConsumerKey(Map<String, String> config,
+                                              String topic,
+                                              String tag) {
+        return String.join(":", config.get("endpoints"), config.get("consumerGroup"), topic, tag);
     }
 
     /**
-     * 事务检查器接口
+     * 默认客户端配置
+     *
+     * @param endpoints RocketMQ 端点
+     * @return 客户端配置
      */
-    @FunctionalInterface
-    public interface TransactionChecker {
-        TransactionResolution check(Message message);
+    private static ClientConfiguration defaultClientConfig(String endpoints) {
+        return ClientConfiguration.newBuilder()
+                .setEndpoints(endpoints)
+                .setRequestTimeout(java.time.Duration.ofSeconds(3))
+                .build();
     }
 }

+ 2 - 4
src/main/java/com/bfkj/unidia/Result.java

@@ -1,10 +1,8 @@
 package com.bfkj.unidia;
 
 import lombok.Getter;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
+
+import java.util.*;
 import java.util.function.Function;
 
 /**

+ 54 - 0
src/main/java/com/bfkj/unidia/cacheUtils/CacheUtil.java

@@ -0,0 +1,54 @@
+package com.bfkj.unidia.cacheUtils;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+
+public class CacheUtil {
+    private static final Logger logger = LoggerFactory.getLogger(CacheUtil.class);
+
+    // 缓存配置
+    private static final int MAX_CACHE_SIZE = 50;
+    private static final Duration CACHE_EXPIRE_TIME = Duration.ofMinutes(16);
+
+    /**
+     * 构建并配置一个Caffeine缓存实例。
+     *
+     * <p>该方法设置缓存的最大条目数、访问后过期时间,并注册移除监听器以处理缓存项被移除时的清理操作。
+     * 缓存配置参数来源于静态常量,适用于通用缓存场景。
+     *
+     * @param <K> 缓存键的类型
+     * @param <V> 缓存值的类型
+     * @return 配置好的Caffeine缓存实例
+     * @see Caffeine#newBuilder()
+     * @see #MAX_CACHE_SIZE
+     * @see #CACHE_EXPIRE_TIME
+     */
+    public static <K, V> Cache<K, V> buildCaffeineCache() {
+        return Caffeine.newBuilder()
+                .maximumSize(MAX_CACHE_SIZE)
+                .expireAfterAccess(CACHE_EXPIRE_TIME)
+                .removalListener((key, value, cause) -> closeAutoCloseable(value, key))
+                .build();
+    }
+
+    /**
+     * 关闭指定的AutoCloseable资源并记录操作日志。
+     *
+     * @param value 需要关闭的资源对象,若未实现AutoCloseable接口则忽略
+     * @param key   标识当前缓存资源的键值,用于日志上下文记录
+     */
+    private static void closeAutoCloseable(Object value, Object key) {
+        if (value instanceof AutoCloseable closeable) {
+            try {
+                closeable.close();
+                logger.info("自动关闭{}缓存: {}", "缓存资源", key);
+            } catch (Exception e) {
+                logger.error("关闭{}缓存失败", "缓存资源", e);
+            }
+        }
+    }
+}