electrum-personal-server

Maximally lightweight electrum server for a single user
git clone https://git.parazyd.org/electrum-personal-server
Log | Files | Refs | README

merkleproof.py (5390B)


      1 
      2 import electrumpersonalserver.bitcoin as btc
      3 import binascii
      4 from math import ceil, log
      5 
      6 from electrumpersonalserver.server.hashes import Hash, hash_encode, hash_decode
      7 
      8 #lots of ideas and code taken from bitcoin core and breadwallet
      9 #https://github.com/bitcoin/bitcoin/blob/master/src/merkleblock.h
     10 #https://github.com/breadwallet/breadwallet-core/blob/master/BRMerkleBlock.c
     11 
     12 def calc_tree_width(height, txcount):
     13     """Efficently calculates the number of nodes at given merkle tree height"""
     14     return (txcount + (1 << height) - 1) >> height
     15 
     16 def decend_merkle_tree(hashes, flags, height, txcount, pos):
     17     """Function recursively follows the flags bitstring down into the
     18        tree, building up a tree in memory"""
     19     flag = next(flags)
     20     if height > 0:
     21         #non-txid node
     22         if flag:
     23             left = decend_merkle_tree(hashes, flags, height-1, txcount, pos*2)
     24             #bitcoin's merkle tree format has a rule that if theres an
     25             # odd number of nodes in then the tree, the last hash is duplicated
     26             #in the electrum format we must hash together the duplicate
     27             # tree branch
     28             if pos*2+1 < calc_tree_width(height-1, txcount):
     29                 right = decend_merkle_tree(hashes, flags, height-1,
     30                     txcount, pos*2+1)
     31             else:
     32                 if isinstance(left, tuple):
     33                     right = expand_tree_hashing(left)
     34                 else:
     35                     right = left
     36             return (left, right)
     37         else:
     38             hs = next(hashes)
     39             return hs
     40     else:
     41         #txid node
     42         hs = next(hashes)
     43         if flag:
     44             #for the actual transaction, also store its position with a flag 
     45             return "tx:" + str(pos) + ":" + hs
     46         else:
     47             return hs
     48 
     49 def deserialize_core_format_merkle_proof(hash_list, flag_value, txcount):
     50     """Converts core's format for a merkle proof into a tree in memory"""
     51     tree_depth = int(ceil(log(txcount, 2)))
     52     hashes = iter(hash_list)
     53     #one-liner which converts the flags value to a list of True/False bits
     54     flags = (flag_value[i//8]&1 << i%8 != 0 for i in range(len(flag_value)*8))
     55     try:
     56         root_node = decend_merkle_tree(hashes, flags, tree_depth, txcount, 0)
     57         return root_node
     58     except StopIteration:
     59         raise ValueError
     60 
     61 def expand_tree_electrum_format_merkle_proof(node, result):
     62     """Recurse down into the tree, adding hashes to the result list
     63        in depth order"""
     64     left, right = node
     65     if isinstance(left, tuple):
     66         expand_tree_electrum_format_merkle_proof(left, result)
     67     if isinstance(right, tuple):
     68         expand_tree_electrum_format_merkle_proof(right, result)
     69     if not isinstance(left, tuple):
     70         result.append(left)
     71     if not isinstance(right, tuple):
     72         result.append(right)
     73 
     74 def get_node_hash(node):
     75     if node.startswith("tx"):
     76         return node.split(":")[2]
     77     else:
     78         return node
     79 
     80 def expand_tree_hashing(node):
     81     """Recurse down into the tree, hashing everything and
     82        returning root hash"""
     83     left, right = node
     84     if isinstance(left, tuple):
     85         hash_left = expand_tree_hashing(left)
     86     else:
     87         hash_left = get_node_hash(left)
     88     if isinstance(right, tuple):
     89         hash_right = expand_tree_hashing(right)
     90     else:
     91         hash_right = get_node_hash(right)
     92     return hash_encode(Hash(hash_decode(hash_left) + hash_decode(hash_right)))
     93 
     94 def convert_core_to_electrum_merkle_proof(proof):
     95     """Bitcoin Core and Electrum use different formats for merkle
     96        proof, this function converts from Core's format to Electrum's format"""
     97     proof = binascii.unhexlify(proof)
     98     pos = [0]
     99     def read_as_int(bytez):
    100         pos[0] += bytez
    101         return btc.decode(proof[pos[0] - bytez:pos[0]][::-1], 256)
    102     def read_var_int():
    103         pos[0] += 1
    104         val = btc.from_byte_to_int(proof[pos[0] - 1])
    105         if val < 253:
    106             return val
    107         return read_as_int(pow(2, val - 252))
    108     def read_bytes(bytez):
    109         pos[0] += bytez
    110         return proof[pos[0] - bytez:pos[0]]
    111 
    112     merkle_root = proof[36:36+32]
    113     pos[0] = 80
    114     txcount = read_as_int(4)
    115     hash_count = read_var_int()
    116     hashes = [hash_encode(read_bytes(32)) for i in range(hash_count)]
    117     flags_count = read_var_int()
    118     flags = read_bytes(flags_count)
    119 
    120     root_node = deserialize_core_format_merkle_proof(hashes, flags, txcount)
    121     #check special case of a tree of zero height, block with only coinbase tx
    122     if not isinstance(root_node, tuple):
    123         root_node = root_node[5:] #remove the "tx:0:"
    124         result = {"pos": 0, "merkle": [], "txid": root_node,
    125             "merkleroot": hash_encode(merkle_root)}
    126         return result
    127 
    128     hashes_list = []
    129     expand_tree_electrum_format_merkle_proof(root_node, hashes_list)
    130     #remove the first or second element which is the txhash
    131     tx = hashes_list[0]
    132     if hashes_list[1].startswith("tx"):
    133         tx = hashes_list[1]
    134     assert(tx.startswith("tx"))
    135     hashes_list.remove(tx)
    136     #if the txhash was duplicated, that _is_ included in electrum's format
    137     if hashes_list[0].startswith("tx"):
    138         hashes_list[0] = tx.split(":")[2]
    139     tx_pos, txid = tx.split(":")[1:3]
    140     tx_pos = int(tx_pos)
    141     result = {"pos": tx_pos, "merkle": hashes_list, "txid": txid,
    142         "merkleroot": hash_encode(merkle_root)}
    143     return result
    144