SSE概述
Server-Sent Events 服务器推送事件,简称 SSE,是一种基于HTTP协议的技术,允许服务端向客户端主动推送请求。
其核心特点是流式传输—— 服务端可将数据分块逐步发送,比如当前大语言模型的流式响应就是将先计算出来的数据返回给用户,然后边计算边返回。
与其他实时通信技术的对比
技术类型 | SSE | WebSocket | 轮询(Polling) |
---|---|---|---|
连接性质 | 单向(服务端→客户端) | 双向全双工 | 客户端主动请求 |
HTTP 依赖 | 基于 HTTP 长连接 | 独立协议(需握手升级)|TCP | 多次短连接 |
实现复杂度 | 简单(服务端逻辑轻量) | 复杂(需处理双向通信) | 极低 |
典型场景 | 新闻推送、日志流、AI 流式响应 | 聊天应用、实时游戏 | 简单状态查询 |
浏览器兼容性 | 主流浏览器支持(IE 除外) | 现代浏览器支持 | 全兼容 |
数据类型 | 文本或使用 Base64 编码和 gzip 压缩的二进制消息 | 类型广泛 | 类型广泛 |
服务端实现
SSE 协议非常简单,本质是浏览器发起 http 请求,服务器在收到请求后,返回状态与数据,并附带以下 headers:
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive
- SSE API规定推送事件流的 MIME 类型为
text/event-stream
。 - 必须指定浏览器不缓存服务端发送的数据,以确保浏览器可以实时显示服务端发送的数据。
- SSE 是一个一直保持开启的 TCP 连接,所以 Connection 为 keep-alive。
Spring Boot开发中不需要设置请求头
使用标注
@GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE)
时,Spring会自动设置正确的Content-Type。
消息格式
每条消息由一行或多行字段(event
、id
、retry
、data
)组成
data 字段 (必需):可以有多行data字段,最终会合并为一个值(用\n连接)
event 字段 (可选):指定事件类型
id 字段 (可选):设置消息ID,用于断线重连时恢复,客户端会在重连时通过Last-Event-ID头发送最后收到的ID
retry 字段 (可选):指定重连时间(毫秒)
浏览器 API
在浏览器端,可以使用 JavaScript 的 EventSource API 创建 EventSource
对象监听服务器发送的事件。一旦建立连接,服务器就可以使用 HTTP 响应的 ‘text/event-stream’ 内容类型发送事件消息,浏览器则可以通过监听 EventSource 对象的 onmessage
、onopen
和 onerror
事件来处理这些消息。
建立连接
// 第二个是可选值,包含 withCredentials 属性,表示是否发送凭证(cookie、HTTP认证信息等)到服务端,默认为 false。
const eventSource = new EventSource('http_api_url', { withCredentials: true })
监听事件
EventSource 对象本身继承自 EventTarget 接口,因此可以使用 addEventListener() 方法来监听事件。EventSource 对象触发的事件主要包括以下三种:
- open 事件:当成功连接到服务端时触发。
- message 事件:当接收到服务器发送的消息时触发。该事件对象的 data 属性包含了服务器发送的消息内容。
- error 事件:当发生错误时触发。该事件对象的 event 属性包含了错误信息。
/ 初始化 eventSource 等省略eventSource.onopen = function(event) {console.log('Connection opened')
}eventSource.onmessage = function(event) {console.log('Received message: ' + event.data);
}eventSource.onerror = function(event) {console.log('Error occurred: ' + event.event);
})
实践
服务端
使用 Node.js 实现 SSE 的简单示例:
const http = require('http')
const fs = require('fs')http.createServer((req, res) => {const url = req.urlif (url === '/' || url === 'index.html') {// 如果请求根路径,返回 index.html 文件fs.readFile('index.html', (err, data) => {if (err) {res.writeHead(500)res.end('Error loading')} else {res.writeHead(200, {'Content-Type': 'text/html'})res.end(data)}})} else if (url.includes('/sse')) {// 如果请求 /events 路径,建立 SSE 连接res.writeHead(200, {'Content-Type': 'text/event-stream','Cache-Control': 'no-cache','Connection': 'keep-alive','Access-Control-Allow-Origin': '*', // 允许跨域})// 每隔 1 秒发送一条消息let id = 0const intervalId = setInterval(() => {res.write(`event: customEvent\n`)res.write(`id: ${id}\n`)res.write(`retry: 30000\n`)const params = url.split('?')[1]const data = { id, time: new Date().toISOString(), params }res.write(`data: ${JSON.stringify(data)}\n\n`)id++if (id >= 10) {clearInterval(intervalId)res.end()}}, 1000)// 当客户端关闭连接时停止发送消息req.on('close', () => {clearInterval(intervalId)id = 0res.end()})} else {// 如果请求的路径无效,返回 404 状态码res.writeHead(404)res.end()}
}).listen(3000)console.log('Server listening on port 3000')
使用 Spring Boot实现 SSE 的简单示例:
@RestController
@RequestMapping("/sse")
public class SSEmitterController {@GetMapping("/stream")public SseEmitter stream() {// 发送Map集合Map<String, Object> dataMap = new HashMap<>();dataMap.put("userId", "user123");dataMap.put("content", "订单已发货");dataMap.put("timestamp", new Date());// 用于创建一个 SSE 连接对象SseEmitter emitter = new SseEmitter();// 在后台线程中模拟实时数据try {// emitter.send() 方法向客户端发送消息// 使用SseEmitter.event()创建一个事件对象,设置事件名称和数据emitter.send(SseEmitter.event().name("message").data(dataMap ));} catch (IOException e) {// 发生错误时,关闭连接并报错emitter.completeWithError(e);}return emitter;}
}
浏览器
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><meta http-equiv="X-UA-Compatible" content="IE=edge"><meta name="viewport" content="width=device-width, initial-scale=1.0"><title>SSE Demo</title>
</head>
<body><h1>SSE Demo</h1><button onclick="connectSSE()">建立 SSE 连接</button> <button onclick="closeSSE()">断开 SSE 连接</button><br /><br /><div id="message"></div><script>const messageElement = document.getElementById('message')let eventSource// 建立 SSE 连接const connectSSE = () => {eventSource = new EventSource('http://127.0.0.1:3000/sse?content=xxx')// 监听消息事件eventSource.addEventListener('customEvent', (event) => {const data = JSON.parse(event.data)messageElement.innerHTML += `${data.id} --- ${data.time} --- params参数:${JSON.stringify(data.params)}` + '<br />'})eventSource.onopen = () => {messageElement.innerHTML += `SSE 连接成功,状态${eventSource.readyState}<br />`}eventSource.onerror = () => {messageElement.innerHTML += `SSE 连接错误,状态${eventSource.readyState}<br />`}}// 断开 SSE 连接const closeSSE = () => {eventSource.close()messageElement.innerHTML += `SSE 连接关闭,状态${eventSource.readyState}<br />`}</script>
</body>
</html>
Fetch 实现
浏览器 EventSource API 限制,在使用 SSE 时不能自定义请求头、只能发出 GET 请求,且在大多数浏览器中,URL 限制 2000个字符
服务端
const http = require('http')
const fs = require('fs')http.createServer((req, res) => {const url = req.urlif (url === '/' || url === 'index-fetch.html') {// 如果请求根路径,返回 ndex-fetch.html 文件fs.readFile('index-fetch.html', (err, data) => {if (err) {res.writeHead(500)res.end('Error loading')} else {res.writeHead(200, {'Content-Type': 'text/html'})res.end(data)}})} else if (url.includes('/fetch-sse')) {// 如果请求 /events-fetch 路径,建立连接res.writeHead(200, {'Content-Type': 'text/event-stream','Cache-Control': 'no-cache','Connection': 'keep-alive','Access-Control-Allow-Origin': '*', // 允许跨域})let body = ''req.on('data', chunk => {body += chunk})// 每隔 1 秒发送一条消息let id = 0const intervalId = setInterval(() => {const data = { id, time: new Date().toISOString(), body: JSON.parse(body) }res.write(JSON.stringify(data))id++if (id >= 10) {clearInterval(intervalId)res.end()}}, 1000)// 当客户端关闭连接时停止发送消息req.on('close', () => {clearInterval(intervalId)id = 0res.end()})} else {// 如果请求的路径无效,返回 404 状态码res.writeHead(404)res.end()}
}).listen(3001)console.log('Server listening on port 3001')
浏览器
<!DOCTYPE html>
<html lang="en">
<head><meta charset="UTF-8"><meta http-equiv="X-UA-Compatible" content="IE=edge"><meta name="viewport" content="width=device-width, initial-scale=1.0"><title>fetchSSE Demo</title>
</head>
<body><h1>fetchSSE Demo</h1><button onclick="connectFetch()">建立 fetchSSE 连接</button><button onclick="closeSSE()">断开 fetchSSE 连接</button><br /><br /><div id="message"></div><script>const messageElement = document.getElementById('message')let controller// 建立 FETCH-SSE 连接const connectFetch = () => {controller = new AbortController()fetchEventSource('http://127.0.0.1:3001/fetch-sse', {method: 'POST',body: JSON.stringify({content: 'xxx'}),signal: controller.signal,onopen: () => {messageElement.innerHTML += `FETCH 连接成功<br />`},onclose: () => {messageElement.innerHTML += `FETCH 连接关闭<br />`},onmessage: (event) => {const data = JSON.parse(event)messageElement.innerHTML += `${data.id} --- ${data.time} --- body参数:${JSON.stringify(data.body)}` + '<br />'},onerror: (e) => {console.log(e)}})}// 断开 FETCH-SSE 连接const closeSSE = () => {if (controller) {controller.abort()controller = undefinedmessageElement.innerHTML += `FETCH 连接关闭<br />`}}const fetchEventSource = (url, options) => {fetch(url, options).then(response => {if (response.status === 200) {options.onopen && options.onopen()return response.body}}).then(rb => {const reader = rb.getReader()const push = () => {// done 为数据流是否接收完成,boolean// value 为返回数据,Uint8Arrayreturn reader.read().then(({done, value}) => {if (done) {options.onclose && options.onclose()return}options.onmessage && options.onmessage(new TextDecoder().decode(value))// 持续读取流信息return push()})}// 开始读取流信息return push()}).catch((e) => {options.error && options.error(e)})}
</script></html>
参考博客:https://juejin.cn/post/7221125237500330039