该插件在官网用户分享的基础上改造而来:
http://www.jfinal.com/share/77
【maven】
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.10</version>
</dependency>
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.7.0</version>
</dependency>
【基础】注解类
package plus.jfinal.plugin.acitvemq;
import javax.jms.DeliveryMode;
import java.lang.annotation.*;
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface JmsListener {
/**
 * Connection group identifier; defaults to {@code "default"}.
 *
 * @return the group name
 */
String group() default "default";
/**
 * Destination kind: a topic or a queue.
 *
 * @return the destination type
 */
Destination type();
/**
 * Name of the topic or queue.
 *
 * @return the destination name
 */
String subject();
/**
 * Delivery mode, i.e. whether messages are persisted or not.
 * Defaults to {@link DeliveryMode#NON_PERSISTENT}.
 *
 * @return the delivery mode constant
 */
int deliveryMode() default DeliveryMode.NON_PERSISTENT;
}
【基础】发送者类
package plus.jfinal.plugin.acitvemq;
import lombok.Data;
import javax.jms.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* 发送者
*/
@Data
public abstract class JmsSender {
/**
 * Connection group; passed to ActiveMQ.getConnection(group) in init().
 */
private String group;
/** Name of the topic or queue the producer is created for. */
private String subject;
/** Whether {@link #subject} names a topic or a queue. */
private Destination type;
/** Delivery mode handed to the producer in init(). */
private int deliveryMode;
/**
 * JMS session created in init().
 */
private Session session;
/**
 * Message producer created from {@link #session} in init().
 */
private MessageProducer producer;
/**
 * Registry of initialised senders, keyed by the concrete sender class name
 * (see init() and sendText()).
 */
public static ConcurrentMap<String, JmsSender> sender = new ConcurrentHashMap<>();
public JmsSender() {
JmsListener listener = this.getClass().getAnnotation(JmsListener.class);
if (listener != null) {
this.subject = listener.subject();
this.group = listener.group();
this.type = listener.type();
try {
init();
} catch (Exception e) {
e.printStackTrace();
System.out.println(String.format("%s 初始化失败", this.getClass().getName()));
}
} else {
System.out.println("接收者丢失相关配置,请采用注解或者参数实例化方式");
}
}
private void init() throws JMSException {
String key = this.getClass().getName();
if (sender.get(key) != null) {
return;
}
this.group = group;
// 事务性会话,自动确认消息
this.session = ActiveMQ.getConnection(group).createSession(false, Session.AUTO_ACKNOWLEDGE);
// 消息的目的地(Queue/Topic)
if (type.equals(Destination.Topic)) {
Topic destination = session.createTopic(subject);
producer = session.createProducer(destination);
} else {
Queue destination = session.createQueue(subject);
producer = session.createProducer(destination);
}
this.deliveryMode = deliveryMode;
// 不持久化消息
producer.setDeliveryMode(deliveryMode);
sender.put(key, this);
}
/*public void senderTextMessage(String text) {
JmsSender _sender = sender.get(this.getClass().getName());
try {
_sender.getProducer().send(_sender.getSession().createTextMessage(text));
} catch (JMSException e) {
e.printStackTrace();
}
}*/
/**
 * Sends a text message through the registered sender of the given class.
 * <p>
 * A {@link JMSException} raised while sending is printed, not propagated.
 * NOTE(review): the registered session is shared by all callers; JMS sessions
 * are generally meant for single-threaded use, so confirm how this is called
 * concurrently.
 *
 * @param senderClass the sender subclass whose registered instance should be used
 * @param text        the message body
 * @throws java.lang.IllegalStateException if no sender of that class has been
 *         initialised and registered yet
 */
public static void sendText(Class<? extends JmsSender> senderClass,String text){
    JmsSender s = sender.get(senderClass.getName());
    if (s == null) {
        // Fully qualified because javax.jms.* also declares an IllegalStateException.
        throw new java.lang.IllegalStateException(
                String.format("%s 尚未初始化,请先实例化该发送者", senderClass.getName()));
    }
    try {
        s.getProducer().send(s.getSession().createTextMessage(text));
    } catch (JMSException e) {
        e.printStackTrace();
    }
}
}
【基础】接收者类
package plus.jfinal.plugin.acitvemq;
import javax.jms.*;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
public abstract class JmsReceiver implements MessageListener {
/** Connection group; passed to ActiveMQ.getConnection(group) in init(). */
private String group;
/** Whether {@link #subject} names a topic or a queue. */
private Destination type;
/** Name of the topic or queue to consume from. */
private String subject;
/** JMS session created in init(). */
private Session session;
/** Consumer created from {@link #session}; this receiver is set as its listener. */
private MessageConsumer consumer;
public JmsReceiver(){
JmsListener listener = this.getClass().getAnnotation(JmsListener.class);
if(listener!=null){
this.subject = listener.subject();
this.group = listener.group();
this.type = listener.type();
try {
init();
} catch (JMSException e) {
e.printStackTrace();
System.out.println(String.format("%s 初始化失败", this.getClass().getName()));
}
}else{
System.out.println("接收者丢失相关配置,请采用注解或者参数实例化方式");
}
}
/**
 * Opens a non-transacted, auto-acknowledge session on the group's connection,
 * creates a consumer for the configured topic or queue, and registers this
 * object as its message listener.
 *
 * @throws JMSException if the session or consumer cannot be created
 */
private void init() throws JMSException {
    session = ActiveMQ.getConnection(group).createSession(false, Session.AUTO_ACKNOWLEDGE);
    // Fully qualified: the simple name Destination resolves to this package's enum.
    final javax.jms.Destination target = type.equals(Destination.Topic)
            ? session.createTopic(subject)
            : session.createQueue(subject);
    consumer = session.createConsumer(target);
    consumer.setMessageListener(this);
}
/** Called by onMessage with the text of a received {@link TextMessage}. */
public abstract void onTextMessage(String text);
/** Called by onMessage with a HashMap holding the name/value entries copied from a received {@link MapMessage}. */
public abstract void onMapMessage(Map map);
/** Called by onMessage with a received {@link StreamMessage}, passed through as-is. */
public abstract void onStreamMessage(StreamMessage streamMessage);
/** Called by onMessage with a received {@link ObjectMessage}, passed through as-is. */
public abstract void onObjectMessage(ObjectMessage objectMessage);
/** Called by onMessage with a received {@link BytesMessage}, passed through as-is. */
public abstract void onBytesMessage(BytesMessage bytesMessage);
/** Called by onMessage for any message that matches none of the types above. */
public abstract void onOtherMessage(Message message);
/**
 * Dispatches an incoming message to the callback matching its JMS type.
 * <p>
 * Text messages are unwrapped to their string; map messages are copied into
 * a {@code Map}; every other type is handed over unchanged. Any exception
 * raised while reading the message or inside a callback is printed and not
 * propagated.
 *
 * @param message the message delivered by the consumer
 */
@Override
public void onMessage(Message message) {
    try {
        if (message instanceof TextMessage) {
            onTextMessage(((TextMessage) message).getText());
        } else if (message instanceof MapMessage) {
            onMapMessage(copyEntries((MapMessage) message));
        } else if (message instanceof StreamMessage) {
            onStreamMessage((StreamMessage) message);
        } else if (message instanceof ObjectMessage) {
            onObjectMessage((ObjectMessage) message);
        } else if (message instanceof BytesMessage) {
            onBytesMessage((BytesMessage) message);
        } else {
            onOtherMessage(message);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}

/**
 * Copies every name/value entry of a map message into a new HashMap.
 *
 * @param message the map message to read
 * @return a map from entry name to entry value
 * @throws JMSException if the message cannot be read
 */
private static Map<String, Object> copyEntries(MapMessage message) throws JMSException {
    Map<String, Object> data = new HashMap<>();
    Enumeration<?> names = message.getMapNames();
    while (names.hasMoreElements()) {
        String name = names.nextElement().toString();
        data.put(name, message.getObject(name));
    }
    return data;
}
}
【基础】工具类
package plus.jfinal.plugin.acitvemq;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import org.apache.activemq.pool.PooledConnection;
import java.util.concurrent.ConcurrentHashMap;
public class ActiveMQ {
/**
 * Logger for this class. It was previously created from
 * Thread.currentThread().getClass(), which names the logger after the
 * thread's class rather than ActiveMQ.
 */
private static final Log log = LogFactory.get(ActiveMQ.class);
/** Registered pooled connections, keyed by connection (group) name. */
public static final ConcurrentHashMap<String, PooledConnection> pooledConnectionMap = new ConcurrentHashMap<>();
/** Name used by {@link #getConnection()} when no connection name is given. */
public static final String defaultName = "default";
/**
 * Registers a pooled connection under the given name, replacing any
 * connection previously stored under that name.
 *
 * @param connectionName key to store the connection under
 * @param connection     the connection to register
 */
public static void addConnection(String connectionName,
        PooledConnection connection) {
    pooledConnectionMap.put(connectionName, connection);
}

/**
 * Looks up the connection stored under {@link #defaultName}.
 *
 * @return the registered connection, or {@code null} if there is none
 */
public static PooledConnection getConnection() {
    return getConnection(defaultName);
}

/**
 * Looks up the connection stored under the given name.
 *
 * @param connectionName key the connection was registered under
 * @return the registered connection, or {@code null} if there is none
 */
public static PooledConnection getConnection(String connectionName) {
    final PooledConnection registered = pooledConnectionMap.get(connectionName);
    return registered;
}
}
【基础】枚举
package plus.jfinal.plugin.acitvemq;
public enum Destination {
Queue, Topic
}
【插件】插件类以及main方法
package plus.jfinal.plugin.acitvemq;
import cn.hutool.core.util.StrUtil;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.jfinal.plugin.IPlugin;
import lombok.Data;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.pool.PooledConnection;
import org.apache.activemq.pool.PooledConnectionFactory;
import javax.jms.JMSException;
import javax.jms.TextMessage;
import java.util.Date;
@Data
public class JFinalActiveMQPlugin implements IPlugin {
/** Logger used by start(). */
private static final Log log = LogFactory.get();
/** Broker URL handed to the connection factory in start(). */
private String url;
/** Key under which the started connection is stored in ActiveMQ.pooledConnectionMap. */
private String name;
/** User name handed to the connection factory in start(). */
private String username;
/** Password handed to the connection factory in start(). */
private String password;
/**
 * Creates a plugin with every setting given explicitly; values are stored as passed.
 *
 * @param url      broker URL
 * @param name     name to register the connection under
 * @param username broker user name
 * @param password broker password
 */
public JFinalActiveMQPlugin(String url,
        String name, String username, String password) {
    this.url = url;
    this.name = name;
    this.username = username;
    this.password = password;
}

/**
 * Creates a plugin for the default connection name using ActiveMQ's default credentials.
 *
 * @param url broker URL
 */
public JFinalActiveMQPlugin(String url) {
    this(url, ActiveMQ.defaultName, ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD);
}

/**
 * Creates a plugin for the default connection name; a blank user name or
 * password falls back to ActiveMQ's corresponding default.
 *
 * @param url      broker URL
 * @param username broker user name, may be blank
 * @param password broker password, may be blank
 */
public JFinalActiveMQPlugin(String url,String username,String password){
    this(url,
            ActiveMQ.defaultName,
            StrUtil.isBlank(username) ? ActiveMQConnection.DEFAULT_USER : username,
            StrUtil.isBlank(password) ? ActiveMQConnection.DEFAULT_PASSWORD : password);
}
/**
 * Creates a pooled ActiveMQ connection for the configured broker, starts it
 * and registers it under {@link #name} so senders and receivers can look it up.
 *
 * @return {@code true} when the connection was started and registered;
 *         {@code false} when it could not be created or started (previously
 *         the failure was only printed and {@code true} was still returned,
 *         leaving no connection registered)
 */
@Override
public boolean start() {
    log.info("初始化 activemq [{}] url = {}",name,url);
    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
    activeMQConnectionFactory.setUserName(username);
    activeMQConnectionFactory.setPassword(password);
    activeMQConnectionFactory.setBrokerURL(url);
    // NOTE(review): dispatchAsync governs how the broker dispatches messages to
    // consumers; asynchronous producer sends are the separate useAsyncSend
    // option. Confirm which one was intended.
    activeMQConnectionFactory.setDispatchAsync(true);
    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);
    pooledConnectionFactory.setMaximumActiveSessionPerConnection(200);
    // NOTE(review): the idle timeout is documented in milliseconds; 120 looks
    // like it was meant as seconds. Left unchanged, please confirm.
    pooledConnectionFactory.setIdleTimeout(120);
    pooledConnectionFactory.setMaxConnections(5);
    pooledConnectionFactory.setBlockIfSessionPoolIsFull(true);
    try {
        PooledConnection connection = (PooledConnection) pooledConnectionFactory.createConnection();
        connection.start();
        ActiveMQ.addConnection(name, connection);
        return true;
    } catch (JMSException e) {
        log.error(e, "activemq [{}] 初始化失败 url = {}", name, url);
        // Release whatever the pool may already hold so a failed start leaks nothing.
        pooledConnectionFactory.stop();
        return false;
    }
}
/**
 * Unregisters the connection stored under {@link #name} and closes it.
 * Previously this method did nothing, so the connection stayed registered
 * and open after the plugin was stopped.
 * <p>
 * The pooled connection factory created in start() is not retained by this
 * class, so only the registered connection handle can be closed here.
 *
 * @return {@code true} when there was nothing to close or the close
 *         succeeded; {@code false} when closing failed
 */
@Override
public boolean stop() {
    if (name == null) {
        // ConcurrentHashMap rejects null keys, so nothing can be registered under null.
        return true;
    }
    PooledConnection connection = ActiveMQ.pooledConnectionMap.remove(name);
    if (connection == null) {
        return true;
    }
    try {
        connection.close();
        return true;
    } catch (JMSException e) {
        log.error(e, "关闭 activemq [{}] 连接失败", name);
        return false;
    }
}
/**
 * Demo entry point: starts the plugin, instantiates the annotated demo sender
 * and receiver, then sends ten text messages one second apart.
 *
 * @param args unused
 * @throws JMSException declared for compatibility with the original signature
 */
public static void main(String[] args) throws JMSException {
    JFinalActiveMQPlugin p = new JFinalActiveMQPlugin("failover://(tcp://192.168.71.155:61616)?initialReconnectDelay=1000","l*******","l*******");
    if (!p.start()) {
        return;
    }
    // Annotation-driven instantiation: the constructors read @JmsListener.
    new TestSender();
    new TestReceiver();
    for (int i = 0; i < 10; i++) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop sending.
            Thread.currentThread().interrupt();
            break;
        }
        // TestSender has no senderText method; messages go through the static
        // JmsSender.sendText with the sender class as the lookup key.
        JmsSender.sendText(TestSender.class, "测试" + new Date());
    }
}
}
【demo】发送者
package plus.jfinal.plugin.acitvemq;
import javax.jms.JMSException;
@JmsListener(type=Destination.Queue,subject = "test")
public class TestSender extends JmsSender{
}
【demo】接收者
package plus.jfinal.plugin.acitvemq;
import javax.jms.*;
import java.util.Map;
@JmsListener(type=Destination.Queue,subject = "test")
public class TestReceiver extends JmsReceiver{
/** Prints the received text to standard output. */
@Override
public void onTextMessage(String text) {
System.out.println("收到消息:"+text);
}
/** Map messages are ignored in this demo. */
@Override
public void onMapMessage(Map map) {
}
/** Stream messages are ignored in this demo. */
@Override
public void onStreamMessage(StreamMessage streamMessage) {
}
/** Object messages are ignored in this demo. */
@Override
public void onObjectMessage(ObjectMessage objectMessage) {
}
/** Bytes messages are ignored in this demo. */
@Override
public void onBytesMessage(BytesMessage bytesMessage) {
}
/** Messages of any other type are ignored in this demo. */
@Override
public void onOtherMessage(Message message) {
}
}
【配置文件】
[default]
url=tcp://192.168.71.155:61616
initialReconnectDelay=1000
username=l***
password=l***
【运行方式】
public static void main(String[] args) throws JMSException {
JFinalActiveMQPlugin p = new JFinalActiveMQPlugin("failover://(tcp://192.168.71.155:61616)?initialReconnectDelay=1000","l****","l****");
p.start();
String subject = "test";
/**
* 以下实例化方式有两种
* 1)注解方式,直接无参构造
* 2)参数实例化方式
*/
// 注解方式
new TestSender();
new TestReceiver();
// 参数实例化方式
//new TestSender("default",Destination.Queue, subject);
//new TestReceiver("default",Destination.Queue, subject);
for (int i = 0; i < 10; i++) {
new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
JmsSender.sendText(TestSender.class,"测试" + new Date());
}
}.run();
}
}
## 关于发送者和接收者的实例化:建议在服务启动后通过注解扫描来实例化【推荐】,或者在程序启动完成后的代码逻辑中实例化;不要在插件 add 之后立刻实例化发送者或接收者,否则会出问题,因为此时插件的 start 方法尚未真正执行,连接还没有注册。
-----
2019/11/14 修改
发现之前版本一个bug,故修改了调用方式
JmsSender.sendText(TestSender.class,"测试" + new Date());