import displayio
import rtmidi
from io import BytesIO
from math import sqrt
from base64 import b64encode
from xml.etree import ElementTree
from adafruit_midi import MIDI, MIDIMessage
from adafruit_midi.system_exclusive import SystemExclusive
from PIL import Image
import adafruit_midi.note_on
import adafruit_midi.note_off
from adafruit_ticks import ticks_ms
from threading import Thread
import json
import asyncio
import websockets.server
from websockets.exceptions import ConnectionClosed
from http.server import BaseHTTPRequestHandler, HTTPServer
from .. import Keyboard as BaseKeyboard
# Backend used for every rtmidi port created in this module: JACK when this
# rtmidi build lists it among its compiled APIs, API_UNSPECIFIED otherwise.
rtmidi_api = (
    rtmidi.API_UNIX_JACK
    if rtmidi.API_UNIX_JACK in rtmidi.get_compiled_api()
    else rtmidi.API_UNSPECIFIED
)
class MIDI:
    """Small send/receive wrapper around a pair of rtmidi ports.

    NOTE(review): this class shadows the ``MIDI`` name imported from
    ``adafruit_midi`` at the top of the file; it exposes the subset of that
    interface used here (``send``, ``receive``, ``in_channel``,
    ``out_channel``).

    Args:
        inp: input port providing ``get_message()``, or ``None`` when this
            endpoint cannot receive.
        out: output port providing ``send_message()``, or ``None`` when this
            endpoint cannot send.
    """

    def __init__(self, inp, out):
        self.inp = inp
        self.out = out
        # Listen on all 16 channels.
        self.in_channel = tuple(range(16))
        # Default channel stamped onto outgoing messages.  This used to be
        # assigned the output *port* object, which then ended up in
        # ``message.channel`` whenever ``send`` was called without a channel.
        self.out_channel = 0

    def send(self, message, channel=None):
        """Send ``message`` on the output port, if there is one.

        Args:
            message: a ``MIDIMessage`` or anything else ``bytes()`` accepts.
            channel: channel to stamp onto a ``MIDIMessage``; defaults to
                ``self.out_channel``.
        """
        if not self.out:
            return
        if channel is None:
            channel = self.out_channel
        if isinstance(message, MIDIMessage):
            message.channel = channel
        self.out.send_message(bytes(message))

    def receive(self):
        """Return the next parsed incoming message, or ``None``.

        ``None`` is returned when there is no input port or when
        ``get_message()`` yields nothing.
        """
        if not self.inp:
            return None
        msg = self.inp.get_message()
        if not msg:
            return None
        # The first item of ``msg`` is the raw message data; the first item
        # of the parser's result is the decoded message.
        return MIDIMessage.from_message_bytes(msg[0], self.in_channel)[0]
def create_midi_usb(board):
    """Create the simulated USB MIDI endpoint (virtual in and out ports).

    ``board`` is accepted for interface compatibility and is not used.
    Returns a ``MIDI`` wrapper around the two freshly opened virtual ports.
    """
    client_name = "0x33.board sim"
    midi_in = rtmidi.MidiIn(rtmidi_api, client_name)
    midi_out = rtmidi.MidiOut(rtmidi_api, client_name)
    for port, label in ((midi_in, "USB in"), (midi_out, "USB out")):
        port.open_virtual_port(label)
    # Keep SysEx messages enabled on the input: create_matrix() reads key
    # events from SystemExclusive messages.
    midi_in.ignore_types(sysex=False)
    return MIDI(midi_in, midi_out)
def create_midi_trs(board):
    """Create the simulated TRS MIDI endpoint, which is output-only.

    ``board`` is accepted for interface compatibility and is not used.
    Returns a ``MIDI`` wrapper with no input port.
    """
    trs_out = rtmidi.MidiOut(rtmidi_api, "0x33.board sim")
    trs_out.open_virtual_port("TRS out")
    return MIDI(None, trs_out)
def create_matrix(board):
    """Return a key matrix stand-in that replays queued key events.

    Events are ``(key, state)`` pairs.  They are appended to ``queue``
    either by ``scan_midi`` (from SysEx messages) or directly by other code
    holding a reference to the matrix.
    """

    class DummyMatrix:
        """Queue-backed replacement for a hardware key matrix."""

        def __init__(self, board):
            self.board = board
            self.queue = []

        def scan_midi(self, midi):
            """Drain ``midi`` and queue key events found in SysEx messages.

            Only SystemExclusive messages whose manufacturer id is
            ``b"\\0s-"`` are used; their data is unpacked as
            ``(key, state)``.  Every other message is discarded.
            """
            while True:
                incoming = midi.receive()
                if not incoming:
                    break
                is_key_event = (
                    isinstance(incoming, SystemExclusive)
                    and incoming.manufacturer_id == b"\0s-"
                )
                if is_key_event:
                    key, state = incoming.data
                    self.queue.append((key, state))

        def scan_for_changes(self):
            """Yield all pending ``(key, state)`` events, then empty the queue.

            The queue is only cleared once the generator has been fully
            consumed.
            """
            usb = self.board.midi_usb
            if usb:
                self.scan_midi(usb)
            yield from self.queue
            self.queue.clear()

    return DummyMatrix(board)
def create_display(board, **kwargs):
    """Return a ``displayio.Display`` subclass instance for the simulator.

    ``kwargs`` are forwarded to ``displayio.Display.__init__``.
    """

    class Display(displayio.Display):
        """Display that renders into its image buffer and exports PNG data."""

        def __init__(self, board, **args):
            self.board = board
            # The first two positional arguments of the base initialiser are
            # passed as None here.
            super().__init__(None, None, **args)

        def _initialize(self, init_sequence):
            # Intentionally a no-op in the simulator.
            return None

        def _write(self, command, data):
            # Intentionally a no-op in the simulator.
            return None

        def refresh(self):
            """Render the current group (if any) into ``self._buffer``."""
            core = self._core
            if core._current_group is None:
                return
            frame = Image.new("RGBA", (core._width, core._height))
            core._current_group._fill_area(frame)
            self._buffer.paste(frame)

        def get_png(self):
            """Return the current buffer contents encoded as PNG bytes."""
            with BytesIO() as stream:
                self._buffer.save(stream, format="PNG")
                png_bytes = stream.getvalue()
            return png_bytes

    return Display(board, **kwargs)
def create_pixels(board, **kwargs):
    """Return a NeoPixel stand-in that publishes its colours to the browser.

    ``kwargs`` are accepted for interface compatibility and are not used.
    ``board`` must provide ``ws_queue`` by the time ``show()`` is called.
    """

    class DummyNeopixels:
        """In-memory strip of 52 pixels."""

        def __init__(self, board):
            self.board = board
            self.brightness = 1
            self.pixels = [0] * 52

        def __setitem__(self, key, value):
            self.pixels[key] = value

        def fill(self, value):
            """Set every pixel to ``value``."""
            self.pixels[:] = [value] * len(self.pixels)

        @staticmethod
        def _preview_channel(level):
            """Map one colour channel to the 0-255 value used in the preview.

            A zero channel maps to 128.  The result is clamped to 255:
            without the clamp any ``level`` above 1 produced a component
            wider than two hex digits and therefore a malformed colour.
            """
            return min(255, int(128 + sqrt(level) * 127))

        def hex_colors(self):
            """Return one ``#rrggbb`` string per pixel.

            A pixel is either an ``(r, g, b)`` tuple or a packed
            ``0xRRGGBB`` integer.
            """
            colors = []
            for rgb in self.pixels:
                if not isinstance(rgb, tuple):
                    rgb = (rgb >> 16, (rgb >> 8) & 0xFF, rgb & 0xFF)
                colors.append(
                    "#{0:02x}{1:02x}{2:02x}".format(
                        *(self._preview_channel(x) for x in rgb)
                    )
                )
            return colors

        def show(self):
            """Queue the current colours on ``board.ws_queue``.

            Returns whatever ``ws_queue.put`` returns; the caller in this
            file awaits it.
            """
            return self.board.ws_queue.put(
                {
                    "type": "pixels",
                    "colors": self.hex_colors(),
                }
            )

    return DummyNeopixels(board)
def create_audio_out(board):
    """Simulator stub: creates nothing and returns ``None``."""
    return None
def create_i2c(board, **kwargs):
    """Simulator stub: creates nothing and returns ``None``."""
    return None
def create_i2ctarget(board, **kwargs):
    """Simulator stub: creates nothing and returns ``None``."""
    return None
# class SVGPreview:
# def __init__(self):
# self.template = ElementTree.parse("template.svg")
# self.oled = self.template.find(".//*[@id='oled']")
# self.leds = self.template.findall(".//*[@class='btn']")
#
# self.update()
#
# def set_oled(self, data):
# self.oled.set("href", data)
#
# def set_pixels(self, colors):
# for i, color in enumerate(colors):
# self.leds[i].set("style", "stroke:#000;stroke-width:1;fill:"+color)
#
# def update(self):
# data = ElementTree.tostring(self.template.getroot(), encoding="unicode")
# print(data.encode("utf-8"))
def HTTPHandler(board):
class HTTPHandler(BaseHTTPRequestHandler):
def __init__(self, *args, **kwargs):
with open("template.svg", "rb") as f:
template = f.read()
self.index = b"""<!DOCTYPE html>
<html>
<head>
<style>
svg {
width: 100%;
height: calc(100vh - 4rem);
touch-action: none;
user-select: none;
-webkit-tap-highlight-color: transparent;
}
</style>
</head>
<body>
"""
self.index += template
self.index += b"""
<div class="menu">
<button onclick="reconnect()" id="reconnect">reconnect</button>
<button onclick="snapshot()">save SVG snapshot</button>
</div>
<script type="text/javascript">
const svg = document.body.firstElementChild;
const oled = document.evaluate(".//*[@id='oled']", svg).iterateNext();
let websocket;
const blockTouch = (e) => {
e.preventDefault();
e.stopPropagation();
};
svg.ontouchstart = blockTouch;
svg.ontouchend = blockTouch;
svg.ontouchcancel = blockTouch;
svg.oncontextmenu = blockTouch;
let res = document.evaluate(".//*[contains(@class,'led')]", svg);
let node = res.iterateNext();
const pixels = {};
while (node) {
pixels[+node.dataset.ledI] = node;
node = res.iterateNext();
}
const keys = {};
res = document.evaluate(".//*[contains(@class,'key')]", svg);
node = res.iterateNext();
while (node) {
keys[+node.dataset.keyI] = node;
node = res.iterateNext();
}
Object.entries(keys).forEach(([key, node]) => {
node.style.cursor = 'pointer';
node.onpointerdown = (e) => {
e.preventDefault();
node.setPointerCapture(e.pointerId);
if (!websocket) return;
websocket.send(JSON.stringify({ type: 'key', key: +key, down: true }));
};
node.onpointerup = (e) => {
e.preventDefault();
if (!websocket) return;
websocket.send(JSON.stringify({ type: 'key', key: +key, down: false }));
};
});
let lastBlob = null;
setInterval(() => {
if (!websocket) return;
fetch('/oled.png#' + Math.random())
.then((response) => response.blob())
.then((blob) => {
if (lastBlob) URL.revokeObjectURL(lastBlob);
lastBlob = URL.createObjectURL(blob);
oled.href.baseVal = lastBlob;
});
}, 100);
const reconnectButton = document.getElementById("reconnect");
const reconnect = () => {
websocket = new WebSocket(`ws://${location.hostname}:8001/`);
websocket.onmessage = ({ data }) => {
const msg = JSON.parse(data);
switch (msg.type) {
case 'pixels': {
msg.colors.forEach((color, i) => {
pixels[i].style.fill = color;
});
break;
}
}
};
websocket.onopen = (event) => {
reconnectButton.disabled = true;
svg.style.opacity = 1;
};
websocket.onclose = (event) => {
websocket = null;
reconnectButton.disabled = false;
svg.style.opacity = 0.5;
};
websocket.onerror = (event) => console.error("socket error", event);
};
const snapshot = () => {
fetch('/oled.png#' + Math.random())
.then((response) => response.blob())
.then((blob) => {
const reader = new FileReader();
reader.onloadend = () => {
oled.href.baseVal = reader.result;
const svgBlob = new Blob([svg.outerHTML], { type: 'image/svg+xml' });
var link = document.createElement('a');
link.href = URL.createObjectURL(svgBlob);
link.download = 'snapshot.svg';
link.click();
URL.revokeObjectURL(link.href);
};
reader.readAsDataURL(blob);
});
};
reconnect();
</script>
</body>
</html>
"""
super().__init__(*args, **kwargs)
def do_GET(self):
if self.path == "/":
self.send_response(200)
self.send_header("Content-type", "text/html")
self.end_headers()
self.wfile.write(self.index)
elif self.path == "/oled.png":
self.send_response(200)
self.send_header("Content-type", "image/png")
self.end_headers()
self.wfile.write(board.display.get_png())
else:
self.send_response(404)
self.send_header("Content-type", "text/plain")
self.end_headers()
self.wfile.write(b"Not Found")
def log_message(self, *args):
pass
return HTTPHandler
class Keyboard(BaseKeyboard):
    """Keyboard variant that runs in the desktop simulator.

    It runs three things side by side: an HTTP server on port 8000 (in a
    background thread), a WebSocket server on port 8001 and the main
    keyboard loop (both as asyncio tasks).

    NOTE(review): ``mode``, ``matrix``, ``sticky_modes``, ``i2c``,
    ``pixels`` and ``display`` are not defined in this class; presumably
    ``BaseKeyboard`` provides them -- confirm.
    """

    def run(self):
        """Start all servers and block forever running the event loop."""
        # Daemon thread: the HTTP server must not keep the process alive
        # once the event loop (and with it the main thread) has stopped.
        http_thread = Thread(target=self.run_http, daemon=True)
        http_thread.start()

        # Create and install the loop explicitly; relying on
        # asyncio.get_event_loop() to create one implicitly is deprecated.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        # Created after the loop is installed so that Python versions which
        # bind a queue to the current loop at construction pick this one.
        self.ws_queue = asyncio.Queue()

        # Keep references to the tasks; the event loop itself only holds
        # weak references to them.
        tasks = [
            loop.create_task(self.run_ws()),
            loop.create_task(self.run_main()),
        ]
        try:
            loop.run_forever()
        finally:
            del tasks

    async def run_main(self):
        """Main keyboard loop, ticking roughly 120 times per second."""
        while True:
            ticks = ticks_ms()
            self.mode.tick(ticks)

            for i, pressed in self.matrix.scan_for_changes():
                # A key that previously "stuck" to a mode keeps delivering
                # its events to that mode; otherwise use the current mode.
                mode = self.sticky_modes.pop(i, self.mode)
                should_stick = mode.key_event(i, pressed)
                if should_stick:
                    self.sticky_modes[i] = mode

            if self.i2c:
                self.i2c.tick()

            self.mode.update_pixels(self.pixels)
            await self.pixels.show()
            self.display.refresh()
            await asyncio.sleep(1 / 120)

    def run_http(self):
        """Serve the simulator page on port 8000 until the server fails."""
        try:
            with HTTPServer(("", 8000), HTTPHandler(self)) as server:
                server.serve_forever()
        except Exception as e:
            # Was ``except e:``, which itself raised NameError as soon as
            # any exception reached this handler.
            print(e)

    async def run_ws(self):
        """Run the WebSocket server on port 8001.

        Incoming ``{"type": "key", "key": ..., "down": ...}`` messages are
        appended to the matrix queue; every item taken from ``ws_queue`` is
        JSON-encoded and sent to all connected clients.
        """
        connections = set()

        async def handler(conn):
            connections.add(conn)
            try:
                while True:
                    msg = json.loads(await conn.recv())
                    if msg['type'] == 'key':
                        key, down = msg['key'], msg['down']
                        self.matrix.queue.append((key, down))
            except ConnectionClosed:
                pass
            finally:
                connections.discard(conn)

        async with websockets.serve(handler, "", 8001):
            while True:
                msg = await self.ws_queue.get()
                data = json.dumps(msg)
                # Iterate over a snapshot: ``connections`` can change while
                # we are suspended in ``send``, and mutating a set during
                # iteration raises RuntimeError.
                for conn in tuple(connections):
                    try:
                        await conn.send(data)
                    except ConnectionClosed:
                        # The handler's ``finally`` removes the connection.
                        pass