|
@@ -25,6 +25,8 @@ public class DataProcess {//数据处理对象
|
|
|
|
|
|
private List<String> serviceAuthMap = new ArrayList<>();//缓存当前服务的安全等级,对应各应用
|
|
|
|
|
|
+ private Set<Integer> enableLogCalculationLibrary = new HashSet<>();
|
|
|
+
|
|
|
public DataProcess(String service_Id) {//初始化构造,实例化一个服务对象
|
|
|
lastActive = System.currentTimeMillis();//默认最后活跃时间
|
|
|
serviceId = service_Id;
|
|
@@ -36,9 +38,10 @@ public class DataProcess {//数据处理对象
|
|
|
}
|
|
|
//算法信息
|
|
|
Map<String, Object> calculationMap = baseDbHelper.queryByParamsReturnList("SELECT CL.*,DI.* FROM calculation_library CL left JOIN datasourceinfo DI ON DI.dataSourceID = CL.datasource_id WHERE CL.service_id =? and CL.library_type is not null order by library_sort,library_id", service_Id);//直接数据库查询
|
|
|
+
|
|
|
calcList = Objects.isNull(calculationMap.get("returnData")) ? null : (List<Map<String, Object>>) calculationMap.get("returnData");
|
|
|
if (!calculationMap.get("code").equals("0") || calcList == null || calcList.isEmpty()) {//查询数据库失败
|
|
|
- errorMessage = "查询".concat(serviceId).concat("的算法失败:").concat(calculationMap.containsKey("message")?calculationMap.get("message").toString():"对应算法不存在");
|
|
|
+ errorMessage = "查询".concat(serviceId).concat("的算法失败:").concat(calculationMap.containsKey("message") ? calculationMap.get("message").toString() : "对应算法不存在");
|
|
|
return;
|
|
|
}
|
|
|
//获取当前服务的安全等级
|
|
@@ -51,6 +54,12 @@ public class DataProcess {//数据处理对象
|
|
|
if (!Objects.isNull(serviceAuthList)) {//将ListMap合并为一个Map,方便使用 // map:{1,appid},{2,appidd}
|
|
|
serviceAuthMap = ((List<Map<String, Object>>) serviceAuthList).stream().map(map -> map.get("app_id").toString()).toList();
|
|
|
}
|
|
|
+ List<Map<String, Object>> returnData = (List<Map<String, Object>>) calculationMap.getOrDefault("returnData", Collections.EMPTY_LIST);
|
|
|
+// 过滤出开启日志的算法ID
|
|
|
+ List<Integer> enabled = returnData.stream()
|
|
|
+ .filter(it -> Objects.nonNull(it.get("processType")) && "1".equals(it.get("processType").toString()))
|
|
|
+ .map(it -> ((Integer) it.get("library_id"))).toList();
|
|
|
+ enableLogCalculationLibrary.addAll(enabled);
|
|
|
} catch (Exception e) {
|
|
|
errorMessage = "数据处理对象初始化异常: ".concat(serviceId).concat(";异常信息:").concat(LogUtils.getException(e));
|
|
|
}
|
|
@@ -143,8 +152,24 @@ public class DataProcess {//数据处理对象
|
|
|
return processFail(library_id.concat("数据计算错误:").concat(lastResult.get("message").toString()), library_id);
|
|
|
}
|
|
|
//写入 成功日志
|
|
|
- LogUtils.log("DataProcess:9999", "0", library_id, "数据接收后处理成功", serviceId, calcData, dataObjectId, preListData, inputData.get("event")); //此处无法把所有的记过集返回过去
|
|
|
-
|
|
|
+ List<Map<String, Object>> logData = calcData;
|
|
|
+ logData = logData.stream()
|
|
|
+ .filter(Objects::nonNull)
|
|
|
+ .filter(it ->
|
|
|
+ {
|
|
|
+// 过滤出需要写入的日志
|
|
|
+ Object libraryId = it.get("library_id");
|
|
|
+ return enableLogCalculationLibrary.contains(libraryId);
|
|
|
+ }
|
|
|
+ ).collect(Collectors.toList());
|
|
|
+// 可能还保留一个索引为0的日志 若果只有这一条日志的情况则清空日志
|
|
|
+ if (logData.size() == 1 && "0".equals(logData.get(0).getOrDefault("library_id", 0).toString())) {
|
|
|
+ logData.clear();
|
|
|
+ }
|
|
|
+// 如果日志不为空则写入日志
|
|
|
+ if (!logData.isEmpty()) {
|
|
|
+ LogUtils.log("DataProcess:9999", "0", library_id, "数据接收后处理成功", serviceId, logData, dataObjectId, preListData, inputData.get("event")); //此处无法把所有的记过集返回过去
|
|
|
+ }
|
|
|
calcData = calcData.stream().filter(Objects::nonNull).collect(Collectors.toList());
|
|
|
return processSuccess(calcData.get(calcData.size() - 1)); /*可订阅任意算法结果,也可以订阅全量结果,由最后一个算法决定*/
|
|
|
} catch (Exception e) {
|
|
@@ -188,7 +213,7 @@ public class DataProcess {//数据处理对象
|
|
|
String tempRowAuth = rowAuthObj.toString();
|
|
|
String connect = tempRowAuth.startsWith("!") ? "!=" : "=";//如果首位是!代表不等于,否则代表等于
|
|
|
tempRowAuth = tempRowAuth.startsWith("!") ? tempRowAuth.substring(1) : tempRowAuth;//修订行权限表达式
|
|
|
- String[] row_auths = tempRowAuth.split(",",-1); //按,号进行分组!PEK,
|
|
|
+ String[] row_auths = tempRowAuth.split(",", -1); //按,号进行分组!PEK,
|
|
|
for (String row_auth : row_auths) {//循环进行标准化
|
|
|
if (MapTools.isBlank(row_auth)) continue;//为空不设置--误操作填写了空格或者删除未设置为NULL
|
|
|
Map<String, Object> signRowAuth = new HashMap<>();
|
|
@@ -224,7 +249,6 @@ public class DataProcess {//数据处理对象
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- *
|
|
|
* @param columnName ID
|
|
|
* @param row_auth 2
|
|
|
* @param nextRule t_user_group.up_user_groupid
|
|
@@ -232,7 +256,7 @@ public class DataProcess {//数据处理对象
|
|
|
*/
|
|
|
private List<Object> extendNextAuth(String columnName, String row_auth, Object nextRule) {
|
|
|
List<Object> returnData = new ArrayList<>();
|
|
|
- String[] rule = nextRule.toString().split("\\.",-1);
|
|
|
+ String[] rule = nextRule.toString().split("\\.", -1);
|
|
|
Map<String, Object> queryAuthReturnList = baseDbHelper.queryByParamsReturnList("select " + columnName + " from " + rule[0] + " where " + rule[1] + " = ?", row_auth);
|
|
|
if ("-1".equals(queryAuthReturnList.get("code")) || ((List) queryAuthReturnList.get("returnData")).isEmpty()) {
|
|
|
return returnData;
|
|
@@ -290,6 +314,8 @@ public class DataProcess {//数据处理对象
|
|
|
calcData.add(currentResult);//添加到全量算法结果中
|
|
|
if (Objects.nonNull(currentResult) && !currentResult.get("code").equals("0")) {//算法执行异常则退出
|
|
|
break;
|
|
|
+ } else {
|
|
|
+ currentResult.put("library_id", lastLibraryId);
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -423,15 +449,15 @@ public class DataProcess {//数据处理对象
|
|
|
tempList.add(returnData);
|
|
|
returnData = tempList;
|
|
|
}
|
|
|
- if ("Array".contains(dataType)){
|
|
|
- if (returnData instanceof List<?> tempList && !tempList.isEmpty() ) {
|
|
|
+ if ("Array".contains(dataType)) {
|
|
|
+ if (returnData instanceof List<?> tempList && !tempList.isEmpty()) {
|
|
|
List<Object> tpList = new ArrayList<>();
|
|
|
for (Object o : tempList) {
|
|
|
- if (o instanceof Map<?,?>){
|
|
|
+ if (o instanceof Map<?, ?>) {
|
|
|
tpList.add(MapTools.objToJSONStr(o));
|
|
|
- }else if (o instanceof String tpStr && tpStr.contains("=") && tpStr.contains("{") && tpStr.contains("}")){
|
|
|
+ } else if (o instanceof String tpStr && tpStr.contains("=") && tpStr.contains("{") && tpStr.contains("}")) {
|
|
|
tpList.add(MapTools.objToJSONStr(MapTools.strToObj(tpStr)));
|
|
|
- }else {
|
|
|
+ } else {
|
|
|
tpList.add(o);
|
|
|
}
|
|
|
}
|