|
@@ -13,6 +13,7 @@ import com.scbfkj.uni.system.Config;
|
|
|
import java.time.LocalDateTime;
|
|
|
import java.time.format.DateTimeFormatter;
|
|
|
import java.util.*;
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
|
|
|
|
|
public class DataProcessService {
|
|
@@ -33,41 +34,67 @@ public class DataProcessService {
|
|
|
|
|
|
try {
|
|
|
serviceIdOpt = DataAliasGetUtil.getValue("serviceid", inData);
|
|
|
- if (Objects.isNull(inData) || inData.isEmpty() || serviceIdOpt.isEmpty()) {
|
|
|
+ if (serviceIdOpt.isEmpty()) {
|
|
|
throw new RuntimeException("服务编号不能为空");
|
|
|
}
|
|
|
serviceId = serviceIdOpt.get();
|
|
|
|
|
|
- List<Map<String, Object>> serviceInfoList = DataBase.query(Config.getCenterConnectionStr(), "select enablelog from serviceinfo where serviceid = ?", serviceId);
|
|
|
+// 熔断
|
|
|
+// 查询服务运行状态
|
|
|
+ List<Map<String, Object>> serviceState = DataBase.query(Config.getCenterConnectionStr(), """
|
|
|
+ select 1 as runstate
|
|
|
+ from servicestate
|
|
|
+ where serviceid =? and runstate = '1'""", serviceId);
|
|
|
+ if (serviceState.isEmpty()) {
|
|
|
+ throw new RuntimeException("服务没有运行");
|
|
|
+ }
|
|
|
+ List<Map<String, Object>> serviceInfoList = DataBase.query(Config.getCenterConnectionStr(), """
|
|
|
+ select serviceid,
|
|
|
+ servicename,
|
|
|
+ servicetype,
|
|
|
+ containercode,
|
|
|
+ tasktype,
|
|
|
+ cronexpress,
|
|
|
+ urilist,
|
|
|
+ nullresultparameters,
|
|
|
+ autonotification,
|
|
|
+ loopcount,
|
|
|
+ frequency,
|
|
|
+ enablelog
|
|
|
+ from serviceinfo;
|
|
|
+ where serviceid = ?""", serviceId);
|
|
|
if (!serviceInfoList.isEmpty()) {
|
|
|
serviceInfo = serviceInfoList.get(0);
|
|
|
}
|
|
|
|
|
|
-// 熔断
|
|
|
+
|
|
|
+ Optional<String> lifecycleidOpt = DataAliasGetUtil.getValue("lifecycleid", inData);
|
|
|
+
|
|
|
+ if (lifecycleidOpt.isEmpty()) {
|
|
|
+ lifecycleid = DataAliasGetUtil.createLifeCycleCol(Config.getContainerCode(), serviceId);
|
|
|
+ inData.put("lifecycleid", lifecycleid);
|
|
|
+ } else {
|
|
|
+ lifecycleid = lifecycleidOpt.get();
|
|
|
+ }
|
|
|
+ HashMap<String, Object> source = new HashMap<>();
|
|
|
|
|
|
List<Map<String, Object>> algorithmLibraries = DataBase.query(Config.getCenterConnectionStr(), """
|
|
|
select algorithmlibraryid,
|
|
|
serviceid,
|
|
|
algorithmtype,
|
|
|
+ algorithmname,
|
|
|
computingexpression,
|
|
|
parameterset,
|
|
|
preconditions,
|
|
|
- executionnumber,
|
|
|
+ executionorder,
|
|
|
datasourceid,
|
|
|
- preparameterset
|
|
|
+ preparameterset,
|
|
|
+ targetsource,
|
|
|
+ algorithmdescription
|
|
|
from algorithmlibrary
|
|
|
- where serviceid = ?""", serviceId);
|
|
|
-
|
|
|
+ where serviceid = ? order by executionorder""", serviceId);
|
|
|
|
|
|
- Optional<String> lifecycleidOpt = DataAliasGetUtil.getValue("lifecycleid", inData);
|
|
|
|
|
|
- if (lifecycleidOpt.isEmpty()) {
|
|
|
- lifecycleid = DataAliasGetUtil.createLifeCycleCol(Config.getContainerCode(), serviceId);
|
|
|
- inData.put("lifecycleid", lifecycleid);
|
|
|
- } else {
|
|
|
- lifecycleid = lifecycleidOpt.get();
|
|
|
- }
|
|
|
- HashMap<String, Object> source = new HashMap<>();
|
|
|
for (Map<String, Object> algorithmLibrary : algorithmLibraries) {
|
|
|
|
|
|
Map<String, Object> algorithmResult = null;
|
|
@@ -85,6 +112,7 @@ public class DataProcessService {
|
|
|
data.put("algorithmlibraryid", algorithmlibraryid);
|
|
|
source.put("args", resource);
|
|
|
source.put("algorithm", algorithmLibrary);
|
|
|
+// 计算前置算法
|
|
|
if (Objects.nonNull(preConditions)) {
|
|
|
HashMap<String, Object> preData = new HashMap<>();
|
|
|
preResource.add(preData);
|
|
@@ -93,25 +121,31 @@ public class DataProcessService {
|
|
|
|
|
|
|
|
|
String preCode;
|
|
|
+// 前置算法参数
|
|
|
List<Object> params = getParams(Optional.ofNullable(preparameterset).map(DataFormatUtil::toString).orElse(null), source);
|
|
|
Map<String, Object> eval = JsScriptEngineUtil.eval(DataFormatUtil.toString(preConditions), params.toArray());
|
|
|
|
|
|
preData.put("preResult", eval);
|
|
|
- if (!Objects.equals(eval.get("code"), "0"))
|
|
|
+ if (!Objects.equals(eval.get("code"), "0")) {
|
|
|
throw new RuntimeException(eval.get("message").toString());
|
|
|
+ }
|
|
|
preCode = eval.get("returnData").toString();
|
|
|
|
|
|
|
|
|
preData.put("preCode", preCode);
|
|
|
// 记录前置条件结果
|
|
|
preData.put("preParameters", params);
|
|
|
+// 直接结束后续算法
|
|
|
if (Objects.equals("2", preCode)) {
|
|
|
break;
|
|
|
+// 跳过当前算法
|
|
|
} else if (Objects.equals("1", preCode)) {
|
|
|
resource.add(null);
|
|
|
continue;
|
|
|
+// 返回不等于0的情况暂时不支持,不处理
|
|
|
} else if (!Objects.equals("0", preCode)) {
|
|
|
- continue;
|
|
|
+//
|
|
|
+ throw new RuntimeException("返回类型暂时不支持,不能处理");
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -125,10 +159,20 @@ public class DataProcessService {
|
|
|
List<Object> parameters = new ArrayList<>();
|
|
|
if (Objects.nonNull(parameterSet)) {
|
|
|
source.put("datasource", datasource);
|
|
|
- parameters.addAll(getParams(parameterSet.toString(), source));
|
|
|
+// 算法参数配置 获取算法参数 组合成String.$.datasource.host;;String.$.datasource.port;;String.$.datasource.username;;String.$.datasource.password;;List.$.args[0].result.returnData
|
|
|
+ List<Map<String, Object>> params = DataBase.query(Config.getCenterConnectionStr(), """
|
|
|
+ select concat(parametertype,'.',subscriptionexpressions) as expressions
|
|
|
+ from algorithmparameters where algorithmlibraryid =?""", algorithmlibraryid.toString());
|
|
|
+
|
|
|
+// 从algorithmparameters 获取算法参数
|
|
|
+ parameters.addAll(getParams(params.stream().map(it -> it.get("expressions").toString()).collect(Collectors.joining(";;")), source));
|
|
|
+
|
|
|
+// 从algorithmlibrary 的parameterSet 获取算法参数
|
|
|
+ if (Objects.isNull(parameterSet)) {
|
|
|
+ parameters.addAll(getParams(parameterSet.toString(), source));
|
|
|
+ }
|
|
|
// 算法入参
|
|
|
data.put("parameters", parameters);
|
|
|
-
|
|
|
}
|
|
|
Object type = algorithmLibrary.get("algorithmtype");
|
|
|
algorithmResult = processByAlgorithm(type, parameters);
|
|
@@ -278,6 +322,7 @@ public class DataProcessService {
|
|
|
res = result;
|
|
|
}
|
|
|
return res;
|
|
|
+
|
|
|
}
|
|
|
|
|
|
|