diff --git a/proxy/handlers.go b/proxy/handlers.go index 6f9cbac..b2a8624 100644 --- a/proxy/handlers.go +++ b/proxy/handlers.go @@ -1,7 +1,6 @@ package proxy import ( - "errors" "log" "regexp" "strings" @@ -21,16 +20,19 @@ func (s *ProxyServer) handleLoginRPC(cs *Session, params []string, id string) (b return false, &ErrorReply{Code: -1, Message: "Invalid params"} } - //If login contain information about workers name "walletId.workerName" login := params[0] - if strings.Contains(login, ".") { - var workerParams = strings.Split(login, ".") - login = workerParams[0] - id = workerParams[1] + // WORKER NAME 0x1234.WORKERNAME + if strings.ContainsAny(login, ".") { + var param = strings.Split(login, ".") + login = param[0] + id = param[1] } - login = strings.ToLower(login) + if !workerPattern.MatchString(id) { + id = "0" + } + if !util.IsValidHexAddress(login) { return false, &ErrorReply{Code: -1, Message: "Invalid login"} } @@ -38,13 +40,8 @@ func (s *ProxyServer) handleLoginRPC(cs *Session, params []string, id string) (b return false, &ErrorReply{Code: -1, Message: "You are blacklisted"} } - if !workerPattern.MatchString(id) { - id = "0" - } - - cs.worker = id cs.login = login - + cs.worker = id s.registerSession(cs) log.Printf("Stratum miner connected %v@%v", login, cs.ip) return true, nil @@ -55,7 +52,7 @@ func (s *ProxyServer) handleGetWorkRPC(cs *Session) ([]string, *ErrorReply) { if t == nil || len(t.Header) == 0 || s.isSick() { return nil, &ErrorReply{Code: 0, Message: "Work not ready"} } - return []string{t.Header, t.Seed, s.diff}, nil + return []string{t.Header, t.Seed, s.diff, util.ToHex(int64(t.Height))}, nil } // Stratum @@ -78,52 +75,48 @@ func (s *ProxyServer) handleSubmitRPC(cs *Session, login, id string, params []st return false, &ErrorReply{Code: -1, Message: "Invalid params"} } + stratumMode := cs.stratumMode() + if stratumMode == NiceHash { + for i := 0; i <= 2; i++ { + if params[i][0:2] != "0x" { + params[i] = "0x" + params[i] + } + } + } + if !noncePattern.MatchString(params[0]) || !hashPattern.MatchString(params[1]) || !hashPattern.MatchString(params[2]) { s.policy.ApplyMalformedPolicy(cs.ip) log.Printf("Malformed PoW result from %s@%s %v", login, cs.ip, params) return false, &ErrorReply{Code: -1, Message: "Malformed PoW result"} } + t := s.currentBlockTemplate() + exist, validShare := s.processShare(login, id, cs.ip, t, params, stratumMode != EthProxy) + ok := s.policy.ApplySharePolicy(cs.ip, !exist && validShare) - go func(s *ProxyServer, cs *Session, login, id string, params []string) { - t := s.currentBlockTemplate() - - //MFO: This function (s.processShare) will process a share as per hasher.Verify function of github.com/ethereum/ethash - // output of this function is either: - // true,true (Exists) which means share already exists and it is validShare - // true,false (Exists & invalid)which means share already exists and it is invalidShare or it is a block <-- should not ever happen - // false,false (stale/invalid)which means share is new, and it is not a block, might be a stale share or invalidShare - // false,true (valid)which means share is new, and it is a block or accepted share - // false,false,false (blacklisted wallet attached to share) see json file - // When this function finishes, the results is already recorded in the db for valid shares or blocks. - exist, validShare := s.processShare(login, id, cs.ip, t, params) - ok := s.policy.ApplySharePolicy(cs.ip, !exist && validShare) - - // if true,true or true,false - if exist { - log.Printf("Duplicate share from %s@%s %v", login, cs.ip, params) - cs.lastErr = errors.New("Duplicate share") - } - - // if false, false - if !validShare { - //MFO: Here we have an invalid share - log.Printf("Invalid share from %s@%s", login, cs.ip) - // Bad shares limit reached, return error and close - if !ok { - cs.lastErr = errors.New("Invalid share") - } - } - //MFO: Here we have a valid share and it is already recorded in DB by miner.go - // if false, true - if s.config.Proxy.Debug { - log.Printf("Valid share from %s@%s", login, cs.ip) + if exist { + log.Printf("Duplicate share from %s@%s %v", login, cs.ip, params) + // see https://github.com/sammy007/open-ethereum-pool/compare/master...nicehashdev:patch-1 + if !ok { + return false, &ErrorReply{Code: 23, Message: "Invalid share"} } + return false, nil + } + if !validShare { + log.Printf("Invalid share from %s@%s", login, cs.ip) + // Bad shares limit reached, return error and close if !ok { - cs.lastErr = errors.New("High rate of invalid shares") + return false, &ErrorReply{Code: 23, Message: "Invalid share"} } - }(s, cs, login, id, params) + return false, nil + } + if s.config.Proxy.Debug { + log.Printf("Valid share from %s@%s", login, cs.ip) + } + if !ok { + return true, &ErrorReply{Code: -1, Message: "High rate of invalid shares"} + } return true, nil } diff --git a/proxy/miner.go b/proxy/miner.go index 1d01778..b60301d 100644 --- a/proxy/miner.go +++ b/proxy/miner.go @@ -8,6 +8,7 @@ import ( "github.com/etclabscore/core-geth/common" "github.com/yuriy0803/etchash" + "github.com/yuriy0803/open-etc-pool-friends/util" ) var ( @@ -18,7 +19,7 @@ var ( hasher *etchash.Etchash = nil ) -func (s *ProxyServer) processShare(login, id, ip string, t *BlockTemplate, params []string) (bool, bool) { +func (s *ProxyServer) processShare(login, id, ip string, t *BlockTemplate, params []string, stratum bool) (bool, bool) { if hasher == nil { if s.config.Network == "classic" { @@ -42,27 +43,24 @@ func (s *ProxyServer) processShare(login, id, ip string, t *BlockTemplate, param nonce, _ := strconv.ParseUint(strings.Replace(nonceHex, "0x", "", -1), 16, 64) shareDiff := s.config.Proxy.Difficulty - h, ok := t.headers[hashNoNonce] - if !ok { - log.Printf("Stale share from %v@%v", login, ip) - s.backend.WriteWorkerShareStatus(login, id, false, true, false) - return false, false - } + var result common.Hash + if stratum { + hashNoNonceTmp := common.HexToHash(params[2]) - share := Block{ - number: h.height, - hashNoNonce: common.HexToHash(hashNoNonce), - difficulty: big.NewInt(shareDiff), - nonce: nonce, - mixDigest: common.HexToHash(mixDigest), - } + mixDigestTmp, hashTmp := hasher.Compute(t.Height, hashNoNonceTmp, nonce) + params[1] = hashNoNonceTmp.Hex() + params[2] = mixDigestTmp.Hex() + hashNoNonce = params[1] + result = hashTmp + } else { + hashNoNonceTmp := common.HexToHash(hashNoNonce) + mixDigestTmp, hashTmp := hasher.Compute(t.Height, hashNoNonceTmp, nonce) - block := Block{ - number: h.height, - hashNoNonce: common.HexToHash(hashNoNonce), - difficulty: h.diff, - nonce: nonce, - mixDigest: common.HexToHash(mixDigest), + // check mixDigest + if mixDigestTmp.Hex() != mixDigest { + return false, false + } + result = hashTmp } //this is to stop people in wallet blacklist, from getting shares into the db. @@ -74,20 +72,37 @@ func (s *ProxyServer) processShare(login, id, ip string, t *BlockTemplate, param //return codes need work here, a lot of it. } - isShare, actualDiff := hasher.Verify(share) + // Block "difficulty" is BigInt + // NiceHash "difficulty" is float64 ... + // diffFloat => target; then: diffInt = 2^256 / target + shareDiffCalc := util.TargetHexToDiff(result.Hex()).Int64() + shareDiffFloat := util.DiffIntToFloat(shareDiffCalc) + if shareDiffFloat < 0.0001 { + log.Printf("share difficulty too low, %f < %d, from %v@%v", shareDiffFloat, t.Difficulty, login, ip) + s.backend.WriteWorkerShareStatus(login, id, false, true, false) + return false, false + } if s.config.Proxy.Debug { - log.Printf("Difficulty pool Port/Shares found/Block difficulty = %d / %d / %d from %v@%v", shareDiff, actualDiff, t.Difficulty, login, ip) + log.Printf("Difficulty pool/block/share = %d / %d / %d(%f) from %v@%v", shareDiff, t.Difficulty, shareDiffCalc, shareDiffFloat, login, ip) } - if !isShare { - s.backend.WriteWorkerShareStatus(login, id, false, false, true) + h, ok := t.headers[hashNoNonce] + if !ok { + log.Printf("Stale share from %v@%v", login, ip) + s.backend.WriteWorkerShareStatus(login, id, false, true, false) return false, false } - isBlock, _ := hasher.Verify(block) - - if isBlock { + // check share difficulty + shareTarget := new(big.Int).Div(maxUint256, big.NewInt(shareDiff)) + if result.Big().Cmp(shareTarget) > 0 { + s.backend.WriteWorkerShareStatus(login, id, false, false, true) + return false, false + } + // check target difficulty + target := new(big.Int).Div(maxUint256, big.NewInt(h.diff.Int64())) + if result.Big().Cmp(target) <= 0 { ok, err := s.rpc().SubmitBlock(params) if err != nil { log.Printf("Block submission failure at height %v for %v: %v", h.height, t.Header, err) @@ -96,7 +111,7 @@ func (s *ProxyServer) processShare(login, id, ip string, t *BlockTemplate, param return false, false } else { s.fetchBlockTemplate() - exist, err := s.backend.WriteBlock(login, id, params, shareDiff, actualDiff, h.diff.Int64(), h.height, s.hashrateExpiration) + exist, err := s.backend.WriteBlock(login, id, params, shareDiff, shareDiffCalc, h.diff.Int64(), h.height, s.hashrateExpiration) if exist { return true, false } @@ -108,7 +123,7 @@ func (s *ProxyServer) processShare(login, id, ip string, t *BlockTemplate, param log.Printf("Block found by miner %v@%v at height %d", login, ip, h.height) } } else { - exist, err := s.backend.WriteShare(login, id, params, shareDiff, actualDiff, h.height, s.hashrateExpiration) + exist, err := s.backend.WriteShare(login, id, params, shareDiff, shareDiffCalc, h.height, s.hashrateExpiration) if exist { return true, false } diff --git a/proxy/proto.go b/proxy/proto.go index 16de943..f6d0751 100644 --- a/proxy/proto.go +++ b/proxy/proto.go @@ -8,6 +8,14 @@ type JSONRpcReq struct { Params json.RawMessage `json:"params"` } +type JSONStratumReq struct { + Id interface{} `json:"id"` + Method string `json:"method"` + Params interface{} `json:"params"` + Height string `json:"height"` + Algo string `json:"algo"` +} + type StratumReq struct { JSONRpcReq Worker string `json:"worker"` @@ -23,9 +31,9 @@ type JSONPushMessage struct { type JSONRpcResp struct { Id json.RawMessage `json:"id"` - Version string `json:"jsonrpc"` + Version string `json:"jsonrpc,omitempty"` Result interface{} `json:"result"` - Error interface{} `json:"error,omitempty"` + Error interface{} `json:"error"` } type SubmitReply struct { diff --git a/proxy/proxy.go b/proxy/proxy.go index 941f458..e1ef082 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -35,6 +35,13 @@ type ProxyServer struct { sessionsMu sync.RWMutex sessions map[*Session]struct{} timeout time.Duration + // Extranonce + Extranonces map[string]bool +} + +type staleJob struct { + SeedHash string + HeaderHash string } type Session struct { @@ -43,10 +50,25 @@ type Session struct { // Stratum sync.Mutex - conn net.Conn - login string - worker string - lastErr error + conn net.Conn + login string + worker string + stratum int + subscriptionID string + JobDeatils jobDetails + Extranonce string + ExtranonceSub bool + JobDetails jobDetails + staleJobs map[string]staleJob + staleJobIDs []string +} + +type jobDetails struct { + JobID string + SeedHash string + HeaderHash string + Height string + Epoch int64 } func NewProxy(cfg *Config, backend *storage.RedisClient) *ProxyServer { @@ -67,6 +89,7 @@ func NewProxy(cfg *Config, backend *storage.RedisClient) *ProxyServer { if cfg.Proxy.Stratum.Enabled { proxy.sessions = make(map[*Session]struct{}) + proxy.Extranonces = make(map[string]bool) go proxy.ListenTCP() } diff --git a/proxy/stratum.go b/proxy/stratum.go index 6a77670..33d5c4e 100644 --- a/proxy/stratum.go +++ b/proxy/stratum.go @@ -7,7 +7,10 @@ import ( "errors" "io" "log" + "math/rand" "net" + "strconv" + "strings" "time" "github.com/yuriy0803/open-etc-pool-friends/util" @@ -17,12 +20,16 @@ const ( MaxReqSize = 1024 ) +const ( + EthProxy int = iota + NiceHash +) + func (s *ProxyServer) ListenTCP() { s.timeout = util.MustParseDuration(s.config.Proxy.Stratum.Timeout) var err error var server net.Listener - setKeepAlive := func(net.Conn) {} if s.config.Proxy.Stratum.TLS { var cert tls.Certificate cert, err = tls.LoadX509KeyPair(s.config.Proxy.Stratum.CertFile, s.config.Proxy.Stratum.KeyFile) @@ -30,14 +37,10 @@ func (s *ProxyServer) ListenTCP() { log.Fatalln("Error loading certificate:", err) } tlsCfg := &tls.Config{Certificates: []tls.Certificate{cert}} - server, err = tls.Listen("tcp4", s.config.Proxy.Stratum.Listen, tlsCfg) + server, err = tls.Listen("tcp", s.config.Proxy.Stratum.Listen, tlsCfg) } else { - server, err = net.Listen("tcp4", s.config.Proxy.Stratum.Listen) - setKeepAlive = func(conn net.Conn) { - conn.(*net.TCPConn).SetKeepAlive(true) - } + server, err = net.Listen("tcp", s.config.Proxy.Stratum.Listen) } - if err != nil { log.Fatalf("Error: %v", err) } @@ -52,8 +55,6 @@ func (s *ProxyServer) ListenTCP() { if err != nil { continue } - setKeepAlive(conn) - ip, _, _ := net.SplitHostPort(conn.RemoteAddr().String()) if s.policy.IsBanned(ip) || !s.policy.ApplyLimitPolicy(ip) { @@ -61,12 +62,16 @@ func (s *ProxyServer) ListenTCP() { continue } n += 1 - cs := &Session{conn: conn, ip: ip} + // make unique extranonce + extranonce := s.uniqExtranonce() + cs := &Session{conn: conn, ip: ip, Extranonce: extranonce, ExtranonceSub: false, stratum: -1} + // allocate stales cache + cs.staleJobs = make(map[string]staleJob) accept <- n go func(cs *Session) { err = s.handleTCPClient(cs) - if err != nil || cs.lastErr != nil { + if err != nil { s.removeSession(cs) conn.Close() } @@ -79,6 +84,7 @@ func (s *ProxyServer) handleTCPClient(cs *Session) error { cs.enc = json.NewEncoder(cs.conn) connbuff := bufio.NewReaderSize(cs.conn, MaxReqSize) s.setDeadline(cs.conn) + for { data, isPrefix, err := connbuff.ReadLine() if isPrefix { @@ -112,13 +118,29 @@ func (s *ProxyServer) handleTCPClient(cs *Session) error { return nil } +func (cs *Session) setStratumMode(str string) error { + switch str { + case "EthereumStratum/1.0.0": + cs.stratum = NiceHash + break + default: + cs.stratum = EthProxy + break + } + return nil +} + +func (cs *Session) stratumMode() int { + return cs.stratum +} + func (cs *Session) handleTCPMessage(s *ProxyServer, req *StratumReq) error { - // Handle RPC methods + // Handle RPC/Stratum methods switch req.Method { case "eth_submitLogin": var params []string err := json.Unmarshal(req.Params, ¶ms) - if err != nil { + if err != nil || len(params) < 1 { log.Println("Malformed stratum request params from", cs.ip) return err } @@ -126,7 +148,178 @@ func (cs *Session) handleTCPMessage(s *ProxyServer, req *StratumReq) error { if errReply != nil { return cs.sendTCPError(req.Id, errReply) } + cs.setStratumMode("EthProxy") + log.Println("EthProxy login", cs.ip) return cs.sendTCPResult(req.Id, reply) + + case "mining.subscribe": + var params []string + err := json.Unmarshal(req.Params, ¶ms) + if err != nil || len(params) < 2 { + log.Println("Malformed stratum request params from", cs.ip) + return err + } + + if params[1] != "EthereumStratum/1.0.0" { + log.Println("Unsupported stratum version from ", cs.ip) + return cs.sendStratumError(req.Id, "unsupported stratum version") + } + + cs.ExtranonceSub = true + cs.setStratumMode("EthereumStratum/1.0.0") + log.Println("Nicehash subscribe", cs.ip) + result := cs.getNotificationResponse(s) + return cs.sendStratumResult(req.Id, result) + + default: + switch cs.stratumMode() { + case 0: + break + case 1: + break + default: + errReply := s.handleUnknownRPC(cs, req.Method) + return cs.sendTCPError(req.Id, errReply) + } + } + + if cs.stratumMode() == NiceHash { + switch req.Method { + case "mining.authorize": + var params []string + err := json.Unmarshal(req.Params, ¶ms) + if err != nil || len(params) < 1 { + return errors.New("invalid params") + } + splitData := strings.Split(params[0], ".") + params[0] = splitData[0] + reply, errReply := s.handleLoginRPC(cs, params, req.Worker) + if errReply != nil { + return cs.sendStratumError(req.Id, []string{ + string(errReply.Code), + errReply.Message, + }) + } + + if err := cs.sendStratumResult(req.Id, reply); err != nil { + return err + } + + paramsDiff := []float64{ + util.DiffIntToFloat(s.config.Proxy.Difficulty), + } + respReq := JSONStratumReq{Method: "mining.set_difficulty", Params: paramsDiff} + if err := cs.sendTCPReq(respReq); err != nil { + return err + } + + return cs.sendJob(s, req.Id, true) + + case "mining.extranonce.subscribe": + var params []string + err := json.Unmarshal(req.Params, ¶ms) + if err != nil { + return errors.New("invalid params") + } + if len(params) == 0 { + if err := cs.sendStratumResult(req.Id, true); err != nil { + return err + } + cs.ExtranonceSub = true + req := JSONStratumReq{ + Id: nil, + Method: "mining.set_extranonce", + Params: []interface{}{ + cs.Extranonce, + }, + } + return cs.sendTCPReq(req) + } + return cs.sendStratumError(req.Id, []string{ + "20", + "Not supported.", + }) + case "mining.submit": + var params []string + err := json.Unmarshal(req.Params, ¶ms) + if err != nil || len(params) < 3 { + log.Println("mining.submit: json.Unmarshal fail") + return err + } + + // params[0] = Username + // params[1] = Job ID + // params[2] = Minernonce + // Reference: + // https://github.com/nicehash/nhethpool/blob/060817a9e646cd9f1092647b870ed625ee138ab4/nhethpool/EthereumInstance.cs#L369 + + // WORKER NAME MANDATORY 0x1234.WORKERNAME + splitData := strings.Split(params[0], ".") + id := "0" + if len(splitData) > 1 { + id = splitData[1] + } + + cs.worker = id + + // check Extranonce subscription. + extranonce := cs.Extranonce + if !cs.ExtranonceSub { + extranonce = "" + } + nonce := extranonce + params[2] + + if cs.JobDetails.JobID != params[1] { + stale, ok := cs.staleJobs[params[1]] + if ok { + log.Printf("Cached stale JobID %s", params[1]) + params = []string{ + nonce, + stale.SeedHash, + stale.HeaderHash, + } + } else { + log.Printf("Stale share (mining.submit JobID received %s != current %s)", params[1], cs.JobDetails.JobID) + if err := cs.sendStratumError(req.Id, []string{"21", "Stale share."}); err != nil { + return err + } + return cs.sendJob(s, req.Id, false) + } + } else { + params = []string{ + nonce, + cs.JobDetails.SeedHash, + cs.JobDetails.HeaderHash, + } + } + + reply, errReply := s.handleTCPSubmitRPC(cs, id, params) + if errReply != nil { + log.Println("mining.submit: handleTCPSubmitRPC failed") + return cs.sendStratumError(req.Id, []string{ + strconv.Itoa(errReply.Code), + errReply.Message, + }) + } + + // TEST, ein notify zu viel + //if err := cs.sendTCPResult(resp); err != nil { + // return err + //} + + //return cs.sendJob(s, req.Id) + return cs.sendStratumResult(req.Id, reply) + + default: + errReply := s.handleUnknownRPC(cs, req.Method) + return cs.sendStratumError(req.Id, []string{ + strconv.Itoa(errReply.Code), + errReply.Message, + }) + } + } + + switch req.Method { case "eth_getWork": reply, errReply := s.handleGetWorkRPC(cs) if errReply != nil { @@ -136,7 +329,7 @@ func (cs *Session) handleTCPMessage(s *ProxyServer, req *StratumReq) error { case "eth_submitWork": var params []string err := json.Unmarshal(req.Params, ¶ms) - if err != nil { + if err != nil || len(params) < 3 { log.Println("Malformed stratum request params from", cs.ip) return err } @@ -161,9 +354,71 @@ func (cs *Session) sendTCPResult(id json.RawMessage, result interface{}) error { return cs.enc.Encode(&message) } -func (cs *Session) pushNewJob(result interface{}) error { +// cache stale jobs +func (cs *Session) cacheStales(max, n int) { + l := len(cs.staleJobIDs) + // remove outdated stales except last n caches if l > max + if l > max { + save := cs.staleJobIDs[l-n : l] + del := cs.staleJobIDs[0 : l-n] + for _, v := range del { + delete(cs.staleJobs, v) + } + cs.staleJobIDs = save + } + // save stales cache + cs.staleJobs[cs.JobDetails.JobID] = staleJob{ + cs.JobDetails.SeedHash, + cs.JobDetails.HeaderHash, + } + cs.staleJobIDs = append(cs.staleJobIDs, cs.JobDetails.JobID) +} + +func (cs *Session) pushNewJob(s *ProxyServer, result interface{}) error { cs.Lock() defer cs.Unlock() + + if cs.stratumMode() == NiceHash { + cs.cacheStales(10, 3) + + t := result.(*[]string) + cs.JobDetails = jobDetails{ + JobID: randomHex(8), + SeedHash: (*t)[1], + HeaderHash: (*t)[0], + Height: (*t)[3], + } + + // strip 0x prefix + if cs.JobDetails.SeedHash[0:2] == "0x" { + cs.JobDetails.SeedHash = cs.JobDetails.SeedHash[2:] + cs.JobDetails.HeaderHash = cs.JobDetails.HeaderHash[2:] + } + + a := s.currentBlockTemplate() + + resp := JSONStratumReq{ + Method: "mining.notify", + Params: []interface{}{ + cs.JobDetails.JobID, + cs.JobDetails.SeedHash, + cs.JobDetails.HeaderHash, + // If set to true, then miner needs to clear queue of jobs and immediatelly + // start working on new provided job, because all old jobs shares will + // result with stale share error. + // + // if true, NiceHash charges "Extra Rewards" for frequent job changes + // if false, the stale rate might be higher because miners take too long to switch jobs + // + // It's undetermined what's more cost-effective + false, + }, + + Height: util.ToHex1(int64(a.Height)), + Algo: "etchash", + } + return cs.enc.Encode(&resp) + } // FIXME: Temporarily add ID for Claymore compliance message := JSONPushMessage{Version: "2.0", Result: result, Id: 0} return cs.enc.Encode(&message) @@ -181,6 +436,30 @@ func (cs *Session) sendTCPError(id json.RawMessage, reply *ErrorReply) error { return errors.New(reply.Message) } +func (cs *Session) sendStratumResult(id json.RawMessage, result interface{}) error { + cs.Lock() + defer cs.Unlock() + + resp := JSONRpcResp{Id: id, Error: nil, Result: result} + return cs.enc.Encode(&resp) +} + +func (cs *Session) sendStratumError(id json.RawMessage, message interface{}) error { + cs.Lock() + defer cs.Unlock() + + resp := JSONRpcResp{Id: id, Error: message} + + return cs.enc.Encode(&resp) +} + +func (cs *Session) sendTCPReq(resp JSONStratumReq) error { + cs.Lock() + defer cs.Unlock() + + return cs.enc.Encode(&resp) +} + func (self *ProxyServer) setDeadline(conn net.Conn) { conn.SetDeadline(time.Now().Add(self.timeout)) } @@ -194,15 +473,60 @@ func (s *ProxyServer) registerSession(cs *Session) { func (s *ProxyServer) removeSession(cs *Session) { s.sessionsMu.Lock() defer s.sessionsMu.Unlock() + delete(s.Extranonces, cs.Extranonce) delete(s.sessions, cs) } +// nicehash +func (cs *Session) sendJob(s *ProxyServer, id json.RawMessage, newjob bool) error { + if newjob { + reply, errReply := s.handleGetWorkRPC(cs) + if errReply != nil { + return cs.sendStratumError(id, []string{ + string(errReply.Code), + errReply.Message, + }) + } + + cs.JobDetails = jobDetails{ + JobID: randomHex(8), + SeedHash: reply[1], + HeaderHash: reply[0], + Height: reply[3], + } + + // The NiceHash official .NET pool omits 0x... + // TO DO: clean up once everything works + if cs.JobDetails.SeedHash[0:2] == "0x" { + cs.JobDetails.SeedHash = cs.JobDetails.SeedHash[2:] + cs.JobDetails.HeaderHash = cs.JobDetails.HeaderHash[2:] + } + } + + t := s.currentBlockTemplate() + + resp := JSONStratumReq{ + Method: "mining.notify", + Params: []interface{}{ + cs.JobDetails.JobID, + cs.JobDetails.SeedHash, + cs.JobDetails.HeaderHash, + true, + }, + + Height: util.ToHex1(int64(t.Height)), + Algo: "etchash", + } + + return cs.sendTCPReq(resp) +} + func (s *ProxyServer) broadcastNewJobs() { t := s.currentBlockTemplate() if t == nil || len(t.Header) == 0 || s.isSick() { return } - reply := []string{t.Header, t.Seed, s.diff} + reply := []string{t.Header, t.Seed, s.diff, util.ToHex(int64(t.Height))} s.sessionsMu.RLock() defer s.sessionsMu.RUnlock() @@ -210,7 +534,6 @@ func (s *ProxyServer) broadcastNewJobs() { count := len(s.sessions) log.Printf("Broadcasting new job to %v stratum miners", count) - s.backend.NumberStratumWorker(count) start := time.Now() bcast := make(chan int, 1024) n := 0 @@ -220,9 +543,9 @@ func (s *ProxyServer) broadcastNewJobs() { bcast <- n go func(cs *Session) { - err := cs.pushNewJob(&reply) + err := cs.pushNewJob(s, &reply) <-bcast - if err != nil || cs.lastErr != nil { + if err != nil { log.Printf("Job transmit error to %v@%v: %v", cs.login, cs.ip, err) s.removeSession(cs) } else { @@ -232,3 +555,37 @@ func (s *ProxyServer) broadcastNewJobs() { } log.Printf("Jobs broadcast finished %s", time.Since(start)) } + +func (s *ProxyServer) uniqExtranonce() string { + s.sessionsMu.RLock() + defer s.sessionsMu.RUnlock() + + extranonce := randomHex(4) + for { + if _, ok := s.Extranonces[extranonce]; ok { + extranonce = randomHex(4) + } else { + break + } + } + s.Extranonces[extranonce] = true + return extranonce +} + +func randomHex(strlen int) string { + rand.Seed(time.Now().UTC().UnixNano()) + const chars = "0123456789abcdef" + result := make([]byte, strlen) + for i := 0; i < strlen; i++ { + result[i] = chars[rand.Intn(len(chars))] + } + return string(result) +} + +func (cs *Session) getNotificationResponse(s *ProxyServer) interface{} { + result := make([]interface{}, 2) + result[0] = []string{"mining.notify", randomHex(16), "EthereumStratum/1.0.0"} + result[1] = cs.Extranonce + + return result +} diff --git a/storage/redis.go b/storage/redis.go index cc147ac..2adb6ea 100644 --- a/storage/redis.go +++ b/storage/redis.go @@ -81,7 +81,7 @@ type RewardData struct { type BlockData struct { Login string `json:"login"` Worker string `json:"worker"` - ActualDiff int64 `json:"shareDiff"` + ShareDiffCalc int64 `json:"shareDiff"` Height int64 `json:"height"` Timestamp int64 `json:"timestamp"` Difficulty int64 `json:"difficulty"` @@ -134,7 +134,7 @@ func (b *BlockData) RoundKey() string { } func (b *BlockData) key() string { - return join(b.UncleHeight, b.Orphan, b.Nonce, b.serializeHash(), b.Timestamp, b.Difficulty, b.TotalShares, b.Reward, b.Login, b.ActualDiff, b.Worker) + return join(b.UncleHeight, b.Orphan, b.Nonce, b.serializeHash(), b.Timestamp, b.Difficulty, b.TotalShares, b.Reward, b.Login, b.ShareDiffCalc, b.Worker) } type Miner struct { @@ -517,7 +517,7 @@ func (r *RedisClient) checkPoWExist(height uint64, params []string) (bool, error return val == 0, err } -func (r *RedisClient) WriteShare(login, id string, params []string, diff int64, actualDiff int64, height uint64, window time.Duration) (bool, error) { +func (r *RedisClient) WriteShare(login, id string, params []string, diff int64, shareDiffCalc int64, height uint64, window time.Duration) (bool, error) { exist, err := r.checkPoWExist(height, params) if err != nil { return false, err @@ -533,14 +533,14 @@ func (r *RedisClient) WriteShare(login, id string, params []string, diff int64, ts := ms / 1000 _, err = tx.Exec(func() error { - r.writeShare(tx, ms, ts, login, id, diff, actualDiff, window) + r.writeShare(tx, ms, ts, login, id, diff, shareDiffCalc, window) tx.HIncrBy(r.formatKey("stats"), "roundShares", diff) return nil }) return false, err } -func (r *RedisClient) WriteBlock(login, id string, params []string, diff, actualDiff int64, roundDiff int64, height uint64, window time.Duration) (bool, error) { +func (r *RedisClient) WriteBlock(login, id string, params []string, diff, shareDiffCalc int64, roundDiff int64, height uint64, window time.Duration) (bool, error) { exist, err := r.checkPoWExist(height, params) if err != nil { return false, err @@ -556,7 +556,7 @@ func (r *RedisClient) WriteBlock(login, id string, params []string, diff, actual ts := ms / 1000 cmds, err := tx.Exec(func() error { - r.writeShare(tx, ms, ts, login, id, diff, actualDiff, window) + r.writeShare(tx, ms, ts, login, id, diff, shareDiffCalc, window) tx.HSet(r.formatKey("stats"), "lastBlockFound", strconv.FormatInt(ts, 10)) tx.HDel(r.formatKey("stats"), "roundShares") tx.HSet(r.formatKey("miners", login), "roundShares", strconv.FormatInt(0, 10)) @@ -599,13 +599,13 @@ func (r *RedisClient) WriteBlock(login, id string, params []string, diff, actual totalShares += n } hashHex := strings.Join(params, ":") - s := join(hashHex, ts, roundDiff, totalShares, login, actualDiff, id) + s := join(hashHex, ts, roundDiff, totalShares, login, shareDiffCalc, id) cmd := r.client.ZAdd(r.formatKey("blocks", "candidates"), redis.Z{Score: float64(height), Member: s}) return false, cmd.Err() } } -func (r *RedisClient) writeShare(tx *redis.Multi, ms, ts int64, login, id string, diff int64, actualDiff int64, expire time.Duration) { +func (r *RedisClient) writeShare(tx *redis.Multi, ms, ts int64, login, id string, diff int64, shareDiffCalc int64, expire time.Duration) { times := int(diff / 1000000000) for i := 0; i < times; i++ { tx.LPush(r.formatKey("lastshares"), login) @@ -617,7 +617,7 @@ func (r *RedisClient) writeShare(tx *redis.Multi, ms, ts int64, login, id string tx.ZAdd(r.formatKey("hashrate", login), redis.Z{Score: float64(ts), Member: join(diff, id, ms)}) tx.Expire(r.formatKey("hashrate", login), expire) // Will delete hashrates for miners that gone tx.HSet(r.formatKey("miners", login), "lastShare", strconv.FormatInt(ts, 10)) - tx.HSet(r.formatKey("miners", login), "lastShareDiff", strconv.FormatInt(actualDiff, 10)) + tx.HSet(r.formatKey("miners", login), "lastShareDiff", strconv.FormatInt(shareDiffCalc, 10)) } func (r *RedisClient) WriteBlocksFound(ms, ts int64, login, id, share string, diff int64) { @@ -1448,7 +1448,7 @@ func convertCandidateResults(raw *redis.ZSliceCmd) []*BlockData { block.Difficulty, _ = strconv.ParseInt(fields[4], 10, 64) block.TotalShares, _ = strconv.ParseInt(fields[5], 10, 64) block.Login = fields[6] - block.ActualDiff, _ = strconv.ParseInt(fields[7], 10, 64) + block.ShareDiffCalc, _ = strconv.ParseInt(fields[7], 10, 64) block.Worker = fields[8] block.candidateKey = v.Member.(string) result = append(result, &block) @@ -1496,7 +1496,7 @@ func convertBlockResults(rows ...*redis.ZSliceCmd) []*BlockData { block.RewardString = fields[7] block.ImmatureReward = fields[7] block.Login = fields[8] - block.ActualDiff, _ = strconv.ParseInt(fields[9], 10, 64) + block.ShareDiffCalc, _ = strconv.ParseInt(fields[9], 10, 64) block.Worker = fields[10] block.immatureKey = v.Member.(string) result = append(result, &block) diff --git a/util/util.go b/util/util.go index 8caa41d..9d8e406 100644 --- a/util/util.go +++ b/util/util.go @@ -80,3 +80,17 @@ func String2Big(num string) *big.Int { n.SetString(num, 0) return n } + +func DiffFloatToInt(diffFloat float64) (diffInt int64) { + diffInt = int64(diffFloat * float64(1<<48) / float64(0xffff)) // 48 = 256 - 26*8 + return +} + +func DiffIntToFloat(diffInt int64) (diffFloat float64) { + diffFloat = float64(diffInt*0xffff) / float64(1<<48) // 48 = 256 - 26*8 + return +} + +func ToHex1(n int64) string { + return strconv.FormatInt(n, 10) +}