electrum

Electrum Bitcoin wallet
git clone https://git.parazyd.org/electrum
Log | Files | Refs | Submodules

transaction.py (87885B)


      1 #!/usr/bin/env python
      2 #
      3 # Electrum - lightweight Bitcoin client
      4 # Copyright (C) 2011 Thomas Voegtlin
      5 #
      6 # Permission is hereby granted, free of charge, to any person
      7 # obtaining a copy of this software and associated documentation files
      8 # (the "Software"), to deal in the Software without restriction,
      9 # including without limitation the rights to use, copy, modify, merge,
     10 # publish, distribute, sublicense, and/or sell copies of the Software,
     11 # and to permit persons to whom the Software is furnished to do so,
     12 # subject to the following conditions:
     13 #
     14 # The above copyright notice and this permission notice shall be
     15 # included in all copies or substantial portions of the Software.
     16 #
     17 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
     18 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
     19 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
     20 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
     21 # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
     22 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
     23 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
     24 # SOFTWARE.
     25 
     26 
     27 
     28 # Note: The deserialization code originally comes from ABE.
     29 
     30 import struct
     31 import traceback
     32 import sys
     33 import io
     34 import base64
     35 from typing import (Sequence, Union, NamedTuple, Tuple, Optional, Iterable,
     36                     Callable, List, Dict, Set, TYPE_CHECKING)
     37 from collections import defaultdict
     38 from enum import IntEnum
     39 import itertools
     40 import binascii
     41 import copy
     42 
     43 from . import ecc, bitcoin, constants, segwit_addr, bip32
     44 from .bip32 import BIP32Node
     45 from .util import profiler, to_bytes, bh2u, bfh, chunks, is_hex_str
     46 from .bitcoin import (TYPE_ADDRESS, TYPE_SCRIPT, hash_160,
     47                       hash160_to_p2sh, hash160_to_p2pkh, hash_to_segwit_addr,
     48                       var_int, TOTAL_COIN_SUPPLY_LIMIT_IN_BTC, COIN,
     49                       int_to_hex, push_script, b58_address_to_hash160,
     50                       opcodes, add_number_to_script, base_decode, is_segwit_script_type,
     51                       base_encode, construct_witness, construct_script)
     52 from .crypto import sha256d
     53 from .logging import get_logger
     54 
     55 if TYPE_CHECKING:
     56     from .wallet import Abstract_Wallet
     57 
     58 
# Module-level logger, named after this module.
_logger = get_logger(__name__)
# NOTE(review): presumably enables extra diagnostics while parsing PSBTs;
# it is not referenced in this part of the file -- confirm against its users.
DEBUG_PSBT_PARSING = False
     61 
     62 
class SerializationError(Exception):
    """Raised when there is a problem deserializing or serializing."""


class UnknownTxinType(Exception):
    """Raised when no scriptSig/witness can be constructed for an input's script type."""
    pass


class BadHeaderMagic(SerializationError):
    # NOTE(review): presumably raised when a serialized blob does not start
    # with the expected magic bytes; not raised in this part of the file.
    pass


class UnexpectedEndOfStream(SerializationError):
    # NOTE(review): presumably raised when a stream ends before a complete
    # item was read; not raised in this part of the file.
    pass


class PSBTInputConsistencyFailure(SerializationError):
    # NOTE(review): presumably raised when the fields of a PSBT input
    # contradict each other; not raised in this part of the file.
    pass


class MalformedBitcoinScript(Exception):
    """Raised when a script cannot be decoded, e.g. a PUSHDATA length field is cut short."""
    pass


class MissingTxInputAmount(Exception):
    # NOTE(review): presumably raised when the amount of a tx input is needed
    # but unknown; not raised in this part of the file.
    pass
     89 
     90 
# Numeric value of the SIGHASH_ALL signature hash type.
SIGHASH_ALL = 1
     92 
     93 
     94 class TxOutput:
     95     scriptpubkey: bytes
     96     value: Union[int, str]
     97 
     98     def __init__(self, *, scriptpubkey: bytes, value: Union[int, str]):
     99         self.scriptpubkey = scriptpubkey
    100         self.value = value  # str when the output is set to max: '!'  # in satoshis
    101 
    102     @classmethod
    103     def from_address_and_value(cls, address: str, value: Union[int, str]) -> Union['TxOutput', 'PartialTxOutput']:
    104         return cls(scriptpubkey=bfh(bitcoin.address_to_script(address)),
    105                    value=value)
    106 
    107     def serialize_to_network(self) -> bytes:
    108         buf = int.to_bytes(self.value, 8, byteorder="little", signed=False)
    109         script = self.scriptpubkey
    110         buf += bfh(var_int(len(script.hex()) // 2))
    111         buf += script
    112         return buf
    113 
    114     @classmethod
    115     def from_network_bytes(cls, raw: bytes) -> 'TxOutput':
    116         vds = BCDataStream()
    117         vds.write(raw)
    118         txout = parse_output(vds)
    119         if vds.can_read_more():
    120             raise SerializationError('extra junk at the end of TxOutput bytes')
    121         return txout
    122 
    123     def to_legacy_tuple(self) -> Tuple[int, str, Union[int, str]]:
    124         if self.address:
    125             return TYPE_ADDRESS, self.address, self.value
    126         return TYPE_SCRIPT, self.scriptpubkey.hex(), self.value
    127 
    128     @classmethod
    129     def from_legacy_tuple(cls, _type: int, addr: str, val: Union[int, str]) -> Union['TxOutput', 'PartialTxOutput']:
    130         if _type == TYPE_ADDRESS:
    131             return cls.from_address_and_value(addr, val)
    132         if _type == TYPE_SCRIPT:
    133             return cls(scriptpubkey=bfh(addr), value=val)
    134         raise Exception(f"unexptected legacy address type: {_type}")
    135 
    136     @property
    137     def address(self) -> Optional[str]:
    138         return get_address_from_output_script(self.scriptpubkey)  # TODO cache this?
    139 
    140     def get_ui_address_str(self) -> str:
    141         addr = self.address
    142         if addr is not None:
    143             return addr
    144         return f"SCRIPT {self.scriptpubkey.hex()}"
    145 
    146     def __repr__(self):
    147         return f"<TxOutput script={self.scriptpubkey.hex()} address={self.address} value={self.value}>"
    148 
    149     def __eq__(self, other):
    150         if not isinstance(other, TxOutput):
    151             return False
    152         return self.scriptpubkey == other.scriptpubkey and self.value == other.value
    153 
    154     def __ne__(self, other):
    155         return not (self == other)
    156 
    157     def to_json(self):
    158         d = {
    159             'scriptpubkey': self.scriptpubkey.hex(),
    160             'address': self.address,
    161             'value_sats': self.value,
    162         }
    163         return d
    164 
    165 
class BIP143SharedTxDigestFields(NamedTuple):
    # Bundle of three str-typed digest values, named after BIP 143.
    # NOTE(review): presumably the hex-encoded intermediate hashes that are
    # identical for every input of a transaction -- confirm where it is filled.
    hashPrevouts: str
    hashSequence: str
    hashOutputs: str
    170 
    171 
    172 class TxOutpoint(NamedTuple):
    173     txid: bytes  # endianness same as hex string displayed; reverse of tx serialization order
    174     out_idx: int
    175 
    176     @classmethod
    177     def from_str(cls, s: str) -> 'TxOutpoint':
    178         hash_str, idx_str = s.split(':')
    179         assert len(hash_str) == 64, f"{hash_str} should be a sha256 hash"
    180         return TxOutpoint(txid=bfh(hash_str),
    181                           out_idx=int(idx_str))
    182 
    183     def to_str(self) -> str:
    184         return f"{self.txid.hex()}:{self.out_idx}"
    185 
    186     def to_json(self):
    187         return [self.txid.hex(), self.out_idx]
    188 
    189     def serialize_to_network(self) -> bytes:
    190         return self.txid[::-1] + bfh(int_to_hex(self.out_idx, 4))
    191 
    192     def is_coinbase(self) -> bool:
    193         return self.txid == bytes(32)
    194 
    195 
    196 class TxInput:
    197     prevout: TxOutpoint
    198     script_sig: Optional[bytes]
    199     nsequence: int
    200     witness: Optional[bytes]
    201     _is_coinbase_output: bool
    202 
    203     def __init__(self, *,
    204                  prevout: TxOutpoint,
    205                  script_sig: bytes = None,
    206                  nsequence: int = 0xffffffff - 1,
    207                  witness: bytes = None,
    208                  is_coinbase_output: bool = False):
    209         self.prevout = prevout
    210         self.script_sig = script_sig
    211         self.nsequence = nsequence
    212         self.witness = witness
    213         self._is_coinbase_output = is_coinbase_output
    214 
    215     def is_coinbase_input(self) -> bool:
    216         """Whether this is the input of a coinbase tx."""
    217         return self.prevout.is_coinbase()
    218 
    219     def is_coinbase_output(self) -> bool:
    220         """Whether the coin being spent is an output of a coinbase tx.
    221         This matters for coin maturity.
    222         """
    223         return self._is_coinbase_output
    224 
    225     def value_sats(self) -> Optional[int]:
    226         return None
    227 
    228     def to_json(self):
    229         d = {
    230             'prevout_hash': self.prevout.txid.hex(),
    231             'prevout_n': self.prevout.out_idx,
    232             'coinbase': self.is_coinbase_output(),
    233             'nsequence': self.nsequence,
    234         }
    235         if self.script_sig is not None:
    236             d['scriptSig'] = self.script_sig.hex()
    237         if self.witness is not None:
    238             d['witness'] = self.witness.hex()
    239         return d
    240 
    241     def witness_elements(self)-> Sequence[bytes]:
    242         vds = BCDataStream()
    243         vds.write(self.witness)
    244         n = vds.read_compact_size()
    245         return list(vds.read_bytes(vds.read_compact_size()) for i in range(n))
    246 
    247     def is_segwit(self, *, guess_for_address=False) -> bool:
    248         if self.witness not in (b'\x00', b'', None):
    249             return True
    250         return False
    251 
    252 
    253 class BCDataStream(object):
    254     """Workalike python implementation of Bitcoin's CDataStream class."""
    255 
    256     def __init__(self):
    257         self.input = None  # type: Optional[bytearray]
    258         self.read_cursor = 0
    259 
    260     def clear(self):
    261         self.input = None
    262         self.read_cursor = 0
    263 
    264     def write(self, _bytes: Union[bytes, bytearray]):  # Initialize with string of _bytes
    265         assert isinstance(_bytes, (bytes, bytearray))
    266         if self.input is None:
    267             self.input = bytearray(_bytes)
    268         else:
    269             self.input += bytearray(_bytes)
    270 
    271     def read_string(self, encoding='ascii'):
    272         # Strings are encoded depending on length:
    273         # 0 to 252 :  1-byte-length followed by bytes (if any)
    274         # 253 to 65,535 : byte'253' 2-byte-length followed by bytes
    275         # 65,536 to 4,294,967,295 : byte '254' 4-byte-length followed by bytes
    276         # ... and the Bitcoin client is coded to understand:
    277         # greater than 4,294,967,295 : byte '255' 8-byte-length followed by bytes of string
    278         # ... but I don't think it actually handles any strings that big.
    279         if self.input is None:
    280             raise SerializationError("call write(bytes) before trying to deserialize")
    281 
    282         length = self.read_compact_size()
    283 
    284         return self.read_bytes(length).decode(encoding)
    285 
    286     def write_string(self, string, encoding='ascii'):
    287         string = to_bytes(string, encoding)
    288         # Length-encoded as with read-string
    289         self.write_compact_size(len(string))
    290         self.write(string)
    291 
    292     def read_bytes(self, length: int) -> bytes:
    293         if self.input is None:
    294             raise SerializationError("call write(bytes) before trying to deserialize")
    295         assert length >= 0
    296         input_len = len(self.input)
    297         read_begin = self.read_cursor
    298         read_end = read_begin + length
    299         if 0 <= read_begin <= read_end <= input_len:
    300             result = self.input[read_begin:read_end]  # type: bytearray
    301             self.read_cursor += length
    302             return bytes(result)
    303         else:
    304             raise SerializationError('attempt to read past end of buffer')
    305 
    306     def write_bytes(self, _bytes: Union[bytes, bytearray], length: int):
    307         assert len(_bytes) == length, len(_bytes)
    308         self.write(_bytes)
    309 
    310     def can_read_more(self) -> bool:
    311         if not self.input:
    312             return False
    313         return self.read_cursor < len(self.input)
    314 
    315     def read_boolean(self) -> bool: return self.read_bytes(1) != b'\x00'
    316     def read_int16(self): return self._read_num('<h')
    317     def read_uint16(self): return self._read_num('<H')
    318     def read_int32(self): return self._read_num('<i')
    319     def read_uint32(self): return self._read_num('<I')
    320     def read_int64(self): return self._read_num('<q')
    321     def read_uint64(self): return self._read_num('<Q')
    322 
    323     def write_boolean(self, val): return self.write(b'\x01' if val else b'\x00')
    324     def write_int16(self, val): return self._write_num('<h', val)
    325     def write_uint16(self, val): return self._write_num('<H', val)
    326     def write_int32(self, val): return self._write_num('<i', val)
    327     def write_uint32(self, val): return self._write_num('<I', val)
    328     def write_int64(self, val): return self._write_num('<q', val)
    329     def write_uint64(self, val): return self._write_num('<Q', val)
    330 
    331     def read_compact_size(self):
    332         try:
    333             size = self.input[self.read_cursor]
    334             self.read_cursor += 1
    335             if size == 253:
    336                 size = self._read_num('<H')
    337             elif size == 254:
    338                 size = self._read_num('<I')
    339             elif size == 255:
    340                 size = self._read_num('<Q')
    341             return size
    342         except IndexError as e:
    343             raise SerializationError("attempt to read past end of buffer") from e
    344 
    345     def write_compact_size(self, size):
    346         if size < 0:
    347             raise SerializationError("attempt to write size < 0")
    348         elif size < 253:
    349             self.write(bytes([size]))
    350         elif size < 2**16:
    351             self.write(b'\xfd')
    352             self._write_num('<H', size)
    353         elif size < 2**32:
    354             self.write(b'\xfe')
    355             self._write_num('<I', size)
    356         elif size < 2**64:
    357             self.write(b'\xff')
    358             self._write_num('<Q', size)
    359         else:
    360             raise Exception(f"size {size} too large for compact_size")
    361 
    362     def _read_num(self, format):
    363         try:
    364             (i,) = struct.unpack_from(format, self.input, self.read_cursor)
    365             self.read_cursor += struct.calcsize(format)
    366         except Exception as e:
    367             raise SerializationError(e) from e
    368         return i
    369 
    370     def _write_num(self, format, num):
    371         s = struct.pack(format, num)
    372         self.write(s)
    373 
    374 
    375 def script_GetOp(_bytes : bytes):
    376     i = 0
    377     while i < len(_bytes):
    378         vch = None
    379         opcode = _bytes[i]
    380         i += 1
    381 
    382         if opcode <= opcodes.OP_PUSHDATA4:
    383             nSize = opcode
    384             if opcode == opcodes.OP_PUSHDATA1:
    385                 try: nSize = _bytes[i]
    386                 except IndexError: raise MalformedBitcoinScript()
    387                 i += 1
    388             elif opcode == opcodes.OP_PUSHDATA2:
    389                 try: (nSize,) = struct.unpack_from('<H', _bytes, i)
    390                 except struct.error: raise MalformedBitcoinScript()
    391                 i += 2
    392             elif opcode == opcodes.OP_PUSHDATA4:
    393                 try: (nSize,) = struct.unpack_from('<I', _bytes, i)
    394                 except struct.error: raise MalformedBitcoinScript()
    395                 i += 4
    396             vch = _bytes[i:i + nSize]
    397             i += nSize
    398 
    399         yield opcode, vch, i
    400 
    401 
    402 class OPPushDataGeneric:
    403     def __init__(self, pushlen: Callable=None):
    404         if pushlen is not None:
    405             self.check_data_len = pushlen
    406 
    407     @classmethod
    408     def check_data_len(cls, datalen: int) -> bool:
    409         # Opcodes below OP_PUSHDATA4 all just push data onto stack, and are equivalent.
    410         return opcodes.OP_PUSHDATA4 >= datalen >= 0
    411 
    412     @classmethod
    413     def is_instance(cls, item):
    414         # accept objects that are instances of this class
    415         # or other classes that are subclasses
    416         return isinstance(item, cls) \
    417                or (isinstance(item, type) and issubclass(item, cls))
    418 
    419 
# Push placeholder accepting the sizes 33 and 65.
OPPushDataPubkey = OPPushDataGeneric(lambda x: x in (33, 65))

# Output script templates for match_script_against_template(): each entry is
# either a literal opcode or a push placeholder constrained by its predicate.
# p2pkh: OP_DUP OP_HASH160 <20 bytes> OP_EQUALVERIFY OP_CHECKSIG
SCRIPTPUBKEY_TEMPLATE_P2PKH = [opcodes.OP_DUP, opcodes.OP_HASH160,
                               OPPushDataGeneric(lambda x: x == 20),
                               opcodes.OP_EQUALVERIFY, opcodes.OP_CHECKSIG]
# p2sh: OP_HASH160 <20 bytes> OP_EQUAL
SCRIPTPUBKEY_TEMPLATE_P2SH = [opcodes.OP_HASH160, OPPushDataGeneric(lambda x: x == 20), opcodes.OP_EQUAL]
# any witness v0 program: OP_0 <20 or 32 bytes>
SCRIPTPUBKEY_TEMPLATE_WITNESS_V0 = [opcodes.OP_0, OPPushDataGeneric(lambda x: x in (20, 32))]
# p2wpkh: OP_0 <20 bytes>
SCRIPTPUBKEY_TEMPLATE_P2WPKH = [opcodes.OP_0, OPPushDataGeneric(lambda x: x == 20)]
# p2wsh: OP_0 <32 bytes>
SCRIPTPUBKEY_TEMPLATE_P2WSH = [opcodes.OP_0, OPPushDataGeneric(lambda x: x == 32)]
    429 
    430 
    431 def match_script_against_template(script, template) -> bool:
    432     """Returns whether 'script' matches 'template'."""
    433     if script is None:
    434         return False
    435     # optionally decode script now:
    436     if isinstance(script, (bytes, bytearray)):
    437         try:
    438             script = [x for x in script_GetOp(script)]
    439         except MalformedBitcoinScript:
    440             return False
    441     if len(script) != len(template):
    442         return False
    443     for i in range(len(script)):
    444         template_item = template[i]
    445         script_item = script[i]
    446         if OPPushDataGeneric.is_instance(template_item) and template_item.check_data_len(script_item[0]):
    447             continue
    448         if template_item != script_item[0]:
    449             return False
    450     return True
    451 
    452 def get_script_type_from_output_script(_bytes: bytes) -> Optional[str]:
    453     if _bytes is None:
    454         return None
    455     try:
    456         decoded = [x for x in script_GetOp(_bytes)]
    457     except MalformedBitcoinScript:
    458         return None
    459     if match_script_against_template(decoded, SCRIPTPUBKEY_TEMPLATE_P2PKH):
    460         return 'p2pkh'
    461     if match_script_against_template(decoded, SCRIPTPUBKEY_TEMPLATE_P2SH):
    462         return 'p2sh'
    463     if match_script_against_template(decoded, SCRIPTPUBKEY_TEMPLATE_P2WPKH):
    464         return 'p2wpkh'
    465     if match_script_against_template(decoded, SCRIPTPUBKEY_TEMPLATE_P2WSH):
    466         return 'p2wsh'
    467     return None
    468 
    469 def get_address_from_output_script(_bytes: bytes, *, net=None) -> Optional[str]:
    470     try:
    471         decoded = [x for x in script_GetOp(_bytes)]
    472     except MalformedBitcoinScript:
    473         return None
    474 
    475     # p2pkh
    476     if match_script_against_template(decoded, SCRIPTPUBKEY_TEMPLATE_P2PKH):
    477         return hash160_to_p2pkh(decoded[2][1], net=net)
    478 
    479     # p2sh
    480     if match_script_against_template(decoded, SCRIPTPUBKEY_TEMPLATE_P2SH):
    481         return hash160_to_p2sh(decoded[1][1], net=net)
    482 
    483     # segwit address (version 0)
    484     if match_script_against_template(decoded, SCRIPTPUBKEY_TEMPLATE_WITNESS_V0):
    485         return hash_to_segwit_addr(decoded[1][1], witver=0, net=net)
    486 
    487     # segwit address (version 1-16)
    488     future_witness_versions = list(range(opcodes.OP_1, opcodes.OP_16 + 1))
    489     for witver, opcode in enumerate(future_witness_versions, start=1):
    490         match = [opcode, OPPushDataGeneric(lambda x: 2 <= x <= 40)]
    491         if match_script_against_template(decoded, match):
    492             return hash_to_segwit_addr(decoded[1][1], witver=witver, net=net)
    493 
    494     return None
    495 
    496 
    497 def parse_input(vds: BCDataStream) -> TxInput:
    498     prevout_hash = vds.read_bytes(32)[::-1]
    499     prevout_n = vds.read_uint32()
    500     prevout = TxOutpoint(txid=prevout_hash, out_idx=prevout_n)
    501     script_sig = vds.read_bytes(vds.read_compact_size())
    502     nsequence = vds.read_uint32()
    503     return TxInput(prevout=prevout, script_sig=script_sig, nsequence=nsequence)
    504 
    505 
    506 def parse_witness(vds: BCDataStream, txin: TxInput) -> None:
    507     n = vds.read_compact_size()
    508     witness_elements = list(vds.read_bytes(vds.read_compact_size()) for i in range(n))
    509     txin.witness = bfh(construct_witness(witness_elements))
    510 
    511 
    512 def parse_output(vds: BCDataStream) -> TxOutput:
    513     value = vds.read_int64()
    514     if value > TOTAL_COIN_SUPPLY_LIMIT_IN_BTC * COIN:
    515         raise SerializationError('invalid output amount (too large)')
    516     if value < 0:
    517         raise SerializationError('invalid output amount (negative)')
    518     scriptpubkey = vds.read_bytes(vds.read_compact_size())
    519     return TxOutput(value=value, scriptpubkey=scriptpubkey)
    520 
    521 
    522 # pay & redeem scripts
    523 
    524 def multisig_script(public_keys: Sequence[str], m: int) -> str:
    525     n = len(public_keys)
    526     assert 1 <= m <= n <= 15, f'm {m}, n {n}'
    527     return construct_script([m, *public_keys, n, opcodes.OP_CHECKMULTISIG])
    528 
    529 
    530 
    531 
    532 class Transaction:
    533     _cached_network_ser: Optional[str]
    534 
    def __str__(self):
        # The string form of a transaction is whatever serialize() returns.
        return self.serialize()
    537 
    def __init__(self, raw):
        """Create a transaction from its raw serialization.

        ``raw`` may be None (no serialization yet), a hex str (surrounding
        whitespace is stripped; must be valid hex) or bytes/bytearray.
        Anything else raises Exception.  Parsing is deferred until
        deserialize() is called.
        """
        if isinstance(raw, (bytes, bytearray)):
            self._cached_network_ser = bh2u(raw)
        elif isinstance(raw, str):
            self._cached_network_ser = raw.strip() if raw else None
            assert is_hex_str(self._cached_network_ser)
        elif raw is None:
            self._cached_network_ser = None
        else:
            raise Exception(f"cannot initialize transaction from {raw}")

        # filled in lazily by deserialize()
        self._inputs = None  # type: List[TxInput]
        self._outputs = None  # type: List[TxOutput]
        self._locktime = 0
        self._version = 2

        self._cached_txid = None  # type: Optional[str]
    554 
    @property
    def locktime(self):
        # Make sure a cached raw serialization has been parsed before reading.
        self.deserialize()
        return self._locktime

    @locktime.setter
    def locktime(self, value: int):
        assert isinstance(value, int), f"locktime must be int, not {value!r}"
        self._locktime = value
        # the cached serialization no longer reflects the new value
        self.invalidate_ser_cache()
    565 
    @property
    def version(self):
        # Make sure a cached raw serialization has been parsed before reading.
        self.deserialize()
        return self._version

    @version.setter
    def version(self, value):
        self._version = value
        # the cached serialization no longer reflects the new value
        self.invalidate_ser_cache()
    575 
    def to_json(self) -> dict:
        """Return version, locktime and the JSON forms of all inputs and outputs."""
        return {
            'version': self.version,
            'locktime': self.locktime,
            'inputs': [txin.to_json() for txin in self.inputs()],
            'outputs': [txout.to_json() for txout in self.outputs()],
        }
    584 
    def inputs(self) -> Sequence[TxInput]:
        # Parse lazily on first access; stays None if there is nothing to parse.
        if self._inputs is None:
            self.deserialize()
        return self._inputs
    589 
    def outputs(self) -> Sequence[TxOutput]:
        # Parse lazily on first access; stays None if there is nothing to parse.
        if self._outputs is None:
            self.deserialize()
        return self._outputs
    594 
    def deserialize(self) -> None:
        """Parse the cached raw serialization into version, inputs, outputs
        and locktime.

        Does nothing when there is no cached serialization or when parsing
        already happened.  The parsed fields are stored only after the whole
        transaction was read successfully, so a failed parse leaves the
        object unchanged instead of half-populated.

        Raises SerializationError for truncated data, a missing input or
        output, or trailing bytes; ValueError for a bad segwit marker byte.
        """
        if self._cached_network_ser is None:
            return
        if self._inputs is not None:
            return

        raw_bytes = bfh(self._cached_network_ser)
        vds = BCDataStream()
        vds.write(raw_bytes)
        version = vds.read_int32()
        n_vin = vds.read_compact_size()
        # an input count of zero signals the extended (witness) format
        is_segwit = (n_vin == 0)
        if is_segwit:
            marker = vds.read_bytes(1)
            if marker != b'\x01':
                raise ValueError('invalid txn marker byte: {}'.format(marker))
            n_vin = vds.read_compact_size()
        if n_vin < 1:
            raise SerializationError('tx needs to have at least 1 input')
        inputs = [parse_input(vds) for _ in range(n_vin)]
        n_vout = vds.read_compact_size()
        if n_vout < 1:
            raise SerializationError('tx needs to have at least 1 output')
        outputs = [parse_output(vds) for _ in range(n_vout)]
        if is_segwit:
            # witnesses follow the outputs, one per input, in input order
            for txin in inputs:
                parse_witness(vds, txin)
        locktime = vds.read_uint32()
        if vds.can_read_more():
            raise SerializationError('extra junk at the end')

        # commit only now: the early-return guard above keys on _inputs, so
        # setting it before a later failure would block any retry
        self._version = version
        self._inputs = inputs
        self._outputs = outputs
        self._locktime = locktime
    625 
    @classmethod
    def get_siglist(cls, txin: 'PartialTxInput', *, estimate_size=False):
        """Return ``(pubkeys_hex, signatures_hex)`` for an input.

        With ``estimate_size`` the lists contain zero-filled placeholders of
        plausible sizes instead of real data.  Coinbase inputs give two
        empty lists.
        """
        if txin.is_coinbase_input():
            return [], []

        if not estimate_size:
            pk_list = [pubkey.hex() for pubkey in txin.pubkeys]
            sig_list = [txin.part_sigs.get(pubkey, b'').hex() for pubkey in txin.pubkeys]
            if txin.is_complete():
                # drop the empty slots of pubkeys that did not sign
                sig_list = [sig for sig in sig_list if sig]
            return pk_list, sig_list

        try:
            pubkey_size = len(txin.pubkeys[0])
        except IndexError:
            pubkey_size = 33  # guess it is compressed
        num_pubkeys = max(1, len(txin.pubkeys))
        placeholder_pubkey = "00" * pubkey_size
        pk_list = [placeholder_pubkey] * num_pubkeys
        num_sig = max(1, txin.num_sig)
        # we guess that signatures will be 72 bytes long
        # note: DER-encoded ECDSA signatures are 71 or 72 bytes in practice
        #       See https://bitcoin.stackexchange.com/questions/77191/what-is-the-maximum-size-of-a-der-encoded-ecdsa-signature
        #       We assume low S (as that is a bitcoin standardness rule).
        #       We do not assume low R (even though the sigs we create conform), as external sigs,
        #       e.g. from a hw signer cannot be expected to have a low R.
        placeholder_sig = "00" * 72
        sig_list = [placeholder_sig] * num_sig
        return pk_list, sig_list
    652 
    @classmethod
    def serialize_witness(cls, txin: TxInput, *, estimate_size=False) -> str:
        """Return the hex witness of an input.

        An already present witness is returned as is; coinbase inputs give
        ''.  Otherwise the witness is built from the partial input's script
        type, public keys and signatures.  Raises UnknownTxinType when the
        script type is not handled.
        """
        if txin.witness is not None:
            return txin.witness.hex()
        if txin.is_coinbase_input():
            return ''
        assert isinstance(txin, PartialTxInput)

        script_type = txin.script_type
        if not txin.is_segwit():
            return construct_witness([])

        if estimate_size and script_type in ('address', 'unknown'):
            script_type = cls.guess_txintype_from_address(txin.address)
        pubkeys, sig_list = cls.get_siglist(txin, estimate_size=estimate_size)
        if script_type in ('p2wpkh', 'p2wpkh-p2sh'):
            return construct_witness([sig_list[0], pubkeys[0]])
        if script_type in ('p2wsh', 'p2wsh-p2sh'):
            witness_script = multisig_script(pubkeys, txin.num_sig)
            return construct_witness([0, *sig_list, witness_script])
        if script_type in ('p2pk', 'p2pkh', 'p2sh'):
            return construct_witness([])
        raise UnknownTxinType(f'cannot construct witness for txin_type: {script_type}')
    676 
    @classmethod
    def guess_txintype_from_address(cls, addr: Optional[str]) -> str:
        """Guess a script type for ``addr``; raises for unrecognized addresses."""
        # It's not possible to tell the script type in general
        # just from an address.
        # - "1" addresses are of course p2pkh
        # - "3" addresses are p2sh but we don't know the redeem script..
        # - "bc1" addresses (if they are 42-long) are p2wpkh
        # - "bc1" addresses that are 62-long are p2wsh but we don't know the script..
        # If we don't know the script, we _guess_ it is pubkeyhash.
        # As this method is used e.g. for tx size estimation,
        # the estimation will not be precise.
        if addr is None:
            return 'p2wpkh'
        _witver, witprog = segwit_addr.decode(constants.net.SEGWIT_HRP, addr)
        if witprog is not None:
            return 'p2wpkh'
        addrtype, _hash160 = b58_address_to_hash160(addr)
        if addrtype == constants.net.ADDRTYPE_P2PKH:
            return 'p2pkh'
        if addrtype == constants.net.ADDRTYPE_P2SH:
            return 'p2wpkh-p2sh'
        raise Exception(f'unrecognized address: {repr(addr)}')
    699 
    @classmethod
    def input_script(cls, txin: TxInput, *, estimate_size=False) -> str:
        """Return the hex scriptSig of an input.

        An already present script_sig is returned as is; coinbase and native
        segwit inputs give ''.  Otherwise the script is built from the
        partial input's script type, public keys and signatures.  Raises
        UnknownTxinType when the script type is not handled.
        """
        if txin.script_sig is not None:
            return txin.script_sig.hex()
        if txin.is_coinbase_input():
            return ''
        assert isinstance(txin, PartialTxInput)

        if txin.is_p2sh_segwit() and txin.redeem_script:
            return construct_script([txin.redeem_script])
        if txin.is_native_segwit():
            return ''

        script_type = txin.script_type
        pubkeys, sig_list = cls.get_siglist(txin, estimate_size=estimate_size)
        if estimate_size and script_type in ('address', 'unknown'):
            script_type = cls.guess_txintype_from_address(txin.address)

        if script_type == 'p2pk':
            return construct_script([sig_list[0]])
        if script_type == 'p2sh':
            # put op_0 before script
            redeem_script = multisig_script(pubkeys, txin.num_sig)
            return construct_script([0, *sig_list, redeem_script])
        if script_type == 'p2pkh':
            return construct_script([sig_list[0], pubkeys[0]])
        if script_type in ('p2wpkh', 'p2wsh'):
            return ''
        if script_type == 'p2wpkh-p2sh':
            redeem_script = bitcoin.p2wpkh_nested_script(pubkeys[0])
            return construct_script([redeem_script])
        if script_type == 'p2wsh-p2sh':
            # for size estimation an empty witness script stands in
            witness_script = '' if estimate_size else cls.get_preimage_script(txin)
            redeem_script = bitcoin.p2wsh_nested_script(witness_script)
            return construct_script([redeem_script])
        raise UnknownTxinType(f'cannot construct scriptSig for txin_type: {script_type}')
    738 
    @classmethod
    def get_preimage_script(cls, txin: 'PartialTxInput') -> str:
        """Return the hex script used for `txin` when building its sighash preimage.

        An explicit witness script takes precedence, then (for non-segwit
        inputs only) an explicit redeem script. Either is rejected if it
        contains OP_CODESEPARATOR. Without an explicit script, the script is
        derived from the input's pubkeys according to `txin.script_type`.

        Raises UnknownTxinType for script types that cannot be handled.
        """
        # pick the explicitly provided script, if there is one
        explicit_script = None
        if txin.witness_script:
            explicit_script = txin.witness_script
        elif not txin.is_segwit() and txin.redeem_script:
            explicit_script = txin.redeem_script
        if explicit_script is not None:
            seen_opcodes = [item[0] for item in script_GetOp(explicit_script)]
            if opcodes.OP_CODESEPARATOR in seen_opcodes:
                raise Exception('OP_CODESEPARATOR black magic is not supported')
            return explicit_script.hex()

        # no explicit script: derive it from the pubkeys
        hex_pubkeys = [pk.hex() for pk in txin.pubkeys]
        script_type = txin.script_type
        if script_type in ('p2sh', 'p2wsh', 'p2wsh-p2sh'):
            return multisig_script(hex_pubkeys, txin.num_sig)
        if script_type in ('p2pkh', 'p2wpkh', 'p2wpkh-p2sh'):
            first_pubkey = hex_pubkeys[0]
            pubkey_hash = bh2u(hash_160(bfh(first_pubkey)))
            return bitcoin.pubkeyhash_to_p2pkh_script(pubkey_hash)
        if script_type == 'p2pk':
            first_pubkey = hex_pubkeys[0]
            return bitcoin.public_key_to_p2pk_script(first_pubkey)
        raise UnknownTxinType(f'cannot construct preimage_script for txin_type: {txin.script_type}')
    762 
    @classmethod
    def serialize_input(cls, txin: TxInput, script: str) -> str:
        """Serialize a single input into hex.

        `script` is the hex-encoded scriptSig to embed (may be empty).
        The result is the concatenation of: the serialized prevout, the
        scriptSig length as a var_int, the scriptSig itself, and nSequence
        (encoded with `int_to_hex(..., 4)`).
        """
        return ''.join((
            # Prev hash and index
            txin.prevout.serialize_to_network().hex(),
            # Script length (in bytes, hence half the hex length), script, sequence
            var_int(len(script) // 2),
            script,
            int_to_hex(txin.nsequence, 4),
        ))
    772 
    def _calc_bip143_shared_txdigest_fields(self) -> BIP143SharedTxDigestFields:
        """Compute the digest fields that depend on the tx as a whole, not on a single input.

        Returns the hex-encoded sha256d hashes of all serialized prevouts,
        all nSequence values, and all serialized outputs.
        """
        txins = self.inputs()
        txouts = self.outputs()
        prevouts_ser = b''.join(txin.prevout.serialize_to_network() for txin in txins)
        sequences_hex = ''.join(int_to_hex(txin.nsequence, 4) for txin in txins)
        outputs_hex = ''.join(txout.serialize_to_network().hex() for txout in txouts)
        return BIP143SharedTxDigestFields(
            hashPrevouts=bh2u(sha256d(prevouts_ser)),
            hashSequence=bh2u(sha256d(bfh(sequences_hex))),
            hashOutputs=bh2u(sha256d(bfh(outputs_hex))),
        )
    782 
    def is_segwit(self, *, guess_for_address=False):
        """Return whether at least one input of this transaction is segwit.

        `guess_for_address` is passed through to each input's own
        is_segwit() check.
        """
        for txin in self.inputs():
            if txin.is_segwit(guess_for_address=guess_for_address):
                return True
        return False
    786 
    def invalidate_ser_cache(self):
        """Forget the cached serialization and the cached txid."""
        self._cached_txid = self._cached_network_ser = None
    790 
    def serialize(self) -> str:
        """Return the network serialization (hex, with signatures), caching the result."""
        cached = self._cached_network_ser
        if cached:
            return cached
        self._cached_network_ser = self.serialize_to_network(estimate_size=False, include_sigs=True)
        return self._cached_network_ser
    795 
    def serialize_as_bytes(self) -> bytes:
        """Return the result of `serialize()` converted from hex to bytes."""
        tx_hex = self.serialize()
        return bfh(tx_hex)
    798 
    def serialize_to_network(self, *, estimate_size=False, include_sigs=True, force_legacy=False) -> str:
        """Serialize the transaction as used on the Bitcoin network, into hex.
        `estimate_size` is passed on to the scriptSig/witness construction, and
        makes the segwit detection guess based on addresses.
        `include_sigs` signals whether to include scriptSigs and witnesses.
        `force_legacy` signals to use the pre-segwit format
        note: (not include_sigs) implies force_legacy
        """
        self.deserialize()
        version_hex = int_to_hex(self.version, 4)
        locktime_hex = int_to_hex(self.locktime, 4)
        txin_list = self.inputs()
        txout_list = self.outputs()

        # inputs; scriptSigs are left empty unless signatures were asked for
        ser_txins = []
        for txin in txin_list:
            if include_sigs:
                script_sig = self.input_script(txin, estimate_size=estimate_size)
            else:
                script_sig = ''
            ser_txins.append(self.serialize_input(txin, script_sig))
        txins_hex = var_int(len(txin_list)) + ''.join(ser_txins)

        # outputs
        txouts_hex = var_int(len(txout_list)) + ''.join(
            txout.serialize_to_network().hex() for txout in txout_list)

        # when estimating, segwit-ness of inputs may have to be guessed from addresses
        if estimate_size:
            tx_is_segwit = self.is_segwit(guess_for_address=True)
        else:
            tx_is_segwit = self.is_segwit()

        if force_legacy or not (include_sigs and tx_is_segwit):
            return version_hex + txins_hex + txouts_hex + locktime_hex

        marker = '00'
        flag = '01'
        witness_hex = ''.join(self.serialize_witness(txin, estimate_size=estimate_size)
                              for txin in txin_list)
        return version_hex + marker + flag + txins_hex + txouts_hex + witness_hex + locktime_hex
    829 
    def to_qr_data(self) -> str:
        """Returns tx as data to be put into a QR code. No side-effects."""
        # work on a deep copy, as the conversion below mutates the tx
        tx_copy = copy.deepcopy(self)
        if isinstance(tx_copy, PartialTransaction):
            # this makes QR codes a lot smaller (or just possible in the first place!)
            tx_copy.convert_all_utxos_to_witness_utxos()
        return base_encode(tx_copy.serialize_as_bytes(), base=43)
    838 
    def txid(self) -> Optional[str]:
        """Return the txid as hex, or None if it cannot be determined (yet).

        The txid is not available if the tx is incomplete and has at least
        one non-segwit input, or if the scriptSig of some input cannot be
        constructed. A successfully computed txid is cached.
        """
        if self._cached_txid is not None:
            return self._cached_txid
        self.deserialize()
        has_non_segwit_input = not all(txin.is_segwit() for txin in self.inputs())
        if has_non_segwit_input and not self.is_complete():
            return None
        try:
            legacy_ser = self.serialize_to_network(force_legacy=True)
        except UnknownTxinType:
            # we might not know how to construct scriptSig for some scripts
            return None
        self._cached_txid = bh2u(sha256d(bfh(legacy_ser))[::-1])
        return self._cached_txid
    852 
    def wtxid(self) -> Optional[str]:
        """Return the hash of the full network serialization as hex.

        Returns None if the tx is not complete, or if the scriptSig/witness
        of some input cannot be constructed.
        """
        self.deserialize()
        if self.is_complete():
            try:
                full_ser = self.serialize_to_network()
            except UnknownTxinType:
                # we might not know how to construct scriptSig/witness for some scripts
                return None
            return bh2u(sha256d(bfh(full_ser))[::-1])
        return None
    863 
    def add_info_from_wallet(self, wallet: 'Abstract_Wallet', **kwargs) -> None:
        """Intentionally does nothing here; all arguments are ignored.

        NOTE(review): presumably overridden by subclasses that can make use
        of wallet information -- confirm.
        """
        return None
    866 
    def is_final(self) -> bool:
        """Whether RBF is disabled.

        That is, no input has an nSequence below 0xffffffff - 1.
        """
        for txin in self.inputs():
            if txin.nsequence < 0xffffffff - 1:
                return False
        return True
    870 
    def estimated_size(self):
        """Return an estimated virtual tx size in vbytes.
        BIP-0141 defines 'Virtual transaction size' to be weight/4 rounded up.
        This definition is only for humans, and has little meaning otherwise.
        If we wanted sub-byte precision, fee calculation should use transaction
        weights, but for simplicity we approximate that with (virtual_size)x4
        """
        return self.virtual_size_from_weight(self.estimated_weight())
    880 
    @classmethod
    def estimated_input_weight(cls, txin, is_segwit_tx):
        '''Return an estimate of serialized input weight in weight units.'''
        script_sig_hex = cls.input_script(txin, estimate_size=True)
        # serialization is hex, so two characters per byte
        base_bytes = len(cls.serialize_input(txin, script_sig_hex)) // 2

        if not txin.is_segwit(guess_for_address=True):
            # a non-segwit input is counted with 1 witness byte if the tx is segwit
            witness_bytes = 1 if is_segwit_tx else 0
        else:
            witness_bytes = len(cls.serialize_witness(txin, estimate_size=True)) // 2

        # non-witness bytes count four times, witness bytes once
        return witness_bytes + 4 * base_bytes
    893 
    @classmethod
    def estimated_output_size_for_address(cls, address: str) -> int:
        """Return an estimate of serialized output size in bytes.

        The address is converted to its output script first.
        """
        return cls.estimated_output_size_for_script(bitcoin.address_to_script(address))
    899 
    @classmethod
    def estimated_output_size_for_script(cls, script: str) -> int:
        """Return an estimate of serialized output size in bytes.

        `script` is hex-encoded.
        """
        script_nbytes = len(script) // 2
        length_prefix_nbytes = len(var_int(script_nbytes)) // 2
        # 8 byte value + varint script len + script
        return 8 + length_prefix_nbytes + script_nbytes
    907 
    @classmethod
    def virtual_size_from_weight(cls, weight):
        """Return `weight` divided by 4, rounded up."""
        quotient, remainder = divmod(weight, 4)
        if remainder > 0:
            return quotient + 1
        return quotient
    911 
    @classmethod
    def satperbyte_from_satperkw(cls, feerate_kw):
        """Converts feerate from sat/kw to sat/vbyte."""
        # scale by 4 first, then divide by 1000
        scaled_feerate = feerate_kw * 4
        return scaled_feerate / 1000
    916 
    def estimated_total_size(self):
        """Return an estimated total transaction size in bytes.

        The cached serialization is used if the tx is complete and a cached
        serialization exists; otherwise the size is estimated.
        """
        if self.is_complete() and self._cached_network_ser is not None:
            ser_hex = self._cached_network_ser
        else:
            ser_hex = self.serialize_to_network(estimate_size=True)
        return len(ser_hex) // 2  # ASCII hex string
    923 
    def estimated_witness_size(self):
        """Return an estimate of witness size in bytes.

        Returns 0 for a tx that is not segwit. The size is only estimated
        if the tx is not complete.
        """
        must_estimate = not self.is_complete()
        if not self.is_segwit(guess_for_address=must_estimate):
            return 0
        witness_hex = ''.join(
            self.serialize_witness(txin, estimate_size=must_estimate)
            for txin in self.inputs()
        )
        # include marker and flag
        return len(witness_hex) // 2 + 2
    933 
    def estimated_base_size(self):
        """Return an estimated base transaction size in bytes.

        This is the estimated total size minus the estimated witness size.
        """
        total_size = self.estimated_total_size()
        witness_size = self.estimated_witness_size()
        return total_size - witness_size
    937 
    def estimated_weight(self):
        """Return an estimate of transaction weight.

        Computed as three times the base size plus the total size.
        """
        total_size = self.estimated_total_size()
        base_size = self.estimated_base_size()
        return total_size + 3 * base_size
    943 
    def is_complete(self) -> bool:
        """Always True for this class.

        NOTE(review): presumably overridden for partial transactions, which
        may still be missing signatures -- confirm.
        """
        return True
    946 
    def get_output_idxs_from_scriptpubkey(self, script: str) -> Set[int]:
        """Returns the set indices of outputs with given script.

        `script` is the hex-encoded scriptPubKey. The returned set is a copy,
        so callers may freely mutate it.
        """
        assert isinstance(script, str)  # hex
        try:
            cache = self._script_to_output_idx
        except AttributeError:
            # build cache if there isn't one yet
            # note: can become stale and return incorrect data
            #       if the tx is modified later; that's out of scope.
            cache = defaultdict(set)
            for idx, txout in enumerate(self.outputs()):
                spk_hex = txout.scriptpubkey.hex()
                assert isinstance(spk_hex, str)
                cache[spk_hex].add(idx)
            self._script_to_output_idx = cache
        return set(cache[script])  # copy
    961 
    def get_output_idxs_from_address(self, addr: str) -> Set[int]:
        """Return the set of indices of outputs that pay to `addr`.

        The address is converted to its output script, which is then looked
        up via `get_output_idxs_from_scriptpubkey`.
        """
        return self.get_output_idxs_from_scriptpubkey(bitcoin.address_to_script(addr))
    965 
    def output_value_for_address(self, addr):
        """Return the value of the first output whose address is `addr`.

        Raises Exception if there is no such output.
        """
        # assumes exactly one output has that address
        not_found = object()
        value = next((txout.value for txout in self.outputs() if txout.address == addr),
                     not_found)
        if value is not_found:
            raise Exception('output not found', addr)
        return value
    973 
    def get_input_idx_that_spent_prevout(self, prevout: TxOutpoint) -> Optional[int]:
        """Return the index of the input that spends `prevout`, or None if no input does."""
        try:
            lookup = self._prevout_to_input_idx
        except AttributeError:
            # build cache if there isn't one yet
            # note: can become stale and return incorrect data
            #       if the tx is modified later; that's out of scope.
            lookup = {txin.prevout: i for i, txin in enumerate(self.inputs())}  # type: Dict[TxOutpoint, int]
            self._prevout_to_input_idx = lookup
        idx = lookup.get(prevout)
        if idx is None:
            return None
        # sanity check against a stale cache
        assert self.inputs()[idx].prevout == prevout
        return idx
    987 
    988 
    989 def convert_raw_tx_to_hex(raw: Union[str, bytes]) -> str:
    990     """Sanitizes tx-describing input (hex/base43/base64) into
    991     raw tx hex string."""
    992     if not raw:
    993         raise ValueError("empty string")
    994     raw_unstripped = raw
    995     raw = raw.strip()
    996     # try hex
    997     try:
    998         return binascii.unhexlify(raw).hex()
    999     except:
   1000         pass
   1001     # try base43
   1002     try:
   1003         return base_decode(raw, base=43).hex()
   1004     except:
   1005         pass
   1006     # try base64
   1007     if raw[0:6] in ('cHNidP', b'cHNidP'):  # base64 psbt
   1008         try:
   1009             return base64.b64decode(raw).hex()
   1010         except:
   1011             pass
   1012     # raw bytes (do not strip whitespaces in this case)
   1013     if isinstance(raw_unstripped, bytes):
   1014         return raw_unstripped.hex()
   1015     raise ValueError(f"failed to recognize transaction encoding for txt: {raw[:30]}...")
   1016 
   1017 
   1018 def tx_from_any(raw: Union[str, bytes], *,
   1019                 deserialize: bool = True) -> Union['PartialTransaction', 'Transaction']:
   1020     if isinstance(raw, bytearray):
   1021         raw = bytes(raw)
   1022     raw = convert_raw_tx_to_hex(raw)
   1023     try:
   1024         return PartialTransaction.from_raw_psbt(raw)
   1025     except BadHeaderMagic:
   1026         if raw[:10] == b'EPTF\xff'.hex():
   1027             raise SerializationError("Partial transactions generated with old Electrum versions "
   1028                                      "(< 4.0) are no longer supported. Please upgrade Electrum on "
   1029                                      "the other machine where this transaction was created.")
   1030     try:
   1031         tx = Transaction(raw)
   1032         if deserialize:
   1033             tx.deserialize()
   1034         return tx
   1035     except Exception as e:
   1036         raise SerializationError(f"Failed to recognise tx encoding, or to parse transaction. "
   1037                                  f"raw: {raw[:30]}...") from e
   1038 
   1039 
class PSBTGlobalType(IntEnum):
    """Key types of the global section of a PSBT.

    NOTE(review): values presumably follow BIP-174 -- confirm.
    """
    UNSIGNED_TX = 0
    XPUB = 1
    VERSION = 0xFB
   1044 
   1045 
class PSBTInputType(IntEnum):
    """Key types of a per-input section of a PSBT.

    These are the key types that PartialTxInput parses and serializes.
    NOTE(review): values presumably follow BIP-174 -- confirm.
    """
    NON_WITNESS_UTXO = 0
    WITNESS_UTXO = 1
    PARTIAL_SIG = 2
    SIGHASH_TYPE = 3
    REDEEM_SCRIPT = 4
    WITNESS_SCRIPT = 5
    BIP32_DERIVATION = 6
    FINAL_SCRIPTSIG = 7
    FINAL_SCRIPTWITNESS = 8
   1056 
   1057 
class PSBTOutputType(IntEnum):
    """Key types of a per-output section of a PSBT.

    NOTE(review): values presumably follow BIP-174 -- confirm.
    """
    REDEEM_SCRIPT = 0
    WITNESS_SCRIPT = 1
    BIP32_DERIVATION = 2
   1062 
   1063 
   1064 # Serialization/deserialization tools
   1065 def deser_compact_size(f) -> Optional[int]:
   1066     try:
   1067         nit = f.read(1)[0]
   1068     except IndexError:
   1069         return None     # end of file
   1070 
   1071     if nit == 253:
   1072         nit = struct.unpack("<H", f.read(2))[0]
   1073     elif nit == 254:
   1074         nit = struct.unpack("<I", f.read(4))[0]
   1075     elif nit == 255:
   1076         nit = struct.unpack("<Q", f.read(8))[0]
   1077     return nit
   1078 
   1079 
   1080 class PSBTSection:
   1081 
   1082     def _populate_psbt_fields_from_fd(self, fd=None):
   1083         if not fd: return
   1084 
   1085         while True:
   1086             try:
   1087                 key_type, key, val = self.get_next_kv_from_fd(fd)
   1088             except StopIteration:
   1089                 break
   1090             self.parse_psbt_section_kv(key_type, key, val)
   1091 
   1092     @classmethod
   1093     def get_next_kv_from_fd(cls, fd) -> Tuple[int, bytes, bytes]:
   1094         key_size = deser_compact_size(fd)
   1095         if key_size == 0:
   1096             raise StopIteration()
   1097         if key_size is None:
   1098             raise UnexpectedEndOfStream()
   1099 
   1100         full_key = fd.read(key_size)
   1101         key_type, key = cls.get_keytype_and_key_from_fullkey(full_key)
   1102 
   1103         val_size = deser_compact_size(fd)
   1104         if val_size is None: raise UnexpectedEndOfStream()
   1105         val = fd.read(val_size)
   1106 
   1107         return key_type, key, val
   1108 
   1109     @classmethod
   1110     def create_psbt_writer(cls, fd):
   1111         def wr(key_type: int, val: bytes, key: bytes = b''):
   1112             full_key = cls.get_fullkey_from_keytype_and_key(key_type, key)
   1113             fd.write(bytes.fromhex(var_int(len(full_key))))  # key_size
   1114             fd.write(full_key)  # key
   1115             fd.write(bytes.fromhex(var_int(len(val))))  # val_size
   1116             fd.write(val)  # val
   1117         return wr
   1118 
   1119     @classmethod
   1120     def get_keytype_and_key_from_fullkey(cls, full_key: bytes) -> Tuple[int, bytes]:
   1121         with io.BytesIO(full_key) as key_stream:
   1122             key_type = deser_compact_size(key_stream)
   1123             if key_type is None: raise UnexpectedEndOfStream()
   1124             key = key_stream.read()
   1125         return key_type, key
   1126 
   1127     @classmethod
   1128     def get_fullkey_from_keytype_and_key(cls, key_type: int, key: bytes) -> bytes:
   1129         key_type_bytes = bytes.fromhex(var_int(key_type))
   1130         return key_type_bytes + key
   1131 
   1132     def _serialize_psbt_section(self, fd):
   1133         wr = self.create_psbt_writer(fd)
   1134         self.serialize_psbt_section_kvs(wr)
   1135         fd.write(b'\x00')  # section-separator
   1136 
   1137     def parse_psbt_section_kv(self, kt: int, key: bytes, val: bytes) -> None:
   1138         raise NotImplementedError()  # implemented by subclasses
   1139 
   1140     def serialize_psbt_section_kvs(self, wr) -> None:
   1141         raise NotImplementedError()  # implemented by subclasses
   1142 
   1143 
   1144 class PartialTxInput(TxInput, PSBTSection):
    def __init__(self, *args, **kwargs):
        """Create an input whose PSBT-specific fields are all unset.

        All arguments are forwarded to TxInput.__init__.
        """
        TxInput.__init__(self, *args, **kwargs)
        # fields that are read from / written to the PSBT input section
        self._utxo = None  # type: Optional[Transaction]
        self._witness_utxo = None  # type: Optional[TxOutput]
        self.part_sigs = {}  # type: Dict[bytes, bytes]  # pubkey -> sig
        self.sighash = None  # type: Optional[int]
        self.bip32_paths = {}  # type: Dict[bytes, Tuple[bytes, Sequence[int]]]  # pubkey -> (xpub_fingerprint, path)
        self.redeem_script = None  # type: Optional[bytes]
        self.witness_script = None  # type: Optional[bytes]
        # records with an unrecognised key type, kept so they can be written back: full key -> value
        self._unknown = {}  # type: Dict[bytes, bytes]

        # further fields, not set by the PSBT section parser
        self.script_type = 'unknown'
        self.num_sig = 0  # type: int  # num req sigs for multisig
        self.pubkeys = []  # type: List[bytes]  # note: order matters
        # if set, these take precedence over what the utxo fields say (see value_sats / address)
        self._trusted_value_sats = None  # type: Optional[int]
        self._trusted_address = None  # type: Optional[str]
        self.block_height = None  # type: Optional[int]  # height at which the TXO is mined; None means unknown
        self.spent_height = None  # type: Optional[int]  # height at which the TXO got spent
        self._is_p2sh_segwit = None  # type: Optional[bool]  # None means unknown
        self._is_native_segwit = None  # type: Optional[bool]  # None means unknown
   1165 
   1166     @property
   1167     def utxo(self):
   1168         return self._utxo
   1169 
   1170     @utxo.setter
   1171     def utxo(self, tx: Optional[Transaction]):
   1172         if tx is None:
   1173             return
   1174         # note that tx might be a PartialTransaction
   1175         # serialize and de-serialize tx now. this might e.g. convert a complete PartialTx to a Tx
   1176         tx = tx_from_any(str(tx))
   1177         # 'utxo' field in PSBT cannot be another PSBT:
   1178         if not tx.is_complete():
   1179             return
   1180         self._utxo = tx
   1181         self.validate_data()
   1182         self.ensure_there_is_only_one_utxo()
   1183 
    @property
    def witness_utxo(self):
        """The previous output (a TxOutput) that this input spends, if set."""
        return self._witness_utxo

    @witness_utxo.setter
    def witness_utxo(self, value: Optional[TxOutput]):
        """Set the previous output, then re-validate this input.

        Unlike the `utxo` setter, None is stored as-is.
        """
        self._witness_utxo = value
        # validation runs after the assignment, so it sees the new value
        self.validate_data()
        self.ensure_there_is_only_one_utxo()
   1193 
   1194     def to_json(self):
   1195         d = super().to_json()
   1196         d.update({
   1197             'height': self.block_height,
   1198             'value_sats': self.value_sats(),
   1199             'address': self.address,
   1200             'utxo': str(self.utxo) if self.utxo else None,
   1201             'witness_utxo': self.witness_utxo.serialize_to_network().hex() if self.witness_utxo else None,
   1202             'sighash': self.sighash,
   1203             'redeem_script': self.redeem_script.hex() if self.redeem_script else None,
   1204             'witness_script': self.witness_script.hex() if self.witness_script else None,
   1205             'part_sigs': {pubkey.hex(): sig.hex() for pubkey, sig in self.part_sigs.items()},
   1206             'bip32_paths': {pubkey.hex(): (xfp.hex(), bip32.convert_bip32_intpath_to_strpath(path))
   1207                             for pubkey, (xfp, path) in self.bip32_paths.items()},
   1208             'unknown_psbt_fields': {key.hex(): val.hex() for key, val in self._unknown.items()},
   1209         })
   1210         return d
   1211 
   1212     @classmethod
   1213     def from_txin(cls, txin: TxInput, *, strip_witness: bool = True) -> 'PartialTxInput':
   1214         # FIXME: if strip_witness is True, res.is_segwit() will return False,
   1215         # and res.estimated_size() will return an incorrect value. These methods
   1216         # will return the correct values after we call add_input_info(). (see dscancel and bump_fee)
   1217         # This is very fragile: the value returned by estimate_size() depends on the calling order.
   1218         res = PartialTxInput(prevout=txin.prevout,
   1219                              script_sig=None if strip_witness else txin.script_sig,
   1220                              nsequence=txin.nsequence,
   1221                              witness=None if strip_witness else txin.witness,
   1222                              is_coinbase_output=txin.is_coinbase_output())
   1223         return res
   1224 
   1225     def validate_data(self, *, for_signing=False) -> None:
   1226         if self.utxo:
   1227             if self.prevout.txid.hex() != self.utxo.txid():
   1228                 raise PSBTInputConsistencyFailure(f"PSBT input validation: "
   1229                                                   f"If a non-witness UTXO is provided, its hash must match the hash specified in the prevout")
   1230             if self.witness_utxo:
   1231                 if self.utxo.outputs()[self.prevout.out_idx] != self.witness_utxo:
   1232                     raise PSBTInputConsistencyFailure(f"PSBT input validation: "
   1233                                                       f"If both non-witness UTXO and witness UTXO are provided, they must be consistent")
   1234         # The following test is disabled, so we are willing to sign non-segwit inputs
   1235         # without verifying the input amount. This means, given a maliciously modified PSBT,
   1236         # for non-segwit inputs, we might end up burning coins as miner fees.
   1237         if for_signing and False:
   1238             if not self.is_segwit() and self.witness_utxo:
   1239                 raise PSBTInputConsistencyFailure(f"PSBT input validation: "
   1240                                                   f"If a witness UTXO is provided, no non-witness signature may be created")
   1241         if self.redeem_script and self.address:
   1242             addr = hash160_to_p2sh(hash_160(self.redeem_script))
   1243             if self.address != addr:
   1244                 raise PSBTInputConsistencyFailure(f"PSBT input validation: "
   1245                                                   f"If a redeemScript is provided, the scriptPubKey must be for that redeemScript")
   1246         if self.witness_script:
   1247             if self.redeem_script:
   1248                 if self.redeem_script != bfh(bitcoin.p2wsh_nested_script(self.witness_script.hex())):
   1249                     raise PSBTInputConsistencyFailure(f"PSBT input validation: "
   1250                                                       f"If a witnessScript is provided, the redeemScript must be for that witnessScript")
   1251             elif self.address:
   1252                 if self.address != bitcoin.script_to_p2wsh(self.witness_script.hex()):
   1253                     raise PSBTInputConsistencyFailure(f"PSBT input validation: "
   1254                                                       f"If a witnessScript is provided, the scriptPubKey must be for that witnessScript")
   1255 
    def parse_psbt_section_kv(self, kt, key, val):
        """Store a single key-value record of a PSBT input section on this input.

        `kt` is the numeric key type, `key` the key data following the type,
        and `val` the value. Records with an unrecognised key type are kept
        in `self._unknown`, indexed by their full key.

        Raises SerializationError on duplicate records, on key data where
        none is allowed, and on keys/values of unexpected length.
        """
        try:
            kt = PSBTInputType(kt)
        except ValueError:
            pass  # unknown type
        if DEBUG_PSBT_PARSING: print(f"{repr(kt)} {key.hex()} {val.hex()}")
        if kt == PSBTInputType.NON_WITNESS_UTXO:
            if self.utxo is not None:
                raise SerializationError(f"duplicate key: {repr(kt)}")
            # note: this goes through the `utxo` property setter, which also validates the input
            self.utxo = Transaction(val)
            self.utxo.deserialize()
            if key: raise SerializationError(f"key for {repr(kt)} must be empty")
        elif kt == PSBTInputType.WITNESS_UTXO:
            if self.witness_utxo is not None:
                raise SerializationError(f"duplicate key: {repr(kt)}")
            # note: this goes through the `witness_utxo` property setter, which also validates the input
            self.witness_utxo = TxOutput.from_network_bytes(val)
            if key: raise SerializationError(f"key for {repr(kt)} must be empty")
        elif kt == PSBTInputType.PARTIAL_SIG:
            # the key data is the pubkey, the value is the signature
            if key in self.part_sigs:
                raise SerializationError(f"duplicate key: {repr(kt)}")
            if len(key) not in (33, 65):  # TODO also allow 32? one of the tests in the BIP is "supposed to" fail with len==32...
                raise SerializationError(f"key for {repr(kt)} has unexpected length: {len(key)}")
            self.part_sigs[key] = val
        elif kt == PSBTInputType.SIGHASH_TYPE:
            if self.sighash is not None:
                raise SerializationError(f"duplicate key: {repr(kt)}")
            if len(val) != 4:
                raise SerializationError(f"value for {repr(kt)} has unexpected length: {len(val)}")
            # 4-byte little-endian unsigned int
            self.sighash = struct.unpack("<I", val)[0]
            if key: raise SerializationError(f"key for {repr(kt)} must be empty")
        elif kt == PSBTInputType.BIP32_DERIVATION:
            # the key data is the pubkey, the value is the packed (root fingerprint, path)
            if key in self.bip32_paths:
                raise SerializationError(f"duplicate key: {repr(kt)}")
            if len(key) not in (33, 65):  # TODO also allow 32? one of the tests in the BIP is "supposed to" fail with len==32...
                raise SerializationError(f"key for {repr(kt)} has unexpected length: {len(key)}")
            self.bip32_paths[key] = unpack_bip32_root_fingerprint_and_int_path(val)
        elif kt == PSBTInputType.REDEEM_SCRIPT:
            if self.redeem_script is not None:
                raise SerializationError(f"duplicate key: {repr(kt)}")
            self.redeem_script = val
            if key: raise SerializationError(f"key for {repr(kt)} must be empty")
        elif kt == PSBTInputType.WITNESS_SCRIPT:
            if self.witness_script is not None:
                raise SerializationError(f"duplicate key: {repr(kt)}")
            self.witness_script = val
            if key: raise SerializationError(f"key for {repr(kt)} must be empty")
        elif kt == PSBTInputType.FINAL_SCRIPTSIG:
            if self.script_sig is not None:
                raise SerializationError(f"duplicate key: {repr(kt)}")
            self.script_sig = val
            if key: raise SerializationError(f"key for {repr(kt)} must be empty")
        elif kt == PSBTInputType.FINAL_SCRIPTWITNESS:
            if self.witness is not None:
                raise SerializationError(f"duplicate key: {repr(kt)}")
            self.witness = val
            if key: raise SerializationError(f"key for {repr(kt)} must be empty")
        else:
            # unrecognised key type: keep the record as-is, indexed by its full key
            full_key = self.get_fullkey_from_keytype_and_key(kt, key)
            if full_key in self._unknown:
                raise SerializationError(f'duplicate key. PSBT input key for unknown type: {full_key}')
            self._unknown[full_key] = val
   1317 
   1318     def serialize_psbt_section_kvs(self, wr):
   1319         self.ensure_there_is_only_one_utxo()
   1320         if self.witness_utxo:
   1321             wr(PSBTInputType.WITNESS_UTXO, self.witness_utxo.serialize_to_network())
   1322         if self.utxo:
   1323             wr(PSBTInputType.NON_WITNESS_UTXO, bfh(self.utxo.serialize_to_network(include_sigs=True)))
   1324         for pk, val in sorted(self.part_sigs.items()):
   1325             wr(PSBTInputType.PARTIAL_SIG, val, pk)
   1326         if self.sighash is not None:
   1327             wr(PSBTInputType.SIGHASH_TYPE, struct.pack('<I', self.sighash))
   1328         if self.redeem_script is not None:
   1329             wr(PSBTInputType.REDEEM_SCRIPT, self.redeem_script)
   1330         if self.witness_script is not None:
   1331             wr(PSBTInputType.WITNESS_SCRIPT, self.witness_script)
   1332         for k in sorted(self.bip32_paths):
   1333             packed_path = pack_bip32_root_fingerprint_and_int_path(*self.bip32_paths[k])
   1334             wr(PSBTInputType.BIP32_DERIVATION, packed_path, k)
   1335         if self.script_sig is not None:
   1336             wr(PSBTInputType.FINAL_SCRIPTSIG, self.script_sig)
   1337         if self.witness is not None:
   1338             wr(PSBTInputType.FINAL_SCRIPTWITNESS, self.witness)
   1339         for full_key, val in sorted(self._unknown.items()):
   1340             key_type, key = self.get_keytype_and_key_from_fullkey(full_key)
   1341             wr(key_type, val, key=key)
   1342 
   1343     def value_sats(self) -> Optional[int]:
   1344         if self._trusted_value_sats is not None:
   1345             return self._trusted_value_sats
   1346         if self.utxo:
   1347             out_idx = self.prevout.out_idx
   1348             return self.utxo.outputs()[out_idx].value
   1349         if self.witness_utxo:
   1350             return self.witness_utxo.value
   1351         return None
   1352 
   1353     @property
   1354     def address(self) -> Optional[str]:
   1355         if self._trusted_address is not None:
   1356             return self._trusted_address
   1357         scriptpubkey = self.scriptpubkey
   1358         if scriptpubkey:
   1359             return get_address_from_output_script(scriptpubkey)
   1360         return None
   1361 
   1362     @property
   1363     def scriptpubkey(self) -> Optional[bytes]:
   1364         if self._trusted_address is not None:
   1365             return bfh(bitcoin.address_to_script(self._trusted_address))
   1366         if self.utxo:
   1367             out_idx = self.prevout.out_idx
   1368             return self.utxo.outputs()[out_idx].scriptpubkey
   1369         if self.witness_utxo:
   1370             return self.witness_utxo.scriptpubkey
   1371         return None
   1372 
   1373     def set_script_type(self) -> None:
   1374         if self.scriptpubkey is None:
   1375             return
   1376         type = get_script_type_from_output_script(self.scriptpubkey)
   1377         inner_type = None
   1378         if type is not None:
   1379             if type == 'p2sh':
   1380                 inner_type = get_script_type_from_output_script(self.redeem_script)
   1381             elif type == 'p2wsh':
   1382                 inner_type = get_script_type_from_output_script(self.witness_script)
   1383             if inner_type is not None:
   1384                 type = inner_type + '-' + type
   1385             if type in ('p2pkh', 'p2wpkh-p2sh', 'p2wpkh'):
   1386                 self.script_type = type
   1387         return
   1388 
   1389     def is_complete(self) -> bool:
   1390         if self.script_sig is not None and self.witness is not None:
   1391             return True
   1392         if self.is_coinbase_input():
   1393             return True
   1394         if self.script_sig is not None and not self.is_segwit():
   1395             return True
   1396         signatures = list(self.part_sigs.values())
   1397         s = len(signatures)
   1398         # note: The 'script_type' field is currently only set by the wallet,
   1399         #       for its own addresses. This means we can only finalize inputs
   1400         #       that are related to the wallet.
   1401         #       The 'fix' would be adding extra logic that matches on templates,
   1402         #       and figures out the script_type from available fields.
   1403         if self.script_type in ('p2pk', 'p2pkh', 'p2wpkh', 'p2wpkh-p2sh'):
   1404             return s >= 1
   1405         if self.script_type in ('p2sh', 'p2wsh', 'p2wsh-p2sh'):
   1406             return s >= self.num_sig
   1407         return False
   1408 
   1409     def finalize(self) -> None:
   1410         def clear_fields_when_finalized():
   1411             # BIP-174: "All other data except the UTXO and unknown fields in the
   1412             #           input key-value map should be cleared from the PSBT"
   1413             self.part_sigs = {}
   1414             self.sighash = None
   1415             self.bip32_paths = {}
   1416             self.redeem_script = None
   1417             self.witness_script = None
   1418 
   1419         if self.script_sig is not None and self.witness is not None:
   1420             clear_fields_when_finalized()
   1421             return  # already finalized
   1422         if self.is_complete():
   1423             self.script_sig = bfh(Transaction.input_script(self))
   1424             self.witness = bfh(Transaction.serialize_witness(self))
   1425             clear_fields_when_finalized()
   1426 
    def combine_with_other_txin(self, other_txin: 'TxInput') -> None:
        """Merge the data of `other_txin` into this input, then try to finalize.

        Both inputs must refer to the same prevout (checked with an assert).
        A field that is set on `other_txin` overwrites the value held here;
        the dict-valued fields (part_sigs, bip32_paths, _unknown) are merged
        with dict.update(). The PSBT-specific fields are only read when
        `other_txin` is a PartialTxInput.
        """
        assert self.prevout == other_txin.prevout
        # final scriptSig / witness are read regardless of the type of other_txin
        if other_txin.script_sig is not None:
            self.script_sig = other_txin.script_sig
        if other_txin.witness is not None:
            self.witness = other_txin.witness
        if isinstance(other_txin, PartialTxInput):
            if other_txin.witness_utxo:
                self.witness_utxo = other_txin.witness_utxo
            if other_txin.utxo:
                self.utxo = other_txin.utxo
            self.part_sigs.update(other_txin.part_sigs)
            if other_txin.sighash is not None:
                self.sighash = other_txin.sighash
            self.bip32_paths.update(other_txin.bip32_paths)
            if other_txin.redeem_script is not None:
                self.redeem_script = other_txin.redeem_script
            if other_txin.witness_script is not None:
                self.witness_script = other_txin.witness_script
            self._unknown.update(other_txin._unknown)
        # the merge may have left both utxo and witness_utxo set
        self.ensure_there_is_only_one_utxo()
        # try to finalize now
        self.finalize()
   1450 
   1451     def ensure_there_is_only_one_utxo(self):
   1452         # we prefer having the full previous tx, even for segwit inputs. see #6198
   1453         # for witness v1, witness_utxo will be enough though
   1454         if self.utxo is not None and self.witness_utxo is not None:
   1455             self.witness_utxo = None
   1456 
   1457     def convert_utxo_to_witness_utxo(self) -> None:
   1458         if self.utxo:
   1459             self._witness_utxo = self.utxo.outputs()[self.prevout.out_idx]
   1460             self._utxo = None  # type: Optional[Transaction]
   1461 
   1462     def is_native_segwit(self) -> Optional[bool]:
   1463         """Whether this input is native segwit. None means inconclusive."""
   1464         if self._is_native_segwit is None:
   1465             if self.address:
   1466                 self._is_native_segwit = bitcoin.is_segwit_address(self.address)
   1467         return self._is_native_segwit
   1468 
   1469     def is_p2sh_segwit(self) -> Optional[bool]:
   1470         """Whether this input is p2sh-embedded-segwit. None means inconclusive."""
   1471         if self._is_p2sh_segwit is None:
   1472             def calc_if_p2sh_segwit_now():
   1473                 if not (self.address and self.redeem_script):
   1474                     return None
   1475                 if self.address != bitcoin.hash160_to_p2sh(hash_160(self.redeem_script)):
   1476                     # not p2sh address
   1477                     return False
   1478                 try:
   1479                     decoded = [x for x in script_GetOp(self.redeem_script)]
   1480                 except MalformedBitcoinScript:
   1481                     decoded = None
   1482                 # witness version 0
   1483                 if match_script_against_template(decoded, SCRIPTPUBKEY_TEMPLATE_WITNESS_V0):
   1484                     return True
   1485                 # witness version 1-16
   1486                 future_witness_versions = list(range(opcodes.OP_1, opcodes.OP_16 + 1))
   1487                 for witver, opcode in enumerate(future_witness_versions, start=1):
   1488                     match = [opcode, OPPushDataGeneric(lambda x: 2 <= x <= 40)]
   1489                     if match_script_against_template(decoded, match):
   1490                         return True
   1491                 return False
   1492 
   1493             self._is_p2sh_segwit = calc_if_p2sh_segwit_now()
   1494         return self._is_p2sh_segwit
   1495 
   1496     def is_segwit(self, *, guess_for_address=False) -> bool:
   1497         if super().is_segwit():
   1498             return True
   1499         if self.is_native_segwit() or self.is_p2sh_segwit():
   1500             return True
   1501         if self.is_native_segwit() is False and self.is_p2sh_segwit() is False:
   1502             return False
   1503         if self.witness_script:
   1504             return True
   1505         _type = self.script_type
   1506         if _type == 'address' and guess_for_address:
   1507             _type = Transaction.guess_txintype_from_address(self.address)
   1508         return is_segwit_script_type(_type)
   1509 
   1510     def already_has_some_signatures(self) -> bool:
   1511         """Returns whether progress has been made towards completing this input."""
   1512         return (self.part_sigs
   1513                 or self.script_sig is not None
   1514                 or self.witness is not None)
   1515 
   1516 
   1517 class PartialTxOutput(TxOutput, PSBTSection):
   1518     def __init__(self, *args, **kwargs):
   1519         TxOutput.__init__(self, *args, **kwargs)
   1520         self.redeem_script = None  # type: Optional[bytes]
   1521         self.witness_script = None  # type: Optional[bytes]
   1522         self.bip32_paths = {}  # type: Dict[bytes, Tuple[bytes, Sequence[int]]]  # pubkey -> (xpub_fingerprint, path)
   1523         self._unknown = {}  # type: Dict[bytes, bytes]
   1524 
   1525         self.script_type = 'unknown'
   1526         self.num_sig = 0  # num req sigs for multisig
   1527         self.pubkeys = []  # type: List[bytes]  # note: order matters
   1528         self.is_mine = False  # type: bool  # whether the wallet considers the output to be ismine
   1529         self.is_change = False  # type: bool  # whether the wallet considers the output to be change
   1530 
   1531     def to_json(self):
   1532         d = super().to_json()
   1533         d.update({
   1534             'redeem_script': self.redeem_script.hex() if self.redeem_script else None,
   1535             'witness_script': self.witness_script.hex() if self.witness_script else None,
   1536             'bip32_paths': {pubkey.hex(): (xfp.hex(), bip32.convert_bip32_intpath_to_strpath(path))
   1537                             for pubkey, (xfp, path) in self.bip32_paths.items()},
   1538             'unknown_psbt_fields': {key.hex(): val.hex() for key, val in self._unknown.items()},
   1539         })
   1540         return d
   1541 
   1542     @classmethod
   1543     def from_txout(cls, txout: TxOutput) -> 'PartialTxOutput':
   1544         res = PartialTxOutput(scriptpubkey=txout.scriptpubkey,
   1545                               value=txout.value)
   1546         return res
   1547 
   1548     def parse_psbt_section_kv(self, kt, key, val):
   1549         try:
   1550             kt = PSBTOutputType(kt)
   1551         except ValueError:
   1552             pass  # unknown type
   1553         if DEBUG_PSBT_PARSING: print(f"{repr(kt)} {key.hex()} {val.hex()}")
   1554         if kt == PSBTOutputType.REDEEM_SCRIPT:
   1555             if self.redeem_script is not None:
   1556                 raise SerializationError(f"duplicate key: {repr(kt)}")
   1557             self.redeem_script = val
   1558             if key: raise SerializationError(f"key for {repr(kt)} must be empty")
   1559         elif kt == PSBTOutputType.WITNESS_SCRIPT:
   1560             if self.witness_script is not None:
   1561                 raise SerializationError(f"duplicate key: {repr(kt)}")
   1562             self.witness_script = val
   1563             if key: raise SerializationError(f"key for {repr(kt)} must be empty")
   1564         elif kt == PSBTOutputType.BIP32_DERIVATION:
   1565             if key in self.bip32_paths:
   1566                 raise SerializationError(f"duplicate key: {repr(kt)}")
   1567             if len(key) not in (33, 65):  # TODO also allow 32? one of the tests in the BIP is "supposed to" fail with len==32...
   1568                 raise SerializationError(f"key for {repr(kt)} has unexpected length: {len(key)}")
   1569             self.bip32_paths[key] = unpack_bip32_root_fingerprint_and_int_path(val)
   1570         else:
   1571             full_key = self.get_fullkey_from_keytype_and_key(kt, key)
   1572             if full_key in self._unknown:
   1573                 raise SerializationError(f'duplicate key. PSBT output key for unknown type: {full_key}')
   1574             self._unknown[full_key] = val
   1575 
   1576     def serialize_psbt_section_kvs(self, wr):
   1577         if self.redeem_script is not None:
   1578             wr(PSBTOutputType.REDEEM_SCRIPT, self.redeem_script)
   1579         if self.witness_script is not None:
   1580             wr(PSBTOutputType.WITNESS_SCRIPT, self.witness_script)
   1581         for k in sorted(self.bip32_paths):
   1582             packed_path = pack_bip32_root_fingerprint_and_int_path(*self.bip32_paths[k])
   1583             wr(PSBTOutputType.BIP32_DERIVATION, packed_path, k)
   1584         for full_key, val in sorted(self._unknown.items()):
   1585             key_type, key = self.get_keytype_and_key_from_fullkey(full_key)
   1586             wr(key_type, val, key=key)
   1587 
   1588     def combine_with_other_txout(self, other_txout: 'TxOutput') -> None:
   1589         assert self.scriptpubkey == other_txout.scriptpubkey
   1590         if not isinstance(other_txout, PartialTxOutput):
   1591             return
   1592         if other_txout.redeem_script is not None:
   1593             self.redeem_script = other_txout.redeem_script
   1594         if other_txout.witness_script is not None:
   1595             self.witness_script = other_txout.witness_script
   1596         self.bip32_paths.update(other_txout.bip32_paths)
   1597         self._unknown.update(other_txout._unknown)
   1598 
   1599 
   1600 class PartialTransaction(Transaction):
   1601 
   1602     def __init__(self):
   1603         Transaction.__init__(self, None)
   1604         self.xpubs = {}  # type: Dict[BIP32Node, Tuple[bytes, Sequence[int]]]  # intermediate bip32node -> (xfp, der_prefix)
   1605         self._inputs = []  # type: List[PartialTxInput]
   1606         self._outputs = []  # type: List[PartialTxOutput]
   1607         self._unknown = {}  # type: Dict[bytes, bytes]
   1608 
   1609     def to_json(self) -> dict:
   1610         d = super().to_json()
   1611         d.update({
   1612             'xpubs': {bip32node.to_xpub(): (xfp.hex(), bip32.convert_bip32_intpath_to_strpath(path))
   1613                       for bip32node, (xfp, path) in self.xpubs.items()},
   1614             'unknown_psbt_fields': {key.hex(): val.hex() for key, val in self._unknown.items()},
   1615         })
   1616         return d
   1617 
   1618     @classmethod
   1619     def from_tx(cls, tx: Transaction) -> 'PartialTransaction':
   1620         res = cls()
   1621         res._inputs = [PartialTxInput.from_txin(txin, strip_witness=True)
   1622                        for txin in tx.inputs()]
   1623         res._outputs = [PartialTxOutput.from_txout(txout) for txout in tx.outputs()]
   1624         res.version = tx.version
   1625         res.locktime = tx.locktime
   1626         return res
   1627 
   1628     @classmethod
   1629     def from_raw_psbt(cls, raw) -> 'PartialTransaction':
   1630         # auto-detect and decode Base64 and Hex.
   1631         if raw[0:10].lower() in (b'70736274ff', '70736274ff'):  # hex
   1632             raw = bytes.fromhex(raw)
   1633         elif raw[0:6] in (b'cHNidP', 'cHNidP'):  # base64
   1634             raw = base64.b64decode(raw)
   1635         if not isinstance(raw, (bytes, bytearray)) or raw[0:5] != b'psbt\xff':
   1636             raise BadHeaderMagic("bad magic")
   1637 
   1638         tx = None  # type: Optional[PartialTransaction]
   1639 
   1640         # We parse the raw stream twice. The first pass is used to find the
   1641         # PSBT_GLOBAL_UNSIGNED_TX key in the global section and set 'tx'.
   1642         # The second pass does everything else.
   1643         with io.BytesIO(raw[5:]) as fd:  # parsing "first pass"
   1644             while True:
   1645                 try:
   1646                     kt, key, val = PSBTSection.get_next_kv_from_fd(fd)
   1647                 except StopIteration:
   1648                     break
   1649                 try:
   1650                     kt = PSBTGlobalType(kt)
   1651                 except ValueError:
   1652                     pass  # unknown type
   1653                 if kt == PSBTGlobalType.UNSIGNED_TX:
   1654                     if tx is not None:
   1655                         raise SerializationError(f"duplicate key: {repr(kt)}")
   1656                     if key: raise SerializationError(f"key for {repr(kt)} must be empty")
   1657                     unsigned_tx = Transaction(val.hex())
   1658                     for txin in unsigned_tx.inputs():
   1659                         if txin.script_sig or txin.witness:
   1660                             raise SerializationError(f"PSBT {repr(kt)} must have empty scriptSigs and witnesses")
   1661                     tx = PartialTransaction.from_tx(unsigned_tx)
   1662 
   1663         if tx is None:
   1664             raise SerializationError(f"PSBT missing required global section PSBT_GLOBAL_UNSIGNED_TX")
   1665 
   1666         with io.BytesIO(raw[5:]) as fd:  # parsing "second pass"
   1667             # global section
   1668             while True:
   1669                 try:
   1670                     kt, key, val = PSBTSection.get_next_kv_from_fd(fd)
   1671                 except StopIteration:
   1672                     break
   1673                 try:
   1674                     kt = PSBTGlobalType(kt)
   1675                 except ValueError:
   1676                     pass  # unknown type
   1677                 if DEBUG_PSBT_PARSING: print(f"{repr(kt)} {key.hex()} {val.hex()}")
   1678                 if kt == PSBTGlobalType.UNSIGNED_TX:
   1679                     pass  # already handled during "first" parsing pass
   1680                 elif kt == PSBTGlobalType.XPUB:
   1681                     bip32node = BIP32Node.from_bytes(key)
   1682                     if bip32node in tx.xpubs:
   1683                         raise SerializationError(f"duplicate key: {repr(kt)}")
   1684                     xfp, path = unpack_bip32_root_fingerprint_and_int_path(val)
   1685                     if bip32node.depth != len(path):
   1686                         raise SerializationError(f"PSBT global xpub has mismatching depth ({bip32node.depth}) "
   1687                                                  f"and derivation prefix len ({len(path)})")
   1688                     child_number_of_xpub = int.from_bytes(bip32node.child_number, 'big')
   1689                     if not ((bip32node.depth == 0 and child_number_of_xpub == 0)
   1690                             or (bip32node.depth != 0 and child_number_of_xpub == path[-1])):
   1691                         raise SerializationError(f"PSBT global xpub has inconsistent child_number and derivation prefix")
   1692                     tx.xpubs[bip32node] = xfp, path
   1693                 elif kt == PSBTGlobalType.VERSION:
   1694                     if len(val) > 4:
   1695                         raise SerializationError(f"value for {repr(kt)} has unexpected length: {len(val)} > 4")
   1696                     psbt_version = int.from_bytes(val, byteorder='little', signed=False)
   1697                     if psbt_version > 0:
   1698                         raise SerializationError(f"Only PSBTs with version 0 are supported. Found version: {psbt_version}")
   1699                     if key: raise SerializationError(f"key for {repr(kt)} must be empty")
   1700                 else:
   1701                     full_key = PSBTSection.get_fullkey_from_keytype_and_key(kt, key)
   1702                     if full_key in tx._unknown:
   1703                         raise SerializationError(f'duplicate key. PSBT global key for unknown type: {full_key}')
   1704                     tx._unknown[full_key] = val
   1705             try:
   1706                 # inputs sections
   1707                 for txin in tx.inputs():
   1708                     if DEBUG_PSBT_PARSING: print("-> new input starts")
   1709                     txin._populate_psbt_fields_from_fd(fd)
   1710                 # outputs sections
   1711                 for txout in tx.outputs():
   1712                     if DEBUG_PSBT_PARSING: print("-> new output starts")
   1713                     txout._populate_psbt_fields_from_fd(fd)
   1714             except UnexpectedEndOfStream:
   1715                 raise UnexpectedEndOfStream('Unexpected end of stream. Num input and output maps provided does not match unsigned tx.') from None
   1716 
   1717             if fd.read(1) != b'':
   1718                 raise SerializationError("extra junk at the end of PSBT")
   1719 
   1720         for txin in tx.inputs():
   1721             txin.validate_data()
   1722 
   1723         return tx
   1724 
   1725     @classmethod
   1726     def from_io(cls, inputs: Sequence[PartialTxInput], outputs: Sequence[PartialTxOutput], *,
   1727                 locktime: int = None, version: int = None):
   1728         self = cls()
   1729         self._inputs = list(inputs)
   1730         self._outputs = list(outputs)
   1731         if locktime is not None:
   1732             self.locktime = locktime
   1733         if version is not None:
   1734             self.version = version
   1735         self.BIP69_sort()
   1736         return self
   1737 
   1738     def _serialize_psbt(self, fd) -> None:
   1739         wr = PSBTSection.create_psbt_writer(fd)
   1740         fd.write(b'psbt\xff')
   1741         # global section
   1742         wr(PSBTGlobalType.UNSIGNED_TX, bfh(self.serialize_to_network(include_sigs=False)))
   1743         for bip32node, (xfp, path) in sorted(self.xpubs.items()):
   1744             val = pack_bip32_root_fingerprint_and_int_path(xfp, path)
   1745             wr(PSBTGlobalType.XPUB, val, key=bip32node.to_bytes())
   1746         for full_key, val in sorted(self._unknown.items()):
   1747             key_type, key = PSBTSection.get_keytype_and_key_from_fullkey(full_key)
   1748             wr(key_type, val, key=key)
   1749         fd.write(b'\x00')  # section-separator
   1750         # input sections
   1751         for inp in self._inputs:
   1752             inp._serialize_psbt_section(fd)
   1753         # output sections
   1754         for outp in self._outputs:
   1755             outp._serialize_psbt_section(fd)
   1756 
   1757     def finalize_psbt(self) -> None:
   1758         for txin in self.inputs():
   1759             txin.finalize()
   1760 
   1761     def combine_with_other_psbt(self, other_tx: 'Transaction') -> None:
   1762         """Pulls in all data from other_tx we don't yet have (e.g. signatures).
   1763         other_tx must be concerning the same unsigned tx.
   1764         """
   1765         if self.serialize_to_network(include_sigs=False) != other_tx.serialize_to_network(include_sigs=False):
   1766             raise Exception('A Combiner must not combine two different PSBTs.')
   1767         # BIP-174: "The resulting PSBT must contain all of the key-value pairs from each of the PSBTs.
   1768         #           The Combiner must remove any duplicate key-value pairs, in accordance with the specification."
   1769         # global section
   1770         if isinstance(other_tx, PartialTransaction):
   1771             self.xpubs.update(other_tx.xpubs)
   1772             self._unknown.update(other_tx._unknown)
   1773         # input sections
   1774         for txin, other_txin in zip(self.inputs(), other_tx.inputs()):
   1775             txin.combine_with_other_txin(other_txin)
   1776         # output sections
   1777         for txout, other_txout in zip(self.outputs(), other_tx.outputs()):
   1778             txout.combine_with_other_txout(other_txout)
   1779         self.invalidate_ser_cache()
   1780 
   1781     def join_with_other_psbt(self, other_tx: 'PartialTransaction') -> None:
   1782         """Adds inputs and outputs from other_tx into this one."""
   1783         if not isinstance(other_tx, PartialTransaction):
   1784             raise Exception('Can only join partial transactions.')
   1785         # make sure there are no duplicate prevouts
   1786         prevouts = set()
   1787         for txin in itertools.chain(self.inputs(), other_tx.inputs()):
   1788             prevout_str = txin.prevout.to_str()
   1789             if prevout_str in prevouts:
   1790                 raise Exception(f"Duplicate inputs! "
   1791                                 f"Transactions that spend the same prevout cannot be joined.")
   1792             prevouts.add(prevout_str)
   1793         # copy global PSBT section
   1794         self.xpubs.update(other_tx.xpubs)
   1795         self._unknown.update(other_tx._unknown)
   1796         # copy and add inputs and outputs
   1797         self.add_inputs(list(other_tx.inputs()))
   1798         self.add_outputs(list(other_tx.outputs()))
   1799         self.remove_signatures()
   1800         self.invalidate_ser_cache()
   1801 
    def inputs(self) -> Sequence[PartialTxInput]:
        """Return the inputs of this transaction (the internal list, not a copy)."""
        return self._inputs
   1804 
    def outputs(self) -> Sequence[PartialTxOutput]:
        """Return the outputs of this transaction (the internal list, not a copy)."""
        return self._outputs
   1807 
   1808     def add_inputs(self, inputs: List[PartialTxInput]) -> None:
   1809         self._inputs.extend(inputs)
   1810         self.BIP69_sort(outputs=False)
   1811         self.invalidate_ser_cache()
   1812 
   1813     def add_outputs(self, outputs: List[PartialTxOutput]) -> None:
   1814         self._outputs.extend(outputs)
   1815         self.BIP69_sort(inputs=False)
   1816         self.invalidate_ser_cache()
   1817 
   1818     def set_rbf(self, rbf: bool) -> None:
   1819         nSequence = 0xffffffff - (2 if rbf else 1)
   1820         for txin in self.inputs():
   1821             txin.nsequence = nSequence
   1822         self.invalidate_ser_cache()
   1823 
   1824     def BIP69_sort(self, inputs=True, outputs=True):
   1825         # NOTE: other parts of the code rely on these sorts being *stable* sorts
   1826         if inputs:
   1827             self._inputs.sort(key = lambda i: (i.prevout.txid, i.prevout.out_idx))
   1828         if outputs:
   1829             self._outputs.sort(key = lambda o: (o.value, o.scriptpubkey))
   1830         self.invalidate_ser_cache()
   1831 
   1832     def input_value(self) -> int:
   1833         input_values = [txin.value_sats() for txin in self.inputs()]
   1834         if any([val is None for val in input_values]):
   1835             raise MissingTxInputAmount()
   1836         return sum(input_values)
   1837 
   1838     def output_value(self) -> int:
   1839         return sum(o.value for o in self.outputs())
   1840 
   1841     def get_fee(self) -> Optional[int]:
   1842         try:
   1843             return self.input_value() - self.output_value()
   1844         except MissingTxInputAmount:
   1845             return None
   1846 
   1847     def serialize_preimage(self, txin_index: int, *,
   1848                            bip143_shared_txdigest_fields: BIP143SharedTxDigestFields = None) -> str:
   1849         nVersion = int_to_hex(self.version, 4)
   1850         nLocktime = int_to_hex(self.locktime, 4)
   1851         inputs = self.inputs()
   1852         outputs = self.outputs()
   1853         txin = inputs[txin_index]
   1854         sighash = txin.sighash if txin.sighash is not None else SIGHASH_ALL
   1855         if sighash != SIGHASH_ALL:
   1856             raise Exception("only SIGHASH_ALL signing is supported!")
   1857         nHashType = int_to_hex(sighash, 4)
   1858         preimage_script = self.get_preimage_script(txin)
   1859         if txin.is_segwit():
   1860             if bip143_shared_txdigest_fields is None:
   1861                 bip143_shared_txdigest_fields = self._calc_bip143_shared_txdigest_fields()
   1862             hashPrevouts = bip143_shared_txdigest_fields.hashPrevouts
   1863             hashSequence = bip143_shared_txdigest_fields.hashSequence
   1864             hashOutputs = bip143_shared_txdigest_fields.hashOutputs
   1865             outpoint = txin.prevout.serialize_to_network().hex()
   1866             scriptCode = var_int(len(preimage_script) // 2) + preimage_script
   1867             amount = int_to_hex(txin.value_sats(), 8)
   1868             nSequence = int_to_hex(txin.nsequence, 4)
   1869             preimage = nVersion + hashPrevouts + hashSequence + outpoint + scriptCode + amount + nSequence + hashOutputs + nLocktime + nHashType
   1870         else:
   1871             txins = var_int(len(inputs)) + ''.join(self.serialize_input(txin, preimage_script if txin_index==k else '')
   1872                                                    for k, txin in enumerate(inputs))
   1873             txouts = var_int(len(outputs)) + ''.join(o.serialize_to_network().hex() for o in outputs)
   1874             preimage = nVersion + txins + txouts + nLocktime + nHashType
   1875         return preimage
   1876 
   1877     def sign(self, keypairs) -> None:
   1878         # keypairs:  pubkey_hex -> (secret_bytes, is_compressed)
   1879         bip143_shared_txdigest_fields = self._calc_bip143_shared_txdigest_fields()
   1880         for i, txin in enumerate(self.inputs()):
   1881             pubkeys = [pk.hex() for pk in txin.pubkeys]
   1882             for pubkey in pubkeys:
   1883                 if txin.is_complete():
   1884                     break
   1885                 if pubkey not in keypairs:
   1886                     continue
   1887                 _logger.info(f"adding signature for {pubkey}")
   1888                 sec, compressed = keypairs[pubkey]
   1889                 sig = self.sign_txin(i, sec, bip143_shared_txdigest_fields=bip143_shared_txdigest_fields)
   1890                 self.add_signature_to_txin(txin_idx=i, signing_pubkey=pubkey, sig=sig)
   1891 
   1892         _logger.debug(f"is_complete {self.is_complete()}")
   1893         self.invalidate_ser_cache()
   1894 
   1895     def sign_txin(self, txin_index, privkey_bytes, *, bip143_shared_txdigest_fields=None) -> str:
   1896         txin = self.inputs()[txin_index]
   1897         txin.validate_data(for_signing=True)
   1898         pre_hash = sha256d(bfh(self.serialize_preimage(txin_index,
   1899                                                        bip143_shared_txdigest_fields=bip143_shared_txdigest_fields)))
   1900         privkey = ecc.ECPrivkey(privkey_bytes)
   1901         sig = privkey.sign_transaction(pre_hash)
   1902         sig = bh2u(sig) + '01'  # SIGHASH_ALL
   1903         return sig
   1904 
   1905     def is_complete(self) -> bool:
   1906         return all([txin.is_complete() for txin in self.inputs()])
   1907 
   1908     def signature_count(self) -> Tuple[int, int]:
   1909         s = 0  # "num Sigs we have"
   1910         r = 0  # "Required"
   1911         for txin in self.inputs():
   1912             if txin.is_coinbase_input():
   1913                 continue
   1914             signatures = list(txin.part_sigs.values())
   1915             s += len(signatures)
   1916             r += txin.num_sig
   1917         return s, r
   1918 
   1919     def serialize(self) -> str:
   1920         """Returns PSBT as base64 text, or raw hex of network tx (if complete)."""
   1921         self.finalize_psbt()
   1922         if self.is_complete():
   1923             return Transaction.serialize(self)
   1924         return self._serialize_as_base64()
   1925 
   1926     def serialize_as_bytes(self, *, force_psbt: bool = False) -> bytes:
   1927         """Returns PSBT as raw bytes, or raw bytes of network tx (if complete)."""
   1928         self.finalize_psbt()
   1929         if force_psbt or not self.is_complete():
   1930             with io.BytesIO() as fd:
   1931                 self._serialize_psbt(fd)
   1932                 return fd.getvalue()
   1933         else:
   1934             return Transaction.serialize_as_bytes(self)
   1935 
   1936     def _serialize_as_base64(self) -> str:
   1937         raw_bytes = self.serialize_as_bytes()
   1938         return base64.b64encode(raw_bytes).decode('ascii')
   1939 
   1940     def update_signatures(self, signatures: Sequence[str]):
   1941         """Add new signatures to a transaction
   1942 
   1943         `signatures` is expected to be a list of sigs with signatures[i]
   1944         intended for self._inputs[i].
   1945         This is used by the Trezor, KeepKey and Safe-T plugins.
   1946         """
   1947         if self.is_complete():
   1948             return
   1949         if len(self.inputs()) != len(signatures):
   1950             raise Exception('expected {} signatures; got {}'.format(len(self.inputs()), len(signatures)))
   1951         for i, txin in enumerate(self.inputs()):
   1952             pubkeys = [pk.hex() for pk in txin.pubkeys]
   1953             sig = signatures[i]
   1954             if bfh(sig) in list(txin.part_sigs.values()):
   1955                 continue
   1956             pre_hash = sha256d(bfh(self.serialize_preimage(i)))
   1957             sig_string = ecc.sig_string_from_der_sig(bfh(sig[:-2]))
   1958             for recid in range(4):
   1959                 try:
   1960                     public_key = ecc.ECPubkey.from_sig_string(sig_string, recid, pre_hash)
   1961                 except ecc.InvalidECPointException:
   1962                     # the point might not be on the curve for some recid values
   1963                     continue
   1964                 pubkey_hex = public_key.get_public_key_hex(compressed=True)
   1965                 if pubkey_hex in pubkeys:
   1966                     try:
   1967                         public_key.verify_message_hash(sig_string, pre_hash)
   1968                     except Exception:
   1969                         _logger.exception('')
   1970                         continue
   1971                     _logger.info(f"adding sig: txin_idx={i}, signing_pubkey={pubkey_hex}, sig={sig}")
   1972                     self.add_signature_to_txin(txin_idx=i, signing_pubkey=pubkey_hex, sig=sig)
   1973                     break
   1974         # redo raw
   1975         self.invalidate_ser_cache()
   1976 
   1977     def add_signature_to_txin(self, *, txin_idx: int, signing_pubkey: str, sig: str):
   1978         txin = self._inputs[txin_idx]
   1979         txin.part_sigs[bfh(signing_pubkey)] = bfh(sig)
   1980         # force re-serialization
   1981         txin.script_sig = None
   1982         txin.witness = None
   1983         self.invalidate_ser_cache()
   1984 
   1985     def add_info_from_wallet(
   1986             self,
   1987             wallet: 'Abstract_Wallet',
   1988             *,
   1989             include_xpubs: bool = False,
   1990             ignore_network_issues: bool = True,
   1991     ) -> None:
   1992         if self.is_complete():
   1993             return
   1994         # only include xpubs for multisig wallets; currently only they need it in practice
   1995         # note: coldcard fw have a limitation that if they are included then all
   1996         #       inputs are assumed to be multisig... https://github.com/spesmilo/electrum/pull/5440#issuecomment-549504761
   1997         # note: trezor plugin needs xpubs included, if there are multisig inputs/change_outputs
   1998         from .wallet import Multisig_Wallet
   1999         if include_xpubs and isinstance(wallet, Multisig_Wallet):
   2000             from .keystore import Xpub
   2001             for ks in wallet.get_keystores():
   2002                 if isinstance(ks, Xpub):
   2003                     fp_bytes, der_full = ks.get_fp_and_derivation_to_be_used_in_partial_tx(
   2004                         der_suffix=[], only_der_suffix=False)
   2005                     xpub = ks.get_xpub_to_be_used_in_partial_tx(only_der_suffix=False)
   2006                     bip32node = BIP32Node.from_xkey(xpub)
   2007                     self.xpubs[bip32node] = (fp_bytes, der_full)
   2008         for txin in self.inputs():
   2009             wallet.add_input_info(
   2010                 txin,
   2011                 only_der_suffix=False,
   2012                 ignore_network_issues=ignore_network_issues,
   2013             )
   2014         for txout in self.outputs():
   2015             wallet.add_output_info(
   2016                 txout,
   2017                 only_der_suffix=False,
   2018             )
   2019 
   2020     def remove_xpubs_and_bip32_paths(self) -> None:
   2021         self.xpubs.clear()
   2022         for txin in self.inputs():
   2023             txin.bip32_paths.clear()
   2024         for txout in self.outputs():
   2025             txout.bip32_paths.clear()
   2026 
   2027     def prepare_for_export_for_coinjoin(self) -> None:
   2028         """Removes all sensitive details."""
   2029         # globals
   2030         self.xpubs.clear()
   2031         self._unknown.clear()
   2032         # inputs
   2033         for txin in self.inputs():
   2034             txin.bip32_paths.clear()
   2035         # outputs
   2036         for txout in self.outputs():
   2037             txout.redeem_script = None
   2038             txout.witness_script = None
   2039             txout.bip32_paths.clear()
   2040             txout._unknown.clear()
   2041 
   2042     def convert_all_utxos_to_witness_utxos(self) -> None:
   2043         """Replaces all NON-WITNESS-UTXOs with WITNESS-UTXOs.
   2044         This will likely make an exported PSBT invalid spec-wise,
   2045         but it makes e.g. QR codes significantly smaller.
   2046         """
   2047         for txin in self.inputs():
   2048             txin.convert_utxo_to_witness_utxo()
   2049 
   2050     def remove_signatures(self):
   2051         for txin in self.inputs():
   2052             txin.part_sigs = {}
   2053             txin.script_sig = None
   2054             txin.witness = None
   2055         assert not self.is_complete()
   2056         self.invalidate_ser_cache()
   2057 
   2058     def update_txin_script_type(self):
   2059         """Determine the script_type of each input by analyzing the scripts.
   2060         It updates all tx-Inputs, NOT only the wallet owned, if the
   2061         scriptpubkey is present.
   2062         """
   2063         for txin in self.inputs():
   2064             if txin.script_type in ('unknown', 'address'):
   2065                 txin.set_script_type()
   2066 
   2067 def pack_bip32_root_fingerprint_and_int_path(xfp: bytes, path: Sequence[int]) -> bytes:
   2068     if len(xfp) != 4:
   2069         raise Exception(f'unexpected xfp length. xfp={xfp}')
   2070     return xfp + b''.join(i.to_bytes(4, byteorder='little', signed=False) for i in path)
   2071 
   2072 
   2073 def unpack_bip32_root_fingerprint_and_int_path(path: bytes) -> Tuple[bytes, Sequence[int]]:
   2074     if len(path) % 4 != 0:
   2075         raise Exception(f'unexpected packed path length. path={path.hex()}')
   2076     xfp = path[0:4]
   2077     int_path = [int.from_bytes(b, byteorder='little', signed=False) for b in chunks(path[4:], 4)]
   2078     return xfp, int_path