官网对Websocket 多有介绍,这里就不在阐述他们有什么区别了。站内信一般都推荐使用SSE 技术的。
昨天又有社友问到我这边了,说 JF 里面的SseEmitter 没弄明白,之前分享的例子也没弄懂。
咋和登录用户关联起来,又咋给指定用户发消息。
(附知乎图)
练习 demo的话,聊天室绝对是个好例子,互通私发,群发都用到了。
还是使用JFinal 官网的 demo 做例子,项目源码文件在app栏下载。
1、对UndertowServer 开启处理异步请求功能: setAsyncSupported(true)
/** * 启动入口,运行此 main 方法可以启动项目,此 main 方法可以放置在任意的 Class 类定义中,不一定要放于此 */ public static void main(String[] args) { UndertowServer.create(DemoConfig.class).onDeploy((cl, di) -> di.getFilters().get("jfinal").setAsyncSupported(true)).start(); }
PS:项目要是 Tomcat 运行的话需要配置web.xml 里<async-supported>true</async-supported>开启处理异步请求
<filter> <filter-name>jfinal</filter-name> <filter-class>com.jfinal.core.JFinalFilter</filter-class> <async-supported>true</async-supported> </filter>
2、增加一个用户与AsyncContext的对应关系管理的 MAP 工具 SseEmitterKit :
package com.demo.common.kit; import com.jfinal.core.Const; import com.jfinal.core.Controller; import com.jfinal.kit.JsonKit; import com.jfinal.kit.LogKit; import javax.servlet.AsyncContext; import javax.servlet.AsyncEvent; import javax.servlet.AsyncListener; import javax.servlet.http.HttpServletResponse; import java.io.IOException; import java.io.PrintWriter; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; /** * SSE消息发送工具类 * @author 杜福忠 */ @SuppressWarnings({"unused", "UnusedReturnValue"}) public class SseEmitterKit { private static final Map<String, AsyncContext> sseMap = new ConcurrentHashMap<>(); public static AsyncContext get(String user) { return sseMap.get(user); } public static Set<String> getUsers() { return sseMap.keySet(); } /** * 开启sse连接 * @param user 用户名 * @param c Controller * @return AsyncContext */ public static AsyncContext startAsync(String user, Controller c) { c.renderNull(); HttpServletResponse response = c.getResponse(); response.setContentType("text/event-stream"); response.setCharacterEncoding(Const.DEFAULT_ENCODING); response.setHeader("Content-Type", "text/event-stream; charset:utf-8"); response.setHeader("Cache-Control", "no-cache"); response.setHeader("Connection", "keep-alive"); AsyncContext ac = c.getRequest().startAsync(); // 默认1小时超时 ac.setTimeout(60 * 60 * 1000); sseMap.put(user, ac); ac.addListener(new AsyncListener() { @Override public void onComplete(AsyncEvent event) { SseEmitterKit.remove(user); } @Override public void onTimeout(AsyncEvent event) {} @Override public void onError(AsyncEvent event) { SseEmitterKit.remove(user); } @Override public void onStartAsync(AsyncEvent event) {} }); return ac; } public static void remove(String user) { if (user != null) { AsyncContext ac = sseMap.remove(user); if (ac != null) { ac.complete(); } } } public static boolean sendJsonMessage(String user, Object data) { String dataStr = String.format("data: %s\n\n", toJson(data)); return sendMessage(user, dataStr); } public static boolean sendJsonMessage(String user, String event, Object data) { String dataStr = String.format("event: %s\ndata: %s\n\n", event, toJson(data)); return sendMessage(user, dataStr); } public static boolean sendJsonMessage(String user, Integer id, Object data) { String dataStr = String.format("id: %d\ndata: %s\n\n", id, toJson(data)); return sendMessage(user, dataStr); } /** * 发送消息 * @param user 接收者 * @param id 消息 ID * @param event 事件 * @param data json消息内容 * @return 发送成功返回true,失败返回false */ public static boolean sendJsonMessage(String user, Integer id, String event, Object data) { String dataStr = String.format("id: %d\nevent: %s\ndata: %s\n\n", id, event, toJson(data)); return sendMessage(user, dataStr); } private static String toJson(Object data) { if (data == null){ return ""; } return data instanceof String ? (String) data : JsonKit.toJson(data); } /** * 发送消息 * @param user 用户 * @param dataStr 消息内容(需做格式化) * @return 发送成功返回true,失败返回false */ public static boolean sendMessage(String user, String dataStr) { AsyncContext ac = get(user); if (ac != null) { try { PrintWriter writer = ac.getResponse().getWriter(); writer.write(dataStr); writer.flush(); return true; } catch (IOException e) { LogKit.error(e.getMessage(), e); } } return false; } }
可以看见工具很简单:
开启sse连接 》SseEmitterKit.startAsync
发送 json 消息 》SseEmitterKit.sendJsonMessage
关闭sse连接 》SseEmitterKit.remove
三板斧方便使用~
PS:JF里面自带的SseEmitter适用一轮任务多次向客户端输出的场景,比如现流行的AI流输出。
像登录用户站内协作信息通知、多人协作功能等,开启异步请求会更好,不阻塞处理。
接下来上例子聊天室代码:
1、为了演示方便直接在IndexController中加了三板斧的使用:
@Path(value = "/", viewPath = "/index") public class IndexController extends Controller { public void index() { render("index.html"); } /** * /chat?name=a */ public void chat() { // 这里用 name 做键名演示,实际业务可能是用户ID,或登录id之类的 String name = get("name", "游客" + System.currentTimeMillis()); setSessionAttr("sseName", name); setAttr("name", name); render("chat_sse.html"); } /** * 创建 SseEmitter */ public void sse() { String name = getSessionAttr("sseName"); // 开启sse连接 SseEmitterKit.startAsync(name, this); // 通知其他客户端有新用户加入 Set<String> users = SseEmitterKit.getUsers(); String data = Kv.by("msg", "哈喽~大家好!元旦快乐~").set("name", name).set("users", users).toJson(); for (String user : users) { SseEmitterKit.sendJsonMessage(user, "addUser", data); } } public void sseClose() { // 主动关闭连接 String name = getSessionAttr("sseName"); SseEmitterKit.remove(name); renderJson(Ret.ok("OK")); } /** * 前端主动发送消息 */ public void sseSend() { String name = get("name", "ALL"); String msg = get("msg"); String sse_name = getSessionAttr("sseName"); // 发送客户端消息,这个是实时通知,同时业务上应该做好数据库持久化处理 // 增加 event 参数,是为了做一个 消息业务前缀,区分不同的业务通知。msg 可以是任何类型 // SseEmitterKit 任何位置都可以调用,比如系统产生的消息通知 String data = Kv.by("msg", msg).set("name", sse_name).set("sendALL", "ALL".equals(name)).toJson(); if ("ALL".equals(name)){ for (String user : SseEmitterKit.getUsers()) { // 排除自己 if (! sse_name.equals(user)){ SseEmitterKit.sendJsonMessage(user, data); } } renderJson(Ret.ok("OK")); return; } boolean sent = SseEmitterKit.sendJsonMessage(name, data); renderJson(sent ? Ret.ok("OK") : Ret.fail("不在线")); } }
上前端页面:
#@layout() #define main() <h1>JFinal 开发者聊天室</h1> <div class="table_box"> <p>欢迎来到 JFinal极速开发世界!</p> <br> <h1>我是:#(name)</h1> <hr> <div id="div_msg" style="white-space: pre-wrap;"></div> <hr> <br><br><br> <div> <h2>发送实时消息:</h2> <label for="select_name">接收用户:</label> <select id="select_name"><option>ALL</option></select> <label for="input_msg">消息:</label><input id="input_msg" type="text" placeholder="请输入消息" /> <button type="button" id="send_button">发送消息</button> <button type="button" id="close_button">下线</button> </div> <br><br><br> <br><br><br> </div> <script> window.onload = function () { const obj = $("#div_msg"); var textAll = ""; const addText = function (head, text, sendALL) { textAll += (sendALL?'世界':'私聊') + '[' + head + ']:' + text + '\n';; obj.text(textAll); }; // 接收实时消息 const eventSource = new EventSource('/sse'); eventSource.addEventListener('message', function (event) { console.log('收到消息:', event); var kv = JSON.parse(event.data); addText(kv.name, kv.msg, kv.sendALL); }); eventSource.addEventListener('addUser', function (event) { console.log('收到addUser消息:', event); var kv = JSON.parse(event.data); addText(kv.name, kv.msg, true); var options = '<option>ALL</option>'; for(let i = 0; i < kv.users.length; i++){ options += '<option>' + kv.users[i] + '</option>'; } $("#select_name").html(options); }); eventSource.addEventListener('error', function (event) { console.log('关闭:', event); // 这里不关闭时,浏览器可能会重新连接SSE eventSource.close(); }); // 下线 const sseClose = function () { $.get("/sseClose"); eventSource.close(); } window.onbeforeunload = sseClose; $("#close_button").click(sseClose); // 发送实时消息 $("#send_button").click(function(){ var name = $("#select_name").val(); var msg = $("#input_msg").val(); $.post("/sseSend", {name: name, msg: msg}, function(ret){ if(ret.state == "ok") { addText('我', msg); }else{ addText('系统', ret.msg); } }); return false; }); }; </script> #end
#@layout() #define main() <h1>EventSource 消息示例</h1> <div class="table_box"> <p>欢迎来到 JFinal极速开发世界!</p> <br><br> 本Demo采用 JFinal Template 作为视图文件。 <br/><br><br> <label for="name">用户名:</label><input type="text" id="name" value="#(name)"> <button type="button" id="start">进入聊天室</button> <br/><br/><br/><br/><br/><br/><br/><br/><br/><br/><br/><br/> <br/><br/><br/><br/><br/><br/><br/><br/><br/><br/><br/><br/> </div> <script> $("#start").click(function(){ window.location.href = "/chat?name=" + $("#name").val(); }); </script> #end
启动一下DemoConfig看下效果:
使用多个浏览器,或者使用无痕模式多开几个窗口:
上面例子代码中使用了 自定义 event 消息头:addUser ,JS 中可捕获做相应的业务处理。
以及浏览器关闭时,自动关闭连接处理。
PS:nginx
server { listen 80; listen 443 ssl; server_name 域名; include /xxxx/nginx/_https_demo.txt; location /sse { proxy_http_version 1.1; proxy_set_header Connection ""; proxy_set_header X-Real-IP $remote_addr; proxy_set_header Host $host; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; # SSE 连接时的超时时间 1天 proxy_read_timeout 86400s; # 取消缓冲 proxy_buffering off; # 关闭代理缓存 proxy_cache off; # 禁用分块传输编码 #chunked_transfer_encoding off # 反向代理到 SSE 应用的地址和端口 proxy_pass http://127.0.0.1:20035; } location / { proxy_set_header Host $host; proxy_set_header X-Forwarded-For $remote_addr; proxy_redirect off; proxy_pass http://127.0.0.1:20035; } ### 升级页面 error_page 500 502 503 504 /50x.html; location = /50x.html { default_type application/json; return 200 '{"code":502,"msg":"try again later"}'; } access_log off; }
更多业务场景可加入JFinal_SAAS微信群沟通(微信:dufuzhong)
看到下面这个 onDeploy 用法:
UndertowServer.create(DemoConfig.class).onDeploy((cl, di) ->
di.getFilters().get("jfinal").setAsyncSupported(true)).start();
我自己在做了这个 onDeploy 功能之后就没用过,甚至完全忘记了,本以为没有什么用,没想到用上了,极好极好