commit 6321b14f9feca1cb5d6293ab722f3936d08c170d
parent 353a7b8fd93426075466caf7506e3b6acccec9de
Author: ThomasV <thomasv@electrum.org>
Date: Fri, 31 Mar 2017 12:45:09 +0200
Locate blockchain connection point with binary search
Diffstat:
M | lib/blockchain.py | | | 34 | ++++++++++------------------------ |
M | lib/network.py | | | 206 | +++++++++++++++++++++++++++++++++++++++++++------------------------------------ |
2 files changed, 122 insertions(+), 118 deletions(-)
diff --git a/lib/blockchain.py b/lib/blockchain.py
@@ -247,35 +247,21 @@ class Blockchain(util.PrintError):
new_bits = bitsN << 24 | bitsBase
return new_bits, bitsBase << (8 * (bitsN-3))
- def connect_header(self, chain, header):
- '''Builds a header chain until it connects. Returns True if it has
- successfully connected, False if verification failed, otherwise the
- height of the next header needed.'''
- chain.append(header) # Ordered by decreasing height
+ def can_connect(self, header):
previous_height = header['block_height'] - 1
previous_header = self.read_header(previous_height)
-
- # Missing header, request it
if not previous_header:
- return previous_height
-
- # Does it connect to my chain?
+ return False
prev_hash = self.hash_header(previous_header)
if prev_hash != header.get('prev_block_hash'):
- self.print_error("reorg")
- return previous_height
-
- # The chain is complete. Reverse to order by increasing height
- chain.reverse()
+ return False
+ height = header.get('block_height')
+ bits, target = self.get_target(height / 2016)
try:
- self.verify_chain(chain)
- self.print_error("new height:", previous_height + len(chain))
- for header in chain:
- self.save_header(header)
- return True
- except BaseException as e:
- self.print_error(str(e))
+ self.verify_header(header, previous_header, bits, target)
+ except:
return False
+ return True
def connect_chunk(self, idx, hexdata):
try:
@@ -283,10 +269,10 @@ class Blockchain(util.PrintError):
self.verify_chunk(idx, data)
self.print_error("validated chunk %d" % idx)
self.save_chunk(idx, data)
- return idx + 1
+ return True
except BaseException as e:
self.print_error('verify_chunk failed', str(e))
- return idx - 1
+ return False
def get_checkpoint(self):
height = self.config.get('checkpoint_height', 0)
diff --git a/lib/network.py b/lib/network.py
@@ -554,7 +554,7 @@ class Network(util.DaemonThread):
interface.server_version = result
elif method == 'blockchain.headers.subscribe':
if error is None:
- self.on_header(interface, result)
+ self.on_notify_header(interface, result)
elif method == 'server.peers.subscribe':
if error is None:
self.irc_servers = parse_servers(result)
@@ -691,12 +691,9 @@ class Network(util.DaemonThread):
def new_interface(self, server, socket):
self.add_recent_server(server)
interface = Interface(server, socket)
- # A deque of interface header requests, processed left-to-right
- interface.bc_requests = deque()
- interface.failed_checkpoint = False
+ interface.mode = 'checkpoint'
self.interfaces[server] = interface
- self.queue_request('blockchain.block.get_header', [self.blockchain.checkpoint_height], interface)
- self.queue_request('blockchain.headers.subscribe', [], interface)
+ self.request_header(interface, self.blockchain.checkpoint_height)
if server == self.default_server:
self.switch_to_interface(server)
self.notify('interfaces')
@@ -743,111 +740,124 @@ class Network(util.DaemonThread):
else:
self.switch_to_interface(self.default_server)
- def request_chunk(self, interface, data, idx):
+ def request_chunk(self, interface, idx):
interface.print_error("requesting chunk %d" % idx)
self.queue_request('blockchain.block.get_chunk', [idx], interface)
- data['chunk_idx'] = idx
- data['req_time'] = time.time()
+ interface.request = idx
+ interface.req_time = time.time()
def on_get_chunk(self, interface, response):
'''Handle receiving a chunk of block headers'''
if response.get('error'):
interface.print_error(response.get('error'))
return
- if interface.bc_requests:
- data = interface.bc_requests[0]
- req_idx = data.get('chunk_idx')
- # Ignore unsolicited chunks
- if req_idx == response['params'][0]:
- idx = self.blockchain.connect_chunk(req_idx, response['result'])
- # If not finished, get the next chunk
- if idx < 0 or self.get_local_height() >= data['if_height']:
- interface.bc_requests.popleft()
- self.notify('updated')
- else:
- self.request_chunk(interface, data, idx)
- self.notify('updated')
+ # Ignore unsolicited chunks
+ index = response['params'][0]
+ if interface.request != index:
+ return
+ connect = self.blockchain.connect_chunk(index, response['result'])
+ # If not finished, get the next chunk
+ if not connect:
+ return
+ if self.get_local_height() < interface.tip:
+ self.request_chunk(interface, index+1)
+ else:
+ interface.request = None
+ self.notify('updated')
- def request_header(self, interface, data, height):
+ def request_header(self, interface, height):
interface.print_error("requesting header %d" % height)
self.queue_request('blockchain.block.get_header', [height], interface)
- data['header_height'] = height
- data['req_time'] = time.time()
- if not 'chain' in data:
- data['chain'] = []
+ interface.request = height
+ interface.req_time = time.time()
def on_get_header(self, interface, response):
'''Handle receiving a single block header'''
- # close connection if header does not pass checkpoint
- if not self.blockchain.pass_checkpoint(response['result']):
- if interface == self.interface and not self.auto_connect:
- interface.failed_checkpoint = True
- else:
- interface.print_error("header did not pass checkpoint, dismissing interface")
- self.connection_down(interface.server)
- return
if self.blockchain.downloading_headers:
return
- if interface.bc_requests:
- data = interface.bc_requests[0]
- req_height = data.get('header_height', -1)
- # Ignore unsolicited headers
- if req_height == response['params'][0]:
- if interface.failed_checkpoint:
- interface.bc_requests.popleft()
- return
- next_height = self.blockchain.connect_header(data['chain'], response['result'])
- # If not finished, get the next header
- if next_height in [True, False]:
- interface.bc_requests.popleft()
- if next_height:
- self.switch_lagging_interface(interface.server)
- self.notify('updated')
- else:
- interface.print_error("header didn't connect, dismissing interface")
- self.connection_down(interface.server)
- else:
- self.request_header(interface, data, next_height)
+ header = response.get('result')
+ if not header:
+ interface.print_error(response)
+ self.connection_down(interface.server)
+ return
+ height = header.get('block_height')
+ if interface.request != height:
+ interface.print_error("unsolicited header",interface.request, height)
+ self.connection_down(interface.server)
+ return
+ self.on_header(interface, header)
- def bc_request_headers(self, interface, data):
- '''Send a request for the next header, or a chunk of them,
- if necessary.
- '''
- if self.blockchain.downloading_headers:
- return False
- local_height, if_height = self.get_local_height(), data['if_height']
- if if_height <= local_height:
- return False
- elif if_height > local_height + 50:
- self.request_chunk(interface, data, (local_height + 1) / 2016)
+ def on_header(self, interface, header):
+ height = header.get('block_height')
+ if interface.mode == 'checkpoint':
+ if self.blockchain.pass_checkpoint(header):
+ interface.mode = 'default'
+ self.queue_request('blockchain.headers.subscribe', [], interface)
+ else:
+ if interface != self.interface or self.auto_connect:
+ interface.print_error("checkpoint failed")
+ self.connection_down(interface.server)
+ interface.request = None
+ return
+ can_connect = self.blockchain.can_connect(header)
+ if interface.mode == 'backward':
+ if can_connect:
+ interface.good = height
+ interface.mode = 'binary'
+ interface.print_error("binary search")
+ next_height = (interface.bad + interface.good) // 2
+ else:
+ interface.bad = height
+ delta = interface.tip - height
+ next_height = interface.tip - 2 * delta
+ if next_height < 0:
+ interface.print_error("header didn't connect, dismissing interface")
+ self.connection_down(interface.server)
+ elif interface.mode == 'binary':
+ if can_connect:
+ interface.good = height
+ else:
+ interface.bad = height
+ if interface.good == interface.bad - 1:
+ interface.print_error("catching up from %d"% interface.good)
+ interface.mode = 'default'
+ next_height = interface.good
+ else:
+ next_height = (interface.bad + interface.good) // 2
+ elif interface.mode == 'default':
+ if can_connect:
+ if height > self.get_local_height():
+ self.blockchain.save_header(header)
+ self.notify('updated')
+ if height < interface.tip:
+ next_height = height + 1
+ else:
+ next_height = None
+ else:
+ interface.mode = 'backward'
+ interface.bad = height
+ next_height = height - 1
else:
- self.request_header(interface, data, if_height)
- return True
+ raise BaseException(interface.mode)
+ # If not finished, get the next header
+ if next_height:
+ if interface.mode != 'default':
+ self.request_header(interface, next_height)
+ else:
+ local_height = self.get_local_height()
+ if interface.tip > local_height + 50:
+ self.request_chunk(interface, (local_height + 1) // 2016)
+ else:
+ self.request_header(interface, next_height)
+ else:
+ interface.request = None
- def handle_bc_requests(self):
- '''Work through each interface that has notified us of a new header.
- Send it requests if it is ahead of our blockchain object.
- '''
+ def maintain_requests(self):
for interface in self.interfaces.values():
- if not interface.bc_requests:
- continue
- data = interface.bc_requests.popleft()
- # If the connection was lost move on
- if not interface in self.interfaces.values():
- continue
- req_time = data.get('req_time')
- if not req_time:
- # No requests sent yet. This interface has a new height.
- # Request headers if it is ahead of our blockchain
- if not self.bc_request_headers(interface, data):
- continue
- elif time.time() - req_time > 20:
+ if interface.request and time.time() - interface.request_time > 20:
interface.print_error("blockchain request timed out")
self.connection_down(interface.server)
continue
- # Put updated request state back at head of deque
- interface.bc_requests.appendleft(data)
- break
def wait_on_sockets(self):
# Python docs say Windows doesn't like empty selects.
@@ -874,21 +884,29 @@ class Network(util.DaemonThread):
while self.is_running():
self.maintain_sockets()
self.wait_on_sockets()
- self.handle_bc_requests()
+ self.maintain_requests()
self.run_jobs() # Synchronizer and Verifier
self.process_pending_sends()
-
self.stop_network()
self.on_stop()
- def on_header(self, i, header):
+ def on_notify_header(self, i, header):
height = header.get('block_height')
if not height:
return
self.headers[i.server] = header
-
- # Queue this interface's height for asynchronous catch-up
- i.bc_requests.append({'if_height': height})
+ i.tip = height
+ local_height = self.get_local_height()
+
+ if i.tip > local_height:
+ i.print_error("better height", height)
+ # if I can connect, do it right away
+ if self.blockchain.can_connect(header):
+ self.blockchain.save_header(header)
+ self.notify('updated')
+ # otherwise trigger a search
+ elif i.request is None:
+ self.on_header(i, header)
if i == self.interface:
self.switch_lagging_interface()