|
@@ -57,21 +57,31 @@ public class DataProcess {//数据处理对象
|
|
|
|
|
|
//数据处理服务对象关闭时,关闭缓存的数据库对象以及脚本引擎对象
|
|
|
public void close() {//销毁当前服务对象时,需要将缓存的算法引擎实例同步销毁
|
|
|
- for (ScriptEnginePro scriptEnginePro : ScriptEngineProMaps.values()) {//脚本引擎
|
|
|
- scriptEnginePro.close();//关闭动态方法实例、缓存等
|
|
|
- }
|
|
|
- ScriptEngineProMaps.clear();//清除缓存
|
|
|
- for (MyDbHelper calcMyDbHelper : calcDbHelperMaps.values()) {//数据库对象
|
|
|
- calcMyDbHelper.close();//关闭数据库连接池
|
|
|
+ try {
|
|
|
+ for (ScriptEnginePro scriptEnginePro : ScriptEngineProMaps.values()) {//脚本引擎
|
|
|
+ scriptEnginePro.close();//关闭动态方法实例、缓存等
|
|
|
+ }
|
|
|
+ ScriptEngineProMaps.clear();//清除缓存
|
|
|
+ for (MyDbHelper calcMyDbHelper : calcDbHelperMaps.values()) {//数据库对象
|
|
|
+ calcMyDbHelper.close();//关闭数据库连接池
|
|
|
+ }
|
|
|
+ calcDbHelperMaps.clear();//清除缓存
|
|
|
+ baseDbHelper.close();//底座数据库对象
|
|
|
+ } catch (Exception e) {
|
|
|
+ System.out.println("dataProcess 关闭异常: " + LogUtils.getException(e));
|
|
|
}
|
|
|
- calcDbHelperMaps.clear();//清除缓存
|
|
|
- baseDbHelper.close();//底座数据库对象
|
|
|
+
|
|
|
}
|
|
|
|
|
|
//统一成功信息处理
|
|
|
public Map<String, Object> processSuccess(Object returnData) {
|
|
|
if (returnData instanceof Map) {
|
|
|
- Map<String, Object> resultData1 = (Map<String, Object>) returnData;
|
|
|
+ Map<String, Object> resultData1 = new HashMap<>();
|
|
|
+ try {
|
|
|
+ resultData1 = (Map<String, Object>) returnData;
|
|
|
+ } catch (Exception e) {
|
|
|
+ resultData1.put("returnData",returnData);
|
|
|
+ }
|
|
|
resultData1.put("code", "0");
|
|
|
return resultData1;
|
|
|
}
|
|
@@ -94,95 +104,106 @@ public class DataProcess {//数据处理对象
|
|
|
|
|
|
//数据处理对外接口
|
|
|
public Map<String, Object> processData(Map<String, Object> inputData, String... user_id) { //{serviceid:1,datacontent:[{key:value},{key:value}],event:0,page:1,pagesize:10}
|
|
|
- if (!MapTools.isBlank(errorMessage)) {//如果当前服务存在问题,代表数据库对象不可用,此时应该重构当前对象,调用方控制
|
|
|
- LogUtils.log("processData:1", "-1", null, "服务不可用".concat(errorMessage), serviceId, inputData, null, null, null);
|
|
|
- return processFail("服务不可用".concat(errorMessage), null);
|
|
|
- }
|
|
|
- if (System.currentTimeMillis() - lastActive > 2000) { // 更新数据库中的服务最新活跃时间
|
|
|
- lastActive = System.currentTimeMillis();//更新最后活跃时间 //------更新当前服务的最后活跃时间,用于服务监控
|
|
|
- baseDbHelper.updateByCondition("update serviceinfo set runState = ? ,lastactive = ? where serviceID =?", null, "1", lastActive, serviceId); // 服务表增加最后活跃时间
|
|
|
- }
|
|
|
- //创建生命周期ID;//默认继承
|
|
|
- if (!inputData.containsKey("dataObjectId")) {
|
|
|
- inputData.put("dataObjectId", createLifeCycleCol(Long.valueOf(AppConfig.WORK_ID), Integer.parseInt(serviceId))); // 生成新的生命周期ID
|
|
|
- }
|
|
|
- String dataObjectId = inputData.get("dataObjectId").toString();
|
|
|
+ try {
|
|
|
|
|
|
- inputData = authCheck(inputData, user_id); /*权限检查,添加列权限以及行权限到inputData中*/
|
|
|
- if ("-1".equals(inputData.get("code")) || (Objects.nonNull( inputData.get("message")) && inputData.get("message").toString().equals("服务未授任何数据权限"))) {
|
|
|
- return inputData;
|
|
|
- }
|
|
|
- List<Map<String, Object>> calcData = new ArrayList<>();//定义算法全量结果列表
|
|
|
- calcData.add(inputData);//默认入口参数为第0个算法的结果
|
|
|
- Map<String, Object> execResult = execCalultion(calcData, null, dataObjectId);//执行所有的算法库
|
|
|
- String library_id = execResult.get("library_id").toString();//获取最后一个算法的编号
|
|
|
- List<Map<String, Object>> preListData = (List<Map<String, Object>>) execResult.get("preData");//获取所有的前置检测结果
|
|
|
- calcData = (List<Map<String, Object>>) execResult.get("calcData");//所有的算法结果
|
|
|
- //依据算法编号规则确定最后一个算法结果,是前置还是算法
|
|
|
+ if (!MapTools.isBlank(errorMessage)) {//如果当前服务存在问题,代表数据库对象不可用,此时应该重构当前对象,调用方控制
|
|
|
+ LogUtils.log("processData:1", "-1", null, "服务不可用".concat(errorMessage), serviceId, inputData, null, null, null);
|
|
|
+ return processFail("服务不可用".concat(errorMessage), null);
|
|
|
+ }
|
|
|
+ if (System.currentTimeMillis() - lastActive > 2000) { // 更新数据库中的服务最新活跃时间
|
|
|
+ lastActive = System.currentTimeMillis();//更新最后活跃时间 //------更新当前服务的最后活跃时间,用于服务监控
|
|
|
+ baseDbHelper.updateByCondition("update serviceinfo set runState = ? ,lastactive = ? where serviceID =?", null, "1", lastActive, serviceId); // 服务表增加最后活跃时间
|
|
|
+ }
|
|
|
+ //创建生命周期ID;//默认继承
|
|
|
+ if (!inputData.containsKey("dataObjectId")) {
|
|
|
+ inputData.put("dataObjectId", createLifeCycleCol(Long.valueOf(AppConfig.WORK_ID), Integer.parseInt(serviceId))); // 生成新的生命周期ID
|
|
|
+ }
|
|
|
+ String dataObjectId = inputData.get("dataObjectId").toString();
|
|
|
+
|
|
|
+ inputData = authCheck(inputData, user_id); /*权限检查,添加列权限以及行权限到inputData中*/
|
|
|
+ if ("-1".equals(inputData.get("code")) || (Objects.nonNull(inputData.get("message")) && inputData.get("message").toString().equals("服务未授任何数据权限"))) {
|
|
|
+ return inputData;
|
|
|
+ }
|
|
|
+ List<Map<String, Object>> calcData = new ArrayList<>();//定义算法全量结果列表
|
|
|
+ calcData.add(inputData);//默认入口参数为第0个算法的结果
|
|
|
+ Map<String, Object> execResult = execCalultion(calcData, null, dataObjectId);//执行所有的算法库
|
|
|
+ String library_id = execResult.get("library_id").toString();//获取最后一个算法的编号
|
|
|
+ List<Map<String, Object>> preListData = (List<Map<String, Object>>) execResult.get("preData");//获取所有的前置检测结果
|
|
|
+ calcData = (List<Map<String, Object>>) execResult.get("calcData");//所有的算法结果
|
|
|
+ //依据算法编号规则确定最后一个算法结果,是前置还是算法
|
|
|
|
|
|
// preListData = preListData.stream().filter(Objects::isNull).collect(Collectors.toList());
|
|
|
|
|
|
|
|
|
- Map<String, Object> lastResult = library_id.endsWith("N") ? preListData.get(preListData.size() - 1) : calcData.get(calcData.size() - 1);//获取最后一个计算结果
|
|
|
- if (Objects.equals(lastResult.get("code"), "-1")) {//最后一个计算结果为-1则记录异常,否则全部记录正常
|
|
|
- Object message = lastResult.get("message");
|
|
|
- LogUtils.log("processData:3", "-1", library_id, Objects.nonNull(message) ? message.toString() : null, serviceId, calcData, dataObjectId, preListData, inputData.get("event"));
|
|
|
- return processFail(library_id.concat("数据计算错误:").concat(lastResult.get("message").toString()), library_id);
|
|
|
+ Map<String, Object> lastResult = library_id.endsWith("N") ? preListData.get(preListData.size() - 1) : calcData.get(calcData.size() - 1);//获取最后一个计算结果
|
|
|
+ if (Objects.equals(lastResult.get("code"), "-1")) {//最后一个计算结果为-1则记录异常,否则全部记录正常
|
|
|
+ Object message = lastResult.get("message");
|
|
|
+ LogUtils.log("processData:3", "-1", library_id, Objects.nonNull(message) ? message.toString() : null, serviceId, calcData, dataObjectId, preListData, inputData.get("event"));
|
|
|
+ return processFail(library_id.concat("数据计算错误:").concat(lastResult.get("message").toString()), library_id);
|
|
|
+ }
|
|
|
+ //写入 成功日志
|
|
|
+ LogUtils.log("DataProcess:9999", "0", library_id, "数据接收后处理成功", serviceId, calcData, dataObjectId, preListData, inputData.get("event")); //此处无法把所有的记过集返回过去
|
|
|
+
|
|
|
+ calcData = calcData.stream().filter(Objects::nonNull).collect(Collectors.toList());
|
|
|
+ return processSuccess(calcData.get(calcData.size() - 1)); /*可订阅任意算法结果,也可以订阅全量结果,由最后一个算法决定*/
|
|
|
+ } catch (Exception e) {
|
|
|
+ return processFail("processData 异常: " + LogUtils.getException(e),null);
|
|
|
}
|
|
|
- //写入 成功日志
|
|
|
- LogUtils.log("DataProcess:9999", "0", library_id, "数据接收后处理成功", serviceId, calcData, dataObjectId, preListData, inputData.get("event")); //此处无法把所有的记过集返回过去
|
|
|
|
|
|
- calcData = calcData.stream().filter(Objects::nonNull).collect(Collectors.toList());
|
|
|
- return processSuccess(calcData.get(calcData.size() - 1)); /*可订阅任意算法结果,也可以订阅全量结果,由最后一个算法决定*/
|
|
|
}
|
|
|
|
|
|
/*权限检查*/
|
|
|
- public Map<String, Object> authCheck(Map<String, Object> inputData, String ... user_id) {
|
|
|
- Object appid = inputData.get("appid");//获取应用编号
|
|
|
- if (Objects.nonNull(appid) && !serviceAuthMap.contains(appid.toString())) {//上传了应用编号但是不在当前服务授权的应用列表中则返回错误
|
|
|
- return processFail("服务未授权", null);
|
|
|
- }
|
|
|
- /*在构造函数中获取超级用户名单,方便此处的判定*/
|
|
|
- if (Objects.nonNull(user_id) && inputData.containsKey("authId") && !"1,9999".contains(user_id[0])) {//用户编号不为空,权限编号不为空,不是超级用户
|
|
|
- Map<String, Object> userDataAuthMap = baseDbHelper.queryByParamsReturnList("SELECT QCS.columnName,TUA.row_auth FROM t_user_group_auth TUA LEFT JOIN querytemplatecolumnset QCS on TUA.queryTemplateColumnSetID = QCS.queryTemplateColumnSetID WHERE TUA.user_id = ? AND TUA.auth_id = ? AND QCS.columnName is not null AND TUA.queryTemplateColumnSetID IS NOT NULL", user_id[0], inputData.get("authId"));
|
|
|
- if (userDataAuthMap.get("code").equals("-1") || Objects.isNull(userDataAuthMap.get("returnData")) || ((List<Map<String, Object>>) userDataAuthMap.get("returnData")).isEmpty()) {
|
|
|
- if (userDataAuthMap.get("code").equals("0")) {
|
|
|
- userDataAuthMap.put("message", "服务未授任何数据权限");
|
|
|
- }
|
|
|
- return userDataAuthMap;
|
|
|
+ public Map<String, Object> authCheck(Map<String, Object> inputData, String... user_id) {
|
|
|
+ try {
|
|
|
+ Object appid = inputData.get("appid");//获取应用编号
|
|
|
+ if (Objects.nonNull(appid) && !serviceAuthMap.contains(appid.toString())) {//上传了应用编号但是不在当前服务授权的应用列表中则返回错误
|
|
|
+ return processFail("服务未授权", null);
|
|
|
}
|
|
|
- List<Map<String, Object>> rowAuth = new ArrayList<>();//初始化行权限
|
|
|
- List<String> authColumn = new ArrayList<>();//初始化列权限
|
|
|
- List<Map<String, Object>> dataAuth = (List<Map<String, Object>>) userDataAuthMap.get("returnData");
|
|
|
- dataAuth.forEach(dataAuthMap -> {//循环获取到的权限
|
|
|
- String columnName = dataAuthMap.get("columnName").toString();//字段名
|
|
|
- authColumn.add(columnName);//添加到列权限
|
|
|
- Object rowAuthObj = dataAuthMap.get("row_auth");//获取对应列的行权限
|
|
|
- if (Objects.nonNull(rowAuthObj)) {//如果存在则组建标准的限制条件
|
|
|
- String tempRowAuth = rowAuthObj.toString();
|
|
|
- String connect = tempRowAuth.startsWith("!") ? "!=" : "=";//如果首位是!代表不等于,否则代表等于
|
|
|
- tempRowAuth = tempRowAuth.startsWith("!") ? tempRowAuth.substring(1) : tempRowAuth;//修订行权限表达式
|
|
|
- String[] row_auths = tempRowAuth.split(","); //按,号进行分组!PEK,
|
|
|
- for (String row_auth : row_auths) {//循环进行标准化
|
|
|
- if (MapTools.isBlank(row_auth)) continue;//为空不设置--误操作填写了空格或者删除未设置为NULL
|
|
|
- Map<String, Object> signRowAuth = new HashMap<>();
|
|
|
- signRowAuth.put(columnName, row_auth);
|
|
|
- rowAuth.addAll(baseDbHelper.changeSignFilter(signRowAuth, connect, connect.equals("!=") ? "and" : "or"));
|
|
|
+ /*在构造函数中获取超级用户名单,方便此处的判定*/
|
|
|
+ if (Objects.nonNull(user_id) && inputData.containsKey("authId") && !"1,9999".contains(user_id[0])) {//用户编号不为空,权限编号不为空,不是超级用户
|
|
|
+ Map<String, Object> userDataAuthMap = baseDbHelper.queryByParamsReturnList("SELECT QCS.columnName,TUA.row_auth FROM t_user_group_auth TUA LEFT JOIN querytemplatecolumnset QCS on TUA.queryTemplateColumnSetID = QCS.queryTemplateColumnSetID WHERE TUA.user_id = ? AND TUA.auth_id = ? AND QCS.columnName is not null AND TUA.queryTemplateColumnSetID IS NOT NULL", user_id[0], inputData.get("authId"));
|
|
|
+ if (userDataAuthMap.get("code").equals("-1") || Objects.isNull(userDataAuthMap.get("returnData")) || ((List<Map<String, Object>>) userDataAuthMap.get("returnData")).isEmpty()) {
|
|
|
+ if (userDataAuthMap.get("code").equals("0")) {
|
|
|
+ userDataAuthMap.put("message", "服务未授任何数据权限");
|
|
|
+ }
|
|
|
+ return userDataAuthMap;
|
|
|
+ }
|
|
|
+ List<Map<String, Object>> rowAuth = new ArrayList<>();//初始化行权限
|
|
|
+ List<String> authColumn = new ArrayList<>();//初始化列权限
|
|
|
+ List<Map<String, Object>> dataAuth = (List<Map<String, Object>>) userDataAuthMap.get("returnData");
|
|
|
+ dataAuth.forEach(dataAuthMap -> {//循环获取到的权限
|
|
|
+ String columnName = dataAuthMap.get("columnName").toString();//字段名
|
|
|
+ authColumn.add(columnName);//添加到列权限
|
|
|
+ Object rowAuthObj = dataAuthMap.get("row_auth");//获取对应列的行权限
|
|
|
+ if (Objects.nonNull(rowAuthObj)) {//如果存在则组建标准的限制条件
|
|
|
+ String tempRowAuth = rowAuthObj.toString();
|
|
|
+ String connect = tempRowAuth.startsWith("!") ? "!=" : "=";//如果首位是!代表不等于,否则代表等于
|
|
|
+ tempRowAuth = tempRowAuth.startsWith("!") ? tempRowAuth.substring(1) : tempRowAuth;//修订行权限表达式
|
|
|
+ String[] row_auths = tempRowAuth.split(","); //按,号进行分组!PEK,
|
|
|
+ for (String row_auth : row_auths) {//循环进行标准化
|
|
|
+ if (MapTools.isBlank(row_auth)) continue;//为空不设置--误操作填写了空格或者删除未设置为NULL
|
|
|
+ Map<String, Object> signRowAuth = new HashMap<>();
|
|
|
+ signRowAuth.put(columnName, row_auth);
|
|
|
+ rowAuth.addAll(baseDbHelper.changeSignFilter(signRowAuth, connect, connect.equals("!=") ? "and" : "or"));
|
|
|
+ }
|
|
|
+ if (rowAuth.size() > 0) {//存在行权限则进行括号包裹
|
|
|
+ rowAuth.get(0).put("left", "(");
|
|
|
+ rowAuth.get(rowAuth.size() - 1).put("connector", " and ");
|
|
|
+ rowAuth.get(rowAuth.size() - 1).put("right", " ) ");
|
|
|
+ } //(c1 = pek )
|
|
|
}
|
|
|
- if (rowAuth.size() > 0) {//存在行权限则进行括号包裹
|
|
|
- rowAuth.get(0).put("left", "(");
|
|
|
- rowAuth.get(rowAuth.size() - 1).put("connector", " and ");
|
|
|
- rowAuth.get(rowAuth.size() - 1).put("right", " ) ");
|
|
|
- } //(c1 = pek )
|
|
|
+ });
|
|
|
+ if (rowAuth.size() > 0) {//存在行权限则进行括号包裹
|
|
|
+ rowAuth.get(0).put("left", "(".concat(rowAuth.get(0).get("left").toString()));
|
|
|
+ rowAuth.get(rowAuth.size() - 1).put("connector", " and ");
|
|
|
+ rowAuth.get(rowAuth.size() - 1).put("right", " ) ".concat(rowAuth.get(rowAuth.size() - 1).get("right").toString()));
|
|
|
+ inputData.put("rowAuth", rowAuth);//添加到入参,方便后面调用
|
|
|
}
|
|
|
- });
|
|
|
- if (rowAuth.size() > 0) {//存在行权限则进行括号包裹
|
|
|
- rowAuth.get(0).put("left", "(".concat(rowAuth.get(0).get("left").toString()));
|
|
|
- rowAuth.get(rowAuth.size() - 1).put("connector", " and ");
|
|
|
- rowAuth.get(rowAuth.size() - 1).put("right", " ) ".concat(rowAuth.get(rowAuth.size() - 1).get("right").toString()));
|
|
|
- inputData.put("rowAuth", rowAuth);//添加到入参,方便后面调用
|
|
|
+ inputData.put("authColumn", authColumn);//添加到入参,方便后面调用
|
|
|
}
|
|
|
- inputData.put("authColumn", authColumn);//添加到入参,方便后面调用
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ System.out.println("authCheck 异常:" + LogUtils.getException(e));
|
|
|
}
|
|
|
return inputData;
|
|
|
}
|
|
@@ -193,172 +214,184 @@ public class DataProcess {//数据处理对象
|
|
|
List<Map<String, Object>> preData = new ArrayList<>();//初始化前置检测结果列表
|
|
|
String lastLibraryId = "";//初始化最后一个算法的编号
|
|
|
List<Map<String, Object>> calcData = Objects.isNull(inData) || inData.isEmpty() ? new ArrayList<>() : inData;//默认算法全量计算结果等于入口参数
|
|
|
- Map<String, Object> preCalMap = new HashMap<>();//用于组建前置检测算法配置
|
|
|
- for (Map<String, Object> calculationLibrary : calcList) {//循环算法库
|
|
|
- lastLibraryId = calculationLibrary.get("library_id").toString();//当前进行的算法编号
|
|
|
- if (MapTools.isNotBlank(beginLibraryId) && !beginLibraryId.equals(lastLibraryId)) {
|
|
|
- continue;//如果异常恢复开始算法编号不为空且不等于当前循环的算法编号则跳过
|
|
|
- }
|
|
|
- beginLibraryId = null;//为避免异常恢复开始后的后续算法编号不等于当前循环的算法编号的跳过
|
|
|
- Object is_exec = calculationLibrary.get("is_exec");//获取前置检测表达式
|
|
|
- if (Objects.nonNull(is_exec) && !MapTools.isBlank(is_exec.toString())) {//存在则进行前置检测
|
|
|
- lastLibraryId = lastLibraryId.concat("_N");//修订算法编号,避免与正式算法冲突
|
|
|
- preCalMap.put("library_id", lastLibraryId);//组建前置算法配置信息
|
|
|
- preCalMap.put("library_type", 2);/*如果脚本包含function则js否则java*/
|
|
|
- preCalMap.put("computing_expression", is_exec); //组建前置算法配置信息
|
|
|
- Map<String, Object> preEnginResult = execEngine(lastLibraryId, preCalMap, calcData);//调用算法引擎进行执行
|
|
|
- preData.add(preEnginResult);//添加执行结果到前置检测结果列表
|
|
|
- setServiceErrorCount(lastLibraryId, preEnginResult.get("code").toString());//依据返回的code进行连续错误计数和重置
|
|
|
- if ("-1".equals(preEnginResult.get("code")) || Objects.equals("2", preEnginResult.get("returnData"))) {
|
|
|
- break; //如果code为-1 则表示脚本计算错误 退出计算;如果code为2 则后续算法都不执行
|
|
|
+ try {
|
|
|
+ Map<String, Object> preCalMap = new HashMap<>();//用于组建前置检测算法配置
|
|
|
+ for (Map<String, Object> calculationLibrary : calcList) {//循环算法库
|
|
|
+ lastLibraryId = calculationLibrary.get("library_id").toString();//当前进行的算法编号
|
|
|
+ if (MapTools.isNotBlank(beginLibraryId) && !beginLibraryId.equals(lastLibraryId)) {
|
|
|
+ continue;//如果异常恢复开始算法编号不为空且不等于当前循环的算法编号则跳过
|
|
|
}
|
|
|
- if (Objects.equals("1", preEnginResult.get("returnData"))) { //前置检查不通过,当前算法不执行,后续 还需执行
|
|
|
- calcData.add(null);//自动添加算法空结果,避免数据订阅时的序号出现错误
|
|
|
- continue;
|
|
|
+ beginLibraryId = null;//为避免异常恢复开始后的后续算法编号不等于当前循环的算法编号的跳过
|
|
|
+ Object is_exec = calculationLibrary.get("is_exec");//获取前置检测表达式
|
|
|
+ if (Objects.nonNull(is_exec) && !MapTools.isBlank(is_exec.toString())) {//存在则进行前置检测
|
|
|
+ lastLibraryId = lastLibraryId.concat("_N");//修订算法编号,避免与正式算法冲突
|
|
|
+ preCalMap.put("library_id", lastLibraryId);//组建前置算法配置信息
|
|
|
+ preCalMap.put("library_type", 2);/*如果脚本包含function则js否则java*/
|
|
|
+ preCalMap.put("computing_expression", is_exec); //组建前置算法配置信息
|
|
|
+ Map<String, Object> preEnginResult = execEngine(lastLibraryId, preCalMap, calcData);//调用算法引擎进行执行
|
|
|
+ preData.add(preEnginResult);//添加执行结果到前置检测结果列表
|
|
|
+ setServiceErrorCount(lastLibraryId, preEnginResult.get("code").toString());//依据返回的code进行连续错误计数和重置
|
|
|
+ if ("-1".equals(preEnginResult.get("code")) || Objects.equals("2", preEnginResult.get("returnData"))) {
|
|
|
+ break; //如果code为-1 则表示脚本计算错误 退出计算;如果code为2 则后续算法都不执行
|
|
|
+ }
|
|
|
+ if (Objects.equals("1", preEnginResult.get("returnData"))) { //前置检查不通过,当前算法不执行,后续 还需执行
|
|
|
+ calcData.add(null);//自动添加算法空结果,避免数据订阅时的序号出现错误
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ lastLibraryId = calculationLibrary.get("library_id").toString();//当前进行的算法编号
|
|
|
+ }
|
|
|
+ //依据算法类型调用数据库对象或脚本引擎对象进行算法执行
|
|
|
+ Map<String, Object> currentResult = calculationLibrary.get("library_type").toString().equals("3") ? execDB(lastLibraryId, calculationLibrary, inData, dataObjectId) : execEngine(lastLibraryId, calculationLibrary, inData);
|
|
|
+ setServiceErrorCount(lastLibraryId, Objects.nonNull(currentResult) && currentResult.containsKey("code") ? currentResult.get("code").toString() : "0");//依据返回的code进行连续错误计数和重置
|
|
|
+ calcData.add(currentResult);//添加到全量算法结果中
|
|
|
+ if (Objects.nonNull(currentResult) && !currentResult.get("code").equals("0")) {//算法执行异常则退出
|
|
|
+ break;
|
|
|
}
|
|
|
- lastLibraryId = calculationLibrary.get("library_id").toString();//当前进行的算法编号
|
|
|
- }
|
|
|
- //依据算法类型调用数据库对象或脚本引擎对象进行算法执行
|
|
|
- Map<String, Object> currentResult = calculationLibrary.get("library_type").toString().equals("3") ? execDB(lastLibraryId, calculationLibrary, inData, dataObjectId) : execEngine(lastLibraryId, calculationLibrary, inData);
|
|
|
- setServiceErrorCount(lastLibraryId, Objects.nonNull(currentResult) && currentResult.containsKey("code") ? currentResult.get("code").toString() : "0");//依据返回的code进行连续错误计数和重置
|
|
|
- calcData.add(currentResult);//添加到全量算法结果中
|
|
|
- if (Objects.nonNull(currentResult) && !currentResult.get("code").equals("0")) {//算法执行异常则退出
|
|
|
- break;
|
|
|
}
|
|
|
+
|
|
|
+ }catch (Exception e){
|
|
|
+ System.out.println("execCalultion: 算法执行异常: " +LogUtils.getException(e) );
|
|
|
}
|
|
|
returnData.put("library_id", lastLibraryId);
|
|
|
returnData.put("preData", preData);
|
|
|
returnData.put("calcData", calcData);
|
|
|
return returnData;
|
|
|
+
|
|
|
}
|
|
|
|
|
|
//脚本引擎执行
|
|
|
private Map<String, Object> execEngine(String library_id, Map<String, Object> currentCalMap, List<Map<String, Object>> calcAllData) {
|
|
|
- if (!ScriptEngineProMaps.containsKey(library_id)) {//不存在缓存则创建
|
|
|
- ScriptEngineProMaps.put(library_id, new ScriptEnginePro(currentCalMap));
|
|
|
- }
|
|
|
- ScriptEnginePro currentEngin = ScriptEngineProMaps.get(library_id);//依据算法编号获取算法引擎
|
|
|
- String errMessage = "";
|
|
|
- if (Objects.nonNull(currentEngin) && Objects.nonNull(currentEngin.getErrorMessage())) {//获取引擎出错则进行关闭
|
|
|
- errMessage = currentEngin.getErrorMessage();
|
|
|
- currentEngin.close();
|
|
|
- currentEngin = null;
|
|
|
- }
|
|
|
- if (Objects.isNull(currentEngin)) {//未获取到引擎则返回异常
|
|
|
- ScriptEngineProMaps.remove(library_id);
|
|
|
- return processFail("创建引擎失败".concat(errMessage), library_id);
|
|
|
- }
|
|
|
- Map<String, String> parmaNames = currentEngin.getParmaNames();//获取对应脚本订阅的变量列表/*依据当前算法 数据订阅规则 进行数据的提取*/
|
|
|
- Map<String, Object> paramValue = new HashMap<>();
|
|
|
- for (String key : parmaNames.keySet()) {//理论上此处最多单个key是list[],如果存在多个list[]则应开启迪卡乘积
|
|
|
- Object currentData = dataSubscription(calcAllData, key, currentCalMap);//获取数据
|
|
|
- //if(key.indexOf("[") > 0){//订阅规则明确开启线程:String[3] 代表开启3个线程执行;Map[col1] 代表按COL1数据分组数开启多线程执行
|
|
|
- //开启多线程操作同一个脚本对象会存在数据污染情况,除非开启多线程时,每个线程对应一个独立脚本对象,或者脚本对象做了数据隔离,目前解析并没有出现性能瓶颈,暂时不考虑
|
|
|
- //}else{
|
|
|
- if (currentCalMap.get("library_type").toString().equals("2")) {
|
|
|
- paramValue.put(parmaNames.get(key), currentData);
|
|
|
- } else {
|
|
|
- paramValue.put(key, currentData);
|
|
|
+ try {
|
|
|
+ if (!ScriptEngineProMaps.containsKey(library_id)) {//不存在缓存则创建
|
|
|
+ ScriptEngineProMaps.put(library_id, new ScriptEnginePro(currentCalMap));
|
|
|
+ }
|
|
|
+ ScriptEnginePro currentEngin = ScriptEngineProMaps.get(library_id);//依据算法编号获取算法引擎
|
|
|
+ String errMessage = "";
|
|
|
+ if (Objects.nonNull(currentEngin) && Objects.nonNull(currentEngin.getErrorMessage())) {//获取引擎出错则进行关闭
|
|
|
+ errMessage = currentEngin.getErrorMessage();
|
|
|
+ currentEngin.close();
|
|
|
+ currentEngin = null;
|
|
|
+ }
|
|
|
+ if (Objects.isNull(currentEngin)) {//未获取到引擎则返回异常
|
|
|
+ ScriptEngineProMaps.remove(library_id);
|
|
|
+ return processFail("创建引擎失败".concat(errMessage), library_id);
|
|
|
}
|
|
|
- //}
|
|
|
+ Map<String, String> parmaNames = currentEngin.getParmaNames();//获取对应脚本订阅的变量列表/*依据当前算法 数据订阅规则 进行数据的提取*/
|
|
|
+ Map<String, Object> paramValue = new HashMap<>();
|
|
|
+ for (String key : parmaNames.keySet()) {//理论上此处最多单个key是list[],如果存在多个list[]则应开启迪卡乘积
|
|
|
+ Object currentData = dataSubscription(calcAllData, key, currentCalMap);//获取数据
|
|
|
+ paramValue.put(currentCalMap.get("library_type").toString().equals("2")?parmaNames.get(key):key, currentData);
|
|
|
+ }
|
|
|
+ return currentEngin.execScript(paramValue);
|
|
|
+ }catch (Exception e){
|
|
|
+ return processFail("创建引擎失败".concat(LogUtils.getException(e)), library_id);
|
|
|
}
|
|
|
- return currentEngin.execScript(paramValue);
|
|
|
}
|
|
|
|
|
|
//数据库算法执行
|
|
|
private Map<String, Object> execDB(String library_id, Map<String, Object> calculationLibrary, List<Map<String, Object>> calcData, String dataObjectId) {
|
|
|
- Object connectConfigObj = calculationLibrary.get("connectConfig");
|
|
|
- if (Objects.isNull(connectConfigObj)) {
|
|
|
- return processFail("连接信息未配置 ", library_id);
|
|
|
- }
|
|
|
- if (!calcDbHelperMaps.containsKey(library_id)) {//不存在缓存则创建
|
|
|
- calcDbHelperMaps.put(library_id, new MyDbHelper(connectConfigObj.toString()));
|
|
|
- }
|
|
|
- MyDbHelper myDbHelper = calcDbHelperMaps.get(library_id);//依据算法编号获取数据库处理对象
|
|
|
- String errMessage = "";
|
|
|
- if (Objects.nonNull(myDbHelper) && MapTools.isNotBlank(errMessage)) {//获取出现错误则进行关闭
|
|
|
- errMessage = myDbHelper.getErrorMessage();
|
|
|
- myDbHelper.close();
|
|
|
- myDbHelper = null;
|
|
|
- }
|
|
|
- if (Objects.isNull(myDbHelper)) {//未获取到则返回异常
|
|
|
- calcDbHelperMaps.remove(library_id);
|
|
|
- return processFail("获取业务数据库对象失败".concat(errMessage), library_id);
|
|
|
- }
|
|
|
- //未指定订阅则默认为上一个算法的结果:
|
|
|
- String paramIndex = Objects.isNull(calculationLibrary.get("parmIndex"))?(String.valueOf(calcData.size() - 1)).concat(calcData.size() ==1?".dataContent":".returnData") : calculationLibrary.get("parmIndex").toString();
|
|
|
- Object tempDBPrams = dataSubscription(calcData, "List.".concat(paramIndex), calculationLibrary);//获取数据订阅结果
|
|
|
- //如果表名和SQL同时不为空且事件为7则,分解为先查询,后更新方式,可以有效使用预编译,性能会更高
|
|
|
- //事件7的select应该是固定为书名号方式,而不是动态组建方式,因为要跟insert或update对应,如果是动态条件则不使用事件7而是采用算法分离方式进行
|
|
|
- if ("7".equals(calculationLibrary.get("event")) && !Objects.isNull(calculationLibrary.get("tableName")) && !Objects.isNull(calculationLibrary.get("computing_expression"))) {
|
|
|
- Map<String, Object> tempCalcInfo = calculationLibrary;
|
|
|
- String tableName = tempCalcInfo.get("tableName").toString();//先记录表名
|
|
|
- String sqlStr = tempCalcInfo.get("computing_expression").toString();//记录SQL语句,方便获取查询条件,用于组建事件6的filter
|
|
|
- tempCalcInfo.put("tableName", "");//清空表名,方便执行SQL
|
|
|
- tempDBPrams = myDbHelper.generalProcess(tempCalcInfo, tempDBPrams, dataObjectId, calcData.get(0));//先执行sql获取结果后
|
|
|
- //从myDbHelper获取sqlStrVarList.get(sqlStr)可以获取到查询条件,用于组建事件6的filter
|
|
|
- //获取tempDBPrams,循环returnData,添加filter,否则无法执行事件6
|
|
|
- tempCalcInfo.put("tableName", tableName);//还原表名
|
|
|
- tempCalcInfo.put("computing_expression", "");//清除SQL
|
|
|
- tempCalcInfo.put("event", "6");//执行新增或更新
|
|
|
- return myDbHelper.generalProcess(tempCalcInfo, tempDBPrams, dataObjectId, calcData.get(0));//调用表名方式执行事件
|
|
|
+ try {
|
|
|
+ Object connectConfigObj = calculationLibrary.get("connectConfig");
|
|
|
+ if (Objects.isNull(connectConfigObj)) {
|
|
|
+ return processFail("连接信息未配置 ", library_id);
|
|
|
+ }
|
|
|
+ if (!calcDbHelperMaps.containsKey(library_id)) {//不存在缓存则创建
|
|
|
+ calcDbHelperMaps.put(library_id, new MyDbHelper(connectConfigObj.toString()));
|
|
|
+ }
|
|
|
+ MyDbHelper myDbHelper = calcDbHelperMaps.get(library_id);//依据算法编号获取数据库处理对象
|
|
|
+ String errMessage = "";
|
|
|
+ if (Objects.nonNull(myDbHelper) && MapTools.isNotBlank(errMessage)) {//获取出现错误则进行关闭
|
|
|
+ errMessage = myDbHelper.getErrorMessage();
|
|
|
+ myDbHelper.close();
|
|
|
+ myDbHelper = null;
|
|
|
+ }
|
|
|
+ if (Objects.isNull(myDbHelper)) {//未获取到则返回异常
|
|
|
+ calcDbHelperMaps.remove(library_id);
|
|
|
+ return processFail("获取业务数据库对象失败".concat(errMessage), library_id);
|
|
|
+ }
|
|
|
+ //未指定订阅则默认为上一个算法的结果:
|
|
|
+ String paramIndex = Objects.isNull(calculationLibrary.get("parmIndex")) ? (String.valueOf(calcData.size() - 1)).concat(calcData.size() == 1 ? ".dataContent" : ".returnData") : calculationLibrary.get("parmIndex").toString();
|
|
|
+ Object tempDBPrams = dataSubscription(calcData, "List.".concat(paramIndex), calculationLibrary);//获取数据订阅结果
|
|
|
+ //如果表名和SQL同时不为空且事件为7则,分解为先查询,后更新方式,可以有效使用预编译,性能会更高
|
|
|
+ //事件7的select应该是固定为书名号方式,而不是动态组建方式,因为要跟insert或update对应,如果是动态条件则不使用事件7而是采用算法分离方式进行
|
|
|
+ if ("7".equals(calculationLibrary.get("event")) && !Objects.isNull(calculationLibrary.get("tableName")) && !Objects.isNull(calculationLibrary.get("computing_expression"))) {
|
|
|
+ Map<String, Object> tempCalcInfo = calculationLibrary;
|
|
|
+ String tableName = tempCalcInfo.get("tableName").toString();//先记录表名
|
|
|
+ String sqlStr = tempCalcInfo.get("computing_expression").toString();//记录SQL语句,方便获取查询条件,用于组建事件6的filter
|
|
|
+ tempCalcInfo.put("tableName", "");//清空表名,方便执行SQL
|
|
|
+ tempDBPrams = myDbHelper.generalProcess(tempCalcInfo, tempDBPrams, dataObjectId, calcData.get(0));//先执行sql获取结果后
|
|
|
+ //从myDbHelper获取sqlStrVarList.get(sqlStr)可以获取到查询条件,用于组建事件6的filter
|
|
|
+ //获取tempDBPrams,循环returnData,添加filter,否则无法执行事件6
|
|
|
+ tempCalcInfo.put("tableName", tableName);//还原表名
|
|
|
+ tempCalcInfo.put("computing_expression", "");//清除SQL
|
|
|
+ tempCalcInfo.put("event", "6");//执行新增或更新
|
|
|
+ return myDbHelper.generalProcess(tempCalcInfo, tempDBPrams, dataObjectId, calcData.get(0));//调用表名方式执行事件
|
|
|
+ }
|
|
|
+ return myDbHelper.generalProcess(calculationLibrary, tempDBPrams, dataObjectId, calcData.get(0));
|
|
|
+ }catch (Exception e){
|
|
|
+ return processFail("数据库执行失败".concat(LogUtils.getException(e)), library_id);
|
|
|
}
|
|
|
- return myDbHelper.generalProcess(calculationLibrary, tempDBPrams, dataObjectId, calcData.get(0));
|
|
|
}
|
|
|
|
|
|
/*数据订阅:注意因前置导致算法未执行时,全量结果集的序号会存在问题*/
|
|
|
private Object dataSubscription(List<Map<String, Object>> calcAllData, String paramRule, Map<String, Object> calculationLibrary) { // List.1.returnData
|
|
|
- String[] itemRule = paramRule.split("\\.");//订阅规则按.进行分割
|
|
|
- String dataType = itemRule.length > 0 ? itemRule[0] : "List"; //首位是最终返回的数据类型
|
|
|
- dataType = dataType.endsWith("]") ? dataType.substring(0, dataType.indexOf("[")) : dataType;//订阅时需要把[]去掉,在调用脚本引擎时进行分解
|
|
|
- String dataLocation = itemRule.length > 1 ? itemRule[1] : ""; //数据位置:代表从哪个算法结果中进行取值
|
|
|
- if (dataLocation.equals("T")) {//固定参数 //JSON.T.{"BSM":""}----String.T.代表"",List.T.代表[],Map.T.代表{},任意类型.T代表null
|
|
|
- Object tempObj = itemRule.length > 2 ? MapTools.isBlank(itemRule[2]) ? ("List".equals(dataType) ? new ArrayList<>() : ("JSON,Map".contains(dataType) ? new HashMap<>() : null)) : itemRule[2] : null;
|
|
|
- if ("Boolean".equals(dataType)) {
|
|
|
- tempObj = itemRule[2].equals("true");
|
|
|
- }
|
|
|
- return tempObj;
|
|
|
+ try {
|
|
|
+ String[] itemRule = paramRule.split("\\.");//订阅规则按.进行分割
|
|
|
+ String dataType = itemRule.length > 0 ? itemRule[0] : "List"; //首位是最终返回的数据类型
|
|
|
+ dataType = dataType.endsWith("]") ? dataType.substring(0, dataType.indexOf("[")) : dataType;//订阅时需要把[]去掉,在调用脚本引擎时进行分解
|
|
|
+ String dataLocation = itemRule.length > 1 ? itemRule[1] : ""; //数据位置:代表从哪个算法结果中进行取值
|
|
|
+ if (dataLocation.equals("T")) {//固定参数 //JSON.T.{"BSM":""}----String.T.代表"",List.T.代表[],Map.T.代表{},任意类型.T代表null
|
|
|
+ Object tempObj = itemRule.length > 2 ? MapTools.isBlank(itemRule[2]) ? ("List".equals(dataType) ? new ArrayList<>() : ("JSON,Map".contains(dataType) ? new HashMap<>() : null)) : itemRule[2] : null;
|
|
|
+ if ("Boolean".equals(dataType)) {
|
|
|
+ tempObj = itemRule[2].equals("true");
|
|
|
+ }
|
|
|
+ return tempObj;
|
|
|
|
|
|
- }
|
|
|
- //如果是L则从算法配置信息中获取,否则从全量计算结果中获取
|
|
|
- Object returnData = dataLocation.equals("L") ? calculationLibrary : MapTools.isNumber(dataLocation) ? calcAllData.get(Integer.parseInt(dataLocation)) : null; // String.L.connectConfig.username
|
|
|
- for (int index = 2; index < itemRule.length; index++) {
|
|
|
- if (Objects.isNull(returnData)) return null;
|
|
|
- if (returnData instanceof String) {//如果获取的是字符串则自动转MAP--XML字符串也自动转Map
|
|
|
- returnData = MapTools.strToObj(returnData.toString());// 1. 简单字符串 , 2. map字符串 3. List字符串
|
|
|
}
|
|
|
- if (MapTools.isNumber(itemRule[index])) {//数字代表从当前参数中取第N位,如果当前参数并不是List应该返回全部不应该返回空
|
|
|
- //
|
|
|
- returnData = returnData instanceof List ? ((List<?>) returnData).get(Integer.parseInt(itemRule[index])) : returnData;
|
|
|
- } else {//不是数字代表从当前参数中取对应的键,如果当前参数是List应该返回List中首个对应的键值
|
|
|
- if (returnData instanceof Map) {//是MAP则直接获取对应Key的值
|
|
|
- returnData = itemRule[index].equals("returnData") && Objects.isNull(((Map<?, ?>) returnData).get("returnData")) ? ((Map<?, ?>) returnData).get("dataContent") : ((Map<?, ?>) returnData).get(itemRule[index]);
|
|
|
- } else {//简单的错误兼容,尤其是某些时候数据可能是单条可能是数组时
|
|
|
- if (returnData instanceof List && ((List<?>) returnData).size() > 0 && ((List<?>) returnData).get(0) instanceof Map) {//如果是数组则且有下层
|
|
|
- returnData = ((Map<?, ?>) ((List<?>) returnData).get(0)).get(itemRule[index]);
|
|
|
+ //如果是L则从算法配置信息中获取,否则从全量计算结果中获取
|
|
|
+ Object returnData = dataLocation.equals("L") ? calculationLibrary : MapTools.isNumber(dataLocation) ? calcAllData.get(Integer.parseInt(dataLocation)) : null; // String.L.connectConfig.username
|
|
|
+ for (int index = 2; index < itemRule.length; index++) {
|
|
|
+ if (Objects.isNull(returnData)) return null;
|
|
|
+ if (returnData instanceof String) {//如果获取的是字符串则自动转MAP--XML字符串也自动转Map
|
|
|
+ returnData = MapTools.strToObj(returnData.toString());// 1. 简单字符串 , 2. map字符串 3. List字符串
|
|
|
+ }
|
|
|
+ if (MapTools.isNumber(itemRule[index])) {//数字代表从当前参数中取第N位,如果当前参数并不是List应该返回全部不应该返回空
|
|
|
+ //
|
|
|
+ returnData = returnData instanceof List ? ((List<?>) returnData).get(Integer.parseInt(itemRule[index])) : returnData;
|
|
|
+ } else {//不是数字代表从当前参数中取对应的键,如果当前参数是List应该返回List中首个对应的键值
|
|
|
+ if (returnData instanceof Map) {//是MAP则直接获取对应Key的值
|
|
|
+ returnData = itemRule[index].equals("returnData") && Objects.isNull(((Map<?, ?>) returnData).get("returnData")) ? ((Map<?, ?>) returnData).get("dataContent") : ((Map<?, ?>) returnData).get(itemRule[index]);
|
|
|
+ } else {//简单的错误兼容,尤其是某些时候数据可能是单条可能是数组时
|
|
|
+ if (returnData instanceof List && ((List<?>) returnData).size() > 0 && ((List<?>) returnData).get(0) instanceof Map) {//如果是数组则且有下层
|
|
|
+ returnData = ((Map<?, ?>) ((List<?>) returnData).get(0)).get(itemRule[index]);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
- }
|
|
|
- if (Objects.nonNull(returnData)) {//获取到了订阅的数据则进行数据格式转换
|
|
|
- if ("List,Map".contains(dataType) && returnData instanceof String) {//订阅是数组或对象则字符串转对象
|
|
|
- returnData = MapTools.strToObj(returnData.toString());
|
|
|
- }
|
|
|
- if ("List".equals(dataType) && !(returnData instanceof List<?>)) {//订阅是数组但是实际不是数组则添加到数组
|
|
|
- List<Object> tempList = new ArrayList<>();
|
|
|
- tempList.add(returnData);
|
|
|
- returnData = tempList;
|
|
|
- }
|
|
|
+ if (Objects.nonNull(returnData)) {//获取到了订阅的数据则进行数据格式转换
|
|
|
+ if ("List,Map".contains(dataType) && returnData instanceof String) {//订阅是数组或对象则字符串转对象
|
|
|
+ returnData = MapTools.strToObj(returnData.toString());
|
|
|
+ }
|
|
|
+ if ("List".equals(dataType) && !(returnData instanceof List<?>)) {//订阅是数组但是实际不是数组则添加到数组
|
|
|
+ List<Object> tempList = new ArrayList<>();
|
|
|
+ tempList.add(returnData);
|
|
|
+ returnData = tempList;
|
|
|
+ }
|
|
|
// if ("List".equals(dataType) && returnData instanceof List<?> && MapTools.isXMLList(returnData)) {//订阅是数组但是实际是XML字符串数组则自动转换为list<map>
|
|
|
// returnData = MapTools.XMLListToListMap(returnData);//减少JAVA的动态调用,目前主要就是航班报文,BSMBPM等订阅的是String,所以不会进入这里
|
|
|
// }
|
|
|
- returnData = "Map".equals(dataType) && !(returnData instanceof Map<?, ?>) ? null//是不是不妥
|
|
|
- : ("String".equals(dataType) ? returnData.toString()
|
|
|
- : ("Boolean".equals(dataType) ? returnData.toString().equals("true")
|
|
|
- : ("JSONStr".equals(dataType) ? MapTools.objToJSONStr(returnData)
|
|
|
- : returnData)));
|
|
|
+ returnData = "Map".equals(dataType) && !(returnData instanceof Map<?, ?>) ? null//是不是不妥
|
|
|
+ : ("String".equals(dataType) ? returnData.toString()
|
|
|
+ : ("Boolean".equals(dataType) ? returnData.toString().equals("true")
|
|
|
+ : ("JSONStr".equals(dataType) ? MapTools.objToJSONStr(returnData)
|
|
|
+ : returnData)));
|
|
|
+ }
|
|
|
+ return returnData;
|
|
|
+ }catch (Exception e){
|
|
|
+ System.out.println("数据订阅异常: ".concat(LogUtils.getException(e)));
|
|
|
+ return null;
|
|
|
}
|
|
|
- return returnData;
|
|
|
+
|
|
|
}
|
|
|
|
|
|
/*算法连续错误次数判定*/
|