Przeglądaj źródła

zeromq 方法修改

andy 1 rok temu
rodzic
commit
278c503eee
1 zmienionych plików z 24 dodań i 48 usunięć
  1. 24 48
      src/main/java/com/scbfkj/uni/process/ZeroMQ.java

+ 24 - 48
src/main/java/com/scbfkj/uni/process/ZeroMQ.java

@@ -1,82 +1,58 @@
 package com.scbfkj.uni.process;
 
-import org.apache.commons.pool2.BaseKeyedPooledObjectFactory;
-import org.apache.commons.pool2.KeyedObjectPool;
-import org.apache.commons.pool2.PooledObject;
+import org.apache.commons.pool2.*;
 import org.apache.commons.pool2.impl.DefaultPooledObject;
 import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
+import org.apache.commons.pool2.impl.GenericObjectPool;
 import org.zeromq.SocketType;
 import org.zeromq.ZContext;
 import org.zeromq.ZMQ;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
+import java.util.*;
 
 public class ZeroMQ {
 
-    private static final KeyedObjectPool<String, ZMQ.Socket> sockets = new GenericKeyedObjectPool<>(new BaseKeyedPooledObjectFactory<>() {
+    private static final ObjectPool<ZContext> contexts = new GenericObjectPool<>(new BasePooledObjectFactory<>() {
         @Override
-        public ZMQ.Socket create(String key) {
-            ZContext context = new ZContext();
-            ZMQ.Socket socket = context.createSocket(SocketType.REQ);
-            socket.connect(key);
-            return socket;
+        public ZContext create() {
+            return new ZContext();
         }
 
         @Override
-        public PooledObject<ZMQ.Socket> wrap(ZMQ.Socket value) {
-            return new DefaultPooledObject<>(value);
+        public PooledObject<ZContext> wrap(ZContext zContext) {
+            return new DefaultPooledObject<>(zContext);
         }
 
         @Override
-        public void destroyObject(String key, PooledObject<ZMQ.Socket> p) throws Exception {
-            ZMQ.Socket socket = p.getObject();
-            socket.close();
-            super.destroyObject(key, p);
+        public void destroyObject(PooledObject<ZContext> p) throws Exception {
+            p.getObject().close();
+            super.destroyObject(p);
         }
     });
 
 
     public void sendMessage(String url, List<String> messages) throws Exception {
 
-        ZMQ.Socket socket = null;
+//        ZMQ.Socket socket = getSocket(url);
+
+        ZContext context = null;
+        synchronized (contexts) {
+            context = contexts.borrowObject();
+        }
         try {
-            synchronized (sockets) {
-                socket = sockets.borrowObject(url);
-            }
             for (String message : messages) {
-                socket.send(message.getBytes(ZMQ.CHARSET), 0);
+                ZMQ.Socket socket = context.createSocket(SocketType.REQ);
+                socket.connect(url);
+                socket.send(message, 0);
             }
         } finally {
-            if (Objects.nonNull(socket)) {
-                sockets.returnObject(url, socket);
+            if (context != null) {
+                contexts.returnObject(context);
             }
         }
-    }
 
-    public List<String> receiveMessage(String url, Long pollSize) throws Exception {
-
-        ZMQ.Socket socket = null;
-        try {
-            synchronized (sockets) {
-                socket = sockets.borrowObject(url);
-            }
-            List<String> results = new ArrayList<>();
-            if (Objects.isNull(pollSize) || pollSize < 1) {
-                pollSize = 1L;
-            }
-            for (Long i = 0L; i < pollSize; i++) {
 
-                byte[] reply = socket.recv(0);
-                String value = new String(reply, ZMQ.CHARSET);
-                results.add(value);
-            }
-            return results;
-        } finally {
-            if (Objects.nonNull(socket)) {
-                sockets.returnObject(url, socket);
-            }
-        }
     }
+
+
 }