From 06e24dd0ce04bfd79d92271c5d3c7d2319a3f505 Mon Sep 17 00:00:00 2001 From: yuriy0803 <68668177+yuriy0803@users.noreply.github.com> Date: Tue, 9 May 2023 19:43:08 +0200 Subject: [PATCH] // FlushStaleStats deletes all stale statistics in Redis. // window defines the duration after which the statistics become stale. // largeWindow defines the duration after which the statistics are permanently deleted. --- storage/redis.go | 123 ++++++++++++++++++++++++++--------------------- 1 file changed, 67 insertions(+), 56 deletions(-) diff --git a/storage/redis.go b/storage/redis.go index f00b494..7dd6861 100644 --- a/storage/redis.go +++ b/storage/redis.go @@ -343,55 +343,54 @@ func convertPoolChartsResults(raw *redis.ZSliceCmd) []*PoolCharts { return reverse } - func (r *RedisClient) GetNetCharts(netHashLen int64) (stats []*NetCharts, err error) { - tx := r.client.Multi() - defer tx.Close() - - now := util.MakeTimestamp() / 1000 - - cmds, err := tx.Exec(func() error { - if err := tx.ZRemRangeByScore(r.formatKey("charts", "difficulty"), "-inf", fmt.Sprint("(", now-172800)).Err(); err != nil { - return err - } - zRangeCmd := tx.ZRevRangeWithScores(r.formatKey("charts", "difficulty"), 0, netHashLen) - if zRangeCmd.Err() != nil { - return zRangeCmd.Err() - } - return nil - }) - if err != nil { - return nil, err - } - - zSliceCmd, ok := cmds[1].(*redis.ZSliceCmd) - if !ok { - return nil, fmt.Errorf("invalid command result type: %T", cmds[1]) - } - stats, err = convertNetChartsResults(zSliceCmd) - if err != nil { - return nil, err - } - return stats, nil + tx := r.client.Multi() + defer tx.Close() + + now := util.MakeTimestamp() / 1000 + + cmds, err := tx.Exec(func() error { + if err := tx.ZRemRangeByScore(r.formatKey("charts", "difficulty"), "-inf", fmt.Sprint("(", now-172800)).Err(); err != nil { + return err + } + zRangeCmd := tx.ZRevRangeWithScores(r.formatKey("charts", "difficulty"), 0, netHashLen) + if zRangeCmd.Err() != nil { + return zRangeCmd.Err() + } + return nil + }) + if err != nil { + return nil, err + } + + zSliceCmd, ok := cmds[1].(*redis.ZSliceCmd) + if !ok { + return nil, fmt.Errorf("invalid command result type: %T", cmds[1]) + } + stats, err = convertNetChartsResults(zSliceCmd) + if err != nil { + return nil, err + } + return stats, nil } func convertNetChartsResults(raw *redis.ZSliceCmd) ([]*NetCharts, error) { - var result []*NetCharts - for _, v := range raw.Val() { - // "Timestamp:TimeFormat:Hash" - pc := NetCharts{} - pc.Timestamp = int64(v.Score) - str := v.Member.(string) - pc.TimeFormat = str[strings.Index(str, ":")+1 : strings.LastIndex(str, ":")] - pc.NetHash, _ = strconv.ParseInt(str[strings.LastIndex(str, ":")+1:], 10, 64) - result = append(result, &pc) - } - - var reverse []*NetCharts - for i := len(result) - 1; i >= 0; i-- { - reverse = append(reverse, result[i]) - } - return reverse, nil + var result []*NetCharts + for _, v := range raw.Val() { + // "Timestamp:TimeFormat:Hash" + pc := NetCharts{} + pc.Timestamp = int64(v.Score) + str := v.Member.(string) + pc.TimeFormat = str[strings.Index(str, ":")+1 : strings.LastIndex(str, ":")] + pc.NetHash, _ = strconv.ParseInt(str[strings.LastIndex(str, ":")+1:], 10, 64) + result = append(result, &pc) + } + + var reverse []*NetCharts + for i := len(result) - 1; i >= 0; i-- { + reverse = append(reverse, result[i]) + } + return reverse, nil } func convertMinerChartsResults(raw *redis.ZSliceCmd) []*MinerCharts { @@ -1105,30 +1104,41 @@ func convertStringMap(m map[string]string) map[string]interface{} { return result } -// WARNING: Must run it periodically to flush out of window hashrate entries -func (r *RedisClient) FlushStaleStats(window, largeWindow time.Duration) (int64, error) { +// FlushStaleStats deletes all stale statistics in Redis. +// window defines the duration after which the statistics become stale. +// largeWindow defines the duration after which the statistics are permanently deleted. +func (r *RedisClient) FlushStaleStats(staleDuration, largeWindow time.Duration) (int64, error) { + // Convert the current timestamp to seconds now := util.MakeTimestamp() / 1000 - max := fmt.Sprint("(", now-int64(window/time.Second)) - total, err := r.client.ZRemRangeByScore(r.formatKey("hashrate"), "-inf", max).Result() + // Define the maximum expiry date for stale statistics + maxStale := fmt.Sprint("(", now-int64(staleDuration/time.Second)) + + // Delete all stale statistics from the "hashrate" hash + total, err := r.client.ZRemRangeByScore(r.formatKey("hashrate"), "-inf", maxStale).Result() if err != nil { return total, err } - var c int64 + var cursor int64 + // Use a map to ensure that each miner is only processed once miners := make(map[string]struct{}) - max = fmt.Sprint("(", now-int64(largeWindow/time.Second)) + // Define the maximum expiry date for the statistics to be deleted permanently + maxFinal := fmt.Sprint("(", now-int64(largeWindow/time.Second)) + // Iterate over all hash keys starting with "hashrate", and delete all stale statistics for each miner for { var keys []string var err error - c, keys, err = r.client.Scan(c, r.formatKey("hashrate", "*"), 100).Result() + cursor, keys, err = r.client.Scan(cursor, r.formatKey("hashrate", "*"), 100).Result() if err != nil { return total, err } - for _, row := range keys { - login := strings.Split(row, ":")[2] + for _, key := range keys { + // Extract the miner name from the key + login := strings.Split(key, ":")[2] if _, ok := miners[login]; !ok { - n, err := r.client.ZRemRangeByScore(r.formatKey("hashrate", login), "-inf", max).Result() + // Delete all stale statistics for this miner + n, err := r.client.ZRemRangeByScore(r.formatKey("hashrate", login), "-inf", maxFinal).Result() if err != nil { return total, err } @@ -1136,7 +1146,8 @@ func (r *RedisClient) FlushStaleStats(window, largeWindow time.Duration) (int64, total += n } } - if c == 0 { + // If we've processed all keys, exit the loop + if cursor == 0 { break } }