commit 51e0672da6ed4d8d8b078b0aad8cd515d59aaeef
parent d92a4e8365c48b1531ad8fb561a804a0ff91fcb2
Author: SomberNight <somber.night@protonmail.com>
Date: Tue, 30 Apr 2019 21:24:39 +0200
update to aiorpcx 0.17
Diffstat:
2 files changed, 40 insertions(+), 33 deletions(-)
diff --git a/contrib/requirements/requirements.txt b/contrib/requirements/requirements.txt
@@ -5,7 +5,7 @@ protobuf
dnspython
jsonrpclib-pelix
qdarkstyle<2.6
-aiorpcx>=0.9,<0.11
+aiorpcx>=0.17,<0.18
aiohttp>=3.3.0
aiohttp_socks
certifi
diff --git a/electrum/interface.py b/electrum/interface.py
@@ -34,6 +34,7 @@ from ipaddress import IPv4Network, IPv6Network, ip_address
import aiorpcx
from aiorpcx import RPCSession, Notification
+from aiorpcx.curio import timeout_after, TaskTimeout
import certifi
from .util import PrintError, ignore_exceptions, log_exceptions, bfh, SilentTaskGroup
@@ -72,10 +73,10 @@ class NotificationSession(RPCSession):
super(NotificationSession, self).__init__(*args, **kwargs)
self.subscriptions = defaultdict(list)
self.cache = {}
- self.in_flight_requests_semaphore = asyncio.Semaphore(100)
self.default_timeout = NetworkTimeout.Generic.NORMAL
self._msg_counter = 0
self.interface = None # type: Optional[Interface]
+ self.cost_hard_limit = 0 # disable aiorpcx resource limits
def _get_and_inc_msg_counter(self):
# runs in event loop thread, no need for lock
@@ -84,35 +85,40 @@ class NotificationSession(RPCSession):
async def handle_request(self, request):
self.maybe_log(f"--> {request}")
- # note: if server sends malformed request and we raise, the superclass
- # will catch the exception, count errors, and at some point disconnect
- if isinstance(request, Notification):
- params, result = request.args[:-1], request.args[-1]
- key = self.get_hashable_key_for_rpc_call(request.method, params)
- if key in self.subscriptions:
- self.cache[key] = result
- for queue in self.subscriptions[key]:
- await queue.put(request.args)
+ try:
+ if isinstance(request, Notification):
+ params, result = request.args[:-1], request.args[-1]
+ key = self.get_hashable_key_for_rpc_call(request.method, params)
+ if key in self.subscriptions:
+ self.cache[key] = result
+ for queue in self.subscriptions[key]:
+ await queue.put(request.args)
+ else:
+ raise Exception(f'unexpected notification')
else:
- raise Exception('unexpected request: {}'.format(repr(request)))
+ raise Exception(f'unexpected request. not a notification')
+ except Exception as e:
+ self.interface.print_error(f"error handling request {request}. exc: {repr(e)}")
+ await self.close()
async def send_request(self, *args, timeout=None, **kwargs):
- # note: the timeout starts after the request touches the wire!
- if timeout is None:
- timeout = self.default_timeout
- # note: the semaphore implementation guarantees no starvation
- async with self.in_flight_requests_semaphore:
- msg_id = self._get_and_inc_msg_counter()
- self.maybe_log(f"<-- {args} {kwargs} (id: {msg_id})")
- try:
- response = await asyncio.wait_for(
- super().send_request(*args, **kwargs),
- timeout)
- except asyncio.TimeoutError as e:
- raise RequestTimedOut(f'request timed out: {args} (id: {msg_id})') from e
- else:
- self.maybe_log(f"--> {response} (id: {msg_id})")
- return response
+ # note: semaphores/timeouts/backpressure etc are handled by
+ # aiorpcx. the timeout arg here in most cases should not be set
+ msg_id = self._get_and_inc_msg_counter()
+ self.maybe_log(f"<-- {args} {kwargs} (id: {msg_id})")
+ try:
+ response = await asyncio.wait_for(
+ super().send_request(*args, **kwargs),
+ timeout)
+ except (TaskTimeout, asyncio.TimeoutError) as e:
+ raise RequestTimedOut(f'request timed out: {args} (id: {msg_id})') from e
+ else:
+ self.maybe_log(f"--> {response} (id: {msg_id})")
+ return response
+
+ def set_default_timeout(self, timeout):
+ self.sent_request_timeout = timeout
+ self.max_send_delay = timeout
async def subscribe(self, method: str, params: List, queue: asyncio.Queue):
# note: until the cache is written for the first time,
@@ -212,10 +218,11 @@ class Interface(PrintError):
auth = None
else:
auth = aiorpcx.socks.SOCKSUserAuth(username, pw)
+ addr = "{}:{}".format(proxy['host'], proxy['port'])
if proxy['mode'] == "socks4":
- self.proxy = aiorpcx.socks.SOCKSProxy((proxy['host'], int(proxy['port'])), aiorpcx.socks.SOCKS4a, auth)
+ self.proxy = aiorpcx.socks.SOCKSProxy(addr, aiorpcx.socks.SOCKS4a, auth)
elif proxy['mode'] == "socks5":
- self.proxy = aiorpcx.socks.SOCKSProxy((proxy['host'], int(proxy['port'])), aiorpcx.socks.SOCKS5, auth)
+ self.proxy = aiorpcx.socks.SOCKSProxy(addr, aiorpcx.socks.SOCKS5, auth)
else:
raise NotImplementedError # http proxy not available with aiorpcx
else:
@@ -408,7 +415,7 @@ class Interface(PrintError):
ssl=sslc, proxy=self.proxy) as session:
self.session = session # type: NotificationSession
self.session.interface = self
- self.session.default_timeout = self.network.get_network_timeout_seconds(NetworkTimeout.Generic)
+ self.session.set_default_timeout(self.network.get_network_timeout_seconds(NetworkTimeout.Generic))
try:
ver = await session.send_request('server.version', [self.client_name(), version.PROTOCOL_VERSION])
except aiorpcx.jsonrpc.RPCError as e:
@@ -620,9 +627,9 @@ class Interface(PrintError):
def ip_addr(self) -> Optional[str]:
session = self.session
if not session: return None
- peer_addr = session.peer_address()
+ peer_addr = session.remote_address()
if not peer_addr: return None
- return peer_addr[0]
+ return str(peer_addr.host)
def bucket_based_on_ipaddress(self) -> str:
def do_bucket():