migration of open-etc-friends-pool for use with Etica/EGAZ
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

373 lines
8.5 KiB

package policy
import (
"encoding/json"
"fmt"
"log"
"os"
"os/exec"
"path/filepath"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/yuriy0803/open-etc-pool-friends/storage"
"github.com/yuriy0803/open-etc-pool-friends/util"
)
// Config holds the policy server settings. The struct tags give the JSON key
// each field is read from.
type Config struct {
// Workers is the number of ban-worker goroutines launched by Start.
Workers int `json:"workers"`
// Banning configures when and how misbehaving IPs are banned.
Banning Banning `json:"banning"`
// Limits configures the per-IP connection allowance.
Limits Limits `json:"limits"`
// ResetInterval is parsed with util.MustParseDuration in Start. It is used
// both as the period of the stats reset timer and as the idle timeout after
// which resetStats drops an IP's record.
ResetInterval string `json:"resetInterval"`
// RefreshInterval is parsed with util.MustParseDuration in Start and is the
// period at which refreshState reloads the black/white lists.
RefreshInterval string `json:"refreshInterval"`
// Walletblacklist is the path of the file GetWalletBlacklist decodes as a
// JSON array of strings.
Walletblacklist string `json:"blacklist_file"`
}
// Limits configures the per-IP connection allowance checked by
// ApplyLimitPolicy.
type Limits struct {
// Enabled switches the limit policy on; when false ApplyLimitPolicy always
// returns true and valid shares do not raise the allowance.
Enabled bool `json:"enabled"`
// Limit is the initial ConnLimit given to a newly created Stats record.
Limit int32 `json:"limit"`
// Grace is parsed with util.MustParseDuration in Start; ApplyLimitPolicy
// does not enforce the limit until that much time has passed since Start.
Grace string `json:"grace"`
// LimitJump is added to an IP's ConnLimit for every valid share when the
// limit policy is enabled.
LimitJump int32 `json:"limitJump"`
}
// Banning configures the ban policy applied through forceBan.
type Banning struct {
// Enabled switches banning on; when false forceBan returns immediately.
Enabled bool `json:"enabled"`
// IPSet is the ipset set name passed to `ipset add` by doBan. When it is
// empty forceBan only logs the ban instead of queueing it.
IPSet string `json:"ipset"`
// Timeout is passed to ipset as the entry timeout. resetStats multiplies it
// by 1000 before comparing it with timestamps, so it is presumably expressed
// in seconds — TODO confirm against util.MakeTimestamp.
Timeout int64 `json:"timeout"`
// InvalidPercent is the threshold, in percent, for the invalid/valid share
// ratio; ApplySharePolicy bans at or above it.
InvalidPercent float32 `json:"invalidPercent"`
// CheckThreshold is the number of shares (valid plus invalid) that must
// accumulate before ApplySharePolicy evaluates the ratio.
CheckThreshold int32 `json:"checkThreshold"`
// MalformedLimit is the malformed-request count at which
// ApplyMalformedPolicy bans the IP.
MalformedLimit int32 `json:"malformedLimit"`
// NOTE(review): Fail2BanCommand is not read anywhere in the visible code;
// addToFail2Ban hard-codes its command line.
Fail2BanCommand string `json:"fail2banCommand"`
}
// Stats is the per-IP bookkeeping record stored in PolicyServer.stats.
type Stats struct {
// The embedded mutex is held by ApplySharePolicy while it updates and reads
// ValidShares and InvalidShares.
sync.Mutex
// We are using atomic with LastBeat,
// so moving it before the rest in order to avoid alignment issue
// LastBeat is the timestamp stored by heartbeat.
LastBeat int64
// BannedAt is the timestamp stored by forceBan; resetStats zeroes it once
// the banning timeout has elapsed.
BannedAt int64
// ValidShares and InvalidShares count shares since the last ratio check in
// ApplySharePolicy.
ValidShares int32
InvalidShares int32
// Malformed counts malformed requests; it is incremented atomically.
Malformed int32
// ConnLimit is the remaining connection allowance; it is modified
// atomically by incrLimit and decrLimit.
ConnLimit int32
// Banned is set to 1 by forceBan and back to 0 by resetStats, always through
// atomic operations.
Banned int32
}
// PolicyServer tracks per-IP statistics and applies the ban, connection-limit
// and blacklist policies.
type PolicyServer struct {
// The embedded RWMutex guards blacklist, whitelist and walletblacklist.
sync.RWMutex
// statsMu guards the stats map.
statsMu sync.Mutex
// config is the configuration passed to Start.
config *Config
// stats maps an IP string to its Stats record.
stats map[string]*Stats
// banChannel carries IPs from forceBan to the ban workers.
banChannel chan string
// startedAt is the util.MakeTimestamp value taken when Start ran.
startedAt int64
// grace is Limits.Grace converted to milliseconds.
grace int64
// timeout is ResetInterval converted to milliseconds; records whose last
// heartbeat is at least this old are dropped by resetStats.
timeout int64
// blacklist and whitelist are loaded from storage by refreshState.
blacklist []string
whitelist []string
// storage is the Redis client the lists are fetched from.
storage *storage.RedisClient
// walletblacklist is loaded from the JSON file named in the config.
walletblacklist []string
}
// addToFail2Ban runs `fail2ban-client set blacklist add <ip>` and reports
// whether the command succeeded.
//
// It returns nil when the command exits successfully. Otherwise the returned
// error wraps the underlying exec failure (so callers can inspect it with
// errors.Is / errors.As) and carries the command's combined stdout/stderr.
//
// NOTE(review): the documented fail2ban-client ban command is
// `set <JAIL> banip <IP>`; confirm that `set blacklist add` is accepted by
// the deployed fail2ban version and that a jail named "blacklist" exists.
func addToFail2Ban(ip string) error {
	cmd := exec.Command("fail2ban-client", "set", "blacklist", "add", ip)
	output, err := cmd.CombinedOutput()
	if err != nil {
		// %w keeps the original *exec.Error / *exec.ExitError reachable.
		return fmt.Errorf("adding %s to fail2ban: %w, output: %s", ip, err, output)
	}
	return nil
}
// doBan bans ip by running `sudo ipset add <set> <ip> timeout <timeout> -!`
// with the set name and timeout taken from the banning configuration.
//
// If the ipset command fails, the failure is logged and the IP is handed to
// addToFail2Ban as a fallback; a failure of that fallback is logged as well
// instead of being dropped.
func (s *PolicyServer) doBan(ip string) {
	set, timeout := s.config.Banning.IPSet, s.config.Banning.Timeout
	// The command line is split on whitespace, so every field becomes one
	// argument and no shell is involved.
	args := strings.Fields(fmt.Sprintf("sudo ipset add %s %s timeout %v -!", set, ip, timeout))
	head, rest := args[0], args[1:]
	log.Printf("Banned %v with timeout %v on ipset %s", ip, timeout, set)
	if _, err := exec.Command(head, rest...).Output(); err != nil {
		log.Printf("CMD Error: %s", err)
		// Fall back to Fail2Ban and surface its outcome as well.
		if f2bErr := addToFail2Ban(ip); f2bErr != nil {
			log.Printf("Fail2Ban fallback failed for %v: %v", ip, f2bErr)
		}
	}
}
// Start builds a PolicyServer from cfg, performs an initial state refresh
// through storage, and launches the background goroutines: one that
// periodically resets stats and refreshes the lists, plus cfg.Workers ban
// workers. The running server is returned.
func Start(cfg *Config, storage *storage.RedisClient) *PolicyServer {
	s := &PolicyServer{
		config:    cfg,
		startedAt: util.MakeTimestamp(),
	}
	s.grace = int64(util.MustParseDuration(cfg.Limits.Grace) / time.Millisecond)
	s.banChannel = make(chan string, 64)
	s.stats = make(map[string]*Stats)
	s.storage = storage

	// Load the lists once before serving any policy decision.
	s.refreshState()

	// The idle timeout for stats and the reset period both come from
	// ResetInterval.
	s.timeout = int64(util.MustParseDuration(s.config.ResetInterval) / time.Millisecond)

	resetEvery := util.MustParseDuration(s.config.ResetInterval)
	resetTimer := time.NewTimer(resetEvery)
	log.Printf("Set policy stats reset every %v", resetEvery)

	refreshEvery := util.MustParseDuration(s.config.RefreshInterval)
	refreshTimer := time.NewTimer(refreshEvery)
	log.Printf("Set policy state refresh every %v", refreshEvery)

	go func() {
		for {
			select {
			case <-refreshTimer.C:
				s.refreshState()
				refreshTimer.Reset(refreshEvery)
			case <-resetTimer.C:
				s.resetStats()
				resetTimer.Reset(resetEvery)
			}
		}
	}()

	for started := 0; started < s.config.Workers; started++ {
		s.startPolicyWorker()
	}
	log.Printf("Running with %v policy workers", s.config.Workers)
	return s
}
// startPolicyWorker launches one goroutine that receives IPs from banChannel
// forever and passes each one to doBan.
func (s *PolicyServer) startPolicyWorker() {
	go func() {
		for {
			ip := <-s.banChannel
			s.doBan(ip)
		}
	}()
}
// resetStats walks every tracked IP under statsMu and removes records that
// are no longer needed:
//
//   - a record whose ban is older than the configured banning timeout has its
//     ban lifted and is deleted;
//   - a record whose last heartbeat is at least s.timeout old is deleted.
//
// The number of removed records is logged. Each record is counted once even
// when both conditions hold.
func (s *PolicyServer) resetStats() {
	now := util.MakeTimestamp()
	// Banning.Timeout is scaled by 1000 to match the timestamp unit.
	banningTimeout := s.config.Banning.Timeout * 1000
	total := 0

	s.statsMu.Lock()
	defer s.statsMu.Unlock()

	for key, m := range s.stats {
		lastBeat := atomic.LoadInt64(&m.LastBeat)
		bannedAt := atomic.LoadInt64(&m.BannedAt)

		if now-bannedAt >= banningTimeout {
			atomic.StoreInt64(&m.BannedAt, 0)
			if atomic.CompareAndSwapInt32(&m.Banned, 1, 0) {
				log.Printf("Ban dropped for %v", key)
				delete(s.stats, key)
				total++
				// Already removed: do not fall through to the idle check,
				// which would count this record a second time.
				continue
			}
		}

		if now-lastBeat >= s.timeout {
			delete(s.stats, key)
			total++
		}
	}
	log.Printf("Flushed stats for %v IP addresses", total)
}
// GetWalletBlacklist loads the wallet blacklist from the file named by the
// blacklist_file setting.
//
// The file must contain a JSON array of strings. When no file is configured
// the method returns (nil, nil) rather than resolving the empty path to the
// working directory and failing on it. Any error while resolving, opening or
// decoding the file is logged and returned with a nil slice.
func (s *PolicyServer) GetWalletBlacklist() ([]string, error) {
	if s.config.Walletblacklist == "" {
		// Nothing configured: an empty blacklist, not an error.
		return nil, nil
	}

	blacklistFileName, err := filepath.Abs(s.config.Walletblacklist)
	if err != nil {
		log.Printf("File error: %v", err.Error())
		return nil, err
	}
	log.Printf("Loading wallet blacklist: %v", blacklistFileName)

	blacklistFile, err := os.Open(blacklistFileName)
	if err != nil {
		log.Printf("File error: %v", err.Error())
		return nil, err
	}
	defer blacklistFile.Close()

	var data []string
	if err := json.NewDecoder(blacklistFile).Decode(&data); err != nil {
		log.Printf("Blacklist parsing error: %v", err.Error())
		return nil, err
	}
	return data, nil
}
// refreshState reloads the login blacklist and IP whitelist from storage and
// the wallet blacklist from its JSON file.
//
// All I/O happens before the write lock is taken, so readers such as
// InBlackList and InWhiteList are only blocked for the short swap. A list
// whose reload failed is logged and left at its previous value, so a
// transient backend or file error does not silently empty it.
func (s *PolicyServer) refreshState() {
	blacklist, blacklistErr := s.storage.GetBlacklist()
	if blacklistErr != nil {
		log.Printf("Failed to get blacklist from backend: %v", blacklistErr)
	}
	whitelist, whitelistErr := s.storage.GetWhitelist()
	if whitelistErr != nil {
		log.Printf("Failed to get whitelist from backend: %v", whitelistErr)
	}
	walletBlacklist, walletErr := s.GetWalletBlacklist()
	if walletErr != nil {
		log.Printf("Failed to get wallet/login blacklist from json file backend: %v", walletErr)
	}

	s.Lock()
	if blacklistErr == nil {
		s.blacklist = blacklist
	}
	if whitelistErr == nil {
		s.whitelist = whitelist
	}
	if walletErr == nil {
		s.walletblacklist = walletBlacklist
	}
	s.Unlock()

	log.Println("Policy state refresh complete")
}
// NewStats returns a fresh Stats record whose connection allowance starts at
// the configured Limits.Limit and whose heartbeat is set to the current time.
func (s *PolicyServer) NewStats() *Stats {
	fresh := new(Stats)
	fresh.ConnLimit = s.config.Limits.Limit
	fresh.heartbeat()
	return fresh
}
// Get returns the Stats record for ip. A record that already exists has its
// heartbeat refreshed; otherwise a new record is created with NewStats and
// stored in the map. The stats map is accessed under statsMu.
func (s *PolicyServer) Get(ip string) *Stats {
	s.statsMu.Lock()
	defer s.statsMu.Unlock()

	if known, found := s.stats[ip]; found {
		known.heartbeat()
		return known
	}

	fresh := s.NewStats()
	s.stats[ip] = fresh
	return fresh
}
// BanClient looks up (or creates) the Stats record for ip and passes it to
// forceBan.
func (s *PolicyServer) BanClient(ip string) {
	s.forceBan(s.Get(ip), ip)
}
// IsBanned reports whether the Banned flag of ip's Stats record is set.
// The lookup goes through Get, so it refreshes the heartbeat of an existing
// record or creates a record for an unseen IP.
func (s *PolicyServer) IsBanned(ip string) bool {
	record := s.Get(ip)
	flag := atomic.LoadInt32(&record.Banned)
	return flag > 0
}
// ApplyLimitPolicy reports whether a new connection from ip is allowed.
// It always allows when limits are disabled or while the grace period since
// Start has not yet passed. Otherwise it decrements the IP's connection
// allowance and allows only while the result is still positive.
func (s *PolicyServer) ApplyLimitPolicy(ip string) bool {
	if !s.config.Limits.Enabled {
		return true
	}

	elapsed := util.MakeTimestamp() - s.startedAt
	if elapsed <= s.grace {
		return true
	}
	return s.Get(ip).decrLimit() > 0
}
// ApplyLoginWalletPolicy reports whether login is allowed, i.e. whether it is
// absent from the wallet blacklist.
func (s *PolicyServer) ApplyLoginWalletPolicy(login string) bool {
	return !s.InWalletBlackList(login)
}
// ApplyLoginPolicy reports whether the login addy is allowed. When addy is on
// the blacklist, the connecting ip is passed to forceBan and false is
// returned.
func (s *PolicyServer) ApplyLoginPolicy(addy, ip string) bool {
	if !s.InBlackList(addy) {
		return true
	}
	s.forceBan(s.Get(ip), ip)
	return false
}
// InWalletBlackList reports whether addy is present in the wallet blacklist.
// The list is read under the read lock.
func (s *PolicyServer) InWalletBlackList(addy string) bool {
	s.RLock()
	defer s.RUnlock()

	listed := util.StringInSlice(addy, s.walletblacklist)
	return listed
}
// ApplyMalformedPolicy records one malformed request for ip. It returns true
// while the running count stays below Banning.MalformedLimit; once the limit
// is reached it calls forceBan and returns false.
func (s *PolicyServer) ApplyMalformedPolicy(ip string) bool {
	record := s.Get(ip)
	if record.incrMalformed() < s.config.Banning.MalformedLimit {
		return true
	}
	s.forceBan(record, ip)
	return false
}
// ApplySharePolicy records one share for ip and decides whether the IP may
// continue.
//
// A valid share increments ValidShares and, when limits are enabled, raises
// the connection allowance by Limits.LimitJump; an invalid share increments
// InvalidShares. Until Banning.CheckThreshold shares have accumulated the
// method returns true. Once the threshold is reached the counters are reset
// and the invalid/valid ratio is compared with Banning.InvalidPercent: at or
// above it, forceBan is called and false is returned.
func (s *PolicyServer) ApplySharePolicy(ip string, validShare bool) bool {
	x := s.Get(ip)

	// Counter updates and the snapshot happen under the record's own lock;
	// forceBan is deliberately called after it is released.
	x.Lock()
	switch {
	case !validShare:
		x.InvalidShares++
	default:
		x.ValidShares++
		if s.config.Limits.Enabled {
			x.incrLimit(s.config.Limits.LimitJump)
		}
	}
	valid, invalid := x.ValidShares, x.InvalidShares
	due := valid+invalid >= s.config.Banning.CheckThreshold
	if due {
		x.resetShares()
	}
	x.Unlock()

	if !due {
		return true
	}

	ratio := float32(invalid) / float32(valid)
	if ratio >= s.config.Banning.InvalidPercent/100.0 {
		s.forceBan(x, ip)
		return false
	}
	return true
}
// resetShares zeroes both share counters. It does no locking of its own.
func (x *Stats) resetShares() {
	x.ValidShares, x.InvalidShares = 0, 0
}
// forceBan marks x as banned and, the first time the flag flips from 0 to 1,
// either queues ip on banChannel (when an ipset name is configured) or just
// logs the ban. It does nothing when banning is disabled or ip is
// whitelisted. BannedAt is refreshed on every call that gets past those
// checks.
func (s *PolicyServer) forceBan(x *Stats, ip string) {
	if !s.config.Banning.Enabled {
		return
	}
	if s.InWhiteList(ip) {
		return
	}

	atomic.StoreInt64(&x.BannedAt, util.MakeTimestamp())
	if !atomic.CompareAndSwapInt32(&x.Banned, 0, 1) {
		// Already flagged as banned; nothing more to queue or log.
		return
	}

	if len(s.config.Banning.IPSet) == 0 {
		log.Println("Banned peer", ip)
		return
	}
	s.banChannel <- ip
}
// incrLimit atomically adds n to the connection allowance.
func (x *Stats) incrLimit(n int32) {
	limit := &x.ConnLimit
	atomic.AddInt32(limit, n)
}
// incrMalformed atomically increments the malformed-request counter and
// returns the new count.
func (x *Stats) incrMalformed() int32 {
	count := atomic.AddInt32(&x.Malformed, 1)
	return count
}
// decrLimit atomically decrements the connection allowance by one and returns
// the new value.
func (x *Stats) decrLimit() int32 {
	remaining := atomic.AddInt32(&x.ConnLimit, -1)
	return remaining
}
// InBlackList reports whether addy is present in the login blacklist.
// The list is read under the read lock.
func (s *PolicyServer) InBlackList(addy string) bool {
	s.RLock()
	defer s.RUnlock()

	listed := util.StringInSlice(addy, s.blacklist)
	return listed
}
// InWhiteList reports whether ip is present in the IP whitelist.
// The list is read under the read lock.
func (s *PolicyServer) InWhiteList(ip string) bool {
	s.RLock()
	defer s.RUnlock()

	listed := util.StringInSlice(ip, s.whitelist)
	return listed
}
// heartbeat atomically stores the current timestamp in LastBeat.
func (x *Stats) heartbeat() {
	atomic.StoreInt64(&x.LastBeat, util.MakeTimestamp())
}