|
@@ -3,7 +3,14 @@ package com.scbfkj.uni.process;
|
|
|
|
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
|
|
import com.ibm.mq.MQException;
|
|
|
-import com.ibm.msg.client.wmq.compat.base.internal.*;
|
|
|
+import com.ibm.mq.jakarta.jms.MQDestination;
|
|
|
+import com.ibm.mq.jakarta.jms.MQQueueConnectionFactory;
|
|
|
+import com.scbfkj.uni.system.Config;
|
|
|
+import jakarta.jms.*;
|
|
|
+import org.springframework.jms.connection.CachingConnectionFactory;
|
|
|
+import org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter;
|
|
|
+import org.springframework.jms.core.JmsTemplate;
|
|
|
+import org.springframework.jms.core.MessageCreator;
|
|
|
import org.springframework.util.StringUtils;
|
|
|
|
|
|
import java.io.IOException;
|
|
@@ -11,75 +18,50 @@ import java.util.ArrayList;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
|
|
|
-import static com.ibm.msg.client.wmq.compat.base.internal.MQC.MQCCSI_Q_MGR;
|
|
|
-
|
|
|
|
|
|
public class IBMMQ {
|
|
|
|
|
|
|
|
|
- private MQQueueManager queueManager = null;
|
|
|
+ private JmsTemplate jmsTemplate = null;
|
|
|
+
|
|
|
private final ObjectMapper mapper = new ObjectMapper();
|
|
|
|
|
|
public List<String> receptionMessage(
|
|
|
- String host, Long port, String channel, String queueManager, String queueName, Long ccsid, String username, String password, Long receiveTimeout, Long pollSize, Long retry) throws MQException, InterruptedException {
|
|
|
- MQQueueManager mqQueueManager = getMqQueueManager(host, port.intValue(), channel, queueManager, ccsid.intValue(), username, password);
|
|
|
- MQQueue queue = mqQueueManager.accessQueue(
|
|
|
- queueName,
|
|
|
- MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT | MQC.MQOO_INQUIRE
|
|
|
- );
|
|
|
- MQGetMessageOptions gmo = new MQGetMessageOptions();
|
|
|
- gmo.waitInterval = receiveTimeout.intValue();
|
|
|
+ String host, Long port, String channel, String queueManager, String queueName, Long ccsid, String username, String password, Long receiveTimeout, Long pollSize, Long retry) throws MQException, InterruptedException, JMSException {
|
|
|
+ JmsTemplate template = getJmsTemplate(host, port.intValue(), ccsid.intValue(), queueManager, channel, username, password);
|
|
|
|
|
|
+ jmsTemplate.setReceiveTimeout(receiveTimeout);
|
|
|
long maxSize = 100;
|
|
|
if (pollSize > 0) {
|
|
|
maxSize = pollSize;
|
|
|
}
|
|
|
int maxRetry = 0;
|
|
|
List<String> result = new ArrayList<>();
|
|
|
- if (queue.getCurrentDepth() == 0) {
|
|
|
- return result;
|
|
|
- }
|
|
|
|
|
|
- while (queue.getCurrentDepth() > 0) {
|
|
|
+
|
|
|
+ while (true) {
|
|
|
try {
|
|
|
- MQMessage message = new MQMessage();
|
|
|
- queue.get(message, gmo);
|
|
|
- if (message.getDataLength() > 0) {
|
|
|
- String messageResult = message.readStringOfByteLength(message.getDataLength());
|
|
|
- result.add(messageResult);
|
|
|
- if (maxSize <= result.size()) {
|
|
|
- return result;
|
|
|
+ Message message = template.receive(queueName);
|
|
|
+ if (message == null) {
|
|
|
+ break;
|
|
|
+ } else if (message instanceof TextMessage msg) {
|
|
|
+ String text = msg.getText();
|
|
|
+ result.add(text);
|
|
|
+ if (result.size() >= maxSize) {
|
|
|
+ break;
|
|
|
}
|
|
|
}
|
|
|
} catch (Exception e) {
|
|
|
maxRetry++;
|
|
|
if (maxRetry > retry) {
|
|
|
- e.printStackTrace();
|
|
|
- try {
|
|
|
- if (queue.isOpen()) {
|
|
|
-
|
|
|
- queue.close();
|
|
|
- }
|
|
|
- } catch (Exception ex) {
|
|
|
- ex.printStackTrace();
|
|
|
+ if (Config.isDebug()) {
|
|
|
+ e.printStackTrace();
|
|
|
}
|
|
|
- try {
|
|
|
- if (mqQueueManager.isOpen()) {
|
|
|
- mqQueueManager.close();
|
|
|
- }
|
|
|
- } catch (Exception ex) {
|
|
|
- ex.printStackTrace();
|
|
|
+ if (jmsTemplate != null) {
|
|
|
+ jmsTemplate = null;
|
|
|
}
|
|
|
- return result;
|
|
|
- }
|
|
|
- } finally {
|
|
|
- try {
|
|
|
- if (queue.isOpen()) {
|
|
|
|
|
|
- queue.close();
|
|
|
- }
|
|
|
- } catch (Exception ex) {
|
|
|
- ex.printStackTrace();
|
|
|
+ return result;
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -90,102 +72,92 @@ public class IBMMQ {
|
|
|
|
|
|
|
|
|
public void sendMessage(
|
|
|
- String host, Long port, String channel, String queueManager, String queueName, int ccsid, String username,
|
|
|
- String password,
|
|
|
- Long characterSet,
|
|
|
+ String host, Long port, String channel, String queueManager, String queueName, Long ccsid, String username, String password,
|
|
|
Object data
|
|
|
- ) throws MQException {
|
|
|
+ ) throws JMSException {
|
|
|
+
|
|
|
+ JmsTemplate template = getJmsTemplate(host, port.intValue(), ccsid.intValue(), queueManager, channel, username, password);
|
|
|
|
|
|
- MQQueueManager mqQueueManager = getMqQueueManager(host, port.intValue(), channel, queueManager, ccsid, username, password);
|
|
|
- MQQueue queue = mqQueueManager.accessQueue(
|
|
|
- queueName,
|
|
|
- MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT | MQC.MQOO_INQUIRE | MQC.MQOO_FAIL_IF_QUIESCING
|
|
|
- );
|
|
|
try {
|
|
|
if (data instanceof Object[] datas) {
|
|
|
for (Object it : datas) {
|
|
|
|
|
|
- execSend(characterSet, it, queue);
|
|
|
+ execSend(template, it, queueName);
|
|
|
}
|
|
|
} else if (data instanceof Iterable<?> datas) {
|
|
|
- datas.forEach(it -> {
|
|
|
- });
|
|
|
|
|
|
for (Object it : datas) {
|
|
|
|
|
|
- execSend(characterSet, it, queue);
|
|
|
+ execSend(template, it, queueName);
|
|
|
}
|
|
|
} else {
|
|
|
- execSend(characterSet, data, queue);
|
|
|
+ execSend(template, data, queueName);
|
|
|
}
|
|
|
|
|
|
} catch (Exception e) {
|
|
|
- e.printStackTrace();
|
|
|
- try {
|
|
|
- if (queue.isOpen()) {
|
|
|
-
|
|
|
- queue.close();
|
|
|
- }
|
|
|
- } catch (Exception ex) {
|
|
|
- ex.printStackTrace();
|
|
|
+ if (Config.isDebug()) {
|
|
|
+ e.printStackTrace();
|
|
|
}
|
|
|
- try {
|
|
|
- if (mqQueueManager.isOpen()) {
|
|
|
- mqQueueManager.close();
|
|
|
- }
|
|
|
- } catch (Exception ex) {
|
|
|
- ex.printStackTrace();
|
|
|
+ if (jmsTemplate != null) {
|
|
|
+ jmsTemplate = null;
|
|
|
}
|
|
|
|
|
|
- } finally {
|
|
|
- try {
|
|
|
- if (queue.isOpen()) {
|
|
|
-
|
|
|
- queue.close();
|
|
|
- }
|
|
|
- } catch (Exception ex) {
|
|
|
- ex.printStackTrace();
|
|
|
- }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
|
|
|
private void execSend(
|
|
|
- Long characterSet,
|
|
|
+ JmsTemplate template,
|
|
|
Object data,
|
|
|
- MQQueue queue
|
|
|
+ String queueName
|
|
|
) throws IOException, MQException {
|
|
|
- MQMessage mqMessage = new MQMessage();
|
|
|
+ String message = mapper.writeValueAsString(data);
|
|
|
|
|
|
- if (characterSet == null) {
|
|
|
- mqMessage.characterSet = MQCCSI_Q_MGR;
|
|
|
- } else {
|
|
|
- mqMessage.characterSet = characterSet.intValue();
|
|
|
+ template.send(queueName, session -> session.createTextMessage(message));
|
|
|
+ }
|
|
|
|
|
|
- }
|
|
|
|
|
|
- mqMessage.write(mapper.writeValueAsBytes(data));
|
|
|
- MQPutMessageOptions messageOptions = new MQPutMessageOptions();
|
|
|
- queue.put(mqMessage, messageOptions); //发送消息
|
|
|
+ private MQQueueConnectionFactory createMqQueueConnectionFactory(String host, int port, int ccsid, String queueManager, String channel) throws JMSException {
|
|
|
+ MQQueueConnectionFactory mqQueueConnectionFactory = new MQQueueConnectionFactory();
|
|
|
+ mqQueueConnectionFactory.setHostName(host);
|
|
|
+ mqQueueConnectionFactory.setPort(port);
|
|
|
+ mqQueueConnectionFactory.setCCSID(ccsid);
|
|
|
+ mqQueueConnectionFactory.setQueueManager(queueManager);
|
|
|
+ mqQueueConnectionFactory.setChannel(channel);
|
|
|
+ mqQueueConnectionFactory.setTransportType(1);
|
|
|
+ return mqQueueConnectionFactory;
|
|
|
}
|
|
|
|
|
|
|
|
|
- private MQQueueManager getMqQueueManager(String host, int port, String channel, String queueManagerName, int ccsid, String username, String password) throws MQException {
|
|
|
+ private JmsTemplate createJmsTemplate(CachingConnectionFactory factory) {
|
|
|
+ JmsTemplate jmsTemplate = new JmsTemplate();
|
|
|
+ jmsTemplate.setConnectionFactory(factory);
|
|
|
+ return jmsTemplate;
|
|
|
+ }
|
|
|
+
|
|
|
+ private CachingConnectionFactory createCachingConnectionFactory(ConnectionFactory factory) {
|
|
|
+ CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
|
|
|
+ cachingConnectionFactory.setTargetConnectionFactory(factory);
|
|
|
+ return cachingConnectionFactory;
|
|
|
+ }
|
|
|
|
|
|
- if (null == queueManager) {
|
|
|
- HashMap<String, Object> prop = new HashMap<>();
|
|
|
- prop.put(MQC.HOST_NAME_PROPERTY, host);
|
|
|
- prop.put(MQC.PORT_PROPERTY, port);
|
|
|
- prop.put(MQC.CHANNEL_PROPERTY, channel);
|
|
|
- prop.put(MQC.CCSID_PROPERTY, ccsid);
|
|
|
- if (StringUtils.hasText(username) && StringUtils.hasText(password)) {
|
|
|
- prop.put(MQC.USER_ID_PROPERTY, username);
|
|
|
- prop.put(MQC.PASSWORD_PROPERTY, password);
|
|
|
- }
|
|
|
- queueManager = new MQQueueManager(queueManagerName, prop);
|
|
|
- }
|
|
|
- return queueManager;
|
|
|
+ private UserCredentialsConnectionFactoryAdapter createAdapter(String username, String password, ConnectionFactory factory) {
|
|
|
+ UserCredentialsConnectionFactoryAdapter adapter = new UserCredentialsConnectionFactoryAdapter();
|
|
|
+ adapter.setUsername(username);
|
|
|
+ adapter.setPassword(password);
|
|
|
+ adapter.setTargetConnectionFactory(factory);
|
|
|
+ return adapter;
|
|
|
}
|
|
|
|
|
|
+ private JmsTemplate getJmsTemplate(String host, int port, int ccsid, String queueManager, String channel, String username, String password) throws JMSException {
|
|
|
+ if (jmsTemplate == null) {
|
|
|
+ MQQueueConnectionFactory mqQueueConnectionFactory = createMqQueueConnectionFactory(host, port, ccsid, queueManager, channel);
|
|
|
+ UserCredentialsConnectionFactoryAdapter adapter = createAdapter(username, password, mqQueueConnectionFactory);
|
|
|
+ CachingConnectionFactory cachingConnectionFactory = createCachingConnectionFactory(adapter);
|
|
|
|
|
|
+ jmsTemplate = createJmsTemplate(cachingConnectionFactory);
|
|
|
+
|
|
|
+ }
|
|
|
+ return jmsTemplate;
|
|
|
+ }
|
|
|
}
|