用jfinal-undertow-websocket实现web站内消息通知

AppConfig.java

public static void main(String[] args) {
		//UndertowServer.start(MainConfig.class);
		UndertowServer.create(MainConfig.class)
	     .configWeb( builder -> {
	         // 配置 WebSocket,MyWebSocket 需使用 ServerEndpoint 注解
	         builder.addWebSocketEndpoint("com.xxx.xxx.common.websocket.NotifyWebSocket");
	      })
	     .start();
	}

NotifyWebSocket.java

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;

import javax.websocket.EndpointConfig;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;

import com.jfinal.aop.Aop;
import com.jfinal.log.Log;
import com.jzflow.cws.common.kit.DictKit;
import com.jzflow.cws.common.model.SysUser;
import com.jzflow.cws.common.model.SysUserPcMsg;
import com.jzflow.cws.modules.my.msg.MyMsgService;
import com.jzflow.cws.modules.sys.login.LoginService;

import cn.hutool.core.util.StrUtil;

@ServerEndpoint(value = "/socket.ws",configurator=GetCookieConfigurator.class)
public class NotifyWebSocket {

	private static final Log log = Log.getLog(DictKit.class);

	static LoginService loginService = Aop.get(LoginService.class);
	
	static MyMsgService myMsgService = Aop.get(MyMsgService.class);
	
	private static final Map<String, Session> userSocketSessionMap;

	static {
		userSocketSessionMap = new HashMap<String, Session>();
	}
	
	/**
	 * 给单个用户发送消息
	 */
	public void sendMsg(String userId, String message) {
		try {
			Session session = userSocketSessionMap.get(userId);
	        if (session != null && session.isOpen()) {
	        	message(message, session);
	        }
		} catch (Exception e) {
			log.error("WebSocket:用户[" + userId+ "]发送消息异常", e);
		}
	}
	
	/**
	 * 接收消息和回复消息
	 * @param message
	 * @param session
	 */
	@OnMessage
	public void message(String message, Session session) {
		//这里可以接收到前台JS发送的数据(接收消息后回复)
		//if("在吗?".equals(message)) {
		//	session.getAsyncRemote().sendText("在的");
		//}
		//也可建立连接后主动发消息
		session.getAsyncRemote().sendText(message);
	}

	@OnOpen
	public void onOpen(Session session,EndpointConfig config) {
		String sessionId = (String) config.getUserProperties().get(String.class.getName());
		if(StrUtil.isNotBlank(sessionId)) {
			SysUser user = loginService.getLoginUserWithSessionId(sessionId);
			if(user != null) {
				userSocketSessionMap.put(user.getId(), session);
				log.info("WebSocket:用户[" + user.getLoginName()+ "]已经建立连接");
				int total = myMsgService.count(user.getId(), SysUserPcMsg.MARK_READ_NO);
				String message = "{\"status\":\"FIRST\",\"total\":" + total + "}";
				sendMsg(user.getId(), message);
			}
		}else{
			log.error("WebSocket:用户未登录,无法建立连接");
		}
	}

	@OnClose
	public void onClose(Session session) throws Exception {
		log.info("WebSocket: " + session.getId() + "已经关闭");
	    Iterator<Entry<String, Session>> it = userSocketSessionMap.entrySet().iterator();
	    // 移除Socket会话
	    while (it.hasNext()) {
	        Entry<String, Session> entry = it.next();
	        if (entry.getValue().getId().equals(session.getId())) {
	            userSocketSessionMap.remove(entry.getKey());
	            log.debug("WebSocket会话已经移除:用户ID " + entry.getKey());
	            break;
	        }
	    }
	}

	@OnError
	public void onError(Session session, Throwable error) throws IOException {
		if (session.isOpen()) {
	        session.close();
	    }
	    Iterator<Entry<String, Session>> it = userSocketSessionMap.entrySet().iterator();
	    // 移除Socket会话
	    while (it.hasNext()) {
	        Entry<String, Session> entry = it.next();
	        if (entry.getValue().getId().equals(session.getId())) {
	            userSocketSessionMap.remove(entry.getKey());
	            log.debug("WebSocket会话已经移除:用户ID " + entry.getKey());
	            break;
	        }
	    }
	}
	
}

GetCookieConfigurator.java

import java.util.List;
import java.util.Map;

import javax.servlet.http.HttpSession;
import javax.websocket.HandshakeResponse;
import javax.websocket.server.HandshakeRequest;
import javax.websocket.server.ServerEndpointConfig;
import javax.websocket.server.ServerEndpointConfig.Configurator;

import com.jzflow.cws.modules.sys.login.LoginService;

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.map.MapUtil;

public class GetCookieConfigurator extends Configurator{	

	@Override
	public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
		Map<String, List<String>> headers = request.getHeaders();
		List<String> cookieList = headers.get("Cookie");
		Map<String,String> kv = MapUtil.newHashMap();
		if(CollUtil.isNotEmpty(cookieList)) {
			String cookieStr = cookieList.get(0);
			String[] cookieArray = cookieStr.split("; ");
			for(String cookie : cookieArray){
				String[] cookieKv = cookie.split("=");
				kv.put(cookieKv[0], cookieKv[1]);
			}
		}
		String sessionId = kv.get(LoginService.sessionIdName);
		sec.getUserProperties().put(String.class.getName(),sessionId);
	}
	/**
	 * 如果登录信息是存在session中的,不像jfinal-club通过cookie-session-id放在ehcache缓存中的可使用下面的方法
	 */
	/*@Override
	public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
		HttpSession httpSession = (HttpSession)request.getHttpSession();
		sec.getUserProperties().put(HttpSession.class.getName(),httpSession);
	}*/

}

使用场景:管理员发布通告后即刻提醒在线的用户

static NotifyWebSocket notifyWebSocket = Aop.get(NotifyWebSocket.class);

JSONObject json = new JSONObject();
json.put("id", userPcMsg.getId());
json.put("title", userPcMsg.getTitle());
json.put("content", userPcMsg.getContent());
json.put("type", userPcMsg.getType());
json.put("sendTime", DateUtil.formatDateTime(userPcMsg.getSendTime()));
json.put("readFlag", false);
// 发送消息
notifyWebSocket.sendMsg(userId, json.toJSONString());

前台js,登录后进入首页后建立websocket连接

            	 // 建立websocket连接
                var socket = new WebSocket('ws://' + window.location.host + $.ctx + '/socket.ws');
                console.log('ws://' + window.location.host + $.ctx + '/socket.ws');
                socket.onopen = function () {
                    //socket.send('发送数据');
                };

                socket.onmessage = function (event) {
                    var data = JSON.parse(event.data);
                    //得到后台发送的数据string或json格式
                };

                socket.onclose = function (event) {
                    toastr.info('消息通知服务已关闭', event);
                };

pom.xml

<!-- undertow -->
		<dependency>
			<groupId>com.jfinal</groupId>
			<artifactId>jfinal-undertow</artifactId>
			<version>1.9</version>
		</dependency>
		
		<!-- WebSocket 支持 -->
		<dependency>
			<groupId>io.undertow</groupId>
			<artifactId>undertow-websockets-jsr</artifactId>
			<version>2.0.25.Final</version>
		</dependency>


代码写的比较粗糙后面如有有什么建议或优雅的写法可以@我

评论区

xialinlin

2018-12-26 17:02

JFinal

2018-12-26 17:21

第一个 jfinal undertow 下的 websocket 分享,好多人需要呢,非常感谢你的分享

点赞 + 收藏,下回有人问起这个问题,我可以直接扔链接过去了

杜福忠

2018-12-27 09:43

666 撞头了2333

码农3号

2019-01-01 20:31

请问有完整代码不?按着思路去码,但“管理员发布通告后即刻提醒在线的用户”这核心功能始终没法实现。水平有限,没看明白“static NotifyWebSocket notifyWebSocket = Aop.get(NotifyWebSocket.class);”这段代码的意思,请问有大神解释一下吗?

sky浪翻云

2019-01-02 12:42

@码农3号 static NotifyWebSocket notifyWebSocket = Aop.get(NotifyWebSocket.class);这行代码是类里面的,不是方法里面的

码农3号

2019-01-02 13:07

@sky浪翻云 嗯嗯,我知道。我想请教一下,这样拿出来的对象(notifyWebSocket)是什么对象,有什么用的?

sky浪翻云

2019-01-03 10:44

@码农3号 需要发送消息的时候通过aop拿到notifyWebSocket,然后调用sendMsg方法,传出userId及消息数据即可

码农3号

2019-01-03 13:15

@sky浪翻云 但我不知道为什么我拿出来的notifyWebsocket对象里面的userSocketSessionMap 是空的,所以根本发不出消息!

西门吹牛

2019-04-19 20:22

@JFinal mac 上用undertow启动报Permission denied怎么处理

JFinal

2019-04-20 00:33

@西门吹牛 mac 上默认不让用 80 端口,改成超过 1024 的端口号即可

或者百度搜索: mac 开启 80 端口

西门吹牛

2019-04-20 10:32

@JFinal 我用的是8888端口

JFinal

2019-04-20 10:50

@西门吹牛 Permission denied 与 jfinal 以及 undertow 肯定是无关的,找找别的原因

jfinal009

2019-09-10 21:48

@sky浪翻云 com.jzflow.cws.modules 请教这个些类,pom怎么定义?

renyc

2019-11-01 15:54

@JFinal
public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
Map> headers = request.getHeaders();
List cookieList = headers.get("Cookie");
Map kv = MapUtil.newHashMap();
if(CollUtil.isNotEmpty(cookieList)) {
String cookieStr = cookieList.get(0);
String[] cookieArray = cookieStr.split("; ");
for(String cookie : cookieArray){
String[] cookieKv = cookie.split("=");
kv.put(cookieKv[0], cookieKv[1]);
}
}
String sessionId = kv.get(LoginService.sessionIdName);
sec.getUserProperties().put(String.class.getName(),sessionId);
}
List cookieList = headers.get("Cookie");
这段代码为什么拿不到Cookie信息?
运行环境为:
jfinal-undertow1.9
undertow-websockets-jsr2.0.25.Final
jfinal4.6

要输就输给追求

2020-05-11 15:53

项目停止的时候,会抛一个异常出来,怎么解决?