Browse Source

// FlushStaleStats deletes all stale statistics in Redis.

// window defines the duration after which the statistics become stale.
// largeWindow defines the duration after which the statistics are permanently deleted.
master
yuriy0803 3 years ago
parent
commit
06e24dd0ce
  1. 123
      storage/redis.go

123
storage/redis.go

@ -343,55 +343,54 @@ func convertPoolChartsResults(raw *redis.ZSliceCmd) []*PoolCharts {
return reverse return reverse
} }
func (r *RedisClient) GetNetCharts(netHashLen int64) (stats []*NetCharts, err error) { func (r *RedisClient) GetNetCharts(netHashLen int64) (stats []*NetCharts, err error) {
tx := r.client.Multi() tx := r.client.Multi()
defer tx.Close() defer tx.Close()
now := util.MakeTimestamp() / 1000 now := util.MakeTimestamp() / 1000
cmds, err := tx.Exec(func() error { cmds, err := tx.Exec(func() error {
if err := tx.ZRemRangeByScore(r.formatKey("charts", "difficulty"), "-inf", fmt.Sprint("(", now-172800)).Err(); err != nil { if err := tx.ZRemRangeByScore(r.formatKey("charts", "difficulty"), "-inf", fmt.Sprint("(", now-172800)).Err(); err != nil {
return err return err
} }
zRangeCmd := tx.ZRevRangeWithScores(r.formatKey("charts", "difficulty"), 0, netHashLen) zRangeCmd := tx.ZRevRangeWithScores(r.formatKey("charts", "difficulty"), 0, netHashLen)
if zRangeCmd.Err() != nil { if zRangeCmd.Err() != nil {
return zRangeCmd.Err() return zRangeCmd.Err()
} }
return nil return nil
}) })
if err != nil { if err != nil {
return nil, err return nil, err
} }
zSliceCmd, ok := cmds[1].(*redis.ZSliceCmd) zSliceCmd, ok := cmds[1].(*redis.ZSliceCmd)
if !ok { if !ok {
return nil, fmt.Errorf("invalid command result type: %T", cmds[1]) return nil, fmt.Errorf("invalid command result type: %T", cmds[1])
} }
stats, err = convertNetChartsResults(zSliceCmd) stats, err = convertNetChartsResults(zSliceCmd)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return stats, nil return stats, nil
} }
func convertNetChartsResults(raw *redis.ZSliceCmd) ([]*NetCharts, error) { func convertNetChartsResults(raw *redis.ZSliceCmd) ([]*NetCharts, error) {
var result []*NetCharts var result []*NetCharts
for _, v := range raw.Val() { for _, v := range raw.Val() {
// "Timestamp:TimeFormat:Hash" // "Timestamp:TimeFormat:Hash"
pc := NetCharts{} pc := NetCharts{}
pc.Timestamp = int64(v.Score) pc.Timestamp = int64(v.Score)
str := v.Member.(string) str := v.Member.(string)
pc.TimeFormat = str[strings.Index(str, ":")+1 : strings.LastIndex(str, ":")] pc.TimeFormat = str[strings.Index(str, ":")+1 : strings.LastIndex(str, ":")]
pc.NetHash, _ = strconv.ParseInt(str[strings.LastIndex(str, ":")+1:], 10, 64) pc.NetHash, _ = strconv.ParseInt(str[strings.LastIndex(str, ":")+1:], 10, 64)
result = append(result, &pc) result = append(result, &pc)
} }
var reverse []*NetCharts var reverse []*NetCharts
for i := len(result) - 1; i >= 0; i-- { for i := len(result) - 1; i >= 0; i-- {
reverse = append(reverse, result[i]) reverse = append(reverse, result[i])
} }
return reverse, nil return reverse, nil
} }
func convertMinerChartsResults(raw *redis.ZSliceCmd) []*MinerCharts { func convertMinerChartsResults(raw *redis.ZSliceCmd) []*MinerCharts {
@ -1105,30 +1104,41 @@ func convertStringMap(m map[string]string) map[string]interface{} {
return result return result
} }
// WARNING: Must run it periodically to flush out of window hashrate entries // FlushStaleStats deletes all stale statistics in Redis.
func (r *RedisClient) FlushStaleStats(window, largeWindow time.Duration) (int64, error) { // window defines the duration after which the statistics become stale.
// largeWindow defines the duration after which the statistics are permanently deleted.
func (r *RedisClient) FlushStaleStats(staleDuration, largeWindow time.Duration) (int64, error) {
// Convert the current timestamp to seconds
now := util.MakeTimestamp() / 1000 now := util.MakeTimestamp() / 1000
max := fmt.Sprint("(", now-int64(window/time.Second)) // Define the maximum expiry date for stale statistics
total, err := r.client.ZRemRangeByScore(r.formatKey("hashrate"), "-inf", max).Result() maxStale := fmt.Sprint("(", now-int64(staleDuration/time.Second))
// Delete all stale statistics from the "hashrate" hash
total, err := r.client.ZRemRangeByScore(r.formatKey("hashrate"), "-inf", maxStale).Result()
if err != nil { if err != nil {
return total, err return total, err
} }
var c int64 var cursor int64
// Use a map to ensure that each miner is only processed once
miners := make(map[string]struct{}) miners := make(map[string]struct{})
max = fmt.Sprint("(", now-int64(largeWindow/time.Second)) // Define the maximum expiry date for the statistics to be deleted permanently
maxFinal := fmt.Sprint("(", now-int64(largeWindow/time.Second))
// Iterate over all hash keys starting with "hashrate", and delete all stale statistics for each miner
for { for {
var keys []string var keys []string
var err error var err error
c, keys, err = r.client.Scan(c, r.formatKey("hashrate", "*"), 100).Result() cursor, keys, err = r.client.Scan(cursor, r.formatKey("hashrate", "*"), 100).Result()
if err != nil { if err != nil {
return total, err return total, err
} }
for _, row := range keys { for _, key := range keys {
login := strings.Split(row, ":")[2] // Extract the miner name from the key
login := strings.Split(key, ":")[2]
if _, ok := miners[login]; !ok { if _, ok := miners[login]; !ok {
n, err := r.client.ZRemRangeByScore(r.formatKey("hashrate", login), "-inf", max).Result() // Delete all stale statistics for this miner
n, err := r.client.ZRemRangeByScore(r.formatKey("hashrate", login), "-inf", maxFinal).Result()
if err != nil { if err != nil {
return total, err return total, err
} }
@ -1136,7 +1146,8 @@ func (r *RedisClient) FlushStaleStats(window, largeWindow time.Duration) (int64,
total += n total += n
} }
} }
if c == 0 { // If we've processed all keys, exit the loop
if cursor == 0 {
break break
} }
} }

Loading…
Cancel
Save