You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
234 lines
5.5 KiB
234 lines
5.5 KiB
package proxy |
|
|
|
import ( |
|
"bufio" |
|
"crypto/tls" |
|
"encoding/json" |
|
"errors" |
|
"io" |
|
"log" |
|
"net" |
|
"time" |
|
|
|
"github.com/yuriy0803/open-etc-pool-friends/util" |
|
) |
|
|
|
// MaxReqSize is the size in bytes of the buffered reader created for each
// stratum connection, which caps how long a single request line may be.
const MaxReqSize = 1024
|
|
|
func (s *ProxyServer) ListenTCP() { |
|
s.timeout = util.MustParseDuration(s.config.Proxy.Stratum.Timeout) |
|
|
|
var err error |
|
var server net.Listener |
|
setKeepAlive := func(net.Conn) {} |
|
if s.config.Proxy.Stratum.TLS { |
|
var cert tls.Certificate |
|
cert, err = tls.LoadX509KeyPair(s.config.Proxy.Stratum.CertFile, s.config.Proxy.Stratum.KeyFile) |
|
if err != nil { |
|
log.Fatalln("Error loading certificate:", err) |
|
} |
|
tlsCfg := &tls.Config{Certificates: []tls.Certificate{cert}} |
|
server, err = tls.Listen("tcp4", s.config.Proxy.Stratum.Listen, tlsCfg) |
|
} else { |
|
server, err = net.Listen("tcp4", s.config.Proxy.Stratum.Listen) |
|
setKeepAlive = func(conn net.Conn) { |
|
conn.(*net.TCPConn).SetKeepAlive(true) |
|
} |
|
} |
|
|
|
if err != nil { |
|
log.Fatalf("Error: %v", err) |
|
} |
|
defer server.Close() |
|
|
|
log.Printf("Stratum listening on %s", s.config.Proxy.Stratum.Listen) |
|
var accept = make(chan int, s.config.Proxy.Stratum.MaxConn) |
|
n := 0 |
|
|
|
for { |
|
conn, err := server.Accept() |
|
if err != nil { |
|
continue |
|
} |
|
setKeepAlive(conn) |
|
|
|
ip, _, _ := net.SplitHostPort(conn.RemoteAddr().String()) |
|
|
|
if s.policy.IsBanned(ip) || !s.policy.ApplyLimitPolicy(ip) { |
|
conn.Close() |
|
continue |
|
} |
|
n += 1 |
|
cs := &Session{conn: conn, ip: ip} |
|
|
|
accept <- n |
|
go func(cs *Session) { |
|
err = s.handleTCPClient(cs) |
|
if err != nil || cs.lastErr != nil { |
|
s.removeSession(cs) |
|
conn.Close() |
|
} |
|
<-accept |
|
}(cs) |
|
} |
|
} |
|
|
|
func (s *ProxyServer) handleTCPClient(cs *Session) error { |
|
cs.enc = json.NewEncoder(cs.conn) |
|
connbuff := bufio.NewReaderSize(cs.conn, MaxReqSize) |
|
s.setDeadline(cs.conn) |
|
for { |
|
data, isPrefix, err := connbuff.ReadLine() |
|
if isPrefix { |
|
log.Printf("Socket flood detected from %s", cs.ip) |
|
s.policy.BanClient(cs.ip) |
|
return err |
|
} else if err == io.EOF { |
|
log.Printf("Client %s disconnected", cs.ip) |
|
s.removeSession(cs) |
|
break |
|
} else if err != nil { |
|
log.Printf("Error reading from socket: %v", err) |
|
return err |
|
} |
|
|
|
if len(data) > 1 { |
|
var req StratumReq |
|
err = json.Unmarshal(data, &req) |
|
if err != nil { |
|
s.policy.ApplyMalformedPolicy(cs.ip) |
|
log.Printf("Malformed stratum request from %s: %v", cs.ip, err) |
|
return err |
|
} |
|
s.setDeadline(cs.conn) |
|
err = cs.handleTCPMessage(s, &req) |
|
if err != nil { |
|
return err |
|
} |
|
} |
|
} |
|
return nil |
|
} |
|
|
|
func (cs *Session) handleTCPMessage(s *ProxyServer, req *StratumReq) error { |
|
// Handle RPC methods |
|
switch req.Method { |
|
case "eth_submitLogin": |
|
var params []string |
|
err := json.Unmarshal(req.Params, ¶ms) |
|
if err != nil { |
|
log.Println("Malformed stratum request params from", cs.ip) |
|
return err |
|
} |
|
reply, errReply := s.handleLoginRPC(cs, params, req.Worker) |
|
if errReply != nil { |
|
return cs.sendTCPError(req.Id, errReply) |
|
} |
|
return cs.sendTCPResult(req.Id, reply) |
|
case "eth_getWork": |
|
reply, errReply := s.handleGetWorkRPC(cs) |
|
if errReply != nil { |
|
return cs.sendTCPError(req.Id, errReply) |
|
} |
|
return cs.sendTCPResult(req.Id, &reply) |
|
case "eth_submitWork": |
|
var params []string |
|
err := json.Unmarshal(req.Params, ¶ms) |
|
if err != nil { |
|
log.Println("Malformed stratum request params from", cs.ip) |
|
return err |
|
} |
|
reply, errReply := s.handleTCPSubmitRPC(cs, req.Worker, params) |
|
if errReply != nil { |
|
return cs.sendTCPError(req.Id, errReply) |
|
} |
|
return cs.sendTCPResult(req.Id, &reply) |
|
case "eth_submitHashrate": |
|
return cs.sendTCPResult(req.Id, true) |
|
default: |
|
errReply := s.handleUnknownRPC(cs, req.Method) |
|
return cs.sendTCPError(req.Id, errReply) |
|
} |
|
} |
|
|
|
func (cs *Session) sendTCPResult(id json.RawMessage, result interface{}) error { |
|
cs.Lock() |
|
defer cs.Unlock() |
|
|
|
message := JSONRpcResp{Id: id, Version: "2.0", Error: nil, Result: result} |
|
return cs.enc.Encode(&message) |
|
} |
|
|
|
func (cs *Session) pushNewJob(result interface{}) error { |
|
cs.Lock() |
|
defer cs.Unlock() |
|
// FIXME: Temporarily add ID for Claymore compliance |
|
message := JSONPushMessage{Version: "2.0", Result: result, Id: 0} |
|
return cs.enc.Encode(&message) |
|
} |
|
|
|
func (cs *Session) sendTCPError(id json.RawMessage, reply *ErrorReply) error { |
|
cs.Lock() |
|
defer cs.Unlock() |
|
|
|
message := JSONRpcResp{Id: id, Version: "2.0", Error: reply} |
|
err := cs.enc.Encode(&message) |
|
if err != nil { |
|
return err |
|
} |
|
return errors.New(reply.Message) |
|
} |
|
|
|
func (self *ProxyServer) setDeadline(conn net.Conn) { |
|
conn.SetDeadline(time.Now().Add(self.timeout)) |
|
} |
|
|
|
func (s *ProxyServer) registerSession(cs *Session) { |
|
s.sessionsMu.Lock() |
|
defer s.sessionsMu.Unlock() |
|
s.sessions[cs] = struct{}{} |
|
} |
|
|
|
func (s *ProxyServer) removeSession(cs *Session) { |
|
s.sessionsMu.Lock() |
|
defer s.sessionsMu.Unlock() |
|
delete(s.sessions, cs) |
|
} |
|
|
|
func (s *ProxyServer) broadcastNewJobs() { |
|
t := s.currentBlockTemplate() |
|
if t == nil || len(t.Header) == 0 || s.isSick() { |
|
return |
|
} |
|
reply := []string{t.Header, t.Seed, s.diff} |
|
|
|
s.sessionsMu.RLock() |
|
defer s.sessionsMu.RUnlock() |
|
|
|
count := len(s.sessions) |
|
log.Printf("Broadcasting new job to %v stratum miners", count) |
|
|
|
s.backend.NumberStratumWorker(count) |
|
start := time.Now() |
|
bcast := make(chan int, 1024) |
|
n := 0 |
|
|
|
for m, _ := range s.sessions { |
|
n++ |
|
bcast <- n |
|
|
|
go func(cs *Session) { |
|
err := cs.pushNewJob(&reply) |
|
<-bcast |
|
if err != nil { |
|
log.Printf("Job transmit error to %v@%v: %v", cs.login, cs.ip, err) |
|
s.removeSession(cs) |
|
} else { |
|
s.setDeadline(cs.conn) |
|
} |
|
}(m) |
|
} |
|
log.Printf("Jobs broadcast finished %s", time.Since(start)) |
|
}
|
|
|