Browse Source

ActiveMQ jms实现

andy 1 year ago
parent
commit
64b9f4618c
1 changed files with 142 additions and 0 deletions
  1. 142 0
      src/main/java/com/scbfkj/uni/process/ActiveMQ.java

+ 142 - 0
src/main/java/com/scbfkj/uni/process/ActiveMQ.java

@@ -0,0 +1,142 @@
+package com.scbfkj.uni.process;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.scbfkj.uni.system.Config;
+import jakarta.jms.JMSException;
+import jakarta.jms.Message;
+import jakarta.jms.TextMessage;
+import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
+import org.springframework.jms.connection.CachingConnectionFactory;
+import org.springframework.jms.core.JmsTemplate;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class ActiveMQ {
+
+
+    private JmsTemplate jmsTemplate;
+
+    private final ObjectMapper mapper = new ObjectMapper();
+
+    public List<String> receptionMessage(
+            String host, String username, String password, String virtualHost, String queueName, Long receiveTimeout, Long pollSize, Long retry) throws JMSException {
+        JmsTemplate template = getJmsTemplate(host, username, password, virtualHost);
+
+
+        jmsTemplate.setReceiveTimeout(receiveTimeout);
+        long maxSize = 100;
+        if (pollSize > 0) {
+            maxSize = pollSize;
+        }
+        int maxRetry = 0;
+        List<String> result = new ArrayList<>();
+
+
+        while (true) {
+            try {
+                Message message = template.receive(queueName);
+                if (message == null) {
+                    break;
+                } else if (message instanceof TextMessage msg) {
+                    String text = msg.getText();
+                    result.add(text);
+                    if (result.size() >= maxSize) {
+                        break;
+                    }
+                }
+            } catch (Exception e) {
+                maxRetry++;
+                if (maxRetry > retry) {
+                    if (Config.isDebug()) {
+                        e.printStackTrace();
+                    }
+                    if (jmsTemplate != null) {
+                        jmsTemplate = null;
+                    }
+
+                    return result;
+                }
+            }
+        }
+        return result;
+
+
+    }
+
+
+    public void sendMessage(
+            String host, String username, String password, String virtualHost, String queueName,
+            Object data
+    ) throws JMSException {
+
+        JmsTemplate template = getJmsTemplate(host, username, password, virtualHost);
+
+        try {
+            if (data instanceof Object[] datas) {
+                for (Object it : datas) {
+
+                    execSend(template, it, queueName);
+                }
+            } else if (data instanceof Iterable<?> datas) {
+
+                for (Object it : datas) {
+
+                    execSend(template, it, queueName);
+                }
+            } else {
+                execSend(template, data, queueName);
+            }
+
+        } catch (Exception e) {
+            if (Config.isDebug()) {
+                e.printStackTrace();
+            }
+            if (jmsTemplate != null) {
+                jmsTemplate = null;
+            }
+
+        }
+    }
+
+
+    private void execSend(
+            JmsTemplate template,
+            Object data,
+            String queueName) throws IOException {
+        String message = mapper.writeValueAsString(data);
+
+        template.send(queueName, session -> session.createTextMessage(message));
+    }
+
+
+    private jakarta.jms.ConnectionFactory createConnectionFactory(String host, String username, String password, String virtualHost) {
+        ActiveMQJMSConnectionFactory connectionFactory = new ActiveMQJMSConnectionFactory(host, username, password);
+
+        return connectionFactory;
+    }
+
+
+    private CachingConnectionFactory createCachingConnectionFactory(jakarta.jms.ConnectionFactory factory) {
+        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
+        cachingConnectionFactory.setTargetConnectionFactory(factory);
+        return cachingConnectionFactory;
+    }
+
+
+    private JmsTemplate getJmsTemplate(String host, String username, String password, String virtualHost) throws JMSException {
+        if (jmsTemplate == null) {
+            jakarta.jms.ConnectionFactory connectionFactory = createConnectionFactory(host, username, password, virtualHost);
+            CachingConnectionFactory cachingConnectionFactory = createCachingConnectionFactory(connectionFactory);
+            jmsTemplate = createJmsTemplate(cachingConnectionFactory);
+        }
+        return jmsTemplate;
+    }
+
+    private JmsTemplate createJmsTemplate(CachingConnectionFactory factory) {
+        JmsTemplate jmsTemplate = new JmsTemplate();
+        jmsTemplate.setConnectionFactory(factory);
+        return jmsTemplate;
+    }
+}