msg_bus.go 955 B

1234567891011121314151617181920212223242526272829303132333435363738394041424344
  1. package service
  2. import (
  3. "github.com/sirupsen/logrus"
  4. "sportfitness/base/assembly/base/model"
  5. "sync"
  6. )
  7. // 监听列表, key: userId
  8. var msgBusDataMap = map[int]chan model.Msg{}
  9. var msgBusDataMapMu = sync.Mutex{}
  // MsgBus is a stateless handle for the per-user message bus. It has no
  // fields; its methods operate on the package-level listener registry.
  type MsgBus struct{}
  12. func (m MsgBus) AddListener(userId int) (channelOut <-chan model.Msg) {
  13. msgBusDataMapMu.Lock()
  14. defer msgBusDataMapMu.Unlock()
  15. channel := make(chan model.Msg, 10)
  16. msgBusDataMap[userId] = channel
  17. channelOut = channel
  18. return
  19. }
  20. func (m MsgBus) RemoveListener(userId int) {
  21. msgBusDataMapMu.Lock()
  22. defer msgBusDataMapMu.Unlock()
  23. if c, ok := msgBusDataMap[userId]; ok {
  24. close(c)
  25. }
  26. delete(msgBusDataMap, userId)
  27. }
  28. func (MsgBus) SendMsg(msg model.Msg) {
  29. msgBusDataMapMu.Lock()
  30. defer msgBusDataMapMu.Unlock()
  31. if channel, ok := msgBusDataMap[msg.UserIdRcv]; ok {
  32. select {
  33. case channel <- msg:
  34. break
  35. default:
  36. logrus.Errorf("向用户ID[%d]发送信息失败,chan满", msg.UserIdRcv)
  37. }
  38. }
  39. }