服务端事件推送组件 SseEmitter
作用
在大模型对话模式下,客户端提交一个问题后,大模型会进行推理,将结果返回给客户端。此时针对返回机制两种模式:
- 推理完全结束后,将结果统一返回给客户端。通常大模型的推理时间会比较久(尤其是思考模式下),所以如果等待完全推理完成后再返回给客户端数据,那么客户端的体验就会比较差。
- 服务端推理出一点数据,往回返一点,就有了 ChatGPT 这样的打字机模式。客户体验提升的同时,如果我们发现基于已返回的数据我们已经拿到想要的结果了,则可以中断推理过程。
后面这种模式有两种常见的实现方式:WebSocket 和 SSE。
WebSocket 与 SseEmitter 的的对比

技术选型:
- WebSocket 更适合需要双向高频通信的场景
- SSE 更适合服务器向客户端推送更新、且客户端无需频繁发送数据的场景。 若客户端需要向服务器发送数据,SSE 仍需配合普通 HTTP 请求(如 fetch 或 AJAX)
整体来讲,SSE 更轻量,优先选择。
服务端代码
pom.xml
xml
<!-- web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>SseHelper
java
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class SseHelper {
private static final Map<String, SseEmitter> SSE_EMITTER_CACHE = new ConcurrentHashMap<>();
public static SseEmitter connect(String clientId) {
try {
SseEmitter sseEmitter = new SseEmitter(10 * 60 * 1000L);
sseEmitter.send(SseEmitter.event().reconnectTime(5000L).data("connect success"));
// 超时触发
sseEmitter.onTimeout(() -> SSE_EMITTER_CACHE.remove(clientId));
// 调用 emitter.complete() 触发
sseEmitter.onCompletion(() -> SSE_EMITTER_CACHE.remove(clientId));
// 调用 emitter.completeWithError() 触发
sseEmitter.onError((e) -> SSE_EMITTER_CACHE.remove(clientId));
SSE_EMITTER_CACHE.put(clientId, sseEmitter);
return sseEmitter;
} catch (IOException e) {
// 静默
throw new RuntimeException(e);
}
}
public static void send(String clientId, String message) {
if (!SSE_EMITTER_CACHE.containsKey(clientId)) {
connect(clientId);
}
try {
SSE_EMITTER_CACHE.get(clientId).send(message);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
public static void disconnect(String clientId) {
if (!SSE_EMITTER_CACHE.containsKey(clientId)) {
return;
}
SSE_EMITTER_CACHE.remove(clientId);
SSE_EMITTER_CACHE.get(clientId).complete();
}
}SseController
java
import cn.aileading.ai_study.helper.SseHelper;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
import java.time.LocalDateTime;
import java.util.concurrent.CompletableFuture;
@RestController
@RequestMapping("/sse")
public class SseController {
@RequestMapping("/send")
public SseEmitter send(@RequestParam("clientId") String clientId) {
SseEmitter emitter = SseHelper.connect(clientId);
CompletableFuture.runAsync(() -> {
try {
// 发送初始消息
SseHelper.send(clientId, "连接已建立: " + LocalDateTime.now());
for (int i = 0; i < 5; i++) {
SseHelper.send(clientId, LocalDateTime.now().toString());
Thread.sleep(5000);
}
emitter.complete();
} catch (Exception e) {
emitter.completeWithError(e);
throw new RuntimeException(e);
}
});
return emitter;
}
}IndexController:跳转首页
java
@Controller
public class IndexController {
@GetMapping({"/index"})
public String index() {
return "index.html";
}
}前端代码
static/index.html
html
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>SSE 示例</title>
<style>
body {
font-family: Arial, sans-serif;
max-width: 800px;
margin: 0 auto;
padding: 20px;
}
#display {
border: 1px solid #ccc;
padding: 15px;
min-height: 200px;
background-color: #f9f9f9;
margin-bottom: 20px;
white-space: pre-wrap;
}
button {
padding: 10px 20px;
font-size: 16px;
background-color: #007bff;
color: white;
border: none;
cursor: pointer;
border-radius: 4px;
}
button:hover {
background-color: #0056b3;
}
button:disabled {
background-color: #cccccc;
cursor: not-allowed;
}
</style>
</head>
<body>
<h1>SSE 数据展示</h1>
<div id="display">点击下面的按钮开始接收数据...</div>
<button id="startBtn">开始接收数据</button>
<script>
const display = document.getElementById('display');
const startBtn = document.getElementById('startBtn');
let eventSource = null;
startBtn.addEventListener('click', function() {
// 禁用按钮防止重复点击
startBtn.disabled = true;
startBtn.textContent = '连接中...';
// 生成一个随机的 clientId
const clientId = 'client_' + Math.random().toString(36).substr(2, 9);
// 创建 SSE 连接
const url = '/sse/send?clientId=' + encodeURIComponent(clientId);
eventSource = new EventSource(url);
display.innerHTML = '正在连接...\n';
eventSource.onopen = function(event) {
display.innerHTML += '连接已建立\n';
startBtn.textContent = '正在接收数据...';
};
eventSource.onmessage = function(event) {
display.innerHTML += '收到数据: ' + event.data + '\n';
// 滚动到底部
display.scrollTop = display.scrollHeight;
};
eventSource.onerror = function(event) {
display.innerHTML += '连接错误或已断开\n';
startBtn.disabled = false;
startBtn.textContent = '开始接收数据';
if (eventSource) {
eventSource.close();
}
};
});
</script>
</body>
</html>测试
启动服务端,访问 http://localhost:8080/index