项目中用到了ActiveMQ,本来是用spring方式写的,现在转换成jFinal给大家参考
只是一个粗略的插件,没有更严谨,大家可以自己改改
这个插件可以实现多链接(内部也有连接池)、多消息类型的发送
首先我是maven项目,加入依赖
- <dependency>
- <groupId>com.jfinal</groupId>
- <artifactId>jfinal</artifactId>
- <version>2.2</version>
- </dependency>
- <dependency>
- <groupId>org.apache.activemq</groupId>
- <artifactId>activemq-pool</artifactId>
- <version>5.14.0</version>
- </dependency>
定义Sender类
- package jfinal.plugin.activemq;
- import org.apache.activemq.pool.PooledConnection;
- import javax.jms.DeliveryMode;
- import javax.jms.JMSException;
- import javax.jms.Message;
- import javax.jms.MessageProducer;
- import javax.jms.Queue;
- import javax.jms.Session;
- import javax.jms.Topic;
- /**
- * 消息发送者
- *
- * @author 孙宇
- */
- public class JmsSender {
- private String name;
- private Session session;
- private MessageProducer producer;
- public JmsSender(String name,
- PooledConnection connection,
- Destination type,
- String subject) throws JMSException {
- this.name = name;
- // 事务性会话,自动确认消息
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- // 消息的目的地(Queue/Topic)
- if (type.equals(Destination.Topic)) {
- Topic destination = session.createTopic(subject);
- producer = session.createProducer(destination);
- } else {
- Queue destination = session.createQueue(subject);
- producer = session.createProducer(destination);
- }
- // 不持久化消息
- producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- }
- public String getName() {
- return name;
- }
- public Session getSession() {
- return session;
- }
- public void sendMessage(Message message) throws JMSException {
- producer.send(message);
- }
- }
定义Receiver类
- package jfinal.plugin.activemq;
- import org.apache.activemq.pool.PooledConnection;
- import javax.jms.BytesMessage;
- import javax.jms.JMSException;
- import javax.jms.MapMessage;
- import javax.jms.Message;
- import javax.jms.MessageConsumer;
- import javax.jms.MessageListener;
- import javax.jms.ObjectMessage;
- import javax.jms.Queue;
- import javax.jms.Session;
- import javax.jms.StreamMessage;
- import javax.jms.TextMessage;
- import javax.jms.Topic;
- import java.util.Enumeration;
- /**
- * 消息接收者
- *
- * @author 孙宇
- */
- public class JmsReceiver implements MessageListener {
- private String name;
- private Session session;
- private MessageConsumer consumer;
- public JmsReceiver(String name,
- PooledConnection connection,
- Destination type,
- String subject) throws JMSException {
- this.name = name;
- // 事务性会话,自动确认消息
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- // 消息的目的地(Queue/Topic)
- if (type.equals(Destination.Topic)) {
- Topic destination = session.createTopic(subject);
- consumer = session.createConsumer(destination);
- } else {
- Queue destination = session.createQueue(subject);
- consumer = session.createConsumer(destination);
- }
- consumer.setMessageListener(this);
- }
- public String getName() {
- return name;
- }
- @Override
- public void onMessage(Message message) {
- try {
- if (message instanceof TextMessage) {
- TextMessage msg = (TextMessage) message;
- System.out.println(msg.getText());
- } else if (message instanceof MapMessage) {
- MapMessage msg = (MapMessage) message;
- Enumeration enumer = msg.getMapNames();
- while (enumer.hasMoreElements()) {
- Object obj = enumer.nextElement();
- System.out.println(msg.getObject(obj.toString()));
- }
- } else if (message instanceof StreamMessage) {
- StreamMessage msg = (StreamMessage) message;
- System.out.println(msg.readString());
- System.out.println(msg.readBoolean());
- System.out.println(msg.readLong());
- } else if (message instanceof ObjectMessage) {
- ObjectMessage msg = (ObjectMessage) message;
- System.out.println(msg);
- } else if (message instanceof BytesMessage) {
- BytesMessage msg = (BytesMessage) message;
- byte[] byteContent = new byte[1024];
- int length = -1;
- StringBuffer content = new StringBuffer();
- while ((length = msg.readBytes(byteContent)) != -1) {
- content.append(new String(byteContent, 0, length));
- }
- System.out.println(content.toString());
- } else {
- System.out.println(message);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
定义类型
- package jfinal.plugin.activemq;
- /**
- * 目标类型
- *
- * @author 孙宇
- */
- public enum Destination {
- Queue, Topic
- }
activeMq插件
- package jfinal.plugin.activemq;
- import com.jfinal.plugin.IPlugin;
- import org.apache.activemq.ActiveMQConnection;
- import org.apache.activemq.ActiveMQConnectionFactory;
- import org.apache.activemq.pool.PooledConnection;
- import org.apache.activemq.pool.PooledConnectionFactory;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import javax.jms.JMSException;
- /**
- * @author 孙宇
- */
- public class ActiveMQPlugin implements IPlugin {
- private static final Logger logger = LoggerFactory.getLogger(Thread.currentThread().getClass());
- private String url;
- private String name;
- public ActiveMQPlugin(String url,
- String name) {
- this.url = url;
- this.name = name;
- }
- public ActiveMQPlugin(String url) {
- this.url = url;
- this.name = ActiveMQ.defaultName;
- }
- @Override
- public boolean start() {
- logger.info("初始化activeMQ配置");
- ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
- activeMQConnectionFactory.setUserName(ActiveMQConnection.DEFAULT_USER);
- activeMQConnectionFactory.setPassword(ActiveMQConnection.DEFAULT_PASSWORD);
- activeMQConnectionFactory.setBrokerURL(url);
- activeMQConnectionFactory.setDispatchAsync(true);//异步发送消息
- PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);
- pooledConnectionFactory.setMaximumActiveSessionPerConnection(200);
- pooledConnectionFactory.setIdleTimeout(120);
- pooledConnectionFactory.setMaxConnections(5);
- pooledConnectionFactory.setBlockIfSessionPoolIsFull(true);
- try {
- PooledConnection connection = (PooledConnection) pooledConnectionFactory.createConnection();
- connection.start();
- ActiveMQ.pooledConnectionMap.put(name, connection);
- } catch (JMSException e) {
- e.printStackTrace();
- }
- return true;
- }
- @Override
- public boolean stop() {
- return true;
- }
- }
activeMQ工具
- package jfinal.plugin.activemq;
- import org.apache.activemq.pool.PooledConnection;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import java.util.concurrent.ConcurrentHashMap;
- /**
- * @author 孙宇
- */
- public class ActiveMQ {
- public static final ConcurrentHashMap<String, PooledConnection> pooledConnectionMap = new ConcurrentHashMap<>();
- public static final ConcurrentHashMap<String, JmsSender> senderMap = new ConcurrentHashMap<>();
- public static final ConcurrentHashMap<String, JmsReceiver> receiverMap = new ConcurrentHashMap<>();
- public static final String defaultName = "main";
- private static final Logger logger = LoggerFactory.getLogger(Thread.currentThread().getClass());
- public static void addSender(JmsSender sender) {
- senderMap.put(sender.getName(), sender);
- }
- public static JmsSender getSender(String name) {
- return senderMap.get(name);
- }
- public static void addReceiver(JmsReceiver receiver) {
- receiverMap.put(receiver.getName(), receiver);
- }
- public static JmsReceiver getReceiver(String name) {
- return receiverMap.get(name);
- }
- public static void addConnection(String connectionName,
- PooledConnection connection) {
- pooledConnectionMap.put(connectionName, connection);
- }
- public static PooledConnection getConnection() {
- return pooledConnectionMap.get(defaultName);
- }
- public static PooledConnection getConnection(String connectionName) {
- return pooledConnectionMap.get(connectionName);
- }
- }
使用的时候,
- ActiveMQPlugin p = new ActiveMQPlugin("failover://(tcp://127.0.0.1:61616)?initialReconnectDelay=1000");
- p.start();
当然,这样只是创建了链接池,但是没有生产者和消费者
还要添加这两个对象
- String subject = "test";
- ActiveMQ.addSender(new JmsSender("testSender1", ActiveMQ.getConnection(), Destination.Queue, subject));//定义发送者
- ActiveMQ.addReceiver(new JmsReceiver("testReceiver1", ActiveMQ.getConnection(), Destination.Queue, subject));//定义接受者
发送一个消息
- JmsSender sq1 = ActiveMQ.getSender("testSender1");
- TextMessage msg = sq1.getSession().createTextMessage("测试" + new Date());
- sq1.sendMessage(msg);
我里面定义了很多消息类型,不一定非要文本型
一个测试类
- package test.jfinal.plugin.activemq;
- import jfinal.plugin.activemq.ActiveMQ;
- import jfinal.plugin.activemq.ActiveMQPlugin;
- import jfinal.plugin.activemq.Destination;
- import jfinal.plugin.activemq.JmsReceiver;
- import jfinal.plugin.activemq.JmsSender;
- import javax.jms.JMSException;
- import javax.jms.TextMessage;
- import java.util.Date;
- /**
- * @author 孙宇
- */
- public class TestActiveMQPlugin {
- public static void main(String[] args) throws JMSException {
- ActiveMQPlugin p = new ActiveMQPlugin("failover://(tcp://127.0.0.1:61616)?initialReconnectDelay=1000");
- p.start();
- String subject = "test";
- ActiveMQ.addSender(new JmsSender("testSender1", ActiveMQ.getConnection(), Destination.Queue, subject));//定义发送者
- ActiveMQ.addReceiver(new JmsReceiver("testReceiver1", ActiveMQ.getConnection(), Destination.Queue, subject));//定义接受者
- for (int i = 0; i < 10; i++) {
- new Runnable() {
- @Override
- public void run() {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- try {
- JmsSender sq1 = ActiveMQ.getSender("testSender1");
- TextMessage msg = sq1.getSession().createTextMessage("测试" + new Date());
- sq1.sendMessage(msg);
- } catch (JMSException e) {
- e.printStackTrace();
- }
- }
- }.run();
- }
- }
- }
有兴趣你也可以下载源码跑一下,执行TestActiveMQPlugin类就行
- https://git.oschina.net/sypro/jfinalplugin.git
想看spring版的,看这个源码
- https://git.oschina.net/sypro/demo.git