You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
220 lines
5.0 KiB
220 lines
5.0 KiB
package proxy |
|
|
|
import ( |
|
"bufio" |
|
"encoding/json" |
|
"errors" |
|
"io" |
|
"log" |
|
"net" |
|
"time" |
|
|
|
"github.com/yuriy0803/open-etc-pool-friends/util" |
|
) |
|
|
|
const ( |
|
MaxReqSize = 1024 |
|
) |
|
|
|
func (s *ProxyServer) ListenTCP() { |
|
timeout := util.MustParseDuration(s.config.Proxy.Stratum.Timeout) |
|
s.timeout = timeout |
|
|
|
addr, err := net.ResolveTCPAddr("tcp4", s.config.Proxy.Stratum.Listen) |
|
if err != nil { |
|
log.Fatalf("Error: %v", err) |
|
} |
|
server, err := net.ListenTCP("tcp4", addr) |
|
if err != nil { |
|
log.Fatalf("Error: %v", err) |
|
} |
|
defer server.Close() |
|
|
|
log.Printf("Stratum listening on %s", s.config.Proxy.Stratum.Listen) |
|
var accept = make(chan int, s.config.Proxy.Stratum.MaxConn) |
|
n := 0 |
|
|
|
for { |
|
conn, err := server.AcceptTCP() |
|
if err != nil { |
|
continue |
|
} |
|
conn.SetKeepAlive(true) |
|
|
|
ip, _, _ := net.SplitHostPort(conn.RemoteAddr().String()) |
|
|
|
if s.policy.IsBanned(ip) || !s.policy.ApplyLimitPolicy(ip) { |
|
conn.Close() |
|
continue |
|
} |
|
n += 1 |
|
cs := &Session{conn: conn, ip: ip} |
|
|
|
accept <- n |
|
go func(cs *Session) { |
|
err = s.handleTCPClient(cs) |
|
if err != nil || cs.lastErr != nil { |
|
s.removeSession(cs) |
|
conn.Close() |
|
} |
|
<-accept |
|
}(cs) |
|
} |
|
} |
|
|
|
func (s *ProxyServer) handleTCPClient(cs *Session) error { |
|
cs.enc = json.NewEncoder(cs.conn) |
|
connbuff := bufio.NewReaderSize(cs.conn, MaxReqSize) |
|
s.setDeadline(cs.conn) |
|
for { |
|
data, isPrefix, err := connbuff.ReadLine() |
|
if isPrefix { |
|
log.Printf("Socket flood detected from %s", cs.ip) |
|
s.policy.BanClient(cs.ip) |
|
return err |
|
} else if err == io.EOF { |
|
log.Printf("Client %s disconnected", cs.ip) |
|
s.removeSession(cs) |
|
break |
|
} else if err != nil { |
|
log.Printf("Error reading from socket: %v", err) |
|
return err |
|
} |
|
|
|
if len(data) > 1 { |
|
var req StratumReq |
|
err = json.Unmarshal(data, &req) |
|
if err != nil { |
|
s.policy.ApplyMalformedPolicy(cs.ip) |
|
log.Printf("Malformed stratum request from %s: %v", cs.ip, err) |
|
return err |
|
} |
|
s.setDeadline(cs.conn) |
|
err = cs.handleTCPMessage(s, &req) |
|
if err != nil { |
|
return err |
|
} |
|
} |
|
} |
|
return nil |
|
} |
|
|
|
func (cs *Session) handleTCPMessage(s *ProxyServer, req *StratumReq) error { |
|
// Handle RPC methods |
|
switch req.Method { |
|
case "eth_submitLogin": |
|
var params []string |
|
err := json.Unmarshal(req.Params, ¶ms) |
|
if err != nil { |
|
log.Println("Malformed stratum request params from", cs.ip) |
|
return err |
|
} |
|
reply, errReply := s.handleLoginRPC(cs, params, req.Worker) |
|
if errReply != nil { |
|
return cs.sendTCPError(req.Id, errReply) |
|
} |
|
return cs.sendTCPResult(req.Id, reply) |
|
case "eth_getWork": |
|
reply, errReply := s.handleGetWorkRPC(cs) |
|
if errReply != nil { |
|
return cs.sendTCPError(req.Id, errReply) |
|
} |
|
return cs.sendTCPResult(req.Id, &reply) |
|
case "eth_submitWork": |
|
var params []string |
|
err := json.Unmarshal(req.Params, ¶ms) |
|
if err != nil { |
|
log.Println("Malformed stratum request params from", cs.ip) |
|
return err |
|
} |
|
reply, errReply := s.handleTCPSubmitRPC(cs, req.Worker, params) |
|
if errReply != nil { |
|
return cs.sendTCPError(req.Id, errReply) |
|
} |
|
return cs.sendTCPResult(req.Id, &reply) |
|
case "eth_submitHashrate": |
|
return cs.sendTCPResult(req.Id, true) |
|
default: |
|
errReply := s.handleUnknownRPC(cs, req.Method) |
|
return cs.sendTCPError(req.Id, errReply) |
|
} |
|
} |
|
|
|
func (cs *Session) sendTCPResult(id json.RawMessage, result interface{}) error { |
|
cs.Lock() |
|
defer cs.Unlock() |
|
|
|
message := JSONRpcResp{Id: id, Version: "2.0", Error: nil, Result: result} |
|
return cs.enc.Encode(&message) |
|
} |
|
|
|
func (cs *Session) pushNewJob(result interface{}) error { |
|
cs.Lock() |
|
defer cs.Unlock() |
|
// FIXME: Temporarily add ID for Claymore compliance |
|
message := JSONPushMessage{Version: "2.0", Result: result, Id: 0} |
|
return cs.enc.Encode(&message) |
|
} |
|
|
|
func (cs *Session) sendTCPError(id json.RawMessage, reply *ErrorReply) error { |
|
cs.Lock() |
|
defer cs.Unlock() |
|
|
|
message := JSONRpcResp{Id: id, Version: "2.0", Error: reply} |
|
err := cs.enc.Encode(&message) |
|
if err != nil { |
|
return err |
|
} |
|
return errors.New(reply.Message) |
|
} |
|
|
|
func (self *ProxyServer) setDeadline(conn *net.TCPConn) { |
|
conn.SetDeadline(time.Now().Add(self.timeout)) |
|
} |
|
|
|
func (s *ProxyServer) registerSession(cs *Session) { |
|
s.sessionsMu.Lock() |
|
defer s.sessionsMu.Unlock() |
|
s.sessions[cs] = struct{}{} |
|
} |
|
|
|
func (s *ProxyServer) removeSession(cs *Session) { |
|
s.sessionsMu.Lock() |
|
defer s.sessionsMu.Unlock() |
|
delete(s.sessions, cs) |
|
} |
|
|
|
func (s *ProxyServer) broadcastNewJobs() { |
|
t := s.currentBlockTemplate() |
|
if t == nil || len(t.Header) == 0 || s.isSick() { |
|
return |
|
} |
|
reply := []string{t.Header, t.Seed, s.diff} |
|
|
|
s.sessionsMu.RLock() |
|
defer s.sessionsMu.RUnlock() |
|
|
|
count := len(s.sessions) |
|
log.Printf("Broadcasting new job to %v stratum miners", count) |
|
|
|
start := time.Now() |
|
bcast := make(chan int, 1024) |
|
n := 0 |
|
|
|
for m, _ := range s.sessions { |
|
n++ |
|
bcast <- n |
|
|
|
go func(cs *Session) { |
|
err := cs.pushNewJob(&reply) |
|
<-bcast |
|
if err != nil { |
|
log.Printf("Job transmit error to %v@%v: %v", cs.login, cs.ip, err) |
|
s.removeSession(cs) |
|
} else { |
|
s.setDeadline(cs.conn) |
|
} |
|
}(m) |
|
} |
|
log.Printf("Jobs broadcast finished %s", time.Since(start)) |
|
}
|
|
|