lnaddr.py (19074B)
1 #! /usr/bin/env python3 2 # This was forked from https://github.com/rustyrussell/lightning-payencode/tree/acc16ec13a3fa1dc16c07af6ec67c261bd8aff23 3 4 import re 5 import time 6 from hashlib import sha256 7 from binascii import hexlify 8 from decimal import Decimal 9 from typing import Optional, TYPE_CHECKING 10 11 import random 12 import bitstring 13 14 from .bitcoin import hash160_to_b58_address, b58_address_to_hash160, TOTAL_COIN_SUPPLY_LIMIT_IN_BTC 15 from .segwit_addr import bech32_encode, bech32_decode, CHARSET 16 from . import constants 17 from . import ecc 18 from .bitcoin import COIN 19 20 if TYPE_CHECKING: 21 from .lnutil import LnFeatures 22 23 24 # BOLT #11: 25 # 26 # A writer MUST encode `amount` as a positive decimal integer with no 27 # leading zeroes, SHOULD use the shortest representation possible. 28 def shorten_amount(amount): 29 """ Given an amount in bitcoin, shorten it 30 """ 31 # Convert to pico initially 32 amount = int(amount * 10**12) 33 units = ['p', 'n', 'u', 'm', ''] 34 for unit in units: 35 if amount % 1000 == 0: 36 amount //= 1000 37 else: 38 break 39 return str(amount) + unit 40 41 def unshorten_amount(amount) -> Decimal: 42 """ Given a shortened amount, convert it into a decimal 43 """ 44 # BOLT #11: 45 # The following `multiplier` letters are defined: 46 # 47 #* `m` (milli): multiply by 0.001 48 #* `u` (micro): multiply by 0.000001 49 #* `n` (nano): multiply by 0.000000001 50 #* `p` (pico): multiply by 0.000000000001 51 units = { 52 'p': 10**12, 53 'n': 10**9, 54 'u': 10**6, 55 'm': 10**3, 56 } 57 unit = str(amount)[-1] 58 # BOLT #11: 59 # A reader SHOULD fail if `amount` contains a non-digit, or is followed by 60 # anything except a `multiplier` in the table above. 61 if not re.fullmatch("\\d+[pnum]?", str(amount)): 62 raise ValueError("Invalid amount '{}'".format(amount)) 63 64 if unit in units.keys(): 65 return Decimal(amount[:-1]) / units[unit] 66 else: 67 return Decimal(amount) 68 69 _INT_TO_BINSTR = {a: '0' * (5-len(bin(a)[2:])) + bin(a)[2:] for a in range(32)} 70 71 # Bech32 spits out array of 5-bit values. Shim here. 72 def u5_to_bitarray(arr): 73 b = ''.join(_INT_TO_BINSTR[a] for a in arr) 74 return bitstring.BitArray(bin=b) 75 76 def bitarray_to_u5(barr): 77 assert barr.len % 5 == 0 78 ret = [] 79 s = bitstring.ConstBitStream(barr) 80 while s.pos != s.len: 81 ret.append(s.read(5).uint) 82 return ret 83 84 def encode_fallback(fallback, currency): 85 """ Encode all supported fallback addresses. 86 """ 87 if currency in [constants.BitcoinMainnet.SEGWIT_HRP, constants.BitcoinTestnet.SEGWIT_HRP]: 88 fbhrp, witness = bech32_decode(fallback, ignore_long_length=True) 89 if fbhrp: 90 if fbhrp != currency: 91 raise ValueError("Not a bech32 address for this currency") 92 wver = witness[0] 93 if wver > 16: 94 raise ValueError("Invalid witness version {}".format(witness[0])) 95 wprog = u5_to_bitarray(witness[1:]) 96 else: 97 addrtype, addr = b58_address_to_hash160(fallback) 98 if is_p2pkh(currency, addrtype): 99 wver = 17 100 elif is_p2sh(currency, addrtype): 101 wver = 18 102 else: 103 raise ValueError("Unknown address type for {}".format(currency)) 104 wprog = addr 105 return tagged('f', bitstring.pack("uint:5", wver) + wprog) 106 else: 107 raise NotImplementedError("Support for currency {} not implemented".format(currency)) 108 109 def parse_fallback(fallback, currency): 110 if currency in [constants.BitcoinMainnet.SEGWIT_HRP, constants.BitcoinTestnet.SEGWIT_HRP]: 111 wver = fallback[0:5].uint 112 if wver == 17: 113 addr=hash160_to_b58_address(fallback[5:].tobytes(), base58_prefix_map[currency][0]) 114 elif wver == 18: 115 addr=hash160_to_b58_address(fallback[5:].tobytes(), base58_prefix_map[currency][1]) 116 elif wver <= 16: 117 addr=bech32_encode(currency, bitarray_to_u5(fallback)) 118 else: 119 return None 120 else: 121 addr=fallback.tobytes() 122 return addr 123 124 125 # Map of classical and witness address prefixes 126 base58_prefix_map = { 127 constants.BitcoinMainnet.SEGWIT_HRP : (constants.BitcoinMainnet.ADDRTYPE_P2PKH, constants.BitcoinMainnet.ADDRTYPE_P2SH), 128 constants.BitcoinTestnet.SEGWIT_HRP : (constants.BitcoinTestnet.ADDRTYPE_P2PKH, constants.BitcoinTestnet.ADDRTYPE_P2SH) 129 } 130 131 def is_p2pkh(currency, prefix): 132 return prefix == base58_prefix_map[currency][0] 133 134 def is_p2sh(currency, prefix): 135 return prefix == base58_prefix_map[currency][1] 136 137 # Tagged field containing BitArray 138 def tagged(char, l): 139 # Tagged fields need to be zero-padded to 5 bits. 140 while l.len % 5 != 0: 141 l.append('0b0') 142 return bitstring.pack("uint:5, uint:5, uint:5", 143 CHARSET.find(char), 144 (l.len / 5) / 32, (l.len / 5) % 32) + l 145 146 # Tagged field containing bytes 147 def tagged_bytes(char, l): 148 return tagged(char, bitstring.BitArray(l)) 149 150 def trim_to_min_length(bits): 151 """Ensures 'bits' have min number of leading zeroes. 152 Assumes 'bits' is big-endian, and that it needs to be encoded in 5 bit blocks. 153 """ 154 bits = bits[:] # copy 155 # make sure we can be split into 5 bit blocks 156 while bits.len % 5 != 0: 157 bits.prepend('0b0') 158 # Get minimal length by trimming leading 5 bits at a time. 159 while bits.startswith('0b00000'): 160 if len(bits) == 5: 161 break # v == 0 162 bits = bits[5:] 163 return bits 164 165 # Discard trailing bits, convert to bytes. 166 def trim_to_bytes(barr): 167 # Adds a byte if necessary. 168 b = barr.tobytes() 169 if barr.len % 8 != 0: 170 return b[:-1] 171 return b 172 173 # Try to pull out tagged data: returns tag, tagged data and remainder. 174 def pull_tagged(stream): 175 tag = stream.read(5).uint 176 length = stream.read(5).uint * 32 + stream.read(5).uint 177 return (CHARSET[tag], stream.read(length * 5), stream) 178 179 def lnencode(addr: 'LnAddr', privkey) -> str: 180 if addr.amount: 181 amount = addr.currency + shorten_amount(addr.amount) 182 else: 183 amount = addr.currency if addr.currency else '' 184 185 hrp = 'ln' + amount 186 187 # Start with the timestamp 188 data = bitstring.pack('uint:35', addr.date) 189 190 tags_set = set() 191 192 # Payment hash 193 data += tagged_bytes('p', addr.paymenthash) 194 tags_set.add('p') 195 196 if addr.payment_secret is not None: 197 data += tagged_bytes('s', addr.payment_secret) 198 tags_set.add('s') 199 200 for k, v in addr.tags: 201 202 # BOLT #11: 203 # 204 # A writer MUST NOT include more than one `d`, `h`, `n` or `x` fields, 205 if k in ('d', 'h', 'n', 'x', 'p', 's'): 206 if k in tags_set: 207 raise ValueError("Duplicate '{}' tag".format(k)) 208 209 if k == 'r': 210 route = bitstring.BitArray() 211 for step in v: 212 pubkey, channel, feebase, feerate, cltv = step 213 route.append(bitstring.BitArray(pubkey) + bitstring.BitArray(channel) + bitstring.pack('intbe:32', feebase) + bitstring.pack('intbe:32', feerate) + bitstring.pack('intbe:16', cltv)) 214 data += tagged('r', route) 215 elif k == 't': 216 pubkey, feebase, feerate, cltv = v 217 route = bitstring.BitArray(pubkey) + bitstring.pack('intbe:32', feebase) + bitstring.pack('intbe:32', feerate) + bitstring.pack('intbe:16', cltv) 218 data += tagged('t', route) 219 elif k == 'f': 220 data += encode_fallback(v, addr.currency) 221 elif k == 'd': 222 # truncate to max length: 1024*5 bits = 639 bytes 223 data += tagged_bytes('d', v.encode()[0:639]) 224 elif k == 'x': 225 expirybits = bitstring.pack('intbe:64', v) 226 expirybits = trim_to_min_length(expirybits) 227 data += tagged('x', expirybits) 228 elif k == 'h': 229 data += tagged_bytes('h', sha256(v.encode('utf-8')).digest()) 230 elif k == 'n': 231 data += tagged_bytes('n', v) 232 elif k == 'c': 233 finalcltvbits = bitstring.pack('intbe:64', v) 234 finalcltvbits = trim_to_min_length(finalcltvbits) 235 data += tagged('c', finalcltvbits) 236 elif k == '9': 237 if v == 0: 238 continue 239 feature_bits = bitstring.BitArray(uint=v, length=v.bit_length()) 240 feature_bits = trim_to_min_length(feature_bits) 241 data += tagged('9', feature_bits) 242 else: 243 # FIXME: Support unknown tags? 244 raise ValueError("Unknown tag {}".format(k)) 245 246 tags_set.add(k) 247 248 # BOLT #11: 249 # 250 # A writer MUST include either a `d` or `h` field, and MUST NOT include 251 # both. 252 if 'd' in tags_set and 'h' in tags_set: 253 raise ValueError("Cannot include both 'd' and 'h'") 254 if not 'd' in tags_set and not 'h' in tags_set: 255 raise ValueError("Must include either 'd' or 'h'") 256 257 # We actually sign the hrp, then data (padded to 8 bits with zeroes). 258 msg = hrp.encode("ascii") + data.tobytes() 259 privkey = ecc.ECPrivkey(privkey) 260 sig = privkey.sign_message(msg, is_compressed=False, algo=lambda x:sha256(x).digest()) 261 recovery_flag = bytes([sig[0] - 27]) 262 sig = bytes(sig[1:]) + recovery_flag 263 data += sig 264 265 return bech32_encode(hrp, bitarray_to_u5(data)) 266 267 class LnAddr(object): 268 def __init__(self, *, paymenthash: bytes = None, amount=None, currency=None, tags=None, date=None, 269 payment_secret: bytes = None): 270 self.date = int(time.time()) if not date else int(date) 271 self.tags = [] if not tags else tags 272 self.unknown_tags = [] 273 self.paymenthash = paymenthash 274 self.payment_secret = payment_secret 275 self.signature = None 276 self.pubkey = None 277 self.currency = constants.net.SEGWIT_HRP if currency is None else currency 278 self._amount = amount # type: Optional[Decimal] # in bitcoins 279 self._min_final_cltv_expiry = 18 280 281 @property 282 def amount(self) -> Optional[Decimal]: 283 return self._amount 284 285 @amount.setter 286 def amount(self, value): 287 if not (isinstance(value, Decimal) or value is None): 288 raise ValueError(f"amount must be Decimal or None, not {value!r}") 289 if value is None: 290 self._amount = None 291 return 292 assert isinstance(value, Decimal) 293 if value.is_nan() or not (0 <= value <= TOTAL_COIN_SUPPLY_LIMIT_IN_BTC): 294 raise ValueError(f"amount is out-of-bounds: {value!r} BTC") 295 if value * 10**12 % 10: 296 # max resolution is millisatoshi 297 raise ValueError(f"Cannot encode {value!r}: too many decimal places") 298 self._amount = value 299 300 def get_amount_sat(self) -> Optional[Decimal]: 301 # note that this has msat resolution potentially 302 if self.amount is None: 303 return None 304 return self.amount * COIN 305 306 def get_routing_info(self, tag): 307 # note: tag will be 't' for trampoline 308 r_tags = list(filter(lambda x: x[0] == tag, self.tags)) 309 # strip the tag type, it's implicitly 'r' now 310 r_tags = list(map(lambda x: x[1], r_tags)) 311 # if there are multiple hints, we will use the first one that works, 312 # from a random permutation 313 random.shuffle(r_tags) 314 return r_tags 315 316 def get_amount_msat(self) -> Optional[int]: 317 if self.amount is None: 318 return None 319 return int(self.amount * COIN * 1000) 320 321 def get_features(self) -> 'LnFeatures': 322 from .lnutil import LnFeatures 323 return LnFeatures(self.get_tag('9') or 0) 324 325 def __str__(self): 326 return "LnAddr[{}, amount={}{} tags=[{}]]".format( 327 hexlify(self.pubkey.serialize()).decode('utf-8') if self.pubkey else None, 328 self.amount, self.currency, 329 ", ".join([k + '=' + str(v) for k, v in self.tags]) 330 ) 331 332 def get_min_final_cltv_expiry(self) -> int: 333 return self._min_final_cltv_expiry 334 335 def get_tag(self, tag): 336 for k, v in self.tags: 337 if k == tag: 338 return v 339 return None 340 341 def get_description(self) -> str: 342 return self.get_tag('d') or '' 343 344 def get_expiry(self) -> int: 345 exp = self.get_tag('x') 346 if exp is None: 347 exp = 3600 348 return int(exp) 349 350 def is_expired(self) -> bool: 351 now = time.time() 352 # BOLT-11 does not specify what expiration of '0' means. 353 # we treat it as 0 seconds here (instead of never) 354 return now > self.get_expiry() + self.date 355 356 357 class LnDecodeException(Exception): pass 358 359 class SerializableKey: 360 def __init__(self, pubkey): 361 self.pubkey = pubkey 362 def serialize(self): 363 return self.pubkey.get_public_key_bytes(True) 364 365 def lndecode(invoice: str, *, verbose=False, expected_hrp=None) -> LnAddr: 366 if expected_hrp is None: 367 expected_hrp = constants.net.SEGWIT_HRP 368 hrp, data = bech32_decode(invoice, ignore_long_length=True) 369 if not hrp: 370 raise ValueError("Bad bech32 checksum") 371 372 # BOLT #11: 373 # 374 # A reader MUST fail if it does not understand the `prefix`. 375 if not hrp.startswith('ln'): 376 raise ValueError("Does not start with ln") 377 378 if not hrp[2:].startswith(expected_hrp): 379 raise ValueError("Wrong Lightning invoice HRP " + hrp[2:] + ", should be " + expected_hrp) 380 381 data = u5_to_bitarray(data) 382 383 # Final signature 65 bytes, split it off. 384 if len(data) < 65*8: 385 raise ValueError("Too short to contain signature") 386 sigdecoded = data[-65*8:].tobytes() 387 data = bitstring.ConstBitStream(data[:-65*8]) 388 389 addr = LnAddr() 390 addr.pubkey = None 391 392 m = re.search("[^\\d]+", hrp[2:]) 393 if m: 394 addr.currency = m.group(0) 395 amountstr = hrp[2+m.end():] 396 # BOLT #11: 397 # 398 # A reader SHOULD indicate if amount is unspecified, otherwise it MUST 399 # multiply `amount` by the `multiplier` value (if any) to derive the 400 # amount required for payment. 401 if amountstr != '': 402 addr.amount = unshorten_amount(amountstr) 403 404 addr.date = data.read(35).uint 405 406 while data.pos != data.len: 407 tag, tagdata, data = pull_tagged(data) 408 409 # BOLT #11: 410 # 411 # A reader MUST skip over unknown fields, an `f` field with unknown 412 # `version`, or a `p`, `h`, or `n` field which does not have 413 # `data_length` 52, 52, or 53 respectively. 414 data_length = len(tagdata) / 5 415 416 if tag == 'r': 417 # BOLT #11: 418 # 419 # * `r` (3): `data_length` variable. One or more entries 420 # containing extra routing information for a private route; 421 # there may be more than one `r` field, too. 422 # * `pubkey` (264 bits) 423 # * `short_channel_id` (64 bits) 424 # * `feebase` (32 bits, big-endian) 425 # * `feerate` (32 bits, big-endian) 426 # * `cltv_expiry_delta` (16 bits, big-endian) 427 route=[] 428 s = bitstring.ConstBitStream(tagdata) 429 while s.pos + 264 + 64 + 32 + 32 + 16 < s.len: 430 route.append((s.read(264).tobytes(), 431 s.read(64).tobytes(), 432 s.read(32).uintbe, 433 s.read(32).uintbe, 434 s.read(16).uintbe)) 435 addr.tags.append(('r',route)) 436 elif tag == 't': 437 s = bitstring.ConstBitStream(tagdata) 438 e = (s.read(264).tobytes(), 439 s.read(32).uintbe, 440 s.read(32).uintbe, 441 s.read(16).uintbe) 442 addr.tags.append(('t', e)) 443 elif tag == 'f': 444 fallback = parse_fallback(tagdata, addr.currency) 445 if fallback: 446 addr.tags.append(('f', fallback)) 447 else: 448 # Incorrect version. 449 addr.unknown_tags.append((tag, tagdata)) 450 continue 451 452 elif tag == 'd': 453 addr.tags.append(('d', trim_to_bytes(tagdata).decode('utf-8'))) 454 455 elif tag == 'h': 456 if data_length != 52: 457 addr.unknown_tags.append((tag, tagdata)) 458 continue 459 addr.tags.append(('h', trim_to_bytes(tagdata))) 460 461 elif tag == 'x': 462 addr.tags.append(('x', tagdata.uint)) 463 464 elif tag == 'p': 465 if data_length != 52: 466 addr.unknown_tags.append((tag, tagdata)) 467 continue 468 addr.paymenthash = trim_to_bytes(tagdata) 469 470 elif tag == 's': 471 if data_length != 52: 472 addr.unknown_tags.append((tag, tagdata)) 473 continue 474 addr.payment_secret = trim_to_bytes(tagdata) 475 476 elif tag == 'n': 477 if data_length != 53: 478 addr.unknown_tags.append((tag, tagdata)) 479 continue 480 pubkeybytes = trim_to_bytes(tagdata) 481 addr.pubkey = pubkeybytes 482 483 elif tag == 'c': 484 addr._min_final_cltv_expiry = tagdata.uint 485 486 elif tag == '9': 487 features = tagdata.uint 488 addr.tags.append(('9', features)) 489 from .lnutil import validate_features 490 validate_features(features) 491 492 else: 493 addr.unknown_tags.append((tag, tagdata)) 494 495 if verbose: 496 print('hex of signature data (32 byte r, 32 byte s): {}' 497 .format(hexlify(sigdecoded[0:64]))) 498 print('recovery flag: {}'.format(sigdecoded[64])) 499 print('hex of data for signing: {}' 500 .format(hexlify(hrp.encode("ascii") + data.tobytes()))) 501 print('SHA256 of above: {}'.format(sha256(hrp.encode("ascii") + data.tobytes()).hexdigest())) 502 503 # BOLT #11: 504 # 505 # A reader MUST check that the `signature` is valid (see the `n` tagged 506 # field specified below). 507 addr.signature = sigdecoded[:65] 508 hrp_hash = sha256(hrp.encode("ascii") + data.tobytes()).digest() 509 if addr.pubkey: # Specified by `n` 510 # BOLT #11: 511 # 512 # A reader MUST use the `n` field to validate the signature instead of 513 # performing signature recovery if a valid `n` field is provided. 514 ecc.ECPubkey(addr.pubkey).verify_message_hash(sigdecoded[:64], hrp_hash) 515 pubkey_copy = addr.pubkey 516 class WrappedBytesKey: 517 serialize = lambda: pubkey_copy 518 addr.pubkey = WrappedBytesKey 519 else: # Recover pubkey from signature. 520 addr.pubkey = SerializableKey(ecc.ECPubkey.from_sig_string(sigdecoded[:64], sigdecoded[64], hrp_hash)) 521 522 return addr 523 524 525 526 527 if __name__ == '__main__': 528 # run using 529 # python3 -m electrum.lnaddr <invoice> <expected hrp> 530 # python3 -m electrum.lnaddr lntb1n1pdlcakepp5e7rn0knl0gm46qqp9eqdsza2c942d8pjqnwa5903n39zu28sgk3sdq423jhxapqv3hkuct5d9hkucqp2rzjqwyx8nu2hygyvgc02cwdtvuxe0lcxz06qt3lpsldzcdr46my5epmj9vk9sqqqlcqqqqqqqlgqqqqqqgqjqdhnmkgahfaynuhe9md8k49xhxuatnv6jckfmsjq8maxta2l0trh5sdrqlyjlwutdnpd5gwmdnyytsl9q0dj6g08jacvthtpeg383k0sq542rz2 tb1n 531 import sys 532 print(lndecode(sys.argv[1], expected_hrp=sys.argv[2]))