commit 3a91eb324efc00ba935fc2467121b49937d22b6a
parent 8e80656387d16054d80842de41fa544132b9a68a
Author: chris-belcher <chris-belcher@users.noreply.github.com>
Date: Wed, 21 Mar 2018 00:41:37 +0000
moved transaction monitoring code to a new file transactionmonitor.py
Diffstat:
M | server.py | | | 348 | +++++++------------------------------------------------------------------------ |
A | transactionmonitor.py | | | 326 | +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ |
2 files changed, 354 insertions(+), 320 deletions(-)
diff --git a/server.py b/server.py
@@ -3,12 +3,11 @@
#the electrum protocol uses hash(scriptpubkey) as a key for lookups
# as an alternative to address or scriptpubkey
-import socket, time, json, datetime, struct, binascii, math, pprint, ssl
+import socket, time, json, datetime, struct, binascii, ssl
from configparser import ConfigParser, NoSectionError
-from decimal import Decimal
from jsonrpc import JsonRpc, JsonRpcError
-import hashes, merkleproof, deterministicwallet
+import hashes, merkleproof, deterministicwallet, transactionmonitor
ADDRESSES_LABEL = "electrum-watchonly-addresses"
@@ -31,7 +30,6 @@ Pruning: {pruning}
## global variables are actually mutable lists
subscribed_to_headers = [False]
bestblockhash = [None]
-last_known_recent_txid = [None]
#log for checking up/seeing your wallet, debug for when something has gone wrong
def debugorlog(line, ttype):
@@ -55,14 +53,11 @@ def send_update(sock, update):
sock.sendall(json.dumps(update).encode('utf-8') + b'\n')
debug('<= ' + json.dumps(update))
-def on_heartbeat_listening(rpc, address_history, unconfirmed_txes,
- deterministic_wallets):
+def on_heartbeat_listening(txmonitor):
debug("on heartbeat listening")
- check_for_updated_txes(rpc, address_history, unconfirmed_txes,
- deterministic_wallets)
+ txmonitor.check_for_updated_txes()
-def on_heartbeat_connected(sock, rpc, address_history, unconfirmed_txes,
- deterministic_wallets):
+def on_heartbeat_connected(sock, rpc, txmonitor):
debug("on heartbeat connected")
is_tip_updated, header = check_for_new_blockchain_tip(rpc)
if is_tip_updated:
@@ -71,23 +66,18 @@ def on_heartbeat_connected(sock, rpc, address_history, unconfirmed_txes,
update = {"method": "blockchain.headers.subscribe",
"params": [header]}
send_update(sock, update)
- updated_scripthashes = check_for_updated_txes(rpc, address_history,
- unconfirmed_txes, deterministic_wallets)
+ updated_scripthashes = txmonitor.check_for_updated_txes()
for scrhash in updated_scripthashes:
- if not address_history[scrhash]["subscribed"]:
- continue
- history_hash = hashes.get_status_electrum( ((h["tx_hash"], h["height"])
- for h in address_history[scrhash]["history"]) )
+ history_hash = txmonitor.get_electrum_history_hash(scrhash)
update = {"method": "blockchain.scripthash.subscribe", "params":
[scrhash, history_hash]}
send_update(sock, update)
-def on_disconnect(address_history):
+def on_disconnect(txmonitor):
subscribed_to_headers[0] = False
- for srchash, his in address_history.items():
- his["subscribed"] = False
+ txmonitor.unsubscribe_all_addresses()
-def handle_query(sock, line, rpc, address_history, deterministic_wallets):
+def handle_query(sock, line, rpc, txmonitor):
debug("=> " + line)
try:
query = json.loads(line)
@@ -123,20 +113,16 @@ def handle_query(sock, line, rpc, address_history, deterministic_wallets):
send_response(sock, query, reply)
elif method == "blockchain.scripthash.subscribe":
scrhash = query["params"][0]
- if scrhash in address_history:
- address_history[scrhash]["subscribed"] = True
- history_hash = hashes.get_status_electrum((
- (h["tx_hash"], h["height"])
- for h in address_history[scrhash]["history"]))
+ if txmonitor.subscribe_address(scrhash):
+ history_hash = txmonitor.get_electrum_history_hash(scrhash)
else:
log("WARNING: address scripthash not known to us: " + scrhash)
history_hash = hashes.get_status_electrum([])
send_response(sock, query, history_hash)
elif method == "blockchain.scripthash.get_history":
scrhash = query["params"][0]
- if scrhash in address_history:
- history = address_history[scrhash]["history"]
- else:
+ history = txmonitor.get_electrum_history(scrhash)
+ if history == None:
history = []
log("WARNING: address scripthash history not known to us: "
+ scrhash)
@@ -180,7 +166,7 @@ def handle_query(sock, line, rpc, address_history, deterministic_wallets):
debug("tx broadcast result = " + str(result))
send_response(sock, query, result)
elif method == "mempool.get_fee_histogram":
- result = [] #not handling, sending empty
+ result = [] #TODO not handling, sending empty
send_response(sock, query, result)
elif method == "blockchain.estimatefee":
estimate = rpc.call("estimatesmartfee", [query["params"][0]])
@@ -196,8 +182,8 @@ def handle_query(sock, line, rpc, address_history, deterministic_wallets):
blockchaininfo = rpc.call("getblockchaininfo", [])
uptime = rpc.call("uptime", [])
send_response(sock, query, BANNER.format(
- detwallets=len(deterministic_wallets),
- addr=len(address_history),
+ detwallets=len(txmonitor.deterministic_wallets),
+ addr=len(txmonitor.address_history),
useragent=networkinfo["subversion"],
peers=networkinfo["connections"],
uptime=str(datetime.timedelta(seconds=uptime)),
@@ -246,8 +232,7 @@ def create_server_socket(hostport):
log("Listening on " + str(hostport))
return server_sock
-def run_electrum_server(hostport, rpc, address_history, unconfirmed_txes,
- deterministic_wallets, poll_interval_listening,
+def run_electrum_server(hostport, rpc, txmonitor, poll_interval_listening,
poll_interval_connected, certfile, keyfile):
log("Starting electrum server")
server_sock = create_server_socket(hostport)
@@ -262,8 +247,7 @@ def run_electrum_server(hostport, rpc, address_history, unconfirmed_txes,
certfile=certfile, keyfile=keyfile,
ssl_version=ssl.PROTOCOL_SSLv23)
except socket.timeout:
- on_heartbeat_listening(rpc, address_history,
- unconfirmed_txes, deterministic_wallets)
+ on_heartbeat_listening(txmonitor)
except ssl.SSLError:
sock.close()
sock = None
@@ -285,10 +269,9 @@ def run_electrum_server(hostport, rpc, address_history, unconfirmed_txes,
recv_buffer = recv_buffer[lb + 1:]
lb = recv_buffer.find(b'\n')
handle_query(sock, line.decode("utf-8"), rpc,
- address_history, deterministic_wallets)
+ txmonitor)
except socket.timeout:
- on_heartbeat_connected(sock, rpc, address_history,
- unconfirmed_txes, deterministic_wallets)
+ on_heartbeat_connected(sock, rpc, txmonitor)
except (IOError, EOFError) as e:
if isinstance(e, EOFError):
log("Electrum wallet disconnected")
@@ -299,284 +282,9 @@ def run_electrum_server(hostport, rpc, address_history, unconfirmed_txes,
except IOError:
pass
sock = None
- on_disconnect(address_history)
+ on_disconnect(txmonitor)
time.sleep(0.2)
-def get_input_and_output_scriptpubkeys(rpc, txid):
- gettx = rpc.call("gettransaction", [txid])
- txd = rpc.call("decoderawtransaction", [gettx["hex"]])
- output_scriptpubkeys = [out["scriptPubKey"]["hex"] for out in txd["vout"]]
- input_scriptpubkeys = []
- for inn in txd["vin"]:
- try:
- wallet_tx = rpc.call("gettransaction", [inn["txid"]])
- except JsonRpcError:
- #wallet doesnt know about this tx, so the input isnt ours
- continue
- input_decoded = rpc.call("decoderawtransaction", [wallet_tx["hex"]])
- script = input_decoded["vout"][inn["vout"]]["scriptPubKey"]["hex"]
- input_scriptpubkeys.append(script)
- return output_scriptpubkeys, input_scriptpubkeys, txd
-
-def generate_new_history_element(rpc, tx, txd):
- if tx["confirmations"] == 0:
- unconfirmed_input = False
- total_input_value = 0
- for inn in txd["vin"]:
- utxo = rpc.call("gettxout", [inn["txid"], inn["vout"], True])
- if utxo is None:
- utxo = rpc.call("gettxout", [inn["txid"], inn["vout"], False])
- if utxo is None:
- debug("utxo not found(!)")
- #TODO detect this and figure out how to tell
- # electrum that we dont know the fee
- total_input_value += int(Decimal(utxo["value"]) * Decimal(1e8))
- unconfirmed_input = unconfirmed_input or utxo["confirmations"] == 0
- debug("total_input_value = " + str(total_input_value))
-
- fee = total_input_value - sum([int(Decimal(out["value"])*Decimal(1e8))
- for out in txd["vout"]])
- height = -1 if unconfirmed_input else 0
- new_history_element = ({"tx_hash": tx["txid"], "height": height,
- "fee": fee})
- else:
- blockheader = rpc.call("getblockheader", [tx['blockhash']])
- new_history_element = ({"tx_hash": tx["txid"],
- "height": blockheader["height"]})
- return new_history_element
-
-def sort_address_history_list(his):
- unconfirm_txes = list(filter(lambda h:h["height"] == 0, his["history"]))
- confirm_txes = filter(lambda h:h["height"] != 0, his["history"])
- #TODO txes must be "in blockchain order"
- # the order they appear in the block
- # it might be "blockindex" in listtransactions and gettransaction
- #so must sort with key height+':'+blockindex
- #perhaps check if any heights are the same then get the pos only for those
- #a better way to do this is to have a separate dict that isnt in history
- # which maps txid => blockindex
- # and then sort by key height+":"+idx[txid]
- his["history"] = sorted(confirm_txes, key=lambda h:h["height"])
- his["history"].extend(unconfirm_txes)
- return unconfirm_txes
-
-def check_for_updated_txes(rpc, address_history, unconfirmed_txes,
- deterministic_wallets):
- updated_srchashes1 = check_for_new_txes(rpc, address_history,
- unconfirmed_txes, deterministic_wallets)
- updated_srchashes2 = check_for_confirmations(rpc, address_history,
- unconfirmed_txes)
- updated_srchashes = updated_srchashes1 | updated_srchashes2
- for ush in updated_srchashes:
- his = address_history[ush]
- sort_address_history_list(his)
- if len(updated_srchashes) > 0:
- debug("new tx address_history =\n" + pprint.pformat(address_history))
- debug("unconfirmed txes = " + pprint.pformat(unconfirmed_txes))
- debug("updated_scripthashes = " + str(updated_srchashes))
- else:
- debug("no updated txes")
- return updated_srchashes
-
-def check_for_confirmations(rpc, address_history, unconfirmed_txes):
- confirmed_txes_srchashes = []
- debug("check4con unconfirmed_txes = " + pprint.pformat(unconfirmed_txes))
- for uc_txid, srchashes in unconfirmed_txes.items():
- tx = rpc.call("gettransaction", [uc_txid])
- debug("uc_txid=" + uc_txid + " => " + str(tx))
- if tx["confirmations"] == 0:
- continue #still unconfirmed
- log("A transaction confirmed: " + uc_txid)
- confirmed_txes_srchashes.append((uc_txid, srchashes))
- block = rpc.call("getblockheader", [tx["blockhash"]])
- for srchash in srchashes:
- #delete the old unconfirmed entry in address_history
- deleted_entries = [h for h in address_history[srchash][
- "history"] if h["tx_hash"] == uc_txid]
- for d_his in deleted_entries:
- address_history[srchash]["history"].remove(d_his)
- #create the new confirmed entry in address_history
- address_history[srchash]["history"].append({"height":
- block["height"], "tx_hash": uc_txid})
- updated_srchashes = set()
- for tx, srchashes in confirmed_txes_srchashes:
- del unconfirmed_txes[tx]
- updated_srchashes.update(set(srchashes))
- return updated_srchashes
-
-def check_for_new_txes(rpc, address_history, unconfirmed_txes,
- deterministic_wallets):
- MAX_TX_REQUEST_COUNT = 256
- tx_request_count = 2
- max_attempts = int(math.log(MAX_TX_REQUEST_COUNT, 2))
- for i in range(max_attempts):
- debug("listtransactions tx_request_count=" + str(tx_request_count))
- ret = rpc.call("listtransactions", ["*", tx_request_count, 0, True])
- ret = ret[::-1]
- if last_known_recent_txid[0] == None:
- recent_tx_index = len(ret) #=0 means no new txes
- break
- else:
- txid_list = [(tx["txid"], tx["address"]) for tx in ret]
- recent_tx_index = next((i for i, (txid, addr)
- in enumerate(txid_list) if
- txid == last_known_recent_txid[0][0] and
- addr == last_known_recent_txid[0][1]), -1)
- if recent_tx_index != -1:
- break
- tx_request_count *= 2
-
- #TODO low priority: handle a user getting more than 255 new
- # transactions in 15 seconds
- debug("recent tx index = " + str(recent_tx_index) + " ret = " + str(ret))
- # str([(t["txid"], t["address"]) for t in ret]))
- if len(ret) > 0:
- last_known_recent_txid[0] = (ret[0]["txid"], ret[0]["address"])
- debug("last_known_recent_txid = " + str(last_known_recent_txid[0]))
- assert(recent_tx_index != -1)
- if recent_tx_index == 0:
- return set()
- new_txes = ret[:recent_tx_index][::-1]
- debug("new txes = " + str(new_txes))
- #tests: finding one unconfirmed tx, finding one confirmed tx
- #sending a tx that has nothing to do with our wallets
- #getting a new tx on a completely empty wallet
- #finding a confirmed and unconfirmed tx, in that order, then both confirm
- #finding an unconfirmed and confirmed tx, in that order, then both confirm
- #send a tx to an address which hasnt been used before
- #import two addresses, transaction from one to the other, unc then confirm
- obtained_txids = set()
- updated_scripthashes = []
- for tx in new_txes:
- if "txid" not in tx or "category" not in tx:
- continue
- if tx["category"] not in ("receive", "send"):
- continue
- if tx["txid"] in obtained_txids:
- continue
- obtained_txids.add(tx["txid"])
- output_scriptpubkeys, input_scriptpubkeys, txd = \
- get_input_and_output_scriptpubkeys(rpc, tx["txid"])
- matching_scripthashes = []
- for spk in (output_scriptpubkeys + input_scriptpubkeys):
- scripthash = hashes.script_to_scripthash(spk)
- if scripthash in address_history:
- matching_scripthashes.append(scripthash)
- if len(matching_scripthashes) == 0:
- continue
-
- for wal in deterministic_wallets:
- overrun_depths = wal.have_scriptpubkeys_overrun_gaplimit(
- output_scriptpubkeys)
- if overrun_depths != None:
- for change, import_count in overrun_depths.items():
- spks = wal.get_new_scriptpubkeys(change, import_count)
- new_addrs = [hashes.script_to_address(s, rpc) for s in spks]
- debug("Importing " + str(len(spks)) + " into change="
- + str(change))
- import_addresses(rpc, new_addrs)
-
- updated_scripthashes.extend(matching_scripthashes)
- new_history_element = generate_new_history_element(rpc, tx, txd)
- log("Found new tx: " + str(new_history_element))
- for srchash in matching_scripthashes:
- address_history[srchash]["history"].append(new_history_element)
- if new_history_element["height"] == 0:
- if tx["txid"] in unconfirmed_txes:
- unconfirmed_txes[tx["txid"]].append(srchash)
- else:
- unconfirmed_txes[tx["txid"]] = [srchash]
- #check whether the gap limits have been overrun and import more addrs
- return set(updated_scripthashes)
-
-def build_address_history(rpc, monitored_scriptpubkeys, deterministic_wallets):
- log("Building history with " + str(len(monitored_scriptpubkeys)) +
- " addresses")
- st = time.time()
- address_history = {}
- for spk in monitored_scriptpubkeys:
- address_history[hashes.script_to_scripthash(spk)] = {'history': [],
- 'subscribed': False}
- wallet_addr_scripthashes = set(address_history.keys())
- #populate history
- #which is a blockheight-ordered list of ("txhash", height)
- #unconfirmed transactions go at the end as ("txhash", 0, fee)
- # 0=unconfirmed -1=unconfirmed with unconfirmed parents
-
- BATCH_SIZE = 1000
- ret = list(range(BATCH_SIZE))
- t = 0
- count = 0
- obtained_txids = set()
- while len(ret) == BATCH_SIZE:
- ret = rpc.call("listtransactions", ["*", BATCH_SIZE, t, True])
- debug("listtransactions skip=" + str(t) + " len(ret)=" + str(len(ret)))
- t += len(ret)
- for tx in ret:
- if "txid" not in tx or "category" not in tx:
- continue
- if tx["category"] not in ("receive", "send"):
- continue
- if tx["txid"] in obtained_txids:
- continue
- debug("adding obtained tx=" + str(tx["txid"]))
- obtained_txids.add(tx["txid"])
-
- #obtain all the addresses this transaction is involved with
- output_scriptpubkeys, input_scriptpubkeys, txd = \
- get_input_and_output_scriptpubkeys(rpc, tx["txid"])
- output_scripthashes = [hashes.script_to_scripthash(sc)
- for sc in output_scriptpubkeys]
- sh_to_add = wallet_addr_scripthashes.intersection(set(
- output_scripthashes))
- input_scripthashes = [hashes.script_to_scripthash(sc)
- for sc in input_scriptpubkeys]
- sh_to_add |= wallet_addr_scripthashes.intersection(set(
- input_scripthashes))
- if len(sh_to_add) == 0:
- continue
-
- for wal in deterministic_wallets:
- overrun_depths = wal.have_scriptpubkeys_overrun_gaplimit(
- output_scriptpubkeys)
- if overrun_depths != None:
- log("ERROR: Not enough addresses imported. Exiting.")
- log("Delete wallet.dat and increase the value of " +
- "`initial_import_count` in the file `config.cfg` " +
- "then reimport and rescan")
- #TODO make it so users dont have to delete wallet.dat
- # check whether all initial_import_count addresses are
- # imported rather than just the first one
- return None, None
- new_history_element = generate_new_history_element(rpc, tx, txd)
- for scripthash in sh_to_add:
- address_history[scripthash][
- "history"].append(new_history_element)
- count += 1
-
- unconfirmed_txes = {}
- for srchash, his in address_history.items():
- uctx = sort_address_history_list(his)
- for u in uctx:
- if u["tx_hash"] in unconfirmed_txes:
- unconfirmed_txes[u["tx_hash"]].append(srchash)
- else:
- unconfirmed_txes[u["tx_hash"]] = [srchash]
- debug("unconfirmed_txes = " + str(unconfirmed_txes))
- if len(ret) > 0:
- #txid doesnt uniquely identify transactions from listtransactions
- #but the tuple (txid, address) does
- last_known_recent_txid[0] = (ret[-1]["txid"], ret[-1]["address"])
- else:
- last_known_recent_txid[0] = None
- debug("last_known_recent_txid = " + str(last_known_recent_txid[0]))
-
- et = time.time()
- log("Found " + str(count) + " txes. History built in " + str(et - st)
- + "sec")
- debug("address_history =\n" + pprint.pformat(address_history))
- return address_history, unconfirmed_txes
-
def get_scriptpubkeys_to_monitor(rpc, config):
imported_addresses = set(rpc.call("getaddressesbyaccount",
[ADDRESSES_LABEL]))
@@ -693,9 +401,9 @@ def main():
"that the wallets are new\nand empty then there's no need to " +
"rescan, just restart this script")
else:
- address_history, unconfirmed_txes = build_address_history(
- rpc, relevant_spks_addrs, deterministic_wallets)
- if address_history == None:
+ txmonitor = transactionmonitor.TransactionMonitor(rpc,
+ deterministic_wallets)
+ if not txmonitor.build_address_history(relevant_spks_addrs):
return
hostport = (config.get("electrum-server", "host"),
int(config.get("electrum-server", "port")))
@@ -705,8 +413,8 @@ def main():
"poll_interval_connected"))
certfile = config.get("electrum-server", "certfile")
keyfile = config.get("electrum-server", "keyfile")
- run_electrum_server(hostport, rpc, address_history, unconfirmed_txes,
- deterministic_wallets, poll_interval_listening,
+ run_electrum_server(hostport, rpc, txmonitor, poll_interval_listening,
poll_interval_connected, certfile, keyfile)
-main()
+if __name__ == "__main__":
+ main()
diff --git a/transactionmonitor.py b/transactionmonitor.py
@@ -0,0 +1,326 @@
+
+import time, pprint, math
+from decimal import Decimal
+
+from jsonrpc import JsonRpcError
+from server import debug, log, import_addresses
+import hashes
+
+class TransactionMonitor(object):
+ def __init__(self, rpc, deterministic_wallets):
+ self.rpc = rpc
+ self.deterministic_wallets = deterministic_wallets
+ self.last_known_recent_txid = None
+ self.address_history = None
+ self.unconfirmed_txes = None
+
+ def get_electrum_history_hash(self, scrhash):
+ return hashes.get_status_electrum( ((h["tx_hash"], h["height"])
+ for h in self.address_history[scrhash]["history"]) )
+
+ def get_electrum_history(self, scrhash):
+ if scrhash in self.address_history:
+ return self.address_history[scrhash]["history"]
+ else:
+ return None
+
+ def subscribe_address(self, scrhash):
+ if scrhash in self.address_history:
+ self.address_history[scrhash]["subscribed"] = True
+ return True
+ else:
+ return False
+
+ def unsubscribe_all_addresses(self):
+ for srchash, his in self.address_history.items():
+ his["subscribed"] = False
+
+ def build_address_history(self, monitored_scriptpubkeys):
+ log("Building history with " + str(len(monitored_scriptpubkeys)) +
+ " addresses")
+ st = time.time()
+ address_history = {}
+ for spk in monitored_scriptpubkeys:
+ address_history[hashes.script_to_scripthash(spk)] = {'history': [],
+ 'subscribed': False}
+ wallet_addr_scripthashes = set(address_history.keys())
+ #populate history
+ #which is a blockheight-ordered list of ("txhash", height)
+ #unconfirmed transactions go at the end as ("txhash", 0, fee)
+ # 0=unconfirmed -1=unconfirmed with unconfirmed parents
+
+ BATCH_SIZE = 1000
+ ret = list(range(BATCH_SIZE))
+ t = 0
+ count = 0
+ obtained_txids = set()
+ while len(ret) == BATCH_SIZE:
+ ret = self.rpc.call("listtransactions", ["*", BATCH_SIZE, t, True])
+ debug("listtransactions skip=" + str(t) + " len(ret)="
+ + str(len(ret)))
+ t += len(ret)
+ for tx in ret:
+ if "txid" not in tx or "category" not in tx:
+ continue
+ if tx["category"] not in ("receive", "send"):
+ continue
+ if tx["txid"] in obtained_txids:
+ continue
+ debug("adding obtained tx=" + str(tx["txid"]))
+ obtained_txids.add(tx["txid"])
+
+ #obtain all the addresses this transaction is involved with
+ output_scriptpubkeys, input_scriptpubkeys, txd = \
+ self.get_input_and_output_scriptpubkeys(tx["txid"])
+ output_scripthashes = [hashes.script_to_scripthash(sc)
+ for sc in output_scriptpubkeys]
+ sh_to_add = wallet_addr_scripthashes.intersection(set(
+ output_scripthashes))
+ input_scripthashes = [hashes.script_to_scripthash(sc)
+ for sc in input_scriptpubkeys]
+ sh_to_add |= wallet_addr_scripthashes.intersection(set(
+ input_scripthashes))
+ if len(sh_to_add) == 0:
+ continue
+
+ for wal in self.deterministic_wallets:
+ overrun_depths = wal.have_scriptpubkeys_overrun_gaplimit(
+ output_scriptpubkeys)
+ if overrun_depths != None:
+ log("ERROR: Not enough addresses imported.")
+ log("Delete wallet.dat and increase the value " +
+ "of `initial_import_count` in the file " +
+ "`config.cfg` then reimport and rescan")
+ #TODO make it so users dont have to delete wallet.dat
+ # check whether all initial_import_count addresses are
+ # imported rather than just the first one
+ return False
+ new_history_element = self.generate_new_history_element(tx, txd)
+ for scripthash in sh_to_add:
+ address_history[scripthash][
+ "history"].append(new_history_element)
+ count += 1
+
+ unconfirmed_txes = {}
+ for srchash, his in address_history.items():
+ uctx = self.sort_address_history_list(his)
+ for u in uctx:
+ if u["tx_hash"] in unconfirmed_txes:
+ unconfirmed_txes[u["tx_hash"]].append(srchash)
+ else:
+ unconfirmed_txes[u["tx_hash"]] = [srchash]
+ debug("unconfirmed_txes = " + str(unconfirmed_txes))
+ if len(ret) > 0:
+ #txid doesnt uniquely identify transactions from listtransactions
+ #but the tuple (txid, address) does
+ self.last_known_recent_txid = (ret[-1]["txid"], ret[-1]["address"])
+ else:
+ self.last_known_recent_txid = None
+ debug("last_known_recent_txid = " + str(self.last_known_recent_txid))
+
+ et = time.time()
+ log("Found " + str(count) + " txes. History built in " +
+ str(et - st) + "sec")
+ debug("address_history =\n" + pprint.pformat(address_history))
+ self.address_history = address_history
+ self.unconfirmed_txes = unconfirmed_txes
+ return True
+
+ def get_input_and_output_scriptpubkeys(self, txid):
+ gettx = self.rpc.call("gettransaction", [txid])
+ txd = self.rpc.call("decoderawtransaction", [gettx["hex"]])
+ output_scriptpubkeys = [out["scriptPubKey"]["hex"]
+ for out in txd["vout"]]
+ input_scriptpubkeys = []
+ for inn in txd["vin"]:
+ try:
+ wallet_tx = self.rpc.call("gettransaction", [inn["txid"]])
+ except JsonRpcError:
+ #wallet doesnt know about this tx, so the input isnt ours
+ continue
+ input_decoded = self.rpc.call("decoderawtransaction", [wallet_tx[
+ "hex"]])
+ script = input_decoded["vout"][inn["vout"]]["scriptPubKey"]["hex"]
+ input_scriptpubkeys.append(script)
+ return output_scriptpubkeys, input_scriptpubkeys, txd
+
+ def generate_new_history_element(self, tx, txd):
+ if tx["confirmations"] == 0:
+ unconfirmed_input = False
+ total_input_value = 0
+ for inn in txd["vin"]:
+ utxo = self.rpc.call("gettxout", [inn["txid"], inn["vout"],
+ True])
+ if utxo is None:
+ utxo = self.rpc.call("gettxout", [inn["txid"], inn["vout"],
+ False])
+ if utxo is None:
+ debug("utxo not found(!)")
+ #TODO detect this and figure out how to tell
+ # electrum that we dont know the fee
+ total_input_value += int(Decimal(utxo["value"]) * Decimal(1e8))
+ unconfirmed_input = (unconfirmed_input or
+ utxo["confirmations"] == 0)
+ debug("total_input_value = " + str(total_input_value))
+
+ fee = total_input_value - sum([int(Decimal(out["value"])
+ * Decimal(1e8)) for out in txd["vout"]])
+ height = -1 if unconfirmed_input else 0
+ new_history_element = ({"tx_hash": tx["txid"], "height": height,
+ "fee": fee})
+ else:
+ blockheader = self.rpc.call("getblockheader", [tx['blockhash']])
+ new_history_element = ({"tx_hash": tx["txid"],
+ "height": blockheader["height"]})
+ return new_history_element
+
+ def sort_address_history_list(self, his):
+ unconfirm_txes = list(filter(lambda h:h["height"] == 0, his["history"]))
+ confirm_txes = filter(lambda h:h["height"] != 0, his["history"])
+ #TODO txes must be "in blockchain order"
+ # the order they appear in the block
+ # it might be "blockindex" in listtransactions and gettransaction
+ #so must sort with key height+':'+blockindex
+ #maybe check if any heights are the same then get the pos only for those
+ #better way to do this is to have a separate dict that isnt in history
+ # which maps txid => blockindex
+ # and then sort by key height+":"+idx[txid]
+ his["history"] = sorted(confirm_txes, key=lambda h:h["height"])
+ his["history"].extend(unconfirm_txes)
+ return unconfirm_txes
+
+ def check_for_updated_txes(self):
+ updated_srchashes1 = self.check_for_new_txes()
+ updated_srchashes2 = self.check_for_confirmations()
+ updated_srchashes = updated_srchashes1 | updated_srchashes2
+ for ush in updated_srchashes:
+ his = self.address_history[ush]
+ self.sort_address_history_list(his)
+ if len(updated_srchashes) > 0:
+ debug("new tx address_history =\n"
+ + pprint.pformat(self.address_history))
+ debug("unconfirmed txes = " + pprint.pformat(self.unconfirmed_txes))
+ debug("updated_scripthashes = " + str(updated_srchashes))
+ else:
+ debug("no updated txes")
+ updated_srchashes = filter(lambda sh:self.address_history[sh][
+ "subscribed"], updated_srchashes)
+ #TODO srchashes is misspelled, should be scrhashes
+ return updated_srchashes
+
+ def check_for_confirmations(self):
+ confirmed_txes_srchashes = []
+ debug("check4con unconfirmed_txes = "
+ + pprint.pformat(self.unconfirmed_txes))
+ for uc_txid, srchashes in self.unconfirmed_txes.items():
+ tx = self.rpc.call("gettransaction", [uc_txid])
+ debug("uc_txid=" + uc_txid + " => " + str(tx))
+ if tx["confirmations"] == 0:
+ continue #still unconfirmed
+ log("A transaction confirmed: " + uc_txid)
+ confirmed_txes_srchashes.append((uc_txid, srchashes))
+ block = self.rpc.call("getblockheader", [tx["blockhash"]])
+ for srchash in srchashes:
+ #delete the old unconfirmed entry in address_history
+ deleted_entries = [h for h in self.address_history[srchash][
+ "history"] if h["tx_hash"] == uc_txid]
+ for d_his in deleted_entries:
+ self.address_history[srchash]["history"].remove(d_his)
+ #create the new confirmed entry in address_history
+ self.address_history[srchash]["history"].append({"height":
+ block["height"], "tx_hash": uc_txid})
+ updated_srchashes = set()
+ for tx, srchashes in confirmed_txes_srchashes:
+ del self.unconfirmed_txes[tx]
+ updated_srchashes.update(set(srchashes))
+ return updated_srchashes
+
+ def check_for_new_txes(self):
+ MAX_TX_REQUEST_COUNT = 256
+ tx_request_count = 2
+ max_attempts = int(math.log(MAX_TX_REQUEST_COUNT, 2))
+ for i in range(max_attempts):
+ debug("listtransactions tx_request_count=" + str(tx_request_count))
+ ret = self.rpc.call("listtransactions", ["*", tx_request_count, 0,
+ True])
+ ret = ret[::-1]
+ if self.last_known_recent_txid == None:
+ recent_tx_index = len(ret) #=0 means no new txes
+ break
+ else:
+ txid_list = [(tx["txid"], tx["address"]) for tx in ret]
+ recent_tx_index = next((i for i, (txid, addr)
+ in enumerate(txid_list) if
+ txid == self.last_known_recent_txid[0] and
+ addr == self.last_known_recent_txid[1]), -1)
+ if recent_tx_index != -1:
+ break
+ tx_request_count *= 2
+
+ #TODO low priority: handle a user getting more than 255 new
+ # transactions in 15 seconds
+ debug("recent tx index = " + str(recent_tx_index) + " ret = " +
+ str(ret))
+ # str([(t["txid"], t["address"]) for t in ret]))
+ if len(ret) > 0:
+ self.last_known_recent_txid = (ret[0]["txid"], ret[0]["address"])
+ debug("last_known_recent_txid = " + str(
+ self.last_known_recent_txid))
+ assert(recent_tx_index != -1)
+ if recent_tx_index == 0:
+ return set()
+ new_txes = ret[:recent_tx_index][::-1]
+ debug("new txes = " + str(new_txes))
+ #tests: finding one unconfirmed tx, finding one confirmed tx
+ #sending a tx that has nothing to do with our wallets
+ #getting a new tx on a completely empty wallet
+ #finding confirmed and unconfirmed tx, in that order, then both confirm
+ #finding unconfirmed and confirmed tx, in that order, then both confirm
+ #send a tx to an address which hasnt been used before
+ #import two addresses, transaction from one to the other
+ obtained_txids = set()
+ updated_scripthashes = []
+ for tx in new_txes:
+ if "txid" not in tx or "category" not in tx:
+ continue
+ if tx["category"] not in ("receive", "send"):
+ continue
+ if tx["txid"] in obtained_txids:
+ continue
+ obtained_txids.add(tx["txid"])
+ output_scriptpubkeys, input_scriptpubkeys, txd = \
+ self.get_input_and_output_scriptpubkeys(tx["txid"])
+ matching_scripthashes = []
+ for spk in (output_scriptpubkeys + input_scriptpubkeys):
+ scripthash = hashes.script_to_scripthash(spk)
+ if scripthash in self.address_history:
+ matching_scripthashes.append(scripthash)
+ if len(matching_scripthashes) == 0:
+ continue
+
+ for wal in self.deterministic_wallets:
+ overrun_depths = wal.have_scriptpubkeys_overrun_gaplimit(
+ output_scriptpubkeys)
+ if overrun_depths != None:
+ for change, import_count in overrun_depths.items():
+ spks = wal.get_new_scriptpubkeys(change, import_count)
+ new_addrs = [hashes.script_to_address(s, rpc)
+ for s in spks]
+ debug("Importing " + str(len(spks)) + " into change="
+ + str(change))
+ import_addresses(rpc, new_addrs)
+
+ updated_scripthashes.extend(matching_scripthashes)
+ new_history_element = self.generate_new_history_element(tx, txd)
+ log("Found new tx: " + str(new_history_element))
+ for srchash in matching_scripthashes:
+ self.address_history[srchash]["history"].append(
+ new_history_element)
+ if new_history_element["height"] == 0:
+ if tx["txid"] in self.unconfirmed_txes:
+ self.unconfirmed_txes[tx["txid"]].append(srchash)
+ else:
+ self.unconfirmed_txes[tx["txid"]] = [srchash]
+ #check whether gap limits have been overrun and import more addrs
+ return set(updated_scripthashes)
+