Эх сурвалжийг харах

rabbitMq database 调试通过

andy 1 жил өмнө
parent
commit
8da100f1cb

+ 3 - 3
pom.xml

@@ -40,9 +40,9 @@
         </dependency>
 
         <dependency>
-            <groupId>com.rabbitmq</groupId>
-            <artifactId>amqp-client</artifactId>
-            <version>5.18.0</version>
+            <groupId>com.rabbitmq.jms</groupId>
+            <artifactId>rabbitmq-jms</artifactId>
+            <version>3.2.0</version>
         </dependency>
         <dependency>
             <groupId>commons-net</groupId>

+ 6 - 0
src/main/java/com/scbfkj/uni/api/LogAop.java

@@ -226,6 +226,12 @@ public class LogAop {
                     if (Objects.nonNull(usergroupid) && Objects.equals("0", usergroupid.toString())) {
                         body.put("filterColumns", Collections.singletonList("*"));
                     }
+                }else {
+//                    不需要登录也没有设置权限的默认添加一个所有列权限
+
+                    if(body!=null && !body.containsKey("filterColumns")){
+                        body.put("filterColumns", Collections.singletonList("*"));
+                    }
                 }
             }
 

+ 4 - 26
src/main/java/com/scbfkj/uni/library/script/DatabaseScriptUtil.java

@@ -179,7 +179,7 @@ public class DatabaseScriptUtil {
 
         String connectionStr = datasourceId;
         if (Pattern.compile("^\\d+$").matcher(connectionStr.trim()).matches()) {
-            connectionStr = queryConnectionStr(connectionStr.trim());
+            connectionStr = DATABASE.queryConnectionStr(connectionStr.trim());
         }
 
         if (expression.contains("《whereStr》")) {
@@ -256,7 +256,6 @@ public class DatabaseScriptUtil {
         if (Objects.isNull(event) || !StringUtils.hasText(event.toString())) {
             throw new RuntimeException("执行编号不能为空");
         }
-
         event = event.toString().trim();
 
         if (!events.contains(event)) {
@@ -282,7 +281,7 @@ public class DatabaseScriptUtil {
 
         String connectionStr = datasourceId;
         if (Pattern.compile("^\\d+$").matcher(connectionStr.trim()).matches()) {
-            connectionStr = queryConnectionStr(connectionStr.trim());
+            connectionStr = DATABASE.queryConnectionStr(connectionStr.trim());
         }
         List<String> valueNames = null;
         List<String> filterNames = null;
@@ -342,7 +341,7 @@ public class DatabaseScriptUtil {
                 expression = "select %s from %s where   %s  %s".formatted(String.join(",", allColumns), expression, Objects.isNull(filterLineWhereStr) ? " 1=1 " : filterLineWhereStr,
                         filterNames.isEmpty() ?
                                 "" : (" and " +
-                                filterNames.stream().map("%s = ?"::formatted).collect(Collectors.joining(" and "))));
+                                      filterNames.stream().map("%s = ?"::formatted).collect(Collectors.joining(" and "))));
 
                 for (Map<String, Object> arg : dataContent) {
                     Map<String, Object> o1 = ((Map<String, Object>) arg.getOrDefault("filter", arg));
@@ -381,7 +380,7 @@ public class DatabaseScriptUtil {
 
                     String joinTableConnectionStr = connectionStr;
                     if (Objects.nonNull(joinConnectionDatasourceId)) {
-                        joinTableConnectionStr = queryConnectionStr(joinConnectionDatasourceId.toString());
+                        joinTableConnectionStr = DATABASE.queryConnectionStr(joinConnectionDatasourceId.toString());
                     }
 
 
@@ -533,27 +532,6 @@ public class DatabaseScriptUtil {
         return execByTableName(datasourceId, table, args);
     }
 
-    public String queryConnectionStr(String datasourceId) throws Exception {
-        List<Map<String, Object>> result = DATABASE.query(Config.getCenterConnectionStr(), """
-                select datasourceid,
-                       host,
-                       username,
-                       password,
-                       driverclassname
-                from datasource
-                where datasourceid = ?""", datasourceId);
-        if (result.isEmpty()) {
-            throw new RuntimeException("数据源错误:没有找到数据源");
-        }
-        return DataFormatUtil.toString(result.stream().findFirst().map(it -> {
-            HashMap<String, Object> hashMap = new HashMap<>();
-            hashMap.put("jdbcUrl", it.get("host"));
-            hashMap.put("username", it.get("username"));
-            hashMap.put("password", it.get("password"));
-            hashMap.put("driverClassName", it.get("driverclassname"));
-            return hashMap;
-        }).get());
-    }
 
     public record Pageable(Long page, Long pageSize) {
     }

+ 1 - 1
src/main/java/com/scbfkj/uni/library/script/JsScriptEngineUtil.java

@@ -45,7 +45,7 @@ public final class JsScriptEngineUtil {
     }, new GenericKeyedObjectPoolConfig<>());
 
 
-    public static Map<String, Object> eval( Object[] args) throws Exception {
+    public static Map<String, Object> eval( Object ...args) throws Exception {
         Value function = null;
         String script = ((String) args[0]);
         List<Object> arg = Arrays.stream(args).skip(1).toList();

+ 1 - 1
src/main/java/com/scbfkj/uni/library/script/KafkaScriptUtil.java

@@ -37,7 +37,7 @@ public class KafkaScriptUtil {
         return result.stream().findFirst().map(it -> {
             HashMap<String, Object> hashMap = new HashMap<>();
             hashMap.put("bootstrap.servers", it.get("host"));
-            hashMap.put("max.poll.records", "10");
+            hashMap.put("max.poll.records", "100");
             return hashMap;
         }).map(DataFormatUtil::toString).get();
     }

+ 74 - 0
src/main/java/com/scbfkj/uni/process/DataBase.java

@@ -371,6 +371,59 @@ public class DataBase {
         return names;
     }
 
+    public List<Map<String, String>> getColumnsByConnection(String connection, String catalog, String tableName) {
+
+        ResultSet resultSet;
+        DatabaseMetaData metaData;
+        ArrayList<Map<String, String>> result = new ArrayList<>();
+        try (Connection conn = getDataSourcePool(connection).getConnection()) {
+            metaData = conn.getMetaData();
+            resultSet = metaData.getColumns(catalog, null, tableName, null);
+
+            while (resultSet.next()) {
+                HashMap<String, String> value = new HashMap<>();
+                String columnName = resultSet.getString("COLUMN_NAME"); // 获取字段名
+                value.put("name", columnName);
+                String columnType = resultSet.getString("TYPE_NAME"); // 获取字段类型
+                value.put("typeName", columnType);
+                String remarks = resultSet.getString("REMARKS"); // 获取字段注释
+                value.put("remarks", remarks);
+                value.put("isPrimaryKey", resultSet.getString("IS_AUTOINCREMENT"));// 判断是否为主键
+                result.add(value);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        return result;
+    }
+
+    public List<Map<String, String>> getColumnsByDataSourceId(String dataSourceId, String catalog, String tableName) throws Exception {
+        return getColumnsByConnection(queryConnectionStr(dataSourceId), catalog, tableName);
+    }
+
+    public List<String> getTablesByConnection(String connection, String catalog) {
+        ResultSet resultSet;
+        DatabaseMetaData metaData;
+        ArrayList<String> result = new ArrayList<>();
+        try (Connection conn = getDataSourcePool(connection).getConnection()) {
+            metaData = conn.getMetaData();
+            resultSet = metaData.getTables(catalog, null, null, new String[]{"TABLE"});
+
+            while (resultSet.next()) {
+                String tableName = resultSet.getString("TABLE_NAME"); // 获取字段名
+
+                result.add(tableName);
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        return result;
+    }
+
+    public List<String> getTablesByDataSourceId(String dataSourceId, String catalog) throws Exception {
+        return getTablesByConnection(queryConnectionStr(dataSourceId), catalog);
+    }
+
     public List<Map<String, Object>> getResult(String connection, String sql, ResultSet resultSet) throws SQLException {
         List<String> cs = getColumns(connection, sql, resultSet);
         List<Map<String, Object>> result = new ArrayList<>();
@@ -392,4 +445,25 @@ public class DataBase {
         return result;
     }
 
+    public String queryConnectionStr(String datasourceId) throws Exception {
+        List<Map<String, Object>> result = query(Config.getCenterConnectionStr(), """
+                select datasourceid,
+                       host,
+                       username,
+                       password,
+                       driverclassname
+                from datasource
+                where datasourceid = ?""", datasourceId);
+        if (result.isEmpty()) {
+            throw new RuntimeException("数据源错误:没有找到数据源");
+        }
+        return DataFormatUtil.toString(result.stream().findFirst().map(it -> {
+            HashMap<String, Object> hashMap = new HashMap<>();
+            hashMap.put("jdbcUrl", it.get("host"));
+            hashMap.put("username", it.get("username"));
+            hashMap.put("password", it.get("password"));
+            hashMap.put("driverClassName", it.get("driverclassname"));
+            return hashMap;
+        }).get());
+    }
 }

+ 124 - 61
src/main/java/com/scbfkj/uni/process/RabbitMQ.java

@@ -1,83 +1,146 @@
 package com.scbfkj.uni.process;
 
-import com.rabbitmq.client.Channel;
-import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.ConnectionFactory;
-import com.rabbitmq.client.GetResponse;
-import com.scbfkj.uni.library.DataFormatUtil;
-import com.scbfkj.uni.library.UniReturnUtil;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.rabbitmq.jms.admin.RMQConnectionFactory;
+import com.scbfkj.uni.system.Config;
+import jakarta.jms.JMSException;
+import jakarta.jms.Message;
+import jakarta.jms.TextMessage;
+import org.springframework.jms.connection.CachingConnectionFactory;
+import org.springframework.jms.core.JmsTemplate;
 
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Objects;
-import java.util.concurrent.TimeoutException;
+import java.util.ArrayList;
+import java.util.List;
 
 public class RabbitMQ {
 
-    private final static Map<String, ConnectionFactory> factories = new HashMap<>();
 
+    private JmsTemplate jmsTemplate;
 
-    public static ConnectionFactory createConnectFactory(String connectionStr) {
-        ConnectionFactory connectionFactory = factories.get(connectionStr);
-        if (Objects.nonNull(connectionFactory)) return connectionFactory;
+    private final ObjectMapper mapper = new ObjectMapper();
 
-        Map<?, ?> connectConfig = DataFormatUtil.toMap(connectionStr);
+    public List<String> receptionMessage(
+            String host, Long port, String username, String password, String virtualHost, String queueName, Long receiveTimeout, Long pollSize, Long retry) throws JMSException {
+        JmsTemplate template = getJmsTemplate(host, port.intValue(), username, password, virtualHost);
+
+
+        jmsTemplate.setReceiveTimeout(receiveTimeout);
+        long maxSize = 100;
+        if (pollSize > 0) {
+            maxSize = pollSize;
+        }
+        int maxRetry = 0;
+        List<String> result = new ArrayList<>();
 
-        connectionFactory = new ConnectionFactory();
-        connectionFactory.setHost(connectConfig.get("host").toString());
-        connectionFactory.setUsername(connectConfig.get("username").toString());
-        connectionFactory.setPassword(connectConfig.get("password").toString());
-        connectionFactory.setPort(Integer.parseInt(connectConfig.get("port").toString()));
-        Object virtualHost = connectConfig.get("virtualHost");
-        if (Objects.nonNull(virtualHost)) connectionFactory.setVirtualHost(virtualHost.toString());
-        connectionFactory.setConnectionTimeout(3000);
-        return connectionFactory;
-    }
 
-    public static Map<String, Object> sendMessage(String connectionConfig, String exchangeName, String data) throws Exception {
-
-        ConnectionFactory connectFactory = createConnectFactory(connectionConfig);
-        try (Connection connection = connectFactory.newConnection()) {
-            try (Channel channel = connection.createChannel()) {
-//                routingKey = (routingKey == null) ? queueName : routingKey;
-//                exchangeName = (null == exchangeName) ? "" : exchangeName;
-//                AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2).contentEncoding("UTF-8").build();
-                //第一个参数是exchange参数,如果是为空字符串,那么就会发送到(AMQP default)默认的exchange,而且routingKey
-                //便是所要发送到的队列名
-                //todo 如果rabbitmq的服务端为绑定路由,那么写入不进行
-                //todo exchangeName为“” 则rabbitmq服务端不需要配置
-                //todo exchangeName为不为"" 则rabbitmq必须绑定
-                channel.exchangeDeclare(exchangeName, "fanout");
-                channel.basicPublish(exchangeName, "/", null, data.getBytes(StandardCharsets.UTF_8));
+        while (true) {
+            try {
+                Message message = template.receive(queueName);
+                if (message == null) {
+                    break;
+                } else if (message instanceof TextMessage msg) {
+                    String text = msg.getText();
+                    result.add(text);
+                    if (result.size() >= maxSize) {
+                        break;
+                    }
+                }
             } catch (Exception e) {
-                return UniReturnUtil.fail(UniReturnUtil.getMessage(e));
+                maxRetry++;
+                if (maxRetry > retry) {
+                    if (Config.isDebug()) {
+                        e.printStackTrace();
+                    }
+                    if (jmsTemplate != null) {
+                        jmsTemplate = null;
+                    }
+
+                    return result;
+                }
             }
-        } catch (IOException | TimeoutException e) {
-            return UniReturnUtil.fail(UniReturnUtil.getMessage(e));
         }
+        return result;
+
 
-        return null;
     }
 
-    public static Map<String, Object> receptionMessage(String connectConfig, String routingKey, String exchangeName) throws Exception {
-
-        Map<String, Object> connectConfigMaps = (Map<String, Object>) DataFormatUtil.toMap(connectConfig);
-        Object arguments = connectConfigMaps.get("arguments");
-//        Object prefetchCount = connectConfigMaps.get("prefetchCount");
-        try (Connection connection = createConnectFactory(connectConfig).newConnection(); Channel currentChannel = connection.createChannel()) {
-            //创建消息通道
-            currentChannel.queueDeclare(exchangeName, true, false, false, Objects.isNull(arguments) ? null : ((Map<String, Object>) arguments));
-            currentChannel.basicQos(1); //服务端在同一时刻只发送1条数据
-            GetResponse getResponse = currentChannel.basicGet(exchangeName, true);
-            String repBody = null;
-            if (null != getResponse) {
-                repBody = new String(getResponse.getBody());
+
+    public void sendMessage(
+            String host, Long port, String username, String password, String virtualHost,String queueName,
+            Object data
+    ) throws JMSException {
+
+        JmsTemplate template = getJmsTemplate(host, port.intValue(), username, password, virtualHost);
+
+        try {
+            if (data instanceof Object[] datas) {
+                for (Object it : datas) {
+
+                    execSend(template, it,queueName);
+                }
+            } else if (data instanceof Iterable<?> datas) {
+
+                for (Object it : datas) {
+
+                    execSend(template, it, queueName);
+                }
+            } else {
+                execSend(template, data, queueName);
             }
-            return UniReturnUtil.success(repBody);
-        } catch (IOException | TimeoutException e) {
-            return UniReturnUtil.fail(UniReturnUtil.getMessage(e));
+
+        } catch (Exception e) {
+            if (Config.isDebug()) {
+                e.printStackTrace();
+            }
+            if (jmsTemplate != null) {
+                jmsTemplate = null;
+            }
+
         }
     }
+
+
+    private void execSend(
+            JmsTemplate template,
+            Object data,
+            String queueName) throws IOException {
+        String message = mapper.writeValueAsString(data);
+
+        template.send(queueName,session -> session.createTextMessage(message));
+    }
+
+
+    private jakarta.jms.ConnectionFactory createConnectionFactory(String host, int port, String username, String password, String virtualHost) {
+        RMQConnectionFactory connectionFactory = new RMQConnectionFactory();
+        connectionFactory.setUsername(username);
+        connectionFactory.setPassword(password);
+        connectionFactory.setVirtualHost(virtualHost);
+        connectionFactory.setHost(host);
+        connectionFactory.setPort(port);
+        return connectionFactory;
+    }
+
+
+    private CachingConnectionFactory createCachingConnectionFactory(jakarta.jms.ConnectionFactory factory) {
+        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
+        cachingConnectionFactory.setTargetConnectionFactory(factory);
+        return cachingConnectionFactory;
+    }
+
+
+    private JmsTemplate getJmsTemplate(String host, int port, String username, String password, String virtualHost) throws JMSException {
+        if (jmsTemplate == null) {
+            jakarta.jms.ConnectionFactory connectionFactory = createConnectionFactory(host, port, username, password, virtualHost);
+            CachingConnectionFactory cachingConnectionFactory = createCachingConnectionFactory(connectionFactory);
+            jmsTemplate = createJmsTemplate(cachingConnectionFactory);
+        }
+        return jmsTemplate;
+    }
+
+    private JmsTemplate createJmsTemplate(CachingConnectionFactory factory) {
+        JmsTemplate jmsTemplate = new JmsTemplate();
+        jmsTemplate.setConnectionFactory(factory);
+        return jmsTemplate;
+    }
 }

+ 7 - 6
src/main/java/com/scbfkj/uni/service/DataProcessService.java

@@ -241,16 +241,17 @@ public class DataProcessService {
                     Map<String, Object> algorithmsourcelibrary = algorithmsourcelibraryList.get(0);
 
                     HashMap<String, Object> configMap = new HashMap<>();
-                    configMap.put("methodName", algorithmsourcelibrary.get("code"));
+                    Object methodName = algorithmsourcelibrary.get("code");
+                    configMap.put("methodName", methodName);
                     configMap.put("path", algorithmsourcelibrary.get("filepath"));
                     Object className = algorithmsourcelibrary.get("library");
                     configMap.put("className", className);
                     Map<String, Object> result;
-//                    if ("com.scbfkj.uni.library.script.JsScriptEngineUtil".equals(className)) {
-//                        result = JsScriptEngineUtil.eval(parameters.toArray());
-//                    } else {
-                        result = JavaScriptEngineUtil.invoke(configMap, algorithmsourcelibrary.get("code").toString(), parameters.toArray());
-//                    }
+                    if ("com.scbfkj.uni.library.script.JsScriptEngineUtil".equals(className) && "eval".equals(methodName)) {
+                        result = JsScriptEngineUtil.eval(parameters.toArray());
+                    } else {
+                        result = JavaScriptEngineUtil.invoke(configMap, methodName.toString(), parameters.toArray());
+                    }
 
                     if ("0".equals(result.get("code"))) {
                         Object returnData = result.get("returnData");

+ 31 - 0
src/test/java/com/scbfkj/uni/process/DataBaseTest.java

@@ -0,0 +1,31 @@
+//package com.scbfkj.uni.process;
+//
+//import org.junit.jupiter.api.Test;
+//
+//import java.util.List;
+//import java.util.Map;
+//
+//import static org.junit.jupiter.api.Assertions.*;
+//
+//class DataBaseTest {
+//
+//    String connection = """
+//                                {
+//                  "jdbcUrl": "jdbc:mysql://120.26.64.82:3306/systemset",
+//                  "username": "root",
+//                  "password": "123@bigdata",
+//                  "driverClassName": "com.mysql.cj.jdbc.Driver"
+//                }""";
+//    DataBase dataBase=new DataBase();
+//    @Test
+//    void getColumns() {
+//        List<Map<String, String>> serviceinfo = dataBase.getColumns(connection, "systemset","serviceinfo");
+//        System.out.println(serviceinfo);
+//    }
+//
+//    @Test
+//    void getTables() {
+//        List<String> systemset = dataBase.getTables(connection, "systemset");
+//        System.out.println(systemset);
+//    }
+//}

+ 29 - 0
src/test/java/com/scbfkj/uni/process/RabbitMQTest.java

@@ -0,0 +1,29 @@
+package com.scbfkj.uni.process;
+
+import jakarta.jms.JMSException;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class RabbitMQTest {
+
+    private RabbitMQ rabbitMQ = new RabbitMQ();
+
+    @Test
+    void sendMessage() throws JMSException {
+        ArrayList<String> strings = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            strings.add("hello" + i);
+        }
+        rabbitMQ.sendMessage("120.26.64.82", 5672L, "admin", "admin", "/", "bimTest", strings);
+    }
+
+    @Test
+    void receptionMessage() throws JMSException {
+        List<String> strings = rabbitMQ.receptionMessage("120.26.64.82", 5672L, "admin", "admin", "/", "bimTest", 300L, 100L, 1L);
+        System.out.println(strings);
+    }
+}