JFinal使用技巧-基于SSE 的客户端消息通知

官网对Websocket 多有介绍,这里就不在阐述他们有什么区别了。站内信一般都推荐使用SSE 技术的。

昨天又有社友问到我这边了,说 JF 里面的SseEmitter 没弄明白,之前分享的例子也没弄懂。
咋和登录用户关联起来,又咋给指定用户发消息。

(附知乎图)

练习 demo的话,聊天室绝对是个好例子,互通私发,群发都用到了。
还是使用JFinal 官网的 demo 做例子,项目源码文件在app栏下载

1、对UndertowServer 开启处理异步请求功能: setAsyncSupported(true)

/**
 * 启动入口,运行此 main 方法可以启动项目,此 main 方法可以放置在任意的 Class 类定义中,不一定要放于此
 */
public static void main(String[] args) {
    UndertowServer.create(DemoConfig.class).onDeploy((cl, di) ->
          di.getFilters().get("jfinal").setAsyncSupported(true)).start();
}

PS:项目要是 Tomcat 运行的话需要配置web.xml 里<async-supported>true</async-supported>开启处理异步请求

<filter>
<filter-name>jfinal</filter-name>
<filter-class>com.jfinal.core.JFinalFilter</filter-class>
<async-supported>true</async-supported>
</filter>


2、增加一个用户与AsyncContext的对应关系管理的 MAP 工具 SseEmitterKit :

package com.demo.common.kit;

import com.jfinal.core.Const;
import com.jfinal.core.Controller;
import com.jfinal.kit.JsonKit;
import com.jfinal.kit.LogKit;

import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * SSE消息发送工具类
 * @author 杜福忠
 */
@SuppressWarnings({"unused", "UnusedReturnValue"})
public class SseEmitterKit {
    private static final Map<String, AsyncContext> sseMap = new ConcurrentHashMap<>();
    public static AsyncContext get(String user) {
        return sseMap.get(user);
    }
    public static Set<String> getUsers() {
        return sseMap.keySet();
    }

    /**
     * 开启sse连接
     * @param user  用户名
     * @param c  Controller
     * @return  AsyncContext
     */
    public static AsyncContext startAsync(String user, Controller c) {
        c.renderNull();
        HttpServletResponse response = c.getResponse();
        response.setContentType("text/event-stream");
        response.setCharacterEncoding(Const.DEFAULT_ENCODING);
        response.setHeader("Content-Type", "text/event-stream; charset:utf-8");
        response.setHeader("Cache-Control", "no-cache");
        response.setHeader("Connection", "keep-alive");
        AsyncContext ac = c.getRequest().startAsync();
        // 默认1小时超时
        ac.setTimeout(60 * 60 * 1000);
        sseMap.put(user, ac);
        ac.addListener(new AsyncListener() {
            @Override
            public void onComplete(AsyncEvent event) {
                SseEmitterKit.remove(user);
            }
            @Override
            public void onTimeout(AsyncEvent event) {}
            @Override
            public void onError(AsyncEvent event) {
                SseEmitterKit.remove(user);
            }
            @Override
            public void onStartAsync(AsyncEvent event) {}
        });
        return ac;
    }

    public static void remove(String user) {
        if (user != null) {
            AsyncContext ac = sseMap.remove(user);
            if (ac != null) {
                ac.complete();
            }
        }
    }

    public static boolean sendJsonMessage(String user, Object data) {
        String dataStr = String.format("data: %s\n\n", toJson(data));
        return sendMessage(user, dataStr);
    }

    public static boolean sendJsonMessage(String user, String event, Object data) {
        String dataStr = String.format("event: %s\ndata: %s\n\n", event, toJson(data));
        return sendMessage(user, dataStr);
    }
    public static boolean sendJsonMessage(String user, Integer id, Object data) {
        String dataStr = String.format("id: %d\ndata: %s\n\n", id, toJson(data));
        return sendMessage(user, dataStr);
    }

    /**
     * 发送消息
     * @param user  接收者
     * @param id  消息 ID
     * @param event  事件
     * @param data  json消息内容
     * @return 发送成功返回true,失败返回false
     */
    public static boolean sendJsonMessage(String user, Integer id, String event, Object data) {
        String dataStr = String.format("id: %d\nevent: %s\ndata: %s\n\n", id, event, toJson(data));
        return sendMessage(user, dataStr);
    }

    private static String toJson(Object data) {
        if (data == null){
            return "";
        }
        return data instanceof String ? (String) data : JsonKit.toJson(data);
    }

    /**
     * 发送消息
     * @param user 用户
     * @param dataStr 消息内容(需做格式化)
     * @return 发送成功返回true,失败返回false
     */
    public static boolean sendMessage(String user, String dataStr) {
        AsyncContext ac = get(user);
        if (ac != null) {
            try {
                PrintWriter writer = ac.getResponse().getWriter();
                writer.write(dataStr);
                writer.flush();
                return true;
            } catch (IOException e) {
                LogKit.error(e.getMessage(), e);
            }
        }
        return false;
    }
}

可以看见工具很简单:
开启sse连接 》SseEmitterKit.startAsync
发送 json 消息 》SseEmitterKit.sendJsonMessage
关闭sse连接 》SseEmitterKit.remove
三板斧方便使用~
PS:JF里面自带的SseEmitter适用一轮任务多次向客户端输出的场景,比如现流行的AI流输出。
像登录用户站内协作信息通知、多人协作功能等,开启异步请求会更好,不阻塞处理。



接下来上例子聊天室代码:

1、为了演示方便直接在IndexController中加了三板斧的使用

@Path(value = "/", viewPath = "/index")
public class IndexController extends Controller {

    public void index() {
       render("index.html");
    }

    /**
     * /chat?name=a
     */
    public void chat() {
       // 这里用 name 做键名演示,实际业务可能是用户ID,或登录id之类的
       String name = get("name", "游客" + System.currentTimeMillis());
       setSessionAttr("sseName", name);
       setAttr("name", name);
       render("chat_sse.html");
    }

    /**
     * 创建 SseEmitter
     */
    public void sse() {
       String name = getSessionAttr("sseName");
       // 开启sse连接
       SseEmitterKit.startAsync(name, this);
       // 通知其他客户端有新用户加入
       Set<String> users = SseEmitterKit.getUsers();
       String data = Kv.by("msg", "哈喽~大家好!元旦快乐~").set("name", name).set("users", users).toJson();
       for (String user : users) {
          SseEmitterKit.sendJsonMessage(user, "addUser", data);
       }
    }

    public void sseClose() {
       // 主动关闭连接
       String name = getSessionAttr("sseName");
       SseEmitterKit.remove(name);
       renderJson(Ret.ok("OK"));
    }

    /**
     * 前端主动发送消息
     */
    public void sseSend() {
       String name = get("name", "ALL");
       String msg = get("msg");
       String sse_name = getSessionAttr("sseName");
       // 发送客户端消息,这个是实时通知,同时业务上应该做好数据库持久化处理
       // 增加 event 参数,是为了做一个 消息业务前缀,区分不同的业务通知。msg 可以是任何类型
       // SseEmitterKit 任何位置都可以调用,比如系统产生的消息通知
       String data = Kv.by("msg", msg).set("name", sse_name).set("sendALL", "ALL".equals(name)).toJson();
       if ("ALL".equals(name)){
          for (String user : SseEmitterKit.getUsers()) {
             // 排除自己
             if (! sse_name.equals(user)){
                SseEmitterKit.sendJsonMessage(user, data);
             }
          }
          renderJson(Ret.ok("OK"));
          return;
       }
       boolean sent = SseEmitterKit.sendJsonMessage(name, data);
       renderJson(sent ? Ret.ok("OK") : Ret.fail("不在线"));
    }

}


上前端页面:

#@layout()
#define main()
<h1>JFinal 开发者聊天室</h1>
<div class="table_box">
    <p>欢迎来到 JFinal极速开发世界!</p>

    <br>

    <h1>我是:#(name)</h1>
    <hr>
    <div id="div_msg" style="white-space: pre-wrap;"></div>
    <hr>
    <br><br><br>
    <div>
        <h2>发送实时消息:</h2>
        <label for="select_name">接收用户:</label>
        <select id="select_name"><option>ALL</option></select>
        <label for="input_msg">消息:</label><input id="input_msg" type="text" placeholder="请输入消息" />
        <button type="button" id="send_button">发送消息</button>
        <button type="button" id="close_button">下线</button>
    </div>
    <br><br><br>
    <br><br><br>
</div>
<script>
    window.onload = function () {
        const obj = $("#div_msg");
        var textAll = "";
        const addText = function (head, text, sendALL) {
            textAll += (sendALL?'世界':'私聊') + '[' + head + ']:' + text + '\n';;
            obj.text(textAll);
        };
        // 接收实时消息
        const eventSource = new EventSource('/sse');
        eventSource.addEventListener('message', function (event) {
            console.log('收到消息:', event);
            var kv = JSON.parse(event.data);
            addText(kv.name, kv.msg, kv.sendALL);
        });
        eventSource.addEventListener('addUser', function (event) {
            console.log('收到addUser消息:', event);
            var kv = JSON.parse(event.data);
            addText(kv.name, kv.msg, true);
            var options = '<option>ALL</option>';
            for(let i = 0; i < kv.users.length; i++){
                options += '<option>' + kv.users[i] + '</option>';
            }
            $("#select_name").html(options);
        });
        eventSource.addEventListener('error', function (event) {
            console.log('关闭:', event);
            // 这里不关闭时,浏览器可能会重新连接SSE
            eventSource.close();
        });

        // 下线
        const sseClose = function () {
            $.get("/sseClose");
            eventSource.close();
        }
        window.onbeforeunload = sseClose;
        $("#close_button").click(sseClose);

        // 发送实时消息
        $("#send_button").click(function(){
            var name = $("#select_name").val();
            var msg = $("#input_msg").val();
            $.post("/sseSend", {name: name, msg: msg}, function(ret){
                if(ret.state == "ok") {
                    addText('我', msg);
                }else{
                    addText('系统', ret.msg);
                }
            });
            return false;
        });
    };
</script>
#end
#@layout()
#define main()
<h1>EventSource 消息示例</h1>
<div class="table_box">
    <p>欢迎来到 JFinal极速开发世界!</p>
    
    <br><br>
    
    本Demo采用 JFinal Template 作为视图文件。
    <br/><br><br>
    <label for="name">用户名:</label><input type="text" id="name" value="#(name)">
    <button type="button" id="start">进入聊天室</button>

    <br/><br/><br/><br/><br/><br/><br/><br/><br/><br/><br/><br/>
    <br/><br/><br/><br/><br/><br/><br/><br/><br/><br/><br/><br/>
</div>
<script>
    $("#start").click(function(){
       window.location.href = "/chat?name=" + $("#name").val();
    });
</script>
#end


启动一下DemoConfig看下效果:

image.png

image.png
使用多个浏览器,或者使用无痕模式多开几个窗口:
image.png
上面例子代码中使用了 自定义 event 消息头:addUser ,JS 中可捕获做相应的业务处理。
以及浏览器关闭时,自动关闭连接处理。

PS:nginx

server {
    listen 80;
    listen 443 ssl;
    
    server_name 域名;
        
    include /xxxx/nginx/_https_demo.txt;
    
    location /sse {
        proxy_http_version 1.1;
        proxy_set_header Connection "";
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header Host $host;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        # SSE 连接时的超时时间 1天
        proxy_read_timeout 86400s;
        # 取消缓冲
        proxy_buffering off;
        # 关闭代理缓存
        proxy_cache off;
        # 禁用分块传输编码
        #chunked_transfer_encoding off
        # 反向代理到 SSE 应用的地址和端口
        proxy_pass http://127.0.0.1:20035;
    }
    
    location / {
        proxy_set_header Host $host;
        proxy_set_header X-Forwarded-For $remote_addr;
        proxy_redirect  off;
        proxy_pass http://127.0.0.1:20035;
    }
    
    ### 升级页面
    error_page   500 502 503 504  /50x.html;
    location = /50x.html {
        default_type application/json;
        return 200 '{"code":502,"msg":"try again later"}';
    }
    access_log off;
}


更多业务场景可加入JFinal_SAAS微信群沟通(微信:dufuzhong)
image.png

评论区

JFinal

2025-01-01 12:20

SseEmitter 我也很少用,先点赞收藏

看到下面这个 onDeploy 用法:
UndertowServer.create(DemoConfig.class).onDeploy((cl, di) ->
di.getFilters().get("jfinal").setAsyncSupported(true)).start();

我自己在做了这个 onDeploy 功能之后就没用过,甚至完全忘记了,本以为没有什么用,没想到用上了,极好极好

杜福忠

2025-01-01 12:34

@JFinal 😄功能丰富,代码简洁,一个钩子办大事~ APP 还没放开 https://jfinal.com/app/10058

JFinal

2025-01-01 12:37

@杜福忠 万万没想到 onDeploy 还能用上,这个我当时做了点前瞻性设计,也不知道是不是真有用

北流家园网

2025-01-03 22:27

我有个系统也想用上站内信,用的是SaSession,我要研究下,能不能改到合适的。

热门分享

扫码入社