AppConfig.java
/**
 * Application entry point: creates the Undertow server from MainConfig, registers the
 * WebSocket endpoint class by name in the web builder callback, and starts the server.
 */
public static void main(String[] args) {
// Plain start without extra web configuration, kept for reference:
//UndertowServer.start(MainConfig.class);
UndertowServer.create(MainConfig.class)
.configWeb( builder -> {
// Register the WebSocket endpoint by fully-qualified class name (the package shown is a
// placeholder). The registered class, NotifyWebSocket, must carry the @ServerEndpoint annotation.
builder.addWebSocketEndpoint("com.xxx.xxx.common.websocket.NotifyWebSocket");
})
.start();
}NotifyWebSocket.java
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import javax.websocket.EndpointConfig;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import com.jfinal.aop.Aop;
import com.jfinal.log.Log;
import com.jzflow.cws.common.kit.DictKit;
import com.jzflow.cws.common.model.SysUser;
import com.jzflow.cws.common.model.SysUserPcMsg;
import com.jzflow.cws.modules.my.msg.MyMsgService;
import com.jzflow.cws.modules.sys.login.LoginService;
import cn.hutool.core.util.StrUtil;
@ServerEndpoint(value = "/socket.ws",configurator=GetCookieConfigurator.class)
public class NotifyWebSocket {
private static final Log log = Log.getLog(DictKit.class);
static LoginService loginService = Aop.get(LoginService.class);
static MyMsgService myMsgService = Aop.get(MyMsgService.class);
private static final Map<String, Session> userSocketSessionMap;
static {
userSocketSessionMap = new HashMap<String, Session>();
}
/**
 * Sends a text message to a single user.
 *
 * <p>Looks up the WebSocket session registered for {@code userId}; when there is none, or it
 * is no longer open, the call does nothing. Any exception raised while sending is logged
 * and not rethrown.
 *
 * @param userId  id of the user to notify
 * @param message text to send
 */
public void sendMsg(String userId, String message) {
    try {
        Session target = userSocketSessionMap.get(userId);
        boolean deliverable = target != null && target.isOpen();
        if (!deliverable) {
            return;
        }
        // Delivery goes through the same method that handles incoming frames.
        message(message, target);
    } catch (Exception e) {
        log.error("WebSocket:用户[" + userId+ "]发送消息异常", e);
    }
}
/**
 * Receives a text frame and sends the same text back on the given session.
 *
 * <p>sendMsg also calls this method directly to push server-originated text to a client.
 *
 * @param message text received from the client, or text to push
 * @param session session the text is written to
 */
@OnMessage
public void message(String message, Session session) {
    // Data sent by the front-end JavaScript arrives here. A handler could inspect it and
    // answer selectively (for example reply "在的" only when the client sent "在吗?");
    // as written, the text is simply written back asynchronously.
    javax.websocket.RemoteEndpoint.Async remote = session.getAsyncRemote();
    remote.sendText(message);
}
/**
 * Registers a newly opened connection for the logged-in user and pushes the unread count.
 *
 * <p>The login session id is read from the endpoint config's user properties under the key
 * {@code String.class.getName()}. When the id is missing, or no logged-in user can be
 * resolved from it, the reason is logged and the socket is closed instead of being left
 * open without an owner.
 *
 * @param session the WebSocket session that was just opened
 * @param config  endpoint configuration carrying the session id captured during the handshake
 */
@OnOpen
public void onOpen(Session session,EndpointConfig config) {
    String sessionId = (String) config.getUserProperties().get(String.class.getName());
    if (StrUtil.isBlank(sessionId)) {
        log.error("WebSocket:用户未登录,无法建立连接");
        closeUnauthenticated(session);
        return;
    }

    SysUser user = loginService.getLoginUserWithSessionId(sessionId);
    if (user == null) {
        // Previously this case was silently ignored and the socket stayed open.
        log.error("WebSocket:登录会话无效或已过期,无法建立连接");
        closeUnauthenticated(session);
        return;
    }

    userSocketSessionMap.put(user.getId(), session);
    log.info("WebSocket:用户[" + user.getLoginName()+ "]已经建立连接");

    // First message after connecting: tell the client how many unread messages it has.
    int total = myMsgService.count(user.getId(), SysUserPcMsg.MARK_READ_NO);
    String message = "{\"status\":\"FIRST\",\"total\":" + total + "}";
    sendMsg(user.getId(), message);
}

/** Closes a session that could not be tied to a logged-in user; a failure to close is only logged. */
private static void closeUnauthenticated(Session session) {
    try {
        if (session.isOpen()) {
            session.close();
        }
    } catch (IOException e) {
        log.error("WebSocket: " + session.getId() + "关闭失败", e);
    }
}
/**
 * Logs that the connection was closed and removes its session from the user/session map.
 *
 * @param session the session that was closed
 */
@OnClose
public void onClose(Session session) throws Exception {
    log.info("WebSocket: " + session.getId() + "已经关闭");
    removeSession(session);
}

/**
 * Logs the failure, closes the session if it is still open, and removes it from the
 * user/session map.
 *
 * @param session the session on which the error occurred
 * @param error   the cause of the failure
 * @throws IOException if closing the session fails; the map entry is removed regardless
 */
@OnError
public void onError(Session session, Throwable error) throws IOException {
    // The cause was previously dropped, which made connection failures impossible to diagnose.
    log.error("WebSocket: " + session.getId() + "发生异常", error);
    try {
        if (session.isOpen()) {
            session.close();
        }
    } finally {
        // Always unregister, even when close() throws.
        removeSession(session);
    }
}

/** Removes the map entry whose session has the same id as {@code session}, if there is one. */
private static void removeSession(Session session) {
    String closedId = session.getId();
    Iterator<Entry<String, Session>> it = userSocketSessionMap.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, Session> entry = it.next();
        if (entry.getValue().getId().equals(closedId)) {
            String userId = entry.getKey();
            // Remove through the iterator rather than mutating the map while iterating it.
            it.remove();
            log.debug("WebSocket会话已经移除:用户ID " + userId);
            break;
        }
    }
}
}GetCookieConfigurator.java
import java.util.List;
import java.util.Map;
import javax.servlet.http.HttpSession;
import javax.websocket.HandshakeResponse;
import javax.websocket.server.HandshakeRequest;
import javax.websocket.server.ServerEndpointConfig;
import javax.websocket.server.ServerEndpointConfig.Configurator;
import com.jzflow.cws.modules.sys.login.LoginService;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.map.MapUtil;
public class GetCookieConfigurator extends Configurator{
/**
 * Extracts the login session id from the handshake request's cookies and stores it in the
 * endpoint config's user properties under the key {@code String.class.getName()}.
 *
 * <p>Every {@code Cookie} header value is parsed. Pairs are separated by {@code ';'} and
 * split on the first {@code '='} only, so values that themselves contain {@code '='} are
 * kept intact, and fragments without a name or without {@code '='} are skipped instead of
 * raising {@link ArrayIndexOutOfBoundsException}.
 *
 * @param sec      endpoint configuration whose user properties receive the session id
 * @param request  handshake request supplying the HTTP headers
 * @param response handshake response (not used)
 */
@Override
public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
    Map<String, List<String>> headers = request.getHeaders();
    List<String> cookieList = headers.get("Cookie");
    Map<String,String> kv = MapUtil.newHashMap();
    if (CollUtil.isNotEmpty(cookieList)) {
        for (String cookieHeader : cookieList) {
            if (cookieHeader == null) {
                continue;
            }
            for (String cookie : cookieHeader.split(";")) {
                int eq = cookie.indexOf('=');
                if (eq <= 0) {
                    // No '=' at all, or an empty name: not a usable name/value pair.
                    continue;
                }
                kv.put(cookie.substring(0, eq).trim(), cookie.substring(eq + 1).trim());
            }
        }
    }
    String sessionId = kv.get(LoginService.sessionIdName);
    // Stored even when null so the property always reflects the current request.
    // NOTE(review): if the container shares one ServerEndpointConfig across handshakes,
    // concurrent handshakes could overwrite each other's value here; confirm for Undertow.
    sec.getUserProperties().put(String.class.getName(),sessionId);
}
/**
* 如果登录信息是存在session中的,不像jfinal-club通过cookie-session-id放在ehcache缓存中的可使用下面的方法
*/
/*@Override
public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
HttpSession httpSession = (HttpSession)request.getHttpSession();
sec.getUserProperties().put(HttpSession.class.getName(),httpSession);
}*/
}使用场景:管理员发布通告后即刻提醒在线的用户
// Endpoint instance obtained from the JFinal Aop container; it is used below only to call sendMsg.
static NotifyWebSocket notifyWebSocket = Aop.get(NotifyWebSocket.class);
// Build the JSON payload from the stored message record.
JSONObject json = new JSONObject();
json.put("id", userPcMsg.getId());
json.put("title", userPcMsg.getTitle());
json.put("content", userPcMsg.getContent());
json.put("type", userPcMsg.getType());
// The send time is formatted to a string before it is put into the payload.
json.put("sendTime", DateUtil.formatDateTime(userPcMsg.getSendTime()));
json.put("readFlag", false);
// Send the message to the user's WebSocket connection
notifyWebSocket.sendMsg(userId, json.toJSONString());前台js,登录后进入首页后建立websocket连接
// Open the WebSocket connection to the notification endpoint.
// The scheme follows the page: a page served over HTTPS must use wss://, because browsers
// refuse insecure ws:// connections from secure pages.
var wsScheme = window.location.protocol === 'https:' ? 'wss://' : 'ws://';
var wsUrl = wsScheme + window.location.host + $.ctx + '/socket.ws';
console.log(wsUrl);
var socket = new WebSocket(wsUrl);
// Runs when the connection opens. Nothing is sent here; the commented line is an example send.
socket.onopen = function () {
// socket.send('data to send');
};
// Handles data pushed by the server, which may be a JSON document or a plain string.
socket.onmessage = function (event) {
    var data;
    try {
        data = JSON.parse(event.data);
    } catch (e) {
        // Not valid JSON: keep the raw text instead of letting the SyntaxError escape the handler.
        data = event.data;
    }
    // `data` now holds the parsed object, or the raw string when the payload was not JSON.
};
// Shows a toastr notice when the connection closes.
// NOTE(review): the close event is passed as the second argument of toastr.info; toastr's
// documented signature is info(message, title), so confirm this is intended.
socket.onclose = function (event) {
toastr.info('消息通知服务已关闭', event);
};pom.xml
<!-- undertow -->
<dependency>
    <groupId>com.jfinal</groupId>
    <artifactId>jfinal-undertow</artifactId>
    <version>1.9</version>
</dependency>
<!-- WebSocket support -->
<dependency>
    <groupId>io.undertow</groupId>
    <artifactId>undertow-websockets-jsr</artifactId>
    <version>2.0.25.Final</version>
</dependency>
代码写得比较粗糙,后面如果有什么建议或更优雅的写法,可以 @我。