transactionmonitor.py (21724B)
1 2 import time 3 import pprint 4 import math 5 import sys 6 import logging 7 from decimal import Decimal 8 from collections import defaultdict 9 10 from electrumpersonalserver.server.jsonrpc import JsonRpcError 11 from electrumpersonalserver.server.hashes import ( 12 get_status_electrum, 13 script_to_scripthash, 14 script_to_address 15 ) 16 from electrumpersonalserver.server.deterministicwallet import import_addresses 17 18 #internally this code uses scriptPubKeys, it only converts to bitcoin addresses 19 # when importing to bitcoind or checking whether enough addresses have been 20 # imported 21 #the electrum protocol uses sha256(scriptpubkey) as a key for lookups 22 # this code calls them scripthashes 23 24 #when a transaction happens paying to an address from a deterministic wallet 25 # lookup the position of that address, if its less than gap_limit then 26 # import more addresses 27 28 CONFIRMATIONS_SAFE_FROM_REORG = 100 29 30 class TransactionMonitor(object): 31 """ 32 Class which monitors the bitcoind wallet for new transactions 33 and builds a history datastructure for sending to electrum 34 """ 35 def __init__(self, rpc, deterministic_wallets, logger=None): 36 self.rpc = rpc 37 self.deterministic_wallets = deterministic_wallets 38 self.last_known_wallet_txid = None 39 self.address_history = None 40 self.unconfirmed_txes = None 41 self.reorganizable_txes = None 42 self.logger = (logger if logger else 43 logging.getLogger('ELECTRUMPERSONALSERVER')) 44 45 def get_electrum_history_hash(self, scrhash): 46 return get_status_electrum( [(h["tx_hash"], h["height"]) 47 for h in self.address_history[scrhash]["history"]] ) 48 49 def get_electrum_history(self, scrhash): 50 if scrhash in self.address_history: 51 return self.address_history[scrhash]["history"] 52 else: 53 return None 54 55 def get_address_balance(self, scrhash): 56 history = self.get_electrum_history(scrhash) 57 if history == None: 58 return None 59 utxos = {} 60 for tx_info in history: 61 tx = self.rpc.call("gettransaction", [tx_info["tx_hash"]]) 62 txd = self.rpc.call("decoderawtransaction", [tx["hex"]]) 63 for index, output in enumerate(txd["vout"]): 64 if script_to_scripthash(output["scriptPubKey"]["hex"] 65 ) != scrhash: 66 continue 67 utxos[txd["txid"] + ":" + str(index)] = (output["value"], 68 tx["confirmations"]) 69 for inputt in txd["vin"]: 70 outpoint = inputt["txid"] + ":" + str(inputt["vout"]) 71 if outpoint in utxos: 72 del utxos[outpoint] 73 confirmed_balance = 0 74 unconfirmed_balance = 0 75 for utxo in utxos.values(): 76 value = int(Decimal(str(utxo[0])) * Decimal(1e8)) 77 if utxo[1] > 0: 78 confirmed_balance += value 79 else: 80 unconfirmed_balance += value 81 return {"confirmed": confirmed_balance, "unconfirmed": 82 unconfirmed_balance} 83 84 def subscribe_address(self, scrhash): 85 if scrhash in self.address_history: 86 self.address_history[scrhash]["subscribed"] = True 87 return True 88 else: 89 return False 90 91 def unsubscribe_all_addresses(self): 92 for scrhash, his in self.address_history.items(): 93 his["subscribed"] = False 94 95 def build_address_history(self, monitored_scriptpubkeys): 96 logger = self.logger 97 logger.info("Building history with " + 98 str(len(monitored_scriptpubkeys)) + " addresses . . .") 99 st = time.time() 100 address_history = {} 101 for spk in monitored_scriptpubkeys: 102 address_history[script_to_scripthash(spk)] = {'history': [], 103 'subscribed': False} 104 wallet_addr_scripthashes = set(address_history.keys()) 105 self.reorganizable_txes = [] 106 #populate history 107 #which is a blockheight-ordered list of ("txhash", height) 108 #unconfirmed transactions go at the end as ("txhash", 0, fee) 109 # 0=unconfirmed -1=unconfirmed with unconfirmed parents 110 111 BATCH_SIZE = 1000 112 ret = list(range(BATCH_SIZE)) 113 t = 0 114 count = 0 115 obtained_txids = set() 116 last_tx = None 117 while len(ret) == BATCH_SIZE: 118 ret = self.rpc.call("listtransactions", ["*", BATCH_SIZE, t, True]) 119 logger.debug("listtransactions skip=" + str(t) + " len(ret)=" 120 + str(len(ret))) 121 if t == 0 and len(ret) > 0: 122 last_tx = ret[-1] 123 t += len(ret) 124 for tx in ret: 125 if "txid" not in tx or "category" not in tx: 126 continue 127 if tx["category"] not in ("receive", "send", "generate", 128 "immature"): 129 continue 130 if tx["confirmations"] < 0: 131 continue #conflicted 132 if tx["txid"] in obtained_txids: 133 continue 134 logger.debug("adding obtained tx=" + str(tx["txid"])) 135 obtained_txids.add(tx["txid"]) 136 137 #obtain all the addresses this transaction is involved with 138 output_scriptpubkeys, input_scriptpubkeys, txd = \ 139 self.get_input_and_output_scriptpubkeys(tx["txid"]) 140 output_scripthashes = [script_to_scripthash(sc) 141 for sc in output_scriptpubkeys] 142 sh_to_add = wallet_addr_scripthashes.intersection(set( 143 output_scripthashes)) 144 input_scripthashes = [script_to_scripthash(sc) 145 for sc in input_scriptpubkeys] 146 sh_to_add |= wallet_addr_scripthashes.intersection(set( 147 input_scripthashes)) 148 if len(sh_to_add) == 0: 149 continue 150 new_history_element = self.generate_new_history_element(tx, txd) 151 if new_history_element == None: 152 continue 153 154 for wal in self.deterministic_wallets: 155 overrun_depths = wal.have_scriptpubkeys_overrun_gaplimit( 156 output_scriptpubkeys) 157 if overrun_depths != None: 158 logger.error("Not enough addresses imported.") 159 logger.error("Delete wallet.dat and increase the value" 160 + " of `initial_import_count` in the file" 161 + " `config.ini` then reimport and rescan") 162 #TODO make it so users dont have to delete wallet.dat 163 # check whether all initial_import_count addresses are 164 # imported rather than just the first one 165 return False 166 for scripthash in sh_to_add: 167 address_history[scripthash][ 168 "history"].append(new_history_element) 169 if tx["confirmations"] > 0 and (tx["confirmations"] < 170 CONFIRMATIONS_SAFE_FROM_REORG): 171 self.reorganizable_txes.append((tx["txid"], tx["blockhash"], 172 new_history_element["height"], sh_to_add)) 173 count += 1 174 175 unconfirmed_txes = defaultdict(list) 176 for scrhash, his in address_history.items(): 177 uctx = self.sort_address_history_list(his) 178 for u in uctx: 179 unconfirmed_txes[u["tx_hash"]].append(scrhash) 180 logger.debug("unconfirmed_txes = " + str(unconfirmed_txes)) 181 logger.debug("reorganizable_txes = " + str(self.reorganizable_txes)) 182 if len(ret) > 0: 183 #txid doesnt uniquely identify transactions from listtransactions 184 #but the tuple (txid, address) does 185 self.last_known_wallet_txid = (last_tx["txid"], 186 last_tx.get("address", None)) 187 else: 188 self.last_known_wallet_txid = None 189 logger.debug("last_known_wallet_txid = " + str( 190 self.last_known_wallet_txid)) 191 192 et = time.time() 193 logger.info("Found " + str(count) + " txes. History built in " 194 + str(et - st) + "sec") 195 self.address_history = address_history 196 self.unconfirmed_txes = unconfirmed_txes 197 return True 198 199 def get_input_and_output_scriptpubkeys(self, txid): 200 gettx = self.rpc.call("gettransaction", [txid]) 201 txd = self.rpc.call("decoderawtransaction", [gettx["hex"]]) 202 output_scriptpubkeys = [out["scriptPubKey"]["hex"] 203 for out in txd["vout"]] 204 input_scriptpubkeys = [] 205 for inn in txd["vin"]: 206 if "coinbase" in inn: 207 break 208 try: 209 wallet_tx = self.rpc.call("gettransaction", [inn["txid"]]) 210 except JsonRpcError: 211 #wallet doesnt know about this tx, so the input isnt ours 212 continue 213 input_decoded = self.rpc.call("decoderawtransaction", [wallet_tx[ 214 "hex"]]) 215 script = input_decoded["vout"][inn["vout"]]["scriptPubKey"]["hex"] 216 input_scriptpubkeys.append(script) 217 return output_scriptpubkeys, input_scriptpubkeys, txd 218 219 def generate_new_history_element(self, tx, txd): 220 logger = self.logger 221 if tx["confirmations"] == 0: 222 try: 223 mempool_tx = self.rpc.call("getmempoolentry", [tx["txid"]]) 224 fee = int(Decimal(str(mempool_tx["fees"]["base"])) 225 * Decimal(1e8)) 226 unconfirmed_input = mempool_tx["ancestorcount"] > 1 227 except JsonRpcError as e: 228 #not in mempool, return None 229 logger.debug("txid in wallet but not in mempool = " 230 + tx["txid"]) 231 return None 232 height = -1 if unconfirmed_input else 0 233 new_history_element = ({"tx_hash": tx["txid"], "height": height, 234 "fee": fee}) 235 else: 236 blockheader = self.rpc.call("getblockheader", [tx['blockhash']]) 237 new_history_element = ({"tx_hash": tx["txid"], 238 "height": blockheader["height"]}) 239 return new_history_element 240 241 def sort_address_history_list(self, his): 242 unconfirm_txes = list(filter(lambda h:h["height"] <= 0, his["history"])) 243 confirm_txes = filter(lambda h:h["height"] > 0, his["history"]) 244 #TODO txes must be "in blockchain order" 245 # the order they appear in the block 246 # it might be "blockindex" in listtransactions and gettransaction 247 #so must sort with key height+':'+blockindex 248 #maybe check if any heights are the same then get the pos only for those 249 #better way to do this is to have a separate dict that isnt in history 250 # which maps txid => blockindex 251 # and then sort by key height+":"+idx[txid] 252 his["history"] = sorted(confirm_txes, key=lambda h:h["height"]) 253 his["history"].extend(unconfirm_txes) 254 return unconfirm_txes 255 256 def check_for_updated_txes(self): 257 logger = self.logger 258 updated_scrhashes1 = self.check_for_new_txes() 259 updated_scrhashes2 = self.check_for_confirmations() 260 updated_scrhashes3 = self.check_for_reorganizations() 261 updated_scrhashes = (updated_scrhashes1 | updated_scrhashes2 262 | updated_scrhashes3) 263 for ush in updated_scrhashes: 264 his = self.address_history[ush] 265 self.sort_address_history_list(his) 266 if len(updated_scrhashes) > 0: 267 logger.debug("unconfirmed txes = " 268 + pprint.pformat(self.unconfirmed_txes)) 269 logger.debug("reorganizable_txes = " 270 + pprint.pformat(self.reorganizable_txes)) 271 logger.debug("updated_scripthashes = " + str(updated_scrhashes)) 272 updated_scrhashes = filter(lambda sh:self.address_history[sh][ 273 "subscribed"], updated_scrhashes) 274 return updated_scrhashes 275 276 def check_for_reorganizations(self): 277 logger = self.logger 278 elements_removed = [] 279 elements_added = [] 280 updated_scrhashes = set() 281 for reorgable_tx in self.reorganizable_txes: 282 txid, blockhash, height, scrhashes = reorgable_tx 283 tx = self.rpc.call("gettransaction", [txid]) 284 if tx["confirmations"] >= CONFIRMATIONS_SAFE_FROM_REORG: 285 elements_removed.append(reorgable_tx) 286 logger.debug("Transaction considered safe from reorg: " + txid) 287 continue 288 if tx["confirmations"] < 1: 289 updated_scrhashes.update(scrhashes) 290 if tx["confirmations"] == 0: 291 #transaction became unconfirmed in a reorg 292 logger.info("A transaction was reorg'd out: " + txid) 293 elements_removed.append(reorgable_tx) 294 self.unconfirmed_txes[txid].extend(scrhashes) 295 296 #don't add orphans back into history 297 if tx["category"] != "orphan": 298 #add to history as unconfirmed 299 txd = self.rpc.call("decoderawtransaction", [tx["hex"]]) 300 new_history_element = self.generate_new_history_element( 301 tx, txd) 302 if new_history_element == None: 303 continue 304 for scrhash in scrhashes: 305 self.address_history[scrhash]["history"].append( 306 new_history_element) 307 308 elif tx["confirmations"] < 0: 309 #tx became conflicted in reorg i.e. a double spend 310 logger.info("A transaction was double spent! " + txid) 311 elements_removed.append(reorgable_tx) 312 elif tx["blockhash"] != blockhash: 313 block = self.rpc.call("getblockheader", [tx["blockhash"]]) 314 if block["height"] == height: #reorg but height is the same 315 logger.debug("A transaction was reorg'd but still " + 316 "confirmed at same height: " + txid) 317 continue 318 #reorged but still confirmed at a different height 319 updated_scrhashes.update(scrhashes) 320 logger.debug("A transaction was reorg'd but still confirmed" 321 + " to a new block and different height: " + txid) 322 #update history with the new height 323 for scrhash in scrhashes: 324 for h in self.address_history[scrhash]["history"]: 325 if h["tx_hash"] == txid: 326 h["height"] = block["height"] 327 #modify the reorgable tx with new hash and height 328 elements_removed.append(reorgable_tx) 329 elements_added.append((txid, tx["blockhash"], block["height"], 330 scrhashes)) 331 continue 332 else: 333 continue #no change to reorgable tx 334 #remove tx from history 335 for scrhash in scrhashes: 336 deleted_entries = [h for h in self.address_history[scrhash][ 337 "history"] if h["tx_hash"] == txid and 338 h["height"] == height] 339 for d_his in deleted_entries: 340 self.address_history[scrhash]["history"].remove(d_his) 341 342 for reorged_tx in elements_removed: 343 self.reorganizable_txes.remove(reorged_tx) 344 self.reorganizable_txes.extend(elements_added) 345 return updated_scrhashes 346 347 def check_for_confirmations(self): 348 logger = self.logger 349 tx_scrhashes_removed_from_mempool = [] 350 for uc_txid, scrhashes in self.unconfirmed_txes.items(): 351 tx = self.rpc.call("gettransaction", [uc_txid]) 352 if tx["confirmations"] == 0: 353 continue #still unconfirmed 354 tx_scrhashes_removed_from_mempool.append((uc_txid, scrhashes)) 355 if tx["confirmations"] > 0: 356 logger.info("A transaction confirmed: " + uc_txid) 357 block = self.rpc.call("getblockheader", [tx["blockhash"]]) 358 elif tx["confirmations"] < 0: 359 logger.warning("A transaction became conflicted: " + uc_txid) 360 for scrhash in scrhashes: 361 #delete the old unconfirmed entry in address_history 362 deleted_entries = [h for h in self.address_history[scrhash][ 363 "history"] if h["tx_hash"] == uc_txid] 364 for d_his in deleted_entries: 365 self.address_history[scrhash]["history"].remove(d_his) 366 if tx["confirmations"] > 0: 367 #create the new confirmed entry in address_history 368 self.address_history[scrhash]["history"].append({"height": 369 block["height"], "tx_hash": uc_txid}) 370 if tx["confirmations"] > 0: 371 self.reorganizable_txes.append((tx["txid"], tx["blockhash"], 372 block["height"], scrhashes)) 373 updated_scrhashes = set() 374 for tx, scrhashes in tx_scrhashes_removed_from_mempool: 375 del self.unconfirmed_txes[tx] 376 updated_scrhashes.update(set(scrhashes)) 377 return updated_scrhashes 378 379 def check_for_new_txes(self): 380 logger = self.logger 381 MAX_TX_REQUEST_COUNT = 256 382 tx_request_count = 2 383 max_attempts = int(math.log(MAX_TX_REQUEST_COUNT, 2)) 384 for i in range(max_attempts): 385 ##how listtransactions works 386 ##skip and count parameters take most-recent txes first 387 ## so skip=0 count=1 will return the most recent tx 388 ##and skip=0 count=3 will return the 3 most recent txes 389 ##but the actual list returned has the REVERSED order 390 ##skip=0 count=3 will return a list with the most recent tx LAST 391 ret = self.rpc.call("listtransactions", ["*", tx_request_count, 0, 392 True]) 393 ret = ret[::-1] 394 if self.last_known_wallet_txid == None: 395 recent_tx_index = len(ret) #=0 means no new txes 396 break 397 else: 398 txid_list = [(tx["txid"], tx.get("address", None)) 399 for tx in ret] 400 recent_tx_index = next((i for i, (txid, addr) 401 in enumerate(txid_list) if 402 txid == self.last_known_wallet_txid[0] and 403 addr == self.last_known_wallet_txid[1]), -1) 404 if recent_tx_index != -1: 405 break 406 tx_request_count *= 2 407 408 #TODO low priority: handle a user getting more than 255 new 409 # transactions in 15 seconds 410 if len(ret) > 0: 411 self.last_known_wallet_txid = (ret[0]["txid"], 412 ret[0].get("address", None)) 413 assert(recent_tx_index != -1) 414 if recent_tx_index == 0: 415 return set() 416 new_txes = ret[:recent_tx_index][::-1] 417 logger.debug("new txes = " + str(new_txes)) 418 obtained_txids = set() 419 updated_scripthashes = [] 420 for tx in new_txes: 421 if "txid" not in tx or "category" not in tx: 422 continue 423 if tx["category"] not in ("receive", "send", "generate", 424 "immature"): 425 continue 426 if tx["confirmations"] < 0: 427 continue #conflicted 428 if tx["txid"] in obtained_txids: 429 continue 430 obtained_txids.add(tx["txid"]) 431 output_scriptpubkeys, input_scriptpubkeys, txd = \ 432 self.get_input_and_output_scriptpubkeys(tx["txid"]) 433 matching_scripthashes = [] 434 for spk in (output_scriptpubkeys + input_scriptpubkeys): 435 scripthash = script_to_scripthash(spk) 436 if scripthash in self.address_history: 437 matching_scripthashes.append(scripthash) 438 if len(matching_scripthashes) == 0: 439 continue 440 new_history_element = self.generate_new_history_element(tx, txd) 441 if new_history_element == None: 442 continue 443 444 for wal in self.deterministic_wallets: 445 overrun_depths = wal.have_scriptpubkeys_overrun_gaplimit( 446 output_scriptpubkeys) 447 if overrun_depths != None: 448 for change, import_count in overrun_depths.items(): 449 new_addrs, spks = wal.get_new_addresses(change, 450 import_count) 451 for spk in spks: 452 self.address_history[script_to_scripthash( 453 spk)] = {'history': [], 'subscribed': False} 454 logger.debug("importing " + str(len(spks)) + 455 " into change=" + str(change)) 456 import_addresses(self.rpc, new_addrs, [], -1, 0, logger) 457 458 updated_scripthashes.extend(matching_scripthashes) 459 logger.info("Found new tx: " + str(new_history_element)) 460 for scrhash in matching_scripthashes: 461 self.address_history[scrhash]["history"].append( 462 new_history_element) 463 if new_history_element["height"] <= 0: 464 self.unconfirmed_txes[tx["txid"]].append(scrhash) 465 if tx["confirmations"] > 0: 466 self.reorganizable_txes.append((tx["txid"], tx["blockhash"], 467 new_history_element["height"], matching_scripthashes)) 468 return set(updated_scripthashes) 469