"""I2C exchange of note events and settings between a leader and followers.

Wire format written by the leader (see ``I2CLeader.broadcast``)::

    [len_hi, len_lo, prefix, data...]

where ``len`` is the big-endian length of ``data`` only.  Followers answer
read requests with two bytes ``[pending_count, encoded_note]`` (see
``I2CFollower.pop_msg``).
"""
from __future__ import annotations

import json
import time

from .core import Note

# Bit 7 of an encoded note byte carries the on/off state; the low seven bits
# carry the pitch.
NOTE_ON_BIT = 1 << 7
# Positive 7-bit mask (0x7F).  The previous ``~NOTE_ON_BIT`` was the negative
# integer -129, which only cleared bit 7 and let any higher bits through.
# For byte values 0..255 both masks give the same result.
PITCH_MASK = NOTE_ON_BIT - 1


def i2c_encode(pitch, on):
    """Pack a pitch and an on/off flag into a single integer.

    ``pitch`` is expected to fit in the low seven bits; it is not validated
    here, so a larger value would collide with the flag bit.
    """
    return pitch | NOTE_ON_BIT if on else pitch


def i2c_decode(msg):
    """Split an encoded note byte into ``(pitch, on)``."""
    return msg & PITCH_MASK, bool(msg & NOTE_ON_BIT)


def u16tob(v):
    """Encode ``v`` as two big-endian bytes.

    Raises ``ValueError`` (from ``bytes``) if ``v`` does not fit in 16 bits.
    """
    return bytes([v >> 8, v & 0xFF])


def btou16(b):
    """Decode the first two items of ``b`` as a big-endian unsigned 16-bit int."""
    return (b[0] << 8) | b[1]


class I2CModule:
    """State and message handling shared by the leader and the followers."""

    # Encoded note bytes waiting to be sent over the bus.
    queue: list[int]
    # In-progress chunked transfer as (announced total length, bytes so far),
    # or None when no transfer is active.
    chunked: tuple[int, bytes] | None
    keyboard: Keyboard
    base: BaseMode

    # Two-byte reply meaning "nothing queued" (pending count of zero).
    MSG_NOP = bytes([0, 0])

    def __init__(self, keyboard: Keyboard):
        self.queue = []
        self.chunked = None
        self.keyboard = keyboard
        self.base = self.keyboard.modes["base"]

    def process_msg(self, msg):
        """Handle one received message; ``msg[0]`` selects the message type.

        * ``N`` - note event; ``msg[1]`` is an encoded note byte.
        * ``C`` / ``S`` - header of a chunked transfer; ``msg[1:3]`` is the
          big-endian total payload length.
        * ``c`` / ``s`` - one chunk of payload.  When the announced length is
          reached the payload is decoded and passed to
          ``keyboard.settings.loadstring(..., restore=False)``.

        Raises:
            ValueError: on an unknown message type, a header that arrives
                while a transfer is still in progress, a chunk without a
                preceding header, or a payload longer than announced.
        """
        kind = msg[0]

        if kind == ord(b"N"):
            pitch, on = i2c_decode(msg[1])
            if on:
                Note(self.keyboard, pitch).on(local=False)
            elif pitch in self.base.notes:
                # Only notes currently tracked by the base mode can be
                # switched off; anything else is ignored.
                self.base.notes[pitch].off(local=False)

        elif kind in (ord(b"C"), ord(b"S")):
            total_len = btou16(msg[1:])
            # Explicit raise instead of ``assert``: this validates data that
            # arrived over the bus and must not vanish under ``-O``.
            if self.chunked:
                raise ValueError(
                    "chunked transfer header received while another "
                    "transfer is in progress"
                )
            self.chunked = (total_len, bytes())  # TODO: preallocate full buffer

        elif kind in (ord(b"c"), ord(b"s")):
            if not self.chunked:
                raise ValueError("chunk received without a preceding header")
            total_len, data = self.chunked
            data += msg[1:]
            if total_len == len(data):
                self.chunked = None
                # NOTE(review): the original carried a trailing comment
                # ``msg[0] == ord(b"c")`` on this call - presumably a
                # candidate value for ``restore``; confirm the intent.
                self.keyboard.settings.loadstring(data.decode(), restore=False)
            elif total_len < len(data):
                # Drop the corrupt transfer so the next header can start
                # cleanly instead of appending to stale data.
                self.chunked = None
                raise ValueError("chunked message longer than size in header")
            else:
                self.chunked = (total_len, data)

        else:
            raise ValueError(f"invalid message type {msg[0]}")

    def send(self, note, on):
        """Queue a note on/off event for ``note.pitch`` for transmission."""
        self.queue.append(i2c_encode(note.pitch, on))


class I2CLeader(I2CModule):
    """Bus leader: polls every scanned device and relays note events."""

    # Addresses returned by the bus scan performed in ``__init__``.
    devices: list[int]

    def __init__(self, keyboard: Keyboard, board):
        """Open the bus, scan for devices and broadcast the shared settings.

        ``board`` is accepted but not used; the bus is created through
        ``keyboard.board``.
        """
        super().__init__(keyboard)
        self.i2c = keyboard.board.create_i2c(keyboard, frequency=1000000)
        # NOTE(review): the result of try_lock() is ignored.  If this is a
        # busio.I2C-style object, a False return means the lock was not
        # acquired - confirm that cannot happen on a freshly created bus.
        self.i2c.try_lock()
        self.devices = self.i2c.scan()
        config = self.keyboard.settings.storestring(
            only=["rgb", "layout", "scale"]
        ).encode()
        self.broadcast_chunked("C", config)

    def broadcast(self, prefix: bytes, data: bytes, except_addr=None):
        """Write ``prefix + data`` to every device except ``except_addr``.

        The frame starts with the big-endian 16-bit length of ``data``
        (the prefix is not counted).  ``I2CFollower.tick`` reads exactly one
        byte more than that length, so ``prefix`` must be a single byte.
        Write errors are ignored so one failing device does not stop
        delivery to the others.
        """
        frame = u16tob(len(data)) + prefix + data
        for device in self.devices:
            if device == except_addr:
                continue
            try:
                self.i2c.writeto(device, frame)
            except Exception:
                # Deliberate best effort: skip devices that fail to respond.
                # print("error writing to", device)
                pass

    def broadcast_chunked(self, prefix: str, data: bytes):
        """Broadcast ``data`` as a header followed by fixed-size chunks.

        The header uses ``prefix`` as given and carries the total length;
        each chunk uses the lower-cased prefix.
        """
        CHUNK_SIZE = 12
        total_len = len(data)
        # Header (prefix as given): announces the total payload size.
        self.broadcast(prefix.encode(), u16tob(total_len))
        # Chunks (lower-case prefix): the payload itself.
        chunk_prefix = prefix.lower().encode()
        for start in range(0, total_len, CHUNK_SIZE):
            self.broadcast(chunk_prefix, data[start : start + CHUNK_SIZE])

    def tick(self):
        """Poll all devices for note events, then flush the local queue.

        Each two-byte reply is ``[pending_count, encoded_note]``.  A count of
        zero means the device had nothing to send; a count above one means
        more events are waiting, so the device is read again.  Every event
        received is applied locally and relayed to all other devices.
        """
        reply = bytearray(2)
        for device in self.devices:
            more = True
            while more:
                try:
                    self.i2c.readfrom_into(device, reply)
                except Exception:
                    # print("error reading from", device)
                    break
                if not reply[0]:
                    break
                self.process_msg(b"N" + reply[1:])
                self.broadcast(b"N", reply[1:], device)
                more = reply[0] > 1

        while self.queue:
            self.broadcast(b"N", bytes([self.queue.pop(0)]))

    def deinit(self):
        """Release the underlying I2C object."""
        self.i2c.deinit()


class I2CFollower(I2CModule):
    """Bus follower: answers reads from its queue and processes writes."""

    def __init__(self, keyboard: Keyboard, board, address):
        """Create the I2C target listening on ``address``.

        ``board`` is accepted but not used; the target is created through
        ``keyboard.board``.
        """
        super().__init__(keyboard)
        self.i2c = keyboard.board.create_i2ctarget(keyboard, addresses=(address,))

    def pop_msg(self):
        """Return the next two-byte reply for a read request.

        The first byte is the queue length before popping (so ``1`` marks the
        last pending event); the second is the encoded note.  Returns
        ``MSG_NOP`` when the queue is empty.
        """
        if not self.queue:
            return self.MSG_NOP
        pending = len(self.queue)
        return bytes([pending, self.queue.pop(0)])

    def tick(self):
        """Service at most one pending bus request.

        Read requests are answered with ``pop_msg()``.  Write requests are
        parsed as a two-byte length header followed by the prefix byte and
        payload, and handed to ``process_msg``; short reads are dropped.
        """
        req = self.i2c.request()
        if not req:
            return
        with req:
            if req.is_read:
                req.write(self.pop_msg())
                return

            header = req.read(2)
            if len(header) != 2:
                return
            # The header counts payload bytes only; +1 covers the prefix byte.
            expected = btou16(header) + 1
            got = req.read(expected)
            if expected != len(got):
                print("got only", expected, len(got), got)
                return
            self.process_msg(got)

    def deinit(self):
        """Release the underlying I2C target object."""
        self.i2c.deinit()