|
@@ -1,25 +1,27 @@
|
|
|
package com.scbfkj.uni.process;
|
|
|
|
|
|
import cn.hutool.extra.spring.SpringUtil;
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
|
|
import com.scbfkj.uni.exceptions.UniException;
|
|
|
import com.scbfkj.uni.library.DataFormatUtil;
|
|
|
import com.scbfkj.uni.logs.Log;
|
|
|
import com.scbfkj.uni.utils.SystemUtil;
|
|
|
import jakarta.annotation.Resource;
|
|
|
-import jakarta.jms.ConnectionFactory;
|
|
|
-import jakarta.jms.JMSException;
|
|
|
-import jakarta.jms.Message;
|
|
|
-import jakarta.jms.TextMessage;
|
|
|
import lombok.AllArgsConstructor;
|
|
|
import lombok.Data;
|
|
|
import lombok.NoArgsConstructor;
|
|
|
-import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
|
|
|
-import org.apache.commons.pool2.BaseKeyedPooledObjectFactory;
|
|
|
-import org.apache.commons.pool2.KeyedObjectPool;
|
|
|
-import org.apache.commons.pool2.PooledObject;
|
|
|
-import org.apache.commons.pool2.impl.DefaultPooledObject;
|
|
|
-import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
|
|
|
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
|
|
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
|
|
|
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
|
|
+import org.apache.rocketmq.client.exception.MQBrokerException;
|
|
|
+import org.apache.rocketmq.client.exception.MQClientException;
|
|
|
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
|
|
+import org.apache.rocketmq.client.producer.SendResult;
|
|
|
+import org.apache.rocketmq.common.message.Message;
|
|
|
+import org.apache.rocketmq.common.message.MessageExt;
|
|
|
+import org.apache.rocketmq.remoting.common.RemotingHelper;
|
|
|
+import org.apache.rocketmq.remoting.exception.RemotingException;
|
|
|
import org.springframework.jms.core.JmsTemplate;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
@@ -34,43 +36,25 @@ import java.util.List;
|
|
|
*/
|
|
|
@Component
|
|
|
public class RocketMQClient {
|
|
|
- /**
|
|
|
- * 消费者是非线程安全的所以需要一个对象池
|
|
|
- */
|
|
|
- private static final KeyedObjectPool<String, JmsTemplate> JMS_TEMPLATE_POOL =
|
|
|
- new GenericKeyedObjectPool<>(new BaseKeyedPooledObjectFactory<>() {
|
|
|
- @Override
|
|
|
- public JmsTemplate create(String key) throws JsonProcessingException {
|
|
|
- RocketConnection rocketConnection = SystemUtil.OBJECT_MAPPER.readValue(key, RocketConnection.class);
|
|
|
- return createJmsTemplate(new ActiveMQJMSConnectionFactory(rocketConnection.getUsername(),
|
|
|
- rocketConnection.getPassword(),
|
|
|
- rocketConnection.getHost()));
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public PooledObject<JmsTemplate> wrap(JmsTemplate value) {
|
|
|
- return new DefaultPooledObject<>(value);
|
|
|
- }
|
|
|
- });
|
|
|
|
|
|
@Resource
|
|
|
private DataFormatUtil dataFormatUtil;
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
- * 创建JmsTemplate实例。 这个方法用于初始化一个新的JmsTemplate对象,并将其与给定的ConnectionFactory对象关联。
|
|
|
- *
|
|
|
- * @param factory ConnectionFactory对象,用于创建与消息服务的连接。
|
|
|
- *
|
|
|
- * @return 返回创建的JmsTemplate实例,这个实例可以用于发送消息。
|
|
|
+ * 读取配置文件中设置的rocketmq相关属性,创建消息生产者
|
|
|
*/
|
|
|
- private static JmsTemplate createJmsTemplate(ConnectionFactory factory) {
|
|
|
- JmsTemplate template = new JmsTemplate();
|
|
|
- // 设置ConnectionFactory到JmsTemplate
|
|
|
- template.setConnectionFactory(factory);
|
|
|
- // 返回配置好的JmsTemplate实例
|
|
|
- return template;
|
|
|
+ private DefaultMQProducer getRocketMqProducer(RocketMqConfig rocketMqConfig){
|
|
|
+ // 1 创建消息生产者,指定生成组名
|
|
|
+ DefaultMQProducer defaultMQProducer = new DefaultMQProducer(rocketMqConfig.getGroupId());
|
|
|
+ // 2 指定NameServer的地址
|
|
|
+ defaultMQProducer.setNamesrvAddr(rocketMqConfig.getAddr());
|
|
|
+ // 3 设置消息超时时间
|
|
|
+ defaultMQProducer.setSendMsgTimeout(Integer.parseInt("10000"));
|
|
|
+ // 4 同步发送消息,如果SendMsgTimeout时间内没有发送成功,则重试retryWhenSendFailed次
|
|
|
+ defaultMQProducer.setRetryTimesWhenSendFailed(Integer.parseInt("5"));
|
|
|
+ return defaultMQProducer;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* 向指定队列发送消息。
|
|
|
* 此方法封装了向指定RocketMQ队列发送消息的逻辑,支持发送多种格式的数据。
|
|
@@ -84,57 +68,39 @@ public class RocketMQClient {
|
|
|
* @param queueName 需要发送消息的队列名称
|
|
|
* @param data 要发送的消息内容,支持发送Object数组或实现Iterable接口的对象
|
|
|
*/
|
|
|
- public void sendMessage(String host, String username, String password, String queueName, Object data)
|
|
|
+ public void sendMessage(RocketMqConfig rocketMqConfig,String topic,String tag, Object data)
|
|
|
throws UniException, JsonProcessingException {
|
|
|
- RocketConnection rocketConnection = new RocketConnection(host, username, password);
|
|
|
- String key = SystemUtil.OBJECT_MAPPER.writeValueAsString(rocketConnection);
|
|
|
- JmsTemplate template;
|
|
|
+ DefaultMQProducer rocketMqProducer = getRocketMqProducer(rocketMqConfig);
|
|
|
+
|
|
|
try {
|
|
|
- template = JMS_TEMPLATE_POOL.borrowObject(key);
|
|
|
+ rocketMqProducer.start();
|
|
|
+ Message message = new Message(topic, tag, data.toString().getBytes());
|
|
|
+
|
|
|
+ SendResult result = null;
|
|
|
+ result = rocketMqProducer.send(message);
|
|
|
+
|
|
|
} catch (Exception e) {
|
|
|
- throw new UniException(e);
|
|
|
- }
|
|
|
- try {
|
|
|
- // 判断数据类型,支持发送Object数组或实现Iterable接口的对象
|
|
|
- if (data instanceof Object[] datas) {
|
|
|
- // 如果是对象数组,则循环发送每一个对象
|
|
|
- for (Object it : datas) {
|
|
|
- execSend(template, it, queueName);
|
|
|
- }
|
|
|
- } else if (data instanceof Iterable<?> datas) {
|
|
|
- // 如果是实现了Iterable接口的对象,则同样循环发送每一个对象
|
|
|
- for (Object it : datas) {
|
|
|
- execSend(template, it, queueName);
|
|
|
- }
|
|
|
- } else {
|
|
|
- // 如果是单个对象或不支持的类型,则直接发送
|
|
|
- execSend(template, data, queueName);
|
|
|
- }
|
|
|
+ throw new RuntimeException(e);
|
|
|
} finally {
|
|
|
- if (template != null) {
|
|
|
- try {
|
|
|
- JMS_TEMPLATE_POOL.returnObject(key, template);
|
|
|
- } catch (Exception e) {
|
|
|
- SpringUtil.getBean(Log.class).error("JmsTemplate 归还失败", e);
|
|
|
- }
|
|
|
- }
|
|
|
+ rocketMqProducer.shutdown();
|
|
|
}
|
|
|
}
|
|
|
-
|
|
|
- /**
|
|
|
- * 执行发送消息操作。此方法通过JmsTemplate发送消息到指定的队列。
|
|
|
- *
|
|
|
- * @param template JmsTemplate实例,用于发送消息。这是一个Spring提供的用于发送JMS消息的模板类,简化了消息发送的操作。
|
|
|
- * @param data 要发送的数据。此参数将被转换为JSON字符串后发送。
|
|
|
- * @param queueName 消息发送的目的地队列名称。指定消息要发送到的队列。
|
|
|
- */
|
|
|
- private void execSend(JmsTemplate template, Object data, String queueName) {
|
|
|
- // 将数据转换为JSON字符串
|
|
|
- String message = dataFormatUtil.toString(data);
|
|
|
- // 使用JmsTemplate发送消息到指定队列
|
|
|
- template.send(queueName, session -> session.createTextMessage(message));
|
|
|
+
|
|
|
+ private DefaultMQPushConsumer getRocketMqConsumer(RocketMqConfig rocketMqConfig,String topic,String tag)
|
|
|
+ throws UniException {
|
|
|
+ // 1 创建消费者,指定所属的消费者组名
|
|
|
+ DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(rocketMqConfig.getGroupId() );
|
|
|
+ // 2 指定NameServer的地址
|
|
|
+ defaultMQPushConsumer.setNamesrvAddr(rocketMqConfig.getAddr());
|
|
|
+ // 3 指定消费者订阅的主题和标签
|
|
|
+ try {
|
|
|
+ defaultMQPushConsumer.subscribe(topic, tag);
|
|
|
+ } catch (MQClientException e) {
|
|
|
+ throw new UniException(e);
|
|
|
+ }
|
|
|
+ return defaultMQPushConsumer;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
/**
|
|
|
* 从指定队列接收消息。
|
|
|
*
|
|
@@ -148,111 +114,50 @@ public class RocketMQClient {
|
|
|
*
|
|
|
* @return 返回接收到的消息列表
|
|
|
*/
|
|
|
- public List<String> receptionMessage(String host, String username, String password, String queueName,
|
|
|
- Long receiveTimeout, Long pollSize, Long retry)
|
|
|
+ public List<String> receptionMessage(RocketMqConfig rocketMqConfig,String topic,String tag)
|
|
|
throws UniException, JsonProcessingException {
|
|
|
- RocketConnection rocketConnection = new RocketConnection(host, username, password);
|
|
|
- String key = SystemUtil.OBJECT_MAPPER.writeValueAsString(rocketConnection);
|
|
|
- JmsTemplate jmsTemplate;
|
|
|
- try {
|
|
|
- jmsTemplate = JMS_TEMPLATE_POOL.borrowObject(key);
|
|
|
- } catch (Exception e) {
|
|
|
- throw new UniException(e);
|
|
|
- }
|
|
|
- try {
|
|
|
- jmsTemplate.setReceiveTimeout(receiveTimeout);
|
|
|
- // 默认最大接收消息数量
|
|
|
- long maxSize = pollSize > 0 ? pollSize : 100;
|
|
|
- // 记录消息接收失败的重试次数
|
|
|
- int maxRetry = 0;
|
|
|
- // 用于存储接收到的消息
|
|
|
- List<String> result = new ArrayList<>();
|
|
|
- long lastMessageDateTime = System.currentTimeMillis();
|
|
|
- // 循环接收消息,直到满足退出条件
|
|
|
- while (true) {
|
|
|
- try {
|
|
|
- // 尝试接收消息
|
|
|
- Message message = jmsTemplate.receive(queueName);
|
|
|
- boolean isBreak = getMessage(message, result, maxSize);
|
|
|
- if (! isBreak && message == null &&
|
|
|
- System.currentTimeMillis() - lastMessageDateTime > receiveTimeout) {
|
|
|
- isBreak = true;
|
|
|
- }
|
|
|
- if (isBreak) {
|
|
|
- break;
|
|
|
- }
|
|
|
- } catch (Exception e) { // 捕获接收消息过程中可能抛出的异常
|
|
|
- maxRetry++;
|
|
|
- boolean isReturn = reTry(retry, maxRetry);
|
|
|
- if (isReturn) {
|
|
|
- return result;
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- // 返回接收到的消息列表
|
|
|
- return result;
|
|
|
- } finally {
|
|
|
+ DefaultMQPushConsumer rocketMqConsumer = getRocketMqConsumer(rocketMqConfig, topic,tag);
|
|
|
+
|
|
|
+ rocketMqConsumer.registerMessageListener((List<MessageExt> list, ConsumeConcurrentlyContext context) -> {
|
|
|
+
|
|
|
try {
|
|
|
- JMS_TEMPLATE_POOL.returnObject(key, jmsTemplate);
|
|
|
+ System.out.println("收到消息--》" + list);
|
|
|
+ for (MessageExt messageExt : list) {
|
|
|
+ String message=new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
|
|
|
+ JSONObject object= JSONObject.parseObject(message);
|
|
|
+ String fileId = (String) object.get("fileId");
|
|
|
+ String fileCreateDate = (String) object.get("fileCreateDate");
|
|
|
+ }
|
|
|
+
|
|
|
} catch (Exception e) {
|
|
|
- SpringUtil.getBean(Log.class).error("JmsTemplate 归还失败", e);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 尝试从给定的消息对象中获取消息文本,并将其添加到结果列表中。 当达到最大消息数量或未找到消息时,停止获取。
|
|
|
- *
|
|
|
- * @param message 待处理的消息对象,如果为null表示没有消息可处理。
|
|
|
- * @param result 用于收集消息文本的结果列表。
|
|
|
- * @param maxSize 最大消息数量,当接收的消息数量达到此值时,将停止接收。
|
|
|
- *
|
|
|
- * @return 如果找到并处理了消息,返回true;如果未找到消息或已处理达到最大数量,返回false。
|
|
|
- *
|
|
|
- * @throws JMSException 如果处理消息时发生错误。
|
|
|
- */
|
|
|
- private static boolean getMessage(Message message, List<String> result, long maxSize) throws JMSException {
|
|
|
- // 标记是否找到(接收到)消息
|
|
|
- boolean found = message == null;
|
|
|
- if (found) {
|
|
|
- // 如果没有消息,则退出循环
|
|
|
- return false;
|
|
|
- }
|
|
|
- // 处理文本消息
|
|
|
- if (message instanceof TextMessage msg) {
|
|
|
- // 获取消息文本
|
|
|
- String text = msg.getText();
|
|
|
- // 将消息添加到结果列表
|
|
|
- result.add(text);
|
|
|
- // 如果接收的消息数量达到最大值,退出循环
|
|
|
- if (result.size() >= maxSize) {
|
|
|
- found = true;
|
|
|
+ throw new UniException(e);
|
|
|
}
|
|
|
+
|
|
|
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|
|
+ });
|
|
|
+
|
|
|
+ // 5 启动消费者
|
|
|
+ try {
|
|
|
+ rocketMqConsumer.start();
|
|
|
+ } catch (MQClientException e) {
|
|
|
+ throw new UniException(e);
|
|
|
}
|
|
|
- return found;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 尝试重新接收消息的函数。 对于给定的主机、用户名、密码和队列名称,如果接收消息失败,将根据设定的最大重试次数进行重试。
|
|
|
- *
|
|
|
- * @param retry 当前的重试次数。
|
|
|
- * @param maxRetry 允许的最大重试次数。
|
|
|
- *
|
|
|
- * @return 如果重试次数超过最大重试次数,返回已接收的消息列表;否则返回空列表。
|
|
|
- */
|
|
|
- private boolean reTry(Long retry, int maxRetry) {
|
|
|
- return maxRetry > retry;
|
|
|
+
|
|
|
+ return null;
|
|
|
}
|
|
|
-
|
|
|
+
|
|
|
+
|
|
|
@Data
|
|
|
@AllArgsConstructor
|
|
|
@NoArgsConstructor
|
|
|
- private static class RocketConnection {
|
|
|
- private String host;
|
|
|
-
|
|
|
- private String username;
|
|
|
-
|
|
|
- private String password;
|
|
|
+ static class RocketMqConfig {
|
|
|
+ private String addr;
|
|
|
+
|
|
|
+ private String accessKey;
|
|
|
+
|
|
|
+ private String secretKey;
|
|
|
+
|
|
|
+ private String groupId;
|
|
|
}
|
|
|
}
|
|
|
|