|
@@ -0,0 +1,301 @@
|
|
|
+package org.bfkj.protocol;
|
|
|
+
|
|
|
+import com.fasterxml.jackson.core.JsonProcessingException;
|
|
|
+import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
+import com.ibm.mq.jakarta.jms.MQQueueConnectionFactory;
|
|
|
+import jakarta.jms.ConnectionFactory;
|
|
|
+import jakarta.jms.JMSException;
|
|
|
+import jakarta.jms.Message;
|
|
|
+import jakarta.jms.TextMessage;
|
|
|
+import org.springframework.jms.connection.CachingConnectionFactory;
|
|
|
+import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
|
|
|
+import org.springframework.jms.core.JmsTemplate;
|
|
|
+
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.Optional;
|
|
|
+
|
|
|
+public class IBMMQ {
|
|
|
+
|
|
|
+ private JmsTemplate jmsTemplate = null;
|
|
|
+
|
|
|
+ public ObjectMapper objectMapper = new ObjectMapper();
|
|
|
+
|
|
|
+ public List<String> receptionMessageByConnection(String connectionStr, String queueName, String receiveTimeout, String pollSize, String retry) throws JMSException, JsonProcessingException {
|
|
|
+
|
|
|
+ final Map<String,Object> config = objectMapper.readValue(connectionStr, Map.class);
|
|
|
+
|
|
|
+ return receptionMessage(config.get("host").toString(), Long.parseLong(config.get("port").toString()), config.get("channel").toString(), config.get("queueManager").toString(), queueName, Long.parseLong(config.get("ccsid").toString()), Optional.ofNullable(config.get("username")).map(Object::toString).orElse(null), Optional.ofNullable(config.get("password")).map(Object::toString).orElse(null), Long.parseLong(receiveTimeout), Long.parseLong(pollSize), Long.parseLong(retry));
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 从指定队列接收消息的函数。
|
|
|
+ *
|
|
|
+ * @param host 主机地址。
|
|
|
+ * @param port 端口号。
|
|
|
+ * @param channel 频道或连接标识。
|
|
|
+ * @param queueManager 队列管理器名称。
|
|
|
+ * @param queueName 需要接收消息的队列名称。
|
|
|
+ * @param ccsid 编码集ID。
|
|
|
+ * @param username 连接认证的用户名。
|
|
|
+ * @param password 连接认证的密码。
|
|
|
+ * @param receiveTimeout 接收消息的超时时间。
|
|
|
+ * @param pollSize 每次轮询的最大消息数。如果为0,则默认为100。
|
|
|
+ * @param retry 接收消息失败后的最大重试次数。
|
|
|
+ *
|
|
|
+ * @return 返回接收到的消息列表。
|
|
|
+ *
|
|
|
+ * @throws JMSException 如果JMS操作失败,则抛出此异常。
|
|
|
+ */
|
|
|
+ public List<String> receptionMessage(
|
|
|
+ String host, Long port, String channel, String queueManager, String queueName, Long ccsid, String username, String password, Long receiveTimeout, Long pollSize, Long retry) throws JMSException {
|
|
|
+ // 初始化JmsTemplate用于消息接收
|
|
|
+ initJmsOperations(host, port.intValue(), ccsid.intValue(), queueManager, channel, username, password);
|
|
|
+
|
|
|
+ // 设置接收超时时间
|
|
|
+ jmsTemplate.setReceiveTimeout(receiveTimeout);
|
|
|
+
|
|
|
+ // 设置最大轮询消息数
|
|
|
+ long maxSize = 100; // 默认值
|
|
|
+ if ( pollSize > 0 ) {
|
|
|
+ maxSize = pollSize;
|
|
|
+ }
|
|
|
+
|
|
|
+ // 初始化最大重试次数
|
|
|
+ int maxRetry = 0;
|
|
|
+ List<String> result = new ArrayList<>();
|
|
|
+
|
|
|
+ // 不断尝试接收消息,直到满足退出条件
|
|
|
+ while ( retry>= maxRetry ) {
|
|
|
+ try {
|
|
|
+ // 尝试从指定队列接收消息
|
|
|
+ Message message = jmsTemplate.receive(queueName);
|
|
|
+ // 检查是否需要退出循环
|
|
|
+ if ( checkIsBreak(message, result, maxSize) ) break;
|
|
|
+ maxRetry++;
|
|
|
+ } catch ( Exception e ) {
|
|
|
+ // 捕获异常,增加重试计数
|
|
|
+ maxRetry++;
|
|
|
+ // 当重试次数超过设定值时,退出循环
|
|
|
+ if ( maxRetry > retry ) {
|
|
|
+ // 清理JmsTemplate资源并返回当前结果
|
|
|
+ if ( jmsTemplate != null ) {
|
|
|
+ jmsTemplate = null;
|
|
|
+ }
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return result; // 返回最终接收的消息列表
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 发送单个消息到指定的队列。此方法封装了消息的发送过程,方便发送单个消息。
|
|
|
+ *
|
|
|
+ * @param host 主机地址,消息将从该主机发送。
|
|
|
+ * @param port 端口号,用于指定主机上的通信端口。
|
|
|
+ * @param channel 通道名称,用于连接到消息队列管理器。
|
|
|
+ * @param queueManager 队列管理器名称,负责管理消息队列。
|
|
|
+ * @param queueName 队列名称,指定消息将被发送到的队列。
|
|
|
+ * @param ccsid CCSID(Character Code Set Identifier),指定字符编码集。
|
|
|
+ * @param username 用户名,用于认证发送消息的用户。
|
|
|
+ * @param password 密码,与用户名一起用于认证。
|
|
|
+ * @param data 要发送的数据,该方法支持任意类型的数据对象。
|
|
|
+ *
|
|
|
+ * @throws JMSException 如果发送消息过程中发生任何JMS异常,则抛出。
|
|
|
+ */
|
|
|
+ public void sendSingletonMessage(
|
|
|
+ String host, Long port, String channel, String queueManager, String queueName, Long ccsid, String username, String password,
|
|
|
+ String data
|
|
|
+ ) throws JMSException {
|
|
|
+ // 将数据封装为单元素列表,并调用sendMessage方法发送消息
|
|
|
+ sendMessage(host, port, channel, queueManager, queueName, ccsid, username, password, List.of(data));
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 发送消息到指定的队列
|
|
|
+ *
|
|
|
+ * @param host 主机地址
|
|
|
+ * @param port 端口号
|
|
|
+ * @param channel 频道或连接标识
|
|
|
+ * @param queueManager 队列管理器名称
|
|
|
+ * @param queueName 队列名称
|
|
|
+ * @param ccsid 编码集ID
|
|
|
+ * @param username 用户名,用于认证
|
|
|
+ * @param password 密码,用于认证
|
|
|
+ * @param data 要发送的消息数据列表
|
|
|
+ *
|
|
|
+ * @throws JMSException 如果发送消息过程中出现JMS异常
|
|
|
+ */
|
|
|
+ public void sendMessage(
|
|
|
+ String host, Long port, String channel, String queueManager, String queueName, Long ccsid, String username, String password,
|
|
|
+ List<String> data
|
|
|
+ ) throws JMSException {
|
|
|
+
|
|
|
+ // 初始化JMS模板,配置连接信息
|
|
|
+ initJmsOperations(host, port.intValue(), ccsid.intValue(), queueManager, channel, username, password);
|
|
|
+
|
|
|
+ try {
|
|
|
+ // 遍历数据列表,发送每条消息
|
|
|
+ for ( String it : data ) {
|
|
|
+ execSend(it, queueName);
|
|
|
+ }
|
|
|
+
|
|
|
+ } catch ( Exception e ) {
|
|
|
+ // 若开启调试模式,打印异常堆栈
|
|
|
+ e.printStackTrace();
|
|
|
+ // 出现异常时,清理jmsTemplate对象
|
|
|
+ jmsTemplate = null;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 检查是否应该中断处理消息的循环。
|
|
|
+ *
|
|
|
+ * @param message 需要检查的消息对象,如果为null表示找到终止条件。
|
|
|
+ * @param result 存储处理结果的列表,每处理一个消息,将其文本添加到列表中。
|
|
|
+ * @param maxSize 当结果列表的大小达到或超过此值时,视为找到终止条件。
|
|
|
+ *
|
|
|
+ * @return 如果未找到终止条件,返回true;反之,返回false。
|
|
|
+ *
|
|
|
+ * @throws JMSException 如果处理消息时发生错误。
|
|
|
+ */
|
|
|
+ private static boolean checkIsBreak(Message message, List<String> result, long maxSize) throws JMSException {
|
|
|
+ // 判断消息是否为null,若为null则认为找到了终止条件
|
|
|
+ boolean found = message == null;
|
|
|
+ // 如果消息是TextMessage类型,提取并存储其文本内容
|
|
|
+ if ( message instanceof TextMessage msg ) {
|
|
|
+ String text = msg.getText();
|
|
|
+ result.add(text);
|
|
|
+ }
|
|
|
+ // 如果结果列表的大小达到或超过最大值,认为找到了终止条件
|
|
|
+ if ( result.size() >= maxSize ) {
|
|
|
+ found = true;
|
|
|
+ }
|
|
|
+ // 返回相反的逻辑,即未找到终止条件为true,找到为false
|
|
|
+ return ! found;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 使用JmsTemplate发送消息到指定的队列。
|
|
|
+ *
|
|
|
+ * @param data 要发送的数据,可以是任意类型,最终会被转换为字符串格式发送。
|
|
|
+ * @param queueName 队列名称,指定消息要发送到的目标队列。
|
|
|
+ */
|
|
|
+ private void execSend(
|
|
|
+ String data,
|
|
|
+ String queueName
|
|
|
+ ) {
|
|
|
+ // 将数据转换为字符串格式
|
|
|
+ // 使用JmsTemplate发送消息
|
|
|
+ jmsTemplate.convertAndSend(queueName, data);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 创建一个 MQ 队列连接工厂实例。
|
|
|
+ *
|
|
|
+ * @param host 主机名,用于指定 MQ 服务器的地址。
|
|
|
+ * @param port 端口号,用于指定 MQ 服务器监听的端口。
|
|
|
+ * @param ccsid CCSID(Character Code Set Identifier),用于指定字符集。
|
|
|
+ * @param queueManager 队列管理器名,标识一个 MQ 队列管理器。
|
|
|
+ * @param channel 通道名,用于客户端和队列管理器之间的通信。
|
|
|
+ *
|
|
|
+ * @return 初始化后的 MQQueueConnectionFactory 实例。
|
|
|
+ *
|
|
|
+ * @throws JMSException 如果创建连接工厂时发生错误。
|
|
|
+ */
|
|
|
+ private MQQueueConnectionFactory createMqQueueConnectionFactory(String host, int port, int ccsid, String queueManager, String channel) throws JMSException {
|
|
|
+ MQQueueConnectionFactory mqQueueConnectionFactory = new MQQueueConnectionFactory();
|
|
|
+ // 设置连接工厂的主机名
|
|
|
+ mqQueueConnectionFactory.setHostName(host);
|
|
|
+ // 设置连接工厂的端口
|
|
|
+ mqQueueConnectionFactory.setPort(port);
|
|
|
+ // 设置连接工厂的字符集
|
|
|
+ mqQueueConnectionFactory.setCCSID(ccsid);
|
|
|
+ // 设置连接工厂的队列管理器名
|
|
|
+ mqQueueConnectionFactory.setQueueManager(queueManager);
|
|
|
+ // 设置连接工厂的通道名
|
|
|
+ mqQueueConnectionFactory.setChannel(channel);
|
|
|
+ // 设置连接工厂的传输类型为 TCP
|
|
|
+ mqQueueConnectionFactory.setTransportType(1);
|
|
|
+ return mqQueueConnectionFactory;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 创建并配置一个JmsTemplate实例。
|
|
|
+ *
|
|
|
+ * @param factory CachingConnectionFactory的实例,用于设置JmsTemplate的消息工厂。
|
|
|
+ *
|
|
|
+ * @return 配置好的JmsTemplate实例,可用于发送消息。
|
|
|
+ */
|
|
|
+ private JmsTemplate createJmsTemplate(CachingConnectionFactory factory) {
|
|
|
+ JmsTemplate template = new JmsTemplate();
|
|
|
+ template.setConnectionFactory(factory); // 设置消息工厂
|
|
|
+ return template;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 创建一个 CachingConnectionFactory 实例。 此方法通过将一个给定的 ConnectionFactory 设置为缓存工厂的目标ConnectionFactory来配置
|
|
|
+ * CachingConnectionFactory。
|
|
|
+ *
|
|
|
+ * @param factory 用于创建缓存工厂的目标ConnectionFactory。
|
|
|
+ *
|
|
|
+ * @return 配置好的 CachingConnectionFactory 实例。
|
|
|
+ */
|
|
|
+ private CachingConnectionFactory createCachingConnectionFactory(ConnectionFactory factory) {
|
|
|
+ CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
|
|
|
+ // 设置目标ConnectionFactory,以便CachingConnectionFactory可以使用它来创建连接
|
|
|
+ cachingConnectionFactory.setTargetConnectionFactory(factory);
|
|
|
+ return cachingConnectionFactory;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 创建并配置一个 UserCredentialsConnectionFactoryAdapter 实例。
|
|
|
+ * 这个方法通过提供用户名、密码和一个ConnectionFactory,来创建一个已经设置了认证信息的ConnectionFactory适配器。
|
|
|
+ *
|
|
|
+ * @param username 将要用于认证的用户名。
|
|
|
+ * @param password 将要用于认证的密码。
|
|
|
+ * @param factory 是底层的ConnectionFactory,认证通过后将使用这个ConnectionFactory创建连接。
|
|
|
+ *
|
|
|
+ * @return 配置好的 UserCredentialsConnectionFactoryAdapter 实例,可用于进一步的使用。
|
|
|
+ */
|
|
|
+ private UserCredentialsConnectionFactoryAdapter createAdapter(String username, String password, ConnectionFactory factory) {
|
|
|
+ UserCredentialsConnectionFactoryAdapter adapter = new UserCredentialsConnectionFactoryAdapter();
|
|
|
+ adapter.setUsername(username); // 设置认证使用的用户名
|
|
|
+ adapter.setPassword(password); // 设置认证使用的密码
|
|
|
+ adapter.setTargetConnectionFactory(factory); // 设置目标ConnectionFactory
|
|
|
+ return adapter;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 获取一个配置好的JmsTemplate实例,用于与MQ队列进行交互。
|
|
|
+ *
|
|
|
+ * @param host MQ服务器的主机名。
|
|
|
+ * @param port MQ服务器的端口号。
|
|
|
+ * @param ccsid MQ服务器的字符集编码。
|
|
|
+ * @param queueManager MQ队列管理器的名称。
|
|
|
+ * @param channel 与MQ队列管理器通信的通道名称。
|
|
|
+ * @param username 连接MQ时的用户名。
|
|
|
+ * @param password 连接MQ时的密码。
|
|
|
+ *
|
|
|
+ * @throws JMSException 如果在创建JMS连接工厂或模板时发生错误。
|
|
|
+ */
|
|
|
+ private void initJmsOperations(String host, int port, int ccsid, String queueManager, String channel, String username, String password) throws JMSException {
|
|
|
+ // 如果jmsTemplate尚未初始化,则进行初始化
|
|
|
+ if ( jmsTemplate == null ) {
|
|
|
+ // 创建MQ队列连接工厂
|
|
|
+ MQQueueConnectionFactory mqQueueConnectionFactory = createMqQueueConnectionFactory(host, port, ccsid, queueManager, channel);
|
|
|
+ // 使用用户名和密码适配器包装MQ队列连接工厂
|
|
|
+ UserCredentialsConnectionFactoryAdapter adapter = createAdapter(username, password, mqQueueConnectionFactory);
|
|
|
+ // 创建缓存连接工厂,以提高性能
|
|
|
+ CachingConnectionFactory cachingConnectionFactory = createCachingConnectionFactory(adapter);
|
|
|
+
|
|
|
+ // 创建并配置JmsTemplate实例
|
|
|
+ jmsTemplate = createJmsTemplate(cachingConnectionFactory);
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+}
|