import random import time import sys ADDR_CTRL = bytes([0xDB, 0x96]) ADDR_OVEN = bytes([0xAA, 0x55]) # used by oven FUNC_O_CONNECT = 0x01 FUNC_O_TEMPS = 0x02 FUNC_O_READ = 0x03 FUNC_O_ACK_WRITE = 0x06 # used by CTRL FUNC_C_PWM_HEAT = 0x08 FUNC_C_PWM_COOL = 0x0D FUNC_C_STARTSTOP = 0x10 FUNC_C_REQ_READ = 0x11 FUNC_C_WRITE = 0x20 FUNC_C_ACK_CONNECT = 0xF1 FUNC_C_ACK_READ = 0xF2 # for STARSTOP, REQUEST_READ ARG_STARTSTOP_START = 0x11 ARG_STARTSTOP_STOP = 0x22 ARG_REQUEST_READ = 0x11 # for CONNECT / ACK_CONNECT ARG_CONNECT_CONNECTING = bytes([0x01, 0x00]) ARG_CONNECT_CONNECTED = bytes([0x02, 0x00]) # for *_PWM ARG_PWM_ON = 0x44 ARG_PWM_OFF = 0x88 def ru16(b): assert len(b) == 2 return b[0] << 8 | b[1] def wu16(b): b = int(b) hi, lo = b >> 8, b & 0xFF return bytes([hi, lo]) def pct(n): if n is None: return "off" return str(int(n * 100)) + "%" class MessageParser: def __init__(self): self.buffer = bytes() def encode(self, addr, func, data): msg = addr + bytes([func, len(data)]) + data chk = sum(msg) return msg + bytes([chk >> 8, chk & 0xFF]) def try_read(self, data): if data: self.buffer += data while len(self.buffer) >= 4 + 2: data_len = self.buffer[3] msg_len = 4 + data_len + 2 if len(self.buffer) < msg_len: return (msg, self.buffer) = (self.buffer[:msg_len], self.buffer[msg_len:]) assert sum(msg[:-2]) == ru16(msg[-2:]) assert msg[:2] == ADDR_OVEN func = msg[2] data = msg[4 : 4 + data_len] yield (func, data) class DummyOven: def __init__(self): self.temps = [24, 24, 24] self.heat = None self.cool = None def step(self): rate = -2 if self.heat is not None: rate += 12 * self.heat if self.cool is not None: rate -= 8 * self.cool for i, temp in enumerate(self.temps): mult = random.uniform(0.5, 1) if i < 2 else 0.05 temp = temp + rate * mult self.temps[i] = max(24, min(temp, 350)) def print_state(self): print(f"heat PWM: {pct(self.heat)}, cool PWM: {pct(self.cool)}") def start(self, i): print(f"starting profile {i+1}") def stop(self, i): print(f"stopping profile {i+1}") def wait_connect(port, parser): print("broadcasting connection...") while True: port.write(parser.encode(ADDR_CTRL, FUNC_O_CONNECT, ARG_CONNECT_CONNECTING)) time.sleep(1) for msg in parser.try_read(port.read(port.in_waiting)): if msg[0] == FUNC_C_ACK_CONNECT and msg[1] == ARG_CONNECT_CONNECTED: return else: print("unexpected message", msg) def run(port): parser = MessageParser() wait_connect(port, parser) print("accepting connection...") port.write(parser.encode(ADDR_CTRL, FUNC_O_CONNECT, ARG_CONNECT_CONNECTED)) oven = DummyOven() while True: oven.step() port.write( parser.encode( ADDR_CTRL, FUNC_O_TEMPS, b"".join(wu16(t) for t in oven.temps) ) ) time.sleep(1) for msg in parser.try_read(port.read(port.in_waiting)): if msg[0] == FUNC_C_PWM_HEAT or msg[0] == FUNC_C_PWM_COOL: pwm, state = msg[1] if state == ARG_PWM_ON: pwm = pwm / 255 elif state == ARG_PWM_OFF: pwm = None else: print("invalid PWM mode") if msg[0] == FUNC_C_PWM_HEAT: oven.heat = pwm oven.cool = None else: oven.cool = pwm oven.print_state() elif msg[0] == FUNC_C_STARTSTOP: profile, mode = msg[1] if mode == ARG_STARTSTOP_START: oven.start(profile) elif mode == ARG_STARTSTOP_STOP: oven.stop(profile) else: print("invalid STARTSTOP mode") elif msg[0] == FUNC_C_REQ_READ: profile, mode = msg[1] pass elif msg[0] == FUNC_C_WRITE: pass else: print("unexpected message", msg) class WSPort: def __init__(self, ws): self.ws = ws self.in_waiting = 0 def write(self, data): self.ws.send(data) def read(self, _): try: return self.ws.recv(0) except TimeoutError: pass @staticmethod def wrap(fn): return lambda ws: fn(WSPort(ws)) if __name__ == "__main__": if len(sys.argv) < 2: import websockets.sync.server as ws print("starting Dummy Oven over WebSocket on port :8001") with ws.serve(WSPort.wrap(run), host="0.0.0.0", port=8001) as server: server.serve_forever() else: import serial port = sys.argv[-1] print(f"starting Dummy Oven on port {port}") with serial.Serial(port, baudrate=38400, timeout=1) as ser: run(ser)