|
@@ -252,6 +252,7 @@ public class SystemInit {
|
|
|
Object sourceId = it.get("sourceid");
|
|
|
Object sourceName = it.get("sourcename");
|
|
|
Object target = it.get("target");
|
|
|
+ Object targetName = it.get("targetname");
|
|
|
Object rules = it.get("rules");
|
|
|
Object messageTemplate = it.get("messagetemplate");
|
|
|
Object lastRunContainerCode = it.get("lastruncontainercode");
|
|
@@ -307,7 +308,7 @@ public class SystemInit {
|
|
|
put("datacontent", new HashMap<>() {{
|
|
|
put("filter", filters);
|
|
|
}});
|
|
|
- put("event","0");
|
|
|
+ put("event", "0");
|
|
|
}};
|
|
|
Map<String, Object> stringObjectMap = new DatabaseScriptUtil().execByTableName(sourceId.toString(), sourceName.toString(), data);
|
|
|
if ("0".equals(stringObjectMap.get("code"))) {
|
|
@@ -352,7 +353,7 @@ public class SystemInit {
|
|
|
|
|
|
if (send) {
|
|
|
|
|
|
- sendMessage(target == null ? null : target.toString(), messageTemplate == null ? null : messageTemplate.toString(), result);
|
|
|
+ sendMessage(target == null ? null : target.toString(),targetName == null ? null : targetName.toString(), messageTemplate == null ? null : messageTemplate.toString(), result);
|
|
|
DATA_BASE.update(Config.getCenterConnectionStr(), "update qualitycontrol set lastruntime = ? , lastruncontainercode= ? where id =?", LocalDateTime.now(), Config.getContainerCode(), id);
|
|
|
}
|
|
|
|
|
@@ -409,34 +410,43 @@ public class SystemInit {
|
|
|
return template;
|
|
|
}
|
|
|
|
|
|
- private void sendMessage(String target, String message, List<Object> data) throws Exception {
|
|
|
+ private void sendMessage(String target, String targetName, String message, List<Object> data) throws Exception {
|
|
|
|
|
|
if (data != null && !data.isEmpty()) {
|
|
|
- for (String targetName : target.split(",")) {
|
|
|
- String[] split = targetName.split(":");
|
|
|
- String datasourceId = split[0];
|
|
|
- String targetTopic = split[1];
|
|
|
- List<Map<String, Object>> dataSources = DATA_BASE.query(Config.getCenterConnectionStr(), "select * from datasource where datasourceid = ?", datasourceId);
|
|
|
- if (!dataSources.isEmpty()) {
|
|
|
- Map<String, Object> dataSourceMap = dataSources.get(0);
|
|
|
- Object datasourceType = dataSourceMap.get("datasourcetype");
|
|
|
- if ("KAFKA".equalsIgnoreCase(datasourceType.toString())) {
|
|
|
- List<String> messages = new ArrayList<>();
|
|
|
- for (Object it : data) {
|
|
|
- if (message != null) {
|
|
|
- messages.add(replaceTemplate(message, (Map<String, Object>) it));
|
|
|
- } else {
|
|
|
- messages.add(DataFormatUtil.toString(it));
|
|
|
- }
|
|
|
- }
|
|
|
- KafkaScriptUtil.sendMessage(datasourceId, targetTopic, messages);
|
|
|
- } else if ("DB".equalsIgnoreCase(datasourceType.toString())) {
|
|
|
- List<String> keys = ((Map<String, Object>) data.get(0)).keySet().stream().toList();
|
|
|
- String sql = "insert into %s (%s)values(%s)".formatted(targetTopic, String.join(",", keys), keys.stream().map(it -> "?").collect(Collectors.joining(",")));
|
|
|
- List<Object[]> values = data.stream().map(it -> ((Map) it)).map(d -> keys.stream().map(d::get).toArray()).toList();
|
|
|
- DATA_BASE.update(DATA_BASE.queryConnectionStr(datasourceId), sql, values);
|
|
|
+
|
|
|
+ if (targetName == null) {
|
|
|
+ for (String targetStr : target.split(",")) {
|
|
|
+ String[] split = targetStr.split(":");
|
|
|
+ String datasourceId = split[0];
|
|
|
+ String targetTopic = split[1];
|
|
|
+ sendData(message, data, datasourceId, targetTopic);
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ sendData(message, data, target, targetName);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void sendData(String message, List<Object> data, String datasourceId, String targetTopic) throws Exception {
|
|
|
+ List<Map<String, Object>> dataSources = DATA_BASE.query(Config.getCenterConnectionStr(), "select * from datasource where datasourceid = ?", datasourceId);
|
|
|
+ if (!dataSources.isEmpty()) {
|
|
|
+ Map<String, Object> dataSourceMap = dataSources.get(0);
|
|
|
+ Object datasourceType = dataSourceMap.get("datasourcetype");
|
|
|
+ if ("KAFKA".equalsIgnoreCase(datasourceType.toString())) {
|
|
|
+ List<String> messages = new ArrayList<>();
|
|
|
+ for (Object it : data) {
|
|
|
+ if (message != null) {
|
|
|
+ messages.add(replaceTemplate(message, (Map<String, Object>) it));
|
|
|
+ } else {
|
|
|
+ messages.add(DataFormatUtil.toString(it));
|
|
|
}
|
|
|
}
|
|
|
+ KafkaScriptUtil.sendMessage(datasourceId, targetTopic, messages);
|
|
|
+ } else if ("DB".equalsIgnoreCase(datasourceType.toString())) {
|
|
|
+ List<String> keys = ((Map<String, Object>) data.get(0)).keySet().stream().toList();
|
|
|
+ String sql = "insert into %s (%s)values(%s)".formatted(targetTopic, String.join(",", keys), keys.stream().map(it -> "?").collect(Collectors.joining(",")));
|
|
|
+ List<Object[]> values = data.stream().map(it -> ((Map) it)).map(d -> keys.stream().map(d::get).toArray()).toList();
|
|
|
+ DATA_BASE.update(DATA_BASE.queryConnectionStr(datasourceId), sql, values);
|
|
|
}
|
|
|
}
|
|
|
}
|