electrum-personal-server

Maximally lightweight electrum server for a single user
git clone https://git.parazyd.org/electrum-personal-server
Log | Files | Refs | README

transactionmonitor.py (21724B)


      1 
      2 import time
      3 import pprint
      4 import math
      5 import sys
      6 import logging
      7 from decimal import Decimal
      8 from collections import defaultdict
      9 
     10 from electrumpersonalserver.server.jsonrpc import JsonRpcError
     11 from electrumpersonalserver.server.hashes import (
     12     get_status_electrum,
     13     script_to_scripthash,
     14     script_to_address
     15 )
     16 from electrumpersonalserver.server.deterministicwallet import import_addresses
     17 
     18 #internally this code uses scriptPubKeys, it only converts to bitcoin addresses
     19 # when importing to bitcoind or checking whether enough addresses have been
     20 # imported
     21 #the electrum protocol uses sha256(scriptpubkey) as a key for lookups
     22 # this code calls them scripthashes
     23 
     24 #when a transaction happens paying to an address from a deterministic wallet
     25 # lookup the position of that address, if its less than gap_limit then
     26 # import more addresses
     27 
     28 CONFIRMATIONS_SAFE_FROM_REORG = 100
     29 
     30 class TransactionMonitor(object):
     31     """
     32     Class which monitors the bitcoind wallet for new transactions
     33     and builds a history datastructure for sending to electrum
     34     """
     35     def __init__(self, rpc, deterministic_wallets, logger=None):
     36         self.rpc = rpc
     37         self.deterministic_wallets = deterministic_wallets
     38         self.last_known_wallet_txid = None
     39         self.address_history = None
     40         self.unconfirmed_txes = None
     41         self.reorganizable_txes = None
     42         self.logger = (logger if logger else
     43             logging.getLogger('ELECTRUMPERSONALSERVER'))
     44 
     45     def get_electrum_history_hash(self, scrhash):
     46         return get_status_electrum( [(h["tx_hash"], h["height"])
     47             for h in self.address_history[scrhash]["history"]] )
     48 
     49     def get_electrum_history(self, scrhash):
     50         if scrhash in self.address_history:
     51             return self.address_history[scrhash]["history"]
     52         else:
     53             return None
     54 
     55     def get_address_balance(self, scrhash):
     56         history = self.get_electrum_history(scrhash)
     57         if history == None:
     58             return None
     59         utxos = {}
     60         for tx_info in history:
     61             tx = self.rpc.call("gettransaction", [tx_info["tx_hash"]])
     62             txd = self.rpc.call("decoderawtransaction", [tx["hex"]])
     63             for index, output in enumerate(txd["vout"]):
     64                 if script_to_scripthash(output["scriptPubKey"]["hex"]
     65                     ) != scrhash:
     66                     continue
     67                 utxos[txd["txid"] + ":" + str(index)] = (output["value"],
     68                     tx["confirmations"])
     69             for inputt in txd["vin"]:
     70                 outpoint = inputt["txid"] + ":" + str(inputt["vout"])
     71                 if outpoint in utxos:
     72                     del utxos[outpoint]
     73         confirmed_balance = 0
     74         unconfirmed_balance = 0
     75         for utxo in utxos.values():
     76             value = int(Decimal(str(utxo[0])) * Decimal(1e8))
     77             if utxo[1] > 0:
     78                 confirmed_balance += value
     79             else:
     80                 unconfirmed_balance += value
     81         return {"confirmed": confirmed_balance, "unconfirmed":
     82             unconfirmed_balance}
     83 
     84     def subscribe_address(self, scrhash):
     85         if scrhash in self.address_history:
     86             self.address_history[scrhash]["subscribed"] = True
     87             return True
     88         else:
     89             return False
     90 
     91     def unsubscribe_all_addresses(self):
     92         for scrhash, his in self.address_history.items():
     93             his["subscribed"] = False
     94 
     95     def build_address_history(self, monitored_scriptpubkeys):
     96         logger = self.logger
     97         logger.info("Building history with " +
     98             str(len(monitored_scriptpubkeys)) + " addresses . . .")
     99         st = time.time()
    100         address_history = {}
    101         for spk in monitored_scriptpubkeys:
    102             address_history[script_to_scripthash(spk)] = {'history': [],
    103                 'subscribed': False}
    104         wallet_addr_scripthashes = set(address_history.keys())
    105         self.reorganizable_txes = []
    106         #populate history
    107         #which is a blockheight-ordered list of ("txhash", height)
    108         #unconfirmed transactions go at the end as ("txhash", 0, fee)
    109         # 0=unconfirmed -1=unconfirmed with unconfirmed parents
    110 
    111         BATCH_SIZE = 1000
    112         ret = list(range(BATCH_SIZE))
    113         t = 0
    114         count = 0
    115         obtained_txids = set()
    116         last_tx = None
    117         while len(ret) == BATCH_SIZE:
    118             ret = self.rpc.call("listtransactions", ["*", BATCH_SIZE, t, True])
    119             logger.debug("listtransactions skip=" + str(t) + " len(ret)="
    120                 + str(len(ret)))
    121             if t == 0 and len(ret) > 0:
    122                 last_tx = ret[-1]
    123             t += len(ret)
    124             for tx in ret:
    125                 if "txid" not in tx or "category" not in tx:
    126                     continue
    127                 if tx["category"] not in ("receive", "send", "generate",
    128                         "immature"):
    129                     continue
    130                 if tx["confirmations"] < 0:
    131                     continue #conflicted
    132                 if tx["txid"] in obtained_txids:
    133                     continue
    134                 logger.debug("adding obtained tx=" + str(tx["txid"]))
    135                 obtained_txids.add(tx["txid"])
    136 
    137                 #obtain all the addresses this transaction is involved with
    138                 output_scriptpubkeys, input_scriptpubkeys, txd = \
    139                     self.get_input_and_output_scriptpubkeys(tx["txid"])
    140                 output_scripthashes = [script_to_scripthash(sc)
    141                     for sc in output_scriptpubkeys]
    142                 sh_to_add = wallet_addr_scripthashes.intersection(set(
    143                     output_scripthashes))
    144                 input_scripthashes = [script_to_scripthash(sc)
    145                     for sc in input_scriptpubkeys]
    146                 sh_to_add |= wallet_addr_scripthashes.intersection(set(
    147                     input_scripthashes))
    148                 if len(sh_to_add) == 0:
    149                     continue
    150                 new_history_element = self.generate_new_history_element(tx, txd)
    151                 if new_history_element == None:
    152                     continue
    153 
    154                 for wal in self.deterministic_wallets:
    155                     overrun_depths = wal.have_scriptpubkeys_overrun_gaplimit(
    156                         output_scriptpubkeys)
    157                     if overrun_depths != None:
    158                         logger.error("Not enough addresses imported.")
    159                         logger.error("Delete wallet.dat and increase the value"
    160                             + " of `initial_import_count` in the file"
    161                             + " `config.ini` then reimport and rescan")
    162                         #TODO make it so users dont have to delete wallet.dat
    163                         # check whether all initial_import_count addresses are
    164                         # imported rather than just the first one
    165                         return False
    166                 for scripthash in sh_to_add:
    167                     address_history[scripthash][
    168                         "history"].append(new_history_element)
    169                 if tx["confirmations"] > 0 and (tx["confirmations"] <
    170                         CONFIRMATIONS_SAFE_FROM_REORG):
    171                     self.reorganizable_txes.append((tx["txid"], tx["blockhash"],
    172                         new_history_element["height"], sh_to_add))
    173                 count += 1
    174 
    175         unconfirmed_txes = defaultdict(list)
    176         for scrhash, his in address_history.items():
    177             uctx = self.sort_address_history_list(his)
    178             for u in uctx:
    179                 unconfirmed_txes[u["tx_hash"]].append(scrhash)
    180         logger.debug("unconfirmed_txes = " + str(unconfirmed_txes))
    181         logger.debug("reorganizable_txes = " + str(self.reorganizable_txes))
    182         if len(ret) > 0:
    183             #txid doesnt uniquely identify transactions from listtransactions
    184             #but the tuple (txid, address) does
    185             self.last_known_wallet_txid = (last_tx["txid"],
    186                 last_tx.get("address", None))
    187         else:
    188             self.last_known_wallet_txid = None
    189         logger.debug("last_known_wallet_txid = " + str(
    190             self.last_known_wallet_txid))
    191 
    192         et = time.time()
    193         logger.info("Found " + str(count) + " txes. History built in "
    194             + str(et - st) + "sec")
    195         self.address_history = address_history
    196         self.unconfirmed_txes = unconfirmed_txes
    197         return True
    198 
    199     def get_input_and_output_scriptpubkeys(self, txid):
    200         gettx = self.rpc.call("gettransaction", [txid])
    201         txd = self.rpc.call("decoderawtransaction", [gettx["hex"]])
    202         output_scriptpubkeys = [out["scriptPubKey"]["hex"]
    203             for out in txd["vout"]]
    204         input_scriptpubkeys = []
    205         for inn in txd["vin"]:
    206             if "coinbase" in inn:
    207                 break
    208             try:
    209                 wallet_tx = self.rpc.call("gettransaction", [inn["txid"]])
    210             except JsonRpcError:
    211                 #wallet doesnt know about this tx, so the input isnt ours
    212                 continue
    213             input_decoded = self.rpc.call("decoderawtransaction", [wallet_tx[
    214                 "hex"]])
    215             script = input_decoded["vout"][inn["vout"]]["scriptPubKey"]["hex"]
    216             input_scriptpubkeys.append(script)
    217         return output_scriptpubkeys, input_scriptpubkeys, txd
    218 
    219     def generate_new_history_element(self, tx, txd):
    220         logger = self.logger
    221         if tx["confirmations"] == 0:
    222             try:
    223                 mempool_tx = self.rpc.call("getmempoolentry", [tx["txid"]])
    224                 fee = int(Decimal(str(mempool_tx["fees"]["base"]))
    225                     * Decimal(1e8))
    226                 unconfirmed_input = mempool_tx["ancestorcount"] > 1
    227             except JsonRpcError as e:
    228                 #not in mempool, return None
    229                 logger.debug("txid in wallet but not in mempool = "
    230                     + tx["txid"])
    231                 return None
    232             height = -1 if unconfirmed_input else 0
    233             new_history_element = ({"tx_hash": tx["txid"], "height": height,
    234                 "fee": fee})
    235         else:
    236             blockheader = self.rpc.call("getblockheader", [tx['blockhash']])
    237             new_history_element = ({"tx_hash": tx["txid"],
    238                 "height": blockheader["height"]})
    239         return new_history_element
    240 
    241     def sort_address_history_list(self, his):
    242         unconfirm_txes = list(filter(lambda h:h["height"] <= 0, his["history"]))
    243         confirm_txes = filter(lambda h:h["height"] > 0, his["history"])
    244         #TODO txes must be "in blockchain order"
    245         # the order they appear in the block
    246         # it might be "blockindex" in listtransactions and gettransaction
    247         #so must sort with key height+':'+blockindex
    248         #maybe check if any heights are the same then get the pos only for those
    249         #better way to do this is to have a separate dict that isnt in history
    250         # which maps txid => blockindex
    251         # and then sort by key height+":"+idx[txid]
    252         his["history"] = sorted(confirm_txes, key=lambda h:h["height"])
    253         his["history"].extend(unconfirm_txes)
    254         return unconfirm_txes
    255 
    256     def check_for_updated_txes(self):
    257         logger = self.logger
    258         updated_scrhashes1 = self.check_for_new_txes()
    259         updated_scrhashes2 = self.check_for_confirmations()
    260         updated_scrhashes3 = self.check_for_reorganizations()
    261         updated_scrhashes = (updated_scrhashes1 | updated_scrhashes2
    262             | updated_scrhashes3)
    263         for ush in updated_scrhashes:
    264             his = self.address_history[ush]
    265             self.sort_address_history_list(his)
    266         if len(updated_scrhashes) > 0:
    267             logger.debug("unconfirmed txes = "
    268                 + pprint.pformat(self.unconfirmed_txes))
    269             logger.debug("reorganizable_txes = "
    270                 + pprint.pformat(self.reorganizable_txes))
    271             logger.debug("updated_scripthashes = " + str(updated_scrhashes))
    272         updated_scrhashes = filter(lambda sh:self.address_history[sh][
    273             "subscribed"], updated_scrhashes)
    274         return updated_scrhashes
    275 
    276     def check_for_reorganizations(self):
    277         logger = self.logger
    278         elements_removed = []
    279         elements_added = []
    280         updated_scrhashes = set()
    281         for reorgable_tx in self.reorganizable_txes:
    282             txid, blockhash, height, scrhashes = reorgable_tx
    283             tx = self.rpc.call("gettransaction", [txid])
    284             if tx["confirmations"] >= CONFIRMATIONS_SAFE_FROM_REORG:
    285                 elements_removed.append(reorgable_tx)
    286                 logger.debug("Transaction considered safe from reorg: " + txid)
    287                 continue
    288             if tx["confirmations"] < 1:
    289                 updated_scrhashes.update(scrhashes)
    290                 if tx["confirmations"] == 0:
    291                     #transaction became unconfirmed in a reorg
    292                     logger.info("A transaction was reorg'd out: " + txid)
    293                     elements_removed.append(reorgable_tx)
    294                     self.unconfirmed_txes[txid].extend(scrhashes)
    295 
    296                     #don't add orphans back into history
    297                     if tx["category"] != "orphan":
    298                         #add to history as unconfirmed
    299                         txd = self.rpc.call("decoderawtransaction", [tx["hex"]])
    300                         new_history_element = self.generate_new_history_element(
    301                             tx, txd)
    302                         if new_history_element == None:
    303                             continue
    304                         for scrhash in scrhashes:
    305                             self.address_history[scrhash]["history"].append(
    306                                 new_history_element)
    307 
    308                 elif tx["confirmations"] < 0:
    309                     #tx became conflicted in reorg i.e. a double spend
    310                     logger.info("A transaction was double spent! " + txid)
    311                     elements_removed.append(reorgable_tx)
    312             elif tx["blockhash"] != blockhash:
    313                 block = self.rpc.call("getblockheader", [tx["blockhash"]])
    314                 if block["height"] == height: #reorg but height is the same
    315                     logger.debug("A transaction was reorg'd but still " +
    316                         "confirmed at same height: " + txid)
    317                     continue
    318                 #reorged but still confirmed at a different height
    319                 updated_scrhashes.update(scrhashes)
    320                 logger.debug("A transaction was reorg'd but still confirmed"
    321                     + " to a new block and different height: " + txid)
    322                 #update history with the new height
    323                 for scrhash in scrhashes:
    324                     for h in self.address_history[scrhash]["history"]:
    325                         if h["tx_hash"] == txid:
    326                             h["height"] = block["height"]
    327                 #modify the reorgable tx with new hash and height
    328                 elements_removed.append(reorgable_tx)
    329                 elements_added.append((txid, tx["blockhash"], block["height"],
    330                     scrhashes))
    331                 continue
    332             else:
    333                 continue #no change to reorgable tx
    334             #remove tx from history
    335             for scrhash in scrhashes:
    336                 deleted_entries = [h for h in self.address_history[scrhash][
    337                     "history"] if h["tx_hash"] == txid and
    338                     h["height"] == height]
    339                 for d_his in deleted_entries:
    340                     self.address_history[scrhash]["history"].remove(d_his)
    341 
    342         for reorged_tx in elements_removed:
    343             self.reorganizable_txes.remove(reorged_tx)
    344         self.reorganizable_txes.extend(elements_added)
    345         return updated_scrhashes
    346 
    347     def check_for_confirmations(self):
    348         logger = self.logger
    349         tx_scrhashes_removed_from_mempool = []
    350         for uc_txid, scrhashes in self.unconfirmed_txes.items():
    351             tx = self.rpc.call("gettransaction", [uc_txid])
    352             if tx["confirmations"] == 0:
    353                 continue #still unconfirmed
    354             tx_scrhashes_removed_from_mempool.append((uc_txid, scrhashes))
    355             if tx["confirmations"] > 0:
    356                 logger.info("A transaction confirmed: " + uc_txid)
    357                 block = self.rpc.call("getblockheader", [tx["blockhash"]])
    358             elif tx["confirmations"] < 0:
    359                 logger.warning("A transaction became conflicted: " + uc_txid)
    360             for scrhash in scrhashes:
    361                 #delete the old unconfirmed entry in address_history
    362                 deleted_entries = [h for h in self.address_history[scrhash][
    363                     "history"] if h["tx_hash"] == uc_txid]
    364                 for d_his in deleted_entries:
    365                     self.address_history[scrhash]["history"].remove(d_his)
    366                 if tx["confirmations"] > 0:
    367                     #create the new confirmed entry in address_history
    368                     self.address_history[scrhash]["history"].append({"height":
    369                         block["height"], "tx_hash": uc_txid})
    370             if tx["confirmations"] > 0:
    371                 self.reorganizable_txes.append((tx["txid"], tx["blockhash"],
    372                     block["height"], scrhashes))
    373         updated_scrhashes = set()
    374         for tx, scrhashes in tx_scrhashes_removed_from_mempool:
    375             del self.unconfirmed_txes[tx]
    376             updated_scrhashes.update(set(scrhashes))
    377         return updated_scrhashes
    378 
    379     def check_for_new_txes(self):
    380         logger = self.logger
    381         MAX_TX_REQUEST_COUNT = 256 
    382         tx_request_count = 2
    383         max_attempts = int(math.log(MAX_TX_REQUEST_COUNT, 2))
    384         for i in range(max_attempts):
    385             ##how listtransactions works
    386             ##skip and count parameters take most-recent txes first
    387             ## so skip=0 count=1 will return the most recent tx
    388             ##and skip=0 count=3 will return the 3 most recent txes
    389             ##but the actual list returned has the REVERSED order
    390             ##skip=0 count=3 will return a list with the most recent tx LAST
    391             ret = self.rpc.call("listtransactions", ["*", tx_request_count, 0,
    392                 True])
    393             ret = ret[::-1]
    394             if self.last_known_wallet_txid == None:
    395                 recent_tx_index = len(ret) #=0 means no new txes
    396                 break
    397             else:
    398                 txid_list = [(tx["txid"], tx.get("address", None))
    399                     for tx in ret]
    400                 recent_tx_index = next((i for i, (txid, addr)
    401                     in enumerate(txid_list) if
    402                     txid == self.last_known_wallet_txid[0] and
    403                     addr == self.last_known_wallet_txid[1]), -1)
    404                 if recent_tx_index != -1:
    405                     break
    406                 tx_request_count *= 2
    407 
    408         #TODO low priority: handle a user getting more than 255 new
    409         # transactions in 15 seconds
    410         if len(ret) > 0:
    411             self.last_known_wallet_txid = (ret[0]["txid"],
    412                 ret[0].get("address", None))
    413         assert(recent_tx_index != -1)
    414         if recent_tx_index == 0:
    415             return set()
    416         new_txes = ret[:recent_tx_index][::-1]
    417         logger.debug("new txes = " + str(new_txes))
    418         obtained_txids = set()
    419         updated_scripthashes = []
    420         for tx in new_txes:
    421             if "txid" not in tx or "category" not in tx:
    422                 continue
    423             if tx["category"] not in ("receive", "send", "generate",
    424                     "immature"):
    425                 continue
    426             if tx["confirmations"] < 0:
    427                 continue #conflicted
    428             if tx["txid"] in obtained_txids:
    429                 continue
    430             obtained_txids.add(tx["txid"])
    431             output_scriptpubkeys, input_scriptpubkeys, txd = \
    432                 self.get_input_and_output_scriptpubkeys(tx["txid"])
    433             matching_scripthashes = []
    434             for spk in (output_scriptpubkeys + input_scriptpubkeys):
    435                 scripthash = script_to_scripthash(spk)
    436                 if scripthash in self.address_history:
    437                     matching_scripthashes.append(scripthash)
    438             if len(matching_scripthashes) == 0:
    439                 continue
    440             new_history_element = self.generate_new_history_element(tx, txd)
    441             if new_history_element == None:
    442                 continue
    443 
    444             for wal in self.deterministic_wallets:
    445                 overrun_depths = wal.have_scriptpubkeys_overrun_gaplimit(
    446                     output_scriptpubkeys)
    447                 if overrun_depths != None:
    448                     for change, import_count in overrun_depths.items():
    449                         new_addrs, spks = wal.get_new_addresses(change,
    450                             import_count)
    451                         for spk in spks:
    452                             self.address_history[script_to_scripthash(
    453                                 spk)] =  {'history': [], 'subscribed': False}
    454                         logger.debug("importing " + str(len(spks)) +
    455                             " into change=" + str(change))
    456                         import_addresses(self.rpc, new_addrs, [], -1, 0, logger)
    457 
    458             updated_scripthashes.extend(matching_scripthashes)
    459             logger.info("Found new tx: " + str(new_history_element))
    460             for scrhash in matching_scripthashes:
    461                 self.address_history[scrhash]["history"].append(
    462                     new_history_element)
    463                 if new_history_element["height"] <= 0:
    464                     self.unconfirmed_txes[tx["txid"]].append(scrhash)
    465             if tx["confirmations"] > 0:
    466                 self.reorganizable_txes.append((tx["txid"], tx["blockhash"],
    467                     new_history_element["height"], matching_scripthashes))
    468         return set(updated_scripthashes)
    469