package proxy import ( "bufio" "crypto/tls" "encoding/json" "errors" "io" "log" "net" "time" "github.com/yuriy0803/open-etc-pool-friends/util" ) const ( MaxReqSize = 1024 ) func (s *ProxyServer) ListenTCP() { s.timeout = util.MustParseDuration(s.config.Proxy.Stratum.Timeout) var err error var server net.Listener setKeepAlive := func(net.Conn) {} if s.config.Proxy.Stratum.TLS { var cert tls.Certificate cert, err = tls.LoadX509KeyPair(s.config.Proxy.Stratum.CertFile, s.config.Proxy.Stratum.KeyFile) if err != nil { log.Fatalln("Error loading certificate:", err) } tlsCfg := &tls.Config{Certificates: []tls.Certificate{cert}} server, err = tls.Listen("tcp4", s.config.Proxy.Stratum.Listen, tlsCfg) } else { server, err = net.Listen("tcp4", s.config.Proxy.Stratum.Listen) setKeepAlive = func(conn net.Conn) { conn.(*net.TCPConn).SetKeepAlive(true) } } if err != nil { log.Fatalf("Error: %v", err) } defer server.Close() log.Printf("Stratum listening on %s", s.config.Proxy.Stratum.Listen) var accept = make(chan int, s.config.Proxy.Stratum.MaxConn) n := 0 for { conn, err := server.Accept() if err != nil { continue } setKeepAlive(conn) ip, _, _ := net.SplitHostPort(conn.RemoteAddr().String()) if s.policy.IsBanned(ip) || !s.policy.ApplyLimitPolicy(ip) { conn.Close() continue } n += 1 cs := &Session{conn: conn, ip: ip} accept <- n go func(cs *Session) { err = s.handleTCPClient(cs) if err != nil || cs.lastErr != nil { s.removeSession(cs) conn.Close() } <-accept }(cs) } } func (s *ProxyServer) handleTCPClient(cs *Session) error { cs.enc = json.NewEncoder(cs.conn) connbuff := bufio.NewReaderSize(cs.conn, MaxReqSize) s.setDeadline(cs.conn) for { data, isPrefix, err := connbuff.ReadLine() if isPrefix { log.Printf("Socket flood detected from %s", cs.ip) s.policy.BanClient(cs.ip) return err } else if err == io.EOF { log.Printf("Client %s disconnected", cs.ip) s.removeSession(cs) break } else if err != nil { log.Printf("Error reading from socket: %v", err) return err } if len(data) > 1 { var req StratumReq err = json.Unmarshal(data, &req) if err != nil { s.policy.ApplyMalformedPolicy(cs.ip) log.Printf("Malformed stratum request from %s: %v", cs.ip, err) return err } s.setDeadline(cs.conn) err = cs.handleTCPMessage(s, &req) if err != nil { return err } } } return nil } func (cs *Session) handleTCPMessage(s *ProxyServer, req *StratumReq) error { // Handle RPC methods switch req.Method { case "eth_submitLogin": var params []string err := json.Unmarshal(req.Params, ¶ms) if err != nil { log.Println("Malformed stratum request params from", cs.ip) return err } reply, errReply := s.handleLoginRPC(cs, params, req.Worker) if errReply != nil { return cs.sendTCPError(req.Id, errReply) } return cs.sendTCPResult(req.Id, reply) case "eth_getWork": reply, errReply := s.handleGetWorkRPC(cs) if errReply != nil { return cs.sendTCPError(req.Id, errReply) } return cs.sendTCPResult(req.Id, &reply) case "eth_submitWork": var params []string err := json.Unmarshal(req.Params, ¶ms) if err != nil { log.Println("Malformed stratum request params from", cs.ip) return err } reply, errReply := s.handleTCPSubmitRPC(cs, req.Worker, params) if errReply != nil { return cs.sendTCPError(req.Id, errReply) } return cs.sendTCPResult(req.Id, &reply) case "eth_submitHashrate": return cs.sendTCPResult(req.Id, true) default: errReply := s.handleUnknownRPC(cs, req.Method) return cs.sendTCPError(req.Id, errReply) } } func (cs *Session) sendTCPResult(id json.RawMessage, result interface{}) error { cs.Lock() defer cs.Unlock() message := JSONRpcResp{Id: id, Version: "2.0", Error: nil, Result: result} return cs.enc.Encode(&message) } func (cs *Session) pushNewJob(result interface{}) error { cs.Lock() defer cs.Unlock() // FIXME: Temporarily add ID for Claymore compliance message := JSONPushMessage{Version: "2.0", Result: result, Id: 0} return cs.enc.Encode(&message) } func (cs *Session) sendTCPError(id json.RawMessage, reply *ErrorReply) error { cs.Lock() defer cs.Unlock() message := JSONRpcResp{Id: id, Version: "2.0", Error: reply} err := cs.enc.Encode(&message) if err != nil { return err } return errors.New(reply.Message) } func (self *ProxyServer) setDeadline(conn net.Conn) { conn.SetDeadline(time.Now().Add(self.timeout)) } func (s *ProxyServer) registerSession(cs *Session) { s.sessionsMu.Lock() defer s.sessionsMu.Unlock() s.sessions[cs] = struct{}{} } func (s *ProxyServer) removeSession(cs *Session) { s.sessionsMu.Lock() defer s.sessionsMu.Unlock() delete(s.sessions, cs) } func (s *ProxyServer) broadcastNewJobs() { t := s.currentBlockTemplate() if t == nil || len(t.Header) == 0 || s.isSick() { return } reply := []string{t.Header, t.Seed, s.diff} s.sessionsMu.RLock() defer s.sessionsMu.RUnlock() count := len(s.sessions) log.Printf("Broadcasting new job to %v stratum miners", count) s.backend.NumberStratumWorker(count) start := time.Now() bcast := make(chan int, 1024) n := 0 for m, _ := range s.sessions { n++ bcast <- n go func(cs *Session) { err := cs.pushNewJob(&reply) <-bcast if err != nil { log.Printf("Job transmit error to %v@%v: %v", cs.login, cs.ip, err) s.removeSession(cs) } else { s.setDeadline(cs.conn) } }(m) } log.Printf("Jobs broadcast finished %s", time.Since(start)) }