jFinal ActiveMQ插件

项目中用到了ActiveMQ,本来是用spring方式写的,现在转换成jFinal给大家参考

只是一个粗略的插件,没有更严谨,大家可以自己改改

这个插件可以实现多链接(内部也有连接池)、多消息类型的发送

首先我是maven项目,加入依赖

  1. <dependency>
  2.     <groupId>com.jfinal</groupId>
  3.     <artifactId>jfinal</artifactId>
  4.     <version>2.2</version>
  5. </dependency>
  6. <dependency>
  7.     <groupId>org.apache.activemq</groupId>
  8.     <artifactId>activemq-pool</artifactId>
  9.     <version>5.14.0</version>
  10. </dependency>



定义Sender类

  1. package jfinal.plugin.activemq;
  2. import org.apache.activemq.pool.PooledConnection;
  3. import javax.jms.DeliveryMode;
  4. import javax.jms.JMSException;
  5. import javax.jms.Message;
  6. import javax.jms.MessageProducer;
  7. import javax.jms.Queue;
  8. import javax.jms.Session;
  9. import javax.jms.Topic;
  10. /**
  11.  * 消息发送者
  12.  *
  13.  * @author 孙宇
  14.  */
  15. public class JmsSender {
  16.     private String name;
  17.     private Session session;
  18.     private MessageProducer producer;
  19.     public JmsSender(String name,
  20.                      PooledConnection connection,
  21.                      Destination type,
  22.                      String subject) throws JMSException {
  23.         this.name = name;
  24.         // 事务性会话,自动确认消息
  25.         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  26.         // 消息的目的地(Queue/Topic)
  27.         if (type.equals(Destination.Topic)) {
  28.             Topic destination = session.createTopic(subject);
  29.             producer = session.createProducer(destination);
  30.         } else {
  31.             Queue destination = session.createQueue(subject);
  32.             producer = session.createProducer(destination);
  33.         }
  34.         // 不持久化消息
  35.         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
  36.     }
  37.     public String getName() {
  38.         return name;
  39.     }
  40.     public Session getSession() {
  41.         return session;
  42.     }
  43.     public void sendMessage(Message message) throws JMSException {
  44.         producer.send(message);
  45.     }
  46. }





定义Receiver类

  1. package jfinal.plugin.activemq;
  2. import org.apache.activemq.pool.PooledConnection;
  3. import javax.jms.BytesMessage;
  4. import javax.jms.JMSException;
  5. import javax.jms.MapMessage;
  6. import javax.jms.Message;
  7. import javax.jms.MessageConsumer;
  8. import javax.jms.MessageListener;
  9. import javax.jms.ObjectMessage;
  10. import javax.jms.Queue;
  11. import javax.jms.Session;
  12. import javax.jms.StreamMessage;
  13. import javax.jms.TextMessage;
  14. import javax.jms.Topic;
  15. import java.util.Enumeration;
  16. /**
  17.  * 消息接收者
  18.  *
  19.  * @author 孙宇
  20.  */
  21. public class JmsReceiver implements MessageListener {
  22.     private String name;
  23.     private Session session;
  24.     private MessageConsumer consumer;
  25.     public JmsReceiver(String name,
  26.                        PooledConnection connection,
  27.                        Destination type,
  28.                        String subject) throws JMSException {
  29.         this.name = name;
  30.         // 事务性会话,自动确认消息
  31.         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  32.         // 消息的目的地(Queue/Topic)
  33.         if (type.equals(Destination.Topic)) {
  34.             Topic destination = session.createTopic(subject);
  35.             consumer = session.createConsumer(destination);
  36.         } else {
  37.             Queue destination = session.createQueue(subject);
  38.             consumer = session.createConsumer(destination);
  39.         }
  40.         consumer.setMessageListener(this);
  41.     }
  42.     public String getName() {
  43.         return name;
  44.     }
  45.     @Override
  46.     public void onMessage(Message message) {
  47.         try {
  48.             if (message instanceof TextMessage) {
  49.                 TextMessage msg = (TextMessage) message;
  50.                 System.out.println(msg.getText());
  51.             } else if (message instanceof MapMessage) {
  52.                 MapMessage msg = (MapMessage) message;
  53.                 Enumeration enumer = msg.getMapNames();
  54.                 while (enumer.hasMoreElements()) {
  55.                     Object obj = enumer.nextElement();
  56.                     System.out.println(msg.getObject(obj.toString()));
  57.                 }
  58.             } else if (message instanceof StreamMessage) {
  59.                 StreamMessage msg = (StreamMessage) message;
  60.                 System.out.println(msg.readString());
  61.                 System.out.println(msg.readBoolean());
  62.                 System.out.println(msg.readLong());
  63.             } else if (message instanceof ObjectMessage) {
  64.                 ObjectMessage msg = (ObjectMessage) message;
  65.                 System.out.println(msg);
  66.             } else if (message instanceof BytesMessage) {
  67.                 BytesMessage msg = (BytesMessage) message;
  68.                 byte[] byteContent = new byte[1024];
  69.                 int length = -1;
  70.                 StringBuffer content = new StringBuffer();
  71.                 while ((length = msg.readBytes(byteContent)) != -1) {
  72.                     content.append(new String(byteContent, 0, length));
  73.                 }
  74.                 System.out.println(content.toString());
  75.             } else {
  76.                 System.out.println(message);
  77.             }
  78.         } catch (Exception e) {
  79.             e.printStackTrace();
  80.         }
  81.     }
  82. }




定义类型

  1. package jfinal.plugin.activemq;
  2. /**
  3.  * 目标类型
  4.  *
  5.  * @author 孙宇
  6.  */
  7. public enum Destination {
  8.     Queue, Topic
  9. }




activeMq插件

  1. package jfinal.plugin.activemq;
  2. import com.jfinal.plugin.IPlugin;
  3. import org.apache.activemq.ActiveMQConnection;
  4. import org.apache.activemq.ActiveMQConnectionFactory;
  5. import org.apache.activemq.pool.PooledConnection;
  6. import org.apache.activemq.pool.PooledConnectionFactory;
  7. import org.slf4j.Logger;
  8. import org.slf4j.LoggerFactory;
  9. import javax.jms.JMSException;
  10. /**
  11.  * @author 孙宇
  12.  */
  13. public class ActiveMQPlugin implements IPlugin {
  14.     private static final Logger logger = LoggerFactory.getLogger(Thread.currentThread().getClass());
  15.     private String url;
  16.     private String name;
  17.     public ActiveMQPlugin(String url,
  18.                           String name) {
  19.         this.url = url;
  20.         this.name = name;
  21.     }
  22.     public ActiveMQPlugin(String url) {
  23.         this.url = url;
  24.         this.name = ActiveMQ.defaultName;
  25.     }
  26.     @Override
  27.     public boolean start() {
  28.         logger.info("初始化activeMQ配置");
  29.         ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
  30.         activeMQConnectionFactory.setUserName(ActiveMQConnection.DEFAULT_USER);
  31.         activeMQConnectionFactory.setPassword(ActiveMQConnection.DEFAULT_PASSWORD);
  32.         activeMQConnectionFactory.setBrokerURL(url);
  33.         activeMQConnectionFactory.setDispatchAsync(true);//异步发送消息
  34.         PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);
  35.         pooledConnectionFactory.setMaximumActiveSessionPerConnection(200);
  36.         pooledConnectionFactory.setIdleTimeout(120);
  37.         pooledConnectionFactory.setMaxConnections(5);
  38.         pooledConnectionFactory.setBlockIfSessionPoolIsFull(true);
  39.         try {
  40.             PooledConnection connection = (PooledConnection) pooledConnectionFactory.createConnection();
  41.             connection.start();
  42.             ActiveMQ.pooledConnectionMap.put(name, connection);
  43.         } catch (JMSException e) {
  44.             e.printStackTrace();
  45.         }
  46.         return true;
  47.     }
  48.     @Override
  49.     public boolean stop() {
  50.         return true;
  51.     }
  52. }





activeMQ工具

  1. package jfinal.plugin.activemq;
  2. import org.apache.activemq.pool.PooledConnection;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import java.util.concurrent.ConcurrentHashMap;
  6. /**
  7.  * @author 孙宇
  8.  */
  9. public class ActiveMQ {
  10.     public static final ConcurrentHashMap<String, PooledConnection> pooledConnectionMap = new ConcurrentHashMap<>();
  11.     public static final ConcurrentHashMap<String, JmsSender> senderMap = new ConcurrentHashMap<>();
  12.     public static final ConcurrentHashMap<String, JmsReceiver> receiverMap = new ConcurrentHashMap<>();
  13.     public static final String defaultName = "main";
  14.     private static final Logger logger = LoggerFactory.getLogger(Thread.currentThread().getClass());
  15.     public static void addSender(JmsSender sender) {
  16.         senderMap.put(sender.getName(), sender);
  17.     }
  18.     public static JmsSender getSender(String name) {
  19.         return senderMap.get(name);
  20.     }
  21.     public static void addReceiver(JmsReceiver receiver) {
  22.         receiverMap.put(receiver.getName(), receiver);
  23.     }
  24.     public static JmsReceiver getReceiver(String name) {
  25.         return receiverMap.get(name);
  26.     }
  27.     public static void addConnection(String connectionName,
  28.                                      PooledConnection connection) {
  29.         pooledConnectionMap.put(connectionName, connection);
  30.     }
  31.     public static PooledConnection getConnection() {
  32.         return pooledConnectionMap.get(defaultName);
  33.     }
  34.     public static PooledConnection getConnection(String connectionName) {
  35.         return pooledConnectionMap.get(connectionName);
  36.     }
  37. }





使用的时候,

  1. ActiveMQPlugin p = new ActiveMQPlugin("failover://(tcp://127.0.0.1:61616)?initialReconnectDelay=1000");
  2. p.start();


当然,这样只是创建了链接池,但是没有生产者和消费者
还要添加这两个对象

  1. String subject = "test";
  2. ActiveMQ.addSender(new JmsSender("testSender1", ActiveMQ.getConnection(), Destination.Queue, subject));//定义发送者
  3. ActiveMQ.addReceiver(new JmsReceiver("testReceiver1", ActiveMQ.getConnection(), Destination.Queue, subject));//定义接受者



发送一个消息

  1. JmsSender sq1 = ActiveMQ.getSender("testSender1");
  2. TextMessage msg = sq1.getSession().createTextMessage("测试" + new Date());
  3. sq1.sendMessage(msg);



我里面定义了很多消息类型,不一定非要文本型
一个测试类

  1. package test.jfinal.plugin.activemq;
  2. import jfinal.plugin.activemq.ActiveMQ;
  3. import jfinal.plugin.activemq.ActiveMQPlugin;
  4. import jfinal.plugin.activemq.Destination;
  5. import jfinal.plugin.activemq.JmsReceiver;
  6. import jfinal.plugin.activemq.JmsSender;
  7. import javax.jms.JMSException;
  8. import javax.jms.TextMessage;
  9. import java.util.Date;
  10. /**
  11.  * @author 孙宇
  12.  */
  13. public class TestActiveMQPlugin {
  14.     public static void main(String[] args) throws JMSException {
  15.         ActiveMQPlugin p = new ActiveMQPlugin("failover://(tcp://127.0.0.1:61616)?initialReconnectDelay=1000");
  16.         p.start();
  17.         String subject = "test";
  18.         ActiveMQ.addSender(new JmsSender("testSender1", ActiveMQ.getConnection(), Destination.Queue, subject));//定义发送者
  19.         ActiveMQ.addReceiver(new JmsReceiver("testReceiver1", ActiveMQ.getConnection(), Destination.Queue, subject));//定义接受者
  20.         for (int i = 0; i < 10; i++) {
  21.             new Runnable() {
  22.                 @Override
  23.                 public void run() {
  24.                     try {
  25.                         Thread.sleep(1000);
  26.                     } catch (InterruptedException e) {
  27.                         e.printStackTrace();
  28.                     }
  29.                     try {
  30.                         JmsSender sq1 = ActiveMQ.getSender("testSender1");
  31.                         TextMessage msg = sq1.getSession().createTextMessage("测试" + new Date());
  32.                         sq1.sendMessage(msg);
  33.                     } catch (JMSException e) {
  34.                         e.printStackTrace();
  35.                     }
  36.                 }
  37.             }.run();
  38.         }
  39.     }
  40. }




有兴趣你也可以下载源码跑一下,执行TestActiveMQPlugin类就行

  1. https://git.oschina.net/sypro/jfinalplugin.git



想看spring版的,看这个源码

  1. https://git.oschina.net/sypro/demo.git


评论区

sphsyv

2016-09-19 10:46

这个编辑器太难用了。。。编辑好几次,浪费时间。要求更换markdown

JFinal

2016-09-19 11:12

@sphsyv 后续会添加 markdown 支持,这个编辑也仍然在调整中,感谢分享,赞一个

lyh061619

2016-09-23 09:22

@JFinal 这个搞个官方支持下,也挺不错的,常用插件了。

阿帕奇

2018-06-04 14:13

@sphsyv 你好,请问一下你的这种配置方式,只可以配置一个queue。如何才能配置多个queue(qa,qb,qc),让消费者a去收qa,消费者b收qb,消费者c收qc。求指点。。。。。。

阿帕奇

2018-06-07 18:09

@JFinal 波总什么时候给jfinal集成一下这样的插件吧;我想加一个就是没思路,像spring那样用注解对应每一个queue或者topic;求指点;

Russell-178

2018-08-18 13:50

运行报错
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
不知是少了那些,求大佬给点指点

Russell-178

2018-08-18 13:54

以下这个空指针报的人不能理解
13873 ERROR [2018-08-18 13:53:01] /queue1?param=%E6%98%AF%E5%A4%9A%E5%B0%91
java.lang.NullPointerException
at cn.codeforfun.jfinalplugin.activemq.core.JFinalQueue.sendMessage(JFinalQueue.java:61)
at com.mq.comm.IndexController.queue1(IndexController.java:21)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.jfinal.aop.Invocation.invoke(Invocation.java:73)
at com.jfinal.core.ActionHandler.handle(ActionHandler.java:74)
at com.jfinal.core.JFinalFilter.doFilter(JFinalFilter.java:72)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:240)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:212)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:106)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:502)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:141)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:79)
at org.apache.catalina.valves.AbstractAccessLogValve.invoke(AbstractAccessLogValve.java:616)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:88)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:528)
at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1099)
at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:670)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1520)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.run(NioEndpoint.java:1476)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:745)

jakle

2018-11-13 14:28

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/commons/pool2/KeyedPooledObjectFactory

Targer

2019-04-09 10:59

@sphsyv demo码云删除了

激动

2019-06-17 09:52

源码好像删除了呀

892653986

2020-01-13 16:02

可以的 ,看这篇https://jfinal.com/share/1883 错误比较多

热门分享

扫码入社