migration of open-etc-friends-pool for use with Etica/EGAZ
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

339 lines
8.6 KiB

package payouts
import (
"fmt"
"log"
"math/big"
"os"
"os/exec"
"strconv"
"sync"
"time"
"github.com/yuriy0803/core-geth1/common/hexutil"
"github.com/yuriy0803/open-etc-pool-friends/rpc"
"github.com/yuriy0803/open-etc-pool-friends/storage"
"github.com/yuriy0803/open-etc-pool-friends/util"
)
const txCheckInterval = 5 * time.Second
type PayoutsConfig struct {
Enabled bool `json:"enabled"`
RequirePeers int64 `json:"requirePeers"`
Interval string `json:"interval"`
Daemon string `json:"daemon"`
Timeout string `json:"timeout"`
Address string `json:"address"`
Gas string `json:"gas"`
GasPrice string `json:"gasPrice"`
AutoGas bool `json:"autoGas"`
// In Shannon
Threshold int64 `json:"threshold"`
BgSave bool `json:"bgsave"`
ConcurrentTx int `json:"concurrentTx"`
}
func (self PayoutsConfig) GasHex() string {
x := util.String2Big(self.Gas)
return hexutil.EncodeBig(x)
}
func (self PayoutsConfig) GasPriceHex() string {
x := util.String2Big(self.GasPrice)
return hexutil.EncodeBig(x)
}
type PayoutsProcessor struct {
config *PayoutsConfig
backend *storage.RedisClient
rpc *rpc.RPCClient
halt bool
lastFail error
}
func NewPayoutsProcessor(cfg *PayoutsConfig, backend *storage.RedisClient) *PayoutsProcessor {
u := &PayoutsProcessor{config: cfg, backend: backend}
u.rpc = rpc.NewRPCClient("PayoutsProcessor", cfg.Daemon, cfg.Timeout)
return u
}
func (u *PayoutsProcessor) Start() {
log.Println("Starting payouts")
if u.mustResolvePayout() {
log.Println("Running with env RESOLVE_PAYOUT=1, now trying to resolve locked payouts")
u.resolvePayouts()
log.Println("Now you have to restart payouts module with RESOLVE_PAYOUT=0 for normal run")
return
}
intv := util.MustParseDuration(u.config.Interval)
timer := time.NewTimer(intv)
log.Printf("Set payouts interval to %v", intv)
payments := u.backend.GetPendingPayments()
if len(payments) > 0 {
log.Printf("Previous payout failed, you have to resolve it. List of failed payments:\n %v",
formatPendingPayments(payments))
return
}
locked, err := u.backend.IsPayoutsLocked()
if err != nil {
log.Println("Unable to start payouts:", err)
return
}
if locked {
log.Println("Unable to start payouts because they are locked")
return
}
// Immediately process payouts after start
u.process()
timer.Reset(intv)
go func() {
for {
select {
case <-timer.C:
u.process()
timer.Reset(intv)
}
}
}()
}
func (u *PayoutsProcessor) process() {
if u.halt {
log.Println("Payments suspended due to last critical error:", u.lastFail)
os.Exit(1)
return
}
mustPay := 0
minersPaid := 0
totalAmount := big.NewInt(0)
payees, err := u.backend.GetPayees()
if err != nil {
log.Println("Error while retrieving payees from backend:", err)
return
}
waitingCount := 0
var wg sync.WaitGroup
for _, login := range payees {
amount, _ := u.backend.GetBalance(login)
amountInShannon := big.NewInt(amount)
ptresh, _ := u.backend.GetThreshold(login)
if ptresh <= 10 {
ptresh = u.config.Threshold
}
// Shannon^2 = Wei
amountInWei := new(big.Int).Mul(amountInShannon, util.Shannon)
if !u.reachedThreshold(amountInShannon, ptresh) {
continue
}
mustPay++
// Require active peers before processing
if !u.checkPeers() {
break
}
// Require unlocked account
if !u.isUnlockedAccount() {
break
}
// Check if we have enough funds
poolBalance, err := u.rpc.GetBalance(u.config.Address)
if err != nil {
u.halt = true
u.lastFail = err
break
}
if poolBalance.Cmp(amountInWei) < 0 {
err := fmt.Errorf("Not enough balance for payment, need %s Wei, pool has %s Wei",
amountInWei.String(), poolBalance.String())
u.halt = true
u.lastFail = err
break
}
// Lock payments for current payout
err = u.backend.LockPayouts(login, amount)
if err != nil {
log.Printf("Failed to lock payment for %s: %v", login, err)
u.halt = true
u.lastFail = err
break
}
log.Printf("Locked payment for %s, %v Shannon", login, amount)
// Debit miner's balance and update stats
err = u.backend.UpdateBalance(login, amount)
if err != nil {
log.Printf("Failed to update balance for %s, %v Shannon: %v", login, amount, err)
u.halt = true
u.lastFail = err
break
}
value := hexutil.EncodeBig(amountInWei)
txHash, err := u.rpc.SendTransaction(u.config.Address, login, u.config.GasHex(), u.config.GasPriceHex(), value, u.config.AutoGas)
if err != nil {
log.Printf("Failed to send payment to %s, %v Shannon: %v. Check outgoing tx for %s in block explorer and docs/PAYOUTS.md",
login, amount, err, login)
u.halt = true
u.lastFail = err
break
}
if postCommand, present := os.LookupEnv("POST_PAYOUT_HOOK"); present {
go func(postCommand string, login string, value string) {
out, err := exec.Command(postCommand, login, value).CombinedOutput()
if err != nil {
log.Printf("WARNING: Error running post payout hook: %s", err.Error())
}
log.Printf("Running post payout hook with result: %s", out)
}(postCommand, login, value)
}
// Log transaction hash
err = u.backend.WritePayment(login, txHash, amount)
if err != nil {
log.Printf("Failed to log payment data for %s, %v Shannon, tx: %s: %v", login, amount, txHash, err)
u.halt = true
u.lastFail = err
break
}
minersPaid++
totalAmount.Add(totalAmount, big.NewInt(amount))
log.Printf("Paid %v Shannon to %v, TxHash: %v", amount, login, txHash)
wg.Add(1)
waitingCount++
go func(txHash string, login string, wg *sync.WaitGroup) {
// Wait for TX confirmation before further payouts
for {
log.Printf("Waiting for tx confirmation: %v", txHash)
time.Sleep(txCheckInterval)
receipt, err := u.rpc.GetTxReceipt(txHash)
if err != nil {
log.Printf("Failed to get tx receipt for %v: %v", txHash, err)
continue
}
// Tx has been mined
if receipt != nil && receipt.Confirmed() {
if receipt.Successful() {
log.Printf("Payout tx successful for %s: %s", login, txHash)
} else {
log.Printf("Payout tx failed for %s: %s. Address contract throws on incoming tx.", login, txHash)
}
break
}
}
wg.Done()
}(txHash, login, &wg)
if waitingCount > u.config.ConcurrentTx {
wg.Wait()
waitingCount = 0
}
}
wg.Wait()
waitingCount = 0
if mustPay > 0 {
log.Printf("Paid total %v Shannon to %v of %v payees", totalAmount, minersPaid, mustPay)
} else {
log.Println("No payees that have reached payout threshold")
}
// Save redis state to disk
if minersPaid > 0 && u.config.BgSave {
u.bgSave()
}
}
func (self PayoutsProcessor) isUnlockedAccount() bool {
_, err := self.rpc.Sign(self.config.Address, "0x0")
if err != nil {
log.Println("Unable to process payouts:", err)
return false
}
return true
}
func (self PayoutsProcessor) checkPeers() bool {
n, err := self.rpc.GetPeerCount()
if err != nil {
log.Println("Unable to start payouts, failed to retrieve number of peers from node:", err)
return false
}
if n < self.config.RequirePeers {
log.Println("Unable to start payouts, number of peers on a node is less than required", self.config.RequirePeers)
return false
}
return true
}
func (self PayoutsProcessor) reachedThreshold(amount *big.Int, threshold int64) bool {
return big.NewInt(threshold).Cmp(amount) < 0
}
func formatPendingPayments(list []*storage.PendingPayment) string {
var s string
for _, v := range list {
s += fmt.Sprintf("\tAddress: %s, Amount: %v Shannon, %v\n", v.Address, v.Amount, time.Unix(v.Timestamp, 0))
}
return s
}
func (self PayoutsProcessor) bgSave() {
result, err := self.backend.BgSave()
if err != nil {
log.Println("Failed to perform BGSAVE on backend:", err)
return
}
log.Println("Saving backend state to disk:", result)
}
func (self PayoutsProcessor) resolvePayouts() {
payments := self.backend.GetPendingPayments()
if len(payments) > 0 {
log.Printf("Will credit back following balances:\n%s", formatPendingPayments(payments))
for _, v := range payments {
err := self.backend.RollbackBalance(v.Address, v.Amount)
if err != nil {
log.Printf("Failed to credit %v Shannon back to %s, error is: %v", v.Amount, v.Address, err)
return
}
log.Printf("Credited %v Shannon back to %s", v.Amount, v.Address)
}
err := self.backend.UnlockPayouts()
if err != nil {
log.Println("Failed to unlock payouts:", err)
return
}
} else {
log.Println("No pending payments to resolve")
}
if self.config.BgSave {
self.bgSave()
}
log.Println("Payouts unlocked")
}
func (self PayoutsProcessor) mustResolvePayout() bool {
v, _ := strconv.ParseBool(os.Getenv("RESOLVE_PAYOUT"))
return v
}