electrum

Electrum Bitcoin wallet
git clone https://git.parazyd.org/electrum
Log | Files | Refs | Submodules

lnonion.py (23032B)


      1 # -*- coding: utf-8 -*-
      2 #
      3 # Electrum - lightweight Bitcoin client
      4 # Copyright (C) 2018 The Electrum developers
      5 #
      6 # Permission is hereby granted, free of charge, to any person
      7 # obtaining a copy of this software and associated documentation files
      8 # (the "Software"), to deal in the Software without restriction,
      9 # including without limitation the rights to use, copy, modify, merge,
     10 # publish, distribute, sublicense, and/or sell copies of the Software,
     11 # and to permit persons to whom the Software is furnished to do so,
     12 # subject to the following conditions:
     13 #
     14 # The above copyright notice and this permission notice shall be
     15 # included in all copies or substantial portions of the Software.
     16 #
     17 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
     18 # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
     19 # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
     20 # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
     21 # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
     22 # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
     23 # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
     24 # SOFTWARE.
     25 
     26 import io
     27 import hashlib
     28 from typing import Sequence, List, Tuple, NamedTuple, TYPE_CHECKING
     29 from enum import IntEnum, IntFlag
     30 
     31 from . import ecc
     32 from .crypto import sha256, hmac_oneshot, chacha20_encrypt
     33 from .util import bh2u, profiler, xor_bytes, bfh
     34 from .lnutil import (get_ecdh, PaymentFailure, NUM_MAX_HOPS_IN_PAYMENT_PATH,
     35                      NUM_MAX_EDGES_IN_PAYMENT_PATH, ShortChannelID, OnionFailureCodeMetaFlag)
     36 from .lnmsg import OnionWireSerializer, read_bigsize_int, write_bigsize_int
     37 
     38 if TYPE_CHECKING:
     39     from .lnrouter import LNPaymentRoute
     40 
     41 
# Sizes (in bytes) used when building and parsing onion packets.
HOPS_DATA_SIZE = 1300      # also sometimes called routingInfoSize in bolt-04
TRAMPOLINE_HOPS_DATA_SIZE = 400  # hops_data size used when the trampoline flag is set
LEGACY_PER_HOP_FULL_SIZE = 65  # 1 realm byte + 32 byte legacy payload + 32 byte hmac
PER_HOP_HMAC_SIZE = 32  # length of the hmac carried by each hop and by the packet
     46 
     47 
     48 class UnsupportedOnionPacketVersion(Exception): pass
     49 class InvalidOnionMac(Exception): pass
     50 class InvalidOnionPubkey(Exception): pass
     51 
     52 
     53 class LegacyHopDataPayload:
     54 
     55     def __init__(self, *, short_channel_id: bytes, amt_to_forward: int, outgoing_cltv_value: int):
     56         self.short_channel_id = ShortChannelID(short_channel_id)
     57         self.amt_to_forward = amt_to_forward
     58         self.outgoing_cltv_value = outgoing_cltv_value
     59 
     60     def to_bytes(self) -> bytes:
     61         ret = self.short_channel_id
     62         ret += int.to_bytes(self.amt_to_forward, length=8, byteorder="big", signed=False)
     63         ret += int.to_bytes(self.outgoing_cltv_value, length=4, byteorder="big", signed=False)
     64         ret += bytes(12)  # padding
     65         if len(ret) != 32:
     66             raise Exception('unexpected length {}'.format(len(ret)))
     67         return ret
     68 
     69     def to_tlv_dict(self) -> dict:
     70         d = {
     71             "amt_to_forward": {"amt_to_forward": self.amt_to_forward},
     72             "outgoing_cltv_value": {"outgoing_cltv_value": self.outgoing_cltv_value},
     73             "short_channel_id": {"short_channel_id": self.short_channel_id},
     74         }
     75         return d
     76 
     77     @classmethod
     78     def from_bytes(cls, b: bytes) -> 'LegacyHopDataPayload':
     79         if len(b) != 32:
     80             raise Exception('unexpected length {}'.format(len(b)))
     81         return LegacyHopDataPayload(
     82             short_channel_id=b[:8],
     83             amt_to_forward=int.from_bytes(b[8:16], byteorder="big", signed=False),
     84             outgoing_cltv_value=int.from_bytes(b[16:20], byteorder="big", signed=False),
     85         )
     86 
     87     @classmethod
     88     def from_tlv_dict(cls, d: dict) -> 'LegacyHopDataPayload':
     89         return LegacyHopDataPayload(
     90             short_channel_id=d["short_channel_id"]["short_channel_id"] if "short_channel_id" in d else b"\x00" * 8,
     91             amt_to_forward=d["amt_to_forward"]["amt_to_forward"],
     92             outgoing_cltv_value=d["outgoing_cltv_value"]["outgoing_cltv_value"],
     93         )
     94 
     95 
     96 class OnionHopsDataSingle:  # called HopData in lnd
     97 
     98     def __init__(self, *, is_tlv_payload: bool, payload: dict = None):
     99         self.is_tlv_payload = is_tlv_payload
    100         if payload is None:
    101             payload = {}
    102         self.payload = payload
    103         self.hmac = None
    104         self._raw_bytes_payload = None  # used in unit tests
    105 
    106     def to_bytes(self) -> bytes:
    107         hmac_ = self.hmac if self.hmac is not None else bytes(PER_HOP_HMAC_SIZE)
    108         if self._raw_bytes_payload is not None:
    109             ret = write_bigsize_int(len(self._raw_bytes_payload))
    110             ret += self._raw_bytes_payload
    111             ret += hmac_
    112             return ret
    113         if not self.is_tlv_payload:
    114             ret = b"\x00"  # realm==0
    115             legacy_payload = LegacyHopDataPayload.from_tlv_dict(self.payload)
    116             ret += legacy_payload.to_bytes()
    117             ret += hmac_
    118             if len(ret) != LEGACY_PER_HOP_FULL_SIZE:
    119                 raise Exception('unexpected length {}'.format(len(ret)))
    120             return ret
    121         else:  # tlv
    122             payload_fd = io.BytesIO()
    123             OnionWireSerializer.write_tlv_stream(fd=payload_fd,
    124                                                  tlv_stream_name="tlv_payload",
    125                                                  **self.payload)
    126             payload_bytes = payload_fd.getvalue()
    127             with io.BytesIO() as fd:
    128                 fd.write(write_bigsize_int(len(payload_bytes)))
    129                 fd.write(payload_bytes)
    130                 fd.write(hmac_)
    131                 return fd.getvalue()
    132 
    133     @classmethod
    134     def from_fd(cls, fd: io.BytesIO) -> 'OnionHopsDataSingle':
    135         first_byte = fd.read(1)
    136         if len(first_byte) == 0:
    137             raise Exception(f"unexpected EOF")
    138         fd.seek(-1, io.SEEK_CUR)  # undo read
    139         if first_byte == b'\x00':
    140             # legacy hop data format
    141             b = fd.read(LEGACY_PER_HOP_FULL_SIZE)
    142             if len(b) != LEGACY_PER_HOP_FULL_SIZE:
    143                 raise Exception(f'unexpected length {len(b)}')
    144             ret = OnionHopsDataSingle(is_tlv_payload=False)
    145             legacy_payload = LegacyHopDataPayload.from_bytes(b[1:33])
    146             ret.payload = legacy_payload.to_tlv_dict()
    147             ret.hmac = b[33:]
    148             return ret
    149         elif first_byte == b'\x01':
    150             # reserved for future use
    151             raise Exception("unsupported hop payload: length==1")
    152         else:
    153             hop_payload_length = read_bigsize_int(fd)
    154             hop_payload = fd.read(hop_payload_length)
    155             if hop_payload_length != len(hop_payload):
    156                 raise Exception(f"unexpected EOF")
    157             ret = OnionHopsDataSingle(is_tlv_payload=True)
    158             ret.payload = OnionWireSerializer.read_tlv_stream(fd=io.BytesIO(hop_payload),
    159                                                               tlv_stream_name="tlv_payload")
    160             ret.hmac = fd.read(PER_HOP_HMAC_SIZE)
    161             assert len(ret.hmac) == PER_HOP_HMAC_SIZE
    162             return ret
    163 
    164     def __repr__(self):
    165         return f"<OnionHopsDataSingle. is_tlv_payload={self.is_tlv_payload}. payload={self.payload}. hmac={self.hmac}>"
    166 
    167 
    168 class OnionPacket:
    169 
    170     def __init__(self, public_key: bytes, hops_data: bytes, hmac: bytes):
    171         assert len(public_key) == 33
    172         assert len(hops_data) in [ HOPS_DATA_SIZE, TRAMPOLINE_HOPS_DATA_SIZE ]
    173         assert len(hmac) == PER_HOP_HMAC_SIZE
    174         self.version = 0
    175         self.public_key = public_key
    176         self.hops_data = hops_data  # also called RoutingInfo in bolt-04
    177         self.hmac = hmac
    178         if not ecc.ECPubkey.is_pubkey_bytes(public_key):
    179             raise InvalidOnionPubkey()
    180 
    181     def to_bytes(self) -> bytes:
    182         ret = bytes([self.version])
    183         ret += self.public_key
    184         ret += self.hops_data
    185         ret += self.hmac
    186         if len(ret) - 66 not in [ HOPS_DATA_SIZE, TRAMPOLINE_HOPS_DATA_SIZE ]:
    187             raise Exception('unexpected length {}'.format(len(ret)))
    188         return ret
    189 
    190     @classmethod
    191     def from_bytes(cls, b: bytes):
    192         if len(b) - 66 not in [ HOPS_DATA_SIZE, TRAMPOLINE_HOPS_DATA_SIZE ]:
    193             raise Exception('unexpected length {}'.format(len(b)))
    194         version = b[0]
    195         if version != 0:
    196             raise UnsupportedOnionPacketVersion('version {} is not supported'.format(version))
    197         return OnionPacket(
    198             public_key=b[1:34],
    199             hops_data=b[34:-32],
    200             hmac=b[-32:]
    201         )
    202 
    203 
    204 def get_bolt04_onion_key(key_type: bytes, secret: bytes) -> bytes:
    205     if key_type not in (b'rho', b'mu', b'um', b'ammag', b'pad'):
    206         raise Exception('invalid key_type {}'.format(key_type))
    207     key = hmac_oneshot(key_type, msg=secret, digest=hashlib.sha256)
    208     return key
    209 
    210 
    211 def get_shared_secrets_along_route(payment_path_pubkeys: Sequence[bytes],
    212                                    session_key: bytes) -> Sequence[bytes]:
    213     num_hops = len(payment_path_pubkeys)
    214     hop_shared_secrets = num_hops * [b'']
    215     ephemeral_key = session_key
    216     # compute shared key for each hop
    217     for i in range(0, num_hops):
    218         hop_shared_secrets[i] = get_ecdh(ephemeral_key, payment_path_pubkeys[i])
    219         ephemeral_pubkey = ecc.ECPrivkey(ephemeral_key).get_public_key_bytes()
    220         blinding_factor = sha256(ephemeral_pubkey + hop_shared_secrets[i])
    221         blinding_factor_int = int.from_bytes(blinding_factor, byteorder="big")
    222         ephemeral_key_int = int.from_bytes(ephemeral_key, byteorder="big")
    223         ephemeral_key_int = ephemeral_key_int * blinding_factor_int % ecc.CURVE_ORDER
    224         ephemeral_key = ephemeral_key_int.to_bytes(32, byteorder="big")
    225     return hop_shared_secrets
    226 
    227 
    228 def new_onion_packet(payment_path_pubkeys: Sequence[bytes], session_key: bytes,
    229                      hops_data: Sequence[OnionHopsDataSingle], associated_data: bytes, trampoline=False) -> OnionPacket:
    230     num_hops = len(payment_path_pubkeys)
    231     assert num_hops == len(hops_data)
    232     hop_shared_secrets = get_shared_secrets_along_route(payment_path_pubkeys, session_key)
    233 
    234     data_size = TRAMPOLINE_HOPS_DATA_SIZE if trampoline else HOPS_DATA_SIZE
    235     filler = _generate_filler(b'rho', hops_data, hop_shared_secrets, data_size)
    236     next_hmac = bytes(PER_HOP_HMAC_SIZE)
    237 
    238     # Our starting packet needs to be filled out with random bytes, we
    239     # generate some deterministically using the session private key.
    240     pad_key = get_bolt04_onion_key(b'pad', session_key)
    241     mix_header = generate_cipher_stream(pad_key, data_size)
    242 
    243     # compute routing info and MAC for each hop
    244     for i in range(num_hops-1, -1, -1):
    245         rho_key = get_bolt04_onion_key(b'rho', hop_shared_secrets[i])
    246         mu_key = get_bolt04_onion_key(b'mu', hop_shared_secrets[i])
    247         hops_data[i].hmac = next_hmac
    248         stream_bytes = generate_cipher_stream(rho_key, data_size)
    249         hop_data_bytes = hops_data[i].to_bytes()
    250         mix_header = mix_header[:-len(hop_data_bytes)]
    251         mix_header = hop_data_bytes + mix_header
    252         mix_header = xor_bytes(mix_header, stream_bytes)
    253         if i == num_hops - 1 and len(filler) != 0:
    254             mix_header = mix_header[:-len(filler)] + filler
    255         packet = mix_header + associated_data
    256         next_hmac = hmac_oneshot(mu_key, msg=packet, digest=hashlib.sha256)
    257 
    258     return OnionPacket(
    259         public_key=ecc.ECPrivkey(session_key).get_public_key_bytes(),
    260         hops_data=mix_header,
    261         hmac=next_hmac)
    262 
    263 
    264 def calc_hops_data_for_payment(
    265         route: 'LNPaymentRoute',
    266         amount_msat: int,
    267         final_cltv: int, *,
    268         total_msat=None,
    269         payment_secret: bytes = None) -> Tuple[List[OnionHopsDataSingle], int, int]:
    270 
    271     """Returns the hops_data to be used for constructing an onion packet,
    272     and the amount_msat and cltv to be used on our immediate channel.
    273     """
    274     if len(route) > NUM_MAX_EDGES_IN_PAYMENT_PATH:
    275         raise PaymentFailure(f"too long route ({len(route)} edges)")
    276     # payload that will be seen by the last hop:
    277     amt = amount_msat
    278     cltv = final_cltv
    279     hop_payload = {
    280         "amt_to_forward": {"amt_to_forward": amt},
    281         "outgoing_cltv_value": {"outgoing_cltv_value": cltv},
    282     }
    283     # for multipart payments we need to tell the receiver about the total and
    284     # partial amounts
    285     if payment_secret is not None:
    286         hop_payload["payment_data"] = {
    287             "payment_secret": payment_secret,
    288             "total_msat": total_msat,
    289             "amount_msat": amt
    290         }
    291     hops_data = [OnionHopsDataSingle(
    292         is_tlv_payload=route[-1].has_feature_varonion(), payload=hop_payload)]
    293     # payloads, backwards from last hop (but excluding the first edge):
    294     for edge_index in range(len(route) - 1, 0, -1):
    295         route_edge = route[edge_index]
    296         is_trampoline = route_edge.is_trampoline()
    297         if is_trampoline:
    298             amt += route_edge.fee_for_edge(amt)
    299             cltv += route_edge.cltv_expiry_delta
    300         hop_payload = {
    301             "amt_to_forward": {"amt_to_forward": amt},
    302             "outgoing_cltv_value": {"outgoing_cltv_value": cltv},
    303             "short_channel_id": {"short_channel_id": route_edge.short_channel_id},
    304         }
    305         hops_data.append(
    306             OnionHopsDataSingle(
    307                 is_tlv_payload=route[edge_index-1].has_feature_varonion(),
    308                 payload=hop_payload))
    309         if not is_trampoline:
    310             amt += route_edge.fee_for_edge(amt)
    311             cltv += route_edge.cltv_expiry_delta
    312     hops_data.reverse()
    313     return hops_data, amt, cltv
    314 
    315 
    316 def _generate_filler(key_type: bytes, hops_data: Sequence[OnionHopsDataSingle],
    317                      shared_secrets: Sequence[bytes], data_size:int) -> bytes:
    318     num_hops = len(hops_data)
    319 
    320     # generate filler that matches all but the last hop (no HMAC for last hop)
    321     filler_size = 0
    322     for hop_data in hops_data[:-1]:
    323         filler_size += len(hop_data.to_bytes())
    324     filler = bytearray(filler_size)
    325 
    326     for i in range(0, num_hops-1):  # -1, as last hop does not obfuscate
    327         # Sum up how many frames were used by prior hops.
    328         filler_start = data_size
    329         for hop_data in hops_data[:i]:
    330             filler_start -= len(hop_data.to_bytes())
    331         # The filler is the part dangling off of the end of the
    332         # routingInfo, so offset it from there, and use the current
    333         # hop's frame count as its size.
    334         filler_end = data_size + len(hops_data[i].to_bytes())
    335 
    336         stream_key = get_bolt04_onion_key(key_type, shared_secrets[i])
    337         stream_bytes = generate_cipher_stream(stream_key, 2 * data_size)
    338         filler = xor_bytes(filler, stream_bytes[filler_start:filler_end])
    339         filler += bytes(filler_size - len(filler))  # right pad with zeroes
    340 
    341     return filler
    342 
    343 
    344 def generate_cipher_stream(stream_key: bytes, num_bytes: int) -> bytes:
    345     return chacha20_encrypt(key=stream_key,
    346                             nonce=bytes(8),
    347                             data=bytes(num_bytes))
    348 
    349 
class ProcessedOnionPacket(NamedTuple):
    """Result of peeling one layer off an onion packet (see process_onion_packet)."""
    # True when the hmac of our hop data is all zero bytes
    are_we_final: bool
    # the hop data parsed from the decrypted layer
    hop_data: OnionHopsDataSingle
    # packet built from the re-blinded public key, the remaining hops data and the hop hmac
    next_packet: OnionPacket
    # packet rebuilt from the payload's 'trampoline_onion_packet' entry;
    # when that entry is absent this holds None instead
    trampoline_onion_packet: OnionPacket
    355 
    356 
    357 # TODO replay protection
    358 def process_onion_packet(
    359         onion_packet: OnionPacket,
    360         associated_data: bytes,
    361         our_onion_private_key: bytes,
    362         is_trampoline=False) -> ProcessedOnionPacket:
    363     if not ecc.ECPubkey.is_pubkey_bytes(onion_packet.public_key):
    364         raise InvalidOnionPubkey()
    365     shared_secret = get_ecdh(our_onion_private_key, onion_packet.public_key)
    366     # check message integrity
    367     mu_key = get_bolt04_onion_key(b'mu', shared_secret)
    368     calculated_mac = hmac_oneshot(
    369         mu_key, msg=onion_packet.hops_data+associated_data,
    370         digest=hashlib.sha256)
    371     if onion_packet.hmac != calculated_mac:
    372         raise InvalidOnionMac()
    373     # peel an onion layer off
    374     rho_key = get_bolt04_onion_key(b'rho', shared_secret)
    375     data_size = TRAMPOLINE_HOPS_DATA_SIZE if is_trampoline else HOPS_DATA_SIZE
    376     stream_bytes = generate_cipher_stream(rho_key, 2 * data_size)
    377     padded_header = onion_packet.hops_data + bytes(data_size)
    378     next_hops_data = xor_bytes(padded_header, stream_bytes)
    379     next_hops_data_fd = io.BytesIO(next_hops_data)
    380     hop_data = OnionHopsDataSingle.from_fd(next_hops_data_fd)
    381     # trampoline
    382     trampoline_onion_packet = hop_data.payload.get('trampoline_onion_packet')
    383     if trampoline_onion_packet:
    384         top_version = trampoline_onion_packet.get('version')
    385         top_public_key = trampoline_onion_packet.get('public_key')
    386         top_hops_data = trampoline_onion_packet.get('hops_data')
    387         top_hops_data_fd = io.BytesIO(top_hops_data)
    388         top_hmac = trampoline_onion_packet.get('hmac')
    389         trampoline_onion_packet = OnionPacket(
    390             public_key=top_public_key,
    391             hops_data=top_hops_data_fd.read(TRAMPOLINE_HOPS_DATA_SIZE),
    392             hmac=top_hmac)
    393     # calc next ephemeral key
    394     blinding_factor = sha256(onion_packet.public_key + shared_secret)
    395     blinding_factor_int = int.from_bytes(blinding_factor, byteorder="big")
    396     next_public_key_int = ecc.ECPubkey(onion_packet.public_key) * blinding_factor_int
    397     next_public_key = next_public_key_int.get_public_key_bytes()
    398     next_onion_packet = OnionPacket(
    399         public_key=next_public_key,
    400         hops_data=next_hops_data_fd.read(data_size),
    401         hmac=hop_data.hmac)
    402     if hop_data.hmac == bytes(PER_HOP_HMAC_SIZE):
    403         # we are the destination / exit node
    404         are_we_final = True
    405     else:
    406         # we are an intermediate node; forwarding
    407         are_we_final = False
    408     return ProcessedOnionPacket(are_we_final, hop_data, next_onion_packet, trampoline_onion_packet)
    409 
    410 
    411 class FailedToDecodeOnionError(Exception): pass
    412 
    413 
    414 class OnionRoutingFailure(Exception):
    415 
    416     def __init__(self, code: int, data: bytes):
    417         self.code = code
    418         self.data = data
    419 
    420     def __repr__(self):
    421         return repr((self.code, self.data))
    422 
    423     def to_bytes(self) -> bytes:
    424         ret = self.code.to_bytes(2, byteorder="big")
    425         ret += self.data
    426         return ret
    427 
    428     @classmethod
    429     def from_bytes(cls, failure_msg: bytes):
    430         failure_code = int.from_bytes(failure_msg[:2], byteorder='big')
    431         try:
    432             failure_code = OnionFailureCode(failure_code)
    433         except ValueError:
    434             pass  # uknown failure code
    435         failure_data = failure_msg[2:]
    436         return OnionRoutingFailure(failure_code, failure_data)
    437 
    438     def code_name(self) -> str:
    439         if isinstance(self.code, OnionFailureCode):
    440             return str(self.code.name)
    441         return f"Unknown error ({self.code!r})"
    442 
    443 
    444 def construct_onion_error(
    445         reason: OnionRoutingFailure,
    446         onion_packet: OnionPacket,
    447         our_onion_private_key: bytes,
    448 ) -> bytes:
    449     # create payload
    450     failure_msg = reason.to_bytes()
    451     failure_len = len(failure_msg)
    452     pad_len = 256 - failure_len
    453     assert pad_len >= 0
    454     error_packet =  failure_len.to_bytes(2, byteorder="big")
    455     error_packet += failure_msg
    456     error_packet += pad_len.to_bytes(2, byteorder="big")
    457     error_packet += bytes(pad_len)
    458     # add hmac
    459     shared_secret = get_ecdh(our_onion_private_key, onion_packet.public_key)
    460     um_key = get_bolt04_onion_key(b'um', shared_secret)
    461     hmac_ = hmac_oneshot(um_key, msg=error_packet, digest=hashlib.sha256)
    462     error_packet = hmac_ + error_packet
    463     # obfuscate
    464     ammag_key = get_bolt04_onion_key(b'ammag', shared_secret)
    465     stream_bytes = generate_cipher_stream(ammag_key, len(error_packet))
    466     error_packet = xor_bytes(error_packet, stream_bytes)
    467     return error_packet
    468 
    469 
    470 def _decode_onion_error(error_packet: bytes, payment_path_pubkeys: Sequence[bytes],
    471                         session_key: bytes) -> Tuple[bytes, int]:
    472     """Returns the decoded error bytes, and the index of the sender of the error."""
    473     num_hops = len(payment_path_pubkeys)
    474     hop_shared_secrets = get_shared_secrets_along_route(payment_path_pubkeys, session_key)
    475     for i in range(num_hops):
    476         ammag_key = get_bolt04_onion_key(b'ammag', hop_shared_secrets[i])
    477         um_key = get_bolt04_onion_key(b'um', hop_shared_secrets[i])
    478         stream_bytes = generate_cipher_stream(ammag_key, len(error_packet))
    479         error_packet = xor_bytes(error_packet, stream_bytes)
    480         hmac_computed = hmac_oneshot(um_key, msg=error_packet[32:], digest=hashlib.sha256)
    481         hmac_found = error_packet[:32]
    482         if hmac_computed == hmac_found:
    483             return error_packet, i
    484     raise FailedToDecodeOnionError()
    485 
    486 
    487 def decode_onion_error(error_packet: bytes, payment_path_pubkeys: Sequence[bytes],
    488                        session_key: bytes) -> (OnionRoutingFailure, int):
    489     """Returns the failure message, and the index of the sender of the error."""
    490     decrypted_error, sender_index = _decode_onion_error(error_packet, payment_path_pubkeys, session_key)
    491     failure_msg = get_failure_msg_from_onion_error(decrypted_error)
    492     return failure_msg, sender_index
    493 
    494 
    495 def get_failure_msg_from_onion_error(decrypted_error_packet: bytes) -> OnionRoutingFailure:
    496     # get failure_msg bytes from error packet
    497     failure_len = int.from_bytes(decrypted_error_packet[32:34], byteorder='big')
    498     failure_msg = decrypted_error_packet[34:34+failure_len]
    499     # create failure message object
    500     return OnionRoutingFailure.from_bytes(failure_msg)
    501 
    502 
    503 
# TODO maybe we should rm this and just use OnionWireSerializer and onion_wire.csv
# Short temporary aliases for the meta flags, used only to spell out the
# enum values below; they are deleted again right after the class.
BADONION = OnionFailureCodeMetaFlag.BADONION
PERM     = OnionFailureCodeMetaFlag.PERM
NODE     = OnionFailureCodeMetaFlag.NODE
UPDATE   = OnionFailureCodeMetaFlag.UPDATE
class OnionFailureCode(IntEnum):
    """Failure codes carried in onion routing failure messages.

    Each value is a small number, optionally OR-ed with one or more
    OnionFailureCodeMetaFlag bits.
    """
    INVALID_REALM =                           PERM | 1
    TEMPORARY_NODE_FAILURE =                  NODE | 2
    PERMANENT_NODE_FAILURE =                  PERM | NODE | 2
    REQUIRED_NODE_FEATURE_MISSING =           PERM | NODE | 3
    INVALID_ONION_VERSION =                   BADONION | PERM | 4
    INVALID_ONION_HMAC =                      BADONION | PERM | 5
    INVALID_ONION_KEY =                       BADONION | PERM | 6
    TEMPORARY_CHANNEL_FAILURE =               UPDATE | 7
    PERMANENT_CHANNEL_FAILURE =               PERM | 8
    REQUIRED_CHANNEL_FEATURE_MISSING =        PERM | 9
    UNKNOWN_NEXT_PEER =                       PERM | 10
    AMOUNT_BELOW_MINIMUM =                    UPDATE | 11
    FEE_INSUFFICIENT =                        UPDATE | 12
    INCORRECT_CLTV_EXPIRY =                   UPDATE | 13
    EXPIRY_TOO_SOON =                         UPDATE | 14
    INCORRECT_OR_UNKNOWN_PAYMENT_DETAILS =    PERM | 15
    _LEGACY_INCORRECT_PAYMENT_AMOUNT =        PERM | 16
    FINAL_EXPIRY_TOO_SOON =                   17
    FINAL_INCORRECT_CLTV_EXPIRY =             18
    FINAL_INCORRECT_HTLC_AMOUNT =             19
    CHANNEL_DISABLED =                        UPDATE | 20
    EXPIRY_TOO_FAR =                          21
    INVALID_ONION_PAYLOAD =                   PERM | 22
    MPP_TIMEOUT =                             23
    TRAMPOLINE_FEE_INSUFFICIENT =             NODE | 51
    TRAMPOLINE_EXPIRY_TOO_SOON =              NODE | 52


# don't use these elsewhere, the names are ambiguous without context
del BADONION; del PERM; del NODE; del UPDATE