|
@@ -2,75 +2,71 @@ package org.bfkj.application;
|
|
|
|
|
|
|
|
|
import org.bfkj.config.AppConfig;
|
|
|
-import org.bfkj.config.ObjectMap;
|
|
|
import org.bfkj.utils.LogUtils;
|
|
|
import org.bfkj.utils.MapTools;
|
|
|
import org.bfkj.utils.MyDbHelper;
|
|
|
import org.bfkj.utils.ScriptEnginePro;
|
|
|
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
|
|
|
|
|
import java.util.*;
|
|
|
+import java.util.concurrent.ThreadPoolExecutor;
|
|
|
|
|
|
-
|
|
|
-/**
|
|
|
- * z
|
|
|
- * 数据收集
|
|
|
- */
|
|
|
-public class DataProcess {
|
|
|
-
|
|
|
-
|
|
|
+public class DataProcess {//数据处理对象
|
|
|
private String serviceId;//作为实例化服务对象的Key
|
|
|
private long lastActive; //服务最后活跃时间
|
|
|
private String errorMessage = null; //当前数据输入处理对象不可用信息
|
|
|
- private List<Map<String, Object>> calculationLibraryList;//输出计算库
|
|
|
- private final Map<String, Integer> serviceErrorCount = new HashMap<>(); //服务错误次数:针对脚本异常
|
|
|
+ private MyDbHelper baseDbHelper = null;//数据底座的数据库服务对象:捆绑到服务实例上,避免并发导致线程不安全
|
|
|
|
|
|
- private final Map<String, ScriptEnginePro> ScriptEngineProMaps = new HashMap<>();//用于缓存当前服务的算法引擎对象,数据库的无需缓存
|
|
|
+ private Map<String, Integer> serviceErrorCount = new HashMap<>(); //服务错误次数:针对脚本异常
|
|
|
|
|
|
- private final List<String> baseInfo = new ArrayList<>() {{
|
|
|
- add("serviceId");
|
|
|
- add("event");
|
|
|
- add("page");
|
|
|
- add("pageSize");
|
|
|
- add("dataObjectId");
|
|
|
- }};
|
|
|
+ private List<Map<String, Object>> calcList;//缓存计算库配置信息
|
|
|
+ private Map<String, ScriptEnginePro> ScriptEngineProMaps = new HashMap<>();//用于缓存当前服务的算法引擎对象
|
|
|
+ private Map<String, MyDbHelper> calcDbHelperMaps = new HashMap<>();//算法的数据库服务对象:捆绑到服务实例上,避免并发导致线程不安全:多个数据处理服务对象操作同一个对象
|
|
|
|
|
|
+ private List<String> serviceAuthMap = new ArrayList<>();//缓存当前服务的安全等级,对应各应用
|
|
|
|
|
|
public DataProcess(String service_Id) {//初始化构造,实例化一个服务对象
|
|
|
lastActive = System.currentTimeMillis();//默认最后活跃时间
|
|
|
serviceId = service_Id;
|
|
|
- MyDbHelper myDbHelper = ObjectMap.getordropMyDbHelper(AppConfig.getSystemParams(AppConfig.REMOTE_DB_CONNECT));
|
|
|
- if (Objects.nonNull(myDbHelper.getErrorMessage())) {
|
|
|
- errorMessage = "获取myDbHelper对象异常: " + myDbHelper.getErrorMessage();
|
|
|
- return;
|
|
|
- }
|
|
|
- Map<String, Object> serviceInfoResult = myDbHelper.queryByParamsReturnList("select * from serviceinfo where serviceID = ?", service_Id);//直接数据库查询
|
|
|
- List<Map<String, Object>> mapList = MapTools.getMapList(serviceInfoResult);
|
|
|
- if (!serviceInfoResult.get("code").equals("0") || Objects.isNull(mapList) || mapList.isEmpty()) {//查询数据库失败
|
|
|
- errorMessage = !serviceInfoResult.get("code").equals("0") ? ("查询" + serviceId + "服务配置失败:" + serviceInfoResult.get("message")) : ("未配置服务:" + serviceId);
|
|
|
- return;
|
|
|
- }
|
|
|
- //算法信息
|
|
|
- String calculationListSql = "SELECT CL.*,DI.* FROM calculation_library CL left JOIN datasourceinfo DI ON DI.dataSourceID = CL.datasource_id WHERE CL.service_id =? order by library_sort,library_id";
|
|
|
- Map<String, Object> calculationResult = myDbHelper.queryByParamsReturnList(calculationListSql, service_Id);//直接数据库查询
|
|
|
- calculationLibraryList = MapTools.getMapList(calculationResult);
|
|
|
- if (!calculationResult.get("code").equals("0")) {//查询数据库失败
|
|
|
- errorMessage = "查询" + service_Id + "的算法失败:" + calculationResult.get("message");
|
|
|
- }
|
|
|
- if (calculationLibraryList == null || calculationLibraryList.isEmpty()) {//查询数据库失败
|
|
|
- errorMessage = "服务ID " + service_Id + "没有找到对应的算法";
|
|
|
+ try{
|
|
|
+ baseDbHelper = new MyDbHelper(AppConfig.getSystemParams(AppConfig.REMOTE_DB_CONNECT));//获取底座数据库对象
|
|
|
+ if (Objects.nonNull(baseDbHelper.getErrorMessage())) {
|
|
|
+ errorMessage = "获取底座myDbHelper对象异常: ".concat(baseDbHelper.getErrorMessage());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ //算法信息
|
|
|
+ Map<String, Object> calculationMap = baseDbHelper.queryByParamsReturnList("SELECT CL.*,DI.* FROM calculation_library CL left JOIN datasourceinfo DI ON DI.dataSourceID = CL.datasource_id WHERE CL.service_id =? and CL.library_type is not null order by library_sort,library_id", service_Id);//直接数据库查询
|
|
|
+ calcList = Objects.isNull(calculationMap.get("returnData"))?null:(List<Map<String, Object>>) calculationMap.get("returnData");
|
|
|
+ if (!calculationMap.get("code").equals("0") || calcList == null || calcList.isEmpty()) {//查询数据库失败
|
|
|
+ errorMessage = "查询".concat(serviceId).concat("的算法失败:").concat(calculationMap.get("message").toString());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ //获取当前服务的安全等级
|
|
|
+ Map<String, Object> serviceAuth = baseDbHelper.queryByParamsReturnList("SELECT app_id FROM appService WHERE serviceID = ?", service_Id);//直接数据库查询
|
|
|
+ if (!serviceAuth.get("code").equals("0")) {//查询数据库失败
|
|
|
+ errorMessage = "获取".concat(service_Id).concat("的应用安全等级出错:").concat(serviceAuth.get("message").toString());
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ Object serviceAuthList = serviceAuth.get("returnData");//获取服务应用等级 List<Map> key:字段名,value
|
|
|
+ if (!Objects.isNull(serviceAuthList)) {//将ListMap合并为一个Map,方便使用 // map:{1,appid},{2,appidd}
|
|
|
+ serviceAuthMap = ((List<Map<String, Object>>) serviceAuthList).stream().map(map -> map.get("app_id").toString()).toList();
|
|
|
+ }
|
|
|
+ }catch (Exception e) {
|
|
|
+ errorMessage = "数据处理对象初始化异常: ".concat(serviceId).concat(";异常信息:").concat(LogUtils.getException(e));
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- /**
|
|
|
- * 关闭的连接信息
|
|
|
- */
|
|
|
+ //数据处理服务对象关闭时,关闭缓存的数据库对象以及脚本引擎对象
|
|
|
public void close() {//销毁当前服务对象时,需要将缓存的算法引擎实例同步销毁
|
|
|
- for (ScriptEnginePro scriptEnginePro : ScriptEngineProMaps.values()) {
|
|
|
- scriptEnginePro.close();
|
|
|
+ for (ScriptEnginePro scriptEnginePro : ScriptEngineProMaps.values()) {//脚本引擎
|
|
|
+ scriptEnginePro.close();//关闭动态方法实例、缓存等
|
|
|
+ }
|
|
|
+ ScriptEngineProMaps.clear();//清除缓存
|
|
|
+ for (MyDbHelper calcMyDbHelper : calcDbHelperMaps.values()) {//数据库对象
|
|
|
+ calcMyDbHelper.close();//关闭数据库连接池
|
|
|
}
|
|
|
- ScriptEngineProMaps.clear();
|
|
|
+ calcDbHelperMaps.clear();//清除缓存
|
|
|
+ baseDbHelper.close();//底座数据库对象
|
|
|
}
|
|
|
-
|
|
|
//统一成功信息处理
|
|
|
public Map<String, Object> processSuccess(Object returnData) {
|
|
|
if (returnData instanceof Map) {
|
|
@@ -83,7 +79,6 @@ public class DataProcess {
|
|
|
returnMap.put("returnData", returnData);
|
|
|
return returnMap;
|
|
|
}
|
|
|
-
|
|
|
//统一错误信息处理
|
|
|
public Map<String, Object> processFail(String errorMessage, String libraryId) {
|
|
|
Map<String, Object> returnMap = new HashMap<>();//用于当前方法统一返回参数,不存在深拷贝问题
|
|
@@ -94,383 +89,290 @@ public class DataProcess {
|
|
|
}
|
|
|
return returnMap;
|
|
|
}
|
|
|
-
|
|
|
- //数据收集----目前应该是数据处理
|
|
|
+ //数据处理对外接口
|
|
|
public Map<String, Object> processData(Map<String, Object> inputData, String... user_id) { //{serviceid:1,datacontent:[{key:value},{key:value}],event:0,page:1,pagesize:10}
|
|
|
- //事件判断由算法进行,不再作为入口必须条件 ?
|
|
|
if (!MapTools.isBlank(errorMessage)) {//如果当前服务存在问题,代表数据库对象不可用,此时应该重构当前对象,调用方控制
|
|
|
- LogUtils.log("processData:1", "-1", null, "服务不可用" + errorMessage, serviceId, AppConfig.WORK_ID, MapTools.jacksonObjToStr(inputData), null, null, null);
|
|
|
- return processFail("服务不可用" + errorMessage, null);
|
|
|
+ LogUtils.log("processData:1", "-1", null, "服务不可用".concat(errorMessage), serviceId, inputData, null, null, null);
|
|
|
+ return processFail("服务不可用".concat(errorMessage), null);
|
|
|
}
|
|
|
if (System.currentTimeMillis() - lastActive > 2000) { // 更新数据库中的服务最新活跃时间
|
|
|
lastActive = System.currentTimeMillis();//更新最后活跃时间 //------更新当前服务的最后活跃时间,用于服务监控
|
|
|
- Runtime.getRuntime().gc();
|
|
|
- //更新服务的活跃时间
|
|
|
- MyDbHelper myDbHelper = ObjectMap.getordropMyDbHelper(AppConfig.getSystemParams(AppConfig.REMOTE_DB_CONNECT));
|
|
|
- if (Objects.nonNull(myDbHelper.getErrorMessage())) {
|
|
|
- LogUtils.log("processData:2", "-1", null, "数据接收错误,获取远程数据库对象: " + myDbHelper.getErrorMessage(), serviceId, AppConfig.WORK_ID, MapTools.jacksonObjToStr(inputData), null, null, null);
|
|
|
- return processFail("数据接收错误,获取远程数据库对象: " + myDbHelper.getErrorMessage(), null);
|
|
|
- }
|
|
|
- myDbHelper.updateByCondition("update serviceinfo set runState = 1 ,lastactive = now() where serviceID =?", null, serviceId); // 服务表增加最后活跃时间
|
|
|
+ baseDbHelper.updateByCondition("update serviceinfo set runState = ? ,lastactive = ? where serviceID =?", null, "1", lastActive, serviceId); // 服务表增加最后活跃时间
|
|
|
}
|
|
|
//创建生命周期ID;//默认继承
|
|
|
if (!inputData.containsKey("dataObjectId")) {
|
|
|
-// 生成新的生命周期ID
|
|
|
- inputData.put("dataObjectId", createLifeCycleCol(Long.valueOf(AppConfig.WORK_ID), Integer.parseInt(serviceId)));
|
|
|
+ inputData.put("dataObjectId", createLifeCycleCol(Long.valueOf(AppConfig.WORK_ID), Integer.parseInt(serviceId))); // 生成新的生命周期ID
|
|
|
}
|
|
|
- String dataObjectId = (String) inputData.get("dataObjectId");
|
|
|
- Object event = inputData.get("event"); // 服务表新增一个字段:用于标识当前服务支持的事件,用于限制入参的事件
|
|
|
-
|
|
|
- //初始化 列权限为空
|
|
|
- // 如果存在用户编号 且事件为查询
|
|
|
- //通过用户编号,服务编号,从用户权限表获取行列权限
|
|
|
- // 如果列权限为空则 直接返回空
|
|
|
- // 重新组建入口参数:添加filter(行权限)
|
|
|
- if (!event.equals("1") && Objects.nonNull(user_id) && inputData.containsKey("auth_id")) {
|
|
|
- MyDbHelper myDbHelper = ObjectMap.getordropMyDbHelper(AppConfig.getSystemParams(AppConfig.REMOTE_DB_CONNECT));
|
|
|
- if (Objects.nonNull(myDbHelper.getErrorMessage())) {
|
|
|
- LogUtils.log("processData:2", "-1", null, "数据接收错误,获取远程数据库对象: " + myDbHelper.getErrorMessage(), serviceId, AppConfig.WORK_ID, MapTools.jacksonObjToStr(inputData), null, null, null);
|
|
|
- return processFail("数据接收错误,获取远程数据库对象: " + myDbHelper.getErrorMessage(), null);
|
|
|
- }
|
|
|
- Map<String, Object> authServiceMap = myDbHelper.queryByParamsReturnList("SELECT auth_id FROM t_auth WHERE serviceID = ? and auth_id = ? ", serviceId, inputData.get("auth_id"));
|
|
|
- if (authServiceMap.get("code").equals("-1") || (authServiceMap.get("returnData") instanceof List<?> returnDataList && returnDataList.isEmpty())) return authServiceMap;
|
|
|
- //判断filter不为空 且 filter不包含left---{"filter":[{"user_id":""}]} ---- >[{filter:[{left:,column:},{left:,column:}
|
|
|
- /*获取权限filter*/
|
|
|
- Map<String, Object> userDataAuthMap = myDbHelper.queryByParamsReturnList("SELECT QCS.columnName,TUA.row_auth FROM t_user_group_auth TUA LEFT JOIN querytemplatecolumnset QCS on TUA.queryTemplateColumnSetID = QCS.queryTemplateColumnSetID WHERE TUA.user_id = ? AND TUA.auth_id = ? AND TUA.queryTemplateColumnSetID IS NOT NULL", user_id[0], inputData.get("auth_id"));
|
|
|
- if (userDataAuthMap.get("code").equals("-1")) return userDataAuthMap;
|
|
|
- List<Object> userList = new ArrayList<>();
|
|
|
- ((List<Map<String, Object>>) userDataAuthMap.get("returnData")).stream().filter(map -> Objects.nonNull(map.get("row_auth"))).forEach(tempMap -> {
|
|
|
- String[] row_auths = tempMap.get("row_auth").toString().split(",");
|
|
|
- for (String row_auth : row_auths) {
|
|
|
- if (Objects.isNull(row_auth)) continue;
|
|
|
- HashMap<String, Object> hashMap = new HashMap<>();
|
|
|
- hashMap.put(tempMap.get("columnName").toString(), row_auth.startsWith("!") ? row_auth.substring(1) : row_auth);
|
|
|
- if (row_auth.startsWith("!")) {
|
|
|
- userList.addAll(changeSignFilter(hashMap, "!", "or"));
|
|
|
- } else {
|
|
|
- userList.addAll(changeSignFilter(hashMap, null, "or"));
|
|
|
- }
|
|
|
- }
|
|
|
- });
|
|
|
- /*userList第一个以及最后一个添加()*/
|
|
|
- ((Map<String, Object>) userList.get(0)).put("left", "(");
|
|
|
- ((Map<String, Object>) userList.get(userList.size() - 1)).put("connector", " and ");
|
|
|
- ((Map<String, Object>) userList.get(userList.size() - 1)).put("right", " ) ");
|
|
|
- inputData = initParam(inputData, "0", userList);
|
|
|
- }
|
|
|
-
|
|
|
- List<Map<String, Object>> calculationResult = new ArrayList<>();//定义算法全量结果列表
|
|
|
- calculationResult.add(inputData);//默认接收数据为第0个算法的结果
|
|
|
- Map<String, List<Map<String, Object>>> execResult = execCalultion(calculationResult, null, event, dataObjectId, inputData.get("page"), inputData.get("pageSize"));
|
|
|
- calculationResult = execResult.get("calcData");//执行当前服务对应的算法库
|
|
|
- Map<String, Object> lastResult = calculationResult.get(calculationResult.size() - 1);//获取最后一个计算结果
|
|
|
- String libraryId = lastResult.containsKey("library_id") ? lastResult.get("library_id").toString() : null;
|
|
|
+ String dataObjectId = inputData.get("dataObjectId").toString();
|
|
|
+ inputData = authCheck(inputData, user_id); /*权限检查,添加列权限以及行权限到inputData中*/
|
|
|
+ List<Map<String, Object>> calcData = new ArrayList<>();//定义算法全量结果列表
|
|
|
+ calcData.add(inputData);//默认入口参数为第0个算法的结果
|
|
|
+ Map<String, Object> execResult = execCalultion(calcData, null, dataObjectId);//执行所有的算法库
|
|
|
+ String library_id = execResult.get("library_id").toString();//获取最后一个算法的编号
|
|
|
+ List<Map<String, Object>> preListData = (List<Map<String, Object>>) execResult.get("preData");//获取所有的前置检测结果
|
|
|
+ calcData = (List<Map<String, Object>>) execResult.get("calcData");//所有的算法结果
|
|
|
+ //依据算法编号规则确定最后一个算法结果,是前置还是算法
|
|
|
+ Map<String, Object> lastResult = library_id.endsWith("N")?preListData.get(preListData.size() - 1):calcData.get(calcData.size() - 1);//获取最后一个计算结果
|
|
|
if (Objects.equals(lastResult.get("code"), "-1")) {//最后一个计算结果为-1则记录异常,否则全部记录正常
|
|
|
Object message = lastResult.get("message");
|
|
|
- LogUtils.log("processData:3", "-1", libraryId, Objects.nonNull(message) ? message.toString() : null, serviceId, AppConfig.WORK_ID, MapTools.jacksonObjToStr(calculationResult), dataObjectId, MapTools.jacksonObjToStr(execResult.get("calcData")), event);
|
|
|
- setServiceErrorCount(libraryId, serviceId, "-1");//标记错误的算法:连续错误累加
|
|
|
- return processFail(libraryId + "数据计算错误:" + lastResult.get("message"), libraryId);
|
|
|
+ LogUtils.log("processData:3", "-1", library_id, Objects.nonNull(message)?message.toString():null, serviceId, calcData, dataObjectId, preListData, inputData.get("event"));
|
|
|
+ return processFail(library_id.concat("数据计算错误:").concat(lastResult.get("message").toString()), library_id);
|
|
|
}
|
|
|
//写入 成功日志
|
|
|
- LogUtils.log("DataProcess:9999", "0", libraryId, "数据接收后处理成功", serviceId, AppConfig.WORK_ID, MapTools.jacksonObjToStr(calculationResult), dataObjectId, MapTools.jacksonObjToStr(execResult.get("calcData")), event); //此处无法把所有的记过集返回过去
|
|
|
- // 如果列权限不为空
|
|
|
- // 循环lastResult
|
|
|
- // 遍历map
|
|
|
- // 如果key不在列权限则删除
|
|
|
- List<Map<String, Object>> collect = calculationResult.stream().filter(map -> Objects.isNull(map.get("library_id")) || !map.get("library_id").toString().contains("_N")).toList();
|
|
|
- return processSuccess(collect.get(collect.size() - 1));
|
|
|
+ LogUtils.log("DataProcess:9999", "0", library_id, "数据接收后处理成功", serviceId, calcData, dataObjectId, preListData, inputData.get("event")); //此处无法把所有的记过集返回过去
|
|
|
+ return processSuccess(lastResult); /*可订阅任意算法结果,也可以订阅全量结果,由最后一个算法决定*/
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
- private Map<String, Object> initParam(Map<String, Object> inputData, String event, List<Object> authList) { // authList 转换:left过后的
|
|
|
- Object dataContent = inputData.get("dataContent");//这个就是最终的dataContent,所有的变化都是针对它进行
|
|
|
- if (Objects.isNull(dataContent)) {//如果为空则默认为inputData中除去常规变量的其它变量,最终可以是一个空Map
|
|
|
- Map<String, Object> tempDataContent = new HashMap<>(inputData);//因为inputData并不会发生变化,所以使用浅拷贝即可:
|
|
|
- baseInfo.forEach(tempDataContent.keySet()::remove);//移除常规不参与计算的键值
|
|
|
- dataContent = tempDataContent;//处理为Map,后续继续处理
|
|
|
- }
|
|
|
- if (dataContent instanceof Map<?, ?> dataContentMap) {//如果是一个Map则转数组
|
|
|
- List<Map<?, ?>> tempDataContent = new ArrayList<>();//初始化一个数组
|
|
|
- tempDataContent.add(dataContentMap);
|
|
|
- dataContent = tempDataContent;
|
|
|
+ /*权限检查*/
|
|
|
+ public Map<String, Object> authCheck(Map<String, Object> inputData, String... user_id) {
|
|
|
+ Object appid = inputData.get("appid");//获取应用编号
|
|
|
+ if (Objects.nonNull(appid) && !serviceAuthMap.contains(appid.toString())) {//上传了应用编号但是不在当前服务授权的应用列表中则返回错误
|
|
|
+ return processFail("服务未授权", null);
|
|
|
}
|
|
|
- if (dataContent instanceof List dataContentList) {//如果是一个数组则循环转标准格式
|
|
|
- List<Map<String, Object>> returnDataContent = new ArrayList<>();
|
|
|
- for (Object signDataContent : dataContentList) {//循环List
|
|
|
- if (!(signDataContent instanceof Map)) return inputData; //如果不是一个Map则不进行处理
|
|
|
- returnDataContent.add(createStandardParameters((Map<String, Object>) signDataContent, event, authList)); //得到Filter里面的内容
|
|
|
- }
|
|
|
- dataContent = returnDataContent;
|
|
|
- }
|
|
|
- inputData.put("dataContent", dataContent);
|
|
|
- return inputData;//
|
|
|
- }
|
|
|
-
|
|
|
- @SuppressWarnings("unchecked")
|
|
|
- private Map<String, Object> createStandardParameters(Map<String, Object> inDataContent, String event, List<Object> authList) {
|
|
|
- Map<String, Object> returnData = new HashMap<>();//创建返回值returnData{}
|
|
|
- if ("1,2".contains(event)) {//如果事件是新增或更新则(查询和删除无需处理value)
|
|
|
- Object tempValueObject = inDataContent.get("Value");//获取Value----{c1:"123",c2:}
|
|
|
- if (Objects.isNull(tempValueObject)) {//如果入参数inDataContent不包含Value则等于整个inDataContent -- //处理value
|
|
|
- HashMap<String, Object> tempValue = new HashMap<>(inDataContent);//深拷贝
|
|
|
- tempValue.remove("filter");//先移除filter,理论上不存在
|
|
|
- tempValueObject = tempValue;
|
|
|
- }
|
|
|
- returnData.put((tempValueObject instanceof Map) ? "Value" : "newValue", tempValueObject);//重命名避免后期处理错误
|
|
|
- }
|
|
|
- if (!("1".equals(event))) {//如果事件不是新增(新增无需处理filter)
|
|
|
- Object tempFilterObject = inDataContent.get("filter");//获取filter
|
|
|
- tempFilterObject = Objects.isNull(tempFilterObject) ? inDataContent : tempFilterObject;//如果不存在filter则等于原始MAP
|
|
|
- if (tempFilterObject instanceof Map) {////如果filter是单个Map则
|
|
|
- authList.addAll(changeSignFilter((Map<String, Object>) tempFilterObject, null, null));
|
|
|
- returnData.put("filter", authList);//转换为标准条件集
|
|
|
- } else {
|
|
|
- if (tempFilterObject instanceof List tempFilterObjectList && tempFilterObjectList.get(0) instanceof Map) {//如果filter是List则"123"
|
|
|
- List<Object> tempFilterList = new ArrayList<>(authList);
|
|
|
- for (Object o : tempFilterObjectList) {//循环List -{c1:c2,c3:c4}
|
|
|
- tempFilterList.addAll(changeSignFilter((Map<String, Object>) o, null, null)); //循环添加
|
|
|
- }
|
|
|
- returnData.put("filter", tempFilterList);//转换为标准条件集
|
|
|
- } else {//此时tempFilterObject既不是List也不是Map:1、dataContent包含filter且filter是一个字符串
|
|
|
- returnData.put("newFilter", tempFilterObject);
|
|
|
+ /*在构造函数中获取超级用户名单,方便此处的判定*/
|
|
|
+ if (Objects.nonNull(user_id) && inputData.containsKey("authId") && !"1,9999".contains(user_id[0])) {//用户编号不为空,权限编号不为空,不是超级用户
|
|
|
+ Map<String, Object> userDataAuthMap = baseDbHelper.queryByParamsReturnList("SELECT QCS.columnName,TUA.row_auth FROM t_user_group_auth TUA LEFT JOIN querytemplatecolumnset QCS on TUA.queryTemplateColumnSetID = QCS.queryTemplateColumnSetID WHERE TUA.user_id = ? AND TUA.auth_id = ? AND TUA.queryTemplateColumnSetID IS NOT NULL", user_id[0], inputData.get("authId"));
|
|
|
+ if (userDataAuthMap.get("code").equals("-1") || Objects.isNull(userDataAuthMap.get("returnData")) || ((List<Map<String, Object>>) userDataAuthMap.get("returnData")).isEmpty()) {
|
|
|
+ if (userDataAuthMap.get("code").equals("0")) {
|
|
|
+ userDataAuthMap.put("message", "服务未授任何数据权限");
|
|
|
}
|
|
|
+ return userDataAuthMap;
|
|
|
}
|
|
|
- }
|
|
|
- return returnData;//返回组建好的参数
|
|
|
- }
|
|
|
-
|
|
|
- private List<Map<String, Object>> changeSignFilter(Map<String, Object> filterMap, String compare, String connect) {
|
|
|
- List<Map<String, Object>> objects = new ArrayList<>();
|
|
|
- if (filterMap.containsKey("left")) {
|
|
|
- objects.add(filterMap);
|
|
|
- return objects;
|
|
|
- }
|
|
|
- for (String filterKey : filterMap.keySet()) {
|
|
|
- Map<String, Object> returnMap = new HashMap<>();
|
|
|
- Object keyValues = filterMap.get(filterKey);
|
|
|
- returnMap.put("left", "");
|
|
|
- returnMap.put("column", filterKey);
|
|
|
- returnMap.put("comparator", (Objects.nonNull(compare) && compare.startsWith("!")) ? "!=" : " = ");
|
|
|
- if (Objects.isNull(keyValues)) {
|
|
|
- returnMap.put("comparator", " is null ");
|
|
|
+ List<Map<String, Object>> rowAuth = new ArrayList<>();//初始化行权限
|
|
|
+ List<String> authColumn = new ArrayList<>();//初始化列权限
|
|
|
+ List<Map<String, Object>> dataAuth = (List<Map<String, Object>>) userDataAuthMap.get("returnData");
|
|
|
+ dataAuth.forEach(dataAuthMap -> {//循环获取到的权限
|
|
|
+ String columnName = dataAuthMap.get("columnName").toString();//字段名
|
|
|
+ authColumn.add(columnName);//添加到列权限
|
|
|
+ Object rowAuthObj = dataAuthMap.get("row_auth");//获取对应列的行权限
|
|
|
+ if (Objects.nonNull(rowAuthObj)) {//如果存在则组建标准的限制条件
|
|
|
+ String tempRowAuth = rowAuthObj.toString();
|
|
|
+ String connect = tempRowAuth.startsWith("!") ? "!=" : "=";//如果首位是!代表不等于,否则代表等于
|
|
|
+ tempRowAuth = tempRowAuth.startsWith("!") ? tempRowAuth.substring(1) : tempRowAuth;//修订行权限表达式
|
|
|
+ String[] row_auths = tempRowAuth.split(","); //按,号进行分组!PEK,
|
|
|
+ for (String row_auth : row_auths) {//循环进行标准化
|
|
|
+ if (MapTools.isBlank(row_auth)) continue;//为空不设置--误操作填写了空格或者删除未设置为NULL
|
|
|
+ Map<String, Object> signRowAuth = new HashMap<>();
|
|
|
+ signRowAuth.put(columnName, row_auth);
|
|
|
+ rowAuth.addAll(baseDbHelper.changeSignFilter(signRowAuth, connect, connect.equals("!=") ? "and" : "or"));
|
|
|
+ }
|
|
|
+ if (rowAuth.size() > 0) {//存在行权限则进行括号包裹
|
|
|
+ rowAuth.get(0).put("left", "(");
|
|
|
+ rowAuth.get(rowAuth.size() - 1).put("connector", " and ");
|
|
|
+ rowAuth.get(rowAuth.size() - 1).put("right", " ) ");
|
|
|
+ } //(c1 = pek )
|
|
|
+ }
|
|
|
+ });
|
|
|
+ if (rowAuth.size() > 0) {//存在行权限则进行括号包裹
|
|
|
+ rowAuth.get(0).put("left", "(".concat(rowAuth.get(0).get("left").toString()));
|
|
|
+ rowAuth.get(rowAuth.size() - 1).put("connector", " and ");
|
|
|
+ rowAuth.get(rowAuth.size() - 1).put("right", " ) ".concat(rowAuth.get(rowAuth.size() - 1).get("right").toString()));
|
|
|
+ inputData.put("rowAuth", rowAuth);//添加到入参,方便后面调用
|
|
|
}
|
|
|
- returnMap.put("value", keyValues);
|
|
|
- returnMap.put("right", "");
|
|
|
- returnMap.put("connector", Objects.nonNull(connect) ? (" " + connect) : " and ");
|
|
|
- objects.add(returnMap);
|
|
|
+ inputData.put("authColumn", authColumn);//添加到入参,方便后面调用
|
|
|
}
|
|
|
- return objects;
|
|
|
+ return inputData;
|
|
|
}
|
|
|
-
|
|
|
- public Map<String, List<Map<String, Object>>> execCalultion(List<Map<String, Object>> inData, String beginLibraryId, Object event, String dataObjectId, Object page, Object pageSize) {
|
|
|
- Map<String, List<Map<String, Object>>> returnData = new HashMap<>();
|
|
|
- List<Map<String, Object>> preData = new ArrayList<>();
|
|
|
- List<Map<String, Object>> calcData = Objects.isNull(inData) || inData.isEmpty() ? new ArrayList<>() : inData;
|
|
|
-// long l2 = System.currentTimeMillis();
|
|
|
- for (Map<String, Object> calculationLibrary : calculationLibraryList) {//循环算法库
|
|
|
-// long l = System.currentTimeMillis();
|
|
|
- String library_id = calculationLibrary.get("library_id").toString();//算法编号
|
|
|
- if (MapTools.isNotBlank(beginLibraryId) && !beginLibraryId.equals(library_id)) {
|
|
|
+ /*执行算法*/
|
|
|
+ public Map<String, Object> execCalultion(List<Map<String, Object>> inData, String beginLibraryId, String dataObjectId) {
|
|
|
+ Map<String, Object> returnData = new HashMap<>();//初始化最终返回结果
|
|
|
+ List<Map<String, Object>> preData = new ArrayList<>();//初始化前置检测结果列表
|
|
|
+ String lastLibraryId = "";//初始化最后一个算法的编号
|
|
|
+ List<Map<String, Object>> calcData = Objects.isNull(inData) || inData.isEmpty() ? new ArrayList<>() : inData;//默认算法全量计算结果等于入口参数
|
|
|
+ Map<String, Object> preCalMap = new HashMap<>();//用于组建前置检测算法配置
|
|
|
+ for (Map<String, Object> calculationLibrary : calcList) {//循环算法库
|
|
|
+ lastLibraryId = calculationLibrary.get("library_id").toString();//当前进行的算法编号
|
|
|
+ if (MapTools.isNotBlank(beginLibraryId) && !beginLibraryId.equals(lastLibraryId)) {
|
|
|
continue;//如果异常恢复开始算法编号不为空且不等于当前循环的算法编号则跳过
|
|
|
}
|
|
|
beginLibraryId = null;//为避免异常恢复开始后的后续算法编号不等于当前循环的算法编号的跳过
|
|
|
- String library_type = calculationLibrary.get("library_type").toString();
|
|
|
- String service_id = calculationLibrary.get("service_id").toString();
|
|
|
- String computing_expression = MapTools.objectToString(calculationLibrary.get("computing_expression"));
|
|
|
- Object is_exec = calculationLibrary.get("is_exec");
|
|
|
- Map<String, Object> currentResult;
|
|
|
- if (Objects.nonNull(is_exec)) {
|
|
|
- Map<String, Object> preCalMap = new HashMap<>();
|
|
|
- preCalMap.put("library_id", library_id + "_N");
|
|
|
- preCalMap.put("service_id", service_id);
|
|
|
- preCalMap.put("library_type", 2);
|
|
|
- preCalMap.put("computing_expression", is_exec); //
|
|
|
- Map<String, Object> preEnginResult = execEngine(preCalMap, calcData, dataObjectId); //todo 前置算法的结果添加到里面 ,在数据恢复时,是否会恢复成功呢
|
|
|
- preEnginResult.put("library_id", library_id + "_N");// 补充算法编号,方便上层处理异常
|
|
|
-// inData.add(preEnginResult);
|
|
|
- preData.add(preEnginResult);
|
|
|
- returnData.put("preData", preData);
|
|
|
- //如果code为-1 则表示脚本计算错误 退出计算;如果code为2 则后续算法都不执行
|
|
|
- if ("-1".equals(preEnginResult.get("code")) || Objects.equals("2", preEnginResult.get("returnData"))) { //执行错误
|
|
|
- return returnData;
|
|
|
+ Object is_exec = calculationLibrary.get("is_exec");//获取前置检测表达式
|
|
|
+ if (Objects.nonNull(is_exec) && !MapTools.isBlank(is_exec.toString())) {//存在则进行前置检测
|
|
|
+ lastLibraryId = lastLibraryId.concat("_N");//修订算法编号,避免与正式算法冲突
|
|
|
+ preCalMap.put("library_id", lastLibraryId);//组建前置算法配置信息
|
|
|
+ preCalMap.put("library_type", 2);/*如果脚本包含function则js否则java*/
|
|
|
+ preCalMap.put("computing_expression", is_exec); //组建前置算法配置信息
|
|
|
+ Map<String, Object> preEnginResult = execEngine(lastLibraryId, preCalMap, calcData);//调用算法引擎进行执行
|
|
|
+ preData.add(preEnginResult);//添加执行结果到前置检测结果列表
|
|
|
+ setServiceErrorCount(lastLibraryId, preEnginResult.get("code").toString());//依据返回的code进行连续错误计数和重置
|
|
|
+ if ("-1".equals(preEnginResult.get("code")) || Objects.equals("2", preEnginResult.get("returnData"))) {
|
|
|
+ break; //如果code为-1 则表示脚本计算错误 退出计算;如果code为2 则后续算法都不执行
|
|
|
}
|
|
|
- setServiceErrorCount(library_id + "_N", serviceId, "0");
|
|
|
if (Objects.equals("1", preEnginResult.get("returnData"))) { //前置检查不通过,当前算法不执行,后续 还需执行
|
|
|
+ calcData.add(null);//自动添加算法空结果,避免数据订阅时的序号出现错误
|
|
|
continue;
|
|
|
}
|
|
|
+ lastLibraryId = calculationLibrary.get("library_id").toString();//当前进行的算法编号
|
|
|
}
|
|
|
- if (library_type.equals("3")) {//数据库算法
|
|
|
- String connectConfig = calculationLibrary.get("connectConfig").toString();//数据库链接字符串
|
|
|
- MyDbHelper myDbHelper = ObjectMap.getordropMyDbHelper(connectConfig);//获取对应连接的数据库处理对象
|
|
|
- if (Objects.nonNull(myDbHelper.getErrorMessage())) {//获取出错
|
|
|
- calcData.add(processFail("算法获取数据库连接异常:" + myDbHelper.getErrorMessage(), library_id));
|
|
|
- returnData.put("calcData", calcData);
|
|
|
- return returnData;
|
|
|
- }
|
|
|
- //目标对象名(表名)
|
|
|
- String tableName = Objects.isNull(calculationLibrary.get("sourceObjectName")) ? null : calculationLibrary.get("sourceObjectName").toString();
|
|
|
- Object isActiveTable = calculationLibrary.get("isActiveTable");//是否动态表
|
|
|
- //指定当前算法事件
|
|
|
- String eventFlag = Objects.isNull(calculationLibrary.get("event")) ? (MapTools.isNotBlank(tableName) ? event.toString() : AppConfig.staticEvent.get(Objects.requireNonNull(computing_expression).trim().toLowerCase().substring(0, 6))) : calculationLibrary.get("event").toString();
|
|
|
- if (null == eventFlag) {
|
|
|
- calcData.add(processFail("事件标志为空", library_id));
|
|
|
- returnData.put("calcData", calcData);
|
|
|
- return returnData;
|
|
|
- }
|
|
|
- //算法取参
|
|
|
- List<Map<String,Object>> dbPrams = new ArrayList<>();
|
|
|
- Object tempDBPrams = null;
|
|
|
- try {//添加try捕获异常
|
|
|
- if (Objects.isNull(calculationLibrary.get("parmIndex"))) {//未配置取参则默认取上一个算法
|
|
|
- Map<String, Object> tempMap = calcData.get(calcData.size() - 1);
|
|
|
- tempDBPrams = tempMap.containsKey("dataContent") ? tempMap.get("dataContent") : tempMap.get("returnData");
|
|
|
- } else {//算法取值位置表达式不为空
|
|
|
- tempDBPrams = calcData;//默认等于全量参数
|
|
|
- String[] parmSplit = calculationLibrary.get("parmIndex").toString().split("\\.");//按.进行分割
|
|
|
- for (String item : parmSplit) {//循环提取参数
|
|
|
- if (MapTools.isNumber(item) && tempDBPrams instanceof List<?> tempList) {//数字代表数组中的位置
|
|
|
- tempDBPrams = tempList.get(Integer.parseInt(item));
|
|
|
- } else {
|
|
|
- if (tempDBPrams instanceof Map<?,?> tempMap){
|
|
|
- tempDBPrams = tempMap.get(item);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- if (tempDBPrams instanceof Map tempMap) {//最终参数是Map则转换为List<Map<String, Object>>
|
|
|
-// if (tempMap.isEmpty()) {
|
|
|
-// calcData.add(new HashMap<>());
|
|
|
-// returnData.put("calcData", calcData);
|
|
|
-// return returnData;
|
|
|
-// }
|
|
|
- dbPrams.add(tempMap);
|
|
|
- } else if (tempDBPrams instanceof List tempList) {
|
|
|
-// if (tempMap.isEmpty()) {
|
|
|
-// calcData.add(new HashMap<>());
|
|
|
-// returnData.put("calcData", calcData);
|
|
|
-// return returnData;
|
|
|
-// }
|
|
|
- dbPrams = tempList;
|
|
|
- }
|
|
|
- } catch (Exception e) {
|
|
|
- calcData.add(processFail("入参数据格式错误:不是List<Map<String,Object> " + tempDBPrams, library_id));
|
|
|
- returnData.put("calcData", calcData);
|
|
|
- return returnData;
|
|
|
- }
|
|
|
- // 事件6 在算法表中的event定义,实现自动的新增或更新
|
|
|
- if (eventFlag.equals("6")) {
|
|
|
- currentResult = myDbHelper.insertOrUpdate(tableName, dbPrams);
|
|
|
- } else {
|
|
|
- if (eventFlag.equals("7")){
|
|
|
- currentResult = myDbHelper.bathEvent7(computing_expression, dbPrams);
|
|
|
- }else {
|
|
|
- currentResult = myDbHelper.generalProcess(eventFlag, tableName, computing_expression, null, dbPrams, Objects.nonNull(isActiveTable) && Objects.equals(isActiveTable, "true"), page, pageSize);
|
|
|
- }
|
|
|
- }
|
|
|
- } else {
|
|
|
- currentResult = execEngine(calculationLibrary, inData, dataObjectId);
|
|
|
+ //依据算法类型调用数据库对象或脚本引擎对象进行算法执行
|
|
|
+ Map<String, Object> currentResult = calculationLibrary.get("library_type").toString().equals("3") ? execDB(lastLibraryId, calculationLibrary, inData, dataObjectId) : execEngine(lastLibraryId, calculationLibrary, inData);
|
|
|
+ setServiceErrorCount(lastLibraryId, Objects.nonNull(currentResult) && currentResult.containsKey("code") ? currentResult.get("code").toString() : "0");//依据返回的code进行连续错误计数和重置
|
|
|
+ calcData.add(currentResult);//添加到全量算法结果中
|
|
|
+ if (Objects.nonNull(currentResult) && !currentResult.get("code").equals("0")) {//算法执行异常则退出
|
|
|
+ break;
|
|
|
}
|
|
|
-// if (!currentResult.get("code").equals("0")) {//算法执行异常
|
|
|
-// currentResult.put("library_id", library_id);
|
|
|
-// }
|
|
|
- currentResult.put("library_id", library_id);
|
|
|
- calcData.add(currentResult);
|
|
|
- returnData.put("calcData", calcData);
|
|
|
-// System.out.println("算法ID : " + library_id + " 执行时间为:" + (System.currentTimeMillis() - l));
|
|
|
- setServiceErrorCount(library_id, serviceId, "0"); /// 错误次数恢复??
|
|
|
}
|
|
|
-// System.out.println("服务ID : " + serviceId + " 执行时间为:" + (System.currentTimeMillis() - l2));
|
|
|
+ returnData.put("library_id", lastLibraryId);
|
|
|
+ returnData.put("preData", preData);
|
|
|
+ returnData.put("calcData", calcData);
|
|
|
return returnData;
|
|
|
}
|
|
|
-
|
|
|
- /**
|
|
|
- * 算法连续错误次数判定
|
|
|
- *
|
|
|
- * @param location
|
|
|
- * @param serviceId
|
|
|
- * @param success
|
|
|
- */
|
|
|
- public void setServiceErrorCount(String location, String serviceId, String success) {
|
|
|
- String key = String.format("processData %s_%s", location, serviceId);
|
|
|
- if (!success.equals("0")) {
|
|
|
- Integer errCount = serviceErrorCount.get(key);
|
|
|
- errCount = (null == errCount) ? 1 : errCount + 1;
|
|
|
- serviceErrorCount.put(key, errCount);
|
|
|
- if (errCount > AppConfig.SERVICE_ERR_MAX) {
|
|
|
- ServiceInputControl.stop(serviceId);
|
|
|
- }
|
|
|
- } else {
|
|
|
- serviceErrorCount.put(key, 0);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private Map<String, Object> execEngine(Map<String, Object> currentCalMap, List<Map<String, Object>> allData, String dataObjectId) {
|
|
|
- String library_id = currentCalMap.get("library_id").toString();//获取算法编号
|
|
|
+ //脚本引擎执行
|
|
|
+ private Map<String, Object> execEngine(String library_id, Map<String, Object> currentCalMap, List<Map<String, Object>> calcAllData) {
|
|
|
if (!ScriptEngineProMaps.containsKey(library_id)) {//不存在缓存则创建
|
|
|
ScriptEngineProMaps.put(library_id, new ScriptEnginePro(currentCalMap));
|
|
|
}
|
|
|
ScriptEnginePro currentEngin = ScriptEngineProMaps.get(library_id);//依据算法编号获取算法引擎
|
|
|
- if (Objects.nonNull(currentEngin) && Objects.nonNull(currentEngin.getErrorMessage())) {//获取错误引擎
|
|
|
+ String errMessage = "";
|
|
|
+ if (Objects.nonNull(currentEngin) && Objects.nonNull(currentEngin.getErrorMessage())) {//获取引擎出错则进行关闭
|
|
|
+ errMessage = currentEngin.getErrorMessage();
|
|
|
currentEngin.close();
|
|
|
currentEngin = null;
|
|
|
+ }
|
|
|
+ if (Objects.isNull(currentEngin)) {//未获取到引擎则返回异常
|
|
|
ScriptEngineProMaps.remove(library_id);
|
|
|
+ return processFail("创建引擎失败".concat(errMessage), library_id);
|
|
|
}
|
|
|
- if (Objects.isNull(currentEngin) || Objects.nonNull(currentEngin.getErrorMessage())) {
|
|
|
- return processFail("创建引擎失败" + (Objects.nonNull(currentEngin) ? currentEngin.getErrorMessage() : ""), library_id);
|
|
|
+ Map<String, String> parmaNames = currentEngin.getParmaNames();//获取对应脚本订阅的变量列表/*依据当前算法 数据订阅规则 进行数据的提取*/
|
|
|
+ Map<String, Object> paramValue = new HashMap<>();
|
|
|
+ for (String key : parmaNames.keySet()) {//理论上此处最多单个key是list[],如果存在多个list[]则应开启迪卡乘积
|
|
|
+ Object currentData = dataSubscription(calcAllData, key, currentCalMap);//获取数据
|
|
|
+ //if(key.indexOf("[") > 0){//订阅规则明确开启线程:String[3] 代表开启3个线程执行;Map[col1] 代表按COL1数据分组数开启多线程执行
|
|
|
+ //开启多线程操作同一个脚本对象会存在数据污染情况,除非开启多线程时,每个线程对应一个独立脚本对象,或者脚本对象做了数据隔离,目前解析并没有出现性能瓶颈,暂时不考虑
|
|
|
+ //}else{
|
|
|
+ if (currentCalMap.get("library_type").toString().equals("2")){
|
|
|
+ paramValue.put(parmaNames.get(key), currentData);
|
|
|
+ }else {
|
|
|
+ paramValue.put(key, currentData);
|
|
|
+ }
|
|
|
+ //}
|
|
|
}
|
|
|
- return currentEngin.execScript(allData);
|
|
|
+ return currentEngin.execScript(paramValue);
|
|
|
}
|
|
|
+ //数据库算法执行
|
|
|
+ private Map<String, Object> execDB(String library_id, Map<String, Object> calculationLibrary, List<Map<String, Object>> calcData, String dataObjectId) {
|
|
|
+ Object connectConfigObj = calculationLibrary.get("connectConfig");
|
|
|
+ if (Objects.isNull(connectConfigObj)) {
|
|
|
+ return processFail("连接信息未配置 ", library_id);
|
|
|
+ }
|
|
|
+ if (!calcDbHelperMaps.containsKey(library_id)) {//不存在缓存则创建
|
|
|
+ calcDbHelperMaps.put(library_id, new MyDbHelper(connectConfigObj.toString()));
|
|
|
+ }
|
|
|
+ MyDbHelper myDbHelper = calcDbHelperMaps.get(library_id);//依据算法编号获取数据库处理对象
|
|
|
+ String errMessage = "";
|
|
|
+ if (Objects.nonNull(myDbHelper) && MapTools.isNotBlank(errMessage)) {//获取出现错误则进行关闭
|
|
|
+ errMessage = myDbHelper.getErrorMessage();
|
|
|
+ myDbHelper.close();
|
|
|
+ myDbHelper = null;
|
|
|
+ }
|
|
|
+ if (Objects.isNull(myDbHelper)) {//未获取到则返回异常
|
|
|
+ calcDbHelperMaps.remove(library_id);
|
|
|
+ return processFail("获取业务数据库对象失败".concat(errMessage), library_id);
|
|
|
+ }
|
|
|
+ //未指定订阅则默认为上一个算法的结果:
|
|
|
+ String paramIndex = Objects.isNull(calculationLibrary.get("parmIndex")) ? (String.valueOf(calcData.size() - 1)).concat(".returnData") : calculationLibrary.get("parmIndex").toString();
|
|
|
+ Object tempDBPrams = dataSubscription(calcData, "List.".concat(paramIndex), calculationLibrary);//获取数据订阅结果
|
|
|
+ //如果表名和SQL同时不为空且事件为7则,分解为先查询,后更新方式,可以有效使用预编译,性能会更高
|
|
|
+ //事件7的select应该是固定为书名号方式,而不是动态组建方式,因为要跟insert或update对应,如果是动态条件则不使用事件7而是采用算法分离方式进行
|
|
|
+ if("7".equals(calculationLibrary.get("event")) && !Objects.isNull(calculationLibrary.get("tableName")) && !Objects.isNull(calculationLibrary.get("computing_expression"))){
|
|
|
+ Map<String,Object> tempCalcInfo = calculationLibrary;
|
|
|
+ String tableName = tempCalcInfo.get("tableName").toString();//先记录表名
|
|
|
+ String sqlStr = tempCalcInfo.get("computing_expression").toString();//记录SQL语句,方便获取查询条件,用于组建事件6的filter
|
|
|
+ tempCalcInfo.put("tableName","");//清空表名,方便执行SQL
|
|
|
+ tempDBPrams = myDbHelper.generalProcess(tempCalcInfo, tempDBPrams, dataObjectId, calcData.get(0));//先执行sql获取结果后
|
|
|
+ //从myDbHelper获取sqlStrVarList.get(sqlStr)可以获取到查询条件,用于组建事件6的filter
|
|
|
+ //获取tempDBPrams,循环returnData,添加filter,否则无法执行事件6
|
|
|
+ tempCalcInfo.put("tableName",tableName);//还原表名
|
|
|
+ tempCalcInfo.put("computing_expression","");//清除SQL
|
|
|
+ tempCalcInfo.put("event","6");//执行新增或更新
|
|
|
+ return myDbHelper.generalProcess(tempCalcInfo, tempDBPrams, dataObjectId, calcData.get(0));//调用表名方式执行事件
|
|
|
+ }
|
|
|
+ return myDbHelper.generalProcess(calculationLibrary, tempDBPrams, dataObjectId, calcData.get(0));
|
|
|
+ }
|
|
|
+ /*数据订阅:注意因前置导致算法未执行时,全量结果集的序号会存在问题*/
|
|
|
+ private Object dataSubscription(List<Map<String, Object>> calcAllData, String paramRule, Map<String, Object> calculationLibrary) { // List.1.returnData
|
|
|
+ String[] itemRule = paramRule.split("\\.");//订阅规则按.进行分割
|
|
|
+ String dataType = itemRule.length > 0 ? itemRule[0] : "List"; //首位是最终返回的数据类型
|
|
|
+ dataType = dataType.endsWith("]")?dataType.substring(0,dataType.indexOf("[")):dataType;//订阅时需要把[]去掉,在调用脚本引擎时进行分解
|
|
|
+ String dataLocation = itemRule.length > 1 ? itemRule[1] : ""; //数据位置:代表从哪个算法结果中进行取值
|
|
|
+ if (dataLocation.equals("T")) {//固定参数 //JSON.T.{"BSM":""}----String.T.代表"",List.T.代表[],Map.T.代表{},任意类型.T代表null
|
|
|
+ Object tempObj = itemRule.length > 2 ? MapTools.isBlank(itemRule[2])?("List".equals(dataType)?new ArrayList<>():("JSON,Map".contains(dataType)?new HashMap<>():null)):itemRule[2]:null;
|
|
|
+ if ("Boolean".equals(dataType)){
|
|
|
+ tempObj = itemRule[2].equals("true");
|
|
|
+ }
|
|
|
+ return tempObj;
|
|
|
|
|
|
- /**
|
|
|
- * 支持的最大机器id,结果是31 (这个移位算法可以很快计算出几位二进制数所能表示的最大十进制数)
|
|
|
- */
|
|
|
- private final String maxWorkerId = "4";
|
|
|
- /**
|
|
|
- * 支持的最大数据标识id,结果是31
|
|
|
- */
|
|
|
- private final String maxServiceId = "4";
|
|
|
- /**
|
|
|
- * 毫秒内序列(0~4095)
|
|
|
- */
|
|
|
- private final Long maxSequence = 999L;
|
|
|
-
|
|
|
- private long sequence = 0L;
|
|
|
-
|
|
|
- /**
|
|
|
- * 上次生成ID的时间截
|
|
|
- */
|
|
|
- private long lastTimestamp = -1L;
|
|
|
-
|
|
|
- /**
|
|
|
- * 生命周期ID生成
|
|
|
- *
|
|
|
- * @param workerId
|
|
|
- * @param serviceId
|
|
|
- * @return
|
|
|
- */
|
|
|
- public String createLifeCycleCol(Long workerId, Integer serviceId) {
|
|
|
+ }
|
|
|
+ //如果是L则从算法配置信息中获取,否则从全量计算结果中获取
|
|
|
+ Object returnData = dataLocation.equals("L") ? calculationLibrary : MapTools.isNumber(dataLocation) ? calcAllData.get(Integer.parseInt(dataLocation)) : null; // String.L.connectConfig.username
|
|
|
+ for (int index = 2; index < itemRule.length; index++) {
|
|
|
+ if(Objects.isNull(returnData)) return null;
|
|
|
+ if (returnData instanceof String) {//如果获取的是字符串则自动转MAP--XML字符串也自动转Map
|
|
|
+ returnData = MapTools.strToObj(returnData.toString());// 1. 简单字符串 , 2. map字符串 3. List字符串
|
|
|
+ }
|
|
|
+ if (MapTools.isNumber(itemRule[index])) {//数字代表从当前参数中取第N位,如果当前参数并不是List应该返回全部不应该返回空
|
|
|
+ //
|
|
|
+ returnData = returnData instanceof List ? ((List<?>) returnData).get(Integer.parseInt(itemRule[index])) : returnData;
|
|
|
+ } else {//不是数字代表从当前参数中取对应的键,如果当前参数是List应该返回List中首个对应的键值
|
|
|
+ if(returnData instanceof Map){//是MAP则直接获取对应Key的值
|
|
|
+ returnData = ((Map<?, ?>) returnData).get(itemRule[index]);
|
|
|
+ }else{//简单的错误兼容,尤其是某些时候数据可能是单条可能是数组时
|
|
|
+ if(returnData instanceof List && ((List<?>) returnData).size() > 0 && ((List<?>) returnData).get(0) instanceof Map){//如果是数组则且有下层
|
|
|
+ returnData = ((Map<?, ?>) ((List<?>) returnData).get(0)).get(itemRule[index]);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (Objects.nonNull(returnData)) {//获取到了订阅的数据则进行数据格式转换
|
|
|
+ if ("List,Map".contains(dataType) && returnData instanceof String) {//订阅是数组或对象则字符串转对象
|
|
|
+ returnData = MapTools.strToObj(returnData.toString());
|
|
|
+ }
|
|
|
+ if ("List".equals(dataType) && !(returnData instanceof List<?>)) {//订阅是数组但是实际不是数组则添加到数组
|
|
|
+ List<Object> tempList = new ArrayList<>();
|
|
|
+ tempList.add(returnData);
|
|
|
+ returnData = tempList;
|
|
|
+ }
|
|
|
+// if ("List".equals(dataType) && returnData instanceof List<?> && MapTools.isXMLList(returnData)) {//订阅是数组但是实际是XML字符串数组则自动转换为list<map>
|
|
|
+// returnData = MapTools.XMLListToListMap(returnData);//减少JAVA的动态调用,目前主要就是航班报文,BSMBPM等订阅的是String,所以不会进入这里
|
|
|
+// }
|
|
|
+ returnData = "Map".equals(dataType) && !(returnData instanceof Map<?, ?>) ? null//是不是不妥
|
|
|
+ : ("String".equals(dataType) ? returnData.toString()
|
|
|
+ : ("Boolean".equals(dataType) ? returnData.toString().equals("true")
|
|
|
+ : ("JSONStr".equals(dataType) ? MapTools.objToJSONStr(returnData)
|
|
|
+ : returnData)));
|
|
|
+ }
|
|
|
+ return returnData;
|
|
|
+ }
|
|
|
+ /*算法连续错误次数判定*/
|
|
|
+ public void setServiceErrorCount(String library_id, String success) {
|
|
|
+ if (!success.equals("0")) {//如果是异常
|
|
|
+ Integer errCount = Objects.isNull(serviceErrorCount.get(library_id))?1:serviceErrorCount.get(library_id)+1;//获取当前异常连续数
|
|
|
+ serviceErrorCount.put(library_id,errCount);//连续数加1
|
|
|
+ if (errCount > AppConfig.SERVICE_ERR_MAX) {//如果连续错误数超过设定值则停止当前服务
|
|
|
+ ServiceInputControl.stop(serviceId);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ serviceErrorCount.put(library_id, 0);//如果是成功则重置连续错误数为0
|
|
|
+ }
|
|
|
+ }
|
|
|
+ private long sequence = 0L;
|
|
|
+ private long lastTimestamp = -1L;
|
|
|
+ public String createLifeCycleCol(Long workerId, Integer serviceId) {
|
|
|
long timestamp = System.currentTimeMillis();
|
|
|
//如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常
|
|
|
- //如果是同一时间生成的,则进行毫秒内序列
|
|
|
- if (lastTimestamp == timestamp) {
|
|
|
+ if (lastTimestamp == timestamp) { //如果是同一时间生成的,则进行毫秒内序列
|
|
|
sequence++;
|
|
|
- //毫秒内序列溢出
|
|
|
- if (sequence > maxSequence) {
|
|
|
- //阻塞到下一个毫秒,获得新的时间戳
|
|
|
+ if (sequence > 999L) {//毫秒内序列溢出
|
|
|
sequence = 0;
|
|
|
- while (lastTimestamp == System.currentTimeMillis()) {
|
|
|
+ while (lastTimestamp == System.currentTimeMillis()) {//阻塞到下一个毫秒,获得新的时间戳
|
|
|
}
|
|
|
timestamp = System.currentTimeMillis();
|
|
|
}
|
|
|
- }
|
|
|
- //时间戳改变,毫秒内序列重置
|
|
|
- else {
|
|
|
+ } else {
|
|
|
sequence = 0L;
|
|
|
}
|
|
|
- //上次生成ID的时间截
|
|
|
- lastTimestamp = timestamp;
|
|
|
+ lastTimestamp = timestamp;//上次生成ID的时间截
|
|
|
//移位并通过或运算拼到一起组成64位的ID
|
|
|
- return timestamp + "" + (String.format("%0" + (maxSequence.toString().length()) + "d", sequence)) + (String.format("%0" + maxWorkerId + "d", workerId)) + (String.format("%0" + maxServiceId + "d", serviceId));
|
|
|
+ return String.valueOf(timestamp).concat(String.format("%03d", sequence)).concat(String.format("%04d", workerId)).concat(String.format("%04d", serviceId));
|
|
|
}
|
|
|
|
|
|
public String getServiceId() {
|
|
@@ -481,12 +383,7 @@ public class DataProcess {
|
|
|
return errorMessage;
|
|
|
}
|
|
|
|
|
|
- public long getLastTimestamp() {
|
|
|
- return lastTimestamp;
|
|
|
- }
|
|
|
-
|
|
|
public long getLastActive() {
|
|
|
return lastActive;
|
|
|
}
|
|
|
-
|
|
|
}
|