|
@@ -25,24 +25,22 @@ import java.util.Optional;
|
|
|
*/
|
|
|
public class IBMMQ {
|
|
|
private static final DataBase DATA_BASE = new DataBase();
|
|
|
-
|
|
|
+
|
|
|
private JmsTemplate jmsTemplate = null;
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* 根据数据源ID接收消息。
|
|
|
* 通过给定的数据源ID查询连接配置,并使用这些配置连接到指定的队列管理器,从指定的队列中接收消息。
|
|
|
*
|
|
|
- * @param dataSourceId 数据源ID,用于查询连接字符串。
|
|
|
- * @param channel 通道名称。
|
|
|
- * @param queueManager 队列管理器名称。
|
|
|
- * @param queueName 队列名称。
|
|
|
- * @param ccsid 编码集ID。
|
|
|
+ * @param dataSourceId 数据源ID,用于查询连接字符串。
|
|
|
+ * @param channel 通道名称。
|
|
|
+ * @param queueManager 队列管理器名称。
|
|
|
+ * @param queueName 队列名称。
|
|
|
+ * @param ccsid 编码集ID。
|
|
|
* @param receiveTimeout 接收超时时间。
|
|
|
- * @param pollSize 每次轮询的大小。
|
|
|
- * @param retry 重试次数。
|
|
|
- *
|
|
|
+ * @param pollSize 每次轮询的大小。
|
|
|
+ * @param retry 重试次数。
|
|
|
* @return 返回接收到的消息列表。
|
|
|
- *
|
|
|
* @throws Exception 如果连接配置查询失败或消息接收过程中出现错误,则抛出异常。
|
|
|
*/
|
|
|
public List<String> receptionMessageByDataSourceId(String dataSourceId, String channel, String queueManager,
|
|
@@ -71,26 +69,24 @@ public class IBMMQ {
|
|
|
}
|
|
|
// 使用解析出的配置信息进行消息接收
|
|
|
return receptionMessage(host, port, channel, queueManager, queueName, ccsid, username, password, receiveTimeout,
|
|
|
- pollSize, retry);
|
|
|
+ pollSize, retry);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* 从指定队列接收消息的函数。
|
|
|
*
|
|
|
- * @param host 主机地址。
|
|
|
- * @param port 端口号。
|
|
|
- * @param channel 频道或连接标识。
|
|
|
- * @param queueManager 队列管理器名称。
|
|
|
- * @param queueName 需要接收消息的队列名称。
|
|
|
- * @param ccsid 编码集ID。
|
|
|
- * @param username 连接认证的用户名。
|
|
|
- * @param password 连接认证的密码。
|
|
|
+ * @param host 主机地址。
|
|
|
+ * @param port 端口号。
|
|
|
+ * @param channel 频道或连接标识。
|
|
|
+ * @param queueManager 队列管理器名称。
|
|
|
+ * @param queueName 需要接收消息的队列名称。
|
|
|
+ * @param ccsid 编码集ID。
|
|
|
+ * @param username 连接认证的用户名。
|
|
|
+ * @param password 连接认证的密码。
|
|
|
* @param receiveTimeout 接收消息的超时时间。
|
|
|
- * @param pollSize 每次轮询的最大消息数。如果为0,则默认为100。
|
|
|
- * @param retry 接收消息失败后的最大重试次数。
|
|
|
- *
|
|
|
+ * @param pollSize 每次轮询的最大消息数。如果为0,则默认为100。
|
|
|
+ * @param retry 接收消息失败后的最大重试次数。
|
|
|
* @return 返回接收到的消息列表。
|
|
|
- *
|
|
|
* @throws JMSException 如果JMS操作失败,则抛出此异常。
|
|
|
*/
|
|
|
public List<String> receptionMessage(String host, Long port, String channel, String queueManager, String queueName,
|
|
@@ -133,16 +129,14 @@ public class IBMMQ {
|
|
|
}
|
|
|
return result; // 返回最终接收的消息列表
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* 检查是否应该中断处理消息的循环。
|
|
|
*
|
|
|
* @param message 需要检查的消息对象,如果为null表示找到终止条件。
|
|
|
- * @param result 存储处理结果的列表,每处理一个消息,将其文本添加到列表中。
|
|
|
+ * @param result 存储处理结果的列表,每处理一个消息,将其文本添加到列表中。
|
|
|
* @param maxSize 当结果列表的大小达到或超过此值时,视为找到终止条件。
|
|
|
- *
|
|
|
* @return 如果未找到终止条件,返回true;反之,返回false。
|
|
|
- *
|
|
|
* @throws JMSException 如果处理消息时发生错误。
|
|
|
*/
|
|
|
private static boolean checkIsBreak(Message message, List<String> result, long maxSize) throws JMSException {
|
|
@@ -158,47 +152,45 @@ public class IBMMQ {
|
|
|
found = true;
|
|
|
}
|
|
|
// 返回相反的逻辑,即未找到终止条件为true,找到为false
|
|
|
- return ! found;
|
|
|
+ return !found;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* 通过数据源ID发送单例消息。
|
|
|
* 此方法将指定的消息数据作为列表中的单个元素发送,适用于需要发送单一消息体的情况。
|
|
|
*
|
|
|
* @param dataSourceId 数据源ID,用于标识消息发送的目标数据源。
|
|
|
- * @param channel 通道名称,指定消息将通过的通道。
|
|
|
+ * @param channel 通道名称,指定消息将通过的通道。
|
|
|
* @param queueManager 队列管理器名称,指定消息将由哪个队列管理器处理。
|
|
|
- * @param queueName 队列名称,指定消息将被发送到的队列。
|
|
|
- * @param ccsid CCSID(字符集编码序列ID),指定消息的字符集。
|
|
|
- * @param data 要发送的消息数据对象。
|
|
|
- *
|
|
|
+ * @param queueName 队列名称,指定消息将被发送到的队列。
|
|
|
+ * @param ccsid CCSID(字符集编码序列ID),指定消息的字符集。
|
|
|
+ * @param data 要发送的消息数据对象。
|
|
|
* @throws Exception 如果发送过程中遇到任何错误,则抛出异常。
|
|
|
*/
|
|
|
public void sendSingletonMessageByDataSourceId(String dataSourceId, String channel, String queueManager,
|
|
|
String queueName, Long ccsid, Object data) throws Exception {
|
|
|
// 使用Collections.singletonList将数据封装为列表,然后调用sendMessageByDataSourceId方法进行发送
|
|
|
sendMessageByDataSourceId(dataSourceId, channel, queueManager, queueName, ccsid,
|
|
|
- Collections.singletonList(data));
|
|
|
+ Collections.singletonList(data));
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public void sendSingletonMessageToTopicByDataSourceId(String dataSourceId, String channel, String queueManager,
|
|
|
String topic, Long ccsid, Object data) throws Exception {
|
|
|
// 使用Collections.singletonList将数据封装为列表,然后调用sendMessageByDataSourceId方法进行发送
|
|
|
sendMessageToTopicByDataSourceId(dataSourceId, channel, queueManager, topic, ccsid,
|
|
|
- Collections.singletonList(data));
|
|
|
+ Collections.singletonList(data));
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* 根据数据源ID发送消息。
|
|
|
* 该方法通过查询数据源配置,获取连接信息,并使用提供的参数通过指定的通道、队列管理器和队列名称发送消息。
|
|
|
*
|
|
|
* @param dataSourceId 数据源ID,用于查询连接配置。
|
|
|
- * @param channel 通信通道名称。
|
|
|
+ * @param channel 通信通道名称。
|
|
|
* @param queueManager 队列管理器名称。
|
|
|
- * @param queueName 目标队列名称。
|
|
|
- * @param ccsid 编码集标识符。
|
|
|
- * @param data 要发送的消息数据列表。
|
|
|
- *
|
|
|
+ * @param queueName 目标队列名称。
|
|
|
+ * @param ccsid 编码集标识符。
|
|
|
+ * @param data 要发送的消息数据列表。
|
|
|
* @throws Exception 如果发送消息过程中遇到任何错误,则抛出异常。
|
|
|
*/
|
|
|
public void sendMessageByDataSourceId(String dataSourceId, String channel, String queueManager, String queueName,
|
|
@@ -227,7 +219,7 @@ public class IBMMQ {
|
|
|
// 发送消息
|
|
|
sendMessage(host, port, channel, queueManager, queueName, ccsid, username, password, data);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public void sendMessageToTopicByDataSourceId(String dataSourceId, String channel, String queueManager, String topic,
|
|
|
Long ccsid, List<Object> data) throws Exception {
|
|
|
// 根据dataSourceId查询连接配置
|
|
@@ -254,50 +246,48 @@ public class IBMMQ {
|
|
|
// 发送消息
|
|
|
sendMessageToTopic(host, port, channel, queueManager, topic, ccsid, username, password, data);
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* 发送单个消息到指定的队列。此方法封装了消息的发送过程,方便发送单个消息。
|
|
|
*
|
|
|
- * @param host 主机地址,消息将从该主机发送。
|
|
|
- * @param port 端口号,用于指定主机上的通信端口。
|
|
|
- * @param channel 通道名称,用于连接到消息队列管理器。
|
|
|
+ * @param host 主机地址,消息将从该主机发送。
|
|
|
+ * @param port 端口号,用于指定主机上的通信端口。
|
|
|
+ * @param channel 通道名称,用于连接到消息队列管理器。
|
|
|
* @param queueManager 队列管理器名称,负责管理消息队列。
|
|
|
- * @param queueName 队列名称,指定消息将被发送到的队列。
|
|
|
- * @param ccsid CCSID(Character Code Set Identifier),指定字符编码集。
|
|
|
- * @param username 用户名,用于认证发送消息的用户。
|
|
|
- * @param password 密码,与用户名一起用于认证。
|
|
|
- * @param data 要发送的数据,该方法支持任意类型的数据对象。
|
|
|
- *
|
|
|
+ * @param queueName 队列名称,指定消息将被发送到的队列。
|
|
|
+ * @param ccsid CCSID(Character Code Set Identifier),指定字符编码集。
|
|
|
+ * @param username 用户名,用于认证发送消息的用户。
|
|
|
+ * @param password 密码,与用户名一起用于认证。
|
|
|
+ * @param data 要发送的数据,该方法支持任意类型的数据对象。
|
|
|
* @throws JMSException 如果发送消息过程中发生任何JMS异常,则抛出。
|
|
|
*/
|
|
|
public void sendSingletonMessage(String host, Long port, String channel, String queueManager, String queueName,
|
|
|
Long ccsid, String username, String password, Object data) throws JMSException {
|
|
|
// 将数据封装为单元素列表,并调用sendMessage方法发送消息
|
|
|
sendMessage(host, port, channel, queueManager, queueName, ccsid, username, password,
|
|
|
- Collections.singletonList(data));
|
|
|
+ Collections.singletonList(data));
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public void sendSingletonMessageToTopic(String host, Long port, String channel, String queueManager, String topic,
|
|
|
Long ccsid, String username, String password, Object data)
|
|
|
throws JMSException {
|
|
|
// 将数据封装为单元素列表,并调用sendMessage方法发送消息
|
|
|
sendMessageToTopic(host, port, channel, queueManager, topic, ccsid, username, password,
|
|
|
- Collections.singletonList(data));
|
|
|
+ Collections.singletonList(data));
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* 发送消息到指定的队列
|
|
|
*
|
|
|
- * @param host 主机地址
|
|
|
- * @param port 端口号
|
|
|
- * @param channel 频道或连接标识
|
|
|
+ * @param host 主机地址
|
|
|
+ * @param port 端口号
|
|
|
+ * @param channel 频道或连接标识
|
|
|
* @param queueManager 队列管理器名称
|
|
|
- * @param queueName 队列名称
|
|
|
- * @param ccsid 编码集ID
|
|
|
- * @param username 用户名,用于认证
|
|
|
- * @param password 密码,用于认证
|
|
|
- * @param data 要发送的消息数据列表
|
|
|
- *
|
|
|
+ * @param queueName 队列名称
|
|
|
+ * @param ccsid 编码集ID
|
|
|
+ * @param username 用户名,用于认证
|
|
|
+ * @param password 密码,用于认证
|
|
|
+ * @param data 要发送的消息数据列表
|
|
|
* @throws JMSException 如果发送消息过程中出现JMS异常
|
|
|
*/
|
|
|
public void sendMessage(String host, Long port, String channel, String queueManager, String queueName, Long ccsid,
|
|
@@ -319,9 +309,10 @@ public class IBMMQ {
|
|
|
if (jmsTemplate != null) {
|
|
|
jmsTemplate = null;
|
|
|
}
|
|
|
+ throw e;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
public void sendMessageToTopic(String host, Long port, String channel, String queueManager, String topicName,
|
|
|
Long ccsid, String username, String password, List<Object> data)
|
|
|
throws JMSException {
|
|
@@ -342,14 +333,15 @@ public class IBMMQ {
|
|
|
if (jmsTemplate != null) {
|
|
|
jmsTemplate = null;
|
|
|
}
|
|
|
+ throw e;
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* 使用JmsTemplate发送消息到指定的队列。
|
|
|
*
|
|
|
- * @param template JmsTemplate,用于发送消息的模板,封装了发送消息的详细逻辑。
|
|
|
- * @param data 要发送的数据,可以是任意类型,最终会被转换为字符串格式发送。
|
|
|
+ * @param template JmsTemplate,用于发送消息的模板,封装了发送消息的详细逻辑。
|
|
|
+ * @param data 要发送的数据,可以是任意类型,最终会被转换为字符串格式发送。
|
|
|
* @param queueName 队列名称,指定消息要发送到的目标队列。
|
|
|
*/
|
|
|
private void execSend(JmsTemplate template, Object data, String queueName) {
|
|
@@ -358,7 +350,7 @@ public class IBMMQ {
|
|
|
// 使用JmsTemplate发送消息,将消息封装为TextMessage并发送到指定队列
|
|
|
template.send(queueName, session -> session.createTextMessage(message));
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
private void execSendToTopic(JmsTemplate template, Object data, String topic) {
|
|
|
String message = DataFormatUtil.toString(data);
|
|
|
template.setDefaultDestinationName(topic);
|
|
@@ -368,18 +360,16 @@ public class IBMMQ {
|
|
|
System.out.println("message is null");
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* 创建一个 MQ 队列连接工厂实例。
|
|
|
*
|
|
|
- * @param host 主机名,用于指定 MQ 服务器的地址。
|
|
|
- * @param port 端口号,用于指定 MQ 服务器监听的端口。
|
|
|
- * @param ccsid CCSID(Character Code Set Identifier),用于指定字符集。
|
|
|
+ * @param host 主机名,用于指定 MQ 服务器的地址。
|
|
|
+ * @param port 端口号,用于指定 MQ 服务器监听的端口。
|
|
|
+ * @param ccsid CCSID(Character Code Set Identifier),用于指定字符集。
|
|
|
* @param queueManager 队列管理器名,标识一个 MQ 队列管理器。
|
|
|
- * @param channel 通道名,用于客户端和队列管理器之间的通信。
|
|
|
- *
|
|
|
+ * @param channel 通道名,用于客户端和队列管理器之间的通信。
|
|
|
* @return 初始化后的 MQQueueConnectionFactory 实例。
|
|
|
- *
|
|
|
* @throws JMSException 如果创建连接工厂时发生错误。
|
|
|
*/
|
|
|
private MQQueueConnectionFactory createMqQueueConnectionFactory(String host, int port, int ccsid,
|
|
@@ -400,26 +390,24 @@ public class IBMMQ {
|
|
|
mqQueueConnectionFactory.setTransportType(1);
|
|
|
return mqQueueConnectionFactory;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* 创建并配置一个JmsTemplate实例。
|
|
|
*
|
|
|
* @param factory CachingConnectionFactory的实例,用于设置JmsTemplate的消息工厂。
|
|
|
- *
|
|
|
* @return 配置好的JmsTemplate实例,可用于发送消息。
|
|
|
*/
|
|
|
- private JmsTemplate createJmsTemplate(CachingConnectionFactory factory) {
|
|
|
+ private JmsTemplate createJmsTemplate(ConnectionFactory factory) {
|
|
|
JmsTemplate template = new JmsTemplate();
|
|
|
template.setConnectionFactory(factory); // 设置消息工厂
|
|
|
return template;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* 创建一个 CachingConnectionFactory 实例。
|
|
|
* 此方法通过将一个给定的 ConnectionFactory 设置为缓存工厂的目标ConnectionFactory来配置 CachingConnectionFactory。
|
|
|
*
|
|
|
* @param factory 用于创建缓存工厂的目标ConnectionFactory。
|
|
|
- *
|
|
|
* @return 配置好的 CachingConnectionFactory 实例。
|
|
|
*/
|
|
|
private CachingConnectionFactory createCachingConnectionFactory(ConnectionFactory factory) {
|
|
@@ -428,15 +416,14 @@ public class IBMMQ {
|
|
|
cachingConnectionFactory.setTargetConnectionFactory(factory);
|
|
|
return cachingConnectionFactory;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* 创建并配置一个 UserCredentialsConnectionFactoryAdapter 实例。
|
|
|
* 这个方法通过提供用户名、密码和一个ConnectionFactory,来创建一个已经设置了认证信息的ConnectionFactory适配器。
|
|
|
*
|
|
|
* @param username 将要用于认证的用户名。
|
|
|
* @param password 将要用于认证的密码。
|
|
|
- * @param factory 是底层的ConnectionFactory,认证通过后将使用这个ConnectionFactory创建连接。
|
|
|
- *
|
|
|
+ * @param factory 是底层的ConnectionFactory,认证通过后将使用这个ConnectionFactory创建连接。
|
|
|
* @return 配置好的 UserCredentialsConnectionFactoryAdapter 实例,可用于进一步的使用。
|
|
|
*/
|
|
|
private UserCredentialsConnectionFactoryAdapter createAdapter(String username, String password,
|
|
@@ -447,20 +434,18 @@ public class IBMMQ {
|
|
|
adapter.setTargetConnectionFactory(factory); // 设置目标ConnectionFactory
|
|
|
return adapter;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* 获取一个配置好的JmsTemplate实例,用于与MQ队列进行交互。
|
|
|
*
|
|
|
- * @param host MQ服务器的主机名。
|
|
|
- * @param port MQ服务器的端口号。
|
|
|
- * @param ccsid MQ服务器的字符集编码。
|
|
|
+ * @param host MQ服务器的主机名。
|
|
|
+ * @param port MQ服务器的端口号。
|
|
|
+ * @param ccsid MQ服务器的字符集编码。
|
|
|
* @param queueManager MQ队列管理器的名称。
|
|
|
- * @param channel 与MQ队列管理器通信的通道名称。
|
|
|
- * @param username 连接MQ时的用户名。
|
|
|
- * @param password 连接MQ时的密码。
|
|
|
- *
|
|
|
+ * @param channel 与MQ队列管理器通信的通道名称。
|
|
|
+ * @param username 连接MQ时的用户名。
|
|
|
+ * @param password 连接MQ时的密码。
|
|
|
* @return 配置好的JmsTemplate实例。
|
|
|
- *
|
|
|
* @throws JMSException 如果在创建JMS连接工厂或模板时发生错误。
|
|
|
*/
|
|
|
private JmsTemplate getJmsTemplate(String host, int port, int ccsid, String queueManager, String channel,
|
|
@@ -473,21 +458,17 @@ public class IBMMQ {
|
|
|
// 使用用户名和密码适配器包装MQ队列连接工厂
|
|
|
UserCredentialsConnectionFactoryAdapter adapter =
|
|
|
createAdapter(username, password, mqQueueConnectionFactory);
|
|
|
- // 创建缓存连接工厂,以提高性能
|
|
|
- CachingConnectionFactory cachingConnectionFactory = createCachingConnectionFactory(adapter);
|
|
|
// 创建并配置JmsTemplate实例
|
|
|
- jmsTemplate = createJmsTemplate(cachingConnectionFactory);
|
|
|
+ jmsTemplate = createJmsTemplate(adapter);
|
|
|
}
|
|
|
return jmsTemplate;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* 查询指定数据源ID的连接字符串信息。
|
|
|
*
|
|
|
* @param datasourceId 数据源的唯一标识符。
|
|
|
- *
|
|
|
* @return 返回一个包含主机名、密码和用户名的Map对象。
|
|
|
- *
|
|
|
* @throws Exception 如果查询过程中发生错误,或者未找到指定的数据源ID,则抛出异常。
|
|
|
*/
|
|
|
private static Map<String, Object> queryConnectionStr(String datasourceId) throws Exception {
|