You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
339 lines
8.6 KiB
339 lines
8.6 KiB
package payouts |
|
|
|
import ( |
|
"fmt" |
|
"log" |
|
"math/big" |
|
"os" |
|
"os/exec" |
|
"strconv" |
|
"sync" |
|
"time" |
|
|
|
"github.com/ethereum/go-ethereum/common/hexutil" |
|
|
|
"github.com/yuriy0803/open-etc-pool-friends/rpc" |
|
"github.com/yuriy0803/open-etc-pool-friends/storage" |
|
"github.com/yuriy0803/open-etc-pool-friends/util" |
|
) |
|
|
|
const txCheckInterval = 5 * time.Second |
|
|
|
type PayoutsConfig struct { |
|
Enabled bool `json:"enabled"` |
|
RequirePeers int64 `json:"requirePeers"` |
|
Interval string `json:"interval"` |
|
Daemon string `json:"daemon"` |
|
Timeout string `json:"timeout"` |
|
Address string `json:"address"` |
|
Gas string `json:"gas"` |
|
GasPrice string `json:"gasPrice"` |
|
AutoGas bool `json:"autoGas"` |
|
// In Shannon |
|
Threshold int64 `json:"threshold"` |
|
BgSave bool `json:"bgsave"` |
|
ConcurrentTx int `json:"concurrentTx"` |
|
} |
|
|
|
func (self PayoutsConfig) GasHex() string { |
|
x := util.String2Big(self.Gas) |
|
return hexutil.EncodeBig(x) |
|
} |
|
|
|
func (self PayoutsConfig) GasPriceHex() string { |
|
x := util.String2Big(self.GasPrice) |
|
return hexutil.EncodeBig(x) |
|
} |
|
|
|
type PayoutsProcessor struct { |
|
config *PayoutsConfig |
|
backend *storage.RedisClient |
|
rpc *rpc.RPCClient |
|
halt bool |
|
lastFail error |
|
} |
|
|
|
func NewPayoutsProcessor(cfg *PayoutsConfig, backend *storage.RedisClient) *PayoutsProcessor { |
|
u := &PayoutsProcessor{config: cfg, backend: backend} |
|
u.rpc = rpc.NewRPCClient("PayoutsProcessor", cfg.Daemon, cfg.Timeout) |
|
return u |
|
} |
|
|
|
func (u *PayoutsProcessor) Start() { |
|
log.Println("Starting payouts") |
|
|
|
if u.mustResolvePayout() { |
|
log.Println("Running with env RESOLVE_PAYOUT=1, now trying to resolve locked payouts") |
|
u.resolvePayouts() |
|
log.Println("Now you have to restart payouts module with RESOLVE_PAYOUT=0 for normal run") |
|
return |
|
} |
|
|
|
intv := util.MustParseDuration(u.config.Interval) |
|
timer := time.NewTimer(intv) |
|
log.Printf("Set payouts interval to %v", intv) |
|
|
|
payments := u.backend.GetPendingPayments() |
|
if len(payments) > 0 { |
|
log.Printf("Previous payout failed, you have to resolve it. List of failed payments:\n %v", |
|
formatPendingPayments(payments)) |
|
return |
|
} |
|
|
|
locked, err := u.backend.IsPayoutsLocked() |
|
if err != nil { |
|
log.Println("Unable to start payouts:", err) |
|
return |
|
} |
|
if locked { |
|
log.Println("Unable to start payouts because they are locked") |
|
return |
|
} |
|
|
|
// Immediately process payouts after start |
|
u.process() |
|
timer.Reset(intv) |
|
|
|
go func() { |
|
for { |
|
select { |
|
case <-timer.C: |
|
u.process() |
|
timer.Reset(intv) |
|
} |
|
} |
|
}() |
|
} |
|
|
|
func (u *PayoutsProcessor) process() { |
|
if u.halt { |
|
log.Println("Payments suspended due to last critical error:", u.lastFail) |
|
os.Exit(1) |
|
return |
|
} |
|
mustPay := 0 |
|
minersPaid := 0 |
|
totalAmount := big.NewInt(0) |
|
payees, err := u.backend.GetPayees() |
|
if err != nil { |
|
log.Println("Error while retrieving payees from backend:", err) |
|
return |
|
} |
|
|
|
waitingCount := 0 |
|
var wg sync.WaitGroup |
|
|
|
for _, login := range payees { |
|
amount, _ := u.backend.GetBalance(login) |
|
amountInShannon := big.NewInt(amount) |
|
ptresh, _ := u.backend.GetTreshold(login) |
|
if ptresh <= 10 { |
|
ptresh = u.config.Threshold |
|
} |
|
|
|
// Shannon^2 = Wei |
|
amountInWei := new(big.Int).Mul(amountInShannon, util.Shannon) |
|
|
|
if !u.reachedThreshold(amountInShannon, ptresh) { |
|
continue |
|
} |
|
mustPay++ |
|
|
|
// Require active peers before processing |
|
if !u.checkPeers() { |
|
break |
|
} |
|
// Require unlocked account |
|
if !u.isUnlockedAccount() { |
|
break |
|
} |
|
|
|
// Check if we have enough funds |
|
poolBalance, err := u.rpc.GetBalance(u.config.Address) |
|
if err != nil { |
|
u.halt = true |
|
u.lastFail = err |
|
break |
|
} |
|
if poolBalance.Cmp(amountInWei) < 0 { |
|
err := fmt.Errorf("Not enough balance for payment, need %s Wei, pool has %s Wei", |
|
amountInWei.String(), poolBalance.String()) |
|
u.halt = true |
|
u.lastFail = err |
|
break |
|
} |
|
|
|
// Lock payments for current payout |
|
err = u.backend.LockPayouts(login, amount) |
|
if err != nil { |
|
log.Printf("Failed to lock payment for %s: %v", login, err) |
|
u.halt = true |
|
u.lastFail = err |
|
break |
|
} |
|
log.Printf("Locked payment for %s, %v Shannon", login, amount) |
|
|
|
// Debit miner's balance and update stats |
|
err = u.backend.UpdateBalance(login, amount) |
|
if err != nil { |
|
log.Printf("Failed to update balance for %s, %v Shannon: %v", login, amount, err) |
|
u.halt = true |
|
u.lastFail = err |
|
break |
|
} |
|
|
|
value := hexutil.EncodeBig(amountInWei) |
|
txHash, err := u.rpc.SendTransaction(u.config.Address, login, u.config.GasHex(), u.config.GasPriceHex(), value, u.config.AutoGas) |
|
if err != nil { |
|
log.Printf("Failed to send payment to %s, %v Shannon: %v. Check outgoing tx for %s in block explorer and docs/PAYOUTS.md", |
|
login, amount, err, login) |
|
u.halt = true |
|
u.lastFail = err |
|
break |
|
} |
|
|
|
if postCommand, present := os.LookupEnv("POST_PAYOUT_HOOK"); present { |
|
go func(postCommand string, login string, value string) { |
|
out, err := exec.Command(postCommand, login, value).CombinedOutput() |
|
if err != nil { |
|
log.Printf("WARNING: Error running post payout hook: %s", err.Error()) |
|
} |
|
log.Printf("Running post payout hook with result: %s", out) |
|
}(postCommand, login, value) |
|
} |
|
|
|
// Log transaction hash |
|
err = u.backend.WritePayment(login, txHash, amount) |
|
if err != nil { |
|
log.Printf("Failed to log payment data for %s, %v Shannon, tx: %s: %v", login, amount, txHash, err) |
|
u.halt = true |
|
u.lastFail = err |
|
break |
|
} |
|
|
|
minersPaid++ |
|
totalAmount.Add(totalAmount, big.NewInt(amount)) |
|
log.Printf("Paid %v Shannon to %v, TxHash: %v", amount, login, txHash) |
|
|
|
wg.Add(1) |
|
waitingCount++ |
|
go func(txHash string, login string, wg *sync.WaitGroup) { |
|
// Wait for TX confirmation before further payouts |
|
for { |
|
log.Printf("Waiting for tx confirmation: %v", txHash) |
|
time.Sleep(txCheckInterval) |
|
receipt, err := u.rpc.GetTxReceipt(txHash) |
|
if err != nil { |
|
log.Printf("Failed to get tx receipt for %v: %v", txHash, err) |
|
continue |
|
} |
|
// Tx has been mined |
|
if receipt != nil && receipt.Confirmed() { |
|
if receipt.Successful() { |
|
log.Printf("Payout tx successful for %s: %s", login, txHash) |
|
} else { |
|
log.Printf("Payout tx failed for %s: %s. Address contract throws on incoming tx.", login, txHash) |
|
} |
|
break |
|
} |
|
} |
|
wg.Done() |
|
}(txHash, login, &wg) |
|
|
|
if waitingCount > u.config.ConcurrentTx { |
|
wg.Wait() |
|
waitingCount = 0 |
|
} |
|
} |
|
|
|
wg.Wait() |
|
waitingCount = 0 |
|
|
|
if mustPay > 0 { |
|
log.Printf("Paid total %v Shannon to %v of %v payees", totalAmount, minersPaid, mustPay) |
|
} else { |
|
log.Println("No payees that have reached payout threshold") |
|
} |
|
|
|
// Save redis state to disk |
|
if minersPaid > 0 && u.config.BgSave { |
|
u.bgSave() |
|
} |
|
} |
|
|
|
func (self PayoutsProcessor) isUnlockedAccount() bool { |
|
_, err := self.rpc.Sign(self.config.Address, "0x0") |
|
if err != nil { |
|
log.Println("Unable to process payouts:", err) |
|
return false |
|
} |
|
return true |
|
} |
|
|
|
func (self PayoutsProcessor) checkPeers() bool { |
|
n, err := self.rpc.GetPeerCount() |
|
if err != nil { |
|
log.Println("Unable to start payouts, failed to retrieve number of peers from node:", err) |
|
return false |
|
} |
|
if n < self.config.RequirePeers { |
|
log.Println("Unable to start payouts, number of peers on a node is less than required", self.config.RequirePeers) |
|
return false |
|
} |
|
return true |
|
} |
|
|
|
func (self PayoutsProcessor) reachedThreshold(amount *big.Int, threshold int64) bool { |
|
return big.NewInt(threshold).Cmp(amount) < 0 |
|
} |
|
|
|
func formatPendingPayments(list []*storage.PendingPayment) string { |
|
var s string |
|
for _, v := range list { |
|
s += fmt.Sprintf("\tAddress: %s, Amount: %v Shannon, %v\n", v.Address, v.Amount, time.Unix(v.Timestamp, 0)) |
|
} |
|
return s |
|
} |
|
|
|
func (self PayoutsProcessor) bgSave() { |
|
result, err := self.backend.BgSave() |
|
if err != nil { |
|
log.Println("Failed to perform BGSAVE on backend:", err) |
|
return |
|
} |
|
log.Println("Saving backend state to disk:", result) |
|
} |
|
|
|
func (self PayoutsProcessor) resolvePayouts() { |
|
payments := self.backend.GetPendingPayments() |
|
|
|
if len(payments) > 0 { |
|
log.Printf("Will credit back following balances:\n%s", formatPendingPayments(payments)) |
|
|
|
for _, v := range payments { |
|
err := self.backend.RollbackBalance(v.Address, v.Amount) |
|
if err != nil { |
|
log.Printf("Failed to credit %v Shannon back to %s, error is: %v", v.Amount, v.Address, err) |
|
return |
|
} |
|
log.Printf("Credited %v Shannon back to %s", v.Amount, v.Address) |
|
} |
|
err := self.backend.UnlockPayouts() |
|
if err != nil { |
|
log.Println("Failed to unlock payouts:", err) |
|
return |
|
} |
|
} else { |
|
log.Println("No pending payments to resolve") |
|
} |
|
|
|
if self.config.BgSave { |
|
self.bgSave() |
|
} |
|
log.Println("Payouts unlocked") |
|
} |
|
|
|
func (self PayoutsProcessor) mustResolvePayout() bool { |
|
v, _ := strconv.ParseBool(os.Getenv("RESOLVE_PAYOUT")) |
|
return v |
|
}
|
|
|