|
@@ -3,14 +3,20 @@ package com.scbfkj.uni.process;
|
|
|
import cn.hutool.extra.spring.SpringUtil;
|
|
|
import com.alibaba.fastjson.JSONObject;
|
|
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
|
|
+import com.scbfkj.uni.dtos.responses.UniResult;
|
|
|
import com.scbfkj.uni.exceptions.UniException;
|
|
|
import com.scbfkj.uni.library.DataFormatUtil;
|
|
|
import com.scbfkj.uni.logs.Log;
|
|
|
import com.scbfkj.uni.utils.SystemUtil;
|
|
|
+import io.micrometer.common.util.StringUtils;
|
|
|
import jakarta.annotation.Resource;
|
|
|
import lombok.AllArgsConstructor;
|
|
|
import lombok.Data;
|
|
|
import lombok.NoArgsConstructor;
|
|
|
+import org.apache.kafka.clients.producer.KafkaProducer;
|
|
|
+import org.apache.kafka.clients.producer.Producer;
|
|
|
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
|
|
|
+import org.apache.rocketmq.acl.common.SessionCredentials;
|
|
|
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
|
|
|
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
|
|
|
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
|
@@ -18,15 +24,16 @@ import org.apache.rocketmq.client.exception.MQBrokerException;
|
|
|
import org.apache.rocketmq.client.exception.MQClientException;
|
|
|
import org.apache.rocketmq.client.producer.DefaultMQProducer;
|
|
|
import org.apache.rocketmq.client.producer.SendResult;
|
|
|
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
|
|
|
import org.apache.rocketmq.common.message.Message;
|
|
|
import org.apache.rocketmq.common.message.MessageExt;
|
|
|
+import org.apache.rocketmq.remoting.RPCHook;
|
|
|
import org.apache.rocketmq.remoting.common.RemotingHelper;
|
|
|
import org.apache.rocketmq.remoting.exception.RemotingException;
|
|
|
import org.springframework.jms.core.JmsTemplate;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
-import java.util.ArrayList;
|
|
|
-import java.util.List;
|
|
|
+import java.util.*;
|
|
|
|
|
|
/**
|
|
|
* @author l7871
|
|
@@ -39,20 +46,43 @@ public class RocketMQClient {
|
|
|
|
|
|
@Resource
|
|
|
private DataFormatUtil dataFormatUtil;
|
|
|
+ /**
|
|
|
+ * 因为生产者是线程安全的所以相同的连接只需要一个生产者实例 不需要对象池
|
|
|
+ */
|
|
|
+ private final Map<String, DefaultMQProducer> producerMap = new HashMap<>();
|
|
|
|
|
|
/**
|
|
|
* 读取配置文件中设置的rocketmq相关属性,创建消息生产者
|
|
|
*/
|
|
|
private DefaultMQProducer getRocketMqProducer(RocketMqConfig rocketMqConfig){
|
|
|
- // 1 创建消息生产者,指定生成组名
|
|
|
- DefaultMQProducer defaultMQProducer = new DefaultMQProducer(rocketMqConfig.getGroupId());
|
|
|
- // 2 指定NameServer的地址
|
|
|
- defaultMQProducer.setNamesrvAddr(rocketMqConfig.getAddr());
|
|
|
- // 3 设置消息超时时间
|
|
|
- defaultMQProducer.setSendMsgTimeout(Integer.parseInt("10000"));
|
|
|
- // 4 同步发送消息,如果SendMsgTimeout时间内没有发送成功,则重试retryWhenSendFailed次
|
|
|
- defaultMQProducer.setRetryTimesWhenSendFailed(Integer.parseInt("5"));
|
|
|
- return defaultMQProducer;
|
|
|
+ synchronized (producerMap) {
|
|
|
+ if (producerMap.containsKey(rocketMqConfig.getAddr() + rocketMqConfig.getGroupId())) {
|
|
|
+ return producerMap.get(rocketMqConfig.getAddr() + rocketMqConfig.getGroupId());
|
|
|
+ }
|
|
|
+ AclClientRPCHook hook = new AclClientRPCHook(new SessionCredentials(rocketMqConfig.getAccessKey(), rocketMqConfig.getSecretKey()));
|
|
|
+ // 1 创建消息生产者,指定生成组名
|
|
|
+ DefaultMQProducer defaultMQProducer = new DefaultMQProducer(rocketMqConfig.getGroupId(),hook);
|
|
|
+ // 2 指定NameServer的地址
|
|
|
+ defaultMQProducer.setNamesrvAddr(rocketMqConfig.getAddr());
|
|
|
+ // 设置ACL参数
|
|
|
+ defaultMQProducer.setVipChannelEnabled(false);
|
|
|
+ // 3 设置消息超时时间
|
|
|
+ defaultMQProducer.setSendMsgTimeout(Integer.parseInt("10000"));
|
|
|
+ // 4 同步发送消息,如果SendMsgTimeout时间内没有发送成功,则重试retryWhenSendFailed次
|
|
|
+ defaultMQProducer.setRetryTimesWhenSendFailed(Integer.parseInt("5"));
|
|
|
+
|
|
|
+ // 将新的生产者实例与连接字符串关联并保存到映射中。
|
|
|
+ producerMap.put(rocketMqConfig.getAddr() + rocketMqConfig.getGroupId(), defaultMQProducer);
|
|
|
+
|
|
|
+ try {
|
|
|
+ defaultMQProducer.start();
|
|
|
+ } catch (MQClientException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 返回新的生产者实例。
|
|
|
+ return defaultMQProducer;
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|
|
@@ -68,21 +98,40 @@ public class RocketMQClient {
|
|
|
* @param queueName 需要发送消息的队列名称
|
|
|
* @param data 要发送的消息内容,支持发送Object数组或实现Iterable接口的对象
|
|
|
*/
|
|
|
- public void sendMessage(RocketMqConfig rocketMqConfig,String topic,String tag, Object data)
|
|
|
+ public UniResult<String> sendMessage(RocketMqConfig rocketMqConfig,String topic,String tag, Object data,Object sign)
|
|
|
throws UniException, JsonProcessingException {
|
|
|
DefaultMQProducer rocketMqProducer = getRocketMqProducer(rocketMqConfig);
|
|
|
-
|
|
|
+ SendResult sendResult = new SendResult();
|
|
|
try {
|
|
|
- rocketMqProducer.start();
|
|
|
Message message = new Message(topic, tag, data.toString().getBytes());
|
|
|
+ if(!StringUtils.isEmpty(rocketMqConfig.getFirstMsgType())){
|
|
|
+ message.putUserProperty("firstMsgType",rocketMqConfig.getFirstMsgType());
|
|
|
+ }
|
|
|
+ if(!StringUtils.isEmpty(rocketMqConfig.getEncryptedFlag())){
|
|
|
+ message.putUserProperty("encryptedFlag",rocketMqConfig.getEncryptedFlag());
|
|
|
+ }
|
|
|
+ if(!StringUtils.isEmpty(rocketMqConfig.getAppId())){
|
|
|
+ message.putUserProperty("appId",rocketMqConfig.getAppId());
|
|
|
+ }
|
|
|
+ if(sign != null){
|
|
|
+ message.putUserProperty("sign",sign.toString());
|
|
|
+ }
|
|
|
+ sendResult = rocketMqProducer.send(message, 10000);
|
|
|
|
|
|
- SendResult result = null;
|
|
|
- result = rocketMqProducer.send(message);
|
|
|
+ if (sendResult.getSendStatus() == null) {
|
|
|
+ rocketMqProducer.shutdown();
|
|
|
+ producerMap.remove(rocketMqConfig.getAddr() + rocketMqConfig.getGroupId());
|
|
|
+ return UniResult.fail(String.valueOf(sendResult),"-101");
|
|
|
+ }
|
|
|
|
|
|
} catch (Exception e) {
|
|
|
+ rocketMqProducer.shutdown();
|
|
|
+ producerMap.remove(rocketMqConfig.getAddr() + rocketMqConfig.getGroupId());
|
|
|
+
|
|
|
throw new RuntimeException(e);
|
|
|
} finally {
|
|
|
- rocketMqProducer.shutdown();
|
|
|
+ return UniResult.success(String.valueOf(sendResult));
|
|
|
+// rocketMqProducer.shutdown();
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -92,6 +141,7 @@ public class RocketMQClient {
|
|
|
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer(rocketMqConfig.getGroupId() );
|
|
|
// 2 指定NameServer的地址
|
|
|
defaultMQPushConsumer.setNamesrvAddr(rocketMqConfig.getAddr());
|
|
|
+ defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//订阅的主题和标签(*代表所有标签)
|
|
|
// 3 指定消费者订阅的主题和标签
|
|
|
try {
|
|
|
defaultMQPushConsumer.subscribe(topic, tag);
|
|
@@ -118,23 +168,24 @@ public class RocketMQClient {
|
|
|
throws UniException, JsonProcessingException {
|
|
|
DefaultMQPushConsumer rocketMqConsumer = getRocketMqConsumer(rocketMqConfig, topic,tag);
|
|
|
|
|
|
- rocketMqConsumer.registerMessageListener((List<MessageExt> list, ConsumeConcurrentlyContext context) -> {
|
|
|
-
|
|
|
- try {
|
|
|
- System.out.println("收到消息--》" + list);
|
|
|
- for (MessageExt messageExt : list) {
|
|
|
- String message=new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
|
|
|
- JSONObject object= JSONObject.parseObject(message);
|
|
|
- String fileId = (String) object.get("fileId");
|
|
|
- String fileCreateDate = (String) object.get("fileCreateDate");
|
|
|
+ rocketMqConsumer.registerMessageListener(
|
|
|
+ (List<MessageExt> list, ConsumeConcurrentlyContext context) -> {
|
|
|
+ List<String> messageList = new ArrayList<>();
|
|
|
+ try {
|
|
|
+ for (MessageExt messageExt : list) {
|
|
|
+ String message = new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET);
|
|
|
+ // JSONObject object= JSONObject.parseObject(message);
|
|
|
+
|
|
|
+ messageList.add(message);
|
|
|
+ }
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ throw new UniException(e);
|
|
|
+ }
|
|
|
+// return ListenerMQ(messageList);
|
|
|
+ return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|
|
}
|
|
|
-
|
|
|
- } catch (Exception e) {
|
|
|
- throw new UniException(e);
|
|
|
- }
|
|
|
-
|
|
|
- return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|
|
- });
|
|
|
+ );
|
|
|
|
|
|
// 5 启动消费者
|
|
|
try {
|
|
@@ -145,7 +196,10 @@ public class RocketMQClient {
|
|
|
|
|
|
return null;
|
|
|
}
|
|
|
-
|
|
|
+public UniResult ListenerMQ(List<String> messageList) throws UniException {
|
|
|
+ return UniResult.success(messageList);
|
|
|
+// return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
|
|
+}
|
|
|
|
|
|
@Data
|
|
|
@AllArgsConstructor
|
|
@@ -158,6 +212,12 @@ public class RocketMQClient {
|
|
|
private String secretKey;
|
|
|
|
|
|
private String groupId;
|
|
|
+
|
|
|
+ private String firstMsgType;
|
|
|
+
|
|
|
+ private String encryptedFlag;
|
|
|
+
|
|
|
+ private String appId;
|
|
|
}
|
|
|
}
|
|
|
|