|
@@ -1,73 +1,108 @@
|
|
|
package com.scbfkj.uni.service;
|
|
|
|
|
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
|
|
-import com.fasterxml.jackson.databind.JsonNode;
|
|
|
import com.jayway.jsonpath.JsonPath;
|
|
|
import com.scbfkj.uni.library.DataAliasGetUtil;
|
|
|
import com.scbfkj.uni.library.DataFormatUtil;
|
|
|
-import com.scbfkj.uni.library.ScriptEngineUtil;
|
|
|
+import com.scbfkj.uni.library.RequestUtil;
|
|
|
import com.scbfkj.uni.library.UniReturnUtil;
|
|
|
+import com.scbfkj.uni.library.script.JavaScriptEngineUtil;
|
|
|
+import com.scbfkj.uni.library.script.JsScriptEngineUtil;
|
|
|
import com.scbfkj.uni.process.DataBase;
|
|
|
import com.scbfkj.uni.system.Config;
|
|
|
-import jakarta.el.MethodNotFoundException;
|
|
|
|
|
|
-import java.io.File;
|
|
|
-import java.lang.reflect.Method;
|
|
|
-import java.net.URL;
|
|
|
-import java.net.URLClassLoader;
|
|
|
+import java.time.LocalDateTime;
|
|
|
+import java.time.format.DateTimeFormatter;
|
|
|
import java.util.*;
|
|
|
-import java.util.stream.Collectors;
|
|
|
|
|
|
|
|
|
public class DataProcessService {
|
|
|
|
|
|
|
|
|
- public static Map<String, Object> process(Map<String, Object> inData) throws Exception {
|
|
|
-
|
|
|
- Optional<String> serviceIdOpt = DataAliasGetUtil.getValue("serviceid", inData);
|
|
|
- if (Objects.isNull(inData) || inData.isEmpty() || serviceIdOpt.isEmpty())
|
|
|
- return UniReturnUtil.fail("服务编号不能为空");
|
|
|
+ public static Map<String, Object> process(Map<String, Object> inData) {
|
|
|
|
|
|
+ String lifecycleid = null;
|
|
|
+ Map<String, Object> result;
|
|
|
List<Map<String, Object>> resource = new ArrayList<>();
|
|
|
resource.add(inData);
|
|
|
- String serviceId = serviceIdOpt.get();
|
|
|
+ Object algorithmlibraryid;
|
|
|
+ try {
|
|
|
+ Optional<String> serviceIdOpt = DataAliasGetUtil.getValue("serviceid", inData);
|
|
|
+ if (Objects.isNull(inData) || inData.isEmpty() || serviceIdOpt.isEmpty())
|
|
|
+ return UniReturnUtil.fail("服务编号不能为空");
|
|
|
+ String serviceId = serviceIdOpt.get();
|
|
|
+
|
|
|
+// 熔断
|
|
|
+
|
|
|
+ List<Map<String, Object>> algorithmLibraries;
|
|
|
+ if (serviceId.matches("^\\d+$")) {
|
|
|
+ algorithmLibraries = DataBase.query(Config.centerConnectionStr, "select * from algorithmlibrary where serviceid=?", Collections.singletonList(new Object[]{serviceId}));
|
|
|
+ } else {
|
|
|
+ algorithmLibraries = DataBase.query(Config.localCenterConnectionStr, "select * from algorithmlibrary where serviceid=?", Collections.singletonList(new Object[]{serviceId}));
|
|
|
+ }
|
|
|
|
|
|
- List<Map<String, Object>> algorithmLibraries;
|
|
|
- if(serviceId.matches("^\\d+$")){
|
|
|
- algorithmLibraries = DataBase.query(Config.centerConnectionStr, "select * from algorithmlibrary where serviceid=?", Collections.singletonList(new Object[]{serviceId}));
|
|
|
- }else{
|
|
|
- algorithmLibraries = DataBase.query(Config.localCenterConnectionStr, "select * from algorithmlibrary where serviceid=?", Collections.singletonList(new Object[]{serviceId}));
|
|
|
- }
|
|
|
+ Optional<String> lifecycleidOpt = DataAliasGetUtil.getValue("lifecycleid", inData);
|
|
|
+ if (lifecycleidOpt.isEmpty()) {
|
|
|
+ lifecycleid = createLifeCycleCol(Config.containerCode, serviceId);
|
|
|
+ inData.put("lifecycleid", lifecycleid);
|
|
|
+ } else {
|
|
|
+ lifecycleid = lifecycleidOpt.get();
|
|
|
+ }
|
|
|
|
|
|
- for (Map<String, Object> algorithmLibrary : algorithmLibraries) {
|
|
|
- resource.add(processByAlgorithm(algorithmLibrary, resource, null, null));
|
|
|
+ for (Map<String, Object> algorithmLibrary : algorithmLibraries) {
|
|
|
+
|
|
|
+ Object preConditions = algorithmLibrary.get("preconditions");
|
|
|
+ Object preparameterset = algorithmLibrary.get("preparameterset");
|
|
|
+ if (Objects.nonNull(preConditions)) {
|
|
|
+ String preCode = preConditionScript(preConditions.toString(), resource, DataFormatUtil.toString(preparameterset));
|
|
|
+ if (Objects.equals("2", preCode)) {
|
|
|
+ return UniReturnUtil.success(resource.size() - 1);
|
|
|
+ } else if (Objects.equals("1", preCode)) {
|
|
|
+ resource.add(null);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ algorithmlibraryid = algorithmLibrary.get("algorithmlibraryid");
|
|
|
+ Map<String, Object> algorithmResult = null;
|
|
|
+ try {
|
|
|
+ algorithmResult = processByAlgorithm(algorithmLibrary, resource);
|
|
|
+ } catch (Exception e) {
|
|
|
+// 把错误细化后往上一层抛出统一记录日志
|
|
|
+ throw new RuntimeException("算法id: %s 异常信息:%s ".formatted(algorithmlibraryid, e.getMessage()));
|
|
|
+ }
|
|
|
+ if (!Objects.equals(algorithmResult.get("code"), "0")) {
|
|
|
+ resource.add(algorithmResult);
|
|
|
+ } else {
|
|
|
+ throw new RuntimeException(algorithmResult.get("message").toString());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ result = resource.get(resource.size() - 1);
|
|
|
+ } catch (Exception e) {
|
|
|
+// 异常
|
|
|
+ LoggerService.logServiceError(RequestUtil.getIpAddr(), RequestUtil.getUri(), RequestUtil.getSessionId(), DataFormatUtil.toString(resource), null, "-1", e.getMessage(), lifecycleid);
|
|
|
+ return UniReturnUtil.fail(e.getMessage());
|
|
|
}
|
|
|
- return resource.get(resource.size() - 1);
|
|
|
+// 成功
|
|
|
+ LoggerService.logService(RequestUtil.getIpAddr(), RequestUtil.getUri(), RequestUtil.getSessionId(), DataFormatUtil.toString(inData), DataFormatUtil.toString(result), "0", null, lifecycleid);
|
|
|
+ return result;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* @param algorithmLibrary 算法配置
|
|
|
* @param args 算法参数
|
|
|
- * @param filterColumns 数据库执行的列权限
|
|
|
- * @param filterLines 数据库执行的行权限
|
|
|
* @return
|
|
|
* @throws Exception
|
|
|
*/
|
|
|
- public static Map<String, Object> processByAlgorithm(Map<String, Object> algorithmLibrary, List<Map<String, Object>> args, List<String> filterColumns, List<Map<String,Object>> filterLines) throws Exception {
|
|
|
+ public static Map<String, Object> processByAlgorithm(Map<String, Object> algorithmLibrary, List<Map<String, Object>> args) throws Exception {
|
|
|
|
|
|
+ List<String> filterColumns = Optional.ofNullable(args).map(it -> it.get(0)).map(it -> it.get("filterColumns")).map(it -> ((List<String>) it)).orElse(new ArrayList<>());
|
|
|
+ List<Map<String, Object>> filterLines = Optional.ofNullable(args).map(it -> it.get(0)).map(it -> it.get("filterLines")).map(it -> ((List<Map<String, Object>>) it)).orElse(new ArrayList<>());
|
|
|
Object type = algorithmLibrary.get("algorithmtype");
|
|
|
Object expression = algorithmLibrary.get("computingexpression");
|
|
|
Object parameterSet = algorithmLibrary.get("parameterset");
|
|
|
- Object preConditions = algorithmLibrary.get("preconditions");
|
|
|
Object dataSourceId = algorithmLibrary.get("datasourceid");
|
|
|
List<Map<String, Object>> datasourceList = DataBase.query(Config.centerConnectionStr, "select * from datasource where datasourceid=?", Collections.singletonList(new Object[]{dataSourceId}));
|
|
|
Map<String, Object> datasource = datasourceList.get(0);
|
|
|
- if (Objects.nonNull(preConditions)) {
|
|
|
- String result = preConditionScript(preConditions.toString(), args);
|
|
|
- if (Objects.equals("1", result) || Objects.equals("2", result)) {
|
|
|
- return UniReturnUtil.success(result);
|
|
|
- }
|
|
|
- }
|
|
|
List<Object> parameters = new ArrayList<>();
|
|
|
// 获取入参列表
|
|
|
if (Objects.nonNull(parameterSet)) {
|
|
@@ -81,70 +116,21 @@ public class DataProcessService {
|
|
|
switch (type.toString()) {
|
|
|
// java反射
|
|
|
case "1" -> {
|
|
|
- Object connectset = datasource.get("connectset");
|
|
|
- JsonNode jsonNode = DataFormatUtil.toJsonNode(connectset);
|
|
|
- JsonNode path = jsonNode.get("path");
|
|
|
- if (Objects.nonNull(path)) {
|
|
|
- File file = new File(System.getProperty("user.dir").concat(File.separator).concat("plugins").concat(File.separator).concat(path.asText()));
|
|
|
- if (!file.exists()) {
|
|
|
- throw new RuntimeException("外部文件加载不存在:".concat(System.getProperty("user.dir")).concat(File.separator).concat("plugins").concat(File.separator).concat(path.asText()));
|
|
|
- }
|
|
|
- Method addURLMethod = null;
|
|
|
-
|
|
|
- boolean accessible = false;
|
|
|
- try {
|
|
|
- addURLMethod = URLClassLoader.class.getDeclaredMethod("addURL", URL.class);
|
|
|
- URLClassLoader classLoader = (URLClassLoader) ClassLoader.getSystemClassLoader();
|
|
|
- accessible = addURLMethod.canAccess(null); //获取方法的访问权限以便恢复:判读是否可以访问: true 代表可以访问
|
|
|
- if (!accessible) { //判读是否可以访问,如果是false,那就必须setAccessible(true)
|
|
|
- addURLMethod.setAccessible(true);//实际上是为了提效
|
|
|
- }
|
|
|
- addURLMethod.invoke(classLoader, file.toURI().toURL());//加载到系统中
|
|
|
- } finally {
|
|
|
- if (addURLMethod != null && !accessible) {
|
|
|
- addURLMethod.setAccessible(false);//恢复原访问权限
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- JsonNode className = jsonNode.get("className");
|
|
|
-
|
|
|
- Class<?> classExample = Class.forName(className.asText()); //获取类实例
|
|
|
- Method javaMethod = null;
|
|
|
- Method closeMethod = null;
|
|
|
- Object classInstance = classExample.getConstructor().newInstance();//类实例接口 无参数构造
|
|
|
- for (Method currentMethod : classExample.getMethods()) {//循环所有方法
|
|
|
- String methodName = currentMethod.getName();
|
|
|
- if (methodName.equals(expression)) {
|
|
|
- javaMethod = currentMethod;
|
|
|
- } else if (methodName.equals("close")) {
|
|
|
- closeMethod = currentMethod;
|
|
|
- }
|
|
|
- }
|
|
|
- if (Objects.isNull(javaMethod)) {
|
|
|
- throw new MethodNotFoundException(expression + ": 没找到");
|
|
|
- }
|
|
|
-
|
|
|
- Object invoke = javaMethod.invoke(classInstance, parameters.toArray());
|
|
|
-// 关闭 因为没有做实例的缓存 所以要关闭算法
|
|
|
- if (Objects.nonNull(closeMethod)) {
|
|
|
- closeMethod.invoke(classInstance);
|
|
|
- }
|
|
|
+ Object invoke = JavaScriptEngineUtil.invoke(datasource, expression.toString(), parameters);
|
|
|
return UniReturnUtil.success(invoke);
|
|
|
}
|
|
|
// JS表达式
|
|
|
case "2" -> {
|
|
|
- return UniReturnUtil.success(ScriptEngineUtil.eval(expression.toString(), parameters));
|
|
|
+ return UniReturnUtil.success(JsScriptEngineUtil.eval(expression.toString(), parameters));
|
|
|
}
|
|
|
// 数据库
|
|
|
case "3" -> {
|
|
|
// 下放到Database中处理数据
|
|
|
-// 参数表达式顺序是 数据源连接字符串($.datasource.connectset),sql表达式($.algorithm.computingexpression),需要操作的值($.args[1].returnData),执行编号($.algorithm.executionnumber)
|
|
|
+// 参数表达式顺序是 数据源连接字符串(String.$.datasource.connectset),sql表达式(String.$.algorithm.computingexpression),需要操作的值(List.$.args[1].returnData),执行编号(String.$.algorithm.executionnumber)
|
|
|
return DataBase.exec(parameters.get(0).toString(), parameters.get(1).toString(), ((List<Map<String, Object>>) parameters.get(2)), parameters.get(3), filterColumns, filterLines);
|
|
|
-
|
|
|
-
|
|
|
}
|
|
|
}
|
|
|
- return UniReturnUtil.fail("");
|
|
|
+ return UniReturnUtil.fail("算法类型不支持");
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -153,19 +139,96 @@ public class DataProcessService {
|
|
|
* @return
|
|
|
* @throws Exception
|
|
|
*/
|
|
|
- private static String preConditionScript(String script, List<Map<String, Object>> args) throws Exception {
|
|
|
- return ScriptEngineUtil.eval(script, args);
|
|
|
+ private static String preConditionScript(String script, List<Map<String, Object>> args, String parameterSet) throws Exception {
|
|
|
+ if (Objects.isNull(parameterSet)) {
|
|
|
+ return JsScriptEngineUtil.eval(script, args);
|
|
|
+ } else {
|
|
|
+ List<Object> params = getParams(parameterSet, args);
|
|
|
+ return JsScriptEngineUtil.eval(script, params.toArray());
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * @param parameterSet 使用标准的jsonPath表达式 $.a.b[0].c,多个参数之间使用;;分隔
|
|
|
+ * @param parameterSet 使用非标准的jsonPath表达式 String.$.a.b[0].c,String.abcc 多个参数之间使用;;分隔
|
|
|
* @param source jsonpath需要取值的数据源
|
|
|
* @return
|
|
|
*/
|
|
|
public static List<Object> getParams(String parameterSet, Object source) throws JsonProcessingException {
|
|
|
String[] paths = parameterSet.split(";;");
|
|
|
String json = DataFormatUtil.toString(source);
|
|
|
- return Arrays.stream(paths).map(it -> JsonPath.read(json, it.trim())).toList();
|
|
|
+ List<Object> result = new ArrayList<>();
|
|
|
+
|
|
|
+ for (String it : paths) {
|
|
|
+ it = it.trim();
|
|
|
+ String type = it.split("\\.")[0];
|
|
|
+ String path = it.replace(type + ".", "");
|
|
|
+ Object value = null;
|
|
|
+// $开头的使用标准的jsonPath
|
|
|
+ if (path.startsWith("$")) {
|
|
|
+ value = JsonPath.read(json, path);
|
|
|
+ } else if (path.startsWith("#")) {
|
|
|
+// todo 缓存之类的特殊值
|
|
|
+
|
|
|
+// 不是使用$#开头的直接取后面的字符串
|
|
|
+ } else {
|
|
|
+ value = path;
|
|
|
+ }
|
|
|
+ if (Objects.isNull(value)) {
|
|
|
+ result.add(null);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ if ("String".equalsIgnoreCase(type)) {
|
|
|
+ value = DataFormatUtil.toString(value);
|
|
|
+ } else if ("List".equalsIgnoreCase(type)) {
|
|
|
+ value = DataFormatUtil.toList(value);
|
|
|
+ } else if ("Array".equalsIgnoreCase(type)) {
|
|
|
+ value = DataFormatUtil.toArray(value);
|
|
|
+ } else if ("Map".equalsIgnoreCase(type)) {
|
|
|
+ value = DataFormatUtil.toMap(value);
|
|
|
+ } else if ("Long".equalsIgnoreCase(type)) {
|
|
|
+ value = Long.parseLong(value.toString());
|
|
|
+ } else if ("Integer".equalsIgnoreCase(type)) {
|
|
|
+ value = Integer.parseInt(value.toString());
|
|
|
+ } else if ("Double".equalsIgnoreCase(type)) {
|
|
|
+ value = Double.parseDouble(value.toString());
|
|
|
+ } else if ("Float".equalsIgnoreCase(type)) {
|
|
|
+ value = Float.parseFloat(value.toString());
|
|
|
+ } else if ("Boolean".equalsIgnoreCase(type)) {
|
|
|
+ value = Boolean.parseBoolean(value.toString());
|
|
|
+ } else if ("Datetime".equalsIgnoreCase(type)) {
|
|
|
+ String string = value.toString();
|
|
|
+ String patten = "yyyy-MM-dd HH:mm:ss";
|
|
|
+ if (string.contains("(")) {
|
|
|
+ patten = string.substring(string.indexOf("(") + 1, string.length() - 1);
|
|
|
+ string = string.substring(0, string.indexOf("("));
|
|
|
+ }
|
|
|
+ value = LocalDateTime.parse(string, DateTimeFormatter.ofPattern(patten));
|
|
|
+ }
|
|
|
+
|
|
|
+ result.add(value);
|
|
|
+ }
|
|
|
+ return result;
|
|
|
}
|
|
|
|
|
|
+ private static long sequence = 0L;
|
|
|
+ private static long lastTimestamp = -1L;
|
|
|
+
|
|
|
+ public synchronized static String createLifeCycleCol(String containerCode, String serviceId) {
|
|
|
+ long timestamp = System.currentTimeMillis();
|
|
|
+ //如果当前时间小于上一次ID生成的时间戳,说明系统时钟回退过这个时候应当抛出异常
|
|
|
+ if (lastTimestamp == timestamp) { //如果是同一时间生成的,则进行毫秒内序列
|
|
|
+ sequence++;
|
|
|
+ if (sequence > 999L) {//毫秒内序列溢出
|
|
|
+ sequence = 0;
|
|
|
+ while (lastTimestamp == System.currentTimeMillis()) {//阻塞到下一个毫秒,获得新的时间戳
|
|
|
+ }
|
|
|
+ timestamp = System.currentTimeMillis();
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ sequence = 0L;
|
|
|
+ }
|
|
|
+ lastTimestamp = timestamp;//上次生成ID的时间截
|
|
|
+ //移位并通过或运算拼到一起组成64位的ID
|
|
|
+ return String.valueOf(timestamp).concat(String.format("%03d", sequence)).concat(String.format("%6s", containerCode)).concat(String.format("%6s", serviceId)).replaceAll("\\s", "0");
|
|
|
+ }
|
|
|
}
|