Browse Source

test

master
yuriy0803 5 years ago
parent
commit
debfde18e1
  1. 117
      util/proxy/blocks.go
  2. 73
      util/proxy/config.go
  3. 115
      util/proxy/handlers.go
  4. 95
      util/proxy/miner.go
  5. 44
      util/proxy/proto.go
  6. 320
      util/proxy/proxy.go
  7. 221
      util/proxy/stratum.go

117
util/proxy/blocks.go

@ -0,0 +1,117 @@
package proxy
import (
"log"
"math/big"
"strconv"
"strings"
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/yuriy0803/open-etc-pool-friends/rpc"
"github.com/yuriy0803/open-etc-pool-friends/util"
)
const maxBacklog = 3
type heightDiffPair struct {
diff *big.Int
height uint64
}
type BlockTemplate struct {
sync.RWMutex
Header string
Seed string
Target string
Difficulty *big.Int
Height uint64
GetPendingBlockCache *rpc.GetBlockReplyPart
nonces map[string]bool
headers map[string]heightDiffPair
}
type Block struct {
difficulty *big.Int
hashNoNonce common.Hash
nonce uint64
mixDigest common.Hash
number uint64
}
func (b Block) Difficulty() *big.Int { return b.difficulty }
func (b Block) HashNoNonce() common.Hash { return b.hashNoNonce }
func (b Block) Nonce() uint64 { return b.nonce }
func (b Block) MixDigest() common.Hash { return b.mixDigest }
func (b Block) NumberU64() uint64 { return b.number }
func (s *ProxyServer) fetchBlockTemplate() {
rpc := s.rpc()
t := s.currentBlockTemplate()
pendingReply, height, diff, err := s.fetchPendingBlock()
if err != nil {
log.Printf("Error while refreshing pending block on %s: %s", rpc.Name, err)
return
}
reply, err := rpc.GetWork()
if err != nil {
log.Printf("Error while refreshing block template on %s: %s", rpc.Name, err)
return
}
// No need to update, we have fresh job
if t != nil && t.Header == reply[0] {
return
}
pendingReply.Difficulty = util.ToHex(s.config.Proxy.Difficulty)
newTemplate := BlockTemplate{
Header: reply[0],
Seed: reply[1],
Target: reply[2],
Height: height,
Difficulty: big.NewInt(diff),
GetPendingBlockCache: pendingReply,
headers: make(map[string]heightDiffPair),
}
// Copy job backlog and add current one
newTemplate.headers[reply[0]] = heightDiffPair{
diff: util.TargetHexToDiff(reply[2]),
height: height,
}
if t != nil {
for k, v := range t.headers {
if v.height > height-maxBacklog {
newTemplate.headers[k] = v
}
}
}
s.blockTemplate.Store(&newTemplate)
log.Printf("New block to mine on %s at height %d / %s", rpc.Name, height, reply[0][0:10])
// Stratum
if s.config.Proxy.Stratum.Enabled {
go s.broadcastNewJobs()
}
}
func (s *ProxyServer) fetchPendingBlock() (*rpc.GetBlockReplyPart, uint64, int64, error) {
rpc := s.rpc()
reply, err := rpc.GetPendingBlock()
if err != nil {
log.Printf("Error while refreshing pending block on %s: %s", rpc.Name, err)
return nil, 0, 0, err
}
blockNumber, err := strconv.ParseUint(strings.Replace(reply.Number, "0x", "", -1), 16, 64)
if err != nil {
log.Println("Can't parse pending block number")
return nil, 0, 0, err
}
blockDiff, err := strconv.ParseInt(strings.Replace(reply.Difficulty, "0x", "", -1), 16, 64)
if err != nil {
log.Println("Can't parse pending block difficulty")
return nil, 0, 0, err
}
return reply, blockNumber, blockDiff, nil
}

73
util/proxy/config.go

@ -0,0 +1,73 @@
package proxy
import (
"github.com/yuriy0803/open-etc-pool-friends/api"
"github.com/yuriy0803/open-etc-pool-friends/payouts"
"github.com/yuriy0803/open-etc-pool-friends/policy"
"github.com/yuriy0803/open-etc-pool-friends/storage"
)
type Config struct {
Name string `json:"name"`
Proxy Proxy `json:"proxy"`
Api api.ApiConfig `json:"api"`
Upstream []Upstream `json:"upstream"`
UpstreamCheckInterval string `json:"upstreamCheckInterval"`
Threads int `json:"threads"`
Network string `json:"network"`
Coin string `json:"coin"`
Pplns int64 `json:"pplns"`
Redis storage.Config `json:"redis"`
BlockUnlocker payouts.UnlockerConfig `json:"unlocker"`
Payouts payouts.PayoutsConfig `json:"payouts"`
NewrelicName string `json:"newrelicName"`
NewrelicKey string `json:"newrelicKey"`
NewrelicVerbose bool `json:"newrelicVerbose"`
NewrelicEnabled bool `json:"newrelicEnabled"`
}
type Proxy struct {
Enabled bool `json:"enabled"`
Listen string `json:"listen"`
LimitHeadersSize int `json:"limitHeadersSize"`
LimitBodySize int64 `json:"limitBodySize"`
BehindReverseProxy bool `json:"behindReverseProxy"`
BlockRefreshInterval string `json:"blockRefreshInterval"`
Difficulty int64 `json:"difficulty"`
StateUpdateInterval string `json:"stateUpdateInterval"`
HashrateExpiration string `json:"hashrateExpiration"`
Policy policy.Config `json:"policy"`
MaxFails int64 `json:"maxFails"`
HealthCheck bool `json:"healthCheck"`
Debug bool `json:"debug"`
Stratum Stratum `json:"stratum"`
StratumNiceHash StratumNiceHash `json:"stratum_nice_hash"`
}
type Stratum struct {
Enabled bool `json:"enabled"`
Listen string `json:"listen"`
Timeout string `json:"timeout"`
MaxConn int `json:"maxConn"`
}
type StratumNiceHash struct {
Enabled bool `json:"enabled"`
Listen string `json:"listen"`
Timeout string `json:"timeout"`
MaxConn int `json:"maxConn"`
}
type Upstream struct {
Name string `json:"name"`
Url string `json:"url"`
Timeout string `json:"timeout"`
}

115
util/proxy/handlers.go

@ -0,0 +1,115 @@
package proxy
import (
"errors"
"log"
"regexp"
"strings"
"github.com/yuriy0803/open-etc-pool-friends/rpc"
"github.com/yuriy0803/open-etc-pool-friends/util"
)
// Allow only lowercase hexadecimal with 0x prefix
var noncePattern = regexp.MustCompile("^0x[0-9a-f]{16}$")
var hashPattern = regexp.MustCompile("^0x[0-9a-f]{64}$")
var workerPattern = regexp.MustCompile("^[0-9a-zA-Z-_]{1,8}$")
// Stratum
func (s *ProxyServer) handleLoginRPC(cs *Session, params []string, id string) (bool, *ErrorReply) {
if len(params) == 0 {
return false, &ErrorReply{Code: -1, Message: "Invalid params"}
}
login := strings.ToLower(params[0])
if !util.IsValidHexAddress(login) {
return false, &ErrorReply{Code: -1, Message: "Invalid login"}
}
if !s.policy.ApplyLoginPolicy(login, cs.ip) {
return false, &ErrorReply{Code: -1, Message: "You are blacklisted"}
}
cs.login = login
s.registerSession(cs)
log.Printf("Stratum miner connected %v@%v", login, cs.ip)
return true, nil
}
func (s *ProxyServer) handleGetWorkRPC(cs *Session) ([]string, *ErrorReply) {
t := s.currentBlockTemplate()
if t == nil || len(t.Header) == 0 || s.isSick() {
return nil, &ErrorReply{Code: 0, Message: "Work not ready"}
}
return []string{t.Header, t.Seed, s.diff}, nil
}
// Stratum
func (s *ProxyServer) handleTCPSubmitRPC(cs *Session, id string, params []string) (bool, *ErrorReply) {
s.sessionsMu.RLock()
_, ok := s.sessions[cs]
s.sessionsMu.RUnlock()
if !ok {
return false, &ErrorReply{Code: 25, Message: "Not subscribed"}
}
return s.handleSubmitRPC(cs, cs.login, id, params)
}
func (s *ProxyServer) handleSubmitRPC(cs *Session, login, id string, params []string) (bool, *ErrorReply) {
if !workerPattern.MatchString(id) {
id = "0"
}
if len(params) != 3 {
s.policy.ApplyMalformedPolicy(cs.ip)
log.Printf("Malformed params from %s@%s %v", login, cs.ip, params)
return false, &ErrorReply{Code: -1, Message: "Invalid params"}
}
if !noncePattern.MatchString(params[0]) || !hashPattern.MatchString(params[1]) || !hashPattern.MatchString(params[2]) {
s.policy.ApplyMalformedPolicy(cs.ip)
log.Printf("Malformed PoW result from %s@%s %v", login, cs.ip, params)
return false, &ErrorReply{Code: -1, Message: "Malformed PoW result"}
}
go func(s *ProxyServer, cs *Session, login, id string, params []string) {
t := s.currentBlockTemplate()
exist, validShare := s.processShare(login, id, cs.ip, t, params)
ok := s.policy.ApplySharePolicy(cs.ip, !exist && validShare)
if exist {
log.Printf("Duplicate share from %s@%s %v", login, cs.ip, params)
cs.lastErr = errors.New("Duplicate share")
}
if !validShare {
log.Printf("Invalid share from %s@%s", login, cs.ip)
// Bad shares limit reached, return error and close
if !ok {
cs.lastErr = errors.New("Invalid share")
}
}
if s.config.Proxy.Debug {
log.Printf("Valid share from %s@%s", login, cs.ip)
}
if !ok {
cs.lastErr = errors.New("High rate of invalid shares")
}
}(s, cs, login, id, params)
return true, nil
}
func (s *ProxyServer) handleGetBlockByNumberRPC() *rpc.GetBlockReplyPart {
t := s.currentBlockTemplate()
var reply *rpc.GetBlockReplyPart
if t != nil {
reply = t.GetPendingBlockCache
}
return reply
}
func (s *ProxyServer) handleUnknownRPC(cs *Session, m string) *ErrorReply {
log.Printf("Unknown request method %s from %s", m, cs.ip)
s.policy.ApplyMalformedPolicy(cs.ip)
return &ErrorReply{Code: -3, Message: "Method not found"}
}

95
util/proxy/miner.go

@ -0,0 +1,95 @@
package proxy
import (
"log"
"math/big"
"strconv"
"strings"
"github.com/ethereum/go-ethereum/common"
"github.com/yuriy0803/go-etchash"
)
var ecip1099FBlockClassic uint64 = 11700000 // classic mainnet
var ecip1099FBlockMordor uint64 = 2520000 // mordor
var hasher *etchash.Etchash = nil
func (s *ProxyServer) processShare(login, id, ip string, t *BlockTemplate, params []string) (bool, bool) {
if hasher == nil {
if s.config.Network == "classic" {
hasher = etchash.New(&ecip1099FBlockClassic)
} else if s.config.Network == "mordor" {
hasher = etchash.New(&ecip1099FBlockMordor)
} else {
// unknown network
log.Printf("Unknown network configuration %s", s.config.Network)
return false, false
}
}
nonceHex := params[0]
hashNoNonce := params[1]
mixDigest := params[2]
nonce, _ := strconv.ParseUint(strings.Replace(nonceHex, "0x", "", -1), 16, 64)
shareDiff := s.config.Proxy.Difficulty
h, ok := t.headers[hashNoNonce]
if !ok {
log.Printf("Stale share from %v@%v", login, ip)
s.backend.WriteWorkerShareStatus(login, id, false, true, false)
return false, false
}
share := Block{
number: h.height,
hashNoNonce: common.HexToHash(hashNoNonce),
difficulty: big.NewInt(shareDiff),
nonce: nonce,
mixDigest: common.HexToHash(mixDigest),
}
block := Block{
number: h.height,
hashNoNonce: common.HexToHash(hashNoNonce),
difficulty: h.diff,
nonce: nonce,
mixDigest: common.HexToHash(mixDigest),
}
if !hasher.Verify(share) {
s.backend.WriteWorkerShareStatus(login, id, false, false, true)
return false, false
}
if hasher.Verify(block) {
ok, err := s.rpc().SubmitBlock(params)
if err != nil {
log.Printf("Block submission failure at height %v for %v: %v", h.height, t.Header, err)
} else if !ok {
log.Printf("Block rejected at height %v for %v", h.height, t.Header)
return false, false
} else {
s.fetchBlockTemplate()
exist, err := s.backend.WriteBlock(login, id, params, shareDiff, h.diff.Int64(), h.height, s.hashrateExpiration)
if exist {
return true, false
}
if err != nil {
log.Println("Failed to insert block candidate into backend:", err)
} else {
log.Printf("Inserted block %v to backend", h.height)
}
log.Printf("Block found by miner %v@%v at height %d", login, ip, h.height)
}
} else {
exist, err := s.backend.WriteShare(login, id, params, shareDiff, h.height, s.hashrateExpiration)
if exist {
return true, false
}
if err != nil {
log.Println("Failed to insert share data into backend:", err)
}
}
s.backend.WriteWorkerShareStatus(login, id, true, false, false)
return false, true
}

44
util/proxy/proto.go

@ -0,0 +1,44 @@
package proxy
import "encoding/json"
type JSONRpcReq struct {
Id json.RawMessage `json:"id"`
Method string `json:"method"`
Params json.RawMessage `json:"params"`
}
type JSONRpcReqNH struct {
Id interface{} `json:"id"`
Method string `json:"method"`
Params interface{} `json:"params"`
}
type StratumReq struct {
JSONRpcReq
Worker string `json:"worker"`
}
// Stratum
type JSONPushMessage struct {
// FIXME: Temporarily add ID for Claymore compliance
Id int64 `json:"id"`
Version string `json:"jsonrpc"`
Result interface{} `json:"result"`
}
type JSONRpcResp struct {
Id json.RawMessage `json:"id"`
Version string `json:"jsonrpc"`
Result interface{} `json:"result"`
Error interface{} `json:"error,omitempty"`
}
type SubmitReply struct {
Status string `json:"status"`
}
type ErrorReply struct {
Code int `json:"code"`
Message string `json:"message"`
}

320
util/proxy/proxy.go

@ -0,0 +1,320 @@
package proxy
import (
"encoding/json"
"io"
"log"
"net"
"net/http"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/gorilla/mux"
"github.com/yuriy0803/open-etc-pool-friends/policy"
"github.com/yuriy0803/open-etc-pool-friends/rpc"
"github.com/yuriy0803/open-etc-pool-friends/storage"
"github.com/yuriy0803/open-etc-pool-friends/util"
)
type ProxyServer struct {
config *Config
blockTemplate atomic.Value
upstream int32
upstreams []*rpc.RPCClient
backend *storage.RedisClient
diff string
policy *policy.PolicyServer
hashrateExpiration time.Duration
failsCount int64
// Stratum
sessionsMu sync.RWMutex
sessions map[*Session]struct{}
timeout time.Duration
Extranonce string
}
type jobDetails struct {
JobID string
SeedHash string
HeaderHash string
}
type Session struct {
ip string
enc *json.Encoder
// Stratum
sync.Mutex
conn *net.TCPConn
login string
lastErr error
subscriptionID string
JobDeatils jobDetails
}
func NewProxy(cfg *Config, backend *storage.RedisClient) *ProxyServer {
if len(cfg.Name) == 0 {
log.Fatal("You must set instance name")
}
policy := policy.Start(&cfg.Proxy.Policy, backend)
proxy := &ProxyServer{config: cfg, backend: backend, policy: policy}
proxy.diff = util.GetTargetHex(cfg.Proxy.Difficulty)
proxy.upstreams = make([]*rpc.RPCClient, len(cfg.Upstream))
for i, v := range cfg.Upstream {
proxy.upstreams[i] = rpc.NewRPCClient(v.Name, v.Url, v.Timeout)
log.Printf("Upstream: %s => %s", v.Name, v.Url)
}
log.Printf("Default upstream: %s => %s", proxy.rpc().Name, proxy.rpc().Url)
if cfg.Proxy.Stratum.Enabled {
proxy.sessions = make(map[*Session]struct{})
go proxy.ListenTCP()
}
proxy.fetchBlockTemplate()
proxy.hashrateExpiration = util.MustParseDuration(cfg.Proxy.HashrateExpiration)
refreshIntv := util.MustParseDuration(cfg.Proxy.BlockRefreshInterval)
refreshTimer := time.NewTimer(refreshIntv)
log.Printf("Set block refresh every %v", refreshIntv)
checkIntv := util.MustParseDuration(cfg.UpstreamCheckInterval)
checkTimer := time.NewTimer(checkIntv)
stateUpdateIntv := util.MustParseDuration(cfg.Proxy.StateUpdateInterval)
stateUpdateTimer := time.NewTimer(stateUpdateIntv)
go func() {
for {
select {
case <-refreshTimer.C:
proxy.fetchBlockTemplate()
refreshTimer.Reset(refreshIntv)
}
}
}()
go func() {
for {
select {
case <-checkTimer.C:
proxy.checkUpstreams()
checkTimer.Reset(checkIntv)
}
}
}()
go func() {
for {
select {
case <-stateUpdateTimer.C:
t := proxy.currentBlockTemplate()
if t != nil {
err := backend.WriteNodeState(cfg.Name, t.Height, t.Difficulty)
if err != nil {
log.Printf("Failed to write node state to backend: %v", err)
proxy.markSick()
} else {
proxy.markOk()
}
}
stateUpdateTimer.Reset(stateUpdateIntv)
}
}
}()
return proxy
}
func (s *ProxyServer) Start() {
log.Printf("Starting proxy on %v", s.config.Proxy.Listen)
r := mux.NewRouter()
r.Handle("/{login:0x[0-9a-fA-F]{40}}/{id:[0-9a-zA-Z-_]{1,8}}", s)
r.Handle("/{login:0x[0-9a-fA-F]{40}}", s)
srv := &http.Server{
Addr: s.config.Proxy.Listen,
Handler: r,
MaxHeaderBytes: s.config.Proxy.LimitHeadersSize,
}
err := srv.ListenAndServe()
if err != nil {
log.Fatalf("Failed to start proxy: %v", err)
}
}
func (s *ProxyServer) rpc() *rpc.RPCClient {
i := atomic.LoadInt32(&s.upstream)
return s.upstreams[i]
}
func (s *ProxyServer) checkUpstreams() {
candidate := int32(0)
backup := false
for i, v := range s.upstreams {
if v.Check() && !backup {
candidate = int32(i)
backup = true
}
}
if s.upstream != candidate {
log.Printf("Switching to %v upstream", s.upstreams[candidate].Name)
atomic.StoreInt32(&s.upstream, candidate)
}
}
func (s *ProxyServer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
s.writeError(w, 405, "rpc: POST method required, received "+r.Method)
return
}
ip := s.remoteAddr(r)
if !s.policy.IsBanned(ip) {
s.handleClient(w, r, ip)
}
}
func (s *ProxyServer) remoteAddr(r *http.Request) string {
if s.config.Proxy.BehindReverseProxy {
ip := r.Header.Get("X-Forwarded-For")
if len(ip) > 0 && net.ParseIP(ip) != nil {
return ip
}
}
ip, _, _ := net.SplitHostPort(r.RemoteAddr)
return ip
}
func (s *ProxyServer) handleClient(w http.ResponseWriter, r *http.Request, ip string) {
if r.ContentLength > s.config.Proxy.LimitBodySize {
log.Printf("Socket flood from %s", ip)
s.policy.ApplyMalformedPolicy(ip)
http.Error(w, "Request too large", http.StatusExpectationFailed)
return
}
r.Body = http.MaxBytesReader(w, r.Body, s.config.Proxy.LimitBodySize)
defer r.Body.Close()
cs := &Session{ip: ip, enc: json.NewEncoder(w)}
dec := json.NewDecoder(r.Body)
for {
var req JSONRpcReq
if err := dec.Decode(&req); err == io.EOF {
break
} else if err != nil {
log.Printf("Malformed request from %v: %v", ip, err)
s.policy.ApplyMalformedPolicy(ip)
return
}
cs.handleMessage(s, r, &req)
}
}
func (cs *Session) handleMessage(s *ProxyServer, r *http.Request, req *JSONRpcReq) {
if req.Id == nil {
log.Printf("Missing RPC id from %s", cs.ip)
s.policy.ApplyMalformedPolicy(cs.ip)
return
}
vars := mux.Vars(r)
login := strings.ToLower(vars["login"])
if !util.IsValidHexAddress(login) {
errReply := &ErrorReply{Code: -1, Message: "Invalid login"}
cs.sendError(req.Id, errReply)
return
}
if !s.policy.ApplyLoginPolicy(login, cs.ip) {
errReply := &ErrorReply{Code: -1, Message: "You are blacklisted"}
cs.sendError(req.Id, errReply)
return
}
// Handle RPC methods
switch req.Method {
case "eth_getWork":
reply, errReply := s.handleGetWorkRPC(cs)
if errReply != nil {
cs.sendError(req.Id, errReply)
break
}
cs.sendResult(req.Id, &reply)
case "eth_submitWork":
if req.Params != nil {
var params []string
err := json.Unmarshal(req.Params, &params)
if err != nil {
log.Printf("Unable to parse params from %v", cs.ip)
s.policy.ApplyMalformedPolicy(cs.ip)
break
}
reply, errReply := s.handleSubmitRPC(cs, login, vars["id"], params)
if errReply != nil {
cs.sendError(req.Id, errReply)
break
}
cs.sendResult(req.Id, &reply)
} else {
s.policy.ApplyMalformedPolicy(cs.ip)
errReply := &ErrorReply{Code: -1, Message: "Malformed request"}
cs.sendError(req.Id, errReply)
}
case "eth_getBlockByNumber":
reply := s.handleGetBlockByNumberRPC()
cs.sendResult(req.Id, reply)
case "eth_submitHashrate":
cs.sendResult(req.Id, true)
default:
errReply := s.handleUnknownRPC(cs, req.Method)
cs.sendError(req.Id, errReply)
}
}
func (cs *Session) sendResult(id json.RawMessage, result interface{}) error {
message := JSONRpcResp{Id: id, Version: "2.0", Error: nil, Result: result}
return cs.enc.Encode(&message)
}
func (cs *Session) sendError(id json.RawMessage, reply *ErrorReply) error {
message := JSONRpcResp{Id: id, Version: "2.0", Error: reply}
return cs.enc.Encode(&message)
}
func (s *ProxyServer) writeError(w http.ResponseWriter, status int, msg string) {
w.WriteHeader(status)
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
}
func (s *ProxyServer) currentBlockTemplate() *BlockTemplate {
t := s.blockTemplate.Load()
if t != nil {
return t.(*BlockTemplate)
} else {
return nil
}
}
func (s *ProxyServer) markSick() {
atomic.AddInt64(&s.failsCount, 1)
}
func (s *ProxyServer) isSick() bool {
x := atomic.LoadInt64(&s.failsCount)
if s.config.Proxy.HealthCheck && x >= s.config.Proxy.MaxFails {
return true
}
return false
}
func (s *ProxyServer) markOk() {
atomic.StoreInt64(&s.failsCount, 0)
}

221
util/proxy/stratum.go

@ -0,0 +1,221 @@
package proxy
import (
"bufio"
"encoding/json"
"errors"
"io"
"log"
"net"
"time"
"github.com/yuriy0803/open-etc-pool-friends/util"
)
const (
MaxReqSize = 1024
)
func (s *ProxyServer) ListenTCP() {
timeout := util.MustParseDuration(s.config.Proxy.Stratum.Timeout)
s.timeout = timeout
addr, err := net.ResolveTCPAddr("tcp4", s.config.Proxy.Stratum.Listen)
if err != nil {
log.Fatalf("Error: %v", err)
}
server, err := net.ListenTCP("tcp4", addr)
if err != nil {
log.Fatalf("Error: %v", err)
}
defer server.Close()
log.Printf("Stratum listening on %s", s.config.Proxy.Stratum.Listen)
var accept = make(chan int, s.config.Proxy.Stratum.MaxConn)
n := 0
for {
conn, err := server.AcceptTCP()
if err != nil {
continue
}
conn.SetKeepAlive(true)
ip, _, _ := net.SplitHostPort(conn.RemoteAddr().String())
if s.policy.IsBanned(ip) || !s.policy.ApplyLimitPolicy(ip) {
conn.Close()
continue
}
n += 1
cs := &Session{conn: conn, ip: ip}
accept <- n
go func(cs *Session) {
err = s.handleTCPClient(cs)
if err != nil || cs.lastErr != nil {
s.removeSession(cs)
conn.Close()
}
<-accept
}(cs)
}
}
func (s *ProxyServer) handleTCPClient(cs *Session) error {
cs.enc = json.NewEncoder(cs.conn)
connbuff := bufio.NewReaderSize(cs.conn, MaxReqSize)
s.setDeadline(cs.conn)
for {
data, isPrefix, err := connbuff.ReadLine()
if isPrefix {
log.Printf("Socket flood detected from %s", cs.ip)
s.policy.BanClient(cs.ip)
return err
} else if err == io.EOF {
log.Printf("Client %s disconnected", cs.ip)
s.removeSession(cs)
break
} else if err != nil {
log.Printf("Error reading from socket: %v", err)
return err
}
if len(data) > 1 {
var req StratumReq
err = json.Unmarshal(data, &req)
if err != nil {
s.policy.ApplyMalformedPolicy(cs.ip)
log.Printf("Malformed stratum request from %s: %v", cs.ip, err)
return err
}
s.setDeadline(cs.conn)
err = cs.handleTCPMessage(s, &req)
if err != nil {
return err
}
}
}
return nil
}
func (cs *Session) handleTCPMessage(s *ProxyServer, req *StratumReq) error {
// Handle RPC methods
switch req.Method {
case "eth_submitLogin":
var params []string
err := json.Unmarshal(req.Params, &params)
if err != nil {
log.Println("Malformed stratum request params from", cs.ip)
return err
}
reply, errReply := s.handleLoginRPC(cs, params, req.Worker)
if errReply != nil {
return cs.sendTCPError(req.Id, errReply)
}
return cs.sendTCPResult(req.Id, reply)
case "eth_getWork":
reply, errReply := s.handleGetWorkRPC(cs)
if errReply != nil {
return cs.sendTCPError(req.Id, errReply)
}
return cs.sendTCPResult(req.Id, &reply)
case "eth_submitWork":
var params []string
err := json.Unmarshal(req.Params, &params)
if err != nil {
log.Println("Malformed stratum request params from", cs.ip)
return err
}
reply, errReply := s.handleTCPSubmitRPC(cs, req.Worker, params)
if errReply != nil {
return cs.sendTCPError(req.Id, errReply)
}
return cs.sendTCPResult(req.Id, &reply)
case "eth_submitHashrate":
return cs.sendTCPResult(req.Id, true)
default:
errReply := s.handleUnknownRPC(cs, req.Method)
return cs.sendTCPError(req.Id, errReply)
}
}
func (cs *Session) sendTCPResult(id json.RawMessage, result interface{}) error {
cs.Lock()
defer cs.Unlock()
message := JSONRpcResp{Id: id, Version: "2.0", Error: nil, Result: result}
return cs.enc.Encode(&message)
}
func (cs *Session) pushNewJob(result interface{}) error {
cs.Lock()
defer cs.Unlock()
// FIXME: Temporarily add ID for Claymore compliance
message := JSONPushMessage{Version: "2.0", Result: result, Id: 0}
return cs.enc.Encode(&message)
}
func (cs *Session) sendTCPError(id json.RawMessage, reply *ErrorReply) error {
cs.Lock()
defer cs.Unlock()
message := JSONRpcResp{Id: id, Version: "2.0", Error: reply}
err := cs.enc.Encode(&message)
if err != nil {
return err
}
return errors.New(reply.Message)
}
func (self *ProxyServer) setDeadline(conn *net.TCPConn) {
conn.SetDeadline(time.Now().Add(self.timeout))
}
func (s *ProxyServer) registerSession(cs *Session) {
s.sessionsMu.Lock()
defer s.sessionsMu.Unlock()
s.sessions[cs] = struct{}{}
}
func (s *ProxyServer) removeSession(cs *Session) {
s.sessionsMu.Lock()
defer s.sessionsMu.Unlock()
delete(s.sessions, cs)
}
func (s *ProxyServer) broadcastNewJobs() {
t := s.currentBlockTemplate()
if t == nil || len(t.Header) == 0 || s.isSick() {
return
}
reply := []string{t.Header, t.Seed, s.diff}
s.sessionsMu.RLock()
defer s.sessionsMu.RUnlock()
count := len(s.sessions)
log.Printf("Broadcasting new job to %v stratum miners", count)
s.backend.NumberStratumWorker(count)
start := time.Now()
bcast := make(chan int, 1024)
n := 0
for m, _ := range s.sessions {
n++
bcast <- n
go func(cs *Session) {
err := cs.pushNewJob(&reply)
<-bcast
if err != nil {
log.Printf("Job transmit error to %v@%v: %v", cs.login, cs.ip, err)
s.removeSession(cs)
} else {
s.setDeadline(cs.conn)
}
}(m)
}
log.Printf("Jobs broadcast finished %s", time.Since(start))
}
Loading…
Cancel
Save