zrufo 4 éve
szülő
commit
f7aee657f0

+ 32 - 6
api/grpc/base.go

@@ -1412,12 +1412,38 @@ func (a Api) GenPhoneVerifyCodeByWeiXin(ctx context.Context, r *pb.GenPhoneVerif
 	return rst, nil
 }
 
-// Grpc-Web总线机制
-//func (a Api) Test(ctx context.Context, in *pb.WeiXinSignInRequest) (c pb.Api_TestClient, err error) {
-//	for {
-//		c.SendMsg()
-//	}
-//}
+// UserListenMsg Grpc-Web总线机制
+func (Api) UserListenMsg(_ *pb.DefaultRequest, stream pb.Api_UserListenMsgServer) (err error) {
+	// TODO 验证并获取 userId
+	userId := 0
+
+	channel := service.MsgBus{}.AddListener(userId)
+	defer func() {
+		service.MsgBus{}.RemoveListener(userId)
+	}()
+
+	for {
+		select {
+		case msg := <-channel:
+			{
+				pbMsg := &pb.Msg{
+					UserIdSend: int64(msg.UserIdSend),
+					Title:      msg.Title,
+					Content:    msg.Content,
+				}
+
+				err = stream.Send(pbMsg)
+				if err != nil {
+					return
+				}
+			}
+		case <-stream.Context().Done():
+			{
+				return
+			}
+		}
+	}
+}
 
 //// WeiXinSignIn 微信登陆
 //func (a Api) WeiXinSignIn(ctx context.Context, r *pb.WeiXinSignInRequest) (*pb.SignInReply, error) {

A különbségek nem kerülnek megjelenítésre, a fájl túl nagy
+ 841 - 770
api/grpc/base/base.pb.go


+ 69 - 7
api/grpc/base/base_grpc.pb.go

@@ -11,7 +11,6 @@ import (
 
 // This is a compile-time assertion to ensure that this generated file
 // is compatible with the grpc package it is being compiled against.
-// Requires gRPC-Go v1.32.0 or later.
 const _ = grpc.SupportPackageIsVersion7
 
 // ApiClient is the client API for Api service.
@@ -155,6 +154,8 @@ type ApiClient interface {
 	GenPhoneVerifyCodeByWeiXin(ctx context.Context, in *GenPhoneVerifyCodeRequestWX, opts ...grpc.CallOption) (*StandardReply, error)
 	CheckVerifyCode(ctx context.Context, in *CheckVerifyCodeRequest, opts ...grpc.CallOption) (*DefaultReply, error)
 	WeiXinSignIn(ctx context.Context, in *WeiXinSignInRequest, opts ...grpc.CallOption) (*SignInReply, error)
+	// 用户监听消息
+	UserListenMsg(ctx context.Context, in *DefaultRequest, opts ...grpc.CallOption) (Api_UserListenMsgClient, error)
 }
 
 type apiClient struct {
@@ -1308,6 +1309,38 @@ func (c *apiClient) WeiXinSignIn(ctx context.Context, in *WeiXinSignInRequest, o
 	return out, nil
 }
 
+func (c *apiClient) UserListenMsg(ctx context.Context, in *DefaultRequest, opts ...grpc.CallOption) (Api_UserListenMsgClient, error) {
+	stream, err := c.cc.NewStream(ctx, &_Api_serviceDesc.Streams[0], "/base.Api/UserListenMsg", opts...)
+	if err != nil {
+		return nil, err
+	}
+	x := &apiUserListenMsgClient{stream}
+	if err := x.ClientStream.SendMsg(in); err != nil {
+		return nil, err
+	}
+	if err := x.ClientStream.CloseSend(); err != nil {
+		return nil, err
+	}
+	return x, nil
+}
+
+type Api_UserListenMsgClient interface {
+	Recv() (*Msg, error)
+	grpc.ClientStream
+}
+
+type apiUserListenMsgClient struct {
+	grpc.ClientStream
+}
+
+func (x *apiUserListenMsgClient) Recv() (*Msg, error) {
+	m := new(Msg)
+	if err := x.ClientStream.RecvMsg(m); err != nil {
+		return nil, err
+	}
+	return m, nil
+}
+
 // ApiServer is the server API for Api service.
 // All implementations must embed UnimplementedApiServer
 // for forward compatibility
@@ -1449,6 +1482,8 @@ type ApiServer interface {
 	GenPhoneVerifyCodeByWeiXin(context.Context, *GenPhoneVerifyCodeRequestWX) (*StandardReply, error)
 	CheckVerifyCode(context.Context, *CheckVerifyCodeRequest) (*DefaultReply, error)
 	WeiXinSignIn(context.Context, *WeiXinSignInRequest) (*SignInReply, error)
+	// 用户监听消息
+	UserListenMsg(*DefaultRequest, Api_UserListenMsgServer) error
 	mustEmbedUnimplementedApiServer()
 }
 
@@ -1837,6 +1872,9 @@ func (UnimplementedApiServer) CheckVerifyCode(context.Context, *CheckVerifyCodeR
 func (UnimplementedApiServer) WeiXinSignIn(context.Context, *WeiXinSignInRequest) (*SignInReply, error) {
 	return nil, status.Errorf(codes.Unimplemented, "method WeiXinSignIn not implemented")
 }
+func (UnimplementedApiServer) UserListenMsg(*DefaultRequest, Api_UserListenMsgServer) error {
+	return status.Errorf(codes.Unimplemented, "method UserListenMsg not implemented")
+}
 func (UnimplementedApiServer) mustEmbedUnimplementedApiServer() {}
 
 // UnsafeApiServer may be embedded to opt out of forward compatibility for this service.
@@ -1847,7 +1885,7 @@ type UnsafeApiServer interface {
 }
 
 func RegisterApiServer(s grpc.ServiceRegistrar, srv ApiServer) {
-	s.RegisterService(&Api_ServiceDesc, srv)
+	s.RegisterService(&_Api_serviceDesc, srv)
 }
 
 func _Api_SignUp_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
@@ -4136,10 +4174,28 @@ func _Api_WeiXinSignIn_Handler(srv interface{}, ctx context.Context, dec func(in
 	return interceptor(ctx, in, info, handler)
 }
 
-// Api_ServiceDesc is the grpc.ServiceDesc for Api service.
-// It's only intended for direct use with grpc.RegisterService,
-// and not to be introspected or modified (even as a copy)
-var Api_ServiceDesc = grpc.ServiceDesc{
+func _Api_UserListenMsg_Handler(srv interface{}, stream grpc.ServerStream) error {
+	m := new(DefaultRequest)
+	if err := stream.RecvMsg(m); err != nil {
+		return err
+	}
+	return srv.(ApiServer).UserListenMsg(m, &apiUserListenMsgServer{stream})
+}
+
+type Api_UserListenMsgServer interface {
+	Send(*Msg) error
+	grpc.ServerStream
+}
+
+type apiUserListenMsgServer struct {
+	grpc.ServerStream
+}
+
+func (x *apiUserListenMsgServer) Send(m *Msg) error {
+	return x.ServerStream.SendMsg(m)
+}
+
+var _Api_serviceDesc = grpc.ServiceDesc{
 	ServiceName: "base.Api",
 	HandlerType: (*ApiServer)(nil),
 	Methods: []grpc.MethodDesc{
@@ -4652,6 +4708,12 @@ var Api_ServiceDesc = grpc.ServiceDesc{
 			Handler:    _Api_WeiXinSignIn_Handler,
 		},
 	},
-	Streams:  []grpc.StreamDesc{},
+	Streams: []grpc.StreamDesc{
+		{
+			StreamName:    "UserListenMsg",
+			Handler:       _Api_UserListenMsg_Handler,
+			ServerStreams: true,
+		},
+	},
 	Metadata: "base.proto",
 }

+ 8 - 0
assembly/base/model/msg.go

@@ -0,0 +1,8 @@
+package model
+
+type Msg struct {
+	UserIdSend int
+	UserIdRcv  int
+	Title      string
+	Content    string
+}

+ 43 - 0
assembly/base/service/msg_bus.go

@@ -0,0 +1,43 @@
+package service
+
+import (
+	"github.com/sirupsen/logrus"
+	"sportfitness/base/assembly/base/model"
+	"sync"
+)
+
+var msgBusDataMap = map[int]chan model.Msg{}
+var msgBusDataMapMu = sync.Mutex{}
+
+type MsgBus struct {
+}
+
+func (m MsgBus) AddListener(userId int) (channelOut <-chan model.Msg) {
+	msgBusDataMapMu.Lock()
+	defer msgBusDataMapMu.Unlock()
+	channel := make(chan model.Msg, 10)
+	msgBusDataMap[userId] = channel
+	channelOut = channel
+	return
+}
+func (m MsgBus) RemoveListener(userId int) {
+	msgBusDataMapMu.Lock()
+	defer msgBusDataMapMu.Unlock()
+	if c, ok := msgBusDataMap[userId]; ok {
+		close(c)
+	}
+	delete(msgBusDataMap, userId)
+}
+
+func (MsgBus) SendMsg(msg model.Msg) {
+	msgBusDataMapMu.Lock()
+	defer msgBusDataMapMu.Unlock()
+	if channel, ok := msgBusDataMap[msg.UserIdRcv]; ok {
+		select {
+		case channel <- msg:
+			break
+		default:
+			logrus.Errorf("向用户ID[%d]发送信息失败,chan满", msg.UserIdRcv)
+		}
+	}
+}

+ 9 - 1
proto/server/base.proto

@@ -167,8 +167,16 @@ service Api {
   rpc CheckVerifyCode(CheckVerifyCodeRequest)returns(DefaultReply){}
   rpc WeiXinSignIn(WeiXinSignInRequest)returns(SignInReply){}
 
-//  rpc test(WeiXinSignInRequest) returns(stream WeiXinSignInRequest){}
+  // 用户监听消息
+  rpc UserListenMsg(DefaultRequest) returns(stream Msg){}
 }
+
+message Msg{
+  int64  userIdSend = 1;
+  string title = 2;
+  string content = 3;
+}
+
 message DefaultRequest{
 }
 message DefaultReply{}

+ 2 - 2
repository/grpc/bsw/im/im/im.pb.go

@@ -1,7 +1,7 @@
 // Code generated by protoc-gen-go. DO NOT EDIT.
 // versions:
-// 	protoc-gen-go v1.25.0
-// 	protoc        v3.14.0
+// 	protoc-gen-go v1.23.0
+// 	protoc        v3.11.3
 // source: im.proto
 
 package im

+ 3 - 7
repository/grpc/bsw/im/im/im_grpc.pb.go

@@ -11,7 +11,6 @@ import (
 
 // This is a compile-time assertion to ensure that this generated file
 // is compatible with the grpc package it is being compiled against.
-// Requires gRPC-Go v1.32.0 or later.
 const _ = grpc.SupportPackageIsVersion7
 
 // ApiClient is the client API for Api service.
@@ -460,7 +459,7 @@ func (c *apiClient) IMSelfInfo(ctx context.Context, in *DefaultRequest, opts ...
 }
 
 func (c *apiClient) IMMessageRcv(ctx context.Context, in *DefaultRequest, opts ...grpc.CallOption) (Api_IMMessageRcvClient, error) {
-	stream, err := c.cc.NewStream(ctx, &Api_ServiceDesc.Streams[0], "/im.Api/IMMessageRcv", opts...)
+	stream, err := c.cc.NewStream(ctx, &_Api_serviceDesc.Streams[0], "/im.Api/IMMessageRcv", opts...)
 	if err != nil {
 		return nil, err
 	}
@@ -706,7 +705,7 @@ type UnsafeApiServer interface {
 }
 
 func RegisterApiServer(s grpc.ServiceRegistrar, srv ApiServer) {
-	s.RegisterService(&Api_ServiceDesc, srv)
+	s.RegisterService(&_Api_serviceDesc, srv)
 }
 
 func _Api_GenVerifyImage_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
@@ -1450,10 +1449,7 @@ func (x *apiIMMessageRcvServer) Send(m *Message) error {
 	return x.ServerStream.SendMsg(m)
 }
 
-// Api_ServiceDesc is the grpc.ServiceDesc for Api service.
-// It's only intended for direct use with grpc.RegisterService,
-// and not to be introspected or modified (even as a copy)
-var Api_ServiceDesc = grpc.ServiceDesc{
+var _Api_serviceDesc = grpc.ServiceDesc{
 	ServiceName: "im.Api",
 	HandlerType: (*ApiServer)(nil),
 	Methods: []grpc.MethodDesc{

Nem az összes módosított fájl került megjelenítésre, mert túl sok fájl változott