mempoolhistogram.py (4089B)
1 2 import time 3 from collections import defaultdict 4 from datetime import datetime 5 from enum import Enum 6 7 from electrumpersonalserver.server.jsonrpc import JsonRpcError 8 9 def calc_histogram(mempool): 10 #algorithm copied from the relevant place in ElectrumX 11 #https://github.com/kyuupichan/electrumx/blob/e92c9bd4861c1e35989ad2773d33e01219d33280/server/mempool.py 12 fee_hist = defaultdict(int) 13 for fee_rate, size in mempool.values(): 14 fee_hist[fee_rate] += size 15 l = list(reversed(sorted(fee_hist.items()))) 16 out = [] 17 size = 0 18 r = 0 19 binsize = 100000 20 for fee, s in l: 21 size += s 22 if size + r > binsize: 23 out.append((fee, size)) 24 r += size - binsize 25 size = 0 26 binsize *= 1.1 27 return out 28 29 class PollIntervalChange(Enum): 30 UNCHANGED = "unchanged" 31 FAST_POLLING = "fastpolling" 32 NORMAL_POLLING = "normalpolling" 33 34 class MempoolSync(object): 35 def __init__(self, rpc, disabled, polling_interval): 36 self.rpc = rpc 37 self.disabled = disabled 38 self.polling_interval = polling_interval 39 self.mempool = dict() 40 self.cached_fee_histogram = [[0, 0]] 41 self.added_txids = None 42 self.last_poll = None 43 self.state = "gettxids" 44 45 def set_polling_interval(self, polling_interval): 46 self.polling_interval = polling_interval 47 48 def get_fee_histogram(self): 49 return self.cached_fee_histogram 50 51 def initial_sync(self, logger): 52 if self.disabled: 53 return 54 logger.info("Synchronizing mempool . . .") 55 st = time.time() 56 for _ in range(2): 57 self.poll_update(-1) 58 self.state = "gettxids" 59 for _ in range(2): 60 self.poll_update(-1) 61 #run once for the getrawmempool 62 #again for the getmempoolentry 63 #and all that again because the first time will take so long 64 # that new txes could arrive in that time 65 et = time.time() 66 logger.info("Found " + str(len(self.mempool)) + " mempool entries. " 67 + "Synchronized mempool in " + str(et - st) + "sec") 68 69 #-1 for no timeout 70 def poll_update(self, timeout): 71 poll_interval_change = PollIntervalChange.UNCHANGED 72 if self.disabled: 73 return poll_interval_change 74 if self.state == "waiting": 75 if ((datetime.now() - self.last_poll).total_seconds() 76 > self.polling_interval): 77 poll_interval_change = PollIntervalChange.FAST_POLLING 78 self.state = "gettxids" 79 elif self.state == "gettxids": 80 mempool_txids = self.rpc.call("getrawmempool", []) 81 self.last_poll = datetime.now() 82 mempool_txids = set(mempool_txids) 83 84 removed_txids = set(self.mempool.keys()).difference(mempool_txids) 85 self.added_txids = iter(mempool_txids.difference( 86 set(self.mempool.keys()))) 87 88 for txid in removed_txids: 89 del self.mempool[txid] 90 91 self.state = "getfeerates" 92 elif self.state == "getfeerates": 93 if timeout == -1: 94 timeout = 2**32 95 start_time = datetime.now() 96 while self.state != "waiting" and ((datetime.now() - start_time 97 ).total_seconds() < timeout): 98 try: 99 txid = next(self.added_txids) 100 except StopIteration: 101 self.cached_fee_histogram = calc_histogram(self.mempool) 102 self.state = "waiting" 103 poll_interval_change = \ 104 PollIntervalChange.NORMAL_POLLING 105 self.last_poll = datetime.now() 106 continue 107 try: 108 mempool_tx = self.rpc.call("getmempoolentry", [txid]) 109 except JsonRpcError: 110 continue 111 fee_rate = 1e8*mempool_tx["fee"] // mempool_tx["vsize"] 112 self.mempool[txid] = (fee_rate, mempool_tx["vsize"]) 113 114 return poll_interval_change