migration of open-etc-friends-pool for use with Etica/EGAZ
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

234 lines
5.5 KiB

package proxy
import (
"bufio"
"crypto/tls"
"encoding/json"
"errors"
"io"
"log"
"net"
"time"
"github.com/yuriy0803/open-etc-pool-friends/util"
)
// MaxReqSize is the size, in bytes, of the buffered reader that
// handleTCPClient places in front of every stratum connection. A request
// line that does not fit into this buffer is treated as a socket flood.
const MaxReqSize = 1024
// ListenTCP opens the stratum listener on the configured address (TLS when
// enabled in the config, plain TCP otherwise) and serves miner connections
// in an endless accept loop. It only returns by terminating the process:
// a certificate or listen failure is fatal.
//
// Every accepted connection is checked against the ban list and the
// connection-limit policy, then handled in its own goroutine. The number of
// concurrently handled connections is capped by Proxy.Stratum.MaxConn.
func (s *ProxyServer) ListenTCP() {
	// Pause after a failed Accept so that a persistent failure cannot turn
	// the accept loop into a busy spin that floods the log.
	const acceptRetryDelay = 50 * time.Millisecond

	s.timeout = util.MustParseDuration(s.config.Proxy.Stratum.Timeout)

	var err error
	var server net.Listener
	// Keep-alive is only configured for plain TCP connections; for TLS the
	// hook stays a no-op.
	setKeepAlive := func(net.Conn) {}

	if s.config.Proxy.Stratum.TLS {
		var cert tls.Certificate
		cert, err = tls.LoadX509KeyPair(s.config.Proxy.Stratum.CertFile, s.config.Proxy.Stratum.KeyFile)
		if err != nil {
			log.Fatalln("Error loading certificate:", err)
		}
		tlsCfg := &tls.Config{Certificates: []tls.Certificate{cert}}
		server, err = tls.Listen("tcp4", s.config.Proxy.Stratum.Listen, tlsCfg)
	} else {
		server, err = net.Listen("tcp4", s.config.Proxy.Stratum.Listen)
		setKeepAlive = func(conn net.Conn) {
			// Best effort: a connection that is not a *net.TCPConn, or on
			// which keep-alive cannot be enabled, is still served.
			if tcpConn, ok := conn.(*net.TCPConn); ok {
				tcpConn.SetKeepAlive(true)
			}
		}
	}
	if err != nil {
		log.Fatalf("Error: %v", err)
	}
	defer server.Close()

	log.Printf("Stratum listening on %s", s.config.Proxy.Stratum.Listen)

	// slots is a counting semaphore: a send blocks the accept loop once
	// MaxConn handlers are running, and each handler frees its slot on exit.
	slots := make(chan struct{}, s.config.Proxy.Stratum.MaxConn)

	for {
		conn, acceptErr := server.Accept()
		if acceptErr != nil {
			log.Printf("Error accepting stratum connection: %v", acceptErr)
			time.Sleep(acceptRetryDelay)
			continue
		}
		setKeepAlive(conn)

		// A split failure yields an empty ip, which is passed to the policy
		// checks unchanged.
		ip, _, _ := net.SplitHostPort(conn.RemoteAddr().String())
		if s.policy.IsBanned(ip) || !s.policy.ApplyLimitPolicy(ip) {
			conn.Close()
			continue
		}

		cs := &Session{conn: conn, ip: ip}
		slots <- struct{}{}
		go func(cs *Session) {
			// Release everything the connection holds no matter how the
			// handler exits. Previously the socket was closed only on an
			// error return, so a clean client disconnect left it open.
			defer func() {
				s.removeSession(cs)
				cs.conn.Close()
				<-slots
			}()
			// The return value only distinguished clean from failed exits;
			// cleanup is now identical for both, so it is not needed here.
			_ = s.handleTCPClient(cs)
		}(cs)
	}
}
// handleTCPClient runs the read loop for one stratum connection. It reads
// newline-delimited JSON requests from cs.conn, decodes each into a
// StratumReq and dispatches it through cs.handleTCPMessage.
//
// It returns nil only when the client disconnects cleanly (io.EOF), in which
// case the session has already been removed. Any other exit returns a
// non-nil error: a read failure, a malformed request, an error from the
// message handler, or a detected socket flood.
func (s *ProxyServer) handleTCPClient(cs *Session) error {
	cs.enc = json.NewEncoder(cs.conn)
	connbuff := bufio.NewReaderSize(cs.conn, MaxReqSize)
	s.setDeadline(cs.conn)

	for {
		data, isPrefix, err := connbuff.ReadLine()
		switch {
		case isPrefix:
			log.Printf("Socket flood detected from %s", cs.ip)
			s.policy.BanClient(cs.ip)
			// ReadLine signals an over-long line with isPrefix set and a nil
			// error, so returning err here would report success to the
			// caller. Return an explicit error so the connection is torn
			// down like any other failure.
			return errors.New("socket flood detected")
		case err == io.EOF:
			log.Printf("Client %s disconnected", cs.ip)
			s.removeSession(cs)
			return nil
		case err != nil:
			log.Printf("Error reading from socket: %v", err)
			return err
		}

		// Lines of zero or one byte carry no request and are skipped without
		// refreshing the deadline.
		if len(data) <= 1 {
			continue
		}

		var req StratumReq
		if err := json.Unmarshal(data, &req); err != nil {
			s.policy.ApplyMalformedPolicy(cs.ip)
			log.Printf("Malformed stratum request from %s: %v", cs.ip, err)
			return err
		}

		// A well-formed request counts as activity: push the deadline out
		// before handling it.
		s.setDeadline(cs.conn)
		if err := cs.handleTCPMessage(s, &req); err != nil {
			return err
		}
	}
}
// handleTCPMessage dispatches one decoded stratum request by its method name
// and writes the reply to the session.
//
// It returns the params decoding error for eth_submitLogin and
// eth_submitWork when req.Params is not a JSON array of strings; otherwise
// it returns whatever sendTCPResult or sendTCPError returns. Methods other
// than the four handled here are answered with the error reply produced by
// s.handleUnknownRPC.
func (cs *Session) handleTCPMessage(s *ProxyServer, req *StratumReq) error {
	// stringParams decodes req.Params as a list of strings and logs the
	// offending client when the payload has a different shape.
	stringParams := func() ([]string, error) {
		var args []string
		decodeErr := json.Unmarshal(req.Params, &args)
		if decodeErr != nil {
			log.Println("Malformed stratum request params from", cs.ip)
		}
		return args, decodeErr
	}

	switch method := req.Method; method {
	case "eth_submitLogin":
		args, decodeErr := stringParams()
		if decodeErr != nil {
			return decodeErr
		}
		result, rpcErr := s.handleLoginRPC(cs, args, req.Worker)
		if rpcErr == nil {
			return cs.sendTCPResult(req.Id, result)
		}
		return cs.sendTCPError(req.Id, rpcErr)

	case "eth_getWork":
		result, rpcErr := s.handleGetWorkRPC(cs)
		if rpcErr == nil {
			return cs.sendTCPResult(req.Id, &result)
		}
		return cs.sendTCPError(req.Id, rpcErr)

	case "eth_submitWork":
		args, decodeErr := stringParams()
		if decodeErr != nil {
			return decodeErr
		}
		result, rpcErr := s.handleTCPSubmitRPC(cs, req.Worker, args)
		if rpcErr == nil {
			return cs.sendTCPResult(req.Id, &result)
		}
		return cs.sendTCPError(req.Id, rpcErr)

	case "eth_submitHashrate":
		// Acknowledged unconditionally; the request parameters are not read.
		return cs.sendTCPResult(req.Id, true)

	default:
		return cs.sendTCPError(req.Id, s.handleUnknownRPC(cs, method))
	}
}
// sendTCPResult writes a successful JSON-RPC 2.0 response carrying result
// for the request identified by id. The session lock is held while encoding
// so that concurrent writers do not interleave output on the connection.
// It returns the encoder's error, if any.
func (cs *Session) sendTCPResult(id json.RawMessage, result interface{}) error {
	// The Error field is left at its zero value.
	resp := JSONRpcResp{Id: id, Version: "2.0", Result: result}

	cs.Lock()
	defer cs.Unlock()
	return cs.enc.Encode(&resp)
}
// pushNewJob sends result to the miner as an unsolicited JSON-RPC 2.0 push
// message. The session lock is held while encoding. It returns the encoder's
// error, if any.
func (cs *Session) pushNewJob(result interface{}) error {
	// FIXME: the Id of 0 is a temporary addition for Claymore compliance.
	push := JSONPushMessage{Version: "2.0", Result: result, Id: 0}

	cs.Lock()
	defer cs.Unlock()
	return cs.enc.Encode(&push)
}
// sendTCPError writes a JSON-RPC 2.0 error response carrying reply for the
// request identified by id. The session lock is held while encoding.
//
// The returned error is never nil: it is the encoder's error when the write
// fails, and otherwise a new error built from reply.Message, so callers that
// propagate it stop processing after an error reply has been sent.
func (cs *Session) sendTCPError(id json.RawMessage, reply *ErrorReply) error {
	resp := JSONRpcResp{Id: id, Version: "2.0", Error: reply}

	cs.Lock()
	defer cs.Unlock()
	if encodeErr := cs.enc.Encode(&resp); encodeErr != nil {
		return encodeErr
	}
	return errors.New(reply.Message)
}
// setDeadline moves the read and write deadline of conn to s.timeout from
// now. The error returned by SetDeadline is deliberately ignored: a
// connection on which the deadline cannot be set will fail on its next read
// or write anyway.
func (s *ProxyServer) setDeadline(conn net.Conn) {
	conn.SetDeadline(time.Now().Add(s.timeout))
}
// registerSession adds cs to the set of tracked sessions while holding the
// sessions write lock. Adding a session that is already present has no
// further effect.
func (s *ProxyServer) registerSession(cs *Session) {
	s.sessionsMu.Lock()
	defer s.sessionsMu.Unlock()

	// The map is used as a set; the empty struct is only a presence marker.
	var present struct{}
	s.sessions[cs] = present
}
// removeSession drops cs from the set of tracked sessions while holding the
// sessions write lock. Removing a session that is not present is a no-op.
func (s *ProxyServer) removeSession(cs *Session) {
	s.sessionsMu.Lock()
	delete(s.sessions, cs)
	s.sessionsMu.Unlock()
}
// broadcastNewJobs pushes the current job (header, seed and s.diff) to every
// tracked stratum session. It does nothing when there is no block template,
// the template header is empty, or the proxy reports itself as sick.
//
// Each session is served by its own goroutine, with at most 1024 pushes in
// flight at once. A session whose push fails, or which already has lastErr
// set, is removed; otherwise its connection deadline is refreshed. The
// sessions read lock is held until this function returns, and the function
// does not wait for the goroutines, so the logged duration covers
// dispatching the pushes rather than their completion.
func (s *ProxyServer) broadcastNewJobs() {
	tpl := s.currentBlockTemplate()
	if tpl == nil {
		return
	}
	if len(tpl.Header) == 0 {
		return
	}
	if s.isSick() {
		return
	}
	job := []string{tpl.Header, tpl.Seed, s.diff}

	s.sessionsMu.RLock()
	defer s.sessionsMu.RUnlock()

	total := len(s.sessions)
	log.Printf("Broadcasting new job to %v stratum miners", total)
	s.backend.NumberStratumWorker(total)

	began := time.Now()
	// inFlight bounds the number of concurrent pushes; the values sent on it
	// are never inspected.
	inFlight := make(chan int, 1024)
	dispatched := 0
	for session := range s.sessions {
		dispatched++
		inFlight <- dispatched
		go func(cs *Session) {
			pushErr := cs.pushNewJob(&job)
			// Free the slot before any session bookkeeping.
			<-inFlight
			if pushErr == nil && cs.lastErr == nil {
				s.setDeadline(cs.conn)
				return
			}
			log.Printf("Job transmit error to %v@%v: %v", cs.login, cs.ip, pushErr)
			s.removeSession(cs)
		}(session)
	}
	log.Printf("Jobs broadcast finished %s", time.Since(began))
}