|
@@ -5,6 +5,7 @@ import com.bfkj.unidia.Result;
|
|
import com.github.benmanes.caffeine.cache.Cache;
|
|
import com.github.benmanes.caffeine.cache.Cache;
|
|
import com.github.benmanes.caffeine.cache.Caffeine;
|
|
import com.github.benmanes.caffeine.cache.Caffeine;
|
|
import com.ibm.mq.*;
|
|
import com.ibm.mq.*;
|
|
|
|
+import com.ibm.mq.constants.CMQC;
|
|
import com.ibm.mq.constants.MQConstants;
|
|
import com.ibm.mq.constants.MQConstants;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
@@ -12,9 +13,7 @@ import org.springframework.stereotype.Component;
|
|
|
|
|
|
import javax.annotation.PreDestroy;
|
|
import javax.annotation.PreDestroy;
|
|
import java.time.Duration;
|
|
import java.time.Duration;
|
|
-import java.util.ArrayList;
|
|
|
|
-import java.util.List;
|
|
|
|
-import java.util.Map;
|
|
|
|
|
|
+import java.util.*;
|
|
|
|
|
|
@Component
|
|
@Component
|
|
public class IBMMQHelper {
|
|
public class IBMMQHelper {
|
|
@@ -182,6 +181,8 @@ public class IBMMQHelper {
|
|
public static Result<List<String>> receiveMessage(Map<String, String> mqConfig,
|
|
public static Result<List<String>> receiveMessage(Map<String, String> mqConfig,
|
|
String queueName,
|
|
String queueName,
|
|
Integer maxMessages) {
|
|
Integer maxMessages) {
|
|
|
|
+ logger.info("IBM MQ配置-------------------------------------------");
|
|
|
|
+ logger.info(DataFormatConverter.mapToJson(mqConfig));
|
|
MQQueueManager queueManager = null;
|
|
MQQueueManager queueManager = null;
|
|
String cacheKey = null;
|
|
String cacheKey = null;
|
|
try {
|
|
try {
|
|
@@ -203,8 +204,95 @@ public class IBMMQHelper {
|
|
if(queue == null){
|
|
if(queue == null){
|
|
return Result.fail("绑定队列失败");
|
|
return Result.fail("绑定队列失败");
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ List<String> messages = new ArrayList<>();
|
|
|
|
+ //创建消息获取选项并设置等待时间
|
|
|
|
+ MQGetMessageOptions gmo = new MQGetMessageOptions();
|
|
|
|
+ gmo.options = MQConstants.MQGMO_WAIT | MQConstants.MQGMO_FAIL_IF_QUIESCING;
|
|
|
|
+ gmo.waitInterval = 5000;
|
|
|
|
+ //gmo.options = MQConstants.MQGMO_WAIT | MQConstants.MQGMO_CONVERT | MQConstants.MQGMO_FAIL_IF_QUIESCING;
|
|
|
|
+ //gmo.waitInterval = 0; // 0表示无限等
|
|
|
|
+// //不能获取当前队列深度
|
|
|
|
+// int currentDepth = queue.getCurrentDepth();
|
|
|
|
+// if (currentDepth == 0) {//队列深度为0,返回空结果
|
|
|
|
+// return Result.success(new ArrayList<>());
|
|
|
|
+// }
|
|
|
|
+ //for (int i = 0; i < Math.min(maxMessages, currentDepth); i++) {//循环获取指定数量的消息
|
|
|
|
+ for (int i = 0; i < maxMessages; i++) {//循环获取指定数量的消息
|
|
|
|
+ try {
|
|
|
|
+ MQMessage message = new MQMessage();
|
|
|
|
+ try {
|
|
|
|
+ queue.get(message, gmo);
|
|
|
|
+ byte[] buffer = new byte[message.getMessageLength()];
|
|
|
|
+ message.readFully(buffer);
|
|
|
|
+ String read = new String(buffer);
|
|
|
|
+ logger.info("接收 IBM MQ 队列消息:{}", read);
|
|
|
|
+ messages.add(read);
|
|
|
|
+ } catch (MQException e) {
|
|
|
|
+ if (e.getReason() == MQConstants.MQRC_NO_MSG_AVAILABLE) {
|
|
|
|
+ break; // 没有消息是正常情况,跳出循环
|
|
|
|
+ }
|
|
|
|
+ throw e; // 其他MQ异常需要上层处理
|
|
|
|
+ }
|
|
|
|
+ } catch (MQException e) {
|
|
|
|
+ if (e.reasonCode == MQConstants.MQRC_NO_MSG_AVAILABLE) {
|
|
|
|
+ logger.info("IBMMQ没有更多消息");
|
|
|
|
+ break; // 没有更多消息,提前结束
|
|
|
|
+ } else {
|
|
|
|
+ logger.error("接收 IBM MQ 消息失败", e);
|
|
|
|
+ return Result.fail("接收消息失败: " + e.getMessage());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ Result<List<String>> listResult = Result.success(messages);
|
|
|
|
+ logger.info("最大接收数量:"+maxMessages);
|
|
|
|
+ logger.info("队列名称:"+queueName);
|
|
|
|
+ if (Objects.nonNull(listResult.getData())&&listResult.getData().size()>0){
|
|
|
|
+ logger.info("发送数量",listResult.getData().size());
|
|
|
|
+ }
|
|
|
|
+ return listResult;
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ closeQueue(cacheKey, queue, consumerQueueCache);
|
|
|
|
+ logger.error("IBM MQ 接收异常", e);
|
|
|
|
+ return Result.fail("IBM MQ 收异常: " + e.getMessage());
|
|
|
|
+ }
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ logger.error("IBM MQ 接收异常", e);
|
|
|
|
+ closeQueueManager(cacheKey,queueManager);
|
|
|
|
+ return Result.fail("IBM MQ 接收异常: " + e.getMessage());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+/* public static Result<List<String>> receiveMessage(Map<String, String> mqConfig,
|
|
|
|
+ String queueName,
|
|
|
|
+ Integer maxMessages) {
|
|
|
|
+ MQQueueManager queueManager = null;
|
|
|
|
+ String cacheKey = null;
|
|
|
|
+ try {
|
|
|
|
+ //配置验证阶段
|
|
|
|
+ Result<String> validateResult = validateConfig(mqConfig, queueName, "data");
|
|
|
|
+ if (!validateResult.isSuccess()) {
|
|
|
|
+ return Result.fail(validateResult.getError());
|
|
|
|
+ }
|
|
|
|
+ cacheKey = validateResult.getData();
|
|
|
|
+ //获取队列管理器
|
|
|
|
+ queueManager = getQueueManager(mqConfig, cacheKey);
|
|
|
|
+ if (queueManager == null) {
|
|
|
|
+ return Result.fail("获取队列管理器失败");
|
|
|
|
+ }
|
|
|
|
+ MQQueue queue = null;
|
|
|
|
+ try {
|
|
|
|
+ //绑定队列
|
|
|
|
+ queue = getconsumerMQQueue(queueManager, cacheKey, queueName);
|
|
|
|
+ logger.info("*************************绑定队列深度");
|
|
|
|
+ logger.info("队列深度:"+queue);
|
|
|
|
+ if(queue == null){
|
|
|
|
+ return Result.fail("绑定队列失败");
|
|
|
|
+ }
|
|
//获取当前队列深度
|
|
//获取当前队列深度
|
|
int currentDepth = queue.getCurrentDepth();
|
|
int currentDepth = queue.getCurrentDepth();
|
|
|
|
+ logger.info("获取队列深度");
|
|
|
|
+ logger.info("队列深度:"+currentDepth);
|
|
if (currentDepth == 0) {//队列深度为0,返回空结果
|
|
if (currentDepth == 0) {//队列深度为0,返回空结果
|
|
return Result.success(new ArrayList<>());
|
|
return Result.success(new ArrayList<>());
|
|
}
|
|
}
|
|
@@ -251,7 +339,7 @@ public class IBMMQHelper {
|
|
closeQueueManager(cacheKey,queueManager);
|
|
closeQueueManager(cacheKey,queueManager);
|
|
return Result.fail("IBM MQ 接收异常: " + e.getMessage());
|
|
return Result.fail("IBM MQ 接收异常: " + e.getMessage());
|
|
}
|
|
}
|
|
- }
|
|
|
|
|
|
+ }*/
|
|
|
|
|
|
/**
|
|
/**
|
|
* 获取或创建 IBM MQ 队列管理器实例
|
|
* 获取或创建 IBM MQ 队列管理器实例
|
|
@@ -270,12 +358,18 @@ public class IBMMQHelper {
|
|
* @param cacheKey 缓存键值,用于唯一标识该队列管理器实例
|
|
* @param cacheKey 缓存键值,用于唯一标识该队列管理器实例
|
|
* @return 成功时返回队列管理器实例,失败返回 null
|
|
* @return 成功时返回队列管理器实例,失败返回 null
|
|
*/
|
|
*/
|
|
- private static MQQueueManager getQueueManager(Map<String, String> mqConfig, String cacheKey) {
|
|
|
|
|
|
+/* private static MQQueueManager getQueueManager(Map<String, String> mqConfig, String cacheKey) {
|
|
// 一级缓存检查:无需同步的快速失败路径
|
|
// 一级缓存检查:无需同步的快速失败路径
|
|
MQQueueManager queueManager = queueManagerCache.getIfPresent(cacheKey);
|
|
MQQueueManager queueManager = queueManagerCache.getIfPresent(cacheKey);
|
|
if (queueManager != null && queueManager.isConnected()) {
|
|
if (queueManager != null && queueManager.isConnected()) {
|
|
return queueManager;
|
|
return queueManager;
|
|
}
|
|
}
|
|
|
|
+ logger.info("********************获取或创建 IBM MQ 队列管理器实例");
|
|
|
|
+ if (!Objects.nonNull(mqConfig)){
|
|
|
|
+ if (Objects.nonNull(mqConfig.get("host"))) logger.info("host:"+mqConfig.get("host"));
|
|
|
|
+ if (Objects.nonNull(mqConfig.get("port"))) logger.info("端口:"+mqConfig.get("port"));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
|
|
// 同步块确保线程安全,防止重复创建连接
|
|
// 同步块确保线程安全,防止重复创建连接
|
|
synchronized (connectionLock) {
|
|
synchronized (connectionLock) {
|
|
@@ -290,16 +384,20 @@ public class IBMMQHelper {
|
|
MQEnvironment.hostname = mqConfig.get("host");
|
|
MQEnvironment.hostname = mqConfig.get("host");
|
|
MQEnvironment.port = Integer.parseInt(mqConfig.get("port"));
|
|
MQEnvironment.port = Integer.parseInt(mqConfig.get("port"));
|
|
MQEnvironment.channel = mqConfig.get("channel");
|
|
MQEnvironment.channel = mqConfig.get("channel");
|
|
- MQEnvironment.userID = mqConfig.get("username");
|
|
|
|
- MQEnvironment.password = mqConfig.get("password");
|
|
|
|
|
|
+ if (StringUtils.isNullOrEmpty(mqConfig.get("username"))){
|
|
|
|
+ MQEnvironment.userID = mqConfig.get("username");
|
|
|
|
+ }
|
|
|
|
+ if (StringUtils.isNullOrEmpty(mqConfig.get("password"))){
|
|
|
|
+ MQEnvironment.password = mqConfig.get("password");
|
|
|
|
+ }
|
|
MQEnvironment.CCSID = Integer.parseInt(mqConfig.getOrDefault("ccsid", "1208"));
|
|
MQEnvironment.CCSID = Integer.parseInt(mqConfig.getOrDefault("ccsid", "1208"));
|
|
System.setProperty("com.ibm.mq.transport.type", mqConfig.getOrDefault("transportType", "1"));
|
|
System.setProperty("com.ibm.mq.transport.type", mqConfig.getOrDefault("transportType", "1"));
|
|
|
|
|
|
// 创建新的队列管理器实例
|
|
// 创建新的队列管理器实例
|
|
- queueManager = new MQQueueManager(mqConfig.get("queueManager"));
|
|
|
|
-
|
|
|
|
|
|
+ps -ef
|
|
// 将新创建的实例存入缓存
|
|
// 将新创建的实例存入缓存
|
|
queueManagerCache.put(cacheKey, queueManager);
|
|
queueManagerCache.put(cacheKey, queueManager);
|
|
|
|
+ logger.info("********************获取或创建 IBM MQ 队列管理器实例 end");
|
|
return queueManager;
|
|
return queueManager;
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
// 记录连接失败的异常信息
|
|
// 记录连接失败的异常信息
|
|
@@ -307,8 +405,55 @@ public class IBMMQHelper {
|
|
return null;
|
|
return null;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- }
|
|
|
|
|
|
+ }*/
|
|
|
|
+ private static MQQueueManager getQueueManager(Map<String, String> mqConfig, String cacheKey) {
|
|
|
|
+ // 一级缓存检查:无需同步的快速失败路径
|
|
|
|
+ MQQueueManager queueManager = queueManagerCache.getIfPresent(cacheKey);
|
|
|
|
+ if (queueManager != null && queueManager.isConnected()) {
|
|
|
|
+ return queueManager;
|
|
|
|
+ }
|
|
|
|
|
|
|
|
+ // 同步块确保线程安全,防止重复创建连接
|
|
|
|
+ synchronized (connectionLock) {
|
|
|
|
+ // 二级缓存检查:防止重复初始化
|
|
|
|
+ queueManager = queueManagerCache.getIfPresent(cacheKey);
|
|
|
|
+ if (queueManager != null && queueManager.isConnected()) {
|
|
|
|
+ return queueManager;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ try {
|
|
|
|
+ //
|
|
|
|
+ Properties props = new Properties();
|
|
|
|
+ props.put(CMQC.HOST_NAME_PROPERTY, mqConfig.get("host"));
|
|
|
|
+ props.put(CMQC.PORT_PROPERTY, Integer.parseInt(mqConfig.get("port")));
|
|
|
|
+ props.put(CMQC.CHANNEL_PROPERTY, mqConfig.get("channel"));
|
|
|
|
+ props.put(CMQC.CCSID_PROPERTY,mqConfig.getOrDefault("ccsid", "1208"));
|
|
|
|
+
|
|
|
|
+ String userId = mqConfig.get("username");
|
|
|
|
+ if(userId != null && !userId.isEmpty()){
|
|
|
|
+ props.put(CMQC.USER_ID_PROPERTY, userId);
|
|
|
|
+ }
|
|
|
|
+ String password = mqConfig.get("password");
|
|
|
|
+ if(password != null && !password.isEmpty()) {
|
|
|
|
+ props.put(CMQC.PASSWORD_PROPERTY, password);
|
|
|
|
+ }
|
|
|
|
+ String queueManagerName = mqConfig.get("queueManager");
|
|
|
|
+ System.setProperty("com.ibm.mq.transport.type", mqConfig.getOrDefault("transportType", "1"));
|
|
|
|
+ if(queueManagerName != null && !queueManagerName.isEmpty()) {
|
|
|
|
+ queueManager = new MQQueueManager(queueManagerName, props);
|
|
|
|
+ queueManagerCache.put(cacheKey, queueManager);
|
|
|
|
+ return queueManager;
|
|
|
|
+ }else {
|
|
|
|
+ logger.error("创建MQ队列管理器失败: 队列名为空{}", mqConfig);
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ // 记录连接失败的异常信息
|
|
|
|
+ logger.error("创建 IBM MQ 连接失败", e);
|
|
|
|
+ return null;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
/**
|
|
/**
|
|
* 获取或创建一个MQ队列实例用于生产者操作。
|
|
* 获取或创建一个MQ队列实例用于生产者操作。
|
|
* 该方法首先尝试从缓存中获取已存在的队列实例,若队列不存在或未打开,
|
|
* 该方法首先尝试从缓存中获取已存在的队列实例,若队列不存在或未打开,
|