官网对Websocket 多有介绍,这里就不在阐述他们有什么区别了。站内信一般都推荐使用SSE 技术的。
昨天又有社友问到我这边了,说 JF 里面的SseEmitter 没弄明白,之前分享的例子也没弄懂。
咋和登录用户关联起来,又咋给指定用户发消息。
(附知乎图)
练习 demo的话,聊天室绝对是个好例子,互通私发,群发都用到了。
还是使用JFinal 官网的 demo 做例子,项目源码文件在app栏下载。
1、对UndertowServer 开启处理异步请求功能: setAsyncSupported(true)
/**
* 启动入口,运行此 main 方法可以启动项目,此 main 方法可以放置在任意的 Class 类定义中,不一定要放于此
*/
public static void main(String[] args) {
UndertowServer.create(DemoConfig.class).onDeploy((cl, di) ->
di.getFilters().get("jfinal").setAsyncSupported(true)).start();
}PS:项目要是 Tomcat 运行的话需要配置web.xml 里<async-supported>true</async-supported>开启处理异步请求。
(我在Tomcat8.5.73下正常运行 测试通过)
<filter> <filter-name>jfinal</filter-name> <filter-class>com.jfinal.core.JFinalFilter</filter-class> <async-supported>true</async-supported> </filter>
2、增加一个用户与AsyncContext的对应关系管理的 MAP 工具 SseEmitterKit :
package com.demo.common.kit;
import com.jfinal.core.Const;
import com.jfinal.core.Controller;
import com.jfinal.kit.JsonKit;
import com.jfinal.kit.LogKit;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* SSE消息发送工具类 v1.0.1
* @author 杜福忠
*/
@SuppressWarnings({"unused", "UnusedReturnValue"})
public class SseEmitterKit {
private static final Map<String, AsyncContext> sseMap = new ConcurrentHashMap<>();
public static AsyncContext get(String user) {
return sseMap.get(user);
}
public static Set<String> getUsers() {
return sseMap.keySet();
}
/**
* 开启sse连接
* @param user 用户名
* @param c Controller
* @return AsyncContext
*/
public static AsyncContext startAsync(String user, Controller c) {
Objects.requireNonNull(user, "user can not be null");
c.renderNull();
HttpServletResponse response = c.getResponse();
response.setContentType("text/event-stream");
response.setCharacterEncoding(Const.DEFAULT_ENCODING);
response.setHeader("Content-Type", "text/event-stream; charset:utf-8");
response.setHeader("Cache-Control", "no-cache");
response.setHeader("Connection", "keep-alive");
AsyncContext ac = c.getRequest().startAsync();
// 默认1小时超时
ac.setTimeout(60 * 60 * 1000);
remove(user);
sseMap.put(user, ac);
ac.addListener(new AsyncListener() {
@Override
public void onComplete(AsyncEvent event) {
SseEmitterKit.remove(user);
}
@Override
public void onTimeout(AsyncEvent event) {}
@Override
public void onError(AsyncEvent event) {
SseEmitterKit.remove(user);
}
@Override
public void onStartAsync(AsyncEvent event) {}
});
return ac;
}
public static void remove(String user) {
if (user != null) {
AsyncContext ac = sseMap.remove(user);
if (ac != null) {
ac.complete();
}
}
}
public static boolean sendJsonMessage(String user, Object data) {
String dataStr = String.format("data: %s\n\n", toJson(data));
return sendMessage(user, dataStr);
}
public static boolean sendJsonMessage(String user, String event, Object data) {
String dataStr = String.format("event: %s\ndata: %s\n\n", event, toJson(data));
return sendMessage(user, dataStr);
}
public static boolean sendJsonMessage(String user, Integer id, Object data) {
String dataStr = String.format("id: %d\ndata: %s\n\n", id, toJson(data));
return sendMessage(user, dataStr);
}
/**
* 发送消息
* @param user 接收者
* @param id 消息 ID
* @param event 事件
* @param data json消息内容
* @return 发送成功返回true,失败返回false
*/
public static boolean sendJsonMessage(String user, Integer id, String event, Object data) {
String dataStr = String.format("id: %d\nevent: %s\ndata: %s\n\n", id, event, toJson(data));
return sendMessage(user, dataStr);
}
private static String toJson(Object data) {
if (data == null){
return "";
}
return data instanceof String ? (String) data : JsonKit.toJson(data);
}
/**
* 发送消息
* @param user 用户
* @param dataStr 消息内容(需做格式化)
* @return 发送成功返回true,失败返回false
*/
public static boolean sendMessage(String user, String dataStr) {
AsyncContext ac = get(user);
if (ac != null) {
try {
PrintWriter writer = ac.getResponse().getWriter();
writer.write(dataStr);
writer.flush();
return true;
} catch (IOException e) {
LogKit.error(e.getMessage(), e);
}
}
return false;
}
}可以看见工具很简单:
开启sse连接 》SseEmitterKit.startAsync
发送 json 消息 》SseEmitterKit.sendJsonMessage
关闭sse连接 》SseEmitterKit.remove
三板斧方便使用~
PS:JF里面自带的SseEmitter适用一轮任务多次向客户端输出的场景,比如现流行的AI流输出。
像登录用户站内协作信息通知、多人协作功能等,开启异步请求会更好,不阻塞处理。
接下来上例子聊天室代码:
1、为了演示方便直接在IndexController中加了三板斧的使用:
@Path(value = "/", viewPath = "/index")
public class IndexController extends Controller {
public void index() {
render("index.html");
}
/**
* /chat?name=a
*/
public void chat() {
// 这里用 name 做键名演示,实际业务可能是用户ID,或登录id之类的
String name = get("name", "游客" + System.currentTimeMillis());
setSessionAttr("sseName", name);
setAttr("name", name);
render("chat_sse.html");
}
/**
* 创建 SseEmitter
*/
public void sse() {
String name = getSessionAttr("sseName");
// 开启sse连接
SseEmitterKit.startAsync(name, this);
// 通知其他客户端有新用户加入
Set<String> users = SseEmitterKit.getUsers();
String data = Kv.by("msg", "哈喽~大家好!元旦快乐~").set("name", name).set("users", users).toJson();
for (String user : users) {
SseEmitterKit.sendJsonMessage(user, "addUser", data);
}
}
public void sseClose() {
// 主动关闭连接
String name = getSessionAttr("sseName");
SseEmitterKit.remove(name);
renderJson(Ret.ok("OK"));
}
/**
* 前端主动发送消息
*/
public void sseSend() {
String name = get("name", "ALL");
String msg = get("msg");
String sse_name = getSessionAttr("sseName");
// 发送客户端消息,这个是实时通知,同时业务上应该做好数据库持久化处理
// 增加 event 参数,是为了做一个 消息业务前缀,区分不同的业务通知。msg 可以是任何类型
// SseEmitterKit 任何位置都可以调用,比如系统产生的消息通知
String data = Kv.by("msg", msg).set("name", sse_name).set("sendALL", "ALL".equals(name)).toJson();
if ("ALL".equals(name)){
for (String user : SseEmitterKit.getUsers()) {
// 排除自己
if (! sse_name.equals(user)){
SseEmitterKit.sendJsonMessage(user, data);
}
}
renderJson(Ret.ok("OK"));
return;
}
boolean sent = SseEmitterKit.sendJsonMessage(name, data);
renderJson(sent ? Ret.ok("OK") : Ret.fail("不在线"));
}
}上前端页面:
#@layout()
#define main()
<h1>JFinal 开发者聊天室</h1>
<div class="table_box">
<p>欢迎来到 JFinal极速开发世界!</p>
<br>
<h1>我是:#(name)</h1>
<hr>
<div id="div_msg" style="white-space: pre-wrap;"></div>
<hr>
<br><br><br>
<div>
<h2>发送实时消息:</h2>
<label for="select_name">接收用户:</label>
<select id="select_name"><option>ALL</option></select>
<label for="input_msg">消息:</label><input id="input_msg" type="text" placeholder="请输入消息" />
<button type="button" id="send_button">发送消息</button>
<button type="button" id="close_button">下线</button>
</div>
<br><br><br>
<br><br><br>
</div>
<script>
window.onload = function () {
const obj = $("#div_msg");
var textAll = "";
const addText = function (head, text, sendALL) {
textAll += (sendALL?'世界':'私聊') + '[' + head + ']:' + text + '\n';;
obj.text(textAll);
};
// 接收实时消息
const eventSource = new EventSource('/sse');
eventSource.addEventListener('message', function (event) {
console.log('收到消息:', event);
var kv = JSON.parse(event.data);
addText(kv.name, kv.msg, kv.sendALL);
});
eventSource.addEventListener('addUser', function (event) {
console.log('收到addUser消息:', event);
var kv = JSON.parse(event.data);
addText(kv.name, kv.msg, true);
var options = '<option>ALL</option>';
for(let i = 0; i < kv.users.length; i++){
options += '<option>' + kv.users[i] + '</option>';
}
$("#select_name").html(options);
});
eventSource.addEventListener('error', function (event) {
console.log('关闭:', event);
// 这里不关闭时,浏览器可能会重新连接SSE
eventSource.close();
});
// 下线
const sseClose = function () {
$.get("/sseClose");
eventSource.close();
}
window.onbeforeunload = sseClose;
$("#close_button").click(sseClose);
// 发送实时消息
$("#send_button").click(function(){
var name = $("#select_name").val();
var msg = $("#input_msg").val();
$.post("/sseSend", {name: name, msg: msg}, function(ret){
if(ret.state == "ok") {
addText('我', msg);
}else{
addText('系统', ret.msg);
}
});
return false;
});
};
</script>
#end#@layout()
#define main()
<h1>EventSource 消息示例</h1>
<div class="table_box">
<p>欢迎来到 JFinal极速开发世界!</p>
<br><br>
本Demo采用 JFinal Template 作为视图文件。
<br/><br><br>
<label for="name">用户名:</label><input type="text" id="name" value="#(name)">
<button type="button" id="start">进入聊天室</button>
<br/><br/><br/><br/><br/><br/><br/><br/><br/><br/><br/><br/>
<br/><br/><br/><br/><br/><br/><br/><br/><br/><br/><br/><br/>
</div>
<script>
$("#start").click(function(){
window.location.href = "/chat?name=" + $("#name").val();
});
</script>
#end启动一下DemoConfig看下效果:


使用多个浏览器,或者使用无痕模式多开几个窗口:
上面例子代码中使用了 自定义 event 消息头:addUser ,JS 中可捕获做相应的业务处理。
以及浏览器关闭时,自动关闭连接处理。
PS 线上部署一定要配置 nginx ,这里建议 sse 消息 统一使用 这个前缀,nginx 配置时方便切割单独配置:
server {
listen 80;
listen 443 ssl;
server_name 域名;
include /xxxx/nginx/_https_demo.txt;
location /sse {
proxy_http_version 1.1;
proxy_set_header Connection "";
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header Host $host;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
# SSE 连接时的超时时间 1天
proxy_read_timeout 86400s;
# 取消缓冲
proxy_buffering off;
# 关闭代理缓存
proxy_cache off;
# 禁用分块传输编码
#chunked_transfer_encoding off
# 反向代理到 SSE 应用的地址和端口
proxy_pass http://127.0.0.1:20035;
}
location / {
proxy_set_header Host $host;
proxy_set_header X-Forwarded-For $remote_addr;
proxy_redirect off;
proxy_pass http://127.0.0.1:20035;
}
### 升级页面
error_page 500 502 503 504 /50x.html;
location = /50x.html {
default_type application/json;
return 200 '{"code":502,"msg":"try again later"}';
}
access_log off;
}
更多业务场景可加入JFinal_SAAS微信群沟通(微信:dufuzhong)
看到下面这个 onDeploy 用法:
UndertowServer.create(DemoConfig.class).onDeploy((cl, di) ->
di.getFilters().get("jfinal").setAsyncSupported(true)).start();
我自己在做了这个 onDeploy 功能之后就没用过,甚至完全忘记了,本以为没有什么用,没想到用上了,极好极好