andy 9 months ago
parent
commit
1d3980f9ce

+ 522 - 509
mainFactory/src/main/java/org/bfkj/application/DataProcess.java

@@ -1,530 +1,543 @@
 package org.bfkj.application;
 
-
 import org.bfkj.config.AppConfig;
 import org.bfkj.config.ObjectMap;
-import org.bfkj.utils.*;
+import org.bfkj.utils.LogUtils;
+import org.bfkj.utils.MapTools;
+import org.bfkj.utils.MyDbHelper;
+import org.bfkj.utils.ScriptEnginePro;
 
 import java.util.*;
 import java.util.stream.Collectors;
 
 public class DataProcess {//数据处理对象
-    private String serviceId;//作为实例化服务对象的Key
-    private long lastActive; //服务最后活跃时间
-    private String errorMessage = null;  //当前数据输入处理对象不可用信息
-    private MyDbHelper baseDbHelper = null;//数据底座的数据库服务对象:捆绑到服务实例上,避免并发导致线程不安全
-
-    private Map<String, Integer> serviceErrorCount = new HashMap<>();   //服务错误次数:针对脚本异常
-
-    private List<Map<String, Object>> calcList;//缓存计算库配置信息
-    private Map<String, ScriptEnginePro> ScriptEngineProMaps = new HashMap<>();//用于缓存当前服务的算法引擎对象
-    private Map<String, MyDbHelper> calcDbHelperMaps = new HashMap<>();//算法的数据库服务对象:捆绑到服务实例上,避免并发导致线程不安全:多个数据处理服务对象操作同一个对象
-
-    private List<String> serviceAuthMap = new ArrayList<>();//缓存当前服务的安全等级,对应各应用
-
-    private Set<Integer> enableLogCalculationLibrary = new HashSet<>();
-
-    public DataProcess(String service_Id) {//初始化构造,实例化一个服务对象
-        lastActive = System.currentTimeMillis();//默认最后活跃时间
-        serviceId = service_Id;
-        try {
-            baseDbHelper = ObjectMap.getordropMyDbHelper(AppConfig.REMOTE_DB_CONNECT);
-            if (Objects.nonNull(baseDbHelper.getErrorMessage())) {
-                errorMessage = "获取底座myDbHelper对象异常: ".concat(baseDbHelper.getErrorMessage());
-                return;
-            }
-            //算法信息
-            Map<String, Object> calculationMap = baseDbHelper.queryByParamsReturnList("SELECT CL.*,DI.*  FROM calculation_library CL  left JOIN datasourceinfo DI ON DI.dataSourceID = CL.datasource_id WHERE CL.service_id  =? and  CL.library_type is not null order by library_sort,library_id", service_Id);//直接数据库查询
-
-            calcList = Objects.isNull(calculationMap.get("returnData")) ? null : (List<Map<String, Object>>) calculationMap.get("returnData");
-            if (!calculationMap.get("code").equals("0") || calcList == null || calcList.isEmpty()) {//查询数据库失败
-                errorMessage = "查询".concat(serviceId).concat("的算法失败:").concat(calculationMap.containsKey("message") ? calculationMap.get("message").toString() : "对应算法不存在");
-                return;
-            }
-            //获取当前服务的安全等级
-            Map<String, Object> serviceAuth = baseDbHelper.queryByParamsReturnList("SELECT app_id FROM appService WHERE serviceID = ?", service_Id);//直接数据库查询
-            if (!serviceAuth.get("code").equals("0")) {//查询数据库失败
-                errorMessage = "获取".concat(service_Id).concat("的应用安全等级出错:").concat(serviceAuth.get("message").toString());
-                return;
-            }
-            Object serviceAuthList = serviceAuth.get("returnData");//获取服务应用等级   List<Map>    key:字段名,value
-            if (!Objects.isNull(serviceAuthList)) {//将ListMap合并为一个Map,方便使用 // map:{1,appid},{2,appidd}
-                serviceAuthMap = ((List<Map<String, Object>>) serviceAuthList).stream().map(map -> map.get("app_id").toString()).toList();
-            }
-            List<Map<String, Object>> returnData = (List<Map<String, Object>>) calculationMap.getOrDefault("returnData", Collections.EMPTY_LIST);
+	private String serviceId;//作为实例化服务对象的Key
+	private long lastActive; //服务最后活跃时间
+	private String errorMessage = null;  //当前数据输入处理对象不可用信息
+	private MyDbHelper baseDbHelper = null;//数据底座的数据库服务对象:捆绑到服务实例上,避免并发导致线程不安全
+
+	private Map<String, Integer> serviceErrorCount = new HashMap<>();   //服务错误次数:针对脚本异常
+
+	private List<Map<String, Object>> calcList;//缓存计算库配置信息
+	private Map<String, ScriptEnginePro> ScriptEngineProMaps = new HashMap<>();//用于缓存当前服务的算法引擎对象
+	private Map<String, MyDbHelper> calcDbHelperMaps = new HashMap<>();//算法的数据库服务对象:捆绑到服务实例上,避免并发导致线程不安全:多个数据处理服务对象操作同一个对象
+
+	private List<String> serviceAuthMap = new ArrayList<>();//缓存当前服务的安全等级,对应各应用
+
+	private Set<Integer> enableLogCalculationLibrary = new HashSet<>();
+
+	public DataProcess(String service_Id) {//初始化构造,实例化一个服务对象
+		lastActive = System.currentTimeMillis();//默认最后活跃时间
+		serviceId = service_Id;
+		try {
+			baseDbHelper = ObjectMap.getordropMyDbHelper(AppConfig.REMOTE_DB_CONNECT);
+			if ( Objects.nonNull(baseDbHelper.getErrorMessage()) ) {
+				errorMessage = "获取底座myDbHelper对象异常: ".concat(baseDbHelper.getErrorMessage());
+				return;
+			}
+			//算法信息
+			Map<String, Object> calculationMap = baseDbHelper.queryByParamsReturnList("SELECT CL.*,DI.*  FROM calculation_library CL  left JOIN datasourceinfo DI ON DI.dataSourceID = CL.datasource_id WHERE CL.service_id  =? and  CL.library_type is not null order by library_sort,library_id", service_Id);//直接数据库查询
+
+			calcList = Objects.isNull(calculationMap.get("returnData")) ? null : (List<Map<String, Object>>) calculationMap.get("returnData");
+			if ( ! calculationMap.get("code").equals("0") || calcList == null || calcList.isEmpty() ) {//查询数据库失败
+				errorMessage = "查询".concat(serviceId).concat("的算法失败:").concat(calculationMap.containsKey("message") ? calculationMap.get("message").toString() : "对应算法不存在");
+				return;
+			}
+			//获取当前服务的安全等级
+			Map<String, Object> serviceAuth = baseDbHelper.queryByParamsReturnList("SELECT app_id FROM appService WHERE serviceID = ?", service_Id);//直接数据库查询
+			if ( ! serviceAuth.get("code").equals("0") ) {//查询数据库失败
+				errorMessage = "获取".concat(service_Id).concat("的应用安全等级出错:").concat(serviceAuth.get("message").toString());
+				return;
+			}
+			Object serviceAuthList = serviceAuth.get("returnData");//获取服务应用等级   List<Map>    key:字段名,value
+			if ( ! Objects.isNull(serviceAuthList) ) {//将ListMap合并为一个Map,方便使用 // map:{1,appid},{2,appidd}
+				serviceAuthMap = ((List<Map<String, Object>>) serviceAuthList).stream().map(map -> map.get("app_id").toString()).toList();
+			}
+			List<Map<String, Object>> returnData = (List<Map<String, Object>>) calculationMap.getOrDefault("returnData", Collections.EMPTY_LIST);
 //            过滤出开启日志的算法ID
-            List<Integer> enabled = returnData.stream()
-                    .filter(it -> Objects.nonNull(it.get("processType")) && "1".equals(it.get("processType").toString()))
-                    .map(it -> ((Integer) it.get("library_id"))).toList();
-            enableLogCalculationLibrary.addAll(enabled);
-        } catch (Exception e) {
-            errorMessage = "数据处理对象初始化异常: ".concat(serviceId).concat(";异常信息:").concat(LogUtils.getException(e));
-        }
-    }
-
-    //数据处理服务对象关闭时,关闭缓存的数据库对象以及脚本引擎对象
-    public void close() {//销毁当前服务对象时,需要将缓存的算法引擎实例同步销毁
-        try {
-            for (ScriptEnginePro scriptEnginePro : ScriptEngineProMaps.values()) {//脚本引擎
-                scriptEnginePro.close();//关闭动态方法实例、缓存等
-            }
-            ScriptEngineProMaps.clear();//清除缓存
-            for (MyDbHelper calcMyDbHelper : calcDbHelperMaps.values()) {//数据库对象
-                calcMyDbHelper.close();//关闭数据库连接池
-            }
-            calcDbHelperMaps.clear();//清除缓存
+			List<Integer> enabled = returnData.stream()
+			                                  .filter(it -> Objects.nonNull(it.get("processType")) && "1".equals(it.get("processType").toString()))
+			                                  .map(it -> ((Integer) it.get("library_id"))).toList();
+			enableLogCalculationLibrary.addAll(enabled);
+		} catch ( Exception e ) {
+			errorMessage = "数据处理对象初始化异常: ".concat(serviceId).concat(";异常信息:").concat(LogUtils.getException(e));
+		}
+	}
+
+	//数据处理服务对象关闭时,关闭缓存的数据库对象以及脚本引擎对象
+	public void close() {//销毁当前服务对象时,需要将缓存的算法引擎实例同步销毁
+		try {
+			for ( ScriptEnginePro scriptEnginePro : ScriptEngineProMaps.values() ) {//脚本引擎
+				scriptEnginePro.close();//关闭动态方法实例、缓存等
+			}
+			ScriptEngineProMaps.clear();//清除缓存
+			for ( MyDbHelper calcMyDbHelper : calcDbHelperMaps.values() ) {//数据库对象
+				calcMyDbHelper.close();//关闭数据库连接池
+			}
+			calcDbHelperMaps.clear();//清除缓存
 //            baseDbHelper.close();//底座数据库对象
-        } catch (Exception e) {
-            System.out.println("dataProcess 关闭异常: " + LogUtils.getException(e));
-        }
-
-    }
-
-    //统一成功信息处理
-    public Map<String, Object> processSuccess(Object returnData) {
-        if (returnData instanceof Map) {
-            Map<String, Object> resultData1 = new HashMap<>();
-            try {
-                resultData1 = (Map<String, Object>) returnData;
-            } catch (Exception e) {
-                resultData1.put("returnData", returnData);
-            }
-            resultData1.put("code", "0");
-            return resultData1;
-        }
-        Map<String, Object> returnMap = new HashMap<>();//用于当前方法统一返回参数,不存在深拷贝问题
-        returnMap.put("code", "0");
-        returnMap.put("returnData", returnData);
-        return returnMap;
-    }
-
-    //统一错误信息处理
-    public Map<String, Object> processFail(String errorMessage, String libraryId) {
-        Map<String, Object> returnMap = new HashMap<>();//用于当前方法统一返回参数,不存在深拷贝问题
-        returnMap.put("code", "-1");
-        returnMap.put("message", errorMessage);
-        if (!MapTools.isBlank(libraryId)) {
-            returnMap.put("library_id", libraryId);
-        }
-        return returnMap;
-    }
-
-    //数据处理对外接口
-    public Map<String, Object> processData(Map<String, Object> inputData, String... user_id) { //{serviceid:1,datacontent:[{key:value},{key:value}],event:0,page:1,pagesize:10}
-        try {
-
-            if (!MapTools.isBlank(errorMessage)) {//如果当前服务存在问题,代表数据库对象不可用,此时应该重构当前对象,调用方控制
-                LogUtils.log("processData:1", "-1", null, "服务不可用".concat(errorMessage), serviceId, inputData, null, null, null);
-                return processFail("服务不可用".concat(errorMessage), null);
-            }
-            if (System.currentTimeMillis() - lastActive > 2000) {  //        更新数据库中的服务最新活跃时间
-                lastActive = System.currentTimeMillis();//更新最后活跃时间  //------更新当前服务的最后活跃时间,用于服务监控
-                baseDbHelper.updateByCondition("update serviceinfo set runState = ? ,lastactive = ? where  serviceID =?", null, "1", new Date(lastActive), serviceId); // 服务表增加最后活跃时间
-            }
-            //创建生命周期ID;//默认继承
-            if (!inputData.containsKey("dataObjectId")) {
-                inputData.put("dataObjectId", createLifeCycleCol(Long.valueOf(AppConfig.WORK_ID), Integer.parseInt(serviceId))); // 生成新的生命周期ID
-            }
-            String dataObjectId = inputData.get("dataObjectId").toString();
-
-            inputData = authCheck(inputData, user_id); /*权限检查,添加列权限以及行权限到inputData中*/
-            if ("-1".equals(inputData.get("code")) || (Objects.nonNull(inputData.get("message")) && inputData.get("message").toString().equals("服务未授任何数据权限"))) {
-                return inputData;
-            }
-            List<Map<String, Object>> calcData = new ArrayList<>();//定义算法全量结果列表
-            calcData.add(inputData);//默认入口参数为第0个算法的结果
-            Map<String, Object> execResult = execCalultion(calcData, null, dataObjectId);//执行所有的算法库
-            String library_id = execResult.get("library_id").toString();//获取最后一个算法的编号
-            List<Map<String, Object>> preListData = (List<Map<String, Object>>) execResult.get("preData");//获取所有的前置检测结果
-            calcData = (List<Map<String, Object>>) execResult.get("calcData");//所有的算法结果
-            //依据算法编号规则确定最后一个算法结果,是前置还是算法
+		} catch ( Exception e ) {
+			System.out.println("dataProcess 关闭异常: " + LogUtils.getException(e));
+		}
+
+	}
+
+	//统一成功信息处理
+	public Map<String, Object> processSuccess(Object returnData) {
+		if ( returnData instanceof Map ) {
+			Map<String, Object> resultData1 = new HashMap<>();
+			try {
+				resultData1 = (Map<String, Object>) returnData;
+			} catch ( Exception e ) {
+				resultData1.put("returnData", returnData);
+			}
+			resultData1.put("code", "0");
+			return resultData1;
+		}
+		Map<String, Object> returnMap = new HashMap<>();//用于当前方法统一返回参数,不存在深拷贝问题
+		returnMap.put("code", "0");
+		returnMap.put("returnData", returnData);
+		return returnMap;
+	}
+
+	//统一错误信息处理
+	public Map<String, Object> processFail(String errorMessage, String libraryId) {
+		Map<String, Object> returnMap = new HashMap<>();//用于当前方法统一返回参数,不存在深拷贝问题
+		returnMap.put("code", "-1");
+		returnMap.put("message", errorMessage);
+		if ( ! MapTools.isBlank(libraryId) ) {
+			returnMap.put("library_id", libraryId);
+		}
+		return returnMap;
+	}
+
+	//数据处理对外接口
+	public Map<String, Object> processData(Map<String, Object> inputData, String... user_id) { //{serviceid:1,datacontent:[{key:value},{key:value}],event:0,page:1,pagesize:10}
+		try {
+
+			if ( ! MapTools.isBlank(errorMessage) ) {//如果当前服务存在问题,代表数据库对象不可用,此时应该重构当前对象,调用方控制
+				LogUtils.log("processData:1", "-1", null, "服务不可用".concat(errorMessage), serviceId, inputData, null, null, null);
+				return processFail("服务不可用".concat(errorMessage), null);
+			}
+			if ( System.currentTimeMillis() - lastActive > 2000 ) {  //        更新数据库中的服务最新活跃时间
+				lastActive = System.currentTimeMillis();//更新最后活跃时间  //------更新当前服务的最后活跃时间,用于服务监控
+				baseDbHelper.updateByCondition("update serviceinfo set runState = ? ,lastactive = ? where  serviceID =?", null, "1", new Date(lastActive), serviceId); // 服务表增加最后活跃时间
+			}
+			//创建生命周期ID;//默认继承
+			if ( ! inputData.containsKey("dataObjectId") ) {
+				inputData.put("dataObjectId", createLifeCycleCol(Long.valueOf(AppConfig.WORK_ID), Integer.parseInt(serviceId))); // 生成新的生命周期ID
+			}
+			String dataObjectId = inputData.get("dataObjectId").toString();
+
+			inputData = authCheck(inputData, user_id); /*权限检查,添加列权限以及行权限到inputData中*/
+			if ( "-1".equals(inputData.get("code")) || (Objects.nonNull(inputData.get("message")) && inputData.get("message").toString().equals("服务未授任何数据权限")) ) {
+				return inputData;
+			}
+			List<Map<String, Object>> calcData = new ArrayList<>();//定义算法全量结果列表
+			calcData.add(inputData);//默认入口参数为第0个算法的结果
+			Map<String, Object> execResult = execCalultion(calcData, null, dataObjectId);//执行所有的算法库
+			String library_id = execResult.get("library_id").toString();//获取最后一个算法的编号
+			List<Map<String, Object>> preListData = (List<Map<String, Object>>) execResult.get("preData");//获取所有的前置检测结果
+			calcData = (List<Map<String, Object>>) execResult.get("calcData");//所有的算法结果
+			//依据算法编号规则确定最后一个算法结果,是前置还是算法
 
 //         preListData = preListData.stream().filter(Objects::isNull).collect(Collectors.toList());
 
-
-            Map<String, Object> lastResult = library_id.endsWith("N") ? preListData.get(preListData.size() - 1) : calcData.get(calcData.size() - 1);//获取最后一个计算结果
-            if (Objects.equals(lastResult.get("code"), "-1")) {//最后一个计算结果为-1则记录异常,否则全部记录正常
-                Object message = lastResult.get("message");
-                LogUtils.log("processData:3", "-1", library_id, Objects.nonNull(message) ? message.toString() : null, serviceId, calcData, dataObjectId, preListData, inputData.get("event"));
-                return processFail(library_id.concat("数据计算错误:").concat(lastResult.get("message").toString()), library_id);
-            }
-            //写入 成功日志
-            List<Map<String, Object>> logData = calcData;
-            logData = logData.stream()
-                    .filter(Objects::nonNull)
-                    .filter(it ->
-                            {
+			Map<String, Object> lastResult = library_id.endsWith("N") ? preListData.get(preListData.size() - 1) : calcData.get(calcData.size() - 1);//获取最后一个计算结果
+			if ( Objects.equals(lastResult.get("code"), "-1") ) {//最后一个计算结果为-1则记录异常,否则全部记录正常
+				Object message = lastResult.get("message");
+				LogUtils.log("processData:3", "-1", library_id, Objects.nonNull(message) ? message.toString() : null, serviceId, calcData, dataObjectId, preListData, inputData.get("event"));
+				return processFail(library_id.concat("数据计算错误:").concat(lastResult.get("message").toString()), library_id);
+			}
+			//写入 成功日志
+			List<Map<String, Object>> logData = calcData;
+			logData = logData.stream()
+			                 .filter(Objects::nonNull)
+			                 .filter(it ->
+			                         {
 //                              过滤出需要写入的日志
-                                Object libraryId = it.get("library_id");
-                                return Objects.isNull(libraryId) || enableLogCalculationLibrary.contains(Integer.parseInt(libraryId.toString()));
-                            }
-                    ).collect(Collectors.toList());
+				                         Object libraryId = it.get("library_id");
+				                         return Objects.isNull(libraryId) || enableLogCalculationLibrary.contains(Integer.parseInt(libraryId.toString()));
+			                         }
+			                 ).collect(Collectors.toList());
 //            可能还保留一个索引为0的日志 若果只有这一条日志的情况则清空日志
-            if (logData.size() == 1 && "0".equals(logData.get(0).getOrDefault("library_id", 0).toString())) {
-                logData.clear();
-            }
+			if ( logData.size() == 1 && "0".equals(logData.get(0).getOrDefault("library_id", 0).toString()) ) {
+				logData.clear();
+			}
 //            如果日志不为空则写入日志
-            if (!logData.isEmpty()) {
-                LogUtils.log("DataProcess:9999", "0", library_id, "数据接收后处理成功", serviceId, logData, dataObjectId, preListData, inputData.get("event"));  //此处无法把所有的记过集返回过去
-            }
-            calcData = calcData.stream().filter(Objects::nonNull).collect(Collectors.toList());
-            return processSuccess(calcData.get(calcData.size() - 1)); /*可订阅任意算法结果,也可以订阅全量结果,由最后一个算法决定*/
-        } catch (Exception e) {
-            return processFail("processData 异常: " + LogUtils.getException(e), null);
-        }
-
-    }
-
-    /*权限检查*/
-    public Map<String, Object> authCheck(Map<String, Object> inputData, String... user_id) {
-        try {
-            Object appid = inputData.get("appid");//获取应用编号
-            if (Objects.nonNull(appid) && !serviceAuthMap.contains(appid.toString())) {//上传了应用编号但是不在当前服务授权的应用列表中则返回错误
-                return processFail("服务未授权", null);
-            }
-            /*在构造函数中获取超级用户名单,方便此处的判定*/
-            if (Objects.nonNull(user_id) && inputData.containsKey("authId") && !"1,9999".contains(user_id[0])) {//用户编号不为空,权限编号不为空,不是超级用户
-                Map<String, Object> userDataAuthMap = baseDbHelper.queryByParamsReturnList("SELECT QCS.columnName,TUA.row_auth FROM t_user_group_auth  TUA LEFT JOIN querytemplatecolumnset QCS on TUA.queryTemplateColumnSetID = QCS.queryTemplateColumnSetID WHERE TUA.user_id = ?  AND TUA.auth_id = ?  AND QCS.columnName is not null AND TUA.queryTemplateColumnSetID IS NOT NULL", user_id[0], inputData.get("authId"));
-                if (userDataAuthMap.get("code").equals("-1") || Objects.isNull(userDataAuthMap.get("returnData")) || ((List<Map<String, Object>>) userDataAuthMap.get("returnData")).isEmpty()) {
-                    if (userDataAuthMap.get("code").equals("0")) {
-                        userDataAuthMap.put("message", "服务未授任何数据权限");
-                    }
-                    return userDataAuthMap;
-                }
-                List<Map<String, Object>> rowAuth = new ArrayList<>();//初始化行权限
-                List<String> authColumn = new ArrayList<>();//初始化列权限
-                List<Map<String, Object>> dataAuth = (List<Map<String, Object>>) userDataAuthMap.get("returnData");
-                dataAuth.forEach(dataAuthMap -> {//循环获取到的权限
-                    String columnName = dataAuthMap.get("columnName").toString();//字段名
-                    authColumn.add(columnName);//添加到列权限
-                    Object rowAuthObj = dataAuthMap.get("row_auth");//获取对应列的行权限
-                    if (Objects.nonNull(rowAuthObj)) {//如果存在则组建标准的限制条件
-
-                        /*获取规格*/
-                        Map<String, Object> authExtendMapList = baseDbHelper.queryByParamsReturnList("select nextRule from belowSet where  auth_id =?", inputData.get("authId"));
-                        if ("-1".equals(authExtendMapList.get("code"))) {
-                            System.out.println("权限集成执行异常".concat(authExtendMapList.get("message").toString()));
-                            return;
-                        }
-                        Object nextRule = authExtendMapList.get("returnData") instanceof List<?> tempList && !tempList.isEmpty() && tempList.get(0) instanceof Map tempMap ? tempMap.get("nextRule") : null;  // t_user_group.up_user_groupid
-                        String tempRowAuth = rowAuthObj.toString();
-                        String connect = tempRowAuth.startsWith("!") ? "!=" : "=";//如果首位是!代表不等于,否则代表等于
-                        tempRowAuth = tempRowAuth.startsWith("!") ? tempRowAuth.substring(1) : tempRowAuth;//修订行权限表达式
-                        String[] row_auths = tempRowAuth.split(",", -1); //按,号进行分组!PEK,
-                        for (String row_auth : row_auths) {//循环进行标准化
-                            if (MapTools.isBlank(row_auth)) continue;//为空不设置--误操作填写了空格或者删除未设置为NULL
-                            Map<String, Object> signRowAuth = new HashMap<>();
-                            signRowAuth.put(columnName, row_auth);
-                            rowAuth.addAll(baseDbHelper.changeSignFilter(signRowAuth, connect, connect.equals("!=") ? "and" : "or"));
-                            //如果row_auth开始左括号,且结束是右口号则------PEK,CTU,(t_user_group.up_user_groupid)
-                            if (Objects.nonNull(nextRule)) {
-                                List<Object> childAuthList = extendNextAuth(columnName, row_auth, nextRule); // [4,5]
-                                if (!childAuthList.isEmpty()) {
-                                    childAuthList.forEach(authid -> {
-                                        Map<String, Object> tpSignRowAuth = new HashMap<>();
-                                        tpSignRowAuth.put(columnName, authid);
-                                        rowAuth.addAll(baseDbHelper.changeSignFilter(tpSignRowAuth, connect, connect.equals("!=") ? "and" : "or"));
-                                    });
-                                }
-                            }
-                        }
-                    }
-                });
-                if (rowAuth.size() > 0) {//存在行权限则进行括号包裹
-                    rowAuth.get(0).put("left", "(".concat(rowAuth.get(0).get("left").toString()));
-                    rowAuth.get(rowAuth.size() - 1).put("connector", "  and ");
-                    rowAuth.get(rowAuth.size() - 1).put("right", " ) ".concat(rowAuth.get(rowAuth.size() - 1).get("right").toString()));
-                    inputData.put("rowAuth", rowAuth);//添加到入参,方便后面调用
-                }
-                inputData.put("authColumn", authColumn);//添加到入参,方便后面调用
-            }
-            inputData.put("currentUser", user_id);
-        } catch (Exception e) {
-            System.out.println("authCheck 异常:" + LogUtils.getException(e));
-        }
-        return inputData;
-    }
-
-    /**
-     * @param columnName ID
-     * @param row_auth   2
-     * @param nextRule   t_user_group.up_user_groupid
-     * @return
-     */
-    private List<Object> extendNextAuth(String columnName, String row_auth, Object nextRule) {
-        List<Object> returnData = new ArrayList<>();
-        String[] rule = nextRule.toString().split("\\.", -1);
-        Map<String, Object> queryAuthReturnList = baseDbHelper.queryByParamsReturnList("select " + columnName + " from " + rule[0] + " where " + rule[1] + " = ?", row_auth);
-        if ("-1".equals(queryAuthReturnList.get("code")) || ((List) queryAuthReturnList.get("returnData")).isEmpty()) {
-            return returnData;
-        }
-        ((List<Map<String, Object>>) queryAuthReturnList.get("returnData")).stream().filter(Objects::nonNull).forEach(authvalue -> {
-            returnData.add(authvalue.get(columnName)); // 5
-            returnData.addAll(extendNextAuth(columnName, authvalue.get(columnName).toString(), nextRule));
-        });
-        return returnData;
-    }
-
-    /*执行算法*/
-    public Map<String, Object> execCalultion(List<Map<String, Object>> inData, String beginLibraryId, String dataObjectId) {
-        Map<String, Object> returnData = new HashMap<>();//初始化最终返回结果
-        List<Map<String, Object>> preData = new ArrayList<>();//初始化前置检测结果列表
-        String lastLibraryId = "";//初始化最后一个算法的编号
-        List<Map<String, Object>> calcData = Objects.isNull(inData) || inData.isEmpty() ? new ArrayList<>() : inData;//默认算法全量计算结果等于入口参数
-        try {
-            Map<String, Object> preCalMap = new HashMap<>();//用于组建前置检测算法配置
-            // 获取当前时间搓
-            long beginTime = System.currentTimeMillis();
-            for (Map<String, Object> calculationLibrary : calcList) {//循环算法库
-                lastLibraryId = calculationLibrary.get("library_id").toString();//当前进行的算法编号
-                if (MapTools.isNotBlank(beginLibraryId) && !beginLibraryId.equals(lastLibraryId)) {
-                    continue;//如果异常恢复开始算法编号不为空且不等于当前循环的算法编号则跳过
-                }
-                beginLibraryId = null;//为避免异常恢复开始后的后续算法编号不等于当前循环的算法编号的跳过
-                Object is_exec = calculationLibrary.get("is_exec");//获取前置检测表达式
-                if (Objects.nonNull(is_exec) && !MapTools.isBlank(is_exec.toString())) {//存在则进行前置检测
-                    lastLibraryId = lastLibraryId.concat("_N");//修订算法编号,避免与正式算法冲突
-                    preCalMap.put("library_id", lastLibraryId);//组建前置算法配置信息
-                    preCalMap.put("library_type", 2);/*如果脚本包含function则js否则java*/
-                    preCalMap.put("computing_expression", is_exec); //组建前置算法配置信息
-                    Map<String, Object> preEnginResult = execEngine(lastLibraryId, preCalMap, calcData);//调用算法引擎进行执行
-                    preEnginResult.put("preExecTime", System.currentTimeMillis() - beginTime);
-                    beginTime = System.currentTimeMillis();
-                    // 获取当前时间搓
-                    preData.add(preEnginResult);//添加执行结果到前置检测结果列表
-                    setServiceErrorCount(lastLibraryId, preEnginResult.get("code").toString());//依据返回的code进行连续错误计数和重置
-                    if ("-1".equals(preEnginResult.get("code")) || Objects.equals("2", preEnginResult.get("returnData"))) {
-                        break; //如果code为-1 则表示脚本计算错误 退出计算;如果code为2 则后续算法都不执行
-                    }
-                    if (Objects.equals("1", preEnginResult.get("returnData"))) { //前置检查不通过,当前算法不执行,后续 还需执行
-                        calcData.add(null);//自动添加算法空结果,避免数据订阅时的序号出现错误
-                        continue;
-                    }
-                    lastLibraryId = calculationLibrary.get("library_id").toString();//当前进行的算法编号
-                }
-                //依据算法类型调用数据库对象或脚本引擎对象进行算法执行
-                Map<String, Object> currentResult = calculationLibrary.get("library_type").toString().equals("3") ? execDB(lastLibraryId, calculationLibrary, inData, dataObjectId) : execEngine(lastLibraryId, calculationLibrary, inData);
-                currentResult.put("execTime", System.currentTimeMillis() - beginTime);
-                beginTime = System.currentTimeMillis();
-
-                setServiceErrorCount(lastLibraryId, Objects.nonNull(currentResult) && currentResult.containsKey("code") ? currentResult.get("code").toString() : "0");//依据返回的code进行连续错误计数和重置
-                calcData.add(currentResult);//添加到全量算法结果中
-                if (Objects.nonNull(currentResult) && !currentResult.get("code").equals("0")) {//算法执行异常则退出
-                    break;
-                } else {
-                    currentResult.put("library_id", lastLibraryId);
-                }
-            }
-
-        } catch (Exception e) {
-            System.out.println("execCalultion: 算法执行异常: " + LogUtils.getException(e));
-        }
-        returnData.put("library_id", lastLibraryId);
-        returnData.put("preData", preData);
-        returnData.put("calcData", calcData);
-        return returnData;
-
-    }
-
-    //脚本引擎执行
-    private Map<String, Object> execEngine(String library_id, Map<String, Object> currentCalMap, List<Map<String, Object>> calcAllData) {
-        try {
-            if (!ScriptEngineProMaps.containsKey(library_id)) {//不存在缓存则创建
-                ScriptEngineProMaps.put(library_id, new ScriptEnginePro(currentCalMap));
-            }
-            ScriptEnginePro currentEngin = ScriptEngineProMaps.get(library_id);//依据算法编号获取算法引擎
-            String errMessage = "";
-            if (Objects.nonNull(currentEngin) && Objects.nonNull(currentEngin.getErrorMessage())) {//获取引擎出错则进行关闭
-                errMessage = currentEngin.getErrorMessage();
-                currentEngin.close();
-                currentEngin = null;
-            }
-            if (Objects.isNull(currentEngin)) {//未获取到引擎则返回异常
-                ScriptEngineProMaps.remove(library_id);
-                return processFail("创建引擎失败".concat(errMessage), library_id);
-            }
-            Map<String, String> parmaNames = currentEngin.getParmaNames();//获取对应脚本订阅的变量列表/*依据当前算法 数据订阅规则 进行数据的提取*/
-            Map<String, Object> paramValue = new HashMap<>();
-            for (String key : parmaNames.keySet()) {//理论上此处最多单个key是list[],如果存在多个list[]则应开启迪卡乘积
-                Object currentData = dataSubscription(calcAllData, key, currentCalMap);//获取数据
-                paramValue.put(currentCalMap.get("library_type").toString().equals("2") ? parmaNames.get(key) : key, currentData);
-            }
-            Map<String, Object> stringObjectMap = currentEngin.execScript(paramValue);
-            return stringObjectMap;
-        } catch (Exception e) {
-            return processFail("创建引擎失败".concat(LogUtils.getException(e)), library_id);
-        }
-    }
-
-    //数据库算法执行
-    private Map<String, Object> execDB(String library_id, Map<String, Object> calculationLibrary, List<Map<String, Object>> calcData, String dataObjectId) {
-        try {
-            Object connectConfigObj = calculationLibrary.get("connectConfig");
-            if (Objects.isNull(connectConfigObj)) {
-                return processFail("连接信息未配置 ", library_id);
-            }
-            if (!calcDbHelperMaps.containsKey(library_id)) {//不存在缓存则创建
-                calcDbHelperMaps.put(library_id, new MyDbHelper(connectConfigObj.toString()));
-            }
-            MyDbHelper myDbHelper = calcDbHelperMaps.get(library_id);//依据算法编号获取数据库处理对象
-            String errMessage = "";
-            if (Objects.nonNull(myDbHelper) && MapTools.isNotBlank(errMessage)) {//获取出现错误则进行关闭
-                errMessage = myDbHelper.getErrorMessage();
-                myDbHelper.close();
-                myDbHelper = null;
-            }
-            if (Objects.isNull(myDbHelper)) {//未获取到则返回异常
-                calcDbHelperMaps.remove(library_id);
-                return processFail("获取业务数据库对象失败".concat(errMessage), library_id);
-            }
-            //未指定订阅则默认为上一个算法的结果:
-            String paramIndex = Objects.isNull(calculationLibrary.get("parmIndex")) ? (String.valueOf(calcData.size() - 1)).concat(calcData.size() == 1 ? ".dataContent" : ".returnData") : calculationLibrary.get("parmIndex").toString();
-            Object tempDBPrams = dataSubscription(calcData, "List.".concat(paramIndex), calculationLibrary);//获取数据订阅结果
-            //如果表名和SQL同时不为空且事件为7则,分解为先查询,后更新方式,可以有效使用预编译,性能会更高
-            //事件7的select应该是固定为书名号方式,而不是动态组建方式,因为要跟insert或update对应,如果是动态条件则不使用事件7而是采用算法分离方式进行
-            if ("7".equals(calculationLibrary.get("event")) && !Objects.isNull(calculationLibrary.get("tableName")) && !Objects.isNull(calculationLibrary.get("computing_expression"))) {
-                Map<String, Object> tempCalcInfo = calculationLibrary;
-                String tableName = tempCalcInfo.get("tableName").toString();//先记录表名
-                String sqlStr = tempCalcInfo.get("computing_expression").toString();//记录SQL语句,方便获取查询条件,用于组建事件6的filter
-                tempCalcInfo.put("tableName", "");//清空表名,方便执行SQL
-                tempDBPrams = myDbHelper.generalProcess(tempCalcInfo, tempDBPrams, dataObjectId, calcData.get(0));//先执行sql获取结果后
-                //从myDbHelper获取sqlStrVarList.get(sqlStr)可以获取到查询条件,用于组建事件6的filter
-                //获取tempDBPrams,循环returnData,添加filter,否则无法执行事件6
-                tempCalcInfo.put("tableName", tableName);//还原表名
-                tempCalcInfo.put("computing_expression", "");//清除SQL
-                tempCalcInfo.put("event", "6");//执行新增或更新
-                return myDbHelper.generalProcess(tempCalcInfo, tempDBPrams, dataObjectId, calcData.get(0));//调用表名方式执行事件
-            }
-            return myDbHelper.generalProcess(calculationLibrary, tempDBPrams, dataObjectId, calcData.get(0));
-        } catch (Exception e) {
-            return processFail("数据库执行失败".concat(LogUtils.getException(e)), library_id);
-        }
-    }
-
-    /*数据订阅:注意因前置导致算法未执行时,全量结果集的序号会存在问题*/
-    private Object dataSubscription(List<Map<String, Object>> calcAllData, String paramRule, Map<String, Object> calculationLibrary) { // List.1.returnData.0
-        try {
-            String[] itemRule = paramRule.split("\\.", -1);//订阅规则按.进行分割
-            String dataType = itemRule.length > 0 ? itemRule[0] : "List"; //首位是最终返回的数据类型
-            dataType = dataType.endsWith("]") ? dataType.substring(0, dataType.indexOf("[")) : dataType;//订阅时需要把[]去掉,在调用脚本引擎时进行分解
-            String dataLocation = itemRule.length > 1 ? itemRule[1] : ""; //数据位置:代表从哪个算法结果中进行取值
-            if (dataLocation.equals("T")) {//固定参数 //JSON.T.{"BSM":""}----String.T.代表"",List.T.代表[],Map.T.代表{},任意类型.T代表null
-                Object tempObj = itemRule.length > 2 ? MapTools.isBlank(itemRule[2]) ? ("List".equals(dataType) ? new ArrayList<>() : ("JSON,Map".contains(dataType) ? new HashMap<>() : null)) : itemRule[2] : null;
-                if ("Boolean".equals(dataType)) {
-                    tempObj = itemRule[2].equals("true");
-                }
-                return tempObj;
-
-            }
-            //如果是L则从算法配置信息中获取,否则从全量计算结果中获取
-            Object returnData = dataLocation.equals("L") ? calculationLibrary : MapTools.isNumber(dataLocation) ? calcAllData.get(Integer.parseInt(dataLocation)) : null; // String.L.connectConfig.username
-            for (int index = 2; index < itemRule.length; index++) {
-                if (Objects.isNull(returnData)) return null;
-                if (returnData instanceof String) {//如果获取的是字符串则自动转MAP--XML字符串也自动转Map
-                    returnData = MapTools.strToObj(returnData.toString());//  1.  简单字符串 , 2. map字符串 3. List字符串
-                }
-                if (MapTools.isNumber(itemRule[index])) {//数字代表从当前参数中取第N位,如果当前参数并不是List应该返回全部不应该返回空
-                    //pr
-                    int tempIndex = Integer.parseInt(itemRule[index]);
-                    returnData = returnData instanceof List<?> tempList && tempList.size() > tempIndex ? tempList.get(tempIndex) : returnData;
-                } else {//不是数字代表从当前参数中取对应的键,如果当前参数是List应该返回List中首个对应的键值
-                    if (returnData instanceof Map) {//是MAP则直接获取对应Key的值
-                        returnData = itemRule[index].equals("returnData") && Objects.isNull(((Map<?, ?>) returnData).get("returnData")) ? ((Map<?, ?>) returnData).get("dataContent") : ((Map<?, ?>) returnData).get(itemRule[index]);
-                    } else {//简单的错误兼容,尤其是某些时候数据可能是单条可能是数组时
-                        if (returnData instanceof List && ((List<?>) returnData).size() > 0 && ((List<?>) returnData).get(0) instanceof Map) {//如果是数组则且有下层
-                            returnData = ((Map<?, ?>) ((List<?>) returnData).get(0)).get(itemRule[index]);
-                        }
-                    }
-                }
-            }
-            if (Objects.nonNull(returnData)) {//获取到了订阅的数据则进行数据格式转换
-                if ("List,Map".contains(dataType) && returnData instanceof String) {//订阅是数组或对象则字符串转对象
-                    returnData = MapTools.strToObj(returnData.toString());
-                }
-                if ("List".equals(dataType) && !(returnData instanceof List<?>)) {//订阅是数组但是实际不是数组则添加到数组
-                    List<Object> tempList = new ArrayList<>();
-                    tempList.add(returnData);
-                    returnData = tempList;
-                }
-                if ("Array".contains(dataType)) {
-                    if (returnData instanceof List<?> tempList && !tempList.isEmpty()) {
-                        List<Object> tpList = new ArrayList<>();
-                        for (Object o : tempList) {
-                            if (o instanceof Map<?, ?>) {
-                                tpList.add(MapTools.objToJSONStr(o));
-                            } else if (o instanceof String tpStr && tpStr.contains("=") && tpStr.contains("{") && tpStr.contains("}")) {
-                                tpList.add(MapTools.objToJSONStr(MapTools.strToObj(tpStr)));
-                            } else {
-                                tpList.add(o);
-                            }
-                        }
-                        returnData = tpList;
-                    }
-                    if (returnData instanceof Map<?, ?> tpMap) {
-                        returnData = MapTools.objToJSONStr(tpMap);
-                    }
-                    if (returnData instanceof String tpStr && tpStr.contains("=") && tpStr.contains("{") && tpStr.contains("}")) {
-                        returnData = MapTools.objToJSONStr(MapTools.strToObj(tpStr));
-                    }
-                }
+			if ( ! logData.isEmpty() ) {
+				LogUtils.log("DataProcess:9999", "0", library_id, "数据接收后处理成功", serviceId, logData, dataObjectId, preListData, inputData.get("event"));  //此处无法把所有的记过集返回过去
+			}
+			calcData = calcData.stream().filter(Objects::nonNull).collect(Collectors.toList());
+			return processSuccess(calcData.get(calcData.size() - 1)); /*可订阅任意算法结果,也可以订阅全量结果,由最后一个算法决定*/
+		} catch ( Exception e ) {
+			return processFail("processData 异常: " + LogUtils.getException(e), null);
+		}
+
+	}
+
+	/*权限检查*/
+	public Map<String, Object> authCheck(Map<String, Object> inputData, String... user_id) {
+		try {
+			Object appid = inputData.get("appid");//获取应用编号
+			if ( Objects.nonNull(appid) && ! serviceAuthMap.contains(appid.toString()) ) {//上传了应用编号但是不在当前服务授权的应用列表中则返回错误
+				return processFail("服务未授权", null);
+			}
+			/*在构造函数中获取超级用户名单,方便此处的判定*/
+			if ( Objects.nonNull(user_id) && inputData.containsKey("authId") && ! "1,9999".contains(user_id[0]) ) {//用户编号不为空,权限编号不为空,不是超级用户
+				Map<String, Object> userDataAuthMap = baseDbHelper.queryByParamsReturnList("SELECT QCS.columnName,TUA.row_auth FROM t_user_group_auth  TUA LEFT JOIN querytemplatecolumnset QCS on TUA.queryTemplateColumnSetID = QCS.queryTemplateColumnSetID WHERE TUA.user_id = ?  AND TUA.auth_id = ?  AND QCS.columnName is not null AND TUA.queryTemplateColumnSetID IS NOT NULL", user_id[0], inputData.get("authId"));
+				if ( userDataAuthMap.get("code").equals("-1") || Objects.isNull(userDataAuthMap.get("returnData")) || ((List<Map<String, Object>>) userDataAuthMap.get("returnData")).isEmpty() ) {
+					if ( userDataAuthMap.get("code").equals("0") ) {
+						userDataAuthMap.put("message", "服务未授任何数据权限");
+					}
+					return userDataAuthMap;
+				}
+				List<Map<String, Object>> rowAuth = new ArrayList<>();//初始化行权限
+				List<String> authColumn = new ArrayList<>();//初始化列权限
+				List<Map<String, Object>> dataAuth = (List<Map<String, Object>>) userDataAuthMap.get("returnData");
+				dataAuth.forEach(dataAuthMap -> {//循环获取到的权限
+					String columnName = dataAuthMap.get("columnName").toString();//字段名
+					authColumn.add(columnName);//添加到列权限
+					Object rowAuthObj = dataAuthMap.get("row_auth");//获取对应列的行权限
+					if ( Objects.nonNull(rowAuthObj) ) {//如果存在则组建标准的限制条件
+
+						/*获取规格*/
+						Map<String, Object> authExtendMapList = baseDbHelper.queryByParamsReturnList("select nextRule from belowSet where  auth_id =?", inputData.get("authId"));
+						if ( "-1".equals(authExtendMapList.get("code")) ) {
+							System.out.println("权限集成执行异常".concat(authExtendMapList.get("message").toString()));
+							return;
+						}
+						Object nextRule = authExtendMapList.get("returnData") instanceof List<?> tempList && ! tempList.isEmpty() && tempList.get(0) instanceof Map tempMap ? tempMap.get("nextRule") : null;  // t_user_group.up_user_groupid
+						String tempRowAuth = rowAuthObj.toString();
+						String connect = tempRowAuth.startsWith("!") ? "!=" : "=";//如果首位是!代表不等于,否则代表等于
+						tempRowAuth = tempRowAuth.startsWith("!") ? tempRowAuth.substring(1) : tempRowAuth;//修订行权限表达式
+						String[] row_auths = tempRowAuth.split(",", - 1); //按,号进行分组!PEK,
+						for ( String row_auth : row_auths ) {//循环进行标准化
+							if ( MapTools.isBlank(row_auth) ) continue;//为空不设置--误操作填写了空格或者删除未设置为NULL
+							Map<String, Object> signRowAuth = new HashMap<>();
+							signRowAuth.put(columnName, row_auth);
+							rowAuth.addAll(baseDbHelper.changeSignFilter(signRowAuth, connect, connect.equals("!=") ? "and" : "or"));
+							//如果row_auth开始左括号,且结束是右口号则------PEK,CTU,(t_user_group.up_user_groupid)
+							if ( Objects.nonNull(nextRule) ) {
+								List<Object> childAuthList = extendNextAuth(columnName, row_auth, nextRule); // [4,5]
+								if ( ! childAuthList.isEmpty() ) {
+									childAuthList.forEach(authid -> {
+										Map<String, Object> tpSignRowAuth = new HashMap<>();
+										tpSignRowAuth.put(columnName, authid);
+										rowAuth.addAll(baseDbHelper.changeSignFilter(tpSignRowAuth, connect, connect.equals("!=") ? "and" : "or"));
+									});
+								}
+							}
+						}
+					}
+				});
+				if ( rowAuth.size() > 0 ) {//存在行权限则进行括号包裹
+					rowAuth.get(0).put("left", "(".concat(rowAuth.get(0).get("left").toString()));
+					rowAuth.get(rowAuth.size() - 1).put("connector", "  and ");
+					rowAuth.get(rowAuth.size() - 1).put("right", " ) ".concat(rowAuth.get(rowAuth.size() - 1).get("right").toString()));
+					inputData.put("rowAuth", rowAuth);//添加到入参,方便后面调用
+				}
+				inputData.put("authColumn", authColumn);//添加到入参,方便后面调用
+			}
+			inputData.put("currentUser", user_id);
+		} catch ( Exception e ) {
+			System.out.println("authCheck 异常:" + LogUtils.getException(e));
+		}
+		return inputData;
+	}
+
+	/**
+	 * @param columnName ID
+	 * @param row_auth   2
+	 * @param nextRule   t_user_group.up_user_groupid
+	 *
+	 * @return
+	 */
+	private List<Object> extendNextAuth(String columnName, String row_auth, Object nextRule) {
+		List<Object> returnData = new ArrayList<>();
+		String[] rule = nextRule.toString().split("\\.", - 1);
+		Map<String, Object> queryAuthReturnList = baseDbHelper.queryByParamsReturnList("select " + columnName + " from " + rule[0] + " where " + rule[1] + " = ?", row_auth);
+		if ( "-1".equals(queryAuthReturnList.get("code")) || ((List) queryAuthReturnList.get("returnData")).isEmpty() ) {
+			return returnData;
+		}
+		((List<Map<String, Object>>) queryAuthReturnList.get("returnData")).stream().filter(Objects::nonNull).forEach(authvalue -> {
+			returnData.add(authvalue.get(columnName)); // 5
+			returnData.addAll(extendNextAuth(columnName, authvalue.get(columnName).toString(), nextRule));
+		});
+		return returnData;
+	}
+
+	/*执行算法*/
+	public Map<String, Object> execCalultion(List<Map<String, Object>> inData, String beginLibraryId, String dataObjectId) {
+		Map<String, Object> returnData = new HashMap<>();//初始化最终返回结果
+		List<Map<String, Object>> preData = new ArrayList<>();//初始化前置检测结果列表
+		String lastLibraryId = "";//初始化最后一个算法的编号
+		List<Map<String, Object>> calcData = Objects.isNull(inData) || inData.isEmpty() ? new ArrayList<>() : inData;//默认算法全量计算结果等于入口参数
+		try {
+			Map<String, Object> preCalMap = new HashMap<>();//用于组建前置检测算法配置
+			// 获取当前时间搓
+			long beginTime = System.currentTimeMillis();
+			for ( Map<String, Object> calculationLibrary : calcList ) {//循环算法库
+				lastLibraryId = calculationLibrary.get("library_id").toString();//当前进行的算法编号
+				if ( MapTools.isNotBlank(beginLibraryId) && ! beginLibraryId.equals(lastLibraryId) ) {
+					continue;//如果异常恢复开始算法编号不为空且不等于当前循环的算法编号则跳过
+				}
+				beginLibraryId = null;//为避免异常恢复开始后的后续算法编号不等于当前循环的算法编号的跳过
+				Object is_exec = calculationLibrary.get("is_exec");//获取前置检测表达式
+				if ( Objects.nonNull(is_exec) && ! MapTools.isBlank(is_exec.toString()) ) {//存在则进行前置检测
+					lastLibraryId = lastLibraryId.concat("_N");//修订算法编号,避免与正式算法冲突
+					preCalMap.put("library_id", lastLibraryId);//组建前置算法配置信息
+					preCalMap.put("library_type", 2);/*如果脚本包含function则js否则java*/
+					preCalMap.put("computing_expression", is_exec); //组建前置算法配置信息
+					Map<String, Object> preEnginResult = execEngine(lastLibraryId, preCalMap, calcData);//调用算法引擎进行执行
+					preEnginResult.put("preExecTime", System.currentTimeMillis() - beginTime);
+					beginTime = System.currentTimeMillis();
+					// 获取当前时间搓
+					preData.add(preEnginResult);//添加执行结果到前置检测结果列表
+					setServiceErrorCount(lastLibraryId, preEnginResult.get("code").toString());//依据返回的code进行连续错误计数和重置
+					if ( "-1".equals(preEnginResult.get("code")) || Objects.equals("2", preEnginResult.get("returnData")) ) {
+						break; //如果code为-1 则表示脚本计算错误 退出计算;如果code为2 则后续算法都不执行
+					}
+					if ( Objects.equals("1", preEnginResult.get("returnData")) ) { //前置检查不通过,当前算法不执行,后续 还需执行
+						calcData.add(null);//自动添加算法空结果,避免数据订阅时的序号出现错误
+						continue;
+					}
+					lastLibraryId = calculationLibrary.get("library_id").toString();//当前进行的算法编号
+				}
+				//依据算法类型调用数据库对象或脚本引擎对象进行算法执行
+				Map<String, Object> currentResult = calculationLibrary.get("library_type").toString().equals("3") ? execDB(lastLibraryId, calculationLibrary, inData, dataObjectId) : execEngine(lastLibraryId, calculationLibrary, inData);
+				currentResult.put("execTime", System.currentTimeMillis() - beginTime);
+				beginTime = System.currentTimeMillis();
+
+				setServiceErrorCount(lastLibraryId, Objects.nonNull(currentResult) && currentResult.containsKey("code") ? currentResult.get("code").toString() : "0");//依据返回的code进行连续错误计数和重置
+				calcData.add(currentResult);//添加到全量算法结果中
+				if ( Objects.nonNull(currentResult) && ! currentResult.get("code").equals("0") ) {//算法执行异常则退出
+					break;
+				} else {
+					currentResult.put("library_id", lastLibraryId);
+				}
+			}
+
+		} catch ( Exception e ) {
+			System.out.println("execCalultion: 算法执行异常: " + LogUtils.getException(e));
+		}
+		returnData.put("library_id", lastLibraryId);
+		returnData.put("preData", preData);
+		returnData.put("calcData", calcData);
+		return returnData;
+
+	}
+
+	//脚本引擎执行
+	private Map<String, Object> execEngine(String library_id, Map<String, Object> currentCalMap, List<Map<String, Object>> calcAllData) {
+		try {
+			//不存在缓存则创建
+			if ( ! ScriptEngineProMaps.containsKey(library_id) ) {
+				ScriptEngineProMaps.put(library_id, new ScriptEnginePro(currentCalMap));
+			}
+			//依据算法编号获取算法引擎
+			ScriptEnginePro currentEngin = ScriptEngineProMaps.get(library_id);
+			String errMessage = "";
+			//获取引擎出错则进行关闭
+			if ( Objects.nonNull(currentEngin) && Objects.nonNull(currentEngin.getErrorMessage()) ) {
+				errMessage = currentEngin.getErrorMessage();
+				currentEngin.close();
+				currentEngin = null;
+			}
+			//未获取到引擎则返回异常
+			if ( Objects.isNull(currentEngin) ) {
+				ScriptEngineProMaps.remove(library_id);
+				return processFail("创建引擎失败".concat(errMessage), library_id);
+			}
+			//获取对应脚本订阅的变量列表/*依据当前算法 数据订阅规则 进行数据的提取*/
+			Map<String, String> parmaNames = currentEngin.getParmaNames();
+			Map<String, Object> paramValue = new HashMap<>();
+			//理论上此处最多单个key是list[],如果存在多个list[]则应开启迪卡乘积
+			for ( String key : parmaNames.keySet() ) {
+				//获取数据
+				Object currentData = dataSubscription(calcAllData, key, currentCalMap);
+				paramValue.put(currentCalMap.get("library_type").toString().equals("2") ? parmaNames.get(key) : key, currentData);
+			}
+			Map<String, Object> stringObjectMap = currentEngin.execScript(paramValue);
+			return stringObjectMap;
+		} catch ( Exception e ) {
+			return processFail("创建引擎失败".concat(LogUtils.getException(e)), library_id);
+		}
+	}
+
+	//数据库算法执行
+	private Map<String, Object> execDB(String library_id, Map<String, Object> calculationLibrary, List<Map<String, Object>> calcData, String dataObjectId) {
+		try {
+			Object connectConfigObj = calculationLibrary.get("connectConfig");
+			if ( Objects.isNull(connectConfigObj) ) {
+				return processFail("连接信息未配置 ", library_id);
+			}
+			//不存在缓存则创建
+			if ( ! calcDbHelperMaps.containsKey(library_id) ) {
+				calcDbHelperMaps.put(library_id, new MyDbHelper(connectConfigObj.toString()));
+			}
+			//依据算法编号获取数据库处理对象
+			MyDbHelper myDbHelper = calcDbHelperMaps.get(library_id);
+			String errMessage = "";
+			//获取出现错误则进行关闭
+			if ( Objects.nonNull(myDbHelper) && MapTools.isNotBlank(errMessage) ) {
+				errMessage = myDbHelper.getErrorMessage();
+				myDbHelper.close();
+				myDbHelper = null;
+			}
+			//未获取到则返回异常
+			if ( Objects.isNull(myDbHelper) ) {
+				calcDbHelperMaps.remove(library_id);
+				return processFail("获取业务数据库对象失败".concat(errMessage), library_id);
+			}
+			//未指定订阅则默认为上一个算法的结果:
+			String paramIndex = Objects.isNull(calculationLibrary.get("parmIndex")) ? (String.valueOf(calcData.size() - 1)).concat(calcData.size() == 1 ? ".dataContent" : ".returnData") : calculationLibrary.get("parmIndex").toString();
+			Object tempDBPrams = dataSubscription(calcData, "List.".concat(paramIndex), calculationLibrary);//获取数据订阅结果
+			//如果表名和SQL同时不为空且事件为7则,分解为先查询,后更新方式,可以有效使用预编译,性能会更高
+			//事件7的select应该是固定为书名号方式,而不是动态组建方式,因为要跟insert或update对应,如果是动态条件则不使用事件7而是采用算法分离方式进行
+			if ( "7".equals(calculationLibrary.get("event")) && ! Objects.isNull(calculationLibrary.get("tableName")) && ! Objects.isNull(calculationLibrary.get("computing_expression")) ) {
+				Map<String, Object> tempCalcInfo = calculationLibrary;
+				String tableName = tempCalcInfo.get("tableName").toString();//先记录表名
+				String sqlStr = tempCalcInfo.get("computing_expression").toString();//记录SQL语句,方便获取查询条件,用于组建事件6的filter
+				tempCalcInfo.put("tableName", "");//清空表名,方便执行SQL
+				tempDBPrams = myDbHelper.generalProcess(tempCalcInfo, tempDBPrams, dataObjectId, calcData.get(0));//先执行sql获取结果后
+				//从myDbHelper获取sqlStrVarList.get(sqlStr)可以获取到查询条件,用于组建事件6的filter
+				//获取tempDBPrams,循环returnData,添加filter,否则无法执行事件6
+				tempCalcInfo.put("tableName", tableName);//还原表名
+				tempCalcInfo.put("computing_expression", "");//清除SQL
+				tempCalcInfo.put("event", "6");//执行新增或更新
+				return myDbHelper.generalProcess(tempCalcInfo, tempDBPrams, dataObjectId, calcData.get(0));//调用表名方式执行事件
+			}
+			return myDbHelper.generalProcess(calculationLibrary, tempDBPrams, dataObjectId, calcData.get(0));
+		} catch ( Exception e ) {
+			return processFail("数据库执行失败".concat(LogUtils.getException(e)), library_id);
+		}
+	}
+
+	/*数据订阅:注意因前置导致算法未执行时,全量结果集的序号会存在问题*/
+	private Object dataSubscription(List<Map<String, Object>> calcAllData, String paramRule, Map<String, Object> calculationLibrary) { // List.1.returnData.0
+		try {
+			String[] itemRule = paramRule.split("\\.", - 1);//订阅规则按.进行分割
+			String dataType = itemRule.length > 0 ? itemRule[0] : "List"; //首位是最终返回的数据类型
+			dataType = dataType.endsWith("]") ? dataType.substring(0, dataType.indexOf("[")) : dataType;//订阅时需要把[]去掉,在调用脚本引擎时进行分解
+			String dataLocation = itemRule.length > 1 ? itemRule[1] : ""; //数据位置:代表从哪个算法结果中进行取值
+			if ( dataLocation.equals("T") ) {//固定参数 //JSON.T.{"BSM":""}----String.T.代表"",List.T.代表[],Map.T.代表{},任意类型.T代表null
+				Object tempObj = itemRule.length > 2 ? MapTools.isBlank(itemRule[2]) ? ("List".equals(dataType) ? new ArrayList<>() : ("JSON,Map".contains(dataType) ? new HashMap<>() : null)) : itemRule[2] : null;
+				if ( "Boolean".equals(dataType) ) {
+					tempObj = itemRule[2].equals("true");
+				}
+				return tempObj;
+
+			}
+			//如果是L则从算法配置信息中获取,否则从全量计算结果中获取
+			Object returnData = dataLocation.equals("L") ? calculationLibrary : MapTools.isNumber(dataLocation) ? calcAllData.get(Integer.parseInt(dataLocation)) : null; // String.L.connectConfig.username
+			for ( int index = 2; index < itemRule.length; index++ ) {
+				if ( Objects.isNull(returnData) ) return null;
+				if ( returnData instanceof String ) {//如果获取的是字符串则自动转MAP--XML字符串也自动转Map
+					returnData = MapTools.strToObj(returnData.toString());//  1.  简单字符串 , 2. map字符串 3. List字符串
+				}
+				if ( MapTools.isNumber(itemRule[index]) ) {//数字代表从当前参数中取第N位,如果当前参数并不是List应该返回全部不应该返回空
+					//pr
+					int tempIndex = Integer.parseInt(itemRule[index]);
+					returnData = returnData instanceof List<?> tempList && tempList.size() > tempIndex ? tempList.get(tempIndex) : returnData;
+				} else {//不是数字代表从当前参数中取对应的键,如果当前参数是List应该返回List中首个对应的键值
+					if ( returnData instanceof Map ) {//是MAP则直接获取对应Key的值
+						returnData = itemRule[index].equals("returnData") && Objects.isNull(((Map<?, ?>) returnData).get("returnData")) ? ((Map<?, ?>) returnData).get("dataContent") : ((Map<?, ?>) returnData).get(itemRule[index]);
+					} else {//简单的错误兼容,尤其是某些时候数据可能是单条可能是数组时
+						if ( returnData instanceof List && ((List<?>) returnData).size() > 0 && ((List<?>) returnData).get(0) instanceof Map ) {//如果是数组则且有下层
+							returnData = ((Map<?, ?>) ((List<?>) returnData).get(0)).get(itemRule[index]);
+						}
+					}
+				}
+			}
+			if ( Objects.nonNull(returnData) ) {//获取到了订阅的数据则进行数据格式转换
+				if ( "List,Map".contains(dataType) && returnData instanceof String ) {//订阅是数组或对象则字符串转对象
+					returnData = MapTools.strToObj(returnData.toString());
+				}
+				if ( "List".equals(dataType) && ! (returnData instanceof List<?>) ) {//订阅是数组但是实际不是数组则添加到数组
+					List<Object> tempList = new ArrayList<>();
+					tempList.add(returnData);
+					returnData = tempList;
+				}
+				if ( "Array".contains(dataType) ) {
+					if ( returnData instanceof List<?> tempList && ! tempList.isEmpty() ) {
+						List<Object> tpList = new ArrayList<>();
+						for ( Object o : tempList ) {
+							if ( o instanceof Map<?, ?> ) {
+								tpList.add(MapTools.objToJSONStr(o));
+							} else if ( o instanceof String tpStr && tpStr.contains("=") && tpStr.contains("{") && tpStr.contains("}") ) {
+								tpList.add(MapTools.objToJSONStr(MapTools.strToObj(tpStr)));
+							} else {
+								tpList.add(o);
+							}
+						}
+						returnData = tpList;
+					}
+					if ( returnData instanceof Map<?, ?> tpMap ) {
+						returnData = MapTools.objToJSONStr(tpMap);
+					}
+					if ( returnData instanceof String tpStr && tpStr.contains("=") && tpStr.contains("{") && tpStr.contains("}") ) {
+						returnData = MapTools.objToJSONStr(MapTools.strToObj(tpStr));
+					}
+				}
 //            if ("List".equals(dataType) && returnData instanceof List<?> && MapTools.isXMLList(returnData)) {//订阅是数组但是实际是XML字符串数组则自动转换为list<map>
 //                returnData = MapTools.XMLListToListMap(returnData);//减少JAVA的动态调用,目前主要就是航班报文,BSMBPM等订阅的是String,所以不会进入这里
 //            }
-                returnData = "Map".equals(dataType) && !(returnData instanceof Map<?, ?>) ? null//是不是不妥
-                        : ("String".equals(dataType) ? MapTools.objToJSONStr(returnData) : ("Boolean".equals(dataType) ? returnData.toString().equals("true") : ("JSONStr".equals(dataType) ? MapTools.objToJSONStr(returnData) : returnData)));
-            }
-            return returnData;
-        } catch (Exception e) {
-            System.out.println("数据订阅异常: ".concat(LogUtils.getException(e)));
-            return null;
-        }
-
-    }
-
-    /*算法连续错误次数判定*/
-    public void setServiceErrorCount(String library_id, String success) {
-        if (!success.equals("0")) {//如果是异常
-            Integer errCount = Objects.isNull(serviceErrorCount.get(library_id)) ? 1 : serviceErrorCount.get(library_id) + 1;//获取当前异常连续数
-            serviceErrorCount.put(library_id, errCount);//连续数加1
-            if (errCount > AppConfig.SERVICE_ERR_MAX) {//如果连续错误数超过设定值则停止当前服务
-                ServiceInputControl.stop(serviceId);
-            }
-        } else {
-            serviceErrorCount.put(library_id, 0);//如果是成功则重置连续错误数为0
-        }
-    }
-
-    private long sequence = 0L;
-    private long lastTimestamp = -1L;
-
-    public String createLifeCycleCol(Long workerId, Integer serviceId) {
-        long timestamp = System.currentTimeMillis();
-        //如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常
-        if (lastTimestamp == timestamp) { //如果是同一时间生成的,则进行毫秒内序列
-            sequence++;
-            if (sequence > 999L) {//毫秒内序列溢出
-                sequence = 0;
-                while (lastTimestamp == System.currentTimeMillis()) {//阻塞到下一个毫秒,获得新的时间戳
-                }
-                timestamp = System.currentTimeMillis();
-            }
-        } else {
-            sequence = 0L;
-        }
-        lastTimestamp = timestamp;//上次生成ID的时间截
-        //移位并通过或运算拼到一起组成64位的ID
-        return String.valueOf(timestamp).concat(String.format("%03d", sequence)).concat(String.format("%04d", workerId)).concat(String.format("%04d", serviceId));
-    }
-
-    public String getServiceId() {
-        return serviceId;
-    }
-
-    public String getErrorMessage() {
-        return errorMessage;
-    }
-
-    public long getLastActive() {
-        return lastActive;
-    }
+				returnData = "Map".equals(dataType) && ! (returnData instanceof Map<?, ?>) ? null//是不是不妥
+						: ("String".equals(dataType) ? MapTools.objToJSONStr(returnData) : ("Boolean".equals(dataType) ? returnData.toString().equals("true") : ("JSONStr".equals(dataType) ? MapTools.objToJSONStr(returnData) : returnData)));
+			}
+			return returnData;
+		} catch ( Exception e ) {
+			System.out.println("数据订阅异常: ".concat(LogUtils.getException(e)));
+			return null;
+		}
+
+	}
+
+	/*算法连续错误次数判定*/
+	public void setServiceErrorCount(String library_id, String success) {
+		if ( ! success.equals("0") ) {//如果是异常
+			Integer errCount = Objects.isNull(serviceErrorCount.get(library_id)) ? 1 : serviceErrorCount.get(library_id) + 1;//获取当前异常连续数
+			serviceErrorCount.put(library_id, errCount);//连续数加1
+			if ( errCount > AppConfig.SERVICE_ERR_MAX ) {//如果连续错误数超过设定值则停止当前服务
+				ServiceInputControl.stop(serviceId);
+			}
+		} else {
+			serviceErrorCount.put(library_id, 0);//如果是成功则重置连续错误数为0
+		}
+	}
+
+	private long sequence = 0L;
+	private long lastTimestamp = - 1L;
+
+	public String createLifeCycleCol(Long workerId, Integer serviceId) {
+		long timestamp = System.currentTimeMillis();
+		//如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常
+		if ( lastTimestamp == timestamp ) { //如果是同一时间生成的,则进行毫秒内序列
+			sequence++;
+			if ( sequence > 999L ) {//毫秒内序列溢出
+				sequence = 0;
+				while ( lastTimestamp == System.currentTimeMillis() ) {//阻塞到下一个毫秒,获得新的时间戳
+				}
+				timestamp = System.currentTimeMillis();
+			}
+		} else {
+			sequence = 0L;
+		}
+		lastTimestamp = timestamp;//上次生成ID的时间截
+		//移位并通过或运算拼到一起组成64位的ID
+		return String.valueOf(timestamp).concat(String.format("%03d", sequence)).concat(String.format("%04d", workerId)).concat(String.format("%04d", serviceId));
+	}
+
+	public String getServiceId() {
+		return serviceId;
+	}
+
+	public String getErrorMessage() {
+		return errorMessage;
+	}
+
+	public long getLastActive() {
+		return lastActive;
+	}
 }

+ 95 - 0
mainFactory/src/main/java/org/bfkj/utils/ScriptEnginePro.java

@@ -38,6 +38,101 @@ public class ScriptEnginePro {
 
     private long threadId = Thread.currentThread().getId();//当前引擎线程编号
 
+    public ScriptEnginePro() {
+    }
+
+    public void setJavaMethod(final Method javaMethod) {
+        this.javaMethod = javaMethod;
+    }
+
+    public void setClassInstance(final Object classInstance) {
+        this.classInstance = classInstance;
+    }
+
+    public void setScriptEngine(final ScriptEngine scriptEngine) {
+        this.scriptEngine = scriptEngine;
+    }
+
+    public void setLibrary_id(final String library_id) {
+        this.library_id = library_id;
+    }
+
+    public void setLibrary_type(final String library_type) {
+        this.library_type = library_type;
+    }
+
+    public void setParmaNames(final Map<String, String> parmaNames) {
+        this.parmaNames = parmaNames;
+    }
+
+    public void setComputing_expression(final String computing_expression) {
+        this.computing_expression = computing_expression;
+    }
+
+    public void setJavaParams(final List<Object> javaParams) {
+        this.javaParams = javaParams;
+    }
+
+    public void setThreadId(final long threadId) {
+        this.threadId = threadId;
+    }
+
+    public void setScriptCompile(final CompiledScript scriptCompile) {
+        this.scriptCompile = scriptCompile;
+    }
+
+    public void setJavaMethodClose(final Method javaMethodClose) {
+        this.javaMethodClose = javaMethodClose;
+    }
+
+    public void setErrorMessage(final String errorMessage) {
+        this.errorMessage = errorMessage;
+    }
+
+    public Map<Long, Bindings> getBindingMap() {
+        return bindingMap;
+    }
+
+    public Method getJavaMethod() {
+        return javaMethod;
+    }
+
+    public Method getJavaMethodClose() {
+        return javaMethodClose;
+    }
+
+    public Object getClassInstance() {
+        return classInstance;
+    }
+
+    public ScriptEngine getScriptEngine() {
+        return scriptEngine;
+    }
+
+    public CompiledScript getScriptCompile() {
+        return scriptCompile;
+    }
+
+    public String getLibrary_id() {
+        return library_id;
+    }
+
+    public String getLibrary_type() {
+        return library_type;
+    }
+
+    public String getComputing_expression() {
+        return computing_expression;
+    }
+
+    public List<Object> getJavaParams() {
+        return javaParams;
+    }
+
+    public long getThreadId() {
+        return threadId;
+    }
+
     public ScriptEnginePro(Map<String, Object> calcInfo) {//引擎构造函数
 
         try {

+ 50 - 0
mainFactory/src/main/resources/application-prod.yml

@@ -0,0 +1,50 @@
+server:
+  port: ${PORT:18501}
+  servlet:
+    encoding:
+      charset: UTF-8
+      force: true
+      enabled: true
+  codec:
+    max-in-memory-size: -1
+  application:
+    name: 18501_解析入库
+spring:
+  mvc:
+    view:
+      prefix: /
+      suffix: .html
+
+log:
+  url: jdbc:mysql://10.211.67.169:3306/DPFBLOG
+  title: bigdata
+  cipher: QtrmuqDw^bJu$
+  type: com.mysql.cj.jdbc.Driver
+
+mydb:
+  url: ${MYDB_URL:jdbc:mysql://10.211.67.169:3306/DISACenter_2}
+  title: ${MYDB_USER:bigdata}
+  cipher: ${MYDB_PASSWD:QtrmuqDw^bJu$}
+  type: ${MYDB_DRIVER:com.mysql.cj.jdbc.Driver}
+  serviceURL: ${MY_SERVICE_URL:10.211.67.163:18501}
+
+isEs:
+  open: false
+  scheme: http
+  hostname: 10.211.67.175
+  port: 9201
+  auth: true
+  username: "elastic"
+  password: "TUihk5^a0J+GkI4!4"
+  successIndexName: "success_log_center"
+  errorIndexName: "error_log_center"
+
+
+service:
+  pool:
+    maxPoolSize: 512
+    corePoolSize: 6
+    queueCapacity: 1024
+    keepAliveSeconds: 60
+    threadNamePrefix: logthreadpool
+    waitForTasksToCompleteOnShutdown: true

+ 6 - 3
mainFactory/src/test/java/org/bfkj/protocol/MyRabbitMQTest.java

@@ -1,7 +1,10 @@
 package org.bfkj.protocol;
 
+import org.junit.Test;
+
 public class MyRabbitMQTest {
 
+	@Test
     public void testSendMethod() {
         MyRabbitMQ myRabbitMQ = new MyRabbitMQ();
         for (int i = 0; i < 100; i++) {
@@ -9,9 +12,9 @@ public class MyRabbitMQTest {
             myRabbitMQ.sendMethod("Q_LUG_TO_CDP", ""
                     , "hello:" + i, """
                     {
-                      "host":"localhost",
-                      "username": "andy",
-                      "password": "123456",
+                      "host":"120.26.64.82",
+                      "username": "admin",
+                      "password": "admin",
                       "port": "5672"
                     }""", "amp.topic.lug");
         }

+ 29 - 0
mainFactory/src/test/java/org/bfkj/protocol/WebAPITest.java

@@ -0,0 +1,29 @@
+package org.bfkj.protocol;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class WebAPITest {
+
+	@Test
+	public void testExecWebApi() {
+		WebAPI webAPI = new WebAPI();
+		Map<String, Object> stringObjectMap = webAPI.execWebApi(
+				new HashMap<>(
+						Map.of("X-HW-ID", "airport_cargo_oneid", "X-HW-APPKEY", "wE8yZ30Z2L/9D5#24w2l@g==", "Content-Type", "application/json")),
+				"post",
+				new HashMap<>(
+						Map.of("appid", "wx95c5de88bftsegs", "appSecret", "af1f4b97b4e13f27a")), """
+						{
+						  "url":"http://10.81.50.24/release/szx_response/v1/szx_response_api/getToken",
+						  "method":"post",
+						  "headers":{
+						    "X-HW-ID":"airport_cargo_oneid","X-HW-APPKEY":"wE8yZ30Z2L/9D5#24w2l@g==",
+						    "Content-Type":"application/json"
+						  },
+						  "body":{"appid": "wx95c5de88bftsegs","appSecret": "af1f4b97b4e13f27a"}}""");
+		System.out.println(stringObjectMap);
+	}
+}

File diff suppressed because it is too large
+ 10 - 26
mainFactory/src/test/java/org/bfkj/utils/ScriptEngineProTest.java


Some files were not shown because too many files changed in this diff