commit 136b95767a1519c7025f384ea0200452613773eb
parent fcbd3ce5fc3573e14dcebac3ab7ac44737a95d7a
Author: chris-belcher <chris-belcher@users.noreply.github.com>
Date: Thu, 4 Mar 2021 15:45:03 +0000
Implement responsive mempool sync
Previously when generating fee histogram required by Electrum, the server would
use the RPC call `getrawmempool true` which would be very slow during times of
large mempools, and cause the server to be unresponsive.
This commit instead uses `getrawmempool false` and `getmempoolentry` to obtain
all the mempool fees. Because the mempool synchronization is split up over
many different RPC calls, the server can always remain responsive even while
obtaining the mempool. The typical lag will be at most 1 or 2 seconds.
See issue #96
Diffstat:
5 files changed, 180 insertions(+), 52 deletions(-)
diff --git a/config.ini_sample b/config.ini_sample
@@ -70,6 +70,10 @@ keyfile = certs/cert.key
# This is useful on low powered devices at times when the node mempool is large
disable_mempool_fee_histogram = false
+# How often in seconds to update the mempool
+# this mempool is used to calculate the mempool fee histogram
+mempool_update_interval = 60
+
# Parameter for broadcasting unconfirmed transactions
# Options are:
# * tor-or-own-node (use tor if tor is running locally, otherwise own-node)
diff --git a/electrumpersonalserver/server/__init__.py b/electrumpersonalserver/server/__init__.py
@@ -46,3 +46,6 @@ from electrumpersonalserver.server.electrumprotocol import (
get_block_headers_hex,
DONATION_ADDR,
)
+from electrumpersonalserver.server.mempoolhistogram import (
+ MempoolSync
+)
diff --git a/electrumpersonalserver/server/common.py b/electrumpersonalserver/server/common.py
@@ -1,6 +1,6 @@
import socket
import time
-import datetime
+from datetime import datetime
import ssl
import os
import os.path
@@ -26,12 +26,23 @@ from electrumpersonalserver.server.electrumprotocol import (
get_block_headers_hex,
DONATION_ADDR,
)
+from electrumpersonalserver.server.mempoolhistogram import (
+ MempoolSync,
+ PollIntervalChange
+)
##python has demented rules for variable scope, so these
## global variables are actually mutable lists
bestblockhash = [None]
-def on_heartbeat_listening(txmonitor):
+last_heartbeat_listening = [datetime.now()]
+last_heartbeat_connected = [datetime.now()]
+
+def on_heartbeat_listening(poll_interval_listening, txmonitor):
+ if ((datetime.now() - last_heartbeat_listening[0]).total_seconds()
+ < poll_interval_listening):
+ return True
+ last_heartbeat_listening[0] = datetime.now()
logger = logging.getLogger('ELECTRUMPERSONALSERVER')
try:
txmonitor.check_for_updated_txes()
@@ -40,7 +51,11 @@ def on_heartbeat_listening(txmonitor):
is_node_reachable = False
return is_node_reachable
-def on_heartbeat_connected(rpc, txmonitor, protocol):
+def on_heartbeat_connected(poll_interval_connected, rpc, txmonitor, protocol):
+ if ((datetime.now() - last_heartbeat_connected[0]).total_seconds()
+ < poll_interval_connected):
+ return
+ last_heartbeat_connected[0] = datetime.now()
logger = logging.getLogger('ELECTRUMPERSONALSERVER')
is_tip_updated, header = check_for_new_blockchain_tip(rpc,
protocol.are_headers_raw)
@@ -90,17 +105,26 @@ def run_electrum_server(rpc, txmonitor, config):
logger.debug('using cert: {}, key: {}'.format(certfile, keyfile))
disable_mempool_fee_histogram = config.getboolean("electrum-server",
"disable_mempool_fee_histogram", fallback=False)
+ mempool_update_interval = int(config.get("bitcoin-rpc",
+ "mempool_update_interval", fallback=60))
broadcast_method = config.get("electrum-server", "broadcast_method",
fallback="own-node")
tor_host = config.get("electrum-server", "tor_host", fallback="localhost")
tor_port = int(config.get("electrum-server", "tor_port", fallback="9050"))
tor_hostport = (tor_host, tor_port)
+ mempool_sync = MempoolSync(rpc,
+ disable_mempool_fee_histogram, mempool_update_interval)
+ mempool_sync.initial_sync(logger)
+
protocol = ElectrumProtocol(rpc, txmonitor, logger, broadcast_method,
- tor_hostport, disable_mempool_fee_histogram)
+ tor_hostport, mempool_sync)
+ normal_listening_timeout = min(poll_interval_listening,
+ mempool_update_interval)
+ fast_listening_timeout = 0.5
server_sock = create_server_socket(hostport)
- server_sock.settimeout(poll_interval_listening)
+ server_sock.settimeout(normal_listening_timeout)
accepting_clients = True
while True:
# main server loop, runs forever
@@ -121,7 +145,14 @@ def run_electrum_server(rpc, txmonitor, config):
certfile=certfile, keyfile=keyfile,
ssl_version=ssl.PROTOCOL_SSLv23)
except socket.timeout:
- is_node_reachable = on_heartbeat_listening(txmonitor)
+ poll_interval_change = mempool_sync.poll_update(1)
+ if poll_interval_change == PollIntervalChange.FAST_POLLING:
+ server_sock.settimeout(fast_listening_timeout)
+ elif poll_interval_change == PollIntervalChange.NORMAL_POLLING:
+ server_sock.settimeout(normal_listening_timeout)
+
+ is_node_reachable = on_heartbeat_listening(
+ poll_interval_listening, txmonitor)
accepting_clients = is_node_reachable
except (ConnectionRefusedError, ssl.SSLError, IOError):
sock.close()
@@ -135,7 +166,10 @@ def run_electrum_server(rpc, txmonitor, config):
protocol.set_send_reply_fun(send_reply_fun)
try:
- sock.settimeout(poll_interval_connected)
+ normal_connected_timeout = min(poll_interval_connected,
+ mempool_update_interval)
+ fast_connected_timeout = 0.5
+ sock.settimeout(normal_connected_timeout)
recv_buffer = bytearray()
while True:
# loop for replying to client queries
@@ -159,7 +193,15 @@ def run_electrum_server(rpc, txmonitor, config):
logger.debug("=> " + line)
protocol.handle_query(query)
except socket.timeout:
- on_heartbeat_connected(rpc, txmonitor, protocol)
+ poll_interval_change = mempool_sync.poll_update(1)
+ if poll_interval_change == PollIntervalChange.FAST_POLLING:
+ sock.settimeout(fast_connected_timeout)
+ elif (poll_interval_change
+ == PollIntervalChange.NORMAL_POLLING):
+ sock.settimeout(normal_connected_timeout)
+
+ on_heartbeat_connected(poll_interval_connected, rpc,
+ txmonitor, protocol)
except JsonRpcError as e:
logger.debug("Error with node connection, e = " + repr(e)
+ "\ntraceback = " + str(traceback.format_exc()))
@@ -454,14 +496,14 @@ def main():
def search_for_block_height_of_date(datestr, rpc):
logger = logging.getLogger('ELECTRUMPERSONALSERVER')
- target_time = datetime.datetime.strptime(datestr, "%d/%m/%Y")
+ target_time = datetime.strptime(datestr, "%d/%m/%Y")
bestblockhash = rpc.call("getbestblockhash", [])
best_head = rpc.call("getblockheader", [bestblockhash])
- if target_time > datetime.datetime.fromtimestamp(best_head["time"]):
+ if target_time > datetime.fromtimestamp(best_head["time"]):
logger.error("date in the future")
return -1
genesis_block = rpc.call("getblockheader", [rpc.call("getblockhash", [0])])
- if target_time < datetime.datetime.fromtimestamp(genesis_block["time"]):
+ if target_time < datetime.fromtimestamp(genesis_block["time"]):
logger.warning("date is before the creation of bitcoin")
return 0
first_height = 0
@@ -469,7 +511,7 @@ def search_for_block_height_of_date(datestr, rpc):
while True:
m = (first_height + last_height) // 2
m_header = rpc.call("getblockheader", [rpc.call("getblockhash", [m])])
- m_header_time = datetime.datetime.fromtimestamp(m_header["time"])
+ m_header_time = datetime.fromtimestamp(m_header["time"])
m_time_diff = (m_header_time - target_time).total_seconds()
if abs(m_time_diff) < 60*60*2: #2 hours
return m_header["height"]
diff --git a/electrumpersonalserver/server/electrumprotocol.py b/electrumpersonalserver/server/electrumprotocol.py
@@ -134,19 +134,18 @@ class ElectrumProtocol(object):
"""
def __init__(self, rpc, txmonitor, logger, broadcast_method,
- tor_hostport, disable_mempool_fee_histogram):
+ tor_hostport, mempool_sync):
self.rpc = rpc
self.txmonitor = txmonitor
self.logger = logger
self.broadcast_method = broadcast_method
self.tor_hostport = tor_hostport
- self.disable_mempool_fee_histogram = disable_mempool_fee_histogram
+ self.mempool_sync = mempool_sync
self.protocol_version = 0
self.subscribed_to_headers = False
self.are_headers_raw = False
self.txid_blockhash_map = {}
- self.printed_slow_mempool_warning = False
def set_send_reply_fun(self, send_reply_fun):
self.send_reply_fun = send_reply_fun
@@ -379,43 +378,9 @@ class ElectrumProtocol(object):
else:
self._send_error(query["id"], error)
elif method == "mempool.get_fee_histogram":
- if self.disable_mempool_fee_histogram:
- result = [[0, 0]]
- self.logger.debug("fee histogram disabled, sending back empty "
- + "mempool")
- else:
- st = time.time()
- mempool = self.rpc.call("getrawmempool", [True])
- et = time.time()
- MEMPOOL_WARNING_DURATION = 10 #seconds
- if et - st > MEMPOOL_WARNING_DURATION:
- if not self.printed_slow_mempool_warning:
- self.logger.warning("Mempool very large resulting in"
- + " slow response by server ("
- + str(round(et-st, 1)) + "sec). Consider setting "
- + "`disable_mempool_fee_histogram = true`")
- self.printed_slow_mempool_warning = True
- #algorithm copied from the relevant place in ElectrumX
- #https://github.com/kyuupichan/electrumx/blob/e92c9bd4861c1e35989ad2773d33e01219d33280/server/mempool.py
- fee_hist = defaultdict(int)
- for txid, details in mempool.items():
- size = (details["size"] if "size" in details else
- details["vsize"])
- fee_rate = 1e8*details["fee"] // size
- fee_hist[fee_rate] += size
- l = list(reversed(sorted(fee_hist.items())))
- out = []
- size = 0
- r = 0
- binsize = 100000
- for fee, s in l:
- size += s
- if size + r > binsize:
- out.append((fee, size))
- r += size - binsize
- size = 0
- binsize *= 1.1
- result = out
+ result = self.mempool_sync.get_fee_histogram()
+ self.logger.debug("mempool entry count = "
+ + str(len(self.mempool_sync.mempool)))
self._send_response(query, result)
elif method == "blockchain.estimatefee":
estimate = self.rpc.call("estimatesmartfee", [query["params"][0]])
diff --git a/electrumpersonalserver/server/mempoolhistogram.py b/electrumpersonalserver/server/mempoolhistogram.py
@@ -0,0 +1,114 @@
+
+import time
+from collections import defaultdict
+from datetime import datetime
+from enum import Enum
+
+from electrumpersonalserver.server.jsonrpc import JsonRpcError
+
+def calc_histogram(mempool):
+ #algorithm copied from the relevant place in ElectrumX
+ #https://github.com/kyuupichan/electrumx/blob/e92c9bd4861c1e35989ad2773d33e01219d33280/server/mempool.py
+ fee_hist = defaultdict(int)
+ for fee_rate, size in mempool.values():
+ fee_hist[fee_rate] += size
+ l = list(reversed(sorted(fee_hist.items())))
+ out = []
+ size = 0
+ r = 0
+ binsize = 100000
+ for fee, s in l:
+ size += s
+ if size + r > binsize:
+ out.append((fee, size))
+ r += size - binsize
+ size = 0
+ binsize *= 1.1
+ return out
+
+class PollIntervalChange(Enum):
+ UNCHANGED = "unchanged"
+ FAST_POLLING = "fastpolling"
+ NORMAL_POLLING = "normalpolling"
+
+class MempoolSync(object):
+ def __init__(self, rpc, disabled, polling_interval):
+ self.rpc = rpc
+ self.disabled = disabled
+ self.polling_interval = polling_interval
+ self.mempool = dict()
+ self.cached_fee_histogram = [[0, 0]]
+ self.added_txids = None
+ self.last_poll = None
+ self.state = "gettxids"
+
+ def set_polling_interval(self, polling_interval):
+ self.polling_interval = polling_interval
+
+ def get_fee_histogram(self):
+ return self.cached_fee_histogram
+
+ def initial_sync(self, logger):
+ if self.disabled:
+ return
+ logger.info("Synchronizing mempool . . .")
+ st = time.time()
+ for _ in range(2):
+ self.poll_update(-1)
+ self.state = "gettxids"
+ for _ in range(2):
+ self.poll_update(-1)
+ #run once for the getrawmempool
+ #again for the getmempoolentry
+ #and all that again because the first time will take so long
+ # that new txes could arrive in that time
+ et = time.time()
+ logger.info("Found " + str(len(self.mempool)) + " mempool entries. "
+ + "Synchronized mempool in " + str(et - st) + "sec")
+
+ #-1 for no timeout
+ def poll_update(self, timeout):
+ poll_interval_change = PollIntervalChange.UNCHANGED
+ if self.disabled:
+ return poll_interval_change
+ if self.state == "waiting":
+ if ((datetime.now() - self.last_poll).total_seconds()
+ > self.polling_interval):
+ poll_interval_change = PollIntervalChange.FAST_POLLING
+ self.state = "gettxids"
+ elif self.state == "gettxids":
+ mempool_txids = self.rpc.call("getrawmempool", [])
+ self.last_poll = datetime.now()
+ mempool_txids = set(mempool_txids)
+
+ removed_txids = set(self.mempool.keys()).difference(mempool_txids)
+ self.added_txids = iter(mempool_txids.difference(
+ set(self.mempool.keys())))
+
+ for txid in removed_txids:
+ del self.mempool[txid]
+
+ self.state = "getfeerates"
+ elif self.state == "getfeerates":
+ if timeout == -1:
+ timeout = 2**32
+ start_time = datetime.now()
+ while self.state != "waiting" and ((datetime.now() - start_time
+ ).total_seconds() < timeout):
+ try:
+ txid = next(self.added_txids)
+ except StopIteration:
+ self.cached_fee_histogram = calc_histogram(self.mempool)
+ self.state = "waiting"
+ poll_interval_change = \
+ PollIntervalChange.NORMAL_POLLING
+ self.last_poll = datetime.now()
+ continue
+ try:
+ mempool_tx = self.rpc.call("getmempoolentry", [txid])
+ except JsonRpcError:
+ continue
+ fee_rate = 1e8*mempool_tx["fee"] // mempool_tx["vsize"]
+ self.mempool[txid] = (fee_rate, mempool_tx["vsize"])
+
+ return poll_interval_change