瀏覽代碼

update_19

andy 1 年之前
父節點
當前提交
7ef1e0798c

+ 6 - 0
pom.xml

@@ -110,6 +110,12 @@
 			<artifactId>guava</artifactId>
 			<version>31.1-jre</version>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.activemq</groupId>
+			<artifactId>activemq-client</artifactId>
+			<version>5.18.2</version>
+		</dependency>
+
 	</dependencies>
 
 	<build>

+ 1 - 1
src/main/java/com/scbfkj/uni/library/DataEncryptionUtil.java

@@ -19,7 +19,7 @@ public class DataEncryptionUtil {
 
 
     //    私有密钥
-    private static String privateKeyStr = "MIICdgIBADANBgkqhkiG9w0BAQEFAASCAmAwggJcAgEAAoGBAJ9vK5Gsb5hRft3L/OI2rGw8/tg8pSe2L6bfIN8l03YwprUZwi8+R8hdnkR8SSgj2bk3A994L6TAzV8NbG9j+gyj+VakgCjbOSHGywhdKS6QXv0jQ3i8f4kBy4c3uWU9iAwaSTIh78U/8DVmQYRrKMwyqbhx8+ze+2GxaB2ZufEfAgMBAAECgYAyMdTcuxYzNU0k1SkbqyzjstxlBcrVUtVzywHVX1pQ9oY1tBNfvlLpMRg35Y0+tvLADiMJAxS04QKHb3l5JFe/jE24hNxMj2h9JfxO36bGblyqZ7PlS+5/pvXdVaFYolN+5Rocf63/Iq2RSCb8W3D5uUQqLwO/i1iFQT+UROUA4QJBAOvvOJSB4BIu4VD/6XGqZ5cLU3DMtwzHIyvTTH2REGF33eEHc0z83VYi4xbUDGxvDD1d58bPqkpnvJiByXmYVa8CQQCs/mHfpO8fDy7ZKGzs1u4eBsPowSnJLsGbY2mYiaawnHeeLYOaGEdtxRVk06+seTFLw5oi3FDJG8U8LP6FiGeRAkAjZiQeHBJrh/8kcREsjb23KurdDMoWL7a2N6DNYjuL9DklL0H8diAbcWaTIUOv7UVv26wP506MlV31n9uD0/hfAkBo3gwWtrT97wZHPepJ6ECQkylPf0kFXAKhX7Izdb5GcZNRn+WXFAC42jAN3wUvWIg5lWlqmIOgZeU6hUwFRpsBAkEAsSe8v0cho1YTdmXiGQ7uhUxZ455mrw81AdzmuvDxvWFLx1uHAZna9eylZsfbEa7Y9DcmakLJKGWaTvKvYc55ZQ==";
+    public static String privateKeyStr = "MIICdgIBADANBgkqhkiG9w0BAQEFAASCAmAwggJcAgEAAoGBAJ9vK5Gsb5hRft3L/OI2rGw8/tg8pSe2L6bfIN8l03YwprUZwi8+R8hdnkR8SSgj2bk3A994L6TAzV8NbG9j+gyj+VakgCjbOSHGywhdKS6QXv0jQ3i8f4kBy4c3uWU9iAwaSTIh78U/8DVmQYRrKMwyqbhx8+ze+2GxaB2ZufEfAgMBAAECgYAyMdTcuxYzNU0k1SkbqyzjstxlBcrVUtVzywHVX1pQ9oY1tBNfvlLpMRg35Y0+tvLADiMJAxS04QKHb3l5JFe/jE24hNxMj2h9JfxO36bGblyqZ7PlS+5/pvXdVaFYolN+5Rocf63/Iq2RSCb8W3D5uUQqLwO/i1iFQT+UROUA4QJBAOvvOJSB4BIu4VD/6XGqZ5cLU3DMtwzHIyvTTH2REGF33eEHc0z83VYi4xbUDGxvDD1d58bPqkpnvJiByXmYVa8CQQCs/mHfpO8fDy7ZKGzs1u4eBsPowSnJLsGbY2mYiaawnHeeLYOaGEdtxRVk06+seTFLw5oi3FDJG8U8LP6FiGeRAkAjZiQeHBJrh/8kcREsjb23KurdDMoWL7a2N6DNYjuL9DklL0H8diAbcWaTIUOv7UVv26wP506MlV31n9uD0/hfAkBo3gwWtrT97wZHPepJ6ECQkylPf0kFXAKhX7Izdb5GcZNRn+WXFAC42jAN3wUvWIg5lWlqmIOgZeU6hUwFRpsBAkEAsSe8v0cho1YTdmXiGQ7uhUxZ455mrw81AdzmuvDxvWFLx1uHAZna9eylZsfbEa7Y9DcmakLJKGWaTvKvYc55ZQ==";
     public static String publicKeyStr = "MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQCfbyuRrG+YUX7dy/ziNqxsPP7YPKUnti+m3yDfJdN2MKa1GcIvPkfIXZ5EfEkoI9m5NwPfeC+kwM1fDWxvY/oMo/lWpIAo2zkhxssIXSkukF79I0N4vH+JAcuHN7llPYgMGkkyIe/FP/A1ZkGEayjMMqm4cfPs3vthsWgdmbnxHwIDAQAB";
 
     //    加密类型

+ 11 - 2
src/main/java/com/scbfkj/uni/library/DataFormatUtil.java

@@ -119,7 +119,6 @@ public final class DataFormatUtil {
 
     public static Object[] toArray(Object value) {
         if (Objects.isNull(value)) return null;
-
         return Objects.requireNonNull(toList(value)).toArray();
     }
 
@@ -132,7 +131,17 @@ public final class DataFormatUtil {
             try {
                 return DataFormatUtil.stringToMap(str);
             } catch (JsonProcessingException ignored) {
-
+                try {
+                    return stringToMap(str);
+                } catch (JsonProcessingException e) {
+                    try {
+                        return new HashMap<>() {{
+                            put("root", toJsonNode(str));
+                        }};
+                    } catch (JsonProcessingException ex) {
+
+                    }
+                }
             }
         }
         return new HashMap<>() {{

+ 7 - 0
src/main/java/com/scbfkj/uni/library/UniReturnUtil.java

@@ -31,6 +31,13 @@ public class UniReturnUtil {
         }
         return result;
     }
+    public static Map<String, Object> fail(String message, Object returnData) {
+        Map<String, Object> result = fail(message);
+        if (Objects.nonNull(returnData)) {
+            result.put("returnData",returnData);
+        }
+        return result;
+    }
 
     public static Map<String, Object> fail(String message) {
         HashMap<String, Object> result = new HashMap<>();

+ 3 - 4
src/main/java/com/scbfkj/uni/library/script/JavaScriptEngineUtil.java

@@ -76,7 +76,7 @@ public class JavaScriptEngineUtil {
             JsonNode path = jsonNode.get("path");
             JsonNode className = jsonNode.get("className");
             JsonNode method = jsonNode.get("methodName");
-            if (Objects.nonNull(path)) {
+            if (Objects.nonNull(path) && !path.isNull()) {
                 File file = new File(System.getProperty("user.dir").concat(File.separator).concat("plugins").concat(File.separator).concat(path.asText()));
                 if (!file.exists()) {
                     throw new RuntimeException("外部文件加载不存在:".concat(System.getProperty("user.dir")).concat(File.separator).concat("plugins").concat(File.separator).concat(path.asText()));
@@ -126,20 +126,19 @@ public class JavaScriptEngineUtil {
         }
     });
 
-    public static Map<String, Object> invoke(Map<String, Object> datasource, String expression, List<Object> args) throws Exception {
+    public static Map<String, Object> invoke(Map<String, Object> datasource, String expression, Object... args) throws Exception {
         datasource.put("methodName", expression);
         String key = DataFormatUtil.toString(datasource);
 
         JavaApply javaApply = null;
         try {
             javaApply = javaRefPool.borrowObject(key);
-            return UniReturnUtil.success(javaApply.invokeApply(args.toArray()));
+            return UniReturnUtil.success(javaApply.invokeApply(args));
         } catch (Exception e) {
             return UniReturnUtil.fail(e);
         } finally {
             if (Objects.nonNull(javaApply)) {
                     javaRefPool.returnObject(key, javaApply);
-
             }
         }
     }

+ 98 - 0
src/main/java/com/scbfkj/uni/process/ActiveMQ.java

@@ -1,4 +1,102 @@
 package com.scbfkj.uni.process;
 
+import com.scbfkj.uni.library.DataFormatUtil;
+import com.scbfkj.uni.library.UniReturnUtil;
+import org.apache.activemq.ActiveMQConnectionFactory;
+
+import javax.jms.*;
+import java.util.*;
+
 public class ActiveMQ {
+
+
+    private static Map<String, Session> sessionMap = new HashMap<>();
+
+
+    /**
+     * 采集
+     *
+     * @return 读取内容信息
+     */
+    public static Map<String, Object> readMessage(String sourceObjectName, String connectConfig, String pollNumber) {
+        if (Objects.isNull(sourceObjectName) || Objects.isNull(connectConfig)) {
+            return UniReturnUtil.fail(Objects.isNull(sourceObjectName) ? "队列为空" : "连接字符串为空");
+        }
+        try {
+
+            int max = pollNumber == null ? 1 : Integer.parseInt(pollNumber);
+            if (max < 0) {
+                max = Math.abs(max);
+            }
+            Session session;
+            session = sessionMap.get(connectConfig);
+            if (session == null) {
+                session = createSession(connectConfig);
+            }
+            try (MessageConsumer consumer = session.createConsumer(session.createQueue(sourceObjectName))) {
+
+                ArrayList<String> result = new ArrayList<>();
+
+                for (int i = 0; i < max; i++) {
+                    TextMessage receive = (TextMessage) consumer.receive(100);
+                    if (Objects.nonNull(receive)) {
+                        result.add(receive.getText());
+                        session.commit();
+                    } else {
+                        break;
+                    }
+                }
+                return UniReturnUtil.success(result);
+            }
+        } catch (JMSException e) {
+            return UniReturnUtil.fail(e.getMessage());
+        }
+    }
+
+    private static Session createSession(String connectionStr) throws JMSException {
+        Map<?, ?> connectionConfig = DataFormatUtil.toMap(connectionStr);
+
+        String host = connectionConfig.get("host").toString();
+        Object username = connectionConfig.get("username");
+        Object password = connectionConfig.get("password");
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(username == null ? null : username.toString(), password == null ? null : password.toString(), host);
+        Connection connection = connectionFactory.createConnection();
+        connection.start();
+        return connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+
+    }
+
+
+    /**
+     * 数据发送
+     *
+     * @return 发送结果信息
+     */
+    public static Map<String, Object> sendMethod(String sourceObjectName, List<Object> data, String connectConfig) {
+        if (Objects.isNull(sourceObjectName) || Objects.isNull(data)) {
+            return UniReturnUtil.fail(Objects.isNull(sourceObjectName) ? ("队列为空 " + data) : ("数据内容为空 " + data));
+        }
+        try {
+            Session session;
+            session = sessionMap.get(connectConfig);
+            if (session == null) {
+                session = createSession(connectConfig);
+            }
+            try (MessageProducer producer = session.createProducer(session.createQueue(sourceObjectName))) {
+
+                for (Object datum : data) {
+                    //创建一个文本消息
+                    if (Objects.nonNull(datum)) {
+                        TextMessage textMessage = session.createTextMessage(datum.toString());
+                        producer.send(textMessage);//生产者发送消息
+                    }
+                }
+                session.commit();//会话提交
+                return UniReturnUtil.success(null);
+            }
+        } catch (JMSException e) {
+            return UniReturnUtil.fail(e.getMessage());
+        }
+    }
+
 }

+ 109 - 3
src/main/java/com/scbfkj/uni/process/Elasticsearch.java

@@ -1,18 +1,124 @@
 package com.scbfkj.uni.process;
 
+import com.scbfkj.uni.library.DataFormatUtil;
 import com.scbfkj.uni.library.UniReturnUtil;
+import org.apache.http.Header;
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.message.BasicHeader;
+import org.apache.http.util.EntityUtils;
+import org.elasticsearch.client.Request;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
 
+import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 public class Elasticsearch {
 
-    public static Map<String,Object> sendMessage(String connection,String index, List<String> datas){
-        if(Objects.isNull(datas) || datas.isEmpty() ){
+    private static Map<String, RestClient> restClientMap = new HashMap<>();
+
+
+    public static Map<String, Object> exec(String connection, List<String> datas) {
+        if (Objects.isNull(datas) || datas.isEmpty()) {
             return UniReturnUtil.fail("数据为空");
         }
 
-        return null;
+        RestClient restClient = create(connection);
+        Map<?, ?> map = DataFormatUtil.toMap(connection);
+        Object endpoint = map.get("endpoint");
+        if (Objects.isNull(endpoint)) {
+            endpoint = "/";
+        }
+        Object method = map.get("method");
+        if (Objects.isNull(method)) {
+            method = "GET";
+        }
+
+        Map<String, String> parametersMap = new HashMap<>();
+        Object parameters = map.get("parameters");
+        if (Objects.nonNull(parameters)) {
+            parametersMap.putAll((Map<String, String>) parameters);
+        }
+
+        Object finalMethod = method;
+        Object finalEndpoint = endpoint;
+        List<HashMap<Object, Object>> results = datas.parallelStream().map(data -> {
+            Request request = new Request(finalMethod.toString(), finalEndpoint.toString());
+            request.addParameters(parametersMap);
+            request.setJsonEntity(data);
+            return new HashMap<>() {{
+
+                try {
+                    Response response = restClient.performRequest(request);
+                    put("response", response);
+                } catch (IOException e) {
+                    put("exception", e);
+                }
+            }};
+        }).toList();
+        List<Exception> exceptions = results.stream().map(it -> it.get("exception")).filter(Objects::nonNull).map(it -> ((Exception) it)).toList();
+        List<Response> responses = results.stream().map(it -> it.get("response")).filter(Objects::nonNull).map(it -> ((Response) it)).toList();
+
+        if (exceptions.isEmpty()) {
+            return UniReturnUtil.success(responses.stream().map(response -> {
+                try {
+                    return EntityUtils.toString(response.getEntity());
+                } catch (IOException e) {
+                    return null;
+                }
+            }).filter(Objects::nonNull));
+        } else {
+
+            return UniReturnUtil.fail(exceptions.stream().map(UniReturnUtil::getMessage).collect(Collectors.joining("\n")), responses.stream().map(response -> {
+                try {
+                    return EntityUtils.toString(response.getEntity());
+                } catch (IOException e) {
+                    return null;
+                }
+            }).filter(Objects::nonNull));
+        }
+    }
+
+
+    private static RestClient create(String connection) {
+        RestClient restClient = restClientMap.get(connection);
+        if (Objects.nonNull(restClient)) {
+            return restClient;
+        }
+        Map<?, ?> map = DataFormatUtil.toMap(connection);
+
+        List<String> hosts = (List<String>) map.get("host");
+        Object[] list = hosts.stream().map(HttpHost::create).toArray();
+
+        HttpHost[] httpHosts = (HttpHost[]) list;
+        RestClientBuilder builder = RestClient.builder(httpHosts);
+        Object username = map.get("username");
+        Object password = map.get("password");
+        Object token = map.get("token");
+        if (Objects.nonNull(username) && Objects.nonNull(password)) {
+            builder
+                    .setHttpClientConfigCallback(httpClientBuilder -> {
+                        httpClientBuilder.setDefaultCredentialsProvider(new BasicCredentialsProvider() {{
+                            setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username.toString(), password.toString()));
+                        }});
+                        return httpClientBuilder;
+                    });
+
+        } else if (Objects.nonNull(token)) {
+            builder.setDefaultHeaders(new Header[]{
+                    new BasicHeader("Authorization", "Bearer " + token)
+            });
+        }
+        restClient = builder.build();
+        restClientMap.put(connection, restClient);
+        return restClient;
     }
 }

+ 133 - 0
src/main/java/com/scbfkj/uni/process/FTP.java

@@ -1,4 +1,137 @@
 package com.scbfkj.uni.process;
 
+import com.scbfkj.uni.library.DataFormatUtil;
+import com.scbfkj.uni.library.UniReturnUtil;
+import org.apache.commons.net.ftp.FTPClient;
+import org.apache.commons.net.ftp.FTPFile;
+import org.apache.commons.net.ftp.FTPReply;
+import org.springframework.util.StringUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.zip.GZIPInputStream;
+
+import static org.apache.commons.net.ftp.FTP.BINARY_FILE_TYPE;
+
 public class FTP {
+    private static final int BUFFER_SIZE = 4096;
+
+    public static Map<String, Object> update(String connectionConfig, String targetPath, String sourcePath) {
+        return null;
+    }
+
+
+    /**
+     * 设置ftp客户端参数并创建ftp客户端
+     *
+     * @param config ftp连接配置信息
+     * @return ftp客户端
+     */
+    private static FTPClient createFtpClient(Map<String, Object> config) throws Exception {
+        FTPClient ftp = new FTPClient();
+        int port = Objects.nonNull(config.get("port")) ? Integer.parseInt(Objects.toString(config.get("port"))) : 21;
+        ftp.connect(Objects.toString(config.get("host")), port);
+        //common-net的ftpclient默认是使用ASCII_FILE_TYPE,文件会经过ASCII编码转换,所以可能会造成文件损坏
+        ftp.login(Objects.toString(config.get("username")), Objects.toString(config.get("password")));//登录
+        String timeOutStr = "timeOut";
+        //120秒
+        int timeOut = Objects.nonNull(config.get(timeOutStr)) ? Integer.parseInt(Objects.toString(config.get(timeOutStr))) : (1000 * 120);
+        ftp.setConnectTimeout(timeOut);
+        ftp.setDataTimeout(Duration.ofSeconds(timeOut));
+        //每次读取文件流时缓存数组的大小
+        ftp.setBufferSize(1024 * 1024 * 10);
+        // 取消服务器获取自身Ip地址和提交的host进行匹配
+        ftp.setRemoteVerificationEnabled(false);
+        //设置ftp编码
+        String encoding = Objects.nonNull(config.get("encoding")) ? config.get("encoding").toString() : "UTF-8";
+        ftp.setControlEncoding(encoding);
+        ftp.setFileType(BINARY_FILE_TYPE);//设置文件编码类型为二进制文件,
+        return ftp;
+    }
+
+    /**
+     * @param sourcePath       组装好的文件名称
+     * @param connectionConfig 连接fpt的连接信息
+     * @return
+     */
+    public static Map<String, Object> download(String connectionConfig, String sourcePath) {
+
+        Map connectionConfigMaps = DataFormatUtil.toMap(connectionConfig);
+        FTPClient ftp = null;
+        try {
+            List<Object> fileList = new ArrayList<>();
+            fileList.add(sourcePath);
+            String basePath = Objects.toString(connectionConfigMaps.get("basePath")); //根路径
+            ftp = createFtpClient(connectionConfigMaps);
+            if (!FTPReply.isPositiveCompletion(ftp.getReplyCode())) {
+                System.err.println("未连接到FTP,用户名或密码错误。");
+                ftp.logout();
+                return UniReturnUtil.fail("未连接到FTP,用户名或密码错误。");
+            }
+            if (StringUtils.hasLength(basePath)) {
+                ftp.changeWorkingDirectory(basePath);
+            }
+            String rootPath = ftp.printWorkingDirectory(); //ftp根目录
+            FTPFile[] ftpFiles = ftp.listFiles(rootPath);
+            if (ftpFiles.length == 0) {
+                System.err.println("没有找到文件");
+                ftp.logout();
+                return UniReturnUtil.fail("没有找到文件");
+            }
+            List<File> files = new ArrayList<>();
+            for (FTPFile ftpFile : ftpFiles) {
+                String tpFileName = ftpFile.getName();
+                System.err.println(tpFileName);
+                if (fileList.contains(tpFileName)) {
+                    String path = System.getProperty("user.dir") + File.separator + "ftp" + File.separator + ftpFile.getName();
+                    File targetFile = new File(path);
+                    files.add(targetFile);
+                    try (FileOutputStream fileOutputStream = new FileOutputStream(targetFile);) {
+                        ftp.retrieveFile(ftpFile.getName(), fileOutputStream);
+                    } catch (Exception e) {
+                        System.err.println("数据下载异常".concat(UniReturnUtil.getMessage(e)));
+                    }
+                }
+            }
+            ftp.logout();//退出ftp
+            List<String> fileContent = new ArrayList<>();
+            for (File ftpFile : files) {
+                String path = System.getProperty("user.dir") + File.separator + "ftp" + File.separator + ftpFile.getName();
+                File targetFile = new File(path);
+                if (targetFile.exists()) {
+                    try (GZIPInputStream gzipInputStream = new GZIPInputStream(Files.newInputStream(targetFile.toPath()));
+                         ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                    ) {
+                        byte[] buf = new byte[BUFFER_SIZE];
+                        int len;
+                        while ((len = gzipInputStream.read(buf, 0, BUFFER_SIZE)) != -1) {
+                            baos.write(buf, 0, len);
+                        }
+                        fileContent.add(baos.toString(StandardCharsets.UTF_8));
+                    } catch (Exception e) {
+                        System.err.println("数据解压异常".concat(UniReturnUtil.getMessage(e)));
+                    }
+                }
+            }
+            return UniReturnUtil.success(fileContent);
+        } catch (Exception e) {
+            return UniReturnUtil.fail("fpt读取文件异常".concat(UniReturnUtil.getMessage(e)));
+        } finally {
+            try {
+                if (null != ftp) {
+                    ftp.disconnect();
+                }
+            } catch (Exception ex) {
+                System.out.println("ftp关闭时异常:".concat(ex.getMessage()));
+            }
+        }
+    }
 }

+ 8 - 12
src/main/java/com/scbfkj/uni/process/Kafka.java

@@ -24,7 +24,7 @@ public class Kafka {
 
     // 因为生产者是线程安全的所以相同的连接只需要一个生产者实例 不需要对象池
     public static final Map<String, Producer<String, String>> producerMap = new HashMap<>();
-//    消费者是非线程安全的所以需要一个对象池
+    //    消费者是非线程安全的所以需要一个对象池
     private static final KeyedObjectPool<String, Consumer<String, String>> consumerPool = new GenericKeyedObjectPool<>(new BaseKeyedPooledObjectFactory<>() {
         @Override
         public Consumer<String, String> create(String key) throws Exception {
@@ -35,7 +35,6 @@ public class Kafka {
             connectConfigMaps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
             return new KafkaConsumer<>(connectConfigMaps);
         }
-
         @Override
         public PooledObject<Consumer<String, String>> wrap(Consumer<String, String> value) {
             return new DefaultPooledObject<>(value);
@@ -57,7 +56,7 @@ public class Kafka {
         return UniReturnUtil.success(sendResult);
     }
 
-    public synchronized static Producer<String, String> createProducer(String connectionStr) throws JsonProcessingException {
+    private synchronized static Producer<String, String> createProducer(String connectionStr) throws JsonProcessingException {
         if (producerMap.containsKey(connectionStr)) {
             return producerMap.get(connectionStr);
         }
@@ -73,23 +72,20 @@ public class Kafka {
         return kafkaProducer;
     }
 
-    public static Map<String, Object> receptionMessage(String connectConfig, String sourceObjectName, String libraryId, String pollNumber, String fetch) throws Exception {
+    public static Map<String, Object> receptionMessage(String connectConfig, String topic, String groupId) throws Exception {
         Map<String, Object> connectConfigMaps = (Map<String, Object>) DataFormatUtil.stringToMap(connectConfig);
-        if (!connectConfigMaps.containsKey("group.id")) {
-            connectConfigMaps.put("group.id", "groupid" + (String.format("%s", libraryId)));
-        }
-        connectConfigMaps.put("max.poll.records", pollNumber == null ? 1 : Integer.parseInt(pollNumber));
-        if (StringUtils.hasText(fetch)) {
-            connectConfigMaps.put("max.partition.fetch.bytes", 11534336);
+        if (Objects.nonNull(groupId)) {
+            connectConfigMaps.put("group.id", groupId);
         }
+
         String key = DataFormatUtil.toString(connectConfig);
         Consumer<String, String> consumer = consumerPool.borrowObject(key);
         try {
             List<String> messageList = new ArrayList<>();
-            consumer.subscribe(Collections.singleton(sourceObjectName));
+            consumer.subscribe(Collections.singleton(topic));
             ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
             if (records.count() > 0) {
-                for (ConsumerRecord<String, String> record : records.records(sourceObjectName)) {
+                for (ConsumerRecord<String, String> record : records.records(topic)) {
                     String readValue = record.value().trim();
                     if (!StringUtils.hasLength(readValue) || readValue.equals("{}")) {
                         continue;

+ 80 - 0
src/main/java/com/scbfkj/uni/process/RabbitMQ.java

@@ -1,4 +1,84 @@
 package com.scbfkj.uni.process;
 
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.ConnectionFactory;
+import com.rabbitmq.client.GetResponse;
+import com.scbfkj.uni.library.DataFormatUtil;
+import com.scbfkj.uni.library.UniReturnUtil;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.TimeoutException;
+
 public class RabbitMQ {
+
+    private final static Map<String, ConnectionFactory> factories = new HashMap<>();
+
+
+    public static ConnectionFactory createConnectFactory(String connectionStr) {
+        ConnectionFactory connectionFactory = factories.get(connectionStr);
+        if (Objects.nonNull(connectionFactory)) return connectionFactory;
+
+        Map<?, ?> connectConfig = DataFormatUtil.toMap(connectionStr);
+
+        connectionFactory = new ConnectionFactory();
+        connectionFactory.setHost(connectConfig.get("host").toString());
+        connectionFactory.setUsername(connectConfig.get("username").toString());
+        connectionFactory.setPassword(connectConfig.get("password").toString());
+        connectionFactory.setPort(Integer.parseInt(connectConfig.get("port").toString()));
+        Object virtualHost = connectConfig.get("virtualHost");
+        if (Objects.nonNull(virtualHost)) connectionFactory.setVirtualHost(virtualHost.toString());
+        connectionFactory.setConnectionTimeout(3000);
+        return connectionFactory;
+    }
+
+    public static Map<String, Object> sendMessage(String connectionConfig, String exchangeName, String data) throws Exception {
+
+        ConnectionFactory connectFactory = createConnectFactory(connectionConfig);
+        try (Connection connection = connectFactory.newConnection()) {
+            try (Channel channel = connection.createChannel()) {
+//                routingKey = (routingKey == null) ? queueName : routingKey;
+//                exchangeName = (null == exchangeName) ? "" : exchangeName;
+//                AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2).contentEncoding("UTF-8").build();
+                //第一个参数是exchange参数,如果是为空字符串,那么就会发送到(AMQP default)默认的exchange,而且routingKey
+                //便是所要发送到的队列名
+                //todo 如果rabbitmq的服务端为绑定路由,那么写入不进行
+                //todo exchangeName为“” 则rabbitmq服务端不需要配置
+                //todo exchangeName为不为"" 则rabbitmq必须绑定
+                channel.exchangeDeclare(exchangeName, "fanout");
+                channel.basicPublish(exchangeName, "/", null, data.getBytes(StandardCharsets.UTF_8));
+            } catch (Exception e) {
+                return UniReturnUtil.fail(UniReturnUtil.getMessage(e));
+            }
+        } catch (IOException | TimeoutException e) {
+            return UniReturnUtil.fail(UniReturnUtil.getMessage(e));
+        }
+
+        return null;
+    }
+
+    public static Map<String, Object> receptionMessage(String connectConfig, String routingKey, String exchangeName) throws Exception {
+
+        Map<String, Object> connectConfigMaps = (Map<String, Object>) DataFormatUtil.toMap(connectConfig);
+        Object arguments = connectConfigMaps.get("arguments");
+//        Object prefetchCount = connectConfigMaps.get("prefetchCount");
+        try (Connection connection = createConnectFactory(connectConfig).newConnection(); Channel currentChannel = connection.createChannel();) {
+            //创建消息通道
+            currentChannel.queueDeclare(exchangeName, true, false, false, Objects.isNull(arguments) ? null : ((Map<String, Object>) arguments));
+            currentChannel.basicQos(1); //服务端在同一时刻只发送1条数据
+            GetResponse getResponse = currentChannel.basicGet(exchangeName, true);
+            String repBody = null;
+            if (null != getResponse) {
+                repBody = new String(getResponse.getBody());
+            }
+            return UniReturnUtil.success(repBody);
+        } catch (IOException | TimeoutException e) {
+            return UniReturnUtil.fail(UniReturnUtil.getMessage(e));
+        }
+    }
 }

+ 2 - 2
src/main/java/com/scbfkj/uni/service/DataProcessService.java

@@ -182,11 +182,11 @@ public class DataProcessService {
         switch (type.toString()) {
 //            java反射
             case "1" -> {
-                return JavaScriptEngineUtil.invoke((Map<String, Object>) parameters.get(0), DataFormatUtil.toString(parameters.get(0)), parameters.subList(2, parameters.size()));
+                return JavaScriptEngineUtil.invoke((Map<String, Object>) parameters.get(0), DataFormatUtil.toString(parameters.get(0)), parameters.subList(2, parameters.size()).toArray());
             }
 //            JS表达式
             case "2" -> {
-                return JsScriptEngineUtil.eval(DataFormatUtil.toString(parameters.get(0)), parameters.subList(1, parameters.size()));
+                return JsScriptEngineUtil.eval(DataFormatUtil.toString(parameters.get(0)), parameters.subList(1, parameters.size()).toArray());
             }
 //            数据库
             case "3" -> {

+ 11 - 9
src/main/java/com/scbfkj/uni/service/LoggerService.java

@@ -94,10 +94,8 @@ public class LoggerService {
 
         File file = new File(dir);
 
-//        日志目录下的文件
-        List<File> files = Arrays.stream(file.listFiles()).toList();
 //        过滤出当前不使用的文件
-        List<File> logsFiles = files.parallelStream()
+        List<File> logsFiles = Arrays.stream(file.listFiles())
                 .filter(logFile -> {
                     String fileName = logFile.getName();
                     return !fileName.equals(currentFileName) && fileName.endsWith("sqlite");
@@ -124,7 +122,9 @@ public class LoggerService {
                 })
 //                分组
                 .collect(Collectors.groupingBy((log) -> log.get("target").toString()))
-                .entrySet().parallelStream().forEach(stringListEntry -> {
+                .entrySet()
+                .parallelStream()
+                .forEach(stringListEntry -> {
                     String targetName = stringListEntry.getKey();
 //                    需要发送的数据
                     List<Map<String, Object>> value = stringListEntry.getValue();
@@ -168,8 +168,7 @@ public class LoggerService {
                             }
                         } catch (Exception e) {
 //                        当前数据是在哪一个数据库文件中
-                            String currentfile = value.stream().findFirst().map(d -> d.get("currentfile").toString()).orElse(null);
-                            errorFileNames.add(currentfile);
+                            value.stream().findFirst().map(d -> d.get("currentfile").toString()).ifPresent(errorFileNames::add);
                             System.out.println(UniReturnUtil.getMessage(e));
                         }
                     });
@@ -184,12 +183,15 @@ public class LoggerService {
                 HikariPool hikariPool = DataBase.dataSourcePools.remove(connectionStr);
                 if (Objects.nonNull(hikariPool))
                     hikariPool.shutdown();
-//                删除文件一直不成功
+//                删除文件一直不成功 怀疑是数据库连接导致文件被使用导致一直删除不成功
                 f.delete();
             } catch (Exception e) {
-                e.printStackTrace();
+                if (Config.debug) {
+                    e.printStackTrace();
+                } else {
+                    System.out.println(UniReturnUtil.getMessage(e));
+                }
             }
         });
-
     }
 }

+ 108 - 0
src/test/java/com/scbfkj/uni/library/DataEncryptionUtilTest.java

@@ -4,6 +4,7 @@ import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.security.KeyPairGenerator;
+import java.util.UUID;
 
 import static org.junit.jupiter.api.Assertions.*;
 
@@ -82,4 +83,111 @@ class DataEncryptionUtilTest {
     }
 
 
+    @Test
+    void encrypt() {
+    }
+
+    @Test
+    void decrypt() {
+    }
+
+    @Test
+    void encryptBase64() {
+    }
+
+    @Test
+    void decryptBase64() {
+    }
+
+    @Test
+    void signatureSHA() {
+    }
+
+    @Test
+    void verifySHA() {
+    }
+
+    @Test
+    void signatureMD5() {
+    }
+
+    @Test
+    void verifyMD5() {
+    }
+
+    @Test
+    void encryptRSAByPublicKey() {
+    }
+
+    @Test
+    void decryptRSAByPrivateKey() {
+    }
+
+    @Test
+    void encryptDES() throws Exception {
+        String helloWorld = DataEncryptionUtil.encryptDES("HELLO WORLD", "12345678");
+        System.out.println(helloWorld);
+    }
+
+    @Test
+    void decryptDES() throws Exception {
+        String helloWorld = DataEncryptionUtil.decryptDES("+dhTwlyrdA35S/6Xk5R+yQ==", "12345678");
+        System.out.println(helloWorld);
+    }
+
+    @Test
+    void encryptAES() throws Exception {
+        System.out.println(DataEncryptionUtil.publicKeyStr);
+        String helloWorld = DataEncryptionUtil.encryptRSAByPublicKey("HELLO WORLD");
+        System.out.println(helloWorld);
+    }
+
+    @Test
+    void decryptAES() throws Exception {
+        String data = DataEncryptionUtil.decryptRSAByPrivateKey("a4xjClMUhHo/X4vQoZRJPepvIzKIdiAvcYtcfo/zaFXk86OaABWwCO7gyrrsgPlfMAaXiFxPqxwVZwVCwpwnZv+CM/aSz4cE2NRbiMhUyvV1mng8Zf4D2jpVsKNDipEzhbjPQ7H14+Sitwxxj3JkF8hZlqf7zAgPRSBVfHnc2us=");
+        System.out.println(data);
+    }
+
+    @Test
+    void encrypt3DES() throws Exception {
+        String key = "123456781234567812345678";
+//        while (key.length()<128){
+//            key += UUID.randomUUID().toString().replaceAll("-","");
+//        }
+//        key = key.substring(0,128);
+        System.out.println(key);
+
+        String helloWorld = DataEncryptionUtil.encrypt3DES("HELLO WORLD", "123456781234567812345678");
+        System.out.println(helloWorld);
+    }
+
+    @Test
+    void decrypt3DES() throws Exception {
+        String helloWorld = DataEncryptionUtil.decrypt3DES("+dhTwlyrdA35S/6Xk5R+yQ==", "123456781234567812345678");
+        System.out.println(helloWorld);
+    }
+
+    @Test
+    void encryptByPublicKey() {
+    }
+
+    @Test
+    void decryptByPrivateKey() {
+    }
+
+    @Test
+    void encryptByPrivateKey() {
+    }
+
+    @Test
+    void decryptByPublicKey() {
+    }
+
+    @Test
+    void sign() {
+    }
+
+    @Test
+    void verify() {
+    }
 }

+ 61 - 5
src/test/java/com/scbfkj/uni/library/DataFormatUtilTest.java

@@ -4,19 +4,75 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.jayway.jsonpath.JsonPath;
+import jdk.swing.interop.SwingInterOpUtils;
 import org.junit.jupiter.api.Test;
 
+import java.util.HashMap;
+
 import static org.junit.jupiter.api.Assertions.*;
 
 class DataFormatUtilTest {
 
     @Test
     void toJsonNode() throws JsonProcessingException {
-        String json = "[{\"name\": \"Alice\", \"age\": 25}, {\"name\": \"Bob\", \"age\": 30}]";
-        ObjectMapper mapper = new ObjectMapper();
-        JsonNode rootNode = mapper.readTree(json);
-        Object read = JsonPath.read(json, "$[0]");
-        System.out.println(read);
+        JsonNode jsonNode = DataFormatUtil.toJsonNode("[{\"name\": \"Alice\", \"age\": 25}, {\"name\": \"Bob\", \"age\": 30}]");
+        System.out.println(jsonNode);
+        jsonNode = DataFormatUtil.toJsonNode(new HashMap<>(){{
+            put("a",1);
+        }});
+        System.out.println(jsonNode);
+
+        jsonNode = DataFormatUtil.toJsonNode("HELLO");
+        System.out.println(jsonNode);
+    }
+
+    @Test
+    void stringToBean() {
+    }
 
+    @Test
+    void stringToMap() {
+    }
+
+    @Test
+    void stringToObjectNode() {
+    }
+
+    @Test
+    void stringToArrayNode() {
+    }
+
+    @Test
+    void testToString() {
+    }
+
+    @Test
+    void toDate() {
+    }
+
+    @Test
+    void toDateTime() {
+    }
+
+    @Test
+    void stringToDate() {
+    }
+
+    @Test
+    void stringToDateTime() {
+    }
+
+    @Test
+    void toList() {
+        System.out.println(DataFormatUtil.toList("HELLO WORLD"));
+    }
+
+    @Test
+    void toArray() {
+    }
+
+    @Test
+    void toMap() {
+        System.out.println(DataFormatUtil.toMap("HELLO WORLD"));
     }
 }

+ 17 - 0
src/test/java/com/scbfkj/uni/library/ImageUtilTest.java

@@ -0,0 +1,17 @@
+package com.scbfkj.uni.library;
+
+import org.junit.jupiter.api.Test;
+
+import java.awt.*;
+import java.io.IOException;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class ImageUtilTest {
+
+    @Test
+    void stringToImage() throws IOException, FontFormatException {
+        String s = ImageUtil.stringToImage("1234");
+        System.out.println(s);
+    }
+}

+ 46 - 0
src/test/java/com/scbfkj/uni/library/script/DatabaseScriptUtilTest.java

@@ -0,0 +1,46 @@
+package com.scbfkj.uni.library.script;
+
+import com.scbfkj.uni.library.UniReturnUtil;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+//@SpringBootTest
+class DatabaseScriptUtilTest {
+
+    @Test
+    void exec() throws Exception {
+        Map<String, Object> exec = DatabaseScriptUtil.exec("""
+                {
+  "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/test",
+  "username": "root",
+  "password": "123@bigdata",
+  "driverClassName": "com.mysql.cj.jdbc.Driver"
+}
+                """, "select * from interfacelog where  interfacelogid > 《id》", Collections.singletonList(new HashMap<>() {{
+            put("id", 1);
+        }}), "0", null, null);
+        System.out.println(UniReturnUtil.success(new ArrayList<>()));
+    }
+    String x = """
+            {
+  "url": "http://127.0.0.1:8080",
+  "method": "POST",
+  "header": {
+    "key1": ["value1","value2"],
+    "key2":"value"
+  },
+  "body": {
+    "key": "value"
+  },
+  "params": {
+    "key": "value"
+  }
+}""";
+}

+ 24 - 0
src/test/java/com/scbfkj/uni/library/script/JavaScriptEngineUtilTest.java

@@ -0,0 +1,24 @@
+package com.scbfkj.uni.library.script;
+
+import com.scbfkj.uni.process.Kafka;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class JavaScriptEngineUtilTest {
+
+    @Test
+    void invoke() throws Exception {
+        Map<String, Object> invoke = JavaScriptEngineUtil.invoke(new HashMap<>() {{
+            put("className", "com.scbfkj.uni.process.Kafka");
+            put("path", null);
+        }}, "sendMessage", "{\"bootstrap.servers\": \"127.0.0.1:9092\"}", "test", new ArrayList<String>() {{
+            add("HELLO WORLD");
+        }});
+        System.out.println(invoke);
+    }
+}

+ 16 - 0
src/test/java/com/scbfkj/uni/library/script/JsScriptEngineUtilTest.java

@@ -0,0 +1,16 @@
+package com.scbfkj.uni.library.script;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class JsScriptEngineUtilTest {
+
+    @Test
+    void eval() throws Exception {
+        Map<String, Object> result = JsScriptEngineUtil.eval("(a,b) => a+b", 1, 2);
+        System.out.println(result);
+    }
+}

+ 23 - 0
src/test/java/com/scbfkj/uni/process/KafkaTest.java

@@ -0,0 +1,23 @@
+package com.scbfkj.uni.process;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class KafkaTest {
+
+    @Test
+    void sendMessage() throws Exception {
+        Map<String, Object> map = Kafka.sendMessage("{\"bootstrap.servers\": \"127.0.0.1:9092\"}", "test", new ArrayList<>() {{
+            add("HELLO WORLD");
+        }});
+    }
+
+    @Test
+    void receptionMessage() throws Exception {
+        Map<String, Object> map = Kafka.receptionMessage("{\"bootstrap.servers\": \"127.0.0.1:9092\"}", "test", "1");
+    }
+}