|
@@ -2,6 +2,7 @@ package com.bfkj.unidia.IOUtils;
|
|
|
|
|
|
import com.bfkj.unidia.DataUtils.DataFormatConverter;
|
|
|
import com.bfkj.unidia.Result;
|
|
|
+import com.bfkj.unidia.logUtils.Log;
|
|
|
import com.github.benmanes.caffeine.cache.Cache;
|
|
|
import com.ibm.mq.*;
|
|
|
import com.ibm.mq.constants.CMQC;
|
|
@@ -64,11 +65,11 @@ public class IBMMQHelper {
|
|
|
try {
|
|
|
Result<String> validateResult = validateConfig(mqConfig, queueName, messageList);
|
|
|
if (!validateResult.isSuccess()) {
|
|
|
- logger.error("MQ配置验证失败: {}", validateResult.getError());
|
|
|
+ Log.error("MQ配置验证失败: {}", validateResult.getError());
|
|
|
return Result.fail(validateResult.getError());
|
|
|
}
|
|
|
cacheKey = validateResult.getData();
|
|
|
- logger.info("IBMMQ发送,参数检测通过……");
|
|
|
+ Log.info("IBMMQ发送,参数检测通过……");
|
|
|
queueManager = getQueueManager(mqConfig,cacheKey);
|
|
|
if (queueManager == null) {
|
|
|
return Result.fail("获取IBMMQ队列管理器失败");
|
|
@@ -76,7 +77,7 @@ public class IBMMQHelper {
|
|
|
MQQueue queue = null;
|
|
|
try {
|
|
|
queue = getproducerMQQueue(queueManager,cacheKey,queueName);
|
|
|
- logger.info("IBMMQ发送,生产者获取成功,准备发送……");
|
|
|
+ Log.info("IBMMQ发送,生产者获取成功,准备发送……");
|
|
|
for (Object messageItem : messageList) {
|
|
|
String msg = sendSingleMessage(queue,mqConfig, DataFormatConverter.convertObjectToString(messageItem));
|
|
|
if(msg != null){
|
|
@@ -88,7 +89,7 @@ public class IBMMQHelper {
|
|
|
return Result.fail("IBM MQ 获取生产者队列失败: " + e.getMessage());
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
- logger.error("IBM MQ 发送异常", e);
|
|
|
+ Log.error("IBM MQ 发送异常", e);
|
|
|
closeQueueManager(cacheKey,queueManager);
|
|
|
return Result.fail("IBM MQ 发送异常: " + e.getMessage());
|
|
|
}
|
|
@@ -133,7 +134,7 @@ public class IBMMQHelper {
|
|
|
return null;
|
|
|
} catch (Exception e) {
|
|
|
// 记录异常日志并返回失败结果
|
|
|
- logger.error("发送 IBM MQ 消息失败", e);
|
|
|
+ Log.error("发送 IBM MQ 消息失败", e);
|
|
|
// return Result.fail("发送消息失败: " + e.getMessage());
|
|
|
return message;
|
|
|
}
|
|
@@ -165,8 +166,8 @@ public class IBMMQHelper {
|
|
|
public static Result<List<String>> receiveMessage(Map<String, String> mqConfig,
|
|
|
String queueName,
|
|
|
Integer maxMessages) {
|
|
|
- logger.info("IBM MQ配置-------------------------------------------");
|
|
|
- logger.info(DataFormatConverter.mapToJson(mqConfig));
|
|
|
+ Log.info("IBM MQ配置-------------------------------------------");
|
|
|
+ Log.info(DataFormatConverter.mapToJson(mqConfig));
|
|
|
MQQueueManager queueManager = null;
|
|
|
String cacheKey = null;
|
|
|
try {
|
|
@@ -211,7 +212,7 @@ public class IBMMQHelper {
|
|
|
byte[] buffer = new byte[message.getMessageLength()];
|
|
|
message.readFully(buffer);
|
|
|
String read = new String(buffer);
|
|
|
- logger.info("接收 IBM MQ 队列消息:{}", read);
|
|
|
+ Log.info("接收 IBM MQ 队列消息:{}", read);
|
|
|
messages.add(read);
|
|
|
} catch (MQException e) {
|
|
|
if (e.getReason() == MQConstants.MQRC_NO_MSG_AVAILABLE) {
|
|
@@ -221,28 +222,28 @@ public class IBMMQHelper {
|
|
|
}
|
|
|
} catch (MQException e) {
|
|
|
if (e.reasonCode == MQConstants.MQRC_NO_MSG_AVAILABLE) {
|
|
|
- logger.info("IBMMQ没有更多消息");
|
|
|
+ Log.info("IBMMQ没有更多消息");
|
|
|
break; // 没有更多消息,提前结束
|
|
|
} else {
|
|
|
- logger.error("接收 IBM MQ 消息失败", e);
|
|
|
+ Log.error("接收 IBM MQ 消息失败", e);
|
|
|
return Result.fail("接收消息失败: " + e.getMessage());
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
Result<List<String>> listResult = Result.success(messages);
|
|
|
- logger.info("最大接收数量:"+maxMessages);
|
|
|
- logger.info("队列名称:"+queueName);
|
|
|
+ Log.info("最大接收数量:"+maxMessages);
|
|
|
+ Log.info("队列名称:"+queueName);
|
|
|
if (Objects.nonNull(listResult.getData())&&listResult.getData().size()>0){
|
|
|
- logger.info("发送数量",listResult.getData().size());
|
|
|
+ Log.info("发送数量",listResult.getData().size());
|
|
|
}
|
|
|
return listResult;
|
|
|
} catch (Exception e) {
|
|
|
closeQueue(cacheKey, queue, consumerQueueCache);
|
|
|
- logger.error("IBM MQ 接收异常", e);
|
|
|
+ Log.error("IBM MQ 接收异常", e);
|
|
|
return Result.fail("IBM MQ 收异常: " + e.getMessage());
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
- logger.error("IBM MQ 接收异常", e);
|
|
|
+ Log.error("IBM MQ 接收异常", e);
|
|
|
closeQueueManager(cacheKey,queueManager);
|
|
|
return Result.fail("IBM MQ 接收异常: " + e.getMessage());
|
|
|
}
|
|
@@ -268,15 +269,15 @@ public class IBMMQHelper {
|
|
|
try {
|
|
|
//绑定队列
|
|
|
queue = getconsumerMQQueue(queueManager, cacheKey, queueName);
|
|
|
- logger.info("*************************绑定队列深度");
|
|
|
- logger.info("队列深度:"+queue);
|
|
|
+ Log.info("*************************绑定队列深度");
|
|
|
+ Log.info("队列深度:"+queue);
|
|
|
if(queue == null){
|
|
|
return Result.fail("绑定队列失败");
|
|
|
}
|
|
|
//获取当前队列深度
|
|
|
int currentDepth = queue.getCurrentDepth();
|
|
|
- logger.info("获取队列深度");
|
|
|
- logger.info("队列深度:"+currentDepth);
|
|
|
+ Log.info("获取队列深度");
|
|
|
+ Log.info("队列深度:"+currentDepth);
|
|
|
if (currentDepth == 0) {//队列深度为0,返回空结果
|
|
|
return Result.success(new ArrayList<>());
|
|
|
}
|
|
@@ -300,7 +301,7 @@ public class IBMMQHelper {
|
|
|
if (e.reasonCode == MQConstants.MQRC_NO_MSG_AVAILABLE) {
|
|
|
break; // 没有更多消息,提前结束
|
|
|
} else {
|
|
|
- logger.error("接收 IBM MQ 消息失败", e);
|
|
|
+ Log.error("接收 IBM MQ 消息失败", e);
|
|
|
return Result.fail("接收消息失败: " + e.getMessage());
|
|
|
}
|
|
|
}
|
|
@@ -311,15 +312,15 @@ public class IBMMQHelper {
|
|
|
if (e.reasonCode == MQConstants.MQRC_NO_MSG_AVAILABLE) {
|
|
|
return Result.fail("没有可用消息");
|
|
|
}
|
|
|
- logger.error("接收 IBM MQ 消息失败", e);
|
|
|
+ Log.error("接收 IBM MQ 消息失败", e);
|
|
|
return Result.fail("接收消息失败: " + e.getMessage());
|
|
|
} catch (Exception e) {
|
|
|
closeQueue(cacheKey, queue, consumerQueueCache);
|
|
|
- logger.error("IBM MQ 接收异常", e);
|
|
|
+ Log.error("IBM MQ 接收异常", e);
|
|
|
return Result.fail("IBM MQ 收异常: " + e.getMessage());
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
- logger.error("IBM MQ 接收异常", e);
|
|
|
+ Log.error("IBM MQ 接收异常", e);
|
|
|
closeQueueManager(cacheKey,queueManager);
|
|
|
return Result.fail("IBM MQ 接收异常: " + e.getMessage());
|
|
|
}
|
|
@@ -348,10 +349,10 @@ public class IBMMQHelper {
|
|
|
if (queueManager != null && queueManager.isConnected()) {
|
|
|
return queueManager;
|
|
|
}
|
|
|
- logger.info("********************获取或创建 IBM MQ 队列管理器实例");
|
|
|
+ Log.info("********************获取或创建 IBM MQ 队列管理器实例");
|
|
|
if (!Objects.nonNull(mqConfig)){
|
|
|
- if (Objects.nonNull(mqConfig.get("host"))) logger.info("host:"+mqConfig.get("host"));
|
|
|
- if (Objects.nonNull(mqConfig.get("port"))) logger.info("端口:"+mqConfig.get("port"));
|
|
|
+ if (Objects.nonNull(mqConfig.get("host"))) Log.info("host:"+mqConfig.get("host"));
|
|
|
+ if (Objects.nonNull(mqConfig.get("port"))) Log.info("端口:"+mqConfig.get("port"));
|
|
|
}
|
|
|
|
|
|
|
|
@@ -381,11 +382,11 @@ public class IBMMQHelper {
|
|
|
ps -ef
|
|
|
// 将新创建的实例存入缓存
|
|
|
queueManagerCache.put(cacheKey, queueManager);
|
|
|
- logger.info("********************获取或创建 IBM MQ 队列管理器实例 end");
|
|
|
+ Log.info("********************获取或创建 IBM MQ 队列管理器实例 end");
|
|
|
return queueManager;
|
|
|
} catch (Exception e) {
|
|
|
// 记录连接失败的异常信息
|
|
|
- logger.error("创建 IBM MQ 连接失败", e);
|
|
|
+ Log.error("创建 IBM MQ 连接失败", e);
|
|
|
return null;
|
|
|
}
|
|
|
}
|
|
@@ -428,12 +429,12 @@ ps -ef
|
|
|
queueManagerCache.put(cacheKey, queueManager);
|
|
|
return queueManager;
|
|
|
}else {
|
|
|
- logger.error("创建MQ队列管理器失败: 队列名为空{}", mqConfig);
|
|
|
+ Log.error("创建MQ队列管理器失败: 队列名为空{}", mqConfig);
|
|
|
return null;
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
// 记录连接失败的异常信息
|
|
|
- logger.error("创建 IBM MQ 连接失败", e);
|
|
|
+ Log.error("创建 IBM MQ 连接失败", e);
|
|
|
return null;
|
|
|
}
|
|
|
}
|
|
@@ -471,7 +472,7 @@ ps -ef
|
|
|
return queue;
|
|
|
} catch (Exception e) {
|
|
|
// 记录队列绑定失败的异常信息
|
|
|
- logger.error("IBM MQ 绑定队列失败", e);
|
|
|
+ Log.error("IBM MQ 绑定队列失败", e);
|
|
|
return null;
|
|
|
}
|
|
|
}
|
|
@@ -514,7 +515,7 @@ ps -ef
|
|
|
return queue;
|
|
|
} catch (Exception e) {
|
|
|
// 记录队列绑定失败的异常信息
|
|
|
- logger.error("IBM MQ 绑定队列失败", e);
|
|
|
+ Log.error("IBM MQ 绑定队列失败", e);
|
|
|
return null;
|
|
|
}
|
|
|
}
|
|
@@ -575,7 +576,7 @@ ps -ef
|
|
|
}
|
|
|
} catch (MQException e) {
|
|
|
// 记录断开连接时发生的异常,但不中断后续执行
|
|
|
- logger.error("关闭 IBM MQ 队列管理器失败", e);
|
|
|
+ Log.error("关闭 IBM MQ 队列管理器失败", e);
|
|
|
} finally {
|
|
|
// 确保无论如何都使缓存条目失效以避免脏数据
|
|
|
queueManagerCache.invalidate(cacheKey);
|
|
@@ -591,7 +592,7 @@ ps -ef
|
|
|
}
|
|
|
} catch (MQException e) {
|
|
|
// 记录断开连接时发生的异常,但不中断后续执行
|
|
|
- logger.error("关闭 IBM MQ 队列失败", e);
|
|
|
+ Log.error("关闭 IBM MQ 队列失败", e);
|
|
|
} finally {
|
|
|
// 确保无论如何都使缓存条目失效以避免脏数据
|
|
|
QueueCache.invalidate(cacheKey);
|