firmware/shared/queues.py
scgbckbone e209980630 improve taptree parser
remove msas (always allowed)
remove unsort_ms (always allowed)
rework fake_txn

# Conflicts:
#	cli/signit.py
#	releases/ChangeLog.md
#	releases/History-Mk4.md
#	releases/Next-ChangeLog.md
#	releases/signatures.txt
#	shared/actions.py
#	shared/address_explorer.py
#	shared/auth.py
#	shared/backups.py
#	shared/chains.py
#	shared/decoders.py
#	shared/descriptor.py
#	shared/display.py
#	shared/export.py
#	shared/flow.py
#	shared/lcd_display.py
#	shared/multisig.py
#	shared/nfc.py
#	shared/notes.py
#	shared/nvstore.py
#	shared/ownership.py
#	shared/paper.py
#	shared/psbt.py
#	shared/qrs.py
#	shared/seed.py
#	shared/serializations.py
#	shared/utils.py
#	shared/ux.py
#	shared/ux_mk4.py
#	shared/ux_q1.py
#	shared/version.py
#	shared/wallet.py
#	shared/xor_seed.py
#	stm32/COLDCARD_MK4/file_time.c
#	stm32/COLDCARD_Q1/file_time.c
#	stm32/MK4-Makefile
#	stm32/Q1-Makefile
#	testing/conftest.py
#	testing/helpers.py
#	testing/test_address_explorer.py
#	testing/test_backup.py
#	testing/test_bbqr.py
#	testing/test_export.py
#	testing/test_msg.py
#	testing/test_multisig.py
#	testing/test_notes.py
#	testing/test_ownership.py
#	testing/test_sign.py
#	testing/test_unit.py
#	testing/txn.py
2025-06-11 15:58:04 +02:00

97 lines
3.0 KiB
Python

# from https://github.com/peterhinch/micropython-async/blob/master/v3/primitives/queue.py @ 07108cc
#
# queue.py: adapted from uasyncio V2
#
# Copyright (c) 2018-2020 Peter Hinch
# Released under the MIT License (MIT) - see LICENSE file
# Code is based on Paul Sokolovsky's work.
# This is a temporary solution until uasyncio V3 gets an efficient official version
import uasyncio as asyncio
# Exception raised by get_nowait().
class QueueEmpty(Exception):
pass
# Exception raised by put_nowait().
class QueueFull(Exception):
pass
class Queue:
def __init__(self, maxsize=0):
self.maxsize = maxsize
self._queue = []
self._evput = asyncio.Event() # Triggered by put, tested by get
self._evget = asyncio.Event() # Triggered by get, tested by put
self._jncnt = 0
self._jnevt = asyncio.Event()
self._upd_jnevt(0) #update join event
def _get(self):
self._evget.set() # Schedule all tasks waiting on get
self._evget.clear()
return self._queue.pop(0)
async def get(self): # Usage: item = await queue.get()
while self.empty(): # May be multiple tasks waiting on get()
# Queue is empty, suspend task until a put occurs
# 1st of N tasks gets, the rest loop again
await self._evput.wait()
return self._get()
def get_nowait(self): # Remove and return an item from the queue.
# Return an item if one is immediately available, else raise QueueEmpty.
if self.empty():
raise QueueEmpty()
return self._get()
def _put(self, val):
self._upd_jnevt(1) # update join event
self._evput.set() # Schedule tasks waiting on put
self._evput.clear()
self._queue.append(val)
async def put(self, val): # Usage: await queue.put(item)
while self.full():
# Queue full
await self._evget.wait()
# Task(s) waiting to get from queue, schedule first Task
self._put(val)
def put_nowait(self, val): # Put an item into the queue without blocking.
if self.full():
raise QueueFull()
self._put(val)
def qsize(self): # Number of items in the queue.
return len(self._queue)
def empty(self): # Return True if the queue is empty, False otherwise.
return not self._queue
def full(self): # Return True if there are maxsize items in the queue.
# Note: if the Queue was initialized with maxsize=0 (the default) or
# any negative number, then full() is never True.
return self.maxsize > 0 and self.qsize() >= self.maxsize
def _upd_jnevt(self, inc:int): # #Update join count and join event
self._jncnt += inc
if self._jncnt <= 0:
self._jnevt.set()
else:
self._jnevt.clear()
def task_done(self): # Task Done decrements counter
self._upd_jnevt(-1)
async def join(self): # Wait for join event
await self._jnevt.wait()