go实现的SSE长连接封装

Tony哥
2023-08-22 / 0 评论 / 402 阅读 / 正在检测是否收录...

此方法自己尝试过可以借鉴参考,升级改造请自便

package sse

import (
    "fmt"
    "github.com/gin-gonic/gin"
    "net/http"
    "time"
)

// - SSE 相关封装 -----------------------
/*
  输出说明:
     1. Data等于空 不输出
     2. Data+Event与上一个输出一致 不重复输出
     3. 业务逻辑报错会向前端发出发送一个close事件 需要前端收到后主动close()请求

  前端示例:
    const es = new EventSource(url+'auth-token='+token),
      listener = function (event) {
          console.log(event);
          var div = document.createElement("div");var type = event.type; div.appendChild(document.createTextNode(type + ": " + (type === "message" ? event.data : es.url)));document.body.appendChild(div);
      };
    es.addEventListener("open", listener);
    es.addEventListener("message", listener);
    es.addEventListener("error", listener);
    es.addEventListener("close", function(ret){
        es.close();
        listener(ret)
    });

*/

// MessageEvent 定义 SSE 事件
type MessageEvent struct {
    Id    string
    Event string // Data+Event与前一个输出一致 不输出
    Data  string // Data等于空 不输出
}

// 实现 SSE 事件的 String() 方法
func (e MessageEvent) String() string {
    s := ""
    if e.Id != "" {
        s += "id:" + e.Id + "\n"
    }
    if e.Event != "" {
        s += "event:" + e.Event + "\n"
    }
    s += fmt.Sprintf("data:%s\n\n", e.Data)
    return s
}

// SseOut see协议输出
//
//    fun(i 循环计数器)(事件结构体,下一个循环延迟时间) 该函数执行业务逻辑并将结果写入通道
func SseOut(c *gin.Context, T func(i int64) (MessageEvent, time.Duration, error)) {
    defer func(){}()
    messageChan := make(chan MessageEvent, 5)
    c.Header("Content-Type", "text/event-stream")
    c.Header("Cache-Control", "no-cache")
    c.Header("Connection", "keep-alive")
    w := c.Writer
    w.WriteHeader(http.StatusOK)
    flusher, _ := w.(http.Flusher)
    temp := "" //结果比对临时变量
    var Err error
    // 监听客户端信号
    closeNotify := c.Request.Context().Done()
    go func() {
        defer func() {
            if err := recover(); err != nil { // 捕获到 panic
                return
            }
        }()
        <-closeNotify
        close(messageChan)
        return
    }()

    // 执行业务逻辑
    go func() {
        defer func() {
            if Err != nil {
                close(messageChan)
                return
            }
            if err := recover(); err != nil { // 捕获到 panic
                return
            }
        }()
        // 通道活性探测利用业务空闲时间独立执行
        i := int64(0)
        for {
            e, t, err := T(i)
            if err != nil {
                Err = err
                break
            }
            //--两次结果一致的数据阻止输出
            if temp != e.Data+e.Event {
                temp = e.Data + e.Event
            } else {
                e.Data = ""
            }
            if e.Data != "" {
                i++
                messageChan <- e
            } else {
                messageChan <- MessageEvent{
                    Event: "__ping__",
                }
            }
            if t != 0 {
                time.Sleep(t)
            }
        }
    }()

    // SSE 监听并输出
    for {
        if message, ok := <-messageChan; ok && Err == nil {
            if message.Event == "__ping__" {
                continue
            }
            fmt.Fprintf(w, "%s", &message)
            flusher.Flush()
        } else {
            if Err != nil {
                // 向客户端发送 SSE 事件
                fmt.Fprintf(w, "%s", &MessageEvent{
                    Id:    "-500",
                    Event: "close",
                    Data:  Err.Error(),
                })
                // 刷新 response buffer
                flusher.Flush()
            }
            break
        }
    }
}

引用

其它业务逻辑自行根据业务需求写入
....
sse.SseOut(c, func(index int64) (Row sse.MessageEvent, Sleep time.Duration, Err error) {
  // 下一轮循环延迟时间
  Sleep = time.Second * 3
  // 数据
  Row.Data = ""
  Row.Id = fmt.Sprintf("%d", index)
  Row.Event = action //--登录状态事件
  if index > 0 {
      return
  }
  return
})
....
0

评论 (0)

取消