# (c) Copyright 2023 by Coinkite Inc. This file is covered by license found in COPYING-CC. # # bbqr.py - Implement BBQr protocol for multiple QR support (also compression and filetype info) # import utime, uzlib, ngu from utils import problem_file_line from exceptions import QRDecodeExplained from ubinascii import unhexlify as a2b_hex from version import MAX_TXN_LEN b32encode = ngu.codecs.b32_encode b32decode = ngu.codecs.b32_decode TYPE_LABELS = dict(P='PSBT File', T='Transaction', J='JSON', C='CBOR', U='Unicode Text', X='Executable', B='Binary', R='KT Rx', S='KT Tx', E='KT PSBT') def int2base36(n): # convert an integer to two digits of base 36 string. 00 thu ZZ as bytes # - converse is just: int(s, base=36) tostr = lambda x: chr(48+x) if x < 10 else chr(65+x-10) a, b = divmod(n, 36) assert 0 <= a < 36 return tostr(a) + tostr(b) def calc_num_qr(char_capacity, char_len, split_mod): # Determine number of QR's would be needed to hold char_len alnum characters, # if each QR holds char_capacity of chars max. # - when 2 or more QR, consider the exact split point cannot be between encoded symbols # - accounts for BBRq header # - returns (number of QR needed), (# of chars in each) from math import ceil cap = char_capacity - 8 # 8==HEADER_LEN if char_len <= cap: # no alignment concerns return 1, char_len # max per non-final qr cap2 = cap - (cap % split_mod) need = ceil(char_len / cap2) assert need >= 2 # Going to be 2 or more, gotta be precise # - final part doesn't need to be "encoding aligned" actual = ((need - 1) * cap2) + cap #print("act=%d char_len=%d need=%d c=%d c2=%d" % (actual, char_len, need, cap, cap2)) if char_len > actual: need += 1 # Challenge: the final QR might have just a few chars in it, if we redistribute # the data into the other parts, then each QR can have more forward error correction # and be more robust. Must respect split_mod alignment tho. level = ceil(char_len / need) if level % split_mod: level += split_mod - (level % split_mod) assert level % split_mod == 0, level assert level <= cap2, (level, cap2) return need, level def num_qr_needed(encoding, data_len): # returns (QR version, num_parts, part_size[bytes]) # - lots of Q-related policy here # Just a few key values, picked because the height of the QR must # fit vertically (240 px tall) ... see "bbqr table" CHARS_PER_VERSION = [ # (QR version, alnum capacity) # first entry will be used for tiny BBQr that don't need animation (15, 758), # 77px x 3: 77*3 = 231px tall (25, 1853), # 117px, doubled: 234px tall (40, 4296), # 177px tall, shown 1:1 pixels -- phones can scan fine # last entry will be used for huge BBQr that have > 12 frames ] if encoding == 'H': char_len = data_len * 2 split_mod = 2 else: # plan for Base32, always best option # - five inputs bytes => 8 alnum chars # - for final set of 1-5 we remove padding == , so between 2..7 chars char_len = ((data_len//5) * 8) + { 0:0, 1:2, 2:4, 3:5, 4:7 }[data_len % 5] split_mod = 8 # Try a few select resolutions (sizes) in order such that we use either single QR # or the least-dense option that gives reasonable number of QR's for target_vers, capacity in CHARS_PER_VERSION: num_parts, part_size = calc_num_qr(capacity, char_len, split_mod) if num_parts == 1: # great, no animation needed! break if target_vers == 15 and num_parts == 2: # it fits in two v15, but would be a single v25; so prefer that continue if target_vers < 40 and num_parts <= 12: # will be reasonable animation, so use this size break # convert # of chars per QR, into bytes per each (last one may be less) if num_parts > 1: assert part_size % split_mod == 0 if encoding == 'H': pkt_size = part_size // 2 else: pkt_size = part_size * 5 // 8 else: pkt_size = data_len #print('bbqr: %d bytes => %d chars (%s enc) => v%d in %d parts of %d char / %d bytes each' # % (data_len, char_len, encoding, target_vers, num_parts, part_size, pkt_size)) assert num_parts * pkt_size >= data_len #assert part_size % split_mod == 0, (target_vers, part_size, split_mod, char_len, data_len) return target_vers, num_parts, pkt_size class BBQrHeader: def __init__(self, taste): # parse header based on standard # expects a string assert len(taste) >= 8 assert taste[0:2] == 'B$' self.encoding, self.file_type = taste[2:4] self.num_parts = int(taste[4:6], 36) self.which = int(taste[6:8], 36) assert 1 <= self.num_parts assert 0 <= self.which < self.num_parts def __repr__(self): return '' % (self.which+1, self.num_parts, self.encoding, self.file_type) def is_compat(self, other): # Does this header match previous ones seen? return (self.encoding == other.encoding and self.file_type == other.file_type and self.num_parts == other.num_parts) def decode_body(self, scan): # perform the decoding implied by header (but not decompression) body = bytes(memoryview(scan)[8:]) if b'B$' in body: # happens if we have an Rx overlow, and see two codes mushed together raise ValueError("Overlapped BBQrs!") if self.encoding == 'H': rv = a2b_hex(body) else: rv = b32decode(body) return rv def file_label(self): # provide a string as hint to user of what they are getting if self.file_type in TYPE_LABELS: return TYPE_LABELS[self.file_type] else: return 'Unknown: %s' % self.file_type class BBQrState: def __init__(self, storage): self.storage = storage self.reset() def reset(self): self.hdr = None self.parts = set() self.runt = None self.blksize = None def is_complete(self): return bool(self.hdr) and len(self.parts) == self.hdr.num_parts and not self.runt def collect(self, scan): # Another BBQr has come in; track it. # - return T while more parts are still needed # - updates UX to show the progress from glob import dis try: hdr = BBQrHeader(scan) except Exception as exc: raise QRDecodeExplained("Bad header: %s" % problem_file_line(exc)) if not self.hdr or not self.hdr.is_compat(hdr): # New or incompatible header, they might have changed their # minds and are now trying to scan something else; recover self.reset() self.storage.reset() self.hdr = hdr if hdr.which not in self.parts: # we've NOT YET seen this one # convert back to binary try: raw = hdr.decode_body(scan) except Exception as exc: # can happen if QR got corrupted between scanner and us (overlap) # or back BBQr implementation #print("corrupt QR: %s" % scan) # import sys; sys.print_exception(exc) dis.draw_bbqr_progress(hdr, self.parts, corrupt=True) return True if hdr.which and (hdr.which == hdr.num_parts-1) and not self.parts: # Problem: this is a runt and we saw it first, we have no idea # where to put it; store as tmp for now. self.runt = (hdr.which, raw) self.parts.add(hdr.which) else: # based on (required) assumption that all parts are equal, we know # where to put this data, so do that. self.parts.add(hdr.which) if self.blksize is None: self.blksize = len(raw) self.storage.save_packet(self.blksize, hdr, hdr.which, raw) # seeing any other packet is enough to decide where to put the runt if self.runt and self.blksize: wh, raw = self.runt self.storage.save_packet(self.blksize, hdr, wh, raw) self.runt = None # provide UX -- even if we didn't use it dis.draw_bbqr_progress(hdr, self.parts) # return T if we need more parts still return (len(self.parts) < hdr.num_parts) or self.runt class BBQrStorage: # Store BBQr in normal RAM. Simple. Pure. def __init__(self): self.reset() def reset(self): self.buf = None self.hdr = None # could be any header in series self.runt_size = None self.final_size = None def save_packet(self, blksize, hdr, which, data): # Record bytes (after deserialization, Base32/Hex decoding) # - might be zlib compressed still, but certainly binary assert blksize if not self.hdr: self.hdr = hdr else: assert self.hdr.is_compat(hdr) if which == hdr.num_parts-1: # size of runt determines final complete size self.runt_size = len(data) self.final_size = (blksize * (hdr.num_parts-1)) + self.runt_size if not self.buf: # memory alloc now upper_bound = blksize * hdr.num_parts self.alloc_buf(upper_bound) offset = which * blksize self.write_pkt(offset, data) def alloc_buf(self, upper_bound): # set aside space needed for whole thing try: self.buf = bytearray(upper_bound) except MemoryError: raise QRDecodeExplained("Too big") def write_pkt(self, offset, data): # save binary of one QR payload self.buf[offset:offset+len(data)] = data def zlib_decompress(self): # do in-place Zlib decompression, update final_size try: self.buf = uzlib.decompress(self.buf, -10) except: # corrupt data / data underruns trigger here raise RuntimeError("Zlib fail") self.final_size = len(self.buf) def _finalize(self): pass def get_buffer(self): return self.buf def finalize(self): # Got all the parts, so maybe decompress. Return details of what we got # - return: file type, exact final size self._finalize() if self.hdr.encoding == 'Z': self.zlib_decompress() return self.hdr.file_type, self.final_size, self.get_buffer() class BBQrPsramStorage(BBQrStorage): # specialized verison for use on funky PSRAM chip of Q def __init__(self): super().__init__() self.frags = dict() self.psr_offset = 0 def alloc_buf(self, upper_bound): # using first part of PSRAM if upper_bound >= MAX_TXN_LEN: raise QRDecodeExplained("Too big") # If data is compressed, write tmp (compressed) copy into top half of PSRAM # and we'll put final, decompressed copy at zero offset (later) self.psr_offset = MAX_TXN_LEN if self.hdr.encoding == 'Z' else 0 self.buf = True def write_pkt(self, offset, data): # Save indicated data, but problems: # - writes to PSRAM must be 4-aligned # - due to base32 math, typically incoming data will not be aligned # - write what we can, keep the rest around in normal memory from glob import PSRAM # our offset into PSRAM offset += self.psr_offset # will be aligned # some at the start might be unaligned off = 4 - (offset % 4) if off != 4: # up to 3 bytes at start self.frags[offset] = bytes(data[0:off]) assert 1 <= len(self.frags[offset]) < 4 offset += off data = memoryview(data)[off:] ln = len(data) ln4 = ln & ~3 # aligned middle (most times) if ln4: PSRAM.write_at(offset, ln4)[:] = data[0:ln4] # maybe a part at end runt = ln - ln4 if runt: assert 1 <= runt < 4 p = bytes(data[ln4:]) self.frags[offset+ln4] = p assert 1 <= len(p) == runt < 4 def _finalize(self): # flush out fragments to where they actually belong from glob import PSRAM while self.frags: off, data = self.frags.popitem() off4 = off & ~3 # can be aligned already (1-3 byte runt) tmp = bytearray(PSRAM.read_at(off4, 4)) tmp[off-off4:off-off4+len(data)] = data assert len(tmp) == 4 PSRAM.write_at(off4, 4)[:] = tmp def zlib_decompress(self): # do in-place Zlib decompression, update final_size # - except in-place decompression is not possible in general # - so go PSRAM(top half) -> PSRAM(bot half) from glob import PSRAM, dis from uzlib import DecompIO from io import BytesIO dis.fullscreen('Decompressing...') off = 0 buf = b'' with BytesIO(PSRAM.read_at(self.psr_offset, self.final_size)) as fd: decoded = DecompIO(fd, -10) while 1: try: here = decoded.read(1024) if not here: break except: # corrupt data / data underruns trigger here raise RuntimeError("Zlib fail") # aligned writes buf += here ln = len(buf) & ~3 if off+ln > MAX_TXN_LEN: # test with: `yes | dd bs=1000 count=2700 | bbqr make - | pbcopy` raise QRDecodeExplained("Too big") if ln: PSRAM.write_at(off, ln)[:] = buf[0:ln] buf = buf[ln:] off += ln dis.progress_sofar(fd.tell(), self.final_size) # true final size self.final_size = off + len(buf) if buf: # write final bit, perhaps some extra zeros after that too pad = 4 - (len(buf) % 4) if pad < 4: buf += bytes(pad) PSRAM.write_at(off, len(buf))[:] = buf def get_buffer(self): # give a pointer into PSRAM from glob import PSRAM return PSRAM.read_at(0, self.final_size) def finalize(self): self._finalize() if self.hdr.encoding == 'Z': self.zlib_decompress() # PSBT-typed BBQrs end up at PSRAM[0..size] # skip a redundant PSRAM->heap->PSRAM round-trip if self.hdr.file_type == 'P': return self.hdr.file_type, self.final_size, 'PSRAM' return self.hdr.file_type, self.final_size, self.get_buffer() # EOF