From 428ae9eceb0d38c337d82fca0999a6cf46661154 Mon Sep 17 00:00:00 2001 From: yuriy0803 <68668177+yuriy0803@users.noreply.github.com> Date: Tue, 14 Sep 2021 22:42:18 +0200 Subject: [PATCH] =?UTF-8?q?L=C3=B6schen?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- util/proxy/blocks.go | 117 --------------- util/proxy/config.go | 73 ---------- util/proxy/handlers.go | 115 --------------- util/proxy/miner.go | 95 ------------ util/proxy/proto.go | 44 ------ util/proxy/proxy.go | 320 ----------------------------------------- util/proxy/stratum.go | 221 ---------------------------- 7 files changed, 985 deletions(-) delete mode 100644 util/proxy/blocks.go delete mode 100644 util/proxy/config.go delete mode 100644 util/proxy/handlers.go delete mode 100644 util/proxy/miner.go delete mode 100644 util/proxy/proto.go delete mode 100644 util/proxy/proxy.go delete mode 100644 util/proxy/stratum.go diff --git a/util/proxy/blocks.go b/util/proxy/blocks.go deleted file mode 100644 index 941e38d..0000000 --- a/util/proxy/blocks.go +++ /dev/null @@ -1,117 +0,0 @@ -package proxy - -import ( - "log" - "math/big" - "strconv" - "strings" - "sync" - - "github.com/ethereum/go-ethereum/common" - - "github.com/yuriy0803/open-etc-pool-friends/rpc" - "github.com/yuriy0803/open-etc-pool-friends/util" -) - -const maxBacklog = 3 - -type heightDiffPair struct { - diff *big.Int - height uint64 -} - -type BlockTemplate struct { - sync.RWMutex - Header string - Seed string - Target string - Difficulty *big.Int - Height uint64 - GetPendingBlockCache *rpc.GetBlockReplyPart - nonces map[string]bool - headers map[string]heightDiffPair -} - -type Block struct { - difficulty *big.Int - hashNoNonce common.Hash - nonce uint64 - mixDigest common.Hash - number uint64 -} - -func (b Block) Difficulty() *big.Int { return b.difficulty } -func (b Block) HashNoNonce() common.Hash { return b.hashNoNonce } -func (b Block) Nonce() uint64 { return b.nonce } -func (b Block) MixDigest() common.Hash { return b.mixDigest } -func (b Block) NumberU64() uint64 { return b.number } - -func (s *ProxyServer) fetchBlockTemplate() { - rpc := s.rpc() - t := s.currentBlockTemplate() - pendingReply, height, diff, err := s.fetchPendingBlock() - if err != nil { - log.Printf("Error while refreshing pending block on %s: %s", rpc.Name, err) - return - } - reply, err := rpc.GetWork() - if err != nil { - log.Printf("Error while refreshing block template on %s: %s", rpc.Name, err) - return - } - // No need to update, we have fresh job - if t != nil && t.Header == reply[0] { - return - } - - pendingReply.Difficulty = util.ToHex(s.config.Proxy.Difficulty) - - newTemplate := BlockTemplate{ - Header: reply[0], - Seed: reply[1], - Target: reply[2], - Height: height, - Difficulty: big.NewInt(diff), - GetPendingBlockCache: pendingReply, - headers: make(map[string]heightDiffPair), - } - // Copy job backlog and add current one - newTemplate.headers[reply[0]] = heightDiffPair{ - diff: util.TargetHexToDiff(reply[2]), - height: height, - } - if t != nil { - for k, v := range t.headers { - if v.height > height-maxBacklog { - newTemplate.headers[k] = v - } - } - } - s.blockTemplate.Store(&newTemplate) - log.Printf("New block to mine on %s at height %d / %s", rpc.Name, height, reply[0][0:10]) - - // Stratum - if s.config.Proxy.Stratum.Enabled { - go s.broadcastNewJobs() - } -} - -func (s *ProxyServer) fetchPendingBlock() (*rpc.GetBlockReplyPart, uint64, int64, error) { - rpc := s.rpc() - reply, err := rpc.GetPendingBlock() - if err != nil { - log.Printf("Error while refreshing pending block on %s: %s", rpc.Name, err) - return nil, 0, 0, err - } - blockNumber, err := strconv.ParseUint(strings.Replace(reply.Number, "0x", "", -1), 16, 64) - if err != nil { - log.Println("Can't parse pending block number") - return nil, 0, 0, err - } - blockDiff, err := strconv.ParseInt(strings.Replace(reply.Difficulty, "0x", "", -1), 16, 64) - if err != nil { - log.Println("Can't parse pending block difficulty") - return nil, 0, 0, err - } - return reply, blockNumber, blockDiff, nil -} diff --git a/util/proxy/config.go b/util/proxy/config.go deleted file mode 100644 index c4701ce..0000000 --- a/util/proxy/config.go +++ /dev/null @@ -1,73 +0,0 @@ -package proxy - -import ( - "github.com/yuriy0803/open-etc-pool-friends/api" - "github.com/yuriy0803/open-etc-pool-friends/payouts" - "github.com/yuriy0803/open-etc-pool-friends/policy" - "github.com/yuriy0803/open-etc-pool-friends/storage" -) - -type Config struct { - Name string `json:"name"` - Proxy Proxy `json:"proxy"` - Api api.ApiConfig `json:"api"` - Upstream []Upstream `json:"upstream"` - UpstreamCheckInterval string `json:"upstreamCheckInterval"` - - Threads int `json:"threads"` - - Network string `json:"network"` - Coin string `json:"coin"` - Pplns int64 `json:"pplns"` - Redis storage.Config `json:"redis"` - - BlockUnlocker payouts.UnlockerConfig `json:"unlocker"` - Payouts payouts.PayoutsConfig `json:"payouts"` - - NewrelicName string `json:"newrelicName"` - NewrelicKey string `json:"newrelicKey"` - NewrelicVerbose bool `json:"newrelicVerbose"` - NewrelicEnabled bool `json:"newrelicEnabled"` -} - -type Proxy struct { - Enabled bool `json:"enabled"` - Listen string `json:"listen"` - LimitHeadersSize int `json:"limitHeadersSize"` - LimitBodySize int64 `json:"limitBodySize"` - BehindReverseProxy bool `json:"behindReverseProxy"` - BlockRefreshInterval string `json:"blockRefreshInterval"` - Difficulty int64 `json:"difficulty"` - StateUpdateInterval string `json:"stateUpdateInterval"` - HashrateExpiration string `json:"hashrateExpiration"` - - Policy policy.Config `json:"policy"` - - MaxFails int64 `json:"maxFails"` - HealthCheck bool `json:"healthCheck"` - Debug bool `json:"debug"` - - Stratum Stratum `json:"stratum"` - - StratumNiceHash StratumNiceHash `json:"stratum_nice_hash"` -} - -type Stratum struct { - Enabled bool `json:"enabled"` - Listen string `json:"listen"` - Timeout string `json:"timeout"` - MaxConn int `json:"maxConn"` -} - -type StratumNiceHash struct { - Enabled bool `json:"enabled"` - Listen string `json:"listen"` - Timeout string `json:"timeout"` - MaxConn int `json:"maxConn"` -} - -type Upstream struct { - Name string `json:"name"` - Url string `json:"url"` - Timeout string `json:"timeout"` -} diff --git a/util/proxy/handlers.go b/util/proxy/handlers.go deleted file mode 100644 index 921dc93..0000000 --- a/util/proxy/handlers.go +++ /dev/null @@ -1,115 +0,0 @@ -package proxy - -import ( - "errors" - "log" - "regexp" - "strings" - - "github.com/yuriy0803/open-etc-pool-friends/rpc" - "github.com/yuriy0803/open-etc-pool-friends/util" -) - -// Allow only lowercase hexadecimal with 0x prefix -var noncePattern = regexp.MustCompile("^0x[0-9a-f]{16}$") -var hashPattern = regexp.MustCompile("^0x[0-9a-f]{64}$") -var workerPattern = regexp.MustCompile("^[0-9a-zA-Z-_]{1,8}$") - -// Stratum -func (s *ProxyServer) handleLoginRPC(cs *Session, params []string, id string) (bool, *ErrorReply) { - if len(params) == 0 { - return false, &ErrorReply{Code: -1, Message: "Invalid params"} - } - - login := strings.ToLower(params[0]) - if !util.IsValidHexAddress(login) { - return false, &ErrorReply{Code: -1, Message: "Invalid login"} - } - if !s.policy.ApplyLoginPolicy(login, cs.ip) { - return false, &ErrorReply{Code: -1, Message: "You are blacklisted"} - } - cs.login = login - s.registerSession(cs) - log.Printf("Stratum miner connected %v@%v", login, cs.ip) - return true, nil -} - -func (s *ProxyServer) handleGetWorkRPC(cs *Session) ([]string, *ErrorReply) { - t := s.currentBlockTemplate() - if t == nil || len(t.Header) == 0 || s.isSick() { - return nil, &ErrorReply{Code: 0, Message: "Work not ready"} - } - return []string{t.Header, t.Seed, s.diff}, nil -} - -// Stratum -func (s *ProxyServer) handleTCPSubmitRPC(cs *Session, id string, params []string) (bool, *ErrorReply) { - s.sessionsMu.RLock() - _, ok := s.sessions[cs] - s.sessionsMu.RUnlock() - - if !ok { - return false, &ErrorReply{Code: 25, Message: "Not subscribed"} - } - return s.handleSubmitRPC(cs, cs.login, id, params) -} - -func (s *ProxyServer) handleSubmitRPC(cs *Session, login, id string, params []string) (bool, *ErrorReply) { - if !workerPattern.MatchString(id) { - id = "0" - } - if len(params) != 3 { - s.policy.ApplyMalformedPolicy(cs.ip) - log.Printf("Malformed params from %s@%s %v", login, cs.ip, params) - return false, &ErrorReply{Code: -1, Message: "Invalid params"} - } - - if !noncePattern.MatchString(params[0]) || !hashPattern.MatchString(params[1]) || !hashPattern.MatchString(params[2]) { - s.policy.ApplyMalformedPolicy(cs.ip) - log.Printf("Malformed PoW result from %s@%s %v", login, cs.ip, params) - return false, &ErrorReply{Code: -1, Message: "Malformed PoW result"} - } - go func(s *ProxyServer, cs *Session, login, id string, params []string) { - t := s.currentBlockTemplate() - exist, validShare := s.processShare(login, id, cs.ip, t, params) - ok := s.policy.ApplySharePolicy(cs.ip, !exist && validShare) - - if exist { - log.Printf("Duplicate share from %s@%s %v", login, cs.ip, params) - cs.lastErr = errors.New("Duplicate share") - } - - if !validShare { - log.Printf("Invalid share from %s@%s", login, cs.ip) - // Bad shares limit reached, return error and close - if !ok { - cs.lastErr = errors.New("Invalid share") - } - } - - if s.config.Proxy.Debug { - log.Printf("Valid share from %s@%s", login, cs.ip) - } - - if !ok { - cs.lastErr = errors.New("High rate of invalid shares") - } - }(s, cs, login, id, params) - - return true, nil -} - -func (s *ProxyServer) handleGetBlockByNumberRPC() *rpc.GetBlockReplyPart { - t := s.currentBlockTemplate() - var reply *rpc.GetBlockReplyPart - if t != nil { - reply = t.GetPendingBlockCache - } - return reply -} - -func (s *ProxyServer) handleUnknownRPC(cs *Session, m string) *ErrorReply { - log.Printf("Unknown request method %s from %s", m, cs.ip) - s.policy.ApplyMalformedPolicy(cs.ip) - return &ErrorReply{Code: -3, Message: "Method not found"} -} diff --git a/util/proxy/miner.go b/util/proxy/miner.go deleted file mode 100644 index 3c0b28e..0000000 --- a/util/proxy/miner.go +++ /dev/null @@ -1,95 +0,0 @@ -package proxy - -import ( - "log" - "math/big" - "strconv" - "strings" - - "github.com/ethereum/go-ethereum/common" - "github.com/yuriy0803/go-etchash" -) - -var ecip1099FBlockClassic uint64 = 11700000 // classic mainnet -var ecip1099FBlockMordor uint64 = 2520000 // mordor - -var hasher *etchash.Etchash = nil - -func (s *ProxyServer) processShare(login, id, ip string, t *BlockTemplate, params []string) (bool, bool) { - if hasher == nil { - if s.config.Network == "classic" { - hasher = etchash.New(&ecip1099FBlockClassic) - } else if s.config.Network == "mordor" { - hasher = etchash.New(&ecip1099FBlockMordor) - } else { - // unknown network - log.Printf("Unknown network configuration %s", s.config.Network) - return false, false - } - } - nonceHex := params[0] - hashNoNonce := params[1] - mixDigest := params[2] - nonce, _ := strconv.ParseUint(strings.Replace(nonceHex, "0x", "", -1), 16, 64) - shareDiff := s.config.Proxy.Difficulty - - h, ok := t.headers[hashNoNonce] - if !ok { - log.Printf("Stale share from %v@%v", login, ip) - s.backend.WriteWorkerShareStatus(login, id, false, true, false) - return false, false - } - - share := Block{ - number: h.height, - hashNoNonce: common.HexToHash(hashNoNonce), - difficulty: big.NewInt(shareDiff), - nonce: nonce, - mixDigest: common.HexToHash(mixDigest), - } - - block := Block{ - number: h.height, - hashNoNonce: common.HexToHash(hashNoNonce), - difficulty: h.diff, - nonce: nonce, - mixDigest: common.HexToHash(mixDigest), - } - - if !hasher.Verify(share) { - s.backend.WriteWorkerShareStatus(login, id, false, false, true) - return false, false - } - - if hasher.Verify(block) { - ok, err := s.rpc().SubmitBlock(params) - if err != nil { - log.Printf("Block submission failure at height %v for %v: %v", h.height, t.Header, err) - } else if !ok { - log.Printf("Block rejected at height %v for %v", h.height, t.Header) - return false, false - } else { - s.fetchBlockTemplate() - exist, err := s.backend.WriteBlock(login, id, params, shareDiff, h.diff.Int64(), h.height, s.hashrateExpiration) - if exist { - return true, false - } - if err != nil { - log.Println("Failed to insert block candidate into backend:", err) - } else { - log.Printf("Inserted block %v to backend", h.height) - } - log.Printf("Block found by miner %v@%v at height %d", login, ip, h.height) - } - } else { - exist, err := s.backend.WriteShare(login, id, params, shareDiff, h.height, s.hashrateExpiration) - if exist { - return true, false - } - if err != nil { - log.Println("Failed to insert share data into backend:", err) - } - } - s.backend.WriteWorkerShareStatus(login, id, true, false, false) - return false, true -} diff --git a/util/proxy/proto.go b/util/proxy/proto.go deleted file mode 100644 index ae3b4ec..0000000 --- a/util/proxy/proto.go +++ /dev/null @@ -1,44 +0,0 @@ -package proxy - -import "encoding/json" - -type JSONRpcReq struct { - Id json.RawMessage `json:"id"` - Method string `json:"method"` - Params json.RawMessage `json:"params"` -} - -type JSONRpcReqNH struct { - Id interface{} `json:"id"` - Method string `json:"method"` - Params interface{} `json:"params"` -} - -type StratumReq struct { - JSONRpcReq - Worker string `json:"worker"` -} - -// Stratum -type JSONPushMessage struct { - // FIXME: Temporarily add ID for Claymore compliance - Id int64 `json:"id"` - Version string `json:"jsonrpc"` - Result interface{} `json:"result"` -} - -type JSONRpcResp struct { - Id json.RawMessage `json:"id"` - Version string `json:"jsonrpc"` - Result interface{} `json:"result"` - Error interface{} `json:"error,omitempty"` -} - -type SubmitReply struct { - Status string `json:"status"` -} - -type ErrorReply struct { - Code int `json:"code"` - Message string `json:"message"` -} diff --git a/util/proxy/proxy.go b/util/proxy/proxy.go deleted file mode 100644 index 276c8ae..0000000 --- a/util/proxy/proxy.go +++ /dev/null @@ -1,320 +0,0 @@ -package proxy - -import ( - "encoding/json" - "io" - "log" - "net" - "net/http" - "strings" - "sync" - "sync/atomic" - "time" - - "github.com/gorilla/mux" - - "github.com/yuriy0803/open-etc-pool-friends/policy" - "github.com/yuriy0803/open-etc-pool-friends/rpc" - "github.com/yuriy0803/open-etc-pool-friends/storage" - "github.com/yuriy0803/open-etc-pool-friends/util" -) - -type ProxyServer struct { - config *Config - blockTemplate atomic.Value - upstream int32 - upstreams []*rpc.RPCClient - backend *storage.RedisClient - diff string - policy *policy.PolicyServer - hashrateExpiration time.Duration - failsCount int64 - - // Stratum - sessionsMu sync.RWMutex - sessions map[*Session]struct{} - timeout time.Duration - Extranonce string -} - -type jobDetails struct { - JobID string - SeedHash string - HeaderHash string -} - -type Session struct { - ip string - enc *json.Encoder - - // Stratum - sync.Mutex - conn *net.TCPConn - login string - lastErr error - subscriptionID string - JobDeatils jobDetails -} - -func NewProxy(cfg *Config, backend *storage.RedisClient) *ProxyServer { - if len(cfg.Name) == 0 { - log.Fatal("You must set instance name") - } - policy := policy.Start(&cfg.Proxy.Policy, backend) - - proxy := &ProxyServer{config: cfg, backend: backend, policy: policy} - proxy.diff = util.GetTargetHex(cfg.Proxy.Difficulty) - - proxy.upstreams = make([]*rpc.RPCClient, len(cfg.Upstream)) - for i, v := range cfg.Upstream { - proxy.upstreams[i] = rpc.NewRPCClient(v.Name, v.Url, v.Timeout) - log.Printf("Upstream: %s => %s", v.Name, v.Url) - } - log.Printf("Default upstream: %s => %s", proxy.rpc().Name, proxy.rpc().Url) - - if cfg.Proxy.Stratum.Enabled { - proxy.sessions = make(map[*Session]struct{}) - go proxy.ListenTCP() - } - - proxy.fetchBlockTemplate() - - proxy.hashrateExpiration = util.MustParseDuration(cfg.Proxy.HashrateExpiration) - - refreshIntv := util.MustParseDuration(cfg.Proxy.BlockRefreshInterval) - refreshTimer := time.NewTimer(refreshIntv) - log.Printf("Set block refresh every %v", refreshIntv) - - checkIntv := util.MustParseDuration(cfg.UpstreamCheckInterval) - checkTimer := time.NewTimer(checkIntv) - - stateUpdateIntv := util.MustParseDuration(cfg.Proxy.StateUpdateInterval) - stateUpdateTimer := time.NewTimer(stateUpdateIntv) - - go func() { - for { - select { - case <-refreshTimer.C: - proxy.fetchBlockTemplate() - refreshTimer.Reset(refreshIntv) - } - } - }() - - go func() { - for { - select { - case <-checkTimer.C: - proxy.checkUpstreams() - checkTimer.Reset(checkIntv) - } - } - }() - - go func() { - for { - select { - case <-stateUpdateTimer.C: - t := proxy.currentBlockTemplate() - if t != nil { - err := backend.WriteNodeState(cfg.Name, t.Height, t.Difficulty) - if err != nil { - log.Printf("Failed to write node state to backend: %v", err) - proxy.markSick() - } else { - proxy.markOk() - } - } - stateUpdateTimer.Reset(stateUpdateIntv) - } - } - }() - - return proxy -} - -func (s *ProxyServer) Start() { - log.Printf("Starting proxy on %v", s.config.Proxy.Listen) - r := mux.NewRouter() - r.Handle("/{login:0x[0-9a-fA-F]{40}}/{id:[0-9a-zA-Z-_]{1,8}}", s) - r.Handle("/{login:0x[0-9a-fA-F]{40}}", s) - srv := &http.Server{ - Addr: s.config.Proxy.Listen, - Handler: r, - MaxHeaderBytes: s.config.Proxy.LimitHeadersSize, - } - err := srv.ListenAndServe() - if err != nil { - log.Fatalf("Failed to start proxy: %v", err) - } -} - -func (s *ProxyServer) rpc() *rpc.RPCClient { - i := atomic.LoadInt32(&s.upstream) - return s.upstreams[i] -} - -func (s *ProxyServer) checkUpstreams() { - candidate := int32(0) - backup := false - - for i, v := range s.upstreams { - if v.Check() && !backup { - candidate = int32(i) - backup = true - } - } - - if s.upstream != candidate { - log.Printf("Switching to %v upstream", s.upstreams[candidate].Name) - atomic.StoreInt32(&s.upstream, candidate) - } -} - -func (s *ProxyServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { - if r.Method != "POST" { - s.writeError(w, 405, "rpc: POST method required, received "+r.Method) - return - } - ip := s.remoteAddr(r) - if !s.policy.IsBanned(ip) { - s.handleClient(w, r, ip) - } -} - -func (s *ProxyServer) remoteAddr(r *http.Request) string { - if s.config.Proxy.BehindReverseProxy { - ip := r.Header.Get("X-Forwarded-For") - if len(ip) > 0 && net.ParseIP(ip) != nil { - return ip - } - } - ip, _, _ := net.SplitHostPort(r.RemoteAddr) - return ip -} - -func (s *ProxyServer) handleClient(w http.ResponseWriter, r *http.Request, ip string) { - if r.ContentLength > s.config.Proxy.LimitBodySize { - log.Printf("Socket flood from %s", ip) - s.policy.ApplyMalformedPolicy(ip) - http.Error(w, "Request too large", http.StatusExpectationFailed) - return - } - r.Body = http.MaxBytesReader(w, r.Body, s.config.Proxy.LimitBodySize) - defer r.Body.Close() - - cs := &Session{ip: ip, enc: json.NewEncoder(w)} - dec := json.NewDecoder(r.Body) - for { - var req JSONRpcReq - if err := dec.Decode(&req); err == io.EOF { - break - } else if err != nil { - log.Printf("Malformed request from %v: %v", ip, err) - s.policy.ApplyMalformedPolicy(ip) - return - } - cs.handleMessage(s, r, &req) - } -} - -func (cs *Session) handleMessage(s *ProxyServer, r *http.Request, req *JSONRpcReq) { - if req.Id == nil { - log.Printf("Missing RPC id from %s", cs.ip) - s.policy.ApplyMalformedPolicy(cs.ip) - return - } - - vars := mux.Vars(r) - login := strings.ToLower(vars["login"]) - - if !util.IsValidHexAddress(login) { - errReply := &ErrorReply{Code: -1, Message: "Invalid login"} - cs.sendError(req.Id, errReply) - return - } - if !s.policy.ApplyLoginPolicy(login, cs.ip) { - errReply := &ErrorReply{Code: -1, Message: "You are blacklisted"} - cs.sendError(req.Id, errReply) - return - } - - // Handle RPC methods - switch req.Method { - case "eth_getWork": - reply, errReply := s.handleGetWorkRPC(cs) - if errReply != nil { - cs.sendError(req.Id, errReply) - break - } - cs.sendResult(req.Id, &reply) - case "eth_submitWork": - if req.Params != nil { - var params []string - err := json.Unmarshal(req.Params, ¶ms) - if err != nil { - log.Printf("Unable to parse params from %v", cs.ip) - s.policy.ApplyMalformedPolicy(cs.ip) - break - } - reply, errReply := s.handleSubmitRPC(cs, login, vars["id"], params) - if errReply != nil { - cs.sendError(req.Id, errReply) - break - } - cs.sendResult(req.Id, &reply) - } else { - s.policy.ApplyMalformedPolicy(cs.ip) - errReply := &ErrorReply{Code: -1, Message: "Malformed request"} - cs.sendError(req.Id, errReply) - } - case "eth_getBlockByNumber": - reply := s.handleGetBlockByNumberRPC() - cs.sendResult(req.Id, reply) - case "eth_submitHashrate": - cs.sendResult(req.Id, true) - default: - errReply := s.handleUnknownRPC(cs, req.Method) - cs.sendError(req.Id, errReply) - } -} - -func (cs *Session) sendResult(id json.RawMessage, result interface{}) error { - message := JSONRpcResp{Id: id, Version: "2.0", Error: nil, Result: result} - return cs.enc.Encode(&message) -} - -func (cs *Session) sendError(id json.RawMessage, reply *ErrorReply) error { - message := JSONRpcResp{Id: id, Version: "2.0", Error: reply} - return cs.enc.Encode(&message) -} - -func (s *ProxyServer) writeError(w http.ResponseWriter, status int, msg string) { - w.WriteHeader(status) - w.Header().Set("Content-Type", "text/plain; charset=utf-8") -} - -func (s *ProxyServer) currentBlockTemplate() *BlockTemplate { - t := s.blockTemplate.Load() - if t != nil { - return t.(*BlockTemplate) - } else { - return nil - } -} - -func (s *ProxyServer) markSick() { - atomic.AddInt64(&s.failsCount, 1) -} - -func (s *ProxyServer) isSick() bool { - x := atomic.LoadInt64(&s.failsCount) - if s.config.Proxy.HealthCheck && x >= s.config.Proxy.MaxFails { - return true - } - return false -} - -func (s *ProxyServer) markOk() { - atomic.StoreInt64(&s.failsCount, 0) -} diff --git a/util/proxy/stratum.go b/util/proxy/stratum.go deleted file mode 100644 index 50422ba..0000000 --- a/util/proxy/stratum.go +++ /dev/null @@ -1,221 +0,0 @@ -package proxy - -import ( - "bufio" - "encoding/json" - "errors" - "io" - "log" - "net" - "time" - - "github.com/yuriy0803/open-etc-pool-friends/util" -) - -const ( - MaxReqSize = 1024 -) - -func (s *ProxyServer) ListenTCP() { - timeout := util.MustParseDuration(s.config.Proxy.Stratum.Timeout) - s.timeout = timeout - - addr, err := net.ResolveTCPAddr("tcp4", s.config.Proxy.Stratum.Listen) - if err != nil { - log.Fatalf("Error: %v", err) - } - server, err := net.ListenTCP("tcp4", addr) - if err != nil { - log.Fatalf("Error: %v", err) - } - defer server.Close() - - log.Printf("Stratum listening on %s", s.config.Proxy.Stratum.Listen) - var accept = make(chan int, s.config.Proxy.Stratum.MaxConn) - n := 0 - - for { - conn, err := server.AcceptTCP() - if err != nil { - continue - } - conn.SetKeepAlive(true) - - ip, _, _ := net.SplitHostPort(conn.RemoteAddr().String()) - - if s.policy.IsBanned(ip) || !s.policy.ApplyLimitPolicy(ip) { - conn.Close() - continue - } - n += 1 - cs := &Session{conn: conn, ip: ip} - - accept <- n - go func(cs *Session) { - err = s.handleTCPClient(cs) - if err != nil || cs.lastErr != nil { - s.removeSession(cs) - conn.Close() - } - <-accept - }(cs) - } -} - -func (s *ProxyServer) handleTCPClient(cs *Session) error { - cs.enc = json.NewEncoder(cs.conn) - connbuff := bufio.NewReaderSize(cs.conn, MaxReqSize) - s.setDeadline(cs.conn) - for { - data, isPrefix, err := connbuff.ReadLine() - if isPrefix { - log.Printf("Socket flood detected from %s", cs.ip) - s.policy.BanClient(cs.ip) - return err - } else if err == io.EOF { - log.Printf("Client %s disconnected", cs.ip) - s.removeSession(cs) - break - } else if err != nil { - log.Printf("Error reading from socket: %v", err) - return err - } - - if len(data) > 1 { - var req StratumReq - err = json.Unmarshal(data, &req) - if err != nil { - s.policy.ApplyMalformedPolicy(cs.ip) - log.Printf("Malformed stratum request from %s: %v", cs.ip, err) - return err - } - s.setDeadline(cs.conn) - err = cs.handleTCPMessage(s, &req) - if err != nil { - return err - } - } - } - return nil -} - -func (cs *Session) handleTCPMessage(s *ProxyServer, req *StratumReq) error { - // Handle RPC methods - switch req.Method { - case "eth_submitLogin": - var params []string - err := json.Unmarshal(req.Params, ¶ms) - if err != nil { - log.Println("Malformed stratum request params from", cs.ip) - return err - } - reply, errReply := s.handleLoginRPC(cs, params, req.Worker) - if errReply != nil { - return cs.sendTCPError(req.Id, errReply) - } - return cs.sendTCPResult(req.Id, reply) - case "eth_getWork": - reply, errReply := s.handleGetWorkRPC(cs) - if errReply != nil { - return cs.sendTCPError(req.Id, errReply) - } - return cs.sendTCPResult(req.Id, &reply) - case "eth_submitWork": - var params []string - err := json.Unmarshal(req.Params, ¶ms) - if err != nil { - log.Println("Malformed stratum request params from", cs.ip) - return err - } - reply, errReply := s.handleTCPSubmitRPC(cs, req.Worker, params) - if errReply != nil { - return cs.sendTCPError(req.Id, errReply) - } - return cs.sendTCPResult(req.Id, &reply) - case "eth_submitHashrate": - return cs.sendTCPResult(req.Id, true) - default: - errReply := s.handleUnknownRPC(cs, req.Method) - return cs.sendTCPError(req.Id, errReply) - } -} - -func (cs *Session) sendTCPResult(id json.RawMessage, result interface{}) error { - cs.Lock() - defer cs.Unlock() - - message := JSONRpcResp{Id: id, Version: "2.0", Error: nil, Result: result} - return cs.enc.Encode(&message) -} - -func (cs *Session) pushNewJob(result interface{}) error { - cs.Lock() - defer cs.Unlock() - // FIXME: Temporarily add ID for Claymore compliance - message := JSONPushMessage{Version: "2.0", Result: result, Id: 0} - return cs.enc.Encode(&message) -} - -func (cs *Session) sendTCPError(id json.RawMessage, reply *ErrorReply) error { - cs.Lock() - defer cs.Unlock() - - message := JSONRpcResp{Id: id, Version: "2.0", Error: reply} - err := cs.enc.Encode(&message) - if err != nil { - return err - } - return errors.New(reply.Message) -} - -func (self *ProxyServer) setDeadline(conn *net.TCPConn) { - conn.SetDeadline(time.Now().Add(self.timeout)) -} - -func (s *ProxyServer) registerSession(cs *Session) { - s.sessionsMu.Lock() - defer s.sessionsMu.Unlock() - s.sessions[cs] = struct{}{} -} - -func (s *ProxyServer) removeSession(cs *Session) { - s.sessionsMu.Lock() - defer s.sessionsMu.Unlock() - delete(s.sessions, cs) -} - -func (s *ProxyServer) broadcastNewJobs() { - t := s.currentBlockTemplate() - if t == nil || len(t.Header) == 0 || s.isSick() { - return - } - reply := []string{t.Header, t.Seed, s.diff} - - s.sessionsMu.RLock() - defer s.sessionsMu.RUnlock() - - count := len(s.sessions) - log.Printf("Broadcasting new job to %v stratum miners", count) - - s.backend.NumberStratumWorker(count) - start := time.Now() - bcast := make(chan int, 1024) - n := 0 - - for m, _ := range s.sessions { - n++ - bcast <- n - - go func(cs *Session) { - err := cs.pushNewJob(&reply) - <-bcast - if err != nil { - log.Printf("Job transmit error to %v@%v: %v", cs.login, cs.ip, err) - s.removeSession(cs) - } else { - s.setDeadline(cs.conn) - } - }(m) - } - log.Printf("Jobs broadcast finished %s", time.Since(start)) -}