AppConfig.java
public static void main(String[] args) { //UndertowServer.start(MainConfig.class); UndertowServer.create(MainConfig.class) .configWeb( builder -> { // 配置 WebSocket,MyWebSocket 需使用 ServerEndpoint 注解 builder.addWebSocketEndpoint("com.xxx.xxx.common.websocket.NotifyWebSocket"); }) .start(); }
NotifyWebSocket.java
import java.io.IOException; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Map.Entry; import javax.websocket.EndpointConfig; import javax.websocket.OnClose; import javax.websocket.OnError; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.Session; import javax.websocket.server.ServerEndpoint; import com.jfinal.aop.Aop; import com.jfinal.log.Log; import com.jzflow.cws.common.kit.DictKit; import com.jzflow.cws.common.model.SysUser; import com.jzflow.cws.common.model.SysUserPcMsg; import com.jzflow.cws.modules.my.msg.MyMsgService; import com.jzflow.cws.modules.sys.login.LoginService; import cn.hutool.core.util.StrUtil; @ServerEndpoint(value = "/socket.ws",configurator=GetCookieConfigurator.class) public class NotifyWebSocket { private static final Log log = Log.getLog(DictKit.class); static LoginService loginService = Aop.get(LoginService.class); static MyMsgService myMsgService = Aop.get(MyMsgService.class); private static final Map<String, Session> userSocketSessionMap; static { userSocketSessionMap = new HashMap<String, Session>(); } /** * 给单个用户发送消息 */ public void sendMsg(String userId, String message) { try { Session session = userSocketSessionMap.get(userId); if (session != null && session.isOpen()) { message(message, session); } } catch (Exception e) { log.error("WebSocket:用户[" + userId+ "]发送消息异常", e); } } /** * 接收消息和回复消息 * @param message * @param session */ @OnMessage public void message(String message, Session session) { //这里可以接收到前台JS发送的数据(接收消息后回复) //if("在吗?".equals(message)) { // session.getAsyncRemote().sendText("在的"); //} //也可建立连接后主动发消息 session.getAsyncRemote().sendText(message); } @OnOpen public void onOpen(Session session,EndpointConfig config) { String sessionId = (String) config.getUserProperties().get(String.class.getName()); if(StrUtil.isNotBlank(sessionId)) { SysUser user = loginService.getLoginUserWithSessionId(sessionId); if(user != null) { 
userSocketSessionMap.put(user.getId(), session); log.info("WebSocket:用户[" + user.getLoginName()+ "]已经建立连接"); int total = myMsgService.count(user.getId(), SysUserPcMsg.MARK_READ_NO); String message = "{\"status\":\"FIRST\",\"total\":" + total + "}"; sendMsg(user.getId(), message); } }else{ log.error("WebSocket:用户未登录,无法建立连接"); } } @OnClose public void onClose(Session session) throws Exception { log.info("WebSocket: " + session.getId() + "已经关闭"); Iterator<Entry<String, Session>> it = userSocketSessionMap.entrySet().iterator(); // 移除Socket会话 while (it.hasNext()) { Entry<String, Session> entry = it.next(); if (entry.getValue().getId().equals(session.getId())) { userSocketSessionMap.remove(entry.getKey()); log.debug("WebSocket会话已经移除:用户ID " + entry.getKey()); break; } } } @OnError public void onError(Session session, Throwable error) throws IOException { if (session.isOpen()) { session.close(); } Iterator<Entry<String, Session>> it = userSocketSessionMap.entrySet().iterator(); // 移除Socket会话 while (it.hasNext()) { Entry<String, Session> entry = it.next(); if (entry.getValue().getId().equals(session.getId())) { userSocketSessionMap.remove(entry.getKey()); log.debug("WebSocket会话已经移除:用户ID " + entry.getKey()); break; } } } }
GetCookieConfigurator.java
import java.util.List; import java.util.Map; import javax.servlet.http.HttpSession; import javax.websocket.HandshakeResponse; import javax.websocket.server.HandshakeRequest; import javax.websocket.server.ServerEndpointConfig; import javax.websocket.server.ServerEndpointConfig.Configurator; import com.jzflow.cws.modules.sys.login.LoginService; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.map.MapUtil; public class GetCookieConfigurator extends Configurator{ @Override public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) { Map<String, List<String>> headers = request.getHeaders(); List<String> cookieList = headers.get("Cookie"); Map<String,String> kv = MapUtil.newHashMap(); if(CollUtil.isNotEmpty(cookieList)) { String cookieStr = cookieList.get(0); String[] cookieArray = cookieStr.split("; "); for(String cookie : cookieArray){ String[] cookieKv = cookie.split("="); kv.put(cookieKv[0], cookieKv[1]); } } String sessionId = kv.get(LoginService.sessionIdName); sec.getUserProperties().put(String.class.getName(),sessionId); } /** * 如果登录信息是存在session中的,不像jfinal-club通过cookie-session-id放在ehcache缓存中的可使用下面的方法 */ /*@Override public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) { HttpSession httpSession = (HttpSession)request.getHttpSession(); sec.getUserProperties().put(HttpSession.class.getName(),httpSession); }*/ }
使用场景:管理员发布通告后即刻提醒在线的用户
static NotifyWebSocket notifyWebSocket = Aop.get(NotifyWebSocket.class);

// Build the JSON payload describing the newly published message.
JSONObject payload = new JSONObject();
payload.put("id", userPcMsg.getId());
payload.put("title", userPcMsg.getTitle());
payload.put("content", userPcMsg.getContent());
payload.put("type", userPcMsg.getType());
payload.put("sendTime", DateUtil.formatDateTime(userPcMsg.getSendTime()));
payload.put("readFlag", false);

// Push the serialized payload to the target user.
String body = payload.toJSONString();
notifyWebSocket.sendMsg(userId, body);
前台 JS:登录并进入首页后建立 WebSocket 连接
// Establish the WebSocket connection to the notification endpoint.
// Use wss:// when the page itself is served over HTTPS; browsers block
// insecure ws:// connections from secure pages.
var wsScheme = window.location.protocol === 'https:' ? 'wss://' : 'ws://';
var wsUrl = wsScheme + window.location.host + $.ctx + '/socket.ws';
var socket = new WebSocket(wsUrl);
console.log(wsUrl);

socket.onopen = function () {
  // socket.send('data to send');
};

socket.onmessage = function (event) {
  // The server may send either a JSON document or a plain string;
  // fall back to the raw text when the payload is not valid JSON.
  var data;
  try {
    data = JSON.parse(event.data);
  } catch (e) {
    data = event.data;
  }
};

socket.onclose = function (event) {
  toastr.info('消息通知服务已关闭', event);
};
pom.xml
<!-- JFinal integration for the embedded Undertow server --> <dependency> <groupId>com.jfinal</groupId> <artifactId>jfinal-undertow</artifactId> <version>1.9</version> </dependency> <!-- WebSocket support for Undertow. NOTE(review): presumably this version should match the Undertow version pulled in by jfinal-undertow; confirm --> <dependency> <groupId>io.undertow</groupId> <artifactId>undertow-websockets-jsr</artifactId> <version>2.0.25.Final</version> </dependency>
代码写得比较粗糙,后面如果有什么建议或更优雅的写法,可以@我