|
@@ -2,14 +2,19 @@ package com.scbfkj.uni.service;
|
|
|
|
|
|
import com.scbfkj.uni.library.DataFormatUtil;
|
|
|
import com.scbfkj.uni.library.UniReturnUtil;
|
|
|
+import com.scbfkj.uni.library.script.DatabaseScriptUtil;
|
|
|
import com.scbfkj.uni.process.DataBase;
|
|
|
+import com.scbfkj.uni.process.Elasticsearch;
|
|
|
+import com.scbfkj.uni.process.Kafka;
|
|
|
import com.scbfkj.uni.system.Config;
|
|
|
|
|
|
import java.io.File;
|
|
|
-import java.util.Collections;
|
|
|
-import java.util.HashSet;
|
|
|
-import java.util.Map;
|
|
|
-import java.util.Set;
|
|
|
+import java.io.FileNotFoundException;
|
|
|
+import java.util.*;
|
|
|
+import java.util.regex.Matcher;
|
|
|
+import java.util.regex.Pattern;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+import java.util.stream.Stream;
|
|
|
|
|
|
public class LoggerService {
|
|
|
|
|
@@ -20,14 +25,15 @@ public class LoggerService {
|
|
|
"enableCache":false
|
|
|
}
|
|
|
""";
|
|
|
    // Timestamp bucket (System.currentTimeMillis() / Config.splitCount) of the
    // SQLite file currently being written by log().
    // NOTE(review): mutable static shared between log() and sendMessage() without
    // synchronization — a rotation while sendMessage() is scanning may change which
    // file is considered "active"; confirm this race is acceptable.
    private static long filename = 0;

    // Buckets whose database file / logs table has already been created
    // (guarded by synchronized (fileNames) in log()).
    private final static Set<Long> fileNames = new HashSet<>();

    // Per-bucket SQLite file path template, formatted with the bucket number.
    private static final String dbFileName = "logs/%s.sqlite";
|
|
|
|
|
|
public static void log(String target, Map<String, Object> data) {
|
|
|
- long filename = System.currentTimeMillis() / Config.splitCount;
|
|
|
+ filename = System.currentTimeMillis() / Config.splitCount;
|
|
|
|
|
|
- String filePath = dbFileName.formatted( filename);
|
|
|
+ String filePath = dbFileName.formatted(filename);
|
|
|
String connectionStr = connection.formatted(filePath);
|
|
|
synchronized (fileNames) {
|
|
|
if (!fileNames.contains(filename)) {
|
|
@@ -39,6 +45,8 @@ public class LoggerService {
|
|
|
logid Integer
|
|
|
primary key autoincrement,
|
|
|
target TEXT,
|
|
|
+ currentfile TEXT,
|
|
|
+ targetconnection TEXT,
|
|
|
datacontent TEXT
|
|
|
)""");
|
|
|
} catch (Exception e) {
|
|
@@ -50,9 +58,91 @@ public class LoggerService {
|
|
|
}
|
|
|
}
|
|
|
try {
|
|
|
- DataBase.updateBatch(connectionStr, "insert into logs (target,datacontent) values(?,?)", Collections.singletonList(new Object[]{target, DataFormatUtil.toString(data)}));
|
|
|
+ List<Object[]> datas = new ArrayList<>();
|
|
|
+ List<String> targets = Config.targets;
|
|
|
+ for (String targetconnection : targets) {
|
|
|
+ datas.add(new Object[]{target, filename, targetconnection, DataFormatUtil.toString(data)});
|
|
|
+ }
|
|
|
+
|
|
|
+ DataBase.updateBatch(connectionStr, "insert into logs (target,currentfile,targetconnection,datacontent) values(?,?)", datas);
|
|
|
} catch (Exception e) {
|
|
|
System.out.println(UniReturnUtil.getMessage(e));
|
|
|
}
|
|
|
}
|
|
|
+
|
|
|
+ public static void sendMessage(String dirPath) throws FileNotFoundException {
|
|
|
+// 判断目录
|
|
|
+ if (Objects.isNull(dirPath)) {
|
|
|
+ dirPath = "logs";
|
|
|
+ }
|
|
|
+ File file = new File(dirPath);
|
|
|
+ if (file.isFile()) {
|
|
|
+ throw new FileNotFoundException("%s不是一个目录".formatted(dirPath));
|
|
|
+ }
|
|
|
+// 日志目录下的文件
|
|
|
+ List<File> files = Arrays.stream(file.listFiles()).toList();
|
|
|
+// 过滤出当前不使用的文件
|
|
|
+ List<File> logsFiles = files.parallelStream()
|
|
|
+ .filter(logFile -> {
|
|
|
+ String fileName = logFile.getName();
|
|
|
+ String sqliteFileName = fileName.substring(0, fileName.indexOf(".sqlite"));
|
|
|
+ return !sqliteFileName.equals(filename);
|
|
|
+ }).toList();
|
|
|
+// 记录出错的日志文件
|
|
|
+ List<String> errorFileNames = new ArrayList<>();
|
|
|
+ logsFiles.stream()
|
|
|
+ .map(logFile -> {
|
|
|
+ String fileName = logFile.getName();
|
|
|
+// 转成连接字符串
|
|
|
+ return connection.formatted(fileName);
|
|
|
+ })
|
|
|
+ .flatMap(connectionStr -> {
|
|
|
+// 查询数据
|
|
|
+ try {
|
|
|
+ return DataBase.query(connectionStr, "select * from logs where 1=?", Collections.singletonList(new Object[]{1})).stream();
|
|
|
+ } catch (Exception e) {
|
|
|
+ Matcher matcher = Pattern.compile("\\d+").matcher(connectionStr);
|
|
|
+ errorFileNames.add(matcher.group());
|
|
|
+ return Stream.empty();
|
|
|
+ }
|
|
|
+ })
|
|
|
+// 分组
|
|
|
+ .collect(Collectors.groupingBy((log) -> log.get("target").toString()))
|
|
|
+ .entrySet().parallelStream().forEach(stringListEntry -> {
|
|
|
+ String targetName = stringListEntry.getKey();
|
|
|
+// 需要发送的数据
|
|
|
+ List<Map<String, Object>> value = stringListEntry.getValue();
|
|
|
+// 按照目标连接字符串分组
|
|
|
+ Map<String, List<Map<String, Object>>> targetconnection = value.stream().collect(Collectors.groupingBy(data -> data.get("targetconnection").toString()));
|
|
|
+ targetconnection.forEach((connectionStr, value1) -> {
|
|
|
+// 获取发送的数据流
|
|
|
+ Stream<String> datacontentStream = value1.stream().map(data -> data.get("datacontent").toString());
|
|
|
+
|
|
|
+// 解析发送目标连接字符串
|
|
|
+ Map<String, Object> config = ((Map<String, Object>) DataFormatUtil.toMap(connectionStr));
|
|
|
+ String type = config.getOrDefault("type", "DB").toString();
|
|
|
+ try {
|
|
|
+ switch (type.toUpperCase()) {
|
|
|
+ case "ES" -> Elasticsearch.sendMessage(connectionStr, targetName, datacontentStream.toList());
|
|
|
+ case "KAFKA" -> Kafka.sendMessage(connectionStr, targetName, datacontentStream.toList());
|
|
|
+ case "DB" ->
|
|
|
+ DatabaseScriptUtil.exec(connectionStr, targetName, datacontentStream.map(DataFormatUtil::toMap).map(dataContent -> ((Map<String, Object>) dataContent)).toList(), "1", null, null);
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+// 当前数据是在哪一个数据库文件中
|
|
|
+ String currentfile = value.stream().findFirst().map(d -> d.get("currentfile").toString()).orElse(null);
|
|
|
+ errorFileNames.add(currentfile);
|
|
|
+ System.out.println(UniReturnUtil.getMessage(e));
|
|
|
+ }
|
|
|
+
|
|
|
+ });
|
|
|
+ });
|
|
|
+// 过滤掉出错的数据库文件然后删除文件
|
|
|
+ if (logsFiles.removeIf(f -> errorFileNames.stream().anyMatch(errorFileName -> errorFileName.contains(f.getName())))) {
|
|
|
+ for (File logsFile : logsFiles) {
|
|
|
+ logsFile.delete();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
}
|