|
@@ -6,10 +6,9 @@ import org.bfkj.utils.LogUtils;
|
|
|
import org.bfkj.utils.MapTools;
|
|
|
import org.bfkj.utils.MyDbHelper;
|
|
|
import org.bfkj.utils.ScriptEnginePro;
|
|
|
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
|
|
|
|
|
import java.util.*;
|
|
|
-import java.util.concurrent.ThreadPoolExecutor;
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
|
|
public class DataProcess {//数据处理对象
|
|
|
private String serviceId;//作为实例化服务对象的Key
|
|
@@ -28,7 +27,7 @@ public class DataProcess {//数据处理对象
|
|
|
public DataProcess(String service_Id) {//初始化构造,实例化一个服务对象
|
|
|
lastActive = System.currentTimeMillis();//默认最后活跃时间
|
|
|
serviceId = service_Id;
|
|
|
- try{
|
|
|
+ try {
|
|
|
baseDbHelper = new MyDbHelper(AppConfig.getSystemParams(AppConfig.REMOTE_DB_CONNECT));//获取底座数据库对象
|
|
|
if (Objects.nonNull(baseDbHelper.getErrorMessage())) {
|
|
|
errorMessage = "获取底座myDbHelper对象异常: ".concat(baseDbHelper.getErrorMessage());
|
|
@@ -36,7 +35,7 @@ public class DataProcess {//数据处理对象
|
|
|
}
|
|
|
//算法信息
|
|
|
Map<String, Object> calculationMap = baseDbHelper.queryByParamsReturnList("SELECT CL.*,DI.* FROM calculation_library CL left JOIN datasourceinfo DI ON DI.dataSourceID = CL.datasource_id WHERE CL.service_id =? and CL.library_type is not null order by library_sort,library_id", service_Id);//直接数据库查询
|
|
|
- calcList = Objects.isNull(calculationMap.get("returnData"))?null:(List<Map<String, Object>>) calculationMap.get("returnData");
|
|
|
+ calcList = Objects.isNull(calculationMap.get("returnData")) ? null : (List<Map<String, Object>>) calculationMap.get("returnData");
|
|
|
if (!calculationMap.get("code").equals("0") || calcList == null || calcList.isEmpty()) {//查询数据库失败
|
|
|
errorMessage = "查询".concat(serviceId).concat("的算法失败:").concat(calculationMap.get("message").toString());
|
|
|
return;
|
|
@@ -51,10 +50,11 @@ public class DataProcess {//数据处理对象
|
|
|
if (!Objects.isNull(serviceAuthList)) {//将ListMap合并为一个Map,方便使用 // map:{1,appid},{2,appidd}
|
|
|
serviceAuthMap = ((List<Map<String, Object>>) serviceAuthList).stream().map(map -> map.get("app_id").toString()).toList();
|
|
|
}
|
|
|
- }catch (Exception e) {
|
|
|
+ } catch (Exception e) {
|
|
|
errorMessage = "数据处理对象初始化异常: ".concat(serviceId).concat(";异常信息:").concat(LogUtils.getException(e));
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
//数据处理服务对象关闭时,关闭缓存的数据库对象以及脚本引擎对象
|
|
|
public void close() {//销毁当前服务对象时,需要将缓存的算法引擎实例同步销毁
|
|
|
for (ScriptEnginePro scriptEnginePro : ScriptEngineProMaps.values()) {//脚本引擎
|
|
@@ -67,6 +67,7 @@ public class DataProcess {//数据处理对象
|
|
|
calcDbHelperMaps.clear();//清除缓存
|
|
|
baseDbHelper.close();//底座数据库对象
|
|
|
}
|
|
|
+
|
|
|
//统一成功信息处理
|
|
|
public Map<String, Object> processSuccess(Object returnData) {
|
|
|
if (returnData instanceof Map) {
|
|
@@ -79,6 +80,7 @@ public class DataProcess {//数据处理对象
|
|
|
returnMap.put("returnData", returnData);
|
|
|
return returnMap;
|
|
|
}
|
|
|
+
|
|
|
//统一错误信息处理
|
|
|
public Map<String, Object> processFail(String errorMessage, String libraryId) {
|
|
|
Map<String, Object> returnMap = new HashMap<>();//用于当前方法统一返回参数,不存在深拷贝问题
|
|
@@ -89,6 +91,7 @@ public class DataProcess {//数据处理对象
|
|
|
}
|
|
|
return returnMap;
|
|
|
}
|
|
|
+
|
|
|
//数据处理对外接口
|
|
|
public Map<String, Object> processData(Map<String, Object> inputData, String... user_id) { //{serviceid:1,datacontent:[{key:value},{key:value}],event:0,page:1,pagesize:10}
|
|
|
if (!MapTools.isBlank(errorMessage)) {//如果当前服务存在问题,代表数据库对象不可用,此时应该重构当前对象,调用方控制
|
|
@@ -105,6 +108,9 @@ public class DataProcess {//数据处理对象
|
|
|
}
|
|
|
String dataObjectId = inputData.get("dataObjectId").toString();
|
|
|
inputData = authCheck(inputData, user_id); /*权限检查,添加列权限以及行权限到inputData中*/
|
|
|
+ if ("-1".equals(inputData.get("code")) || (Objects.nonNull( inputData.get("message")) && inputData.get("message").toString().equals("服务未授任何数据权限"))) {
|
|
|
+ return inputData;
|
|
|
+ }
|
|
|
List<Map<String, Object>> calcData = new ArrayList<>();//定义算法全量结果列表
|
|
|
calcData.add(inputData);//默认入口参数为第0个算法的结果
|
|
|
Map<String, Object> execResult = execCalultion(calcData, null, dataObjectId);//执行所有的算法库
|
|
@@ -112,16 +118,23 @@ public class DataProcess {//数据处理对象
|
|
|
List<Map<String, Object>> preListData = (List<Map<String, Object>>) execResult.get("preData");//获取所有的前置检测结果
|
|
|
calcData = (List<Map<String, Object>>) execResult.get("calcData");//所有的算法结果
|
|
|
//依据算法编号规则确定最后一个算法结果,是前置还是算法
|
|
|
- Map<String, Object> lastResult = library_id.endsWith("N")?preListData.get(preListData.size() - 1):calcData.get(calcData.size() - 1);//获取最后一个计算结果
|
|
|
+
|
|
|
+// preListData = preListData.stream().filter(Objects::isNull).collect(Collectors.toList());
|
|
|
+
|
|
|
+
|
|
|
+ Map<String, Object> lastResult = library_id.endsWith("N") ? preListData.get(preListData.size() - 1) : calcData.get(calcData.size() - 1);//获取最后一个计算结果
|
|
|
if (Objects.equals(lastResult.get("code"), "-1")) {//最后一个计算结果为-1则记录异常,否则全部记录正常
|
|
|
Object message = lastResult.get("message");
|
|
|
- LogUtils.log("processData:3", "-1", library_id, Objects.nonNull(message)?message.toString():null, serviceId, calcData, dataObjectId, preListData, inputData.get("event"));
|
|
|
+ LogUtils.log("processData:3", "-1", library_id, Objects.nonNull(message) ? message.toString() : null, serviceId, calcData, dataObjectId, preListData, inputData.get("event"));
|
|
|
return processFail(library_id.concat("数据计算错误:").concat(lastResult.get("message").toString()), library_id);
|
|
|
}
|
|
|
//写入 成功日志
|
|
|
LogUtils.log("DataProcess:9999", "0", library_id, "数据接收后处理成功", serviceId, calcData, dataObjectId, preListData, inputData.get("event")); //此处无法把所有的记过集返回过去
|
|
|
- return processSuccess(lastResult); /*可订阅任意算法结果,也可以订阅全量结果,由最后一个算法决定*/
|
|
|
+
|
|
|
+ calcData = calcData.stream().filter(Objects::nonNull).collect(Collectors.toList());
|
|
|
+ return processSuccess(calcData.get(calcData.size() - 1)); /*可订阅任意算法结果,也可以订阅全量结果,由最后一个算法决定*/
|
|
|
}
|
|
|
+
|
|
|
/*权限检查*/
|
|
|
public Map<String, Object> authCheck(Map<String, Object> inputData, String... user_id) {
|
|
|
Object appid = inputData.get("appid");//获取应用编号
|
|
@@ -160,7 +173,7 @@ public class DataProcess {//数据处理对象
|
|
|
rowAuth.get(rowAuth.size() - 1).put("connector", " and ");
|
|
|
rowAuth.get(rowAuth.size() - 1).put("right", " ) ");
|
|
|
} //(c1 = pek )
|
|
|
- }
|
|
|
+ }
|
|
|
});
|
|
|
if (rowAuth.size() > 0) {//存在行权限则进行括号包裹
|
|
|
rowAuth.get(0).put("left", "(".concat(rowAuth.get(0).get("left").toString()));
|
|
@@ -172,6 +185,7 @@ public class DataProcess {//数据处理对象
|
|
|
}
|
|
|
return inputData;
|
|
|
}
|
|
|
+
|
|
|
/*执行算法*/
|
|
|
public Map<String, Object> execCalultion(List<Map<String, Object>> inData, String beginLibraryId, String dataObjectId) {
|
|
|
Map<String, Object> returnData = new HashMap<>();//初始化最终返回结果
|
|
@@ -216,6 +230,7 @@ public class DataProcess {//数据处理对象
|
|
|
returnData.put("calcData", calcData);
|
|
|
return returnData;
|
|
|
}
|
|
|
+
|
|
|
//脚本引擎执行
|
|
|
private Map<String, Object> execEngine(String library_id, Map<String, Object> currentCalMap, List<Map<String, Object>> calcAllData) {
|
|
|
if (!ScriptEngineProMaps.containsKey(library_id)) {//不存在缓存则创建
|
|
@@ -239,15 +254,16 @@ public class DataProcess {//数据处理对象
|
|
|
//if(key.indexOf("[") > 0){//订阅规则明确开启线程:String[3] 代表开启3个线程执行;Map[col1] 代表按COL1数据分组数开启多线程执行
|
|
|
//开启多线程操作同一个脚本对象会存在数据污染情况,除非开启多线程时,每个线程对应一个独立脚本对象,或者脚本对象做了数据隔离,目前解析并没有出现性能瓶颈,暂时不考虑
|
|
|
//}else{
|
|
|
- if (currentCalMap.get("library_type").toString().equals("2")){
|
|
|
+ if (currentCalMap.get("library_type").toString().equals("2")) {
|
|
|
paramValue.put(parmaNames.get(key), currentData);
|
|
|
- }else {
|
|
|
+ } else {
|
|
|
paramValue.put(key, currentData);
|
|
|
}
|
|
|
//}
|
|
|
}
|
|
|
return currentEngin.execScript(paramValue);
|
|
|
}
|
|
|
+
|
|
|
//数据库算法执行
|
|
|
private Map<String, Object> execDB(String library_id, Map<String, Object> calculationLibrary, List<Map<String, Object>> calcData, String dataObjectId) {
|
|
|
Object connectConfigObj = calculationLibrary.get("connectConfig");
|
|
@@ -269,35 +285,36 @@ public class DataProcess {//数据处理对象
|
|
|
return processFail("获取业务数据库对象失败".concat(errMessage), library_id);
|
|
|
}
|
|
|
//未指定订阅则默认为上一个算法的结果:
|
|
|
- String paramIndex = Objects.isNull(calculationLibrary.get("parmIndex")) ? (String.valueOf(calcData.size() - 1)).concat(".returnData") : calculationLibrary.get("parmIndex").toString();
|
|
|
+ String paramIndex = Objects.isNull(calculationLibrary.get("parmIndex"))?(String.valueOf(calcData.size() - 1)).concat(calcData.size() ==1?".dataContent":".returnData") : calculationLibrary.get("parmIndex").toString();
|
|
|
Object tempDBPrams = dataSubscription(calcData, "List.".concat(paramIndex), calculationLibrary);//获取数据订阅结果
|
|
|
//如果表名和SQL同时不为空且事件为7则,分解为先查询,后更新方式,可以有效使用预编译,性能会更高
|
|
|
//事件7的select应该是固定为书名号方式,而不是动态组建方式,因为要跟insert或update对应,如果是动态条件则不使用事件7而是采用算法分离方式进行
|
|
|
- if("7".equals(calculationLibrary.get("event")) && !Objects.isNull(calculationLibrary.get("tableName")) && !Objects.isNull(calculationLibrary.get("computing_expression"))){
|
|
|
- Map<String,Object> tempCalcInfo = calculationLibrary;
|
|
|
+ if ("7".equals(calculationLibrary.get("event")) && !Objects.isNull(calculationLibrary.get("tableName")) && !Objects.isNull(calculationLibrary.get("computing_expression"))) {
|
|
|
+ Map<String, Object> tempCalcInfo = calculationLibrary;
|
|
|
String tableName = tempCalcInfo.get("tableName").toString();//先记录表名
|
|
|
String sqlStr = tempCalcInfo.get("computing_expression").toString();//记录SQL语句,方便获取查询条件,用于组建事件6的filter
|
|
|
- tempCalcInfo.put("tableName","");//清空表名,方便执行SQL
|
|
|
+ tempCalcInfo.put("tableName", "");//清空表名,方便执行SQL
|
|
|
tempDBPrams = myDbHelper.generalProcess(tempCalcInfo, tempDBPrams, dataObjectId, calcData.get(0));//先执行sql获取结果后
|
|
|
//从myDbHelper获取sqlStrVarList.get(sqlStr)可以获取到查询条件,用于组建事件6的filter
|
|
|
//获取tempDBPrams,循环returnData,添加filter,否则无法执行事件6
|
|
|
- tempCalcInfo.put("tableName",tableName);//还原表名
|
|
|
- tempCalcInfo.put("computing_expression","");//清除SQL
|
|
|
- tempCalcInfo.put("event","6");//执行新增或更新
|
|
|
+ tempCalcInfo.put("tableName", tableName);//还原表名
|
|
|
+ tempCalcInfo.put("computing_expression", "");//清除SQL
|
|
|
+ tempCalcInfo.put("event", "6");//执行新增或更新
|
|
|
return myDbHelper.generalProcess(tempCalcInfo, tempDBPrams, dataObjectId, calcData.get(0));//调用表名方式执行事件
|
|
|
}
|
|
|
return myDbHelper.generalProcess(calculationLibrary, tempDBPrams, dataObjectId, calcData.get(0));
|
|
|
}
|
|
|
+
|
|
|
/*数据订阅:注意因前置导致算法未执行时,全量结果集的序号会存在问题*/
|
|
|
private Object dataSubscription(List<Map<String, Object>> calcAllData, String paramRule, Map<String, Object> calculationLibrary) { // List.1.returnData
|
|
|
String[] itemRule = paramRule.split("\\.");//订阅规则按.进行分割
|
|
|
String dataType = itemRule.length > 0 ? itemRule[0] : "List"; //首位是最终返回的数据类型
|
|
|
- dataType = dataType.endsWith("]")?dataType.substring(0,dataType.indexOf("[")):dataType;//订阅时需要把[]去掉,在调用脚本引擎时进行分解
|
|
|
+ dataType = dataType.endsWith("]") ? dataType.substring(0, dataType.indexOf("[")) : dataType;//订阅时需要把[]去掉,在调用脚本引擎时进行分解
|
|
|
String dataLocation = itemRule.length > 1 ? itemRule[1] : ""; //数据位置:代表从哪个算法结果中进行取值
|
|
|
if (dataLocation.equals("T")) {//固定参数 //JSON.T.{"BSM":""}----String.T.代表"",List.T.代表[],Map.T.代表{},任意类型.T代表null
|
|
|
- Object tempObj = itemRule.length > 2 ? MapTools.isBlank(itemRule[2])?("List".equals(dataType)?new ArrayList<>():("JSON,Map".contains(dataType)?new HashMap<>():null)):itemRule[2]:null;
|
|
|
- if ("Boolean".equals(dataType)){
|
|
|
- tempObj = itemRule[2].equals("true");
|
|
|
+ Object tempObj = itemRule.length > 2 ? MapTools.isBlank(itemRule[2]) ? ("List".equals(dataType) ? new ArrayList<>() : ("JSON,Map".contains(dataType) ? new HashMap<>() : null)) : itemRule[2] : null;
|
|
|
+ if ("Boolean".equals(dataType)) {
|
|
|
+ tempObj = itemRule[2].equals("true");
|
|
|
}
|
|
|
return tempObj;
|
|
|
|
|
@@ -305,7 +322,7 @@ public class DataProcess {//数据处理对象
|
|
|
//如果是L则从算法配置信息中获取,否则从全量计算结果中获取
|
|
|
Object returnData = dataLocation.equals("L") ? calculationLibrary : MapTools.isNumber(dataLocation) ? calcAllData.get(Integer.parseInt(dataLocation)) : null; // String.L.connectConfig.username
|
|
|
for (int index = 2; index < itemRule.length; index++) {
|
|
|
- if(Objects.isNull(returnData)) return null;
|
|
|
+ if (Objects.isNull(returnData)) return null;
|
|
|
if (returnData instanceof String) {//如果获取的是字符串则自动转MAP--XML字符串也自动转Map
|
|
|
returnData = MapTools.strToObj(returnData.toString());// 1. 简单字符串 , 2. map字符串 3. List字符串
|
|
|
}
|
|
@@ -313,10 +330,10 @@ public class DataProcess {//数据处理对象
|
|
|
//
|
|
|
returnData = returnData instanceof List ? ((List<?>) returnData).get(Integer.parseInt(itemRule[index])) : returnData;
|
|
|
} else {//不是数字代表从当前参数中取对应的键,如果当前参数是List应该返回List中首个对应的键值
|
|
|
- if(returnData instanceof Map){//是MAP则直接获取对应Key的值
|
|
|
- returnData = ((Map<?, ?>) returnData).get(itemRule[index]);
|
|
|
- }else{//简单的错误兼容,尤其是某些时候数据可能是单条可能是数组时
|
|
|
- if(returnData instanceof List && ((List<?>) returnData).size() > 0 && ((List<?>) returnData).get(0) instanceof Map){//如果是数组则且有下层
|
|
|
+ if (returnData instanceof Map) {//是MAP则直接获取对应Key的值
|
|
|
+ returnData = itemRule[index].equals("returnData") && Objects.isNull(((Map<?, ?>) returnData).get("returnData")) ? ((Map<?, ?>) returnData).get("dataContent") : ((Map<?, ?>) returnData).get(itemRule[index]);
|
|
|
+ } else {//简单的错误兼容,尤其是某些时候数据可能是单条可能是数组时
|
|
|
+ if (returnData instanceof List && ((List<?>) returnData).size() > 0 && ((List<?>) returnData).get(0) instanceof Map) {//如果是数组则且有下层
|
|
|
returnData = ((Map<?, ?>) ((List<?>) returnData).get(0)).get(itemRule[index]);
|
|
|
}
|
|
|
}
|
|
@@ -342,11 +359,12 @@ public class DataProcess {//数据处理对象
|
|
|
}
|
|
|
return returnData;
|
|
|
}
|
|
|
+
|
|
|
/*算法连续错误次数判定*/
|
|
|
public void setServiceErrorCount(String library_id, String success) {
|
|
|
if (!success.equals("0")) {//如果是异常
|
|
|
- Integer errCount = Objects.isNull(serviceErrorCount.get(library_id))?1:serviceErrorCount.get(library_id)+1;//获取当前异常连续数
|
|
|
- serviceErrorCount.put(library_id,errCount);//连续数加1
|
|
|
+ Integer errCount = Objects.isNull(serviceErrorCount.get(library_id)) ? 1 : serviceErrorCount.get(library_id) + 1;//获取当前异常连续数
|
|
|
+ serviceErrorCount.put(library_id, errCount);//连续数加1
|
|
|
if (errCount > AppConfig.SERVICE_ERR_MAX) {//如果连续错误数超过设定值则停止当前服务
|
|
|
ServiceInputControl.stop(serviceId);
|
|
|
}
|
|
@@ -354,8 +372,10 @@ public class DataProcess {//数据处理对象
|
|
|
serviceErrorCount.put(library_id, 0);//如果是成功则重置连续错误数为0
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
private long sequence = 0L;
|
|
|
private long lastTimestamp = -1L;
|
|
|
+
|
|
|
public String createLifeCycleCol(Long workerId, Integer serviceId) {
|
|
|
long timestamp = System.currentTimeMillis();
|
|
|
//如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常
|