项目中用到了ActiveMQ,本来是用spring方式写的,现在转换成jFinal给大家参考
只是一个粗略的插件,没有更严谨,大家可以自己改改
这个插件可以实现多链接(内部也有连接池)、多消息类型的发送
首先我是maven项目,加入依赖
<dependency> <groupId>com.jfinal</groupId> <artifactId>jfinal</artifactId> <version>2.2</version> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> <version>5.14.0</version> </dependency>
定义Sender类
package jfinal.plugin.activemq; import org.apache.activemq.pool.PooledConnection; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.Topic; /** * 消息发送者 * * @author 孙宇 */ public class JmsSender { private String name; private Session session; private MessageProducer producer; public JmsSender(String name, PooledConnection connection, Destination type, String subject) throws JMSException { this.name = name; // 事务性会话,自动确认消息 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 消息的目的地(Queue/Topic) if (type.equals(Destination.Topic)) { Topic destination = session.createTopic(subject); producer = session.createProducer(destination); } else { Queue destination = session.createQueue(subject); producer = session.createProducer(destination); } // 不持久化消息 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); } public String getName() { return name; } public Session getSession() { return session; } public void sendMessage(Message message) throws JMSException { producer.send(message); } }
定义Receiver类
package jfinal.plugin.activemq; import org.apache.activemq.pool.PooledConnection; import javax.jms.BytesMessage; import javax.jms.JMSException; import javax.jms.MapMessage; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.ObjectMessage; import javax.jms.Queue; import javax.jms.Session; import javax.jms.StreamMessage; import javax.jms.TextMessage; import javax.jms.Topic; import java.util.Enumeration; /** * 消息接收者 * * @author 孙宇 */ public class JmsReceiver implements MessageListener { private String name; private Session session; private MessageConsumer consumer; public JmsReceiver(String name, PooledConnection connection, Destination type, String subject) throws JMSException { this.name = name; // 事务性会话,自动确认消息 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 消息的目的地(Queue/Topic) if (type.equals(Destination.Topic)) { Topic destination = session.createTopic(subject); consumer = session.createConsumer(destination); } else { Queue destination = session.createQueue(subject); consumer = session.createConsumer(destination); } consumer.setMessageListener(this); } public String getName() { return name; } @Override public void onMessage(Message message) { try { if (message instanceof TextMessage) { TextMessage msg = (TextMessage) message; System.out.println(msg.getText()); } else if (message instanceof MapMessage) { MapMessage msg = (MapMessage) message; Enumeration enumer = msg.getMapNames(); while (enumer.hasMoreElements()) { Object obj = enumer.nextElement(); System.out.println(msg.getObject(obj.toString())); } } else if (message instanceof StreamMessage) { StreamMessage msg = (StreamMessage) message; System.out.println(msg.readString()); System.out.println(msg.readBoolean()); System.out.println(msg.readLong()); } else if (message instanceof ObjectMessage) { ObjectMessage msg = (ObjectMessage) message; System.out.println(msg); } else if (message instanceof BytesMessage) { BytesMessage msg = (BytesMessage) message; byte[] byteContent = new byte[1024]; int length = -1; StringBuffer content = new StringBuffer(); while ((length = msg.readBytes(byteContent)) != -1) { content.append(new String(byteContent, 0, length)); } System.out.println(content.toString()); } else { System.out.println(message); } } catch (Exception e) { e.printStackTrace(); } } }
定义类型
package jfinal.plugin.activemq; /** * 目标类型 * * @author 孙宇 */ public enum Destination { Queue, Topic }
activeMq插件
package jfinal.plugin.activemq; import com.jfinal.plugin.IPlugin; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.pool.PooledConnection; import org.apache.activemq.pool.PooledConnectionFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.jms.JMSException; /** * @author 孙宇 */ public class ActiveMQPlugin implements IPlugin { private static final Logger logger = LoggerFactory.getLogger(Thread.currentThread().getClass()); private String url; private String name; public ActiveMQPlugin(String url, String name) { this.url = url; this.name = name; } public ActiveMQPlugin(String url) { this.url = url; this.name = ActiveMQ.defaultName; } @Override public boolean start() { logger.info("初始化activeMQ配置"); ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(); activeMQConnectionFactory.setUserName(ActiveMQConnection.DEFAULT_USER); activeMQConnectionFactory.setPassword(ActiveMQConnection.DEFAULT_PASSWORD); activeMQConnectionFactory.setBrokerURL(url); activeMQConnectionFactory.setDispatchAsync(true);//异步发送消息 PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory); pooledConnectionFactory.setMaximumActiveSessionPerConnection(200); pooledConnectionFactory.setIdleTimeout(120); pooledConnectionFactory.setMaxConnections(5); pooledConnectionFactory.setBlockIfSessionPoolIsFull(true); try { PooledConnection connection = (PooledConnection) pooledConnectionFactory.createConnection(); connection.start(); ActiveMQ.pooledConnectionMap.put(name, connection); } catch (JMSException e) { e.printStackTrace(); } return true; } @Override public boolean stop() { return true; } }
activeMQ工具
package jfinal.plugin.activemq; import org.apache.activemq.pool.PooledConnection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.ConcurrentHashMap; /** * @author 孙宇 */ public class ActiveMQ { public static final ConcurrentHashMap<String, PooledConnection> pooledConnectionMap = new ConcurrentHashMap<>(); public static final ConcurrentHashMap<String, JmsSender> senderMap = new ConcurrentHashMap<>(); public static final ConcurrentHashMap<String, JmsReceiver> receiverMap = new ConcurrentHashMap<>(); public static final String defaultName = "main"; private static final Logger logger = LoggerFactory.getLogger(Thread.currentThread().getClass()); public static void addSender(JmsSender sender) { senderMap.put(sender.getName(), sender); } public static JmsSender getSender(String name) { return senderMap.get(name); } public static void addReceiver(JmsReceiver receiver) { receiverMap.put(receiver.getName(), receiver); } public static JmsReceiver getReceiver(String name) { return receiverMap.get(name); } public static void addConnection(String connectionName, PooledConnection connection) { pooledConnectionMap.put(connectionName, connection); } public static PooledConnection getConnection() { return pooledConnectionMap.get(defaultName); } public static PooledConnection getConnection(String connectionName) { return pooledConnectionMap.get(connectionName); } }
使用的时候,
ActiveMQPlugin p = new ActiveMQPlugin("failover://(tcp://127.0.0.1:61616)?initialReconnectDelay=1000"); p.start();
当然,这样只是创建了链接池,但是没有生产者和消费者
还要添加这两个对象
String subject = "test"; ActiveMQ.addSender(new JmsSender("testSender1", ActiveMQ.getConnection(), Destination.Queue, subject));//定义发送者 ActiveMQ.addReceiver(new JmsReceiver("testReceiver1", ActiveMQ.getConnection(), Destination.Queue, subject));//定义接受者
发送一个消息
JmsSender sq1 = ActiveMQ.getSender("testSender1"); TextMessage msg = sq1.getSession().createTextMessage("测试" + new Date()); sq1.sendMessage(msg);
我里面定义了很多消息类型,不一定非要文本型
一个测试类
package test.jfinal.plugin.activemq; import jfinal.plugin.activemq.ActiveMQ; import jfinal.plugin.activemq.ActiveMQPlugin; import jfinal.plugin.activemq.Destination; import jfinal.plugin.activemq.JmsReceiver; import jfinal.plugin.activemq.JmsSender; import javax.jms.JMSException; import javax.jms.TextMessage; import java.util.Date; /** * @author 孙宇 */ public class TestActiveMQPlugin { public static void main(String[] args) throws JMSException { ActiveMQPlugin p = new ActiveMQPlugin("failover://(tcp://127.0.0.1:61616)?initialReconnectDelay=1000"); p.start(); String subject = "test"; ActiveMQ.addSender(new JmsSender("testSender1", ActiveMQ.getConnection(), Destination.Queue, subject));//定义发送者 ActiveMQ.addReceiver(new JmsReceiver("testReceiver1", ActiveMQ.getConnection(), Destination.Queue, subject));//定义接受者 for (int i = 0; i < 10; i++) { new Runnable() { @Override public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } try { JmsSender sq1 = ActiveMQ.getSender("testSender1"); TextMessage msg = sq1.getSession().createTextMessage("测试" + new Date()); sq1.sendMessage(msg); } catch (JMSException e) { e.printStackTrace(); } } }.run(); } } }
有兴趣你也可以下载源码跑一下,执行TestActiveMQPlugin类就行
https://git.oschina.net/sypro/jfinalplugin.git
想看spring版的,看这个源码
https://git.oschina.net/sypro/demo.git