zrufo 4 år sedan
förälder
incheckning
824fb39a3e

+ 1 - 1
assembly/base/repository/redis/init.go

@@ -10,7 +10,7 @@ import (
 	"github.com/go-redis/redis/v8"
 )
 
-const prefix = "sportfitness:base"
+const prefix = "bsw:sport_fitness:base"
 
 var ctx = context.Background()
 

+ 5 - 5
assembly/heartRate/init.go

@@ -1,22 +1,22 @@
 package base
 
 import (
-	"sportfitness/base/assembly/base/service"
 	"sportfitness/base/assembly/heartRate/repository"
 	"sportfitness/base/assembly/heartRate/repository/postgre"
+	"sportfitness/base/assembly/heartRate/repository/redis"
 
 	"github.com/sirupsen/logrus"
 )
 
 func Init() {
-	logrus.Info("基础库初始化")
+	logrus.Info("心率库初始化")
 	repository.Init()
 	postgre.Init()
-	//redis.Init()
+	redis.Init()
 
 	//TODO 添加开关
-	service.RegisterServices()
+	//service.RegisterServices()
 
-	logrus.Info("基础库初始化成功")
+	logrus.Info("心率库初始化成功")
 	return
 }

+ 9 - 0
assembly/heartRate/model/band_data.go

@@ -0,0 +1,9 @@
+package model
+
+type BandData struct {
+	BandId    int
+	HubId     int
+	HeartRate int
+	Battery   int
+	TimeMiles int64
+}

+ 19 - 0
assembly/heartRate/model/hr_course_detail.go

@@ -0,0 +1,19 @@
+package model
+
+type HrResult struct {
+	Cle              int  `json:"cle"`
+	PureCalorieNoVo2 int  `json:"pure_calorie_no_vo_2"`
+	Ck               int  `json:"ck"`
+	SportPer         int  `json:"sport_per"`
+	Hr               int  `json:"hr"`
+	AvgHr            int  `json:"avg_hr"`
+	MaxHr            int  `json:"max_hr"`
+	IsWin            bool `json:"is_win"`
+	IsBest           bool `json:"is_best"`
+	IsBreaking       bool `json:"is_breaking"`
+	BreakTime        int  `json:"break_time"`
+}
+type HrCourseDetail struct {
+	HcId int
+	HrResult
+}

+ 84 - 0
assembly/heartRate/repository/postgre/hr.go

@@ -7,6 +7,13 @@
  */
 package postgre
 
+import (
+	"github.com/sirupsen/logrus"
+	"sportfitness/base/assembly/heartRate/model"
+	"sync"
+	"time"
+)
+
 type PGHr struct {
 }
 
@@ -121,3 +128,80 @@ func (PGHr) HrClassGiveUpConfirm(optUserId int64, objectShopId int64, jsonStr st
 
 	return
 }
+
+const hrSaveCount = 1000
+const hrSaveTime = time.Second * 10
+
+// key: sn
+var hrSaveMap = map[string][]model.BandData{}
+var hrSaveMapMu = sync.Mutex{}
+
+func (PGHr) getSNByBandId(id int) (sn string) {
+	//TODO 实现
+	return
+}
+
+func (p PGHr) SaveHrBatch(data []model.BandData) {
+	hrSaveMapMu.Lock()
+	defer hrSaveMapMu.Unlock()
+
+	for _, one := range data {
+		sn := p.getSNByBandId(one.BandId)
+		if list, ok := hrSaveMap[sn]; ok {
+			hrSaveMap[sn] = append(list, one)
+		} else {
+			hrSaveMap[sn] = []model.BandData{one}
+		}
+	}
+	p.trySaveHrSaveMap()
+}
+func (PGHr) saveOneBandToDB(sn string, data []model.BandData) {
+	//TODO 保存
+}
+func (p PGHr) workSaveHrSaveMapLoop() {
+	defer func() {
+		if p := recover(); p != nil {
+			logrus.Errorf("workSaveHrSaveMapLoop: %s", p)
+		}
+	}()
+
+	hrSaveMapMu.Lock()
+	defer hrSaveMapMu.Unlock()
+
+	p.trySaveHrSaveMap()
+}
+
+func (p PGHr) workSaveHrSaveMap() {
+	logrus.Info("心率保存循环开启")
+
+	for {
+		time.Sleep(time.Second * 30)
+		p.workSaveHrSaveMapLoop()
+	}
+}
+
+func (p PGHr) trySaveHrSaveMap() {
+	var deleteKeys []string
+	for sn, v := range hrSaveMap {
+		needSave := false
+
+		if len(v) >= hrSaveCount {
+			needSave = true
+		}
+		if len(v) > 0 {
+			firstTime := time.Unix(0, v[0].TimeMiles*1000000)
+			if time.Now().Sub(firstTime) > hrSaveTime {
+				needSave = true
+			}
+		}
+		if needSave {
+			p.saveOneBandToDB(sn, v)
+			deleteKeys = append(deleteKeys, sn)
+		}
+	}
+
+	for _, sn := range deleteKeys {
+		delete(hrSaveMap, sn)
+	}
+
+}

+ 2 - 0
assembly/heartRate/repository/postgre/init.go

@@ -25,6 +25,8 @@ func Init() {
 	// 需要从数据库取出sys_token用作系统验证,存全局变量,每次调用总后台接口需要传递sys_token
 	//getClient().Raw("select sys_token from s_sys_config").Scan(&SysToken)  Hr子系统不需要与系统总后台对接
 	//global.SysToken = SysToken
+
+	go PGHr{}.workSaveHrSaveMap()
 }
 func getClient() *lib.DB {
 	return repository.Repository.GetGormClient()

+ 47 - 0
assembly/heartRate/repository/redis/data_from_hub.go

@@ -0,0 +1,47 @@
+package redis
+
+import (
+	"fmt"
+	"github.com/go-redis/redis/v8"
+	"github.com/vmihailenco/msgpack"
+	"sportfitness/base/assembly/heartRate/model"
+	"time"
+)
+
+const hubBandDataChannel = "bsw:sport_fitness:hub_tcp_server:heart_rate"
+
+func GetHubBandData() (data []model.BandData, err error) {
+	var resultTemp []string
+
+	bt := time.Now()
+
+	for i := 0; i < 500; i++ {
+		if time.Now().Sub(bt) > time.Second*5 {
+			return
+		}
+
+		resultTemp, err = getClient().BLPop(ctx, time.Second, hubBandDataChannel).Result()
+		if err != nil {
+			if err == redis.Nil {
+				err = nil
+				continue
+			}
+			return
+		}
+
+		if len(resultTemp) != 2 {
+			err = fmt.Errorf("GetHubBandData: 接收长度错误 %d", len(resultTemp))
+			return
+		}
+
+		one := model.BandData{}
+		err = msgpack.Unmarshal([]byte(resultTemp[1]), &one)
+		if err != nil {
+			return
+		}
+		data = append(data, one)
+
+	}
+
+	return
+}

+ 36 - 0
assembly/heartRate/repository/redis/hr_store_r4.go

@@ -0,0 +1,36 @@
+package redis
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/go-redis/redis/v8"
+	"github.com/sirupsen/logrus"
+	"sportfitness/base/assembly/heartRate/model"
+)
+
+const hrStoreR4Key = prefix + ":hr_store_r4"
+
+func HrStoreR4(list []model.HrCourseDetail) {
+	cmds, err := getClient().TxPipelined(ctx, func(pipeliner redis.Pipeliner) error {
+		for _, one := range list {
+			data, err2 := json.Marshal(one.HrResult)
+			if err2 != nil {
+				panic(err2)
+			}
+
+			pipeliner.Set(ctx, fmt.Sprintf("%s:%d", hrStoreR4Key, one.HcId), data, 0)
+		}
+		return nil
+	})
+
+	if err != nil {
+		logrus.Errorf("HrStoreR4 保存失败:%s", err)
+		return
+	}
+
+	for _, r := range cmds {
+		if r.Err() != nil {
+			logrus.Errorf("HrStoreR4 保存失败:%s", r.Err())
+		}
+	}
+}

+ 45 - 0
assembly/heartRate/repository/redis/init.go

@@ -0,0 +1,45 @@
+package redis
+
+import (
+	"context"
+	"sportfitness/base/assembly/base/repository"
+	"sportfitness/base/errors"
+
+	"git.beswell.com/gframe/application"
+
+	"github.com/go-redis/redis/v8"
+)
+
+const prefix = "bsw:sport_fitness:heart_rate"
+
+var ctx = context.Background()
+
+func Init() {
+	err := repository.Repository.InitRedisByConfigCenter(0)
+	if err != nil {
+		panic(err)
+	}
+}
+
+func getClient() redis.UniversalClient {
+	return repository.Repository.GetRedisClient()
+}
+
+func handleTokenErr(err error) {
+	if err != nil {
+		if err == redis.Nil {
+			err = application.ErrorBusinessF(errors.ErrCodeToken, "token无效")
+		}
+
+		panic(err)
+	}
+}
+
+func handleErr(err error) {
+	if err != nil {
+		if err == redis.Nil {
+			err = application.ErrorBusinessF(errors.ErrCodeNoRecord, "记录不存在")
+		}
+		panic(err)
+	}
+}

+ 44 - 0
assembly/heartRate/service/calculation_unit.go

@@ -0,0 +1,44 @@
+package service
+
+import (
+	"github.com/sirupsen/logrus"
+	"sportfitness/base/assembly/heartRate/model"
+	"sportfitness/base/assembly/heartRate/repository/postgre"
+	"sportfitness/base/assembly/heartRate/repository/redis"
+)
+
+type CalculationUnit struct {
+}
+
+func (CalculationUnit) calculate(src model.BandData) (result model.HrCourseDetail) {
+	// TODO 实现计算
+	return
+}
+
+func (c CalculationUnit) loop() {
+	data, err := redis.GetHubBandData()
+	if err != nil {
+		logrus.Errorf("接收心率数据失败:%s", err)
+		return
+	}
+	if len(data) == 0 {
+		return
+	}
+
+	postgre.PGHr{}.SaveHrBatch(data)
+
+	var results []model.HrCourseDetail
+	for _, one := range data {
+		results = append(results, c.calculate(one))
+	}
+
+	redis.HrStoreR4(results)
+}
+
+func (c CalculationUnit) Run() {
+	go func() {
+		for {
+			c.loop()
+		}
+	}()
+}

+ 1 - 0
go.mod

@@ -13,6 +13,7 @@ require (
 	github.com/mojocn/base64Captcha v1.3.4
 	github.com/qiniu/api.v7/v7 v7.6.0
 	github.com/sirupsen/logrus v1.8.1
+	github.com/vmihailenco/msgpack v4.0.4+incompatible
 	golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 // indirect
 	golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57 // indirect
 	golang.org/x/text v0.3.6 // indirect

+ 6 - 4
main.go

@@ -2,16 +2,16 @@ package main
 
 import (
 	"fmt"
+	"git.beswell.com/gframe/application"
+	"github.com/ZR233/glog/v2"
+	"github.com/sirupsen/logrus"
 	"log"
 	"net"
 	"sportfitness/base/api/grpc"
 	pb "sportfitness/base/api/grpc/base"
 	"sportfitness/base/assembly/base"
 	hr "sportfitness/base/assembly/heartRate"
-
-	"git.beswell.com/gframe/application"
-	"github.com/ZR233/glog/v2"
-	"github.com/sirupsen/logrus"
+	hrs "sportfitness/base/assembly/heartRate/service"
 )
 
 func main() {
@@ -47,6 +47,8 @@ func main() {
 	base.Init()
 	hr.Init()
 
+	hrs.CalculationUnit{}.Run()
+
 	grpcServer := app.GRPCServer()
 
 	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", 19090))