From debfde18e1d1e915b59ef480c9a0989fc931afb4 Mon Sep 17 00:00:00 2001 From: yuriy0803 <68668177+yuriy0803@users.noreply.github.com> Date: Sun, 14 Mar 2021 11:10:37 +0100 Subject: [PATCH] test --- util/proxy/blocks.go | 117 +++++++++++++++ util/proxy/config.go | 73 ++++++++++ util/proxy/handlers.go | 115 +++++++++++++++ util/proxy/miner.go | 95 ++++++++++++ util/proxy/proto.go | 44 ++++++ util/proxy/proxy.go | 320 +++++++++++++++++++++++++++++++++++++++++ util/proxy/stratum.go | 221 ++++++++++++++++++++++++++++ 7 files changed, 985 insertions(+) create mode 100644 util/proxy/blocks.go create mode 100644 util/proxy/config.go create mode 100644 util/proxy/handlers.go create mode 100644 util/proxy/miner.go create mode 100644 util/proxy/proto.go create mode 100644 util/proxy/proxy.go create mode 100644 util/proxy/stratum.go diff --git a/util/proxy/blocks.go b/util/proxy/blocks.go new file mode 100644 index 0000000..941e38d --- /dev/null +++ b/util/proxy/blocks.go @@ -0,0 +1,117 @@ +package proxy + +import ( + "log" + "math/big" + "strconv" + "strings" + "sync" + + "github.com/ethereum/go-ethereum/common" + + "github.com/yuriy0803/open-etc-pool-friends/rpc" + "github.com/yuriy0803/open-etc-pool-friends/util" +) + +const maxBacklog = 3 + +type heightDiffPair struct { + diff *big.Int + height uint64 +} + +type BlockTemplate struct { + sync.RWMutex + Header string + Seed string + Target string + Difficulty *big.Int + Height uint64 + GetPendingBlockCache *rpc.GetBlockReplyPart + nonces map[string]bool + headers map[string]heightDiffPair +} + +type Block struct { + difficulty *big.Int + hashNoNonce common.Hash + nonce uint64 + mixDigest common.Hash + number uint64 +} + +func (b Block) Difficulty() *big.Int { return b.difficulty } +func (b Block) HashNoNonce() common.Hash { return b.hashNoNonce } +func (b Block) Nonce() uint64 { return b.nonce } +func (b Block) MixDigest() common.Hash { return b.mixDigest } +func (b Block) NumberU64() uint64 { return b.number } + +func (s *ProxyServer) fetchBlockTemplate() { + rpc := s.rpc() + t := s.currentBlockTemplate() + pendingReply, height, diff, err := s.fetchPendingBlock() + if err != nil { + log.Printf("Error while refreshing pending block on %s: %s", rpc.Name, err) + return + } + reply, err := rpc.GetWork() + if err != nil { + log.Printf("Error while refreshing block template on %s: %s", rpc.Name, err) + return + } + // No need to update, we have fresh job + if t != nil && t.Header == reply[0] { + return + } + + pendingReply.Difficulty = util.ToHex(s.config.Proxy.Difficulty) + + newTemplate := BlockTemplate{ + Header: reply[0], + Seed: reply[1], + Target: reply[2], + Height: height, + Difficulty: big.NewInt(diff), + GetPendingBlockCache: pendingReply, + headers: make(map[string]heightDiffPair), + } + // Copy job backlog and add current one + newTemplate.headers[reply[0]] = heightDiffPair{ + diff: util.TargetHexToDiff(reply[2]), + height: height, + } + if t != nil { + for k, v := range t.headers { + if v.height > height-maxBacklog { + newTemplate.headers[k] = v + } + } + } + s.blockTemplate.Store(&newTemplate) + log.Printf("New block to mine on %s at height %d / %s", rpc.Name, height, reply[0][0:10]) + + // Stratum + if s.config.Proxy.Stratum.Enabled { + go s.broadcastNewJobs() + } +} + +func (s *ProxyServer) fetchPendingBlock() (*rpc.GetBlockReplyPart, uint64, int64, error) { + rpc := s.rpc() + reply, err := rpc.GetPendingBlock() + if err != nil { + log.Printf("Error while refreshing pending block on %s: %s", rpc.Name, err) + return nil, 0, 0, err + } + blockNumber, err := strconv.ParseUint(strings.Replace(reply.Number, "0x", "", -1), 16, 64) + if err != nil { + log.Println("Can't parse pending block number") + return nil, 0, 0, err + } + blockDiff, err := strconv.ParseInt(strings.Replace(reply.Difficulty, "0x", "", -1), 16, 64) + if err != nil { + log.Println("Can't parse pending block difficulty") + return nil, 0, 0, err + } + return reply, blockNumber, blockDiff, nil +} diff --git a/util/proxy/config.go b/util/proxy/config.go new file mode 100644 index 0000000..c4701ce --- /dev/null +++ b/util/proxy/config.go @@ -0,0 +1,73 @@ +package proxy + +import ( + "github.com/yuriy0803/open-etc-pool-friends/api" + "github.com/yuriy0803/open-etc-pool-friends/payouts" + "github.com/yuriy0803/open-etc-pool-friends/policy" + "github.com/yuriy0803/open-etc-pool-friends/storage" +) + +type Config struct { + Name string `json:"name"` + Proxy Proxy `json:"proxy"` + Api api.ApiConfig `json:"api"` + Upstream []Upstream `json:"upstream"` + UpstreamCheckInterval string `json:"upstreamCheckInterval"` + + Threads int `json:"threads"` + + Network string `json:"network"` + Coin string `json:"coin"` + Pplns int64 `json:"pplns"` + Redis storage.Config `json:"redis"` + + BlockUnlocker payouts.UnlockerConfig `json:"unlocker"` + Payouts payouts.PayoutsConfig `json:"payouts"` + + NewrelicName string `json:"newrelicName"` + NewrelicKey string `json:"newrelicKey"` + NewrelicVerbose bool `json:"newrelicVerbose"` + NewrelicEnabled bool `json:"newrelicEnabled"` +} + +type Proxy struct { + Enabled bool `json:"enabled"` + Listen string `json:"listen"` + LimitHeadersSize int `json:"limitHeadersSize"` + LimitBodySize int64 `json:"limitBodySize"` + BehindReverseProxy bool `json:"behindReverseProxy"` + BlockRefreshInterval string `json:"blockRefreshInterval"` + Difficulty int64 `json:"difficulty"` + StateUpdateInterval string `json:"stateUpdateInterval"` + HashrateExpiration string `json:"hashrateExpiration"` + + Policy policy.Config `json:"policy"` + + MaxFails int64 `json:"maxFails"` + HealthCheck bool `json:"healthCheck"` + Debug bool `json:"debug"` + + Stratum Stratum `json:"stratum"` + + StratumNiceHash StratumNiceHash `json:"stratum_nice_hash"` +} + +type Stratum struct { + Enabled bool `json:"enabled"` + Listen string `json:"listen"` + Timeout string `json:"timeout"` + MaxConn int `json:"maxConn"` +} + +type StratumNiceHash struct { + Enabled bool `json:"enabled"` + Listen string `json:"listen"` + Timeout string `json:"timeout"` + MaxConn int `json:"maxConn"` +} + +type Upstream struct { + Name string `json:"name"` + Url string `json:"url"` + Timeout string `json:"timeout"` +} diff --git a/util/proxy/handlers.go b/util/proxy/handlers.go new file mode 100644 index 0000000..921dc93 --- /dev/null +++ b/util/proxy/handlers.go @@ -0,0 +1,115 @@ +package proxy + +import ( + "errors" + "log" + "regexp" + "strings" + + "github.com/yuriy0803/open-etc-pool-friends/rpc" + "github.com/yuriy0803/open-etc-pool-friends/util" +) + +// Allow only lowercase hexadecimal with 0x prefix +var noncePattern = regexp.MustCompile("^0x[0-9a-f]{16}$") +var hashPattern = regexp.MustCompile("^0x[0-9a-f]{64}$") +var workerPattern = regexp.MustCompile("^[0-9a-zA-Z-_]{1,8}$") + +// Stratum +func (s *ProxyServer) handleLoginRPC(cs *Session, params []string, id string) (bool, *ErrorReply) { + if len(params) == 0 { + return false, &ErrorReply{Code: -1, Message: "Invalid params"} + } + + login := strings.ToLower(params[0]) + if !util.IsValidHexAddress(login) { + return false, &ErrorReply{Code: -1, Message: "Invalid login"} + } + if !s.policy.ApplyLoginPolicy(login, cs.ip) { + return false, &ErrorReply{Code: -1, Message: "You are blacklisted"} + } + cs.login = login + s.registerSession(cs) + log.Printf("Stratum miner connected %v@%v", login, cs.ip) + return true, nil +} + +func (s *ProxyServer) handleGetWorkRPC(cs *Session) ([]string, *ErrorReply) { + t := s.currentBlockTemplate() + if t == nil || len(t.Header) == 0 || s.isSick() { + return nil, &ErrorReply{Code: 0, Message: "Work not ready"} + } + return []string{t.Header, t.Seed, s.diff}, nil +} + +// Stratum +func (s *ProxyServer) handleTCPSubmitRPC(cs *Session, id string, params []string) (bool, *ErrorReply) { + s.sessionsMu.RLock() + _, ok := s.sessions[cs] + s.sessionsMu.RUnlock() + + if !ok { + return false, &ErrorReply{Code: 25, Message: "Not subscribed"} + } + return s.handleSubmitRPC(cs, cs.login, id, params) +} + +func (s *ProxyServer) handleSubmitRPC(cs *Session, login, id string, params []string) (bool, *ErrorReply) { + if !workerPattern.MatchString(id) { + id = "0" + } + if len(params) != 3 { + s.policy.ApplyMalformedPolicy(cs.ip) + log.Printf("Malformed params from %s@%s %v", login, cs.ip, params) + return false, &ErrorReply{Code: -1, Message: "Invalid params"} + } + + if !noncePattern.MatchString(params[0]) || !hashPattern.MatchString(params[1]) || !hashPattern.MatchString(params[2]) { + s.policy.ApplyMalformedPolicy(cs.ip) + log.Printf("Malformed PoW result from %s@%s %v", login, cs.ip, params) + return false, &ErrorReply{Code: -1, Message: "Malformed PoW result"} + } + go func(s *ProxyServer, cs *Session, login, id string, params []string) { + t := s.currentBlockTemplate() + exist, validShare := s.processShare(login, id, cs.ip, t, params) + ok := s.policy.ApplySharePolicy(cs.ip, !exist && validShare) + + if exist { + log.Printf("Duplicate share from %s@%s %v", login, cs.ip, params) + cs.lastErr = errors.New("Duplicate share") + } + + if !validShare { + log.Printf("Invalid share from %s@%s", login, cs.ip) + // Bad shares limit reached, return error and close + if !ok { + cs.lastErr = errors.New("Invalid share") + } + } + + if s.config.Proxy.Debug { + log.Printf("Valid share from %s@%s", login, cs.ip) + } + + if !ok { + cs.lastErr = errors.New("High rate of invalid shares") + } + }(s, cs, login, id, params) + + return true, nil +} + +func (s *ProxyServer) handleGetBlockByNumberRPC() *rpc.GetBlockReplyPart { + t := s.currentBlockTemplate() + var reply *rpc.GetBlockReplyPart + if t != nil { + reply = t.GetPendingBlockCache + } + return reply +} + +func (s *ProxyServer) handleUnknownRPC(cs *Session, m string) *ErrorReply { + log.Printf("Unknown request method %s from %s", m, cs.ip) + s.policy.ApplyMalformedPolicy(cs.ip) + return &ErrorReply{Code: -3, Message: "Method not found"} +} diff --git a/util/proxy/miner.go b/util/proxy/miner.go new file mode 100644 index 0000000..3c0b28e --- /dev/null +++ b/util/proxy/miner.go @@ -0,0 +1,95 @@ +package proxy + +import ( + "log" + "math/big" + "strconv" + "strings" + + "github.com/ethereum/go-ethereum/common" + "github.com/yuriy0803/go-etchash" +) + +var ecip1099FBlockClassic uint64 = 11700000 // classic mainnet +var ecip1099FBlockMordor uint64 = 2520000 // mordor + +var hasher *etchash.Etchash = nil + +func (s *ProxyServer) processShare(login, id, ip string, t *BlockTemplate, params []string) (bool, bool) { + if hasher == nil { + if s.config.Network == "classic" { + hasher = etchash.New(&ecip1099FBlockClassic) + } else if s.config.Network == "mordor" { + hasher = etchash.New(&ecip1099FBlockMordor) + } else { + // unknown network + log.Printf("Unknown network configuration %s", s.config.Network) + return false, false + } + } + nonceHex := params[0] + hashNoNonce := params[1] + mixDigest := params[2] + nonce, _ := strconv.ParseUint(strings.Replace(nonceHex, "0x", "", -1), 16, 64) + shareDiff := s.config.Proxy.Difficulty + + h, ok := t.headers[hashNoNonce] + if !ok { + log.Printf("Stale share from %v@%v", login, ip) + s.backend.WriteWorkerShareStatus(login, id, false, true, false) + return false, false + } + + share := Block{ + number: h.height, + hashNoNonce: common.HexToHash(hashNoNonce), + difficulty: big.NewInt(shareDiff), + nonce: nonce, + mixDigest: common.HexToHash(mixDigest), + } + + block := Block{ + number: h.height, + hashNoNonce: common.HexToHash(hashNoNonce), + difficulty: h.diff, + nonce: nonce, + mixDigest: common.HexToHash(mixDigest), + } + + if !hasher.Verify(share) { + s.backend.WriteWorkerShareStatus(login, id, false, false, true) + return false, false + } + + if hasher.Verify(block) { + ok, err := s.rpc().SubmitBlock(params) + if err != nil { + log.Printf("Block submission failure at height %v for %v: %v", h.height, t.Header, err) + } else if !ok { + log.Printf("Block rejected at height %v for %v", h.height, t.Header) + return false, false + } else { + s.fetchBlockTemplate() + exist, err := s.backend.WriteBlock(login, id, params, shareDiff, h.diff.Int64(), h.height, s.hashrateExpiration) + if exist { + return true, false + } + if err != nil { + log.Println("Failed to insert block candidate into backend:", err) + } else { + log.Printf("Inserted block %v to backend", h.height) + } + log.Printf("Block found by miner %v@%v at height %d", login, ip, h.height) + } + } else { + exist, err := s.backend.WriteShare(login, id, params, shareDiff, h.height, s.hashrateExpiration) + if exist { + return true, false + } + if err != nil { + log.Println("Failed to insert share data into backend:", err) + } + } + s.backend.WriteWorkerShareStatus(login, id, true, false, false) + return false, true +} diff --git a/util/proxy/proto.go b/util/proxy/proto.go new file mode 100644 index 0000000..ae3b4ec --- /dev/null +++ b/util/proxy/proto.go @@ -0,0 +1,44 @@ +package proxy + +import "encoding/json" + +type JSONRpcReq struct { + Id json.RawMessage `json:"id"` + Method string `json:"method"` + Params json.RawMessage `json:"params"` +} + +type JSONRpcReqNH struct { + Id interface{} `json:"id"` + Method string `json:"method"` + Params interface{} `json:"params"` +} + +type StratumReq struct { + JSONRpcReq + Worker string `json:"worker"` +} + +// Stratum +type JSONPushMessage struct { + // FIXME: Temporarily add ID for Claymore compliance + Id int64 `json:"id"` + Version string `json:"jsonrpc"` + Result interface{} `json:"result"` +} + +type JSONRpcResp struct { + Id json.RawMessage `json:"id"` + Version string `json:"jsonrpc"` + Result interface{} `json:"result"` + Error interface{} `json:"error,omitempty"` +} + +type SubmitReply struct { + Status string `json:"status"` +} + +type ErrorReply struct { + Code int `json:"code"` + Message string `json:"message"` +} diff --git a/util/proxy/proxy.go b/util/proxy/proxy.go new file mode 100644 index 0000000..276c8ae --- /dev/null +++ b/util/proxy/proxy.go @@ -0,0 +1,320 @@ +package proxy + +import ( + "encoding/json" + "io" + "log" + "net" + "net/http" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/gorilla/mux" + + "github.com/yuriy0803/open-etc-pool-friends/policy" + "github.com/yuriy0803/open-etc-pool-friends/rpc" + "github.com/yuriy0803/open-etc-pool-friends/storage" + "github.com/yuriy0803/open-etc-pool-friends/util" +) + +type ProxyServer struct { + config *Config + blockTemplate atomic.Value + upstream int32 + upstreams []*rpc.RPCClient + backend *storage.RedisClient + diff string + policy *policy.PolicyServer + hashrateExpiration time.Duration + failsCount int64 + + // Stratum + sessionsMu sync.RWMutex + sessions map[*Session]struct{} + timeout time.Duration + Extranonce string +} + +type jobDetails struct { + JobID string + SeedHash string + HeaderHash string +} + +type Session struct { + ip string + enc *json.Encoder + + // Stratum + sync.Mutex + conn *net.TCPConn + login string + lastErr error + subscriptionID string + JobDeatils jobDetails +} + +func NewProxy(cfg *Config, backend *storage.RedisClient) *ProxyServer { + if len(cfg.Name) == 0 { + log.Fatal("You must set instance name") + } + policy := policy.Start(&cfg.Proxy.Policy, backend) + + proxy := &ProxyServer{config: cfg, backend: backend, policy: policy} + proxy.diff = util.GetTargetHex(cfg.Proxy.Difficulty) + + proxy.upstreams = make([]*rpc.RPCClient, len(cfg.Upstream)) + for i, v := range cfg.Upstream { + proxy.upstreams[i] = rpc.NewRPCClient(v.Name, v.Url, v.Timeout) + log.Printf("Upstream: %s => %s", v.Name, v.Url) + } + log.Printf("Default upstream: %s => %s", proxy.rpc().Name, proxy.rpc().Url) + + if cfg.Proxy.Stratum.Enabled { + proxy.sessions = make(map[*Session]struct{}) + go proxy.ListenTCP() + } + + proxy.fetchBlockTemplate() + + proxy.hashrateExpiration = util.MustParseDuration(cfg.Proxy.HashrateExpiration) + + refreshIntv := util.MustParseDuration(cfg.Proxy.BlockRefreshInterval) + refreshTimer := time.NewTimer(refreshIntv) + log.Printf("Set block refresh every %v", refreshIntv) + + checkIntv := util.MustParseDuration(cfg.UpstreamCheckInterval) + checkTimer := time.NewTimer(checkIntv) + + stateUpdateIntv := util.MustParseDuration(cfg.Proxy.StateUpdateInterval) + stateUpdateTimer := time.NewTimer(stateUpdateIntv) + + go func() { + for { + select { + case <-refreshTimer.C: + proxy.fetchBlockTemplate() + refreshTimer.Reset(refreshIntv) + } + } + }() + + go func() { + for { + select { + case <-checkTimer.C: + proxy.checkUpstreams() + checkTimer.Reset(checkIntv) + } + } + }() + + go func() { + for { + select { + case <-stateUpdateTimer.C: + t := proxy.currentBlockTemplate() + if t != nil { + err := backend.WriteNodeState(cfg.Name, t.Height, t.Difficulty) + if err != nil { + log.Printf("Failed to write node state to backend: %v", err) + proxy.markSick() + } else { + proxy.markOk() + } + } + stateUpdateTimer.Reset(stateUpdateIntv) + } + } + }() + + return proxy +} + +func (s *ProxyServer) Start() { + log.Printf("Starting proxy on %v", s.config.Proxy.Listen) + r := mux.NewRouter() + r.Handle("/{login:0x[0-9a-fA-F]{40}}/{id:[0-9a-zA-Z-_]{1,8}}", s) + r.Handle("/{login:0x[0-9a-fA-F]{40}}", s) + srv := &http.Server{ + Addr: s.config.Proxy.Listen, + Handler: r, + MaxHeaderBytes: s.config.Proxy.LimitHeadersSize, + } + err := srv.ListenAndServe() + if err != nil { + log.Fatalf("Failed to start proxy: %v", err) + } +} + +func (s *ProxyServer) rpc() *rpc.RPCClient { + i := atomic.LoadInt32(&s.upstream) + return s.upstreams[i] +} + +func (s *ProxyServer) checkUpstreams() { + candidate := int32(0) + backup := false + + for i, v := range s.upstreams { + if v.Check() && !backup { + candidate = int32(i) + backup = true + } + } + + if s.upstream != candidate { + log.Printf("Switching to %v upstream", s.upstreams[candidate].Name) + atomic.StoreInt32(&s.upstream, candidate) + } +} + +func (s *ProxyServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + s.writeError(w, 405, "rpc: POST method required, received "+r.Method) + return + } + ip := s.remoteAddr(r) + if !s.policy.IsBanned(ip) { + s.handleClient(w, r, ip) + } +} + +func (s *ProxyServer) remoteAddr(r *http.Request) string { + if s.config.Proxy.BehindReverseProxy { + ip := r.Header.Get("X-Forwarded-For") + if len(ip) > 0 && net.ParseIP(ip) != nil { + return ip + } + } + ip, _, _ := net.SplitHostPort(r.RemoteAddr) + return ip +} + +func (s *ProxyServer) handleClient(w http.ResponseWriter, r *http.Request, ip string) { + if r.ContentLength > s.config.Proxy.LimitBodySize { + log.Printf("Socket flood from %s", ip) + s.policy.ApplyMalformedPolicy(ip) + http.Error(w, "Request too large", http.StatusExpectationFailed) + return + } + r.Body = http.MaxBytesReader(w, r.Body, s.config.Proxy.LimitBodySize) + defer r.Body.Close() + + cs := &Session{ip: ip, enc: json.NewEncoder(w)} + dec := json.NewDecoder(r.Body) + for { + var req JSONRpcReq + if err := dec.Decode(&req); err == io.EOF { + break + } else if err != nil { + log.Printf("Malformed request from %v: %v", ip, err) + s.policy.ApplyMalformedPolicy(ip) + return + } + cs.handleMessage(s, r, &req) + } +} + +func (cs *Session) handleMessage(s *ProxyServer, r *http.Request, req *JSONRpcReq) { + if req.Id == nil { + log.Printf("Missing RPC id from %s", cs.ip) + s.policy.ApplyMalformedPolicy(cs.ip) + return + } + + vars := mux.Vars(r) + login := strings.ToLower(vars["login"]) + + if !util.IsValidHexAddress(login) { + errReply := &ErrorReply{Code: -1, Message: "Invalid login"} + cs.sendError(req.Id, errReply) + return + } + if !s.policy.ApplyLoginPolicy(login, cs.ip) { + errReply := &ErrorReply{Code: -1, Message: "You are blacklisted"} + cs.sendError(req.Id, errReply) + return + } + + // Handle RPC methods + switch req.Method { + case "eth_getWork": + reply, errReply := s.handleGetWorkRPC(cs) + if errReply != nil { + cs.sendError(req.Id, errReply) + break + } + cs.sendResult(req.Id, &reply) + case "eth_submitWork": + if req.Params != nil { + var params []string + err := json.Unmarshal(req.Params, ¶ms) + if err != nil { + log.Printf("Unable to parse params from %v", cs.ip) + s.policy.ApplyMalformedPolicy(cs.ip) + break + } + reply, errReply := s.handleSubmitRPC(cs, login, vars["id"], params) + if errReply != nil { + cs.sendError(req.Id, errReply) + break + } + cs.sendResult(req.Id, &reply) + } else { + s.policy.ApplyMalformedPolicy(cs.ip) + errReply := &ErrorReply{Code: -1, Message: "Malformed request"} + cs.sendError(req.Id, errReply) + } + case "eth_getBlockByNumber": + reply := s.handleGetBlockByNumberRPC() + cs.sendResult(req.Id, reply) + case "eth_submitHashrate": + cs.sendResult(req.Id, true) + default: + errReply := s.handleUnknownRPC(cs, req.Method) + cs.sendError(req.Id, errReply) + } +} + +func (cs *Session) sendResult(id json.RawMessage, result interface{}) error { + message := JSONRpcResp{Id: id, Version: "2.0", Error: nil, Result: result} + return cs.enc.Encode(&message) +} + +func (cs *Session) sendError(id json.RawMessage, reply *ErrorReply) error { + message := JSONRpcResp{Id: id, Version: "2.0", Error: reply} + return cs.enc.Encode(&message) +} + +func (s *ProxyServer) writeError(w http.ResponseWriter, status int, msg string) { + w.WriteHeader(status) + w.Header().Set("Content-Type", "text/plain; charset=utf-8") +} + +func (s *ProxyServer) currentBlockTemplate() *BlockTemplate { + t := s.blockTemplate.Load() + if t != nil { + return t.(*BlockTemplate) + } else { + return nil + } +} + +func (s *ProxyServer) markSick() { + atomic.AddInt64(&s.failsCount, 1) +} + +func (s *ProxyServer) isSick() bool { + x := atomic.LoadInt64(&s.failsCount) + if s.config.Proxy.HealthCheck && x >= s.config.Proxy.MaxFails { + return true + } + return false +} + +func (s *ProxyServer) markOk() { + atomic.StoreInt64(&s.failsCount, 0) +} diff --git a/util/proxy/stratum.go b/util/proxy/stratum.go new file mode 100644 index 0000000..50422ba --- /dev/null +++ b/util/proxy/stratum.go @@ -0,0 +1,221 @@ +package proxy + +import ( + "bufio" + "encoding/json" + "errors" + "io" + "log" + "net" + "time" + + "github.com/yuriy0803/open-etc-pool-friends/util" +) + +const ( + MaxReqSize = 1024 +) + +func (s *ProxyServer) ListenTCP() { + timeout := util.MustParseDuration(s.config.Proxy.Stratum.Timeout) + s.timeout = timeout + + addr, err := net.ResolveTCPAddr("tcp4", s.config.Proxy.Stratum.Listen) + if err != nil { + log.Fatalf("Error: %v", err) + } + server, err := net.ListenTCP("tcp4", addr) + if err != nil { + log.Fatalf("Error: %v", err) + } + defer server.Close() + + log.Printf("Stratum listening on %s", s.config.Proxy.Stratum.Listen) + var accept = make(chan int, s.config.Proxy.Stratum.MaxConn) + n := 0 + + for { + conn, err := server.AcceptTCP() + if err != nil { + continue + } + conn.SetKeepAlive(true) + + ip, _, _ := net.SplitHostPort(conn.RemoteAddr().String()) + + if s.policy.IsBanned(ip) || !s.policy.ApplyLimitPolicy(ip) { + conn.Close() + continue + } + n += 1 + cs := &Session{conn: conn, ip: ip} + + accept <- n + go func(cs *Session) { + err = s.handleTCPClient(cs) + if err != nil || cs.lastErr != nil { + s.removeSession(cs) + conn.Close() + } + <-accept + }(cs) + } +} + +func (s *ProxyServer) handleTCPClient(cs *Session) error { + cs.enc = json.NewEncoder(cs.conn) + connbuff := bufio.NewReaderSize(cs.conn, MaxReqSize) + s.setDeadline(cs.conn) + for { + data, isPrefix, err := connbuff.ReadLine() + if isPrefix { + log.Printf("Socket flood detected from %s", cs.ip) + s.policy.BanClient(cs.ip) + return err + } else if err == io.EOF { + log.Printf("Client %s disconnected", cs.ip) + s.removeSession(cs) + break + } else if err != nil { + log.Printf("Error reading from socket: %v", err) + return err + } + + if len(data) > 1 { + var req StratumReq + err = json.Unmarshal(data, &req) + if err != nil { + s.policy.ApplyMalformedPolicy(cs.ip) + log.Printf("Malformed stratum request from %s: %v", cs.ip, err) + return err + } + s.setDeadline(cs.conn) + err = cs.handleTCPMessage(s, &req) + if err != nil { + return err + } + } + } + return nil +} + +func (cs *Session) handleTCPMessage(s *ProxyServer, req *StratumReq) error { + // Handle RPC methods + switch req.Method { + case "eth_submitLogin": + var params []string + err := json.Unmarshal(req.Params, ¶ms) + if err != nil { + log.Println("Malformed stratum request params from", cs.ip) + return err + } + reply, errReply := s.handleLoginRPC(cs, params, req.Worker) + if errReply != nil { + return cs.sendTCPError(req.Id, errReply) + } + return cs.sendTCPResult(req.Id, reply) + case "eth_getWork": + reply, errReply := s.handleGetWorkRPC(cs) + if errReply != nil { + return cs.sendTCPError(req.Id, errReply) + } + return cs.sendTCPResult(req.Id, &reply) + case "eth_submitWork": + var params []string + err := json.Unmarshal(req.Params, ¶ms) + if err != nil { + log.Println("Malformed stratum request params from", cs.ip) + return err + } + reply, errReply := s.handleTCPSubmitRPC(cs, req.Worker, params) + if errReply != nil { + return cs.sendTCPError(req.Id, errReply) + } + return cs.sendTCPResult(req.Id, &reply) + case "eth_submitHashrate": + return cs.sendTCPResult(req.Id, true) + default: + errReply := s.handleUnknownRPC(cs, req.Method) + return cs.sendTCPError(req.Id, errReply) + } +} + +func (cs *Session) sendTCPResult(id json.RawMessage, result interface{}) error { + cs.Lock() + defer cs.Unlock() + + message := JSONRpcResp{Id: id, Version: "2.0", Error: nil, Result: result} + return cs.enc.Encode(&message) +} + +func (cs *Session) pushNewJob(result interface{}) error { + cs.Lock() + defer cs.Unlock() + // FIXME: Temporarily add ID for Claymore compliance + message := JSONPushMessage{Version: "2.0", Result: result, Id: 0} + return cs.enc.Encode(&message) +} + +func (cs *Session) sendTCPError(id json.RawMessage, reply *ErrorReply) error { + cs.Lock() + defer cs.Unlock() + + message := JSONRpcResp{Id: id, Version: "2.0", Error: reply} + err := cs.enc.Encode(&message) + if err != nil { + return err + } + return errors.New(reply.Message) +} + +func (self *ProxyServer) setDeadline(conn *net.TCPConn) { + conn.SetDeadline(time.Now().Add(self.timeout)) +} + +func (s *ProxyServer) registerSession(cs *Session) { + s.sessionsMu.Lock() + defer s.sessionsMu.Unlock() + s.sessions[cs] = struct{}{} +} + +func (s *ProxyServer) removeSession(cs *Session) { + s.sessionsMu.Lock() + defer s.sessionsMu.Unlock() + delete(s.sessions, cs) +} + +func (s *ProxyServer) broadcastNewJobs() { + t := s.currentBlockTemplate() + if t == nil || len(t.Header) == 0 || s.isSick() { + return + } + reply := []string{t.Header, t.Seed, s.diff} + + s.sessionsMu.RLock() + defer s.sessionsMu.RUnlock() + + count := len(s.sessions) + log.Printf("Broadcasting new job to %v stratum miners", count) + + s.backend.NumberStratumWorker(count) + start := time.Now() + bcast := make(chan int, 1024) + n := 0 + + for m, _ := range s.sessions { + n++ + bcast <- n + + go func(cs *Session) { + err := cs.pushNewJob(&reply) + <-bcast + if err != nil { + log.Printf("Job transmit error to %v@%v: %v", cs.login, cs.ip, err) + s.removeSession(cs) + } else { + s.setDeadline(cs.conn) + } + }(m) + } + log.Printf("Jobs broadcast finished %s", time.Since(start)) +}