|
@@ -0,0 +1,82 @@
|
|
|
+package com.scbfkj.uni.process;
|
|
|
+
|
|
|
+import org.apache.commons.pool2.BaseKeyedPooledObjectFactory;
|
|
|
+import org.apache.commons.pool2.KeyedObjectPool;
|
|
|
+import org.apache.commons.pool2.PooledObject;
|
|
|
+import org.apache.commons.pool2.impl.DefaultPooledObject;
|
|
|
+import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
|
|
|
+import org.zeromq.SocketType;
|
|
|
+import org.zeromq.ZContext;
|
|
|
+import org.zeromq.ZMQ;
|
|
|
+
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Objects;
|
|
|
+
|
|
|
+public class ZeroMQ {
|
|
|
+
|
|
|
+ private static final KeyedObjectPool<String, ZMQ.Socket> sockets = new GenericKeyedObjectPool<>(new BaseKeyedPooledObjectFactory<>() {
|
|
|
+ @Override
|
|
|
+ public ZMQ.Socket create(String key) {
|
|
|
+ ZContext context = new ZContext();
|
|
|
+ ZMQ.Socket socket = context.createSocket(SocketType.REQ);
|
|
|
+ socket.connect(key);
|
|
|
+ return socket;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public PooledObject<ZMQ.Socket> wrap(ZMQ.Socket value) {
|
|
|
+ return new DefaultPooledObject<>(value);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void destroyObject(String key, PooledObject<ZMQ.Socket> p) throws Exception {
|
|
|
+ ZMQ.Socket socket = p.getObject();
|
|
|
+ socket.close();
|
|
|
+ super.destroyObject(key, p);
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+
|
|
|
+ public void sendMessage(String url, List<String> messages) throws Exception {
|
|
|
+
|
|
|
+ ZMQ.Socket socket = null;
|
|
|
+ try {
|
|
|
+ synchronized (sockets) {
|
|
|
+ socket = sockets.borrowObject(url);
|
|
|
+ }
|
|
|
+ for (String message : messages) {
|
|
|
+ socket.send(message.getBytes(ZMQ.CHARSET), 0);
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ if (Objects.nonNull(socket)) {
|
|
|
+ sockets.returnObject(url, socket);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public List<String> receiveMessage(String url, Long pollSize) throws Exception {
|
|
|
+
|
|
|
+ ZMQ.Socket socket = null;
|
|
|
+ try {
|
|
|
+ synchronized (sockets) {
|
|
|
+ socket = sockets.borrowObject(url);
|
|
|
+ }
|
|
|
+ List<String> results = new ArrayList<>();
|
|
|
+ if (Objects.isNull(pollSize) || pollSize < 1) {
|
|
|
+ pollSize = 1L;
|
|
|
+ }
|
|
|
+ for (Long i = 0L; i < pollSize; i++) {
|
|
|
+
|
|
|
+ byte[] reply = socket.recv(0);
|
|
|
+ String value = new String(reply, ZMQ.CHARSET);
|
|
|
+ results.add(value);
|
|
|
+ }
|
|
|
+ return results;
|
|
|
+ } finally {
|
|
|
+ if (Objects.nonNull(socket)) {
|
|
|
+ sockets.returnObject(url, socket);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|