该插件是在官网用户分享的基础上改造而来
http://www.jfinal.com/share/77
【maven】
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.10</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <version>2.7.0</version> </dependency>
【基础】注解类
package plus.jfinal.plugin.acitvemq;

import javax.jms.DeliveryMode;
import java.lang.annotation.*;

/**
 * Class-level annotation that describes the JMS destination a sender or
 * receiver class is bound to.
 *
 * <p>Retained at runtime so that it can be read reflectively from the
 * annotated class.
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface JmsListener {

    /**
     * Identifier of the connection group to use.
     *
     * @return the group name; {@code "default"} when not specified
     */
    String group() default "default";

    /**
     * Kind of destination: topic or queue.
     *
     * @return the destination kind
     */
    Destination type();

    /**
     * Name of the topic or queue.
     *
     * @return the destination name
     */
    String subject();

    /**
     * Delivery mode, expressed as one of the {@link DeliveryMode} constants
     * (persist the message or not).
     *
     * @return the delivery mode; {@link DeliveryMode#NON_PERSISTENT} when not specified
     */
    int deliveryMode() default DeliveryMode.NON_PERSISTENT;
}
【基础】发送者类
package plus.jfinal.plugin.acitvemq; import lombok.Data; import javax.jms.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; /** * 发送者 */ @Data public abstract class JmsSender { /** * 消息队列分组 */ private String group; private String subject; private Destination type; private int deliveryMode; /** * 会话 */ private Session session; /** * 消息生产者 */ private MessageProducer producer; //private static JmsSender sender; public static ConcurrentMap<String, JmsSender> sender = new ConcurrentHashMap<>(); public JmsSender() { JmsListener listener = this.getClass().getAnnotation(JmsListener.class); if (listener != null) { this.subject = listener.subject(); this.group = listener.group(); this.type = listener.type(); try { init(); } catch (Exception e) { e.printStackTrace(); System.out.println(String.format("%s 初始化失败", this.getClass().getName())); } } else { System.out.println("接收者丢失相关配置,请采用注解或者参数实例化方式"); } } private void init() throws JMSException { String key = this.getClass().getName(); if (sender.get(key) != null) { return; } this.group = group; // 事务性会话,自动确认消息 this.session = ActiveMQ.getConnection(group).createSession(false, Session.AUTO_ACKNOWLEDGE); // 消息的目的地(Queue/Topic) if (type.equals(Destination.Topic)) { Topic destination = session.createTopic(subject); producer = session.createProducer(destination); } else { Queue destination = session.createQueue(subject); producer = session.createProducer(destination); } this.deliveryMode = deliveryMode; // 不持久化消息 producer.setDeliveryMode(deliveryMode); sender.put(key, this); } /*public void senderTextMessage(String text) { JmsSender _sender = sender.get(this.getClass().getName()); try { _sender.getProducer().send(_sender.getSession().createTextMessage(text)); } catch (JMSException e) { e.printStackTrace(); } }*/ public static void sendText(Class<? 
extends JmsSender> senderClass,String text){ JmsSender s = sender.get(senderClass.getName()); try { s.getProducer().send(s.getSession().createTextMessage(text)); } catch (JMSException e) { e.printStackTrace(); } } }
【基础】接收者类
package plus.jfinal.plugin.acitvemq; import javax.jms.*; import java.util.Enumeration; import java.util.HashMap; import java.util.Map; public abstract class JmsReceiver implements MessageListener { private String group; private Destination type; private String subject; private Session session; private MessageConsumer consumer; public JmsReceiver(){ JmsListener listener = this.getClass().getAnnotation(JmsListener.class); if(listener!=null){ this.subject = listener.subject(); this.group = listener.group(); this.type = listener.type(); try { init(); } catch (JMSException e) { e.printStackTrace(); System.out.println(String.format("%s 初始化失败", this.getClass().getName())); } }else{ System.out.println("接收者丢失相关配置,请采用注解或者参数实例化方式"); } } private void init() throws JMSException { // 事务性会话,自动确认消息 session = ActiveMQ.getConnection(group).createSession(false,Session.AUTO_ACKNOWLEDGE); // 消息的目的地(Queue/Topic) if (type.equals(Destination.Topic)) { Topic destination = session.createTopic(subject); consumer = session.createConsumer(destination); } else { Queue destination = session.createQueue(subject); consumer = session.createConsumer(destination); } consumer.setMessageListener(this); } public abstract void onTextMessage(String text); public abstract void onMapMessage(Map map); public abstract void onStreamMessage(StreamMessage streamMessage); public abstract void onObjectMessage(ObjectMessage objectMessage); public abstract void onBytesMessage(BytesMessage bytesMessage); public abstract void onOtherMessage(Message message); @Override public void onMessage(Message message) { try { if (message instanceof TextMessage) { TextMessage msg = (TextMessage) message; onTextMessage(msg.getText()); } else if (message instanceof MapMessage) { MapMessage msg = (MapMessage) message; Map data = new HashMap(); Enumeration enumer = msg.getMapNames(); while (enumer.hasMoreElements()) { Object obj = enumer.nextElement(); data.put(obj.toString(),msg.getObject(obj.toString())); } onMapMessage(data); } 
else if (message instanceof StreamMessage) { onStreamMessage((StreamMessage) message); } else if (message instanceof ObjectMessage) { onObjectMessage((ObjectMessage) message); } else if (message instanceof BytesMessage) { onBytesMessage((BytesMessage) message); } else { onOtherMessage(message); } /*try { if (message instanceof TextMessage) { TextMessage msg = (TextMessage) message; System.out.println(msg.getText()); } else if (message instanceof MapMessage) { MapMessage msg = (MapMessage) message; Enumeration enumer = msg.getMapNames(); while (enumer.hasMoreElements()) { Object obj = enumer.nextElement(); System.out.println(msg.getObject(obj.toString())); } } else if (message instanceof StreamMessage) { StreamMessage msg = (StreamMessage) message; System.out.println(msg.readString()); System.out.println(msg.readBoolean()); System.out.println(msg.readLong()); } else if (message instanceof ObjectMessage) { ObjectMessage msg = (ObjectMessage) message; System.out.println(msg); } else if (message instanceof BytesMessage) { BytesMessage msg = (BytesMessage) message; byte[] byteContent = new byte[1024]; int length = -1; StringBuffer content = new StringBuffer(); while ((length = msg.readBytes(byteContent)) != -1) { content.append(new String(byteContent, 0, length)); } System.out.println(content.toString()); } else { System.out.println(message); }*/ } catch (Exception e) { e.printStackTrace(); } } }
【基础】工具类
package plus.jfinal.plugin.acitvemq;

import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.activemq.pool.PooledConnection;

import java.util.concurrent.ConcurrentHashMap;

/**
 * Static registry of pooled ActiveMQ connections, keyed by connection name.
 */
public class ActiveMQ {

    // Name the logger after this class; Thread.currentThread().getClass()
    // would name it after the thread class that happens to load this one.
    private static final Log log = LogFactory.get(ActiveMQ.class);

    /** Registered connections by name. */
    public static final ConcurrentHashMap<String, PooledConnection> pooledConnectionMap = new ConcurrentHashMap<>();

    /** Name used when no explicit connection name is given. */
    public static final String defaultName = "default";

    /**
     * Registers a connection under the given name, replacing any previous one.
     *
     * @param connectionName key to register under
     * @param connection     connection to register
     */
    public static void addConnection(String connectionName, PooledConnection connection) {
        pooledConnectionMap.put(connectionName, connection);
    }

    /**
     * Looks up the connection registered under {@link #defaultName}.
     *
     * @return the connection, or {@code null} when none is registered
     */
    public static PooledConnection getConnection() {
        return getConnection(defaultName);
    }

    /**
     * Looks up the connection registered under the given name.
     *
     * @param connectionName key the connection was registered under
     * @return the connection, or {@code null} when none is registered
     */
    public static PooledConnection getConnection(String connectionName) {
        return pooledConnectionMap.get(connectionName);
    }
}
【基础】枚举
package plus.jfinal.plugin.acitvemq;

/**
 * Kind of JMS destination a sender or receiver is bound to.
 */
public enum Destination {

    /** The destination is a queue. */
    Queue,

    /** The destination is a topic. */
    Topic
}
【插件】插件类以及main方法
package plus.jfinal.plugin.acitvemq; import cn.hutool.core.util.StrUtil; import cn.hutool.log.Log; import cn.hutool.log.LogFactory; import com.jfinal.plugin.IPlugin; import lombok.Data; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.pool.PooledConnection; import org.apache.activemq.pool.PooledConnectionFactory; import javax.jms.JMSException; import javax.jms.TextMessage; import java.util.Date;@Datapublic class JFinalActiveMQPlugin implements IPlugin { private static final Log log = LogFactory.get(); private String url; private String name; private String username; private String password; public JFinalActiveMQPlugin(String url, String name, String username, String password) { this.url = url; this.name = name; this.username = username; this.password = password; } public JFinalActiveMQPlugin(String url) { this.url = url; this.name = ActiveMQ.defaultName; this.username = ActiveMQConnection.DEFAULT_USER; this.password = ActiveMQConnection.DEFAULT_PASSWORD; } public JFinalActiveMQPlugin(String url,String username,String password){ this.url = url; this.name = ActiveMQ.defaultName; this.username = StrUtil.isBlank(username)?ActiveMQConnection.DEFAULT_USER:username; this.password = StrUtil.isBlank(password)?ActiveMQConnection.DEFAULT_PASSWORD:password; } @Override public boolean start() { log.info("初始化 activemq [{}] url = {}",name,url); ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(); activeMQConnectionFactory.setUserName(username); activeMQConnectionFactory.setPassword(password); activeMQConnectionFactory.setBrokerURL(url); activeMQConnectionFactory.setDispatchAsync(true);//异步发送消息 PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory); pooledConnectionFactory.setMaximumActiveSessionPerConnection(200); pooledConnectionFactory.setIdleTimeout(120); pooledConnectionFactory.setMaxConnections(5); 
pooledConnectionFactory.setBlockIfSessionPoolIsFull(true); try { PooledConnection connection = (PooledConnection) pooledConnectionFactory.createConnection(); connection.start(); ActiveMQ.pooledConnectionMap.put(name, connection); } catch (JMSException e) { e.printStackTrace(); } return true; } @Override public boolean stop() { return true; } public static void main(String[] args) throws JMSException { JFinalActiveMQPlugin p = new JFinalActiveMQPlugin("failover://(tcp://192.168.71.155:61616)?initialReconnectDelay=1000","l*******","l*******"); p.start(); String subject = "test"; // 注解方式 new TestSender(); new TestReceiver(); // 参数实例化方式 //new TestSender("default",Destination.Queue, subject); //new TestReceiver("default",Destination.Queue, subject); for (int i = 0; i < 10; i++) { new Runnable() { @Override public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } TestSender.senderText("测试" + new Date()); } }.run(); } } }
【demo】发送者
package plus.jfinal.plugin.acitvemq;

import javax.jms.JMSException;

/**
 * Demo sender bound to the queue named {@code "test"}.
 */
@JmsListener(subject = "test", type = Destination.Queue)
public class TestSender extends JmsSender {
}
【demo】接收者
package plus.jfinal.plugin.acitvemq;

import javax.jms.*;
import java.util.Map;

/**
 * Demo receiver bound to the queue named {@code "test"}. Only text messages
 * are handled; every other callback is intentionally a no-op.
 */
@JmsListener(subject = "test", type = Destination.Queue)
public class TestReceiver extends JmsReceiver {

    /** Prints the received text to standard output. */
    @Override
    public void onTextMessage(String text) {
        System.out.println("收到消息:" + text);
    }

    @Override
    public void onMapMessage(Map map) {
        // not used by the demo
    }

    @Override
    public void onStreamMessage(StreamMessage streamMessage) {
        // not used by the demo
    }

    @Override
    public void onObjectMessage(ObjectMessage objectMessage) {
        // not used by the demo
    }

    @Override
    public void onBytesMessage(BytesMessage bytesMessage) {
        // not used by the demo
    }

    @Override
    public void onOtherMessage(Message message) {
        // not used by the demo
    }
}
【配置文件】
[default] url=tcp://192.168.71.155:61616 initialReconnectDelay=1000 username=l*** password=l***
【运行方式】
public static void main(String[] args) throws JMSException { JFinalActiveMQPlugin p = new JFinalActiveMQPlugin("failover://(tcp://192.168.71.155:61616)?initialReconnectDelay=1000","l****","l****"); p.start(); String subject = "test"; /** * 以下实例化方式有两种 * 1)注解方式,直接无参构造 * 2)参数实例化方式 */ // 注解方式 new TestSender(); new TestReceiver(); // 参数实例化方式 //new TestSender("default",Destination.Queue, subject); //new TestReceiver("default",Destination.Queue, subject); for (int i = 0; i < 10; i++) { new Runnable() { @Override public void run() { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } JmsSender.sendText(TestSender.class,"测试" + new Date()); } }.run(); } }
## 关于发送者和接收者的实例化:推荐在服务启动后通过注解扫描来实例化,或者在程序启动完成后的代码逻辑中实例化;不要在插件 add 之后就立刻实例化发送者或接收者,否则会出问题,因为此时插件的启动方法(start)尚未真正执行,连接还没有建立。
-----
2019/11/14 修改
发现之前版本一个bug,故修改了调用方式
JmsSender.sendText(TestSender.class,"测试" + new Date());