|
@@ -17,6 +17,7 @@ import org.springframework.beans.factory.annotation.Value;
|
|
|
import org.springframework.core.io.Resource;
|
|
|
import org.springframework.core.io.support.ResourcePatternResolver;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
+import org.springframework.util.StringUtils;
|
|
|
|
|
|
import java.io.*;
|
|
|
import java.nio.charset.StandardCharsets;
|
|
@@ -231,9 +232,9 @@ public class SystemInit {
|
|
|
}
|
|
|
Map<String, Object> it2 = qualityControlEntities2.get(0);
|
|
|
Object frequency2 = it2.get("frequency");
|
|
|
- if (frequency2!=null){
|
|
|
+ if (frequency2 != null) {
|
|
|
// 提交任务到定时线程池
|
|
|
- scheduledExecutorService.scheduleWithFixedDelay(()->{
|
|
|
+ scheduledExecutorService.scheduleWithFixedDelay(() -> {
|
|
|
|
|
|
try {
|
|
|
|
|
@@ -287,7 +288,8 @@ public class SystemInit {
|
|
|
}
|
|
|
|
|
|
Object outData = null;
|
|
|
- List<Map<String, Object>> result = new ArrayList<>();
|
|
|
+ List<Object> result = new ArrayList<>();
|
|
|
+ Object message = null;
|
|
|
try {
|
|
|
// 查询数据源信息
|
|
|
List<Map<String, Object>> dataSources = DATA_BASE.query(Config.getCenterConnectionStr(), "select * from datasource where datasourceid = ?", sourceId);
|
|
@@ -299,10 +301,29 @@ public class SystemInit {
|
|
|
// 如果数据源类型为数据库
|
|
|
if ("DB".equalsIgnoreCase(datasourceType.toString())) {
|
|
|
String sql = rules.toString().trim();
|
|
|
- if (!sql.toLowerCase().startsWith("select ")) {
|
|
|
- throw new RuntimeException("数据库规则必须是查询语句:" + sql);
|
|
|
+ if (DataFormatUtil.isJson(sql)) {
|
|
|
+ List filters = DataFormatUtil.toList(sql);
|
|
|
+ Map<String, Object> data = new HashMap<>() {{
|
|
|
+ put("datacontent", new HashMap<>() {{
|
|
|
+ put("filter", filters);
|
|
|
+ }});
|
|
|
+ put("event","0");
|
|
|
+ }};
|
|
|
+ Map<String, Object> stringObjectMap = new DatabaseScriptUtil().execByTableName(sourceId.toString(), sourceName.toString(), data);
|
|
|
+ if ("0".equals(stringObjectMap.get("code"))) {
|
|
|
+ List<Object> returnData = (List<Object>) stringObjectMap.get("returnData");
|
|
|
+ Object exec = JsScriptEngineUtil.exec(rules.toString(), returnData);
|
|
|
+ result.add(exec);
|
|
|
+ } else {
|
|
|
+ message = stringObjectMap.get("message");
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ if (!sql.toLowerCase().startsWith("select ")) {
|
|
|
+ message = "数据库规则必须是查询语句:" + sql;
|
|
|
+ throw new RuntimeException(String.valueOf(message));
|
|
|
+ }
|
|
|
+ result.addAll(DATA_BASE.query(DATA_BASE.queryConnectionStr(sourceId.toString()), sql));
|
|
|
}
|
|
|
- result.addAll(DATA_BASE.query(DATA_BASE.queryConnectionStr(sourceId.toString()), sql));
|
|
|
} else if ("KAFKA".equalsIgnoreCase(datasourceType.toString()) || "ES".equalsIgnoreCase(datasourceType.toString())) {
|
|
|
Map<String, Object> stringObjectMap;
|
|
|
if ("ES".equalsIgnoreCase(datasourceType.toString())) {
|
|
@@ -314,16 +335,17 @@ public class SystemInit {
|
|
|
List<Object> data = (List<Object>) stringObjectMap.get("returnData");
|
|
|
Map<String, Object> exec = JsScriptEngineUtil.eval(rules.toString(), data);
|
|
|
result.add(exec);
|
|
|
+ } else {
|
|
|
+ message = stringObjectMap.get("message");
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
boolean send = true;
|
|
|
Object filter = it.get("resultfilter");
|
|
|
- if (Objects.nonNull(filter)) {
|
|
|
+ if (Objects.nonNull(filter) && StringUtils.hasText(filter.toString())) {
|
|
|
String filterString = filter.toString();
|
|
|
-
|
|
|
outData = JsScriptEngineUtil.exec(filterString, result);
|
|
|
- if (outData == null || (!"true".equalsIgnoreCase(outData.toString()) && !"1".equalsIgnoreCase(outData.toString()))) {
|
|
|
+ if (outData == null || !("true".equalsIgnoreCase(outData.toString()) || "1".equalsIgnoreCase(outData.toString()))) {
|
|
|
send = false;
|
|
|
}
|
|
|
}
|
|
@@ -343,11 +365,13 @@ public class SystemInit {
|
|
|
if (enablelog != null && "1".equalsIgnoreCase(enablelog.toString())) {
|
|
|
LoggerService.LogType qualityControl = LoggerService.LogType.QUALITY_CONTROL;
|
|
|
Object finalExec = outData;
|
|
|
+ Object finalMessage = message;
|
|
|
LoggerService.log(qualityControl, new HashMap<>() {{
|
|
|
put("logtime", LocalDateTime.now());
|
|
|
put("qualitycontrolid", id);
|
|
|
put("prepesource", DataFormatUtil.toString(finalExec));
|
|
|
put("inputdata", DataFormatUtil.toString(result));
|
|
|
+ put("returnmessage", DataFormatUtil.toString(finalMessage));
|
|
|
put("containercode", Config.getContainerCode());
|
|
|
}});
|
|
|
}
|
|
@@ -358,11 +382,8 @@ public class SystemInit {
|
|
|
e.printStackTrace();
|
|
|
}
|
|
|
}
|
|
|
- },0,1,TimeUnit.MILLISECONDS);
|
|
|
+ }, 0, 1, TimeUnit.MILLISECONDS);
|
|
|
}
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
if (Config.isDebug()) {
|
|
@@ -388,9 +409,9 @@ public class SystemInit {
|
|
|
return template;
|
|
|
}
|
|
|
|
|
|
- private void sendMessage(String target, String message, List<Map<String, Object>> data) throws Exception {
|
|
|
+ private void sendMessage(String target, String message, List<Object> data) throws Exception {
|
|
|
|
|
|
- if (data != null && !data.isEmpty())
|
|
|
+ if (data != null && !data.isEmpty()) {
|
|
|
for (String targetName : target.split(",")) {
|
|
|
String[] split = targetName.split(":");
|
|
|
String datasourceId = split[0];
|
|
@@ -401,26 +422,22 @@ public class SystemInit {
|
|
|
Object datasourceType = dataSourceMap.get("datasourcetype");
|
|
|
if ("KAFKA".equalsIgnoreCase(datasourceType.toString())) {
|
|
|
List<String> messages = new ArrayList<>();
|
|
|
- for (Map<String, Object> it : data) {
|
|
|
+ for (Object it : data) {
|
|
|
if (message != null) {
|
|
|
- messages.add(replaceTemplate(message, it));
|
|
|
+ messages.add(replaceTemplate(message, (Map<String, Object>) it));
|
|
|
} else {
|
|
|
messages.add(DataFormatUtil.toString(it));
|
|
|
}
|
|
|
}
|
|
|
KafkaScriptUtil.sendMessage(datasourceId, targetTopic, messages);
|
|
|
} else if ("DB".equalsIgnoreCase(datasourceType.toString())) {
|
|
|
- List<String> keys = data.get(0).keySet().stream().toList();
|
|
|
-
|
|
|
+ List<String> keys = ((Map<String, Object>) data.get(0)).keySet().stream().toList();
|
|
|
String sql = "insert into %s (%s)values(%s)".formatted(targetTopic, String.join(",", keys), keys.stream().map(it -> "?").collect(Collectors.joining(",")));
|
|
|
- List<Object[]> values = data.stream().map(d -> keys.stream().map(d::get).toArray()).toList();
|
|
|
-
|
|
|
+ List<Object[]> values = data.stream().map(it -> ((Map) it)).map(d -> keys.stream().map(d::get).toArray()).toList();
|
|
|
DATA_BASE.update(DATA_BASE.queryConnectionStr(datasourceId), sql, values);
|
|
|
}
|
|
|
-
|
|
|
}
|
|
|
-
|
|
|
}
|
|
|
-
|
|
|
+ }
|
|
|
}
|
|
|
}
|