commit 8d88b0702da6102f2f2aeae801ab679382506367
parent ec373602587223cab273f19fa399a80b5cd94fc1
Author: ThomasV <thomasv@gitorious>
Date: Mon, 19 Mar 2012 21:19:36 +0300
stratum http server
Diffstat:
2 files changed, 401 insertions(+), 35 deletions(-)
diff --git a/server/StratumJSONRPCServer.py b/server/StratumJSONRPCServer.py
@@ -0,0 +1,296 @@
+import jsonrpclib
+from jsonrpclib import Fault
+from jsonrpclib.jsonrpc import USE_UNIX_SOCKETS
+import SimpleXMLRPCServer
+import SocketServer
+import socket
+import logging
+import os
+import types
+import traceback
+import sys
+try:
+ import fcntl
+except ImportError:
+ # For Windows
+ fcntl = None
+
+import json
+
+def get_version(request):
+ # must be a dict
+ if 'jsonrpc' in request.keys():
+ return 2.0
+ if 'id' in request.keys():
+ return 1.0
+ return None
+
+def validate_request(request):
+ if type(request) is not types.DictType:
+ fault = Fault(
+ -32600, 'Request must be {}, not %s.' % type(request)
+ )
+ return fault
+ rpcid = request.get('id', None)
+ version = get_version(request)
+ if not version:
+ fault = Fault(-32600, 'Request %s invalid.' % request, rpcid=rpcid)
+ return fault
+ request.setdefault('params', [])
+ method = request.get('method', None)
+ params = request.get('params')
+ param_types = (types.ListType, types.DictType, types.TupleType)
+ if not method or type(method) not in types.StringTypes or \
+ type(params) not in param_types:
+ fault = Fault(
+ -32600, 'Invalid request parameters or method.', rpcid=rpcid
+ )
+ return fault
+ return True
+
+class StratumJSONRPCDispatcher(SimpleXMLRPCServer.SimpleXMLRPCDispatcher):
+
+ def __init__(self, encoding=None):
+ SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self,
+ allow_none=True,
+ encoding=encoding)
+
+ def _marshaled_dispatch(self, data, dispatch_method = None):
+ response = None
+ try:
+ request = jsonrpclib.loads(data)
+ except Exception, e:
+ fault = Fault(-32700, 'Request %s invalid. (%s)' % (data, e))
+ response = fault.response()
+ return response
+
+ responses = []
+ if type(request) is not types.ListType:
+ request = [ request ]
+
+ for req_entry in request:
+ result = validate_request(req_entry)
+ if type(result) is Fault:
+ responses.append(result.response())
+ continue
+ resp_entry = self._marshaled_single_dispatch(req_entry)
+ if resp_entry is not None:
+ responses.append(resp_entry)
+
+ # poll
+ r = self._marshaled_single_dispatch({'method':'session.poll', 'params':[], 'id':'z' })
+ r = jsonrpclib.loads(r)
+ r = r.get('result')
+ for item in r:
+ responses.append(json.dumps(item))
+
+ if len(responses) > 1:
+ response = '[%s]' % ','.join(responses)
+ elif len(responses) == 1:
+ response = responses[0]
+ else:
+ response = ''
+
+ return response
+
+ def _marshaled_single_dispatch(self, request):
+ # TODO - Use the multiprocessing and skip the response if
+ # it is a notification
+ # Put in support for custom dispatcher here
+ # (See SimpleXMLRPCServer._marshaled_dispatch)
+ method = request.get('method')
+ params = request.get('params')
+ if params is None: params=[]
+ params = [ self.session_id, request['id'] ] + params
+ print method, params
+ try:
+ response = self._dispatch(method, params)
+ except:
+ exc_type, exc_value, exc_tb = sys.exc_info()
+ fault = Fault(-32603, '%s:%s' % (exc_type, exc_value))
+ return fault.response()
+ if 'id' not in request.keys() or request['id'] == None:
+ # It's a notification
+ return None
+
+ try:
+ response = jsonrpclib.dumps(response,
+ methodresponse=True,
+ rpcid=request['id']
+ )
+ return response
+ except:
+ exc_type, exc_value, exc_tb = sys.exc_info()
+ fault = Fault(-32603, '%s:%s' % (exc_type, exc_value))
+ return fault.response()
+
+ def _dispatch(self, method, params):
+ func = None
+ try:
+ func = self.funcs[method]
+ except KeyError:
+ if self.instance is not None:
+ if hasattr(self.instance, '_dispatch'):
+ return self.instance._dispatch(method, params)
+ else:
+ try:
+ func = SimpleXMLRPCServer.resolve_dotted_attribute(
+ self.instance,
+ method,
+ True
+ )
+ except AttributeError:
+ pass
+ if func is not None:
+ try:
+ if type(params) is types.ListType:
+ response = func(*params)
+ else:
+ response = func(**params)
+ return response
+ except TypeError:
+ return Fault(-32602, 'Invalid parameters.')
+ except:
+ err_lines = traceback.format_exc().splitlines()
+ trace_string = '%s | %s' % (err_lines[-3], err_lines[-1])
+ fault = jsonrpclib.Fault(-32603, 'Server error: %s' %
+ trace_string)
+ return fault
+ else:
+ return Fault(-32601, 'Method %s not supported.' % method)
+
+class StratumJSONRPCRequestHandler(
+ SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
+
+ def do_GET(self):
+ if not self.is_rpc_path_valid():
+ self.report_404()
+ return
+ try:
+ print "GET"
+
+ self.server.session_id = None
+ c = self.headers.get('cookie')
+ if c:
+ if c[0:8]=='SESSION=':
+ print "found cookie", c[8:]
+ self.server.session_id = c[8:]
+
+ if self.server.session_id is None:
+ r = self.server._marshaled_single_dispatch({'method':'session.create', 'params':[], 'id':'z' })
+ r = jsonrpclib.loads(r)
+ self.server.session_id = r.get('result')
+ print "setting cookie", self.server.session_id
+
+ data = json.dumps([])
+ response = self.server._marshaled_dispatch(data)
+ self.send_response(200)
+ except Exception, e:
+ self.send_response(500)
+ err_lines = traceback.format_exc().splitlines()
+ trace_string = '%s | %s' % (err_lines[-3], err_lines[-1])
+ fault = jsonrpclib.Fault(-32603, 'Server error: %s' % trace_string)
+ response = fault.response()
+ print "500", trace_string
+ if response == None:
+ response = ''
+
+ if hasattr(self.server, 'session_id'):
+ if self.server.session_id:
+ self.send_header("Set-Cookie", "SESSION=%s"%self.server.session_id)
+ self.session_id = None
+
+ self.send_header("Content-type", "application/json-rpc")
+ self.send_header("Content-length", str(len(response)))
+ self.end_headers()
+ self.wfile.write(response)
+ self.wfile.flush()
+ self.connection.shutdown(1)
+
+
+ def do_POST(self):
+ if not self.is_rpc_path_valid():
+ self.report_404()
+ return
+ try:
+ max_chunk_size = 10*1024*1024
+ size_remaining = int(self.headers["content-length"])
+ L = []
+ while size_remaining:
+ chunk_size = min(size_remaining, max_chunk_size)
+ L.append(self.rfile.read(chunk_size))
+ size_remaining -= len(L[-1])
+ data = ''.join(L)
+
+ self.server.session_id = None
+ c = self.headers.get('cookie')
+ if c:
+ if c[0:8]=='SESSION=':
+ print "found cookie", c[8:]
+ self.server.session_id = c[8:]
+
+ if self.server.session_id is None:
+ r = self.server._marshaled_single_dispatch({'method':'session.create', 'params':[], 'id':'z' })
+ r = jsonrpclib.loads(r)
+ self.server.session_id = r.get('result')
+ #print "setting cookie", self.server.session_id
+
+ response = self.server._marshaled_dispatch(data)
+ self.send_response(200)
+ except Exception, e:
+ self.send_response(500)
+ err_lines = traceback.format_exc().splitlines()
+ trace_string = '%s | %s' % (err_lines[-3], err_lines[-1])
+ fault = jsonrpclib.Fault(-32603, 'Server error: %s' % trace_string)
+ response = fault.response()
+ print "500", trace_string
+ if response == None:
+ response = ''
+
+ if hasattr(self.server, 'session_id'):
+ if self.server.session_id:
+ self.send_header("Set-Cookie", "SESSION=%s"%self.server.session_id)
+ self.session_id = None
+
+ self.send_header("Content-type", "application/json-rpc")
+ self.send_header("Content-length", str(len(response)))
+ self.end_headers()
+ self.wfile.write(response)
+ self.wfile.flush()
+ self.connection.shutdown(1)
+
+
+class StratumJSONRPCServer(SocketServer.TCPServer, StratumJSONRPCDispatcher):
+
+ allow_reuse_address = True
+
+ def __init__(self, addr, requestHandler=StratumJSONRPCRequestHandler,
+ logRequests=True, encoding=None, bind_and_activate=True,
+ address_family=socket.AF_INET):
+ self.logRequests = logRequests
+ StratumJSONRPCDispatcher.__init__(self, encoding)
+ # TCPServer.__init__ has an extra parameter on 2.6+, so
+ # check Python version and decide on how to call it
+ vi = sys.version_info
+ self.address_family = address_family
+ if USE_UNIX_SOCKETS and address_family == socket.AF_UNIX:
+ # Unix sockets can't be bound if they already exist in the
+ # filesystem. The convention of e.g. X11 is to unlink
+ # before binding again.
+ if os.path.exists(addr):
+ try:
+ os.unlink(addr)
+ except OSError:
+ logging.warning("Could not unlink socket %s", addr)
+ # if python 2.5 and lower
+ if vi[0] < 3 and vi[1] < 6:
+ SocketServer.TCPServer.__init__(self, addr, requestHandler)
+ else:
+ SocketServer.TCPServer.__init__(self, addr, requestHandler,
+ bind_and_activate)
+ if fcntl is not None and hasattr(fcntl, 'FD_CLOEXEC'):
+ flags = fcntl.fcntl(self.fileno(), fcntl.F_GETFD)
+ flags |= fcntl.FD_CLOEXEC
+ fcntl.fcntl(self.fileno(), fcntl.F_SETFD, flags)
+
+
diff --git a/server/server.py b/server/server.py
@@ -78,6 +78,8 @@ old_block_number = -1
sessions = {}
sessions_sub_numblocks = {} # sessions that have subscribed to the service
+m_sessions = [{}] # served by http
+
dblock = thread.allocate_lock()
peer_list = {}
@@ -88,6 +90,8 @@ input_queue = Queue()
output_queue = Queue()
address_queue = Queue()
+
+
class MyStore(Datastore_class):
def import_block(self, b, chain_ids=frozenset()):
@@ -384,41 +388,39 @@ def random_string(N):
-def cmd_stop(data):
+def cmd_stop(_,__,pw):
global stopping
- if password == data:
+ if password == pw:
stopping = True
return 'ok'
else:
return 'wrong password'
-def cmd_load(pw):
+def cmd_load(_,__,pw):
if password == pw:
return repr( len(sessions) )
else:
return 'wrong password'
-def clear_cache(pw):
+def clear_cache(_,__,pw):
if password == pw:
store.tx_cache = {}
return 'ok'
else:
return 'wrong password'
-def get_cache(pw,addr):
+def get_cache(_,__,pw,addr):
if password == pw:
return store.tx_cache.get(addr)
else:
return 'wrong password'
-def poll_session(session_id):
- session = sessions.get(session_id)
- if session is None:
- print time.asctime(), "session not found", session_id
- out = repr( (-1, {}))
- else:
+
+
+def modified_addresses(session):
+ if 1:
t1 = time.time()
addresses = session['addresses']
session['last_time'] = time.time()
@@ -427,16 +429,47 @@ def poll_session(session_id):
for addr in addresses:
if store.tx_cache.get( addr ) is not None: k += 1
status = get_address_status( addr )
- last_status = addresses.get( addr )
+ msg_id, last_status = addresses.get( addr )
if last_status != status:
- addresses[addr] = status
+ addresses[addr] = msg_id, status
ret[addr] = status
- if ret:
- sessions[session_id]['addresses'] = addresses
- out = repr( (block_number, ret ) )
+
t2 = time.time() - t1
- if t2 > 10:
- print "high load:", session_id, "%d/%d"%(k,len(addresses)), t2
+ #if t2 > 10: print "high load:", session_id, "%d/%d"%(k,len(addresses)), t2
+ return ret, addresses
+
+
+def poll_session(session_id):
+ # native
+ session = sessions.get(session_id)
+ if session is None:
+ print time.asctime(), "session not found", session_id
+ return -1, {}
+ else:
+ ret, addresses = modified_addresses(session)
+ if ret: sessions[session_id]['addresses'] = addresses
+ return repr( (block_number,ret))
+
+
+def poll_session_json(session_id, message_id):
+ session = m_sessions[0].get(session_id)
+ if session is None:
+ raise BaseException("session not found %s"%session_id)
+ else:
+ print "poll: session found", session_id
+ out = []
+ ret, addresses = modified_addresses(session)
+ if ret:
+ m_sessions[0][session_id]['addresses'] = addresses
+ for addr in ret:
+ msg_id, status = addresses[addr]
+ out.append( { 'id':msg_id, 'result':status } )
+
+ msg_id, last_nb = session.get('numblocks')
+ if last_nb:
+ if last_nb != block_number:
+ m_sessions[0][session_id]['numblocks'] = msg_id, block_number
+ out.append( {'id':msg_id, 'result':block_number} )
return out
@@ -444,6 +477,7 @@ def poll_session(session_id):
def do_update_address(addr):
# an address was involved in a transaction; we check if it was subscribed to in a session
# the address can be subscribed in several sessions; the cache should ensure that we don't do redundant requests
+
for session_id in sessions.keys():
session = sessions[session_id]
if session.get('type') != 'persistent': continue
@@ -457,7 +491,6 @@ def do_update_address(addr):
send_status(session_id,message_id,addr,status)
sessions[session_id]['addresses'][addr] = (message_id,status)
-
def get_address_status(addr):
# get address status, i.e. the last block for that address.
tx_points = store.get_history(addr)
@@ -481,19 +514,36 @@ def send_status(session_id, message_id, address, status):
out = json.dumps( { 'id':message_id, 'result':status } )
output_queue.put((session_id, out))
+def address_get_history_json(_,message_id,address):
+ return store.get_history(address)
+
def subscribe_to_numblocks(session_id, message_id):
sessions_sub_numblocks[session_id] = message_id
send_numblocks(session_id)
+def subscribe_to_numblocks_json(session_id, message_id):
+ global m_sessions
+ m_sessions[0][session_id]['numblocks'] = message_id,block_number
+ return block_number
+
def subscribe_to_address(session_id, message_id, address):
status = get_address_status(address)
sessions[session_id]['addresses'][address] = (message_id, status)
sessions[session_id]['last_time'] = time.time()
send_status(session_id, message_id, address, status)
+def add_address_to_session_json(session_id, message_id, address):
+ global m_sessions
+ sessions = m_sessions[0]
+ status = get_address_status(address)
+ sessions[session_id]['addresses'][address] = (message_id, status)
+ sessions[session_id]['last_time'] = time.time()
+ m_sessions[0] = sessions
+ return status
+
def add_address_to_session(session_id, address):
status = get_address_status(address)
- sessions[session_id]['addresses'][addr] = status
+ sessions[session_id]['addresses'][addr] = ("", status)
sessions[session_id]['last_time'] = time.time()
return status
@@ -501,13 +551,30 @@ def new_session(version, addresses):
session_id = random_string(10)
sessions[session_id] = { 'addresses':{}, 'version':version }
for a in addresses:
- sessions[session_id]['addresses'][a] = ''
+ sessions[session_id]['addresses'][a] = ('','')
out = repr( (session_id, config.get('server','banner').replace('\\n','\n') ) )
sessions[session_id]['last_time'] = time.time()
return out
-def get_banner():
- print "get banner"
+
+def client_version_json(session_id, _, version):
+ global m_sessions
+ sessions = m_sessions[0]
+ sessions[session_id]['version'] = version
+ m_sessions[0] = sessions
+
+def create_session_json(_, __):
+ sessions = m_sessions[0]
+ session_id = random_string(10)
+ print "creating session", session_id
+ sessions[session_id] = { 'addresses':{}, 'numblocks':('','') }
+ sessions[session_id]['last_time'] = time.time()
+ m_sessions[0] = sessions
+ return session_id
+
+
+
+def get_banner(_,__):
return config.get('server','banner').replace('\\n','\n')
def update_session(session_id,addresses):
@@ -892,26 +959,29 @@ def irc_thread():
s.close()
+def get_peers_json(_,__):
+ return peer_list.values()
def http_server_thread(store):
# see http://code.google.com/p/jsonrpclib/
from SocketServer import ThreadingMixIn
- from jsonrpclib.SimpleJSONRPCServer import SimpleJSONRPCServer
- class SimpleThreadedJSONRPCServer(ThreadingMixIn, SimpleJSONRPCServer): pass
- server = SimpleThreadedJSONRPCServer(( config.get('server','host'), 8081))
- server.register_function(lambda : peer_list.values(), 'server.peers')
+ from StratumJSONRPCServer import StratumJSONRPCServer
+ class StratumThreadedJSONRPCServer(ThreadingMixIn, StratumJSONRPCServer): pass
+ server = StratumThreadedJSONRPCServer(( config.get('server','host'), 8081))
+ server.register_function(get_peers_json, 'server.peers')
server.register_function(cmd_stop, 'stop')
server.register_function(cmd_load, 'load')
- server.register_function(lambda : block_number, 'blocks')
server.register_function(clear_cache, 'clear_cache')
server.register_function(get_cache, 'get_cache')
server.register_function(get_banner, 'server.banner')
server.register_function(send_tx, 'transaction.broadcast')
- server.register_function(store.get_history, 'address.get_history')
- server.register_function(add_address_to_session, 'address.subscribe')
- server.register_function(new_session, 'session.new')
- server.register_function(update_session, 'session.update')
- server.register_function(poll_session, 'session.poll')
+ server.register_function(address_get_history_json, 'address.get_history')
+ server.register_function(add_address_to_session_json, 'address.subscribe')
+ server.register_function(create_session_json, 'session.create') #internal message
+ server.register_function(poll_session_json, 'session.poll')
+ server.register_function(subscribe_to_numblocks_json, 'numblocks.subscribe')
+ server.register_function(client_version_json, 'client.version')
+ server.register_function(lambda a,b:None, 'ping')
server.serve_forever()
@@ -927,7 +997,7 @@ if __name__ == '__main__':
if cmd == 'load':
out = server.load(password)
elif cmd == 'peers':
- out = server.peers()
+ out = server.server.peers()
elif cmd == 'stop':
out = server.stop(password)
elif cmd == 'clear_cache':
@@ -939,7 +1009,7 @@ if __name__ == '__main__':
elif cmd == 'tx':
out = server.transaction.broadcast(sys.argv[2])
elif cmd == 'b':
- out = server.blocks()
+ out = server.numblocks.subscribe()
else:
out = "Unknown command: '%s'" % cmd
print out