msg_bus.go 1012 B

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748
  1. package service
  2. import (
  3. "sportfitness/base/assembly/base/model"
  4. "sync"
  5. "github.com/sirupsen/logrus"
  6. )
  7. // 监听列表, key: userId
  8. var msgBusDataMap = map[int]chan model.Msg{}
  9. var msgBusDataMapMu = sync.Mutex{}
  10. type MsgBus struct {
  11. }
  12. func (m MsgBus) AddListener(userId int) (channelOut <-chan model.Msg) {
  13. msgBusDataMapMu.Lock()
  14. defer msgBusDataMapMu.Unlock()
  15. channel := make(chan model.Msg, 10)
  16. msgBusDataMap[userId] = channel
  17. channelOut = channel
  18. return
  19. }
  20. func (m MsgBus) RemoveListener(userId int) {
  21. msgBusDataMapMu.Lock()
  22. defer msgBusDataMapMu.Unlock()
  23. if c, ok := msgBusDataMap[userId]; ok {
  24. close(c)
  25. }
  26. delete(msgBusDataMap, userId)
  27. }
  28. func (MsgBus) SendMsg(msg model.Msg) {
  29. msgBusDataMapMu.Lock()
  30. defer msgBusDataMapMu.Unlock()
  31. if channel, ok := msgBusDataMap[msg.UserIdRcv]; ok {
  32. select {
  33. case channel <- msg:
  34. break
  35. default:
  36. logrus.Errorf("向用户ID[%d]发送信息失败,chan满", msg.UserIdRcv)
  37. }
  38. } //else {
  39. // 设备不在线时,可在此处理
  40. //}
  41. }