Эх сурвалжийг харах

ibmmq 增加 主题发送

andy 8 сар өмнө
parent
commit
0546960555

+ 163 - 92
src/main/java/com/scbfkj/uni/process/IBMMQ.java

@@ -1,5 +1,8 @@
 package com.scbfkj.uni.process;
 
+import org.springframework.jms.connection.CachingConnectionFactory;
+import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
+import org.springframework.jms.core.JmsTemplate;
 
 import com.ibm.mq.jakarta.jms.MQQueueConnectionFactory;
 import com.scbfkj.uni.exceptions.ConnectionNotFoundException;
@@ -9,39 +12,44 @@ import jakarta.jms.ConnectionFactory;
 import jakarta.jms.JMSException;
 import jakarta.jms.Message;
 import jakarta.jms.TextMessage;
-import org.springframework.jms.connection.CachingConnectionFactory;
-import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
-import org.springframework.jms.core.JmsTemplate;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
 
 /**
  * IBM MQ客户端类,用于发送和接收消息。
  */
 public class IBMMQ {
-
     private static final DataBase DATA_BASE = new DataBase();
+    
     private JmsTemplate jmsTemplate = null;
-
+    
     /**
      * 根据数据源ID接收消息。
      * 通过给定的数据源ID查询连接配置,并使用这些配置连接到指定的队列管理器,从指定的队列中接收消息。
      *
      * @param dataSourceId 数据源ID,用于查询连接字符串。
-     * @param channel      通道名称。
+     * @param channel 通道名称。
      * @param queueManager 队列管理器名称。
-     * @param queueName    队列名称。
-     * @param ccsid        编码集ID。
+     * @param queueName 队列名称。
+     * @param ccsid 编码集ID。
      * @param receiveTimeout 接收超时时间。
-     * @param pollSize     每次轮询的大小。
-     * @param retry        重试次数。
+     * @param pollSize 每次轮询的大小。
+     * @param retry 重试次数。
+     *
      * @return 返回接收到的消息列表。
+     *
      * @throws Exception 如果连接配置查询失败或消息接收过程中出现错误,则抛出异常。
      */
-    public List<String> receptionMessageByDataSourceId(String dataSourceId, String channel, String queueManager, String queueName, Long ccsid, Long receiveTimeout, Long pollSize, Long retry) throws Exception {
+    public List<String> receptionMessageByDataSourceId(String dataSourceId, String channel, String queueManager,
+                                                       String queueName, Long ccsid, Long receiveTimeout, Long pollSize,
+                                                       Long retry) throws Exception {
         // 根据数据源ID查询连接配置
         Map<String, Object> config = queryConnectionStr(dataSourceId);
-
         long port = 1414L; // 默认端口号
         // 从连接配置中解析主机名和端口号
         String host = Objects.toString(config.get("host"));
@@ -62,11 +70,10 @@ public class IBMMQ {
             password = password1.toString();
         }
         // 使用解析出的配置信息进行消息接收
-        return receptionMessage(host, port, channel, queueManager, queueName, ccsid, username, password, receiveTimeout, pollSize, retry);
-
+        return receptionMessage(host, port, channel, queueManager, queueName, ccsid, username, password, receiveTimeout,
+                                pollSize, retry);
     }
-
-
+    
     /**
      * 从指定队列接收消息的函数。
      *
@@ -81,34 +88,36 @@ public class IBMMQ {
      * @param receiveTimeout 接收消息的超时时间。
      * @param pollSize 每次轮询的最大消息数。如果为0,则默认为100。
      * @param retry 接收消息失败后的最大重试次数。
+     *
      * @return 返回接收到的消息列表。
+     *
      * @throws JMSException 如果JMS操作失败,则抛出此异常。
      */
-    public List<String> receptionMessage(
-            String host, Long port, String channel, String queueManager, String queueName, Long ccsid, String username, String password, Long receiveTimeout, Long pollSize, Long retry) throws JMSException {
+    public List<String> receptionMessage(String host, Long port, String channel, String queueManager, String queueName,
+                                         Long ccsid, String username, String password, Long receiveTimeout,
+                                         Long pollSize, Long retry) throws JMSException {
         // 初始化JmsTemplate用于消息接收
-        JmsTemplate template = getJmsTemplate(host, port.intValue(), ccsid.intValue(), queueManager, channel, username, password);
-
+        JmsTemplate template =
+                getJmsTemplate(host, port.intValue(), ccsid.intValue(), queueManager, channel, username, password);
         // 设置接收超时时间
         jmsTemplate.setReceiveTimeout(receiveTimeout);
-
         // 设置最大轮询消息数
         long maxSize = 100; // 默认值
         if (pollSize > 0) {
             maxSize = pollSize;
         }
-
         // 初始化最大重试次数
         int maxRetry = 0;
         List<String> result = new ArrayList<>();
-
         // 不断尝试接收消息,直到满足退出条件
         while (true) {
             try {
                 // 尝试从指定队列接收消息
                 Message message = template.receive(queueName);
                 // 检查是否需要退出循环
-                if (checkIsBreak(message, result, maxSize)) break;
+                if (checkIsBreak(message, result, maxSize)) {
+                    break;
+                }
             } catch (Exception e) {
                 // 捕获异常,增加重试计数
                 maxRetry++;
@@ -124,15 +133,16 @@ public class IBMMQ {
         }
         return result; // 返回最终接收的消息列表
     }
-
-
+    
     /**
      * 检查是否应该中断处理消息的循环。
      *
      * @param message 需要检查的消息对象,如果为null表示找到终止条件。
      * @param result 存储处理结果的列表,每处理一个消息,将其文本添加到列表中。
      * @param maxSize 当结果列表的大小达到或超过此值时,视为找到终止条件。
+     *
      * @return 如果未找到终止条件,返回true;反之,返回false。
+     *
      * @throws JMSException 如果处理消息时发生错误。
      */
     private static boolean checkIsBreak(Message message, List<String> result, long maxSize) throws JMSException {
@@ -148,10 +158,9 @@ public class IBMMQ {
             found = true;
         }
         // 返回相反的逻辑,即未找到终止条件为true,找到为false
-        return !found;
+        return ! found;
     }
-
-
+    
     /**
      * 通过数据源ID发送单例消息。
      * 此方法将指定的消息数据作为列表中的单个元素发送,适用于需要发送单一消息体的情况。
@@ -162,17 +171,23 @@ public class IBMMQ {
      * @param queueName 队列名称,指定消息将被发送到的队列。
      * @param ccsid CCSID(字符集编码序列ID),指定消息的字符集。
      * @param data 要发送的消息数据对象。
+     *
      * @throws Exception 如果发送过程中遇到任何错误,则抛出异常。
      */
-    public void sendSingletonMessageByDataSourceId(
-            String dataSourceId, String channel, String queueManager, String queueName, Long ccsid,
-            Object data
-    ) throws Exception {
+    public void sendSingletonMessageByDataSourceId(String dataSourceId, String channel, String queueManager,
+                                                   String queueName, Long ccsid, Object data) throws Exception {
         // 使用Collections.singletonList将数据封装为列表,然后调用sendMessageByDataSourceId方法进行发送
-        sendMessageByDataSourceId(dataSourceId, channel, queueManager, queueName, ccsid, Collections.singletonList(data));
+        sendMessageByDataSourceId(dataSourceId, channel, queueManager, queueName, ccsid,
+                                  Collections.singletonList(data));
     }
-
-
+    
+    public void sendSingletonMessageToTopicByDataSourceId(String dataSourceId, String channel, String queueManager,
+                                                          String topic, Long ccsid, Object data) throws Exception {
+        // 使用Collections.singletonList将数据封装为列表,然后调用sendMessageByDataSourceId方法进行发送
+        sendMessageToTopicByDataSourceId(dataSourceId, channel, queueManager, topic, ccsid,
+                                         Collections.singletonList(data));
+    }
+    
     /**
      * 根据数据源ID发送消息。
      * 该方法通过查询数据源配置,获取连接信息,并使用提供的参数通过指定的通道、队列管理器和队列名称发送消息。
@@ -183,15 +198,13 @@ public class IBMMQ {
      * @param queueName 目标队列名称。
      * @param ccsid 编码集标识符。
      * @param data 要发送的消息数据列表。
+     *
      * @throws Exception 如果发送消息过程中遇到任何错误,则抛出异常。
      */
-    public void sendMessageByDataSourceId(
-            String dataSourceId, String channel, String queueManager, String queueName, Long ccsid,
-            List<Object> data
-    ) throws Exception {
+    public void sendMessageByDataSourceId(String dataSourceId, String channel, String queueManager, String queueName,
+                                          Long ccsid, List<Object> data) throws Exception {
         // 根据dataSourceId查询连接配置
         Map<String, Object> config = queryConnectionStr(dataSourceId);
-
         long port = 1414L; // 默认端口号
         // 从配置中解析主机名和端口号
         String host = Objects.toString(config.get("host"));
@@ -214,8 +227,34 @@ public class IBMMQ {
         // 发送消息
         sendMessage(host, port, channel, queueManager, queueName, ccsid, username, password, data);
     }
-
-
+    
+    public void sendMessageToTopicByDataSourceId(String dataSourceId, String channel, String queueManager, String topic,
+                                                 Long ccsid, List<Object> data) throws Exception {
+        // 根据dataSourceId查询连接配置
+        Map<String, Object> config = queryConnectionStr(dataSourceId);
+        long port = 1414L; // 默认端口号
+        // 从配置中解析主机名和端口号
+        String host = Objects.toString(config.get("host"));
+        if (host.contains(":")) {
+            String[] split = host.split(":");
+            port = Long.parseLong(split[1]);
+            host = split[0];
+        }
+        // 解析用户名和密码 from 配置
+        String username = null;
+        String password = null;
+        Object username1 = config.get("username");
+        if (Objects.nonNull(username1)) {
+            username = username1.toString();
+        }
+        Object password1 = config.get("password");
+        if (Objects.nonNull(password1)) {
+            password = password1.toString();
+        }
+        // 发送消息
+        sendMessageToTopic(host, port, channel, queueManager, topic, ccsid, username, password, data);
+    }
+    
     /**
      * 发送单个消息到指定的队列。此方法封装了消息的发送过程,方便发送单个消息。
      *
@@ -228,17 +267,24 @@ public class IBMMQ {
      * @param username 用户名,用于认证发送消息的用户。
      * @param password 密码,与用户名一起用于认证。
      * @param data 要发送的数据,该方法支持任意类型的数据对象。
+     *
      * @throws JMSException 如果发送消息过程中发生任何JMS异常,则抛出。
      */
-    public void sendSingletonMessage(
-            String host, Long port, String channel, String queueManager, String queueName, Long ccsid, String username, String password,
-            Object data
-    ) throws JMSException {
+    public void sendSingletonMessage(String host, Long port, String channel, String queueManager, String queueName,
+                                     Long ccsid, String username, String password, Object data) throws JMSException {
         // 将数据封装为单元素列表,并调用sendMessage方法发送消息
-        sendMessage(host, port, channel, queueManager, queueName, ccsid, username, password, Collections.singletonList(data));
+        sendMessage(host, port, channel, queueManager, queueName, ccsid, username, password,
+                    Collections.singletonList(data));
     }
-
-
+    
+    public void sendSingletonMessageToTopic(String host, Long port, String channel, String queueManager, String topic,
+                                            Long ccsid, String username, String password, Object data)
+            throws JMSException {
+        // 将数据封装为单元素列表,并调用sendMessage方法发送消息
+        sendMessageToTopic(host, port, channel, queueManager, topic, ccsid, username, password,
+                           Collections.singletonList(data));
+    }
+    
     /**
      * 发送消息到指定的队列
      *
@@ -251,23 +297,19 @@ public class IBMMQ {
      * @param username 用户名,用于认证
      * @param password 密码,用于认证
      * @param data 要发送的消息数据列表
+     *
      * @throws JMSException 如果发送消息过程中出现JMS异常
      */
-    public void sendMessage(
-            String host, Long port, String channel, String queueManager, String queueName, Long ccsid, String username, String password,
-            List<Object> data
-    ) throws JMSException {
-
+    public void sendMessage(String host, Long port, String channel, String queueManager, String queueName, Long ccsid,
+                            String username, String password, List<Object> data) throws JMSException {
         // 初始化JMS模板,配置连接信息
-        JmsTemplate template = getJmsTemplate(host, port.intValue(), ccsid.intValue(), queueManager, channel, username, password);
-
+        JmsTemplate template =
+                getJmsTemplate(host, port.intValue(), ccsid.intValue(), queueManager, channel, username, password);
         try {
             // 遍历数据列表,发送每条消息
             for (Object it : data) {
-
                 execSend(template, it, queueName);
             }
-
         } catch (Exception e) {
             // 若开启调试模式,打印异常堆栈
             if (Config.isDebug()) {
@@ -277,12 +319,32 @@ public class IBMMQ {
             if (jmsTemplate != null) {
                 jmsTemplate = null;
             }
-
         }
     }
-
-
-
+    
+    public void sendMessageToTopic(String host, Long port, String channel, String queueManager, String topicName,
+                                   Long ccsid, String username, String password, List<Object> data)
+            throws JMSException {
+        // 初始化JMS模板,配置连接信息
+        JmsTemplate template =
+                getJmsTemplate(host, port.intValue(), ccsid.intValue(), queueManager, channel, username, password);
+        try {
+            // 遍历数据列表,发送每条消息
+            for (Object it : data) {
+                execSendToTopic(template, it, topicName);
+            }
+        } catch (Exception e) {
+            // 若开启调试模式,打印异常堆栈
+            if (Config.isDebug()) {
+                e.printStackTrace();
+            }
+            // 出现异常时,清理jmsTemplate对象
+            if (jmsTemplate != null) {
+                jmsTemplate = null;
+            }
+        }
+    }
+    
     /**
      * 使用JmsTemplate发送消息到指定的队列。
      *
@@ -290,19 +352,23 @@ public class IBMMQ {
      * @param data 要发送的数据,可以是任意类型,最终会被转换为字符串格式发送。
      * @param queueName 队列名称,指定消息要发送到的目标队列。
      */
-    private void execSend(
-            JmsTemplate template,
-            Object data,
-            String queueName
-    ) {
+    private void execSend(JmsTemplate template, Object data, String queueName) {
         // 将数据转换为字符串格式
         String message = DataFormatUtil.toString(data);
         // 使用JmsTemplate发送消息,将消息封装为TextMessage并发送到指定队列
         template.send(queueName, session -> session.createTextMessage(message));
     }
-
-
-
+    
+    private void execSendToTopic(JmsTemplate template, Object data, String topic) {
+        String message = DataFormatUtil.toString(data);
+        template.setDefaultDestinationName(topic);
+        if (message != null) {
+            template.convertAndSend(message);
+        } else {
+            System.out.println("message is null");
+        }
+    }
+    
     /**
      * 创建一个 MQ 队列连接工厂实例。
      *
@@ -311,10 +377,14 @@ public class IBMMQ {
      * @param ccsid CCSID(Character Code Set Identifier),用于指定字符集。
      * @param queueManager 队列管理器名,标识一个 MQ 队列管理器。
      * @param channel 通道名,用于客户端和队列管理器之间的通信。
+     *
      * @return 初始化后的 MQQueueConnectionFactory 实例。
+     *
      * @throws JMSException 如果创建连接工厂时发生错误。
      */
-    private MQQueueConnectionFactory createMqQueueConnectionFactory(String host, int port, int ccsid, String queueManager, String channel) throws JMSException {
+    private MQQueueConnectionFactory createMqQueueConnectionFactory(String host, int port, int ccsid,
+                                                                    String queueManager, String channel)
+            throws JMSException {
         MQQueueConnectionFactory mqQueueConnectionFactory = new MQQueueConnectionFactory();
         // 设置连接工厂的主机名
         mqQueueConnectionFactory.setHostName(host);
@@ -330,13 +400,12 @@ public class IBMMQ {
         mqQueueConnectionFactory.setTransportType(1);
         return mqQueueConnectionFactory;
     }
-
-
-
+    
     /**
      * 创建并配置一个JmsTemplate实例。
      *
      * @param factory CachingConnectionFactory的实例,用于设置JmsTemplate的消息工厂。
+     *
      * @return 配置好的JmsTemplate实例,可用于发送消息。
      */
     private JmsTemplate createJmsTemplate(CachingConnectionFactory factory) {
@@ -344,13 +413,13 @@ public class IBMMQ {
         template.setConnectionFactory(factory); // 设置消息工厂
         return template;
     }
-
-
+    
     /**
      * 创建一个 CachingConnectionFactory 实例。
      * 此方法通过将一个给定的 ConnectionFactory 设置为缓存工厂的目标ConnectionFactory来配置 CachingConnectionFactory。
      *
      * @param factory 用于创建缓存工厂的目标ConnectionFactory。
+     *
      * @return 配置好的 CachingConnectionFactory 实例。
      */
     private CachingConnectionFactory createCachingConnectionFactory(ConnectionFactory factory) {
@@ -359,8 +428,7 @@ public class IBMMQ {
         cachingConnectionFactory.setTargetConnectionFactory(factory);
         return cachingConnectionFactory;
     }
-
-
+    
     /**
      * 创建并配置一个 UserCredentialsConnectionFactoryAdapter 实例。
      * 这个方法通过提供用户名、密码和一个ConnectionFactory,来创建一个已经设置了认证信息的ConnectionFactory适配器。
@@ -368,16 +436,18 @@ public class IBMMQ {
      * @param username 将要用于认证的用户名。
      * @param password 将要用于认证的密码。
      * @param factory 是底层的ConnectionFactory,认证通过后将使用这个ConnectionFactory创建连接。
+     *
      * @return 配置好的 UserCredentialsConnectionFactoryAdapter 实例,可用于进一步的使用。
      */
-    private UserCredentialsConnectionFactoryAdapter createAdapter(String username, String password, ConnectionFactory factory) {
+    private UserCredentialsConnectionFactoryAdapter createAdapter(String username, String password,
+                                                                  ConnectionFactory factory) {
         UserCredentialsConnectionFactoryAdapter adapter = new UserCredentialsConnectionFactoryAdapter();
         adapter.setUsername(username); // 设置认证使用的用户名
         adapter.setPassword(password); // 设置认证使用的密码
         adapter.setTargetConnectionFactory(factory); // 设置目标ConnectionFactory
         return adapter;
     }
-
+    
     /**
      * 获取一个配置好的JmsTemplate实例,用于与MQ队列进行交互。
      *
@@ -388,33 +458,36 @@ public class IBMMQ {
      * @param channel 与MQ队列管理器通信的通道名称。
      * @param username 连接MQ时的用户名。
      * @param password 连接MQ时的密码。
+     *
      * @return 配置好的JmsTemplate实例。
+     *
      * @throws JMSException 如果在创建JMS连接工厂或模板时发生错误。
      */
-    private JmsTemplate getJmsTemplate(String host, int port, int ccsid, String queueManager, String channel, String username, String password) throws JMSException {
+    private JmsTemplate getJmsTemplate(String host, int port, int ccsid, String queueManager, String channel,
+                                       String username, String password) throws JMSException {
         // 如果jmsTemplate尚未初始化,则进行初始化
         if (jmsTemplate == null) {
             // 创建MQ队列连接工厂
-            MQQueueConnectionFactory mqQueueConnectionFactory = createMqQueueConnectionFactory(host, port, ccsid, queueManager, channel);
+            MQQueueConnectionFactory mqQueueConnectionFactory =
+                    createMqQueueConnectionFactory(host, port, ccsid, queueManager, channel);
             // 使用用户名和密码适配器包装MQ队列连接工厂
-            UserCredentialsConnectionFactoryAdapter adapter = createAdapter(username, password, mqQueueConnectionFactory);
+            UserCredentialsConnectionFactoryAdapter adapter =
+                    createAdapter(username, password, mqQueueConnectionFactory);
             // 创建缓存连接工厂,以提高性能
             CachingConnectionFactory cachingConnectionFactory = createCachingConnectionFactory(adapter);
-
             // 创建并配置JmsTemplate实例
             jmsTemplate = createJmsTemplate(cachingConnectionFactory);
-
         }
         return jmsTemplate;
     }
-
-
-
+    
     /**
      * 查询指定数据源ID的连接字符串信息。
      *
      * @param datasourceId 数据源的唯一标识符。
+     *
      * @return 返回一个包含主机名、密码和用户名的Map对象。
+     *
      * @throws Exception 如果查询过程中发生错误,或者未找到指定的数据源ID,则抛出异常。
      */
     private static Map<String, Object> queryConnectionStr(String datasourceId) throws Exception {
@@ -425,12 +498,10 @@ public class IBMMQ {
                 username,
                 from datasource
                 where datasourceid = ?""", datasourceId).stream().findFirst();
-
         // 如果查询结果为空,抛出连接未找到异常
         if (result.isEmpty()) {
             throw new ConnectionNotFoundException("数据源错误:没有找到数据源");
         }
-
         // 返回查询结果
         return result.get();
     }