Gin
- 注意事项
- 不能返回,必须阻塞handler
- 写完数据必须手动c.Writer.Flush,否则不会立刻进行响应
func SSEHandler(c *gin.Context) {
ssex.Manager.AddConnect(c)
}
package ssex
import (
"fastgin/boost/config"
"fastgin/common/constant" "fastgin/common/httpz" "fmt" "github.com/gin-contrib/sse" "github.com/gin-gonic/gin" "github.com/samber/lo" "net/http" "sync" "time")
var Manager *ConnectManager
func Init() {
Manager = &ConnectManager{
store: NewMutexMap[string, *Context](),
}
}
type MutexMap[K comparable, V any] struct {
m map[K]V
sm sync.Mutex
}
func NewMutexMap[K comparable, V any]() *MutexMap[K, V] {
return &MutexMap[K, V]{
m: make(map[K]V),
sm: sync.Mutex{},
}
}
func (t *MutexMap[K, V]) Set(k K, v V) {
t.sm.Lock()
defer t.sm.Unlock()
t.m[k] = v
}
func (t *MutexMap[K, V]) Delete(k K) {
t.sm.Lock()
defer t.sm.Unlock()
delete(t.m, k)
}
func (t *MutexMap[K, V]) GetWith(k K, f func(v V, ok bool)) {
t.sm.Lock()
defer t.sm.Unlock()
v, ok := t.m[k]
f(v, ok)
}
func (t *MutexMap[K, V]) Keys() []K {
t.sm.Lock()
defer t.sm.Unlock()
return lo.Keys(t.m)
}
func (t *MutexMap[K, V]) Len() int {
t.sm.Lock()
defer t.sm.Unlock()
return len(t.m)
}
type ConnectManager struct {
store *MutexMap[string, *Context]
}
type Context struct {
ginc *gin.Context
ch chan sse.Event
}
func (t *ConnectManager) AddConnect(c *gin.Context) {
var err error
_, ok := c.Writer.(http.Flusher)
if !ok {
httpz.ServerError(c, "not support sse")
return
}
c.Header("Content-Type", sse.ContentType)
c.Header("Cache-Control", "no-cache")
c.Header("Connection", "keep-alive")
c.Header("Access-Control-Allow-Origin", "*")
key := fmt.Sprintf("%v", c.MustGet(constant.KeyUid))
ch := make(chan sse.Event, 5)
t.store.Set(key, &Context{ginc: c, ch: ch})
c.Render(-1, &Heartbeat{})
if c.IsAborted() {
err = fmt.Errorf("ssex send data: %s", c.Errors.String())
config.Log.Error(err)
httpz.ServerError(c, err.Error())
return
}
c.Writer.Flush()
ticker := time.NewTicker(time.Second * 30)
defer func() {
t.store.Delete(key)
}()
for {
select {
case <-c.Request.Context().Done():
return
case e := <-ch:
c.Render(-1, e)
if c.IsAborted() {
err = fmt.Errorf("ssex send data: %s", c.Errors.String())
config.Log.Error(err)
return
}
c.Writer.Flush()
case <-ticker.C:
c.Render(-1, &Heartbeat{})
if c.IsAborted() {
err = fmt.Errorf("ssex send data: %s", c.Errors.String())
config.Log.Error(err)
return
}
c.Writer.Flush()
}
}
}
func (t *ConnectManager) HasConnects() bool {
return t.store.Len() > 0
}
func (t *ConnectManager) Broadcast(s sse.Event) {
for _, k := range t.store.Keys() {
t.store.GetWith(k, func(v *Context, ok bool) {
if !ok {
return
}
v.ch <- s
})
}
}
type Heartbeat struct{}
func (t *Heartbeat) Render(w http.ResponseWriter) error {
_, err := w.Write([]byte(": ping\n\n"))
return err
}
var contentType = []string{sse.ContentType}
var noCache = []string{"no-cache"}
func (t *Heartbeat) WriteContentType(w http.ResponseWriter) {
header := w.Header()
header["Content-Type"] = contentType
if _, exist := header["Cache-Control"]; !exist {
header["Cache-Control"] = noCache
}
}