Skip to content

服务端事件推送组件 SseEmitter

作用

在大模型对话模式下,客户端提交一个问题后,大模型会进行推理,将结果返回给客户端。此时针对返回机制两种模式:

  1. 推理完全结束后,将结果统一返回给客户端。通常大模型的推理时间会比较久(尤其是思考模式下),所以如果等待完全推理完成后再返回给客户端数据,那么客户端的体验就会比较差。
  2. 服务端推理出一点数据,往回返一点,就有了 ChatGPT 这样的打字机模式。客户体验提升的同时,如果我们发现基于已返回的数据我们已经拿到想要的结果了,则可以中断推理过程。

后面这种模式有两种常见的实现方式:WebSocketSSE

WebSocket 与 SseEmitter 的的对比

img.png

技术选型:

  1. WebSocket 更适合需要双向高频通信的场景
  2. SSE 更适合服务器向客户端推送更新、且客户端无需频繁发送数据的场景。 若客户端需要向服务器发送数据,SSE 仍需配合普通 HTTP 请求(如 fetch 或 AJAX)

整体来讲,SSE 更轻量,优先选择。

服务端代码

pom.xml

xml
<!-- web -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

SseHelper

java
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SseHelper {
    private static final Map<String, SseEmitter> SSE_EMITTER_CACHE = new ConcurrentHashMap<>();

    public static SseEmitter connect(String clientId) {
        try {
            SseEmitter sseEmitter = new SseEmitter(10 * 60 * 1000L);
            sseEmitter.send(SseEmitter.event().reconnectTime(5000L).data("connect success"));
            // 超时触发
            sseEmitter.onTimeout(() -> SSE_EMITTER_CACHE.remove(clientId));
            // 调用 emitter.complete() 触发
            sseEmitter.onCompletion(() -> SSE_EMITTER_CACHE.remove(clientId));
            // 调用 emitter.completeWithError() 触发
            sseEmitter.onError((e) -> SSE_EMITTER_CACHE.remove(clientId));
            SSE_EMITTER_CACHE.put(clientId, sseEmitter);
            return sseEmitter;
        } catch (IOException e) {
            // 静默
            throw new RuntimeException(e);
        }
    }

    public static void send(String clientId, String message) {
        if (!SSE_EMITTER_CACHE.containsKey(clientId)) {
            connect(clientId);
        }
        try {
            SSE_EMITTER_CACHE.get(clientId).send(message);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void disconnect(String clientId) {
        if (!SSE_EMITTER_CACHE.containsKey(clientId)) {
            return;
        }
        SSE_EMITTER_CACHE.remove(clientId);
        SSE_EMITTER_CACHE.get(clientId).complete();
    }
}

SseController

java
import cn.aileading.ai_study.helper.SseHelper;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.time.LocalDateTime;
import java.util.concurrent.CompletableFuture;

@RestController
@RequestMapping("/sse")
public class SseController {
    @RequestMapping("/send")
    public SseEmitter send(@RequestParam("clientId") String clientId) {
        SseEmitter emitter = SseHelper.connect(clientId);
        CompletableFuture.runAsync(() -> {
            try {
                // 发送初始消息
                SseHelper.send(clientId, "连接已建立: " + LocalDateTime.now());
                for (int i = 0; i < 5; i++) {
                    SseHelper.send(clientId, LocalDateTime.now().toString());
                    Thread.sleep(5000);
                }
                emitter.complete();
            } catch (Exception e) {
                emitter.completeWithError(e);
                throw new RuntimeException(e);
            }
        });
        return emitter;
    }
}

IndexController:跳转首页

java
@Controller
public class IndexController {
    @GetMapping({"/index"})
    public String index() {
        return "index.html";
    }
}

前端代码

static/index.html

html
<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>SSE 示例</title>
    <style>
        body {
            font-family: Arial, sans-serif;
            max-width: 800px;
            margin: 0 auto;
            padding: 20px;
        }
        #display {
            border: 1px solid #ccc;
            padding: 15px;
            min-height: 200px;
            background-color: #f9f9f9;
            margin-bottom: 20px;
            white-space: pre-wrap;
        }
        button {
            padding: 10px 20px;
            font-size: 16px;
            background-color: #007bff;
            color: white;
            border: none;
            cursor: pointer;
            border-radius: 4px;
        }
        button:hover {
            background-color: #0056b3;
        }
        button:disabled {
            background-color: #cccccc;
            cursor: not-allowed;
        }
    </style>
</head>
<body>
    <h1>SSE 数据展示</h1>
    <div id="display">点击下面的按钮开始接收数据...</div>
    <button id="startBtn">开始接收数据</button>

    <script>
        const display = document.getElementById('display');
        const startBtn = document.getElementById('startBtn');
        let eventSource = null;

        startBtn.addEventListener('click', function() {
            // 禁用按钮防止重复点击
            startBtn.disabled = true;
            startBtn.textContent = '连接中...';
            
            // 生成一个随机的 clientId
            const clientId = 'client_' + Math.random().toString(36).substr(2, 9);
            
            // 创建 SSE 连接
            const url = '/sse/send?clientId=' + encodeURIComponent(clientId);
            eventSource = new EventSource(url);
            
            display.innerHTML = '正在连接...\n';
            
            eventSource.onopen = function(event) {
                display.innerHTML += '连接已建立\n';
                startBtn.textContent = '正在接收数据...';
            };
            
            eventSource.onmessage = function(event) {
                display.innerHTML += '收到数据: ' + event.data + '\n';
                // 滚动到底部
                display.scrollTop = display.scrollHeight;
            };
            
            eventSource.onerror = function(event) {
                display.innerHTML += '连接错误或已断开\n';
                startBtn.disabled = false;
                startBtn.textContent = '开始接收数据';
                if (eventSource) {
                    eventSource.close();
                }
            };
        });
    </script>
</body>
</html>

测试

启动服务端,访问 http://localhost:8080/indeximg.png