|
@@ -0,0 +1,144 @@
|
|
|
+package org.bfkj.protocol;
|
|
|
+
|
|
|
+
|
|
|
+import org.apache.activemq.ActiveMQConnectionFactory;
|
|
|
+import org.bfkj.utils.MapTools;
|
|
|
+
|
|
|
+import javax.jms.*;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.Objects;
|
|
|
+
|
|
|
+public class MyActiveMQ {
|
|
|
+ private ConnectionFactory connectionFactory;
|
|
|
+ private Connection connection;
|
|
|
+ private Session session;
|
|
|
+ private Destination destination;
|
|
|
+ private MessageConsumer consumer;
|
|
|
+ private MessageProducer producer;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 采集kafka数据
|
|
|
+ *
|
|
|
+ * @return 读取内容信息
|
|
|
+ */
|
|
|
+ public Map<String, Object> readMethod(String sourceObjectName, String connectConfig, String pollNumber) {
|
|
|
+ if (Objects.isNull(sourceObjectName) || Objects.isNull(connectConfig)) {
|
|
|
+ return MapTools.processFail(Objects.isNull(sourceObjectName) ? "队列为空" : "连接字符串为空");
|
|
|
+ }
|
|
|
+
|
|
|
+ Map<String, Object> connectConfigMaps = (Map<String, Object>) MapTools.strToObj(connectConfig);
|
|
|
+
|
|
|
+ try {
|
|
|
+ if (startConnection(connectConfigMaps, sourceObjectName)) {
|
|
|
+
|
|
|
+ int max = pollNumber == null ? 1 : Integer.parseInt(pollNumber);
|
|
|
+ if (max < 0) {
|
|
|
+ max = Math.abs(max);
|
|
|
+ }
|
|
|
+ if (consumer == null) {
|
|
|
+ consumer = session.createConsumer(destination);
|
|
|
+ }
|
|
|
+ ArrayList<String> result = new ArrayList<>();
|
|
|
+
|
|
|
+ for (int i = 0; i < max; i++) {
|
|
|
+ TextMessage receive = (TextMessage) consumer.receive(100);
|
|
|
+ if (Objects.nonNull(receive)) {
|
|
|
+ result.add(receive.getText());
|
|
|
+ session.commit();
|
|
|
+ } else {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return MapTools.processSuccess(result);
|
|
|
+ } else {
|
|
|
+
|
|
|
+ return MapTools.processFail("目标服务器连接失败");
|
|
|
+ }
|
|
|
+ } catch (JMSException e) {
|
|
|
+ return MapTools.processFail(e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean startConnection(Map<String, Object> connectConfig, String sourceObjectName) throws JMSException {
|
|
|
+ if (null == connectionFactory) {
|
|
|
+ String host = connectConfig.get("host").toString();
|
|
|
+ Object username = connectConfig.get("username");
|
|
|
+ Object password = connectConfig.get("password");
|
|
|
+ connectionFactory = new ActiveMQConnectionFactory(username == null ? null : username.toString(), password == null ? null : password.toString(), host);
|
|
|
+ connection = connectionFactory.createConnection();
|
|
|
+ connection.start();
|
|
|
+ session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
|
|
|
+ destination = session.createQueue(sourceObjectName);
|
|
|
+ return true;
|
|
|
+ } else {
|
|
|
+
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 数据发送到kafka
|
|
|
+ *
|
|
|
+ * @return 发送结果信息
|
|
|
+ */
|
|
|
+ public Map<String, Object> sendMethod(String sourceObjectName, List<Object> data, String connectConfig) {
|
|
|
+ if (Objects.isNull(sourceObjectName) || Objects.isNull(data)) {
|
|
|
+ return MapTools.processFail(Objects.isNull(sourceObjectName) ? ("队列为空 " + data) : ("数据内容为空 " + data));
|
|
|
+ }
|
|
|
+ Map<String, Object> connectConfigMaps = (Map<String, Object>) MapTools.strToObj(connectConfig);
|
|
|
+ try {
|
|
|
+ if (startConnection(connectConfigMaps, sourceObjectName)) {
|
|
|
+
|
|
|
+ if (producer == null) {
|
|
|
+ producer = session.createProducer(destination);
|
|
|
+ }
|
|
|
+ for (Object datum : data) {
|
|
|
+ //创建一个文本消息
|
|
|
+ if (Objects.nonNull(datum)) {
|
|
|
+ TextMessage textMessage = session.createTextMessage(datum.toString());
|
|
|
+ producer.send(textMessage);//生产者发送消息
|
|
|
+ }
|
|
|
+ }
|
|
|
+ session.commit();//会话提交
|
|
|
+ return MapTools.processSuccess(null);
|
|
|
+ } else {
|
|
|
+ return MapTools.processFail("目标服务器连接失败");
|
|
|
+ }
|
|
|
+ } catch (JMSException e) {
|
|
|
+ return MapTools.processFail(e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 关闭kafka信息
|
|
|
+ *
|
|
|
+ * @throws Exception
|
|
|
+ */
|
|
|
+ public void close() {
|
|
|
+
|
|
|
+ try {
|
|
|
+ if (Objects.nonNull(consumer)) {
|
|
|
+ consumer.close();
|
|
|
+ consumer = null;
|
|
|
+ }
|
|
|
+ if (Objects.nonNull(producer)) {
|
|
|
+ producer.close();
|
|
|
+ producer = null;
|
|
|
+ }
|
|
|
+ if (Objects.nonNull(session)) {
|
|
|
+ session.close();
|
|
|
+ session = null;
|
|
|
+ }
|
|
|
+ if (Objects.nonNull(connectionFactory)) {
|
|
|
+
|
|
|
+ connectionFactory = null;
|
|
|
+ }
|
|
|
+ } catch (JMSException e) {
|
|
|
+ throw new RuntimeException(e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|