此方法自己尝试过可以借鉴参考,升级改造请自便
package sse
import (
"fmt"
"github.com/gin-gonic/gin"
"net/http"
"time"
)
// - SSE 相关封装 -----------------------
/*
输出说明:
1. Data等于空 不输出
2. Data+Event与上一个输出一致 不重复输出
3. 业务逻辑报错会向前端发出发送一个close事件 需要前端收到后主动close()请求
前端示例:
const es = new EventSource(url+'auth-token='+token),
listener = function (event) {
console.log(event);
var div = document.createElement("div");var type = event.type; div.appendChild(document.createTextNode(type + ": " + (type === "message" ? event.data : es.url)));document.body.appendChild(div);
};
es.addEventListener("open", listener);
es.addEventListener("message", listener);
es.addEventListener("error", listener);
es.addEventListener("close", function(ret){
es.close();
listener(ret)
});
*/
// MessageEvent 定义 SSE 事件
type MessageEvent struct {
Id string
Event string // Data+Event与前一个输出一致 不输出
Data string // Data等于空 不输出
}
// 实现 SSE 事件的 String() 方法
func (e MessageEvent) String() string {
s := ""
if e.Id != "" {
s += "id:" + e.Id + "\n"
}
if e.Event != "" {
s += "event:" + e.Event + "\n"
}
s += fmt.Sprintf("data:%s\n\n", e.Data)
return s
}
// SseOut see协议输出
//
// fun(i 循环计数器)(事件结构体,下一个循环延迟时间) 该函数执行业务逻辑并将结果写入通道
func SseOut(c *gin.Context, T func(i int64) (MessageEvent, time.Duration, error)) {
defer func(){}()
messageChan := make(chan MessageEvent, 5)
c.Header("Content-Type", "text/event-stream")
c.Header("Cache-Control", "no-cache")
c.Header("Connection", "keep-alive")
w := c.Writer
w.WriteHeader(http.StatusOK)
flusher, _ := w.(http.Flusher)
temp := "" //结果比对临时变量
var Err error
// 监听客户端信号
closeNotify := c.Request.Context().Done()
go func() {
defer func() {
if err := recover(); err != nil { // 捕获到 panic
return
}
}()
<-closeNotify
close(messageChan)
return
}()
// 执行业务逻辑
go func() {
defer func() {
if Err != nil {
close(messageChan)
return
}
if err := recover(); err != nil { // 捕获到 panic
return
}
}()
// 通道活性探测利用业务空闲时间独立执行
i := int64(0)
for {
e, t, err := T(i)
if err != nil {
Err = err
break
}
//--两次结果一致的数据阻止输出
if temp != e.Data+e.Event {
temp = e.Data + e.Event
} else {
e.Data = ""
}
if e.Data != "" {
i++
messageChan <- e
} else {
messageChan <- MessageEvent{
Event: "__ping__",
}
}
if t != 0 {
time.Sleep(t)
}
}
}()
// SSE 监听并输出
for {
if message, ok := <-messageChan; ok && Err == nil {
if message.Event == "__ping__" {
continue
}
fmt.Fprintf(w, "%s", &message)
flusher.Flush()
} else {
if Err != nil {
// 向客户端发送 SSE 事件
fmt.Fprintf(w, "%s", &MessageEvent{
Id: "-500",
Event: "close",
Data: Err.Error(),
})
// 刷新 response buffer
flusher.Flush()
}
break
}
}
}
引用
其它业务逻辑自行根据业务需求写入
....
sse.SseOut(c, func(index int64) (Row sse.MessageEvent, Sleep time.Duration, Err error) {
// 下一轮循环延迟时间
Sleep = time.Second * 3
// 数据
Row.Data = ""
Row.Id = fmt.Sprintf("%d", index)
Row.Event = action //--登录状态事件
if index > 0 {
return
}
return
})
....
评论 (0)