@@ -3,6 +3,8 @@ package com.scbfkj.uni.service;
import com.scbfkj.uni.library.DataFormatUtil;
import com.scbfkj.uni.library.UniReturnUtil;
import com.scbfkj.uni.process.DataBase;
+import com.scbfkj.uni.process.Elasticsearch;
+import com.scbfkj.uni.process.Kafka;
import com.scbfkj.uni.system.Config;
import com.zaxxer.hikari.pool.HikariPool;
@@ -72,20 +74,17 @@ public class LoggerService {
File file = new File(DIR);

// Filter out the files that are not currently in use
- List<File> logsFiles = Arrays.stream(Objects.requireNonNull(file.listFiles()))
- .filter(logFile -> {
- String fileName = logFile.getName();
- return !fileName.equals(currentFileName) && fileName.endsWith("sqlite");
- }).toList();
+ List<File> logsFiles = Arrays.stream(Objects.requireNonNull(file.listFiles())).filter(logFile -> {
+ String fileName = logFile.getName();
+ return !fileName.equals(currentFileName) && fileName.endsWith("sqlite");
+ }).toList();
// Record the log files that produced errors
List<String> errorFileNames = new ArrayList<>();
- logsFiles.parallelStream()
- .map(logFile -> {
+ logsFiles.parallelStream().map(logFile -> {
String fileName = logFile.getName();
// Convert to a connection string
return CONNECTION.formatted(DIR + fileName);
- })
- .flatMap(connectionStr -> {
+ }).flatMap(connectionStr -> {
// Query the data
try {
return DataBase.query(connectionStr, "select logid, target, currentfile, datasourceid, expression, datacontent from logs ").stream();
@@ -103,10 +102,7 @@ public class LoggerService {
})
// Group by target
- .collect(Collectors.groupingBy((log) -> log.get("target").toString()))
- .entrySet()
- .parallelStream()
- .forEach(stringListEntry -> {
+ .collect(Collectors.groupingBy((log) -> log.get("target").toString())).entrySet().parallelStream().forEach(stringListEntry -> {
String targetName = stringListEntry.getKey();
// The data that needs to be sent
List<Map<String, Object>> value = stringListEntry.getValue();
@@ -128,18 +124,15 @@ public class LoggerService {
String connectionStr = config.get("connectset").toString();
String expression = value1.get(0).get("expression").toString();
List<Object> parameters = new ArrayList<>();
- if (connectionStr.contains("jdbc:")) {
- type = "DB";
- }

switch (type.toUpperCase()) {
- case "ES", "KAFKA" -> {
- parameters.add(DataFormatUtil.toMap(connectionStr));
- parameters.add(expression);
- parameters.add(connectionStr);
- parameters.add(targetName);
- parameters.add(datacontentStream.toList());
- DataProcessService.processByAlgorithm("1", parameters);
+ case "KAFKA" -> {
+ List<Object> datas = datacontentStream.map(Object.class::cast).toList();
+ Kafka.sendMessage(connectionStr, targetName, datas);
+ }
+ case "ES" -> {
+ List<String> datas = datacontentStream.toList();
+ Elasticsearch.exec(connectionStr, targetName, "CREATE", datas);
}
case "DB" -> {
parameters.add(connectionStr);
@@ -151,27 +144,39 @@ public class LoggerService {
DataProcessService.processByAlgorithm("3", parameters);
}
}
+
} catch (Exception e) {
// Which database file the current data lives in
value.stream().findFirst().map(d -> d.get("currentfile").toString()).ifPresent(errorFileNames::add);
System.out.println(UniReturnUtil.getMessage(e));
}
});
+ value.stream().findFirst().map(d -> d.get("currentfile").toString()).ifPresent((it) -> {
+ if (!errorFileNames.contains(it)) {
+ String connectionStr = CONNECTION.formatted(it);
+ try {
+ DataBase.exec(connectionStr, """
+ delete
+ from logs
+ where 1=1""");
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ removeDataSource(connectionStr);
+ }
+ }
+ });
});
cleanFile(logsFiles, errorFileNames);
}

private static void cleanFile(List<File> logsFiles, List<String> errorFileNames) {
- logsFiles.stream().filter(f ->
- errorFileNames.stream().filter(Objects::nonNull).noneMatch(errorFileName ->
- errorFileName.contains(f.getName()))
- ).forEach(f -> {
+ logsFiles.stream().filter(f -> errorFileNames.stream().filter(Objects::nonNull).noneMatch(errorFileName -> errorFileName.contains(f.getName()))).forEach(f -> {
String connectionStr = CONNECTION.formatted(DIR + f.getName());
try {
DataBase.exec(connectionStr, "delete from logs");
HikariPool hikariPool = DataBase.dataSourcePools.remove(connectionStr);
- if (Objects.nonNull(hikariPool))
- hikariPool.shutdown();
+ if (Objects.nonNull(hikariPool)) hikariPool.shutdown();
// File deletion kept failing; the open database connection is suspected of keeping the file in use, so the delete never succeeds
f.delete();
fileNames.remove(Long.parseLong(f.getName().substring(0, f.getName().indexOf(".sqlite"))));
@@ -202,11 +207,7 @@ public class LoggerService {
public enum LogType {
- USER("userlog"),
- INTERFACE("interfacelog"),
- SYSTEM("systemerrlog"),
- SERVICE("servicelog"),
- SERVICE_ERR("serviceerrlog");
+ USER("userlog"), INTERFACE("interfacelog"), SYSTEM("systemerrlog"), SERVICE("servicelog"), SERVICE_ERR("serviceerrlog");

private final String name;
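For reference, a minimal sketch of the new direct dispatch introduced above, assuming only the call signatures visible in the hunks: Kafka.sendMessage(connection, topic, List<Object>) and Elasticsearch.exec(connection, index, "CREATE", List<String>). The class name, connection strings, and target name below are illustrative placeholders, not part of the change.

import java.util.List;

import com.scbfkj.uni.process.Elasticsearch;
import com.scbfkj.uni.process.Kafka;

public class LogForwardSketch {
    public static void main(String[] args) throws Exception {
        // Hypothetical "connectset" values; the real ones come from the datasource configuration.
        String kafkaConnection = "{\"bootstrapServers\":\"localhost:9092\"}";
        String esConnection = "{\"host\":\"http://localhost:9200\"}";
        String target = "servicelog"; // log target, used as the Kafka topic / Elasticsearch index

        // KAFKA branch: payloads are forwarded as a List<Object>.
        List<Object> kafkaPayloads = List.<Object>of("{\"logid\":1}", "{\"logid\":2}");
        Kafka.sendMessage(kafkaConnection, target, kafkaPayloads);

        // ES branch: the same payloads go to Elasticsearch as a List<String> with a CREATE operation.
        List<String> esPayloads = List.of("{\"logid\":1}", "{\"logid\":2}");
        Elasticsearch.exec(esConnection, target, "CREATE", esPayloads);
    }
}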