You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
363 lines
8.8 KiB
363 lines
8.8 KiB
package proxy |
|
|
|
import ( |
|
"encoding/json" |
|
"io" |
|
"log" |
|
"net" |
|
"net/http" |
|
"strconv" |
|
"strings" |
|
"sync" |
|
"sync/atomic" |
|
"time" |
|
|
|
"github.com/gorilla/mux" |
|
|
|
"github.com/yuriy0803/open-etc-pool-friends/policy" |
|
"github.com/yuriy0803/open-etc-pool-friends/rpc" |
|
"github.com/yuriy0803/open-etc-pool-friends/storage" |
|
"github.com/yuriy0803/open-etc-pool-friends/util" |
|
) |
|
|
|
type ProxyServer struct { |
|
config *Config |
|
blockTemplate atomic.Value |
|
upstream int32 |
|
upstreams []*rpc.RPCClient |
|
backend *storage.RedisClient |
|
diff string |
|
policy *policy.PolicyServer |
|
hashrateExpiration time.Duration |
|
failsCount int64 |
|
|
|
// Stratum |
|
sessionsMu sync.RWMutex |
|
sessions map[*Session]struct{} |
|
timeout time.Duration |
|
// Extranonce |
|
Extranonces map[string]bool |
|
} |
|
|
|
type staleJob struct { |
|
SeedHash string |
|
HeaderHash string |
|
} |
|
|
|
type Session struct { |
|
ip string |
|
enc *json.Encoder |
|
|
|
// Stratum |
|
sync.Mutex |
|
conn net.Conn |
|
login string |
|
worker string |
|
stratum int |
|
subscriptionID string |
|
JobDeatils jobDetails |
|
Extranonce string |
|
ExtranonceSub bool |
|
JobDetails jobDetails |
|
staleJobs map[string]staleJob |
|
staleJobIDs []string |
|
} |
|
|
|
type jobDetails struct { |
|
JobID string |
|
SeedHash string |
|
HeaderHash string |
|
Height string |
|
Epoch int64 |
|
} |
|
|
|
func NewProxy(cfg *Config, backend *storage.RedisClient) *ProxyServer { |
|
if len(cfg.Name) == 0 { |
|
log.Fatal("You must set instance name") |
|
} |
|
policy := policy.Start(&cfg.Proxy.Policy, backend) |
|
|
|
proxy := &ProxyServer{config: cfg, backend: backend, policy: policy} |
|
proxy.diff = util.GetTargetHex(cfg.Proxy.Difficulty) |
|
|
|
proxy.upstreams = make([]*rpc.RPCClient, len(cfg.Upstream)) |
|
for i, v := range cfg.Upstream { |
|
proxy.upstreams[i] = rpc.NewRPCClient(v.Name, v.Url, v.Timeout) |
|
log.Printf("Upstream: %s => %s", v.Name, v.Url) |
|
} |
|
log.Printf("Default upstream: %s => %s", proxy.rpc().Name, proxy.rpc().Url) |
|
|
|
if cfg.Proxy.Stratum.Enabled { |
|
proxy.sessions = make(map[*Session]struct{}) |
|
proxy.Extranonces = make(map[string]bool) |
|
go proxy.ListenTCP() |
|
} |
|
|
|
proxy.fetchBlockTemplate() |
|
|
|
proxy.hashrateExpiration = util.MustParseDuration(cfg.Proxy.HashrateExpiration) |
|
|
|
refreshIntv := util.MustParseDuration(cfg.Proxy.BlockRefreshInterval) |
|
refreshTimer := time.NewTimer(refreshIntv) |
|
log.Printf("Set block refresh every %v", refreshIntv) |
|
|
|
checkIntv := util.MustParseDuration(cfg.UpstreamCheckInterval) |
|
checkTimer := time.NewTimer(checkIntv) |
|
|
|
stateUpdateIntv := util.MustParseDuration(cfg.Proxy.StateUpdateInterval) |
|
stateUpdateTimer := time.NewTimer(stateUpdateIntv) |
|
|
|
go func() { |
|
for { |
|
select { |
|
case <-refreshTimer.C: |
|
proxy.fetchBlockTemplate() |
|
refreshTimer.Reset(refreshIntv) |
|
} |
|
} |
|
}() |
|
|
|
go func() { |
|
for { |
|
select { |
|
case <-checkTimer.C: |
|
proxy.checkUpstreams() |
|
checkTimer.Reset(checkIntv) |
|
} |
|
} |
|
}() |
|
|
|
go func() { |
|
for { |
|
select { |
|
case <-stateUpdateTimer.C: |
|
t := proxy.currentBlockTemplate() |
|
if t != nil { |
|
rpc := proxy.rpc() |
|
// get the latest block height |
|
height := int64(t.Height) - 1 |
|
block, _ := rpc.GetBlockByHeight(height) |
|
timestamp, _ := strconv.ParseInt(strings.Replace(block.Timestamp, "0x", "", -1), 16, 64) |
|
prev := height - 100 |
|
if prev < 0 { |
|
prev = 0 |
|
} |
|
n := height - prev |
|
if n > 0 { |
|
prevblock, err := rpc.GetBlockByHeight(prev) |
|
if err != nil || prevblock == nil { |
|
log.Fatalf("Error while retrieving block from node: %v", err) |
|
} else { |
|
prevtime, _ := strconv.ParseInt(strings.Replace(prevblock.Timestamp, "0x", "", -1), 16, 64) |
|
blocktime := float64(timestamp-prevtime) / float64(n) |
|
err = backend.WriteNodeState(cfg.Name, t.Height, t.Difficulty, blocktime) |
|
if err != nil { |
|
log.Printf("Failed to write node state to backend: %v", err) |
|
proxy.markSick() |
|
} else { |
|
proxy.markOk() |
|
} |
|
} |
|
} else { |
|
proxy.markSick() |
|
} |
|
} |
|
stateUpdateTimer.Reset(stateUpdateIntv) |
|
} |
|
} |
|
}() |
|
|
|
return proxy |
|
} |
|
|
|
func (s *ProxyServer) Start() { |
|
log.Printf("Starting proxy on %v", s.config.Proxy.Listen) |
|
r := mux.NewRouter() |
|
r.Handle("/{login:0x[0-9a-fA-F]{40}}/{id:[0-9a-zA-Z-_]{1,200}}", s) |
|
r.Handle("/{login:0x[0-9a-fA-F]{40}}", s) |
|
srv := &http.Server{ |
|
Addr: s.config.Proxy.Listen, |
|
Handler: r, |
|
MaxHeaderBytes: s.config.Proxy.LimitHeadersSize, |
|
} |
|
err := srv.ListenAndServe() |
|
if err != nil { |
|
log.Fatalf("Failed to start proxy: %v", err) |
|
} |
|
} |
|
|
|
func (s *ProxyServer) rpc() *rpc.RPCClient { |
|
i := atomic.LoadInt32(&s.upstream) |
|
return s.upstreams[i] |
|
} |
|
|
|
func (s *ProxyServer) checkUpstreams() { |
|
idx := atomic.LoadInt32(&s.upstream) |
|
current := s.upstreams[idx] |
|
if current.Check() { |
|
return |
|
} |
|
|
|
candidate := int32(0) |
|
backup := false |
|
|
|
for i, v := range s.upstreams { |
|
if v.Check() && !backup { |
|
candidate = int32(i) |
|
backup = true |
|
} |
|
} |
|
|
|
if s.upstream != candidate { |
|
log.Printf("Switching to %v upstream", s.upstreams[candidate].Name) |
|
atomic.StoreInt32(&s.upstream, candidate) |
|
} |
|
} |
|
|
|
func (s *ProxyServer) ServeHTTP(w http.ResponseWriter, r *http.Request) { |
|
if r.Method != "POST" { |
|
s.writeError(w, 405, "rpc: POST method required, received "+r.Method) |
|
return |
|
} |
|
ip := s.remoteAddr(r) |
|
if !s.policy.IsBanned(ip) { |
|
s.handleClient(w, r, ip) |
|
} |
|
} |
|
|
|
func (s *ProxyServer) remoteAddr(r *http.Request) string { |
|
if s.config.Proxy.BehindReverseProxy { |
|
ip := r.Header.Get("X-Forwarded-For") |
|
if len(ip) > 0 && net.ParseIP(ip) != nil { |
|
return ip |
|
} |
|
} |
|
ip, _, _ := net.SplitHostPort(r.RemoteAddr) |
|
return ip |
|
} |
|
|
|
func (s *ProxyServer) handleClient(w http.ResponseWriter, r *http.Request, ip string) { |
|
if r.ContentLength > s.config.Proxy.LimitBodySize { |
|
log.Printf("Socket flood from %s", ip) |
|
s.policy.ApplyMalformedPolicy(ip) |
|
http.Error(w, "Request too large", http.StatusExpectationFailed) |
|
return |
|
} |
|
r.Body = http.MaxBytesReader(w, r.Body, s.config.Proxy.LimitBodySize) |
|
defer r.Body.Close() |
|
|
|
cs := &Session{ip: ip, enc: json.NewEncoder(w)} |
|
dec := json.NewDecoder(r.Body) |
|
for { |
|
var req JSONRpcReq |
|
if err := dec.Decode(&req); err == io.EOF { |
|
break |
|
} else if err != nil { |
|
log.Printf("Malformed request from %v: %v", ip, err) |
|
s.policy.ApplyMalformedPolicy(ip) |
|
return |
|
} |
|
cs.handleMessage(s, r, &req) |
|
} |
|
} |
|
|
|
func (cs *Session) handleMessage(s *ProxyServer, r *http.Request, req *JSONRpcReq) { |
|
if req.Id == nil { |
|
log.Printf("Missing RPC id from %s", cs.ip) |
|
s.policy.ApplyMalformedPolicy(cs.ip) |
|
return |
|
} |
|
|
|
vars := mux.Vars(r) |
|
login := strings.ToLower(vars["login"]) |
|
|
|
if !util.IsValidHexAddress(login) { |
|
errReply := &ErrorReply{Code: -1, Message: "Invalid login"} |
|
cs.sendError(req.Id, errReply) |
|
return |
|
} |
|
if !s.policy.ApplyLoginPolicy(login, cs.ip) { |
|
errReply := &ErrorReply{Code: -1, Message: "You are blacklisted"} |
|
cs.sendError(req.Id, errReply) |
|
return |
|
} |
|
|
|
// Handle RPC methods |
|
switch req.Method { |
|
case "eth_getWork": |
|
reply, errReply := s.handleGetWorkRPC(cs) |
|
if errReply != nil { |
|
cs.sendError(req.Id, errReply) |
|
break |
|
} |
|
cs.sendResult(req.Id, &reply) |
|
case "eth_submitWork": |
|
if req.Params != nil { |
|
var params []string |
|
err := json.Unmarshal(req.Params, ¶ms) |
|
if err != nil { |
|
log.Printf("Unable to parse params from %v", cs.ip) |
|
s.policy.ApplyMalformedPolicy(cs.ip) |
|
break |
|
} |
|
reply, errReply := s.handleSubmitRPC(cs, login, vars["id"], params) |
|
if errReply != nil { |
|
cs.sendError(req.Id, errReply) |
|
break |
|
} |
|
cs.sendResult(req.Id, &reply) |
|
} else { |
|
s.policy.ApplyMalformedPolicy(cs.ip) |
|
errReply := &ErrorReply{Code: -1, Message: "Malformed request"} |
|
cs.sendError(req.Id, errReply) |
|
} |
|
case "eth_getBlockByNumber": |
|
reply := s.handleGetBlockByNumberRPC() |
|
cs.sendResult(req.Id, reply) |
|
case "eth_submitHashrate": |
|
cs.sendResult(req.Id, true) |
|
default: |
|
errReply := s.handleUnknownRPC(cs, req.Method) |
|
cs.sendError(req.Id, errReply) |
|
} |
|
} |
|
|
|
func (cs *Session) sendResult(id json.RawMessage, result interface{}) error { |
|
message := JSONRpcResp{Id: id, Version: "2.0", Error: nil, Result: result} |
|
return cs.enc.Encode(&message) |
|
} |
|
|
|
func (cs *Session) sendError(id json.RawMessage, reply *ErrorReply) error { |
|
message := JSONRpcResp{Id: id, Version: "2.0", Error: reply} |
|
return cs.enc.Encode(&message) |
|
} |
|
|
|
func (s *ProxyServer) writeError(w http.ResponseWriter, status int, msg string) { |
|
w.WriteHeader(status) |
|
w.Header().Set("Content-Type", "text/plain; charset=utf-8") |
|
} |
|
|
|
func (s *ProxyServer) currentBlockTemplate() *BlockTemplate { |
|
t := s.blockTemplate.Load() |
|
if t != nil { |
|
return t.(*BlockTemplate) |
|
} else { |
|
return nil |
|
} |
|
} |
|
|
|
func (s *ProxyServer) markSick() { |
|
atomic.AddInt64(&s.failsCount, 1) |
|
} |
|
|
|
func (s *ProxyServer) isSick() bool { |
|
x := atomic.LoadInt64(&s.failsCount) |
|
if s.config.Proxy.HealthCheck && x >= s.config.Proxy.MaxFails { |
|
return true |
|
} |
|
return false |
|
} |
|
|
|
func (s *ProxyServer) markOk() { |
|
atomic.StoreInt64(&s.failsCount, 0) |
|
}
|
|
|