#!/usr/bin/env python # # A web server. # import sys, os, asyncio, logging, aiohttp_jinja2, jinja2, time, weakref, re from aiohttp import web from yarl import URL from conn import Connection, MissingColdcard from ckcc.protocol import CCProtocolPacker from utils import pformat_json, json_loads, json_dumps, cleanup_psbt from objstruct import ObjectStruct from aiohttp.web_exceptions import HTTPMovedPermanently, HTTPNotFound, HTTPBadRequest, HTTPFound from decimal import Decimal import aiohttp_session from aiohttp_session import get_session, new_session from base64 import b32encode, b64decode, b64encode from binascii import b2a_hex, a2b_hex from status import STATUS from persist import settings, BP from hashlib import sha256 from chain import broadcast_txn from version import VERSION try: from jinja2 import Markup, escape except ImportError: from markupsafe import Markup, escape import policy from ckcc.constants import USER_AUTH_TOTP, USER_AUTH_HMAC, USER_AUTH_SHOW_QR, MAX_USERNAME_LEN from ckcc.constants import STXN_VISUALIZE, STXN_SIGNED, AF_P2WPKH, AF_CLASSIC from ckcc.protocol import CCUserRefused logging.getLogger(__name__).addHandler(logging.NullHandler()) routes = web.RouteTableDef() web_sockets = weakref.WeakSet() class HTMLErrorMsg(ValueError): def __init__(self, html): super(HTMLErrorMsg, self).__init__(Markup(html)) APPROVE_CTA = '''\ Please consult the Coldcard screen and review the HSM policy shown there. If you \ are satisfied it does what you need, approve the policy and the Coldcard will enter HSM mode. ''' def default_context(): # # Put values you want in every template here. They cannot vary per-request. # rv = ObjectStruct(VERSION=VERSION) # this defines the nav menu in top bar rv.PAGES = [ ('/', 'Sign Transaction'), ('/tools', 'Tools'), ('/setup', 'Coldcard Setup'), ('/bunker', 'Bunker Setup'), #('/help', 'Help') ] rv['zip'] = zip return rv async def add_shared_ctx(request, **rv): # add ctx vars needed to support fancy Vue.js stuff on logged-in pages ses = await get_session(request) rv.update(dict( ws_url = '/websocket/' + ses.get('ws_token'), STATUS=STATUS.as_dict(), )) rv['CUR_PAGE'] = '/' + request.path.split('/')[-1] return rv @routes.get('/') @aiohttp_jinja2.template('txn/index.html') async def homepage(request): # may take some details from the policy, and cook up into more useful forms ss = BP.get('summary', None) if STATUS.hsm.get('active') else None # coldcard tells us when we'll need a local code, by providing seed value nl = bool('next_local_code' in STATUS.hsm) return await add_shared_ctx(request, policy_summary=ss, needs_local=nl) @routes.get('/setup') @aiohttp_jinja2.template('setup/index.html') async def setup_page(request): # HSM policy setup # get latest status dev = Connection() await dev.hsm_status() return await add_shared_ctx(request) @routes.get('/bunker') @aiohttp_jinja2.template('bunker/index.html') async def bunker_page(request): # Bunker config and setup # get latest status dev = Connection() await dev.hsm_status() from torsion import TOR return await add_shared_ctx(request, BP=BP, force_local_mode=STATUS.force_local_mode) @routes.get('/tools') @aiohttp_jinja2.template('tools/index.html') async def tools_page(request): # various random things I've been talked into # - message signing useul tho if BP.get('policy'): paths = BP['policy'].get('msg_paths') or [] paths = set(i.replace('*', '999') for i in paths) else: # priv_over_ux: we don't know, but some useful ones paths = ['m', "m/0/0", "m/44'/0'/0'/0/0", "m/49'/0'/0'/0/0", "m/84'/0'/0/0" ] paths = list(sorted(paths, key=lambda x: (len(x.split('/')), x.split('/')))) return await add_shared_ctx(request, msg_paths=paths) @routes.get('/help') @aiohttp_jinja2.template('help.html') async def help_page(request): return await add_shared_ctx(request) _static_html_cache = dict() def send_static_html(fname, disable_cache=True): # just return the contents of an HTML file, and cache it along the way # - can't use web.StaticFileResponse because want set-cookies to happen global _static_html_cache if not disable_cache and (fname in _static_html_cache): page_html = _static_html_cache[fname] else: _static_html_cache[fname] = page_html = open(fname, 'rt').read() return web.Response(text=page_html, content_type='text/html') @routes.get('/logout') async def logout_page(request): # do a logout ses = await get_session(request) if ses and not ses.new: logging.warn("Logout of user.") # This clears cookie, but if they come back to any page on our site, # they will get a new one, so we can't redirect them to any other page # here ses.invalidate() return send_static_html('static/html/logout.html') @routes.get('/login') async def login_page(request): # Completely static and plain. # thanks to # Create a session, if they don't have one. Need this # to establish timing/authencity of login data when its posted ses = await get_session(request) if not ses: ses = await new_session(request) else: if ses.get('login_ok'): # they are already logged in, so send them to homepage return HTTPFound('/') # - cannot leave session empty, so put anything in there. ses['j'] = True ses.changed() return send_static_html('static/html/login.html') def accept_user_login(ses): # setup session for good user ses['login_ok'] = True ses['active'] = time.time() ses['ws_token'] = str(b32encode(os.urandom(15)), 'ascii') ses.pop('captcha', None) @routes.post('/login') async def login_post(request): # they must have a current session already # - hope this is enough against CSRF # - TODO: some rate limiting here, without DoS attacks ses = await get_session(request) form = await request.post() ok = False if ses.new: logging.warn("Bad login attempt (no cookie)") elif (time.time() - ses.created) > settings.MAX_LOGIN_WAIT_TIME: ses.invalidate() logging.warn("Stale login attempt (cookie too old)") elif ses.get('kiddie', 0): logging.warn("Keeping the kiddies at bay") else: captcha = form.get('captcha', '').lower() pw = form.get('password', None) if not captcha or not pw: # keep same captcha; they just pressed enter ok = False else: expect = BP.get('master_pw', settings.MASTER_PW) # XXX scrypt(pw) expect_code = ses.pop('captcha', None) ok = (pw == expect) and (captcha == expect_code) if not ok: # fail; do nothing visible (but they will get new captcha) dest = URL(request.headers.get('referer', '/login')) return HTTPFound(dest) # SUCCESS accept_user_login(ses) # try to put them back where they were before try: dest = URL(request.headers.get('referer', '')).query.get('u', '/') except: dest = '/' logging.warn(f"Good login from user, sending to: {dest}") return HTTPFound(dest) @routes.get('/captcha') async def captcha_image(request): # make a captcha image, but always the same one per session ses = await get_session(request) if ses.new: return HTTPNotFound() from make_captcha import RansomCaptcha, MegaGifCaptcha, TOKEN_CHARS import random if 'captcha' in ses: # dont let them retry? code = ses['captcha'] else: code = ''.join(random.sample(TOKEN_CHARS, 8)) ses['captcha'] = code easy = BP.get('easy_captcha', settings.EASY_CAPTCHA) if easy: itype, data = RansomCaptcha(seed=code).draw(code, foreground='#444') else: itype, data = MegaGifCaptcha(seed=code).draw(code, foreground='#444') return web.Response(body=data, content_type='image/'+itype, headers = {'Cache-Control': 'no-cache'}) async def rx_handler(ses, ws, orig_request): # Block on receive, handle each message as it comes in. # see pp/aiohttp/client_ws.py async def tx_resp(_ws=ws, **resp): logging.debug(f"Send resp: {resp}") await _ws.send_str(json_dumps(resp)) async for msg in ws: if msg.type != web.WSMsgType.TEXT: raise TypeError('expected text') try: assert len(msg.data) < 20000 req = json_loads(msg.data) if '_ping' in req: # connection keep alive, simple await tx_resp(_pong=1) continue # Caution: lots of sensitive values here XXX #logging.info("WS api data: %r" % req) except Exception as e: logging.critical("Junk data on WS", exc_info=1) break # the connection # do something with the request failed = True try: await ws_api_handler(ses, tx_resp, req, orig_request) failed = False except SystemExit: raise except KeyboardInterrupt: break except HTMLErrorMsg as exc: # pre-formated text for display msg = exc.args[0] except RuntimeError as exc: # covers CCProtoError and similar msg = str(exc) or str(type(exc).__name__) except BaseException as exc: logging.exception("API fail: req=%r" % req) msg = str(exc) or str(type(exc).__name__) if failed: # standard error response await tx_resp(show_modal=True, html=escape(msg), selector='.js-api-fail') async def push_status_updates_handler(ws): # block for a bit, and then send display updates (and all other system status changes) # - there is no need for immediate update because when we rendered the HTML on page # load, we put in current values. await asyncio.sleep(0.250) last = None while 1: # get latest state now = STATUS.as_dict() if last != now: # it has changed, so send it. await ws.send_str(json_dumps(dict(vue_app_cb=dict(update_status=now)))) last = now # wait until next update, or X seconds max (for keep alive/just in case) try: await asyncio.wait_for(STATUS._update_event.wait(), 120) except asyncio.TimeoutError: # force an update last = None async def ws_api_handler(ses, send_json, req, orig_request): # handle_api # # Handle incoming requests over websocket; send back results. # req = already json parsed request coming in # send_json() = means to send the response back # action = req.action args = getattr(req, 'args', None) #logging.warn("API action=%s (%r)" % (action, args)) # MAJOR info leak XXX logging.debug(f"API action={action}") if action == '_connected': logging.info("Websocket connected: %r" % args) # can send special state update at this point, depending on the page elif action == 'start_hsm_btn': await Connection().hsm_start() await send_json(show_flash_msg=APPROVE_CTA) elif action == 'delete_user': name, = args assert 1 <= len(name) <= MAX_USERNAME_LEN, "bad username length" await Connection().delete_user(name.encode('utf8')) # assume it worked, so UX updates right away try: STATUS.hsm.users.remove(name) except ValueError: pass STATUS.notify_watchers() elif action == 'create_user': name, authmode, new_pw = args assert 1 <= len(name) <= MAX_USERNAME_LEN, "bad username length" assert ',' not in name, "no commas in names" if authmode == 'totp': mode = USER_AUTH_TOTP | USER_AUTH_SHOW_QR new_pw = '' elif authmode == 'rand_pw': mode = USER_AUTH_HMAC | USER_AUTH_SHOW_QR new_pw = '' elif authmode == 'give_pw': mode = USER_AUTH_HMAC else: raise ValueError(authmode) await Connection().create_user(name.encode('utf8'), mode, new_pw) # assume it worked, so UX updates right away try: STATUS.hsm.users = list(set(STATUS.hsm.users + [name])) except ValueError: pass STATUS.notify_watchers() elif action == 'submit_policy': # get some JSON w/ everything the user entered. p, save_copy = args proposed = policy.web_cleanup(json_loads(p)) policy.update_sl(proposed) await Connection().hsm_start(proposed) STATUS.notify_watchers() await send_json(show_flash_msg=APPROVE_CTA) if save_copy: d = policy.desensitize(proposed) await send_json(local_download=dict(data=json_dumps(d, indent=2), filename=f'hsm-policy-{STATUS.xfp}.json.txt')) elif action == 'download_policy': proposed = policy.web_cleanup(json_loads(args[0])) await send_json(local_download=dict(data=json_dumps(proposed, indent=2), filename=f'hsm-policy-{STATUS.xfp}.json.txt')) elif action == 'import_policy': # they are uploading a JSON capture, but need values we can load in Vue proposed = args[0] cooked = policy.web_cookup(proposed) await send_json(vue_app_cb=dict(update_policy=cooked), show_flash_msg="Policy file imported.") elif action == 'pick_onion_addr': from torsion import TOR addr, pk = await TOR.pick_onion_addr() await send_json(vue_app_cb=dict(new_onion_addr=[addr, pk])) elif action == 'pick_master_pw': pw = b64encode(os.urandom(12)).decode('ascii') pw = pw.replace('/', 'S').replace('+', 'p') assert '=' not in pw await send_json(vue_app_cb=dict(new_master_pw=pw)) elif action == 'new_bunker_config': from torsion import TOR # save and apply config values nv = json_loads(args[0]) assert 4 <= len(nv.master_pw) < 200, "Master password must be at least 4 chars long" # copy in simple stuff for fn in [ 'tor_enabled', 'master_pw', 'easy_captcha', 'allow_reboots']: if fn in nv: BP[fn] = nv[fn] # update onion stuff only if PK is known (ie. they changed it) if nv.get('onion_pk', False) or False: for fn in [ 'onion_addr', 'onion_pk']: if fn in nv: BP[fn] = nv[fn] BP.save() await send_json(show_flash_msg="Bunker settings encrypted and saved to disk.") STATUS.tor_enabled = BP['tor_enabled'] STATUS.notify_watchers() if not BP['tor_enabled']: await TOR.stop_tunnel() elif BP.get('onion_pk') and not (STATUS.force_local_mode or STATUS.setup_mode) \ and TOR.get_current_addr() != BP.get('onion_addr'): # disconnect/reconnect await TOR.start_tunnel() elif action == 'sign_message': # sign a short text message # - lots more checking could be done here, but CC does it anyway msg_text, path, addr_fmt = args addr_fmt = AF_P2WPKH if addr_fmt != 'classic' else AF_CLASSIC try: sig, addr = await Connection().sign_text_msg(msg_text, path, addr_fmt) except: # get the spinner to stop: error msg will be "refused by policy" typically await send_json(vue_app_cb=dict(msg_signing_result='(failed)')) raise sig = b64encode(sig).decode('ascii').replace('\n', '') await send_json(vue_app_cb=dict(msg_signing_result=f'{sig}\n{addr}')) elif action == 'upload_psbt': # receiving a PSBT for signing size, digest, contents = args psbt = b64decode(contents) assert len(psbt) == size, "truncated/padded in transit" assert sha256(psbt).hexdigest() == digest, "corrupted in transit" STATUS.import_psbt(psbt) STATUS.notify_watchers() elif action == 'clear_psbt': STATUS.clear_psbt() STATUS.notify_watchers() elif action == 'preview_psbt': STATUS.psbt_preview = 'Wait...' STATUS.notify_watchers() try: txt = await Connection().sign_psbt(STATUS._pending_psbt, flags=STXN_VISUALIZE) txt = txt.decode('ascii') # force some line splits, especially for bech32, 32-byte values (p2wsh) probs = re.findall(r'([a-zA-Z0-9]{36,})', txt) for p in probs: txt = txt.replace(p, p[0:30] + '\u22ef\n\u22ef' + p[30:]) STATUS.psbt_preview = txt except: # like if CC doesn't like the keys, whatever .. STATUS.psbt_preview = None raise finally: STATUS.notify_watchers() elif action == 'auth_set_name': idx, name = args assert 0 <= len(name) <= MAX_USERNAME_LEN assert 0 <= idx < len(STATUS.pending_auth) STATUS.pending_auth[idx].name = name STATUS.notify_watchers() elif action == 'auth_offer_guess': idx, ts, guess = args assert 0 <= idx < len(STATUS.pending_auth) STATUS.pending_auth[idx].totp = ts STATUS.pending_auth[idx].has_guess = 'x'*len(guess) STATUS._auth_guess[idx] = guess STATUS.notify_watchers() elif action == 'submit_psbt': # they want to sign it now expect_hash, send_immediately, finalize, wants_dl = args assert expect_hash == STATUS.psbt_hash, "hash mismatch" if send_immediately: assert finalize, "must finalize b4 send" logging.info("Starting to sign...") STATUS.busy_signing = True STATUS.notify_watchers() try: dev = Connection() # do auth steps first (no feedback given) for pa, guess in zip(STATUS.pending_auth, STATUS._auth_guess): if pa.name and guess: await dev.user_auth(pa.name, guess, int(pa.totp), a2b_hex(STATUS.psbt_hash)) STATUS.reset_pending_auth() try: result = await dev.sign_psbt(STATUS._pending_psbt, finalize=finalize) logging.info("Done signing") msg = "Transaction signed." if send_immediately: msg += '

' + broadcast_txn(result) await send_json(show_modal=True, html=Markup(msg), selector='.js-api-success') result = (b2a_hex(result) if finalize else b64encode(result)).decode('ascii') fname = 'transaction.txt' if finalize else ('signed-%s.psbt'%STATUS.psbt_hash[-6:]) if wants_dl: await send_json(local_download=dict(data=result, filename=fname, is_b64=(not finalize))) await dev.hsm_status() except CCUserRefused: logging.error("Coldcard refused to sign txn") await dev.hsm_status() r = STATUS.hsm.get('last_refusal', None) if not r: raise HTMLErroMsg('Refused by local user.') else: raise HTMLErrorMsg(f"Rejected by Coldcard.

{r}") finally: STATUS.busy_signing = False STATUS.notify_watchers() elif action == 'shutdown_bunker': await send_json(show_flash_msg="Bunker is shutdown.") await asyncio.sleep(0.25) logging.warn("User-initiated shutdown") asyncio.get_running_loop().stop() sys.exit(0) elif action == 'leave_setup_mode': # During setup process, they want to go Tor mode; which I consider leaving # setup mode ... in particular, logins are required. # - button label is "Start Tor" tho ... so user doesn't see it that way assert STATUS.setup_mode, 'not in setup mode?' assert BP['tor_enabled'], 'Tor not enabled (need to save?)' addr = BP['onion_addr'] assert addr and '.onion' in addr, "Bad address?" STATUS.setup_mode = False await send_json(show_flash_msg="Tor hidden service has been enabled. " "It may take a few minutes for the website to become available") STATUS.notify_watchers() from torsion import TOR logging.info(f"Starting hidden service: %s" % addr) asyncio.create_task(TOR.start_tunnel()) elif action == 'logout_everyone': # useful for running battles... # - changes crypto key for cookies, so they are all invalid immediately. from aiohttp_session.nacl_storage import NaClCookieStorage import nacl logging.warning("Logout of everyone!") # reset all session cookies storage = orig_request.get('aiohttp_session_storage') assert isinstance(storage, NaClCookieStorage) storage._secretbox = nacl.secret.SecretBox(os.urandom(32)) # kick everyone off (bonus step) for w in web_sockets: try: await send_json(redirect='/logout', _ws=w) await w.close() except: pass else: raise NotImplementedError(action) @routes.get('/websocket/{token}') async def api_websocket(request): ''' Stream display activity as HTML fragments - accept config changes - and more? ''' try: ses = await get_session(request) assert not ses.new assert ses['login_ok'] token = request.match_info['token'] assert token == ses['ws_token'], ses.get('ws_token') except: logging.exception("Bad websocket link") raise web.HTTPForbidden # begin a streaming response ws = web.WebSocketResponse() await ws.prepare(request) web_sockets.add(ws) try: api_rx = asyncio.create_task(rx_handler(ses, ws, request)) dis_tx = asyncio.create_task(push_status_updates_handler(ws)) await asyncio.gather(api_rx, dis_tx) finally: api_rx.cancel() dis_tx.cancel() await ws.close() return ws @routes.get('/{fname}.php') @routes.get('/{path}/{fname}.php') @routes.get('/admin') async def kiddie_traps(request): # I hate skill-less people running burpsuite! Track them ses = await get_session(request) ses['kiddie'] = ses.get('kiddie', 0) + 1 return HTTPFound("/login") @routes.get('/favicon.ico') @routes.get('/robots.txt') @routes.get('/humans.txt') async def expected_404s(request): # we are expecting these URL's to 404 # - don't redirect to login return HTTPNotFound() @routes.get('/static/ext/themes/default/assets/fonts/{fname}') async def remap_fonts(request): fn = request.match_info['fname'] # remap some links so we don't need to edit Semantic files. return HTTPMovedPermanently('/static/ext/semantic-fonts/' + fn) def extra_filters(app): def my_time(dt): if dt is None: return None try: dt = pendulum.instance(dt) except ValueError: return repr(dt) return dt.in_tz('local').strftime('%l:%M%p').lower() def my_date(dt): if dt is None: return None try: dt = pendulum.instance(dt) except ValueError: return repr(dt) return dt.in_tz('local').strftime('%b %e') def duration(i): if i < 90: return '%d seconds' % i else: return '%d:%02d minutes' % (i//60, i%60) def extlink(url, label=None): # in the templates, use like this: # {{ "http://sdfsfd/sdf/sdf/" | extlink( "Hello") }} if not label: from urllib.parse import urlparse label = urlparse(url).netloc.lower() if label[0:4] == 'www.': label = label[4:] rv = f''\ f'{label} ' return Markup(rv) def link_txn(txn_hash): from chain import link_to_txn url = link_to_txn(txn_hash) return extlink(url, label=txn_hash) def link_to_explorer(unused): url = settings.EXPLORA label = 'Bitcoin Explorer' if STATUS.is_testnet: label = 'Testnet Explorer' url += '/testnet' return extlink(url, label=label) def slugify(t): return t.lower().strip().replace('&', 'and').replace(' ', '-') rv = locals().copy() return rv @web.middleware async def auth_everything(request, handler): # during setup, no login needed if STATUS.force_local_mode or STATUS.setup_mode: # bypass security completely ses = await get_session(request) if not ses: ses = await new_session(request) accept_user_login(ses) return await handler(request) # whitelist of pages that do not need login # all other requests will need auth if handler in { login_page, login_post, captcha_image, kiddie_traps, expected_404s }: return await handler(request) # verify auth ses = await get_session(request) if ses: active = ses.get('active', 0) idle = time.time() - active if idle > settings.MAX_IDLE_TIME: # stale / idle timeout logging.warn("Idle timeout") ses.invalidate() ses = None if not ses: # no cookie/stale cookie, so send them to logout/in page u = URL("/login") target = request.path[1:] if target and target != 'logout': u = u.update_query(u=target) return HTTPFound(u) # normal path: continue to page ses['active'] = time.time() resp = await handler(request) # clearly an HTML request, so force no caching if '/static/' not in request.path: resp.headers['Cache-Control'] = 'no-cache' return resp def startup(setup_mode): # Entry point. Return an awaitable to be run. # encrypted cookies for session logic from aiohttp_session.nacl_storage import NaClCookieStorage from aiohttp_session import session_middleware todays_key = os.urandom(32) sml = session_middleware(NaClCookieStorage(todays_key, cookie_name='X')) # create app: order of middlewares matters app = web.Application(middlewares=[sml, auth_everything]) app.add_routes(routes) app.router.add_static('/static', './static') # hack to obfuscate our identity a little # - from 23% market share import aiohttp.web_response as ht ht.SERVER_SOFTWARE = 'nginx/1.14.1' j_env = aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader('templates'), filters=extra_filters(app)) j_env.globals.update(default_context()) j_env.policies['json.dumps_function'] = json_dumps my_url = f"http://localhost:{settings.PORT_NUMBER}" + ('/setup' if setup_mode else '') logging.info(f"Web server at: {my_url}") # meh; kinda annoying. if 0: def popup(): try: # see # if this doesn't happen on MacOS from webbrowser import open as open_browser open_browser(my_url) except: logging.error("Unable to pop browser open", exc_info=1) asyncio.get_running_loop().call_later(3, popup) from aiohttp.abc import AbstractAccessLogger class AccessLogger(AbstractAccessLogger): def log(self, request, response, time): self.logger.info(f'{response.status} <= {request.method} {request.path}') return web._run_app(app, port=settings.PORT_NUMBER, print=None, access_log_class=AccessLogger) if __name__ == "__main__": from utils import setup_logging setup_logging() dev = Connection(None) # won't do anything tho, because async dev.run not called asyncio.run(startup()) # EOF