217 lines
7.6 KiB
Python
217 lines
7.6 KiB
Python
import framebuf
|
|
import struct
|
|
import time
|
|
import binascii
|
|
import gc
|
|
|
|
class Font:
|
|
def __init__(self, ws=None):
|
|
self.ws = ws
|
|
self.cache = {}
|
|
self.pending_requests = set()
|
|
self.retry_count = {}
|
|
self.max_retries = 3
|
|
|
|
def set_ws(self, ws):
|
|
self.ws = ws
|
|
|
|
def clear_cache(self):
|
|
"""清除字体缓存以释放内存"""
|
|
self.cache.clear()
|
|
gc.collect()
|
|
|
|
def get_cache_size(self):
|
|
"""获取当前缓存的字体数量"""
|
|
return len(self.cache)
|
|
|
|
def text(self, tft, text, x, y, color, bg=0x0000):
|
|
"""在ST7789显示器上绘制文本"""
|
|
if not text:
|
|
return
|
|
|
|
color_bytes = struct.pack(">H", color)
|
|
bg_bytes = struct.pack(">H", bg)
|
|
|
|
lut = [bytearray(16) for _ in range(256)]
|
|
for i in range(256):
|
|
for bit in range(8):
|
|
val = (i >> bit) & 1
|
|
idx = (7 - bit) * 2
|
|
if val:
|
|
lut[i][idx] = color_bytes[0]
|
|
lut[i][idx+1] = color_bytes[1]
|
|
else:
|
|
lut[i][idx] = bg_bytes[0]
|
|
lut[i][idx+1] = bg_bytes[1]
|
|
|
|
initial_x = x
|
|
|
|
missing_codes = set()
|
|
for char in text:
|
|
if ord(char) > 127:
|
|
code = ord(char)
|
|
if code not in self.cache:
|
|
missing_codes.add(code)
|
|
|
|
if missing_codes and self.ws:
|
|
missing_list = list(missing_codes)
|
|
|
|
req_str = ",".join([str(c) for c in missing_list])
|
|
print(f"Batch requesting fonts: {req_str}")
|
|
try:
|
|
self.ws.send(f"GET_FONTS_BATCH:{req_str}")
|
|
self._wait_for_fonts(missing_codes)
|
|
except Exception as e:
|
|
print(f"Batch font request failed: {e}")
|
|
|
|
for char in text:
|
|
if char == '\n':
|
|
x = initial_x
|
|
y += 16
|
|
continue
|
|
|
|
if x + 16 > tft.width:
|
|
x = initial_x
|
|
y += 16
|
|
if y + 16 > tft.height:
|
|
break
|
|
|
|
is_chinese = False
|
|
buf_data = None
|
|
|
|
if ord(char) > 127:
|
|
code = ord(char)
|
|
if code in self.cache:
|
|
buf_data = self.cache[code]
|
|
is_chinese = True
|
|
else:
|
|
if code in self.pending_requests:
|
|
retry = self.retry_count.get(code, 0)
|
|
if retry < self.max_retries:
|
|
self.retry_count[code] = retry + 1
|
|
self._request_single_font(code)
|
|
|
|
if is_chinese and buf_data:
|
|
self._draw_bitmap(tft, buf_data, x, y, 16, 16, lut)
|
|
x += 16
|
|
else:
|
|
if ord(char) > 127:
|
|
char = '?'
|
|
self._draw_ascii(tft, char, x, y, color, bg)
|
|
x += 8
|
|
|
|
def _request_single_font(self, code):
|
|
"""请求单个字体"""
|
|
if self.ws:
|
|
try:
|
|
self.ws.send(f"GET_FONT_UNICODE:{code}")
|
|
except:
|
|
pass
|
|
|
|
def _wait_for_fonts(self, target_codes):
|
|
"""等待字体数据返回"""
|
|
if not self.ws or not target_codes:
|
|
return
|
|
|
|
start = time.ticks_ms()
|
|
self.local_deferred = []
|
|
|
|
while time.ticks_diff(time.ticks_ms(), start) < 3000 and target_codes:
|
|
try:
|
|
can_read = False
|
|
if hasattr(self.ws, 'unread_messages') and self.ws.unread_messages:
|
|
can_read = True
|
|
else:
|
|
import uselect
|
|
poller = uselect.poll()
|
|
poller.register(self.ws.sock, uselect.POLLIN)
|
|
events = poller.poll(100)
|
|
if events:
|
|
can_read = True
|
|
|
|
if can_read:
|
|
msg = self.ws.recv()
|
|
if msg is None:
|
|
continue
|
|
|
|
if isinstance(msg, str):
|
|
if msg.startswith("FONT_BATCH_END:"):
|
|
parts = msg[15:].split(":")
|
|
success = int(parts[0]) if len(parts) > 0 else 0
|
|
failed = int(parts[1]) if len(parts) > 1 else 0
|
|
|
|
if failed > 0:
|
|
temp_missing = list(target_codes)
|
|
for c in temp_missing:
|
|
if c not in self.cache:
|
|
print(f"Font failed after retries: {c}")
|
|
self.cache[c] = None
|
|
if c in target_codes:
|
|
target_codes.remove(c)
|
|
|
|
target_codes.clear()
|
|
|
|
elif msg.startswith("FONT_DATA:"):
|
|
parts = msg.split(":")
|
|
if len(parts) >= 3:
|
|
try:
|
|
key_str = parts[1]
|
|
if key_str.startswith("0x"):
|
|
c = int(key_str, 16)
|
|
else:
|
|
c = int(key_str)
|
|
|
|
d = binascii.unhexlify(parts[2])
|
|
self.cache[c] = d
|
|
if c in target_codes:
|
|
target_codes.remove(c)
|
|
if c in self.retry_count:
|
|
del self.retry_count[c]
|
|
except:
|
|
pass
|
|
else:
|
|
self.local_deferred.append(msg)
|
|
|
|
elif msg is not None:
|
|
self.local_deferred.append(msg)
|
|
|
|
except Exception as e:
|
|
print(f"Wait font error: {e}")
|
|
|
|
if self.local_deferred:
|
|
if hasattr(self.ws, 'unread_messages'):
|
|
self.ws.unread_messages = self.local_deferred + self.ws.unread_messages
|
|
self.local_deferred = []
|
|
|
|
def _draw_bitmap(self, tft, bitmap, x, y, w, h, lut):
|
|
"""绘制位图"""
|
|
chunks = [lut[b] for b in bitmap]
|
|
rgb_buf = b''.join(chunks)
|
|
tft.blit_buffer(rgb_buf, x, y, w, h)
|
|
|
|
def _draw_ascii(self, tft, char, x, y, color, bg):
|
|
"""绘制ASCII字符"""
|
|
w, h = 8, 8
|
|
buf = bytearray(w * h // 8)
|
|
fb = framebuf.FrameBuffer(buf, w, h, framebuf.MONO_VLSB)
|
|
fb.fill(0)
|
|
fb.text(char, 0, 0, 1)
|
|
|
|
rgb_buf = bytearray(8 * 16 * 2)
|
|
bg_high, bg_low = bg >> 8, bg & 0xFF
|
|
color_high, color_low = color >> 8, color & 0xFF
|
|
|
|
for i in range(0, len(rgb_buf), 2):
|
|
rgb_buf[i] = bg_high
|
|
rgb_buf[i+1] = bg_low
|
|
|
|
for col in range(8):
|
|
byte = buf[col]
|
|
for row in range(8):
|
|
if (byte >> row) & 1:
|
|
pos = ((row + 4) * 8 + col) * 2
|
|
rgb_buf[pos] = color_high
|
|
rgb_buf[pos+1] = color_low
|
|
|
|
tft.blit_buffer(rgb_buf, x, y, 8, 16)
|