jFinal ActiveMQ插件

项目中用到了 ActiveMQ,原本是用 Spring 方式实现的,现在改写成 JFinal 插件,供大家参考。

这只是一个比较粗略的插件,还不够严谨,大家可以根据需要自行修改。

这个插件支持多个连接(内部使用连接池),也支持发送多种类型的消息。

首先,我的项目是 Maven 项目,需要加入以下依赖:

<!-- JFinal framework: the plugin implements com.jfinal.plugin.IPlugin. -->
<dependency>
    <groupId>com.jfinal</groupId>
    <artifactId>jfinal</artifactId>
    <version>2.2</version>
</dependency>
<!-- ActiveMQ pooling module: the plugin uses the org.apache.activemq.pool classes. -->
<!-- NOTE(review): the code logs through org.slf4j; without an SLF4J binding on the
     classpath SLF4J falls back to a no-operation logger. -->
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
    <version>5.14.0</version>
</dependency>



定义Sender类

package jfinal.plugin.activemq;
import org.apache.activemq.pool.PooledConnection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
/**
 * 消息发送者
 *
 * @author 孙宇
 */
public class JmsSender {
    private String name;
    private Session session;
    private MessageProducer producer;
    public JmsSender(String name,
                     PooledConnection connection,
                     Destination type,
                     String subject) throws JMSException {
        this.name = name;
        // 事务性会话,自动确认消息
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 消息的目的地(Queue/Topic)
        if (type.equals(Destination.Topic)) {
            Topic destination = session.createTopic(subject);
            producer = session.createProducer(destination);
        } else {
            Queue destination = session.createQueue(subject);
            producer = session.createProducer(destination);
        }
        // 不持久化消息
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    }
    public String getName() {
        return name;
    }
    public Session getSession() {
        return session;
    }
    public void sendMessage(Message message) throws JMSException {
        producer.send(message);
    }
}





定义Receiver类

package jfinal.plugin.activemq;
import org.apache.activemq.pool.PooledConnection;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.jms.Topic;
import java.util.Enumeration;
/**
 * 消息接收者
 *
 * @author 孙宇
 */
public class JmsReceiver implements MessageListener {
    private String name;
    private Session session;
    private MessageConsumer consumer;
    public JmsReceiver(String name,
                       PooledConnection connection,
                       Destination type,
                       String subject) throws JMSException {
        this.name = name;
        // 事务性会话,自动确认消息
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 消息的目的地(Queue/Topic)
        if (type.equals(Destination.Topic)) {
            Topic destination = session.createTopic(subject);
            consumer = session.createConsumer(destination);
        } else {
            Queue destination = session.createQueue(subject);
            consumer = session.createConsumer(destination);
        }
        consumer.setMessageListener(this);
    }
    public String getName() {
        return name;
    }
    @Override
    public void onMessage(Message message) {
        try {
            if (message instanceof TextMessage) {
                TextMessage msg = (TextMessage) message;
                System.out.println(msg.getText());
            } else if (message instanceof MapMessage) {
                MapMessage msg = (MapMessage) message;
                Enumeration enumer = msg.getMapNames();
                while (enumer.hasMoreElements()) {
                    Object obj = enumer.nextElement();
                    System.out.println(msg.getObject(obj.toString()));
                }
            } else if (message instanceof StreamMessage) {
                StreamMessage msg = (StreamMessage) message;
                System.out.println(msg.readString());
                System.out.println(msg.readBoolean());
                System.out.println(msg.readLong());
            } else if (message instanceof ObjectMessage) {
                ObjectMessage msg = (ObjectMessage) message;
                System.out.println(msg);
            } else if (message instanceof BytesMessage) {
                BytesMessage msg = (BytesMessage) message;
                byte[] byteContent = new byte[1024];
                int length = -1;
                StringBuffer content = new StringBuffer();
                while ((length = msg.readBytes(byteContent)) != -1) {
                    content.append(new String(byteContent, 0, length));
                }
                System.out.println(content.toString());
            } else {
                System.out.println(message);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}




定义类型

package jfinal.plugin.activemq;
/**
 * 目标类型
 *
 * @author 孙宇
 */
/**
 * Kind of destination a sender or receiver is bound to.
 */
public enum Destination {
    /** Destination created with {@code Session.createQueue}. */
    Queue,

    /** Destination created with {@code Session.createTopic}. */
    Topic;
}




activeMq插件

package jfinal.plugin.activemq;
import com.jfinal.plugin.IPlugin;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.pool.PooledConnection;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.JMSException;
/**
 * @author 孙宇
 */
public class ActiveMQPlugin implements IPlugin {
    private static final Logger logger = LoggerFactory.getLogger(Thread.currentThread().getClass());
    private String url;
    private String name;
    public ActiveMQPlugin(String url,
                          String name) {
        this.url = url;
        this.name = name;
    }
    public ActiveMQPlugin(String url) {
        this.url = url;
        this.name = ActiveMQ.defaultName;
    }
    @Override
    public boolean start() {
        logger.info("初始化activeMQ配置");
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
        activeMQConnectionFactory.setUserName(ActiveMQConnection.DEFAULT_USER);
        activeMQConnectionFactory.setPassword(ActiveMQConnection.DEFAULT_PASSWORD);
        activeMQConnectionFactory.setBrokerURL(url);
        activeMQConnectionFactory.setDispatchAsync(true);//异步发送消息
        PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);
        pooledConnectionFactory.setMaximumActiveSessionPerConnection(200);
        pooledConnectionFactory.setIdleTimeout(120);
        pooledConnectionFactory.setMaxConnections(5);
        pooledConnectionFactory.setBlockIfSessionPoolIsFull(true);
        try {
            PooledConnection connection = (PooledConnection) pooledConnectionFactory.createConnection();
            connection.start();
            ActiveMQ.pooledConnectionMap.put(name, connection);
        } catch (JMSException e) {
            e.printStackTrace();
        }
        return true;
    }
    @Override
    public boolean stop() {
        return true;
    }
}





activeMQ工具

package jfinal.plugin.activemq;
import org.apache.activemq.pool.PooledConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ConcurrentHashMap;
/**
 * @author 孙宇
 */
public class ActiveMQ {
    public static final ConcurrentHashMap<String, PooledConnection> pooledConnectionMap = new ConcurrentHashMap<>();
    public static final ConcurrentHashMap<String, JmsSender> senderMap = new ConcurrentHashMap<>();
    public static final ConcurrentHashMap<String, JmsReceiver> receiverMap = new ConcurrentHashMap<>();
    public static final String defaultName = "main";
    private static final Logger logger = LoggerFactory.getLogger(Thread.currentThread().getClass());
    public static void addSender(JmsSender sender) {
        senderMap.put(sender.getName(), sender);
    }
    public static JmsSender getSender(String name) {
        return senderMap.get(name);
    }
    public static void addReceiver(JmsReceiver receiver) {
        receiverMap.put(receiver.getName(), receiver);
    }
    public static JmsReceiver getReceiver(String name) {
        return receiverMap.get(name);
    }
    public static void addConnection(String connectionName,
                                     PooledConnection connection) {
        pooledConnectionMap.put(connectionName, connection);
    }
    public static PooledConnection getConnection() {
        return pooledConnectionMap.get(defaultName);
    }
    public static PooledConnection getConnection(String connectionName) {
        return pooledConnectionMap.get(connectionName);
    }
}





使用的时候,

// Create the plugin for the broker URL and start it; start() registers the
// pooled connection under the default name "main".
ActiveMQPlugin p = new ActiveMQPlugin("failover://(tcp://127.0.0.1:61616)?initialReconnectDelay=1000");
p.start();


当然,这样只是创建了连接池,还没有生产者和消费者,
需要再添加这两个对象:

// Queue name shared by the sender and the receiver.
String subject = "test";
ActiveMQ.addSender(new JmsSender("testSender1", ActiveMQ.getConnection(), Destination.Queue, subject));// define the sender
ActiveMQ.addReceiver(new JmsReceiver("testReceiver1", ActiveMQ.getConnection(), Destination.Queue, subject));// define the receiver



发送一个消息

// Look up the registered sender, create a text message on its session and send it.
JmsSender sq1 = ActiveMQ.getSender("testSender1");
TextMessage msg = sq1.getSession().createTextMessage("测试" + new Date());
sq1.sendMessage(msg);



JmsReceiver 里处理了多种消息类型,发送的消息不一定非要是文本类型。
下面是一个完整的测试类:

package test.jfinal.plugin.activemq;
import jfinal.plugin.activemq.ActiveMQ;
import jfinal.plugin.activemq.ActiveMQPlugin;
import jfinal.plugin.activemq.Destination;
import jfinal.plugin.activemq.JmsReceiver;
import jfinal.plugin.activemq.JmsSender;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import java.util.Date;
/**
 * @author 孙宇
 */
public class TestActiveMQPlugin {
    public static void main(String[] args) throws JMSException {
        ActiveMQPlugin p = new ActiveMQPlugin("failover://(tcp://127.0.0.1:61616)?initialReconnectDelay=1000");
        p.start();
        String subject = "test";
        ActiveMQ.addSender(new JmsSender("testSender1", ActiveMQ.getConnection(), Destination.Queue, subject));//定义发送者
        ActiveMQ.addReceiver(new JmsReceiver("testReceiver1", ActiveMQ.getConnection(), Destination.Queue, subject));//定义接受者
        for (int i = 0; i < 10; i++) {
            new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    try {
                        JmsSender sq1 = ActiveMQ.getSender("testSender1");
                        TextMessage msg = sq1.getSession().createTextMessage("测试" + new Date());
                        sq1.sendMessage(msg);
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }.run();
        }
    }
}




有兴趣你也可以下载源码跑一下,执行TestActiveMQPlugin类就行

https://git.oschina.net/sypro/jfinalplugin.git



想看spring版的,看这个源码

https://git.oschina.net/sypro/demo.git


评论区

sphsyv

2016-09-19 10:46

这个编辑器太难用了。。。编辑好几次,浪费时间。要求更换markdown

JFinal

2016-09-19 11:12

@sphsyv 后续会添加 markdown 支持,这个编辑也仍然在调整中,感谢分享,赞一个

lyh061619

2016-09-23 09:22

@JFinal 这个搞个官方支持下,也挺不错的,常用插件了。

阿帕奇

2018-06-04 14:13

@sphsyv 你好,请问一下你的这种配置方式,只可以配置一个queue。如何才能配置多个queue(qa,qb,qc),让消费者a去收qa,消费者b收qb,消费者c收qc。求指点。。。。。。

阿帕奇

2018-06-07 18:09

@JFinal 波总什么时候给jfinal集成一下这样的插件吧;我想加一个就是没思路,像spring那样用注解对应每一个queue或者topic;求指点;

Russell-178

2018-08-18 13:50

运行报错
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
不知是少了那些,求大佬给点指点

Russell-178

2018-08-18 13:54

以下这个空指针报的人不能理解
13873 ERROR [2018-08-18 13:53:01] /queue1?param=%E6%98%AF%E5%A4%9A%E5%B0%91
java.lang.NullPointerException
at cn.codeforfun.jfinalplugin.activemq.core.JFinalQueue.sendMessage(JFinalQueue.java:61)
at com.mq.comm.IndexController.queue1(IndexController.java:21)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.jfinal.aop.Invocation.invoke(Invocation.java:73)
at com.jfinal.core.ActionHandler.handle(ActionHandler.java:74)
at com.jfinal.core.JFinalFilter.doFilter(JFinalFilter.java:72)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:240)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:207)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:212)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:106)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:502)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:141)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:79)
at org.apache.catalina.valves.AbstractAccessLogValve.invoke(AbstractAccessLogValve.java:616)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:88)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:528)
at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1099)
at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:670)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1520)
at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.run(NioEndpoint.java:1476)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:745)

jakle

2018-11-13 14:28

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/commons/pool2/KeyedPooledObjectFactory

Targer

2019-04-09 10:59

@sphsyv demo码云删除了

激动

2019-06-17 09:52

源码好像删除了呀

892653986

2020-01-13 16:02

可以的 ,看这篇https://jfinal.com/share/1883 错误比较多