Source code for src.visualization.window

from desmume.emulator import DeSmuME
from desmume.controls import Keys, keymask
from desmume.frontend.gtk_drawing_area_desmume import AbstractRenderer
from src.visualization.draw import consume_draw_stack
import cairo, numpy as np, math, time, os
from pynput import keyboard
from multiprocessing.shared_memory import SharedMemory
import gi

gi.require_version("Gtk", "3.0")
gi.require_version("Gdk", "3.0")
from gi.repository import Gtk, Gdk, GLib

# ----------------------------
# CONSTANTS
# ----------------------------
SCALE = 3
SCREEN_WIDTH, SCREEN_HEIGHT = 256, 192

# ----------------------------
# GLOBAL VARIABLES
# ----------------------------
input_state = set()
renderer = None
overlay_surface_cache = None


# ----------------------------
# KEY MAPPING
# ----------------------------
KEY_MAP = {
    "w": Keys.KEY_UP,
    "s": Keys.KEY_DOWN,
    "a": Keys.KEY_LEFT,
    "d": Keys.KEY_RIGHT,
    "z": Keys.KEY_B,
    "x": Keys.KEY_A,
    "u": Keys.KEY_X,
    "i": Keys.KEY_Y,
    "q": Keys.KEY_L,
    "e": Keys.KEY_R,
    " ": Keys.KEY_START,
    "left": Keys.KEY_LEFT,
    "right": Keys.KEY_RIGHT,
    "up": Keys.KEY_UP,
    "down": Keys.KEY_DOWN,
}


# ----------------------------
# ASYNC KEYBOARD HANDLER
# ----------------------------
[docs] def start_keyboard_listener(): """Starts a non-blocking keyboard listener in a separate thread.""" def on_press(key): try: name = key.char.lower() if hasattr(key, "char") else key.name if name in KEY_MAP: input_state.add(name) except Exception: pass def on_release(key): try: name = key.char.lower() if hasattr(key, "char") else key.name if name in KEY_MAP: input_state.discard(name) except Exception: pass listener = keyboard.Listener(on_press=on_press, on_release=on_release) listener.daemon = True listener.start() return listener
# ============================================================ # Overlay-Composited Offscreen Draw # ============================================================
[docs] def on_draw_memoryview( buf: memoryview, width: int, height: int, scale_x: float, scale_y: float ) -> np.ndarray: """Scale emulator RGBX frame and apply overlay → NumPy RGBA array.""" stride = width * 4 src_surface = cairo.ImageSurface.create_for_data( buf, cairo.FORMAT_RGB24, width, height, stride ) new_w, new_h = int(width * scale_x), int(height * scale_y) dest_surface = cairo.ImageSurface(cairo.FORMAT_ARGB32, new_w, new_h) ctx = cairo.Context(dest_surface) ctx.scale(scale_x, scale_y) # Compose overlays composited = apply_overlay_to_surface(src_surface) ctx.set_source_surface(composited, 0, 0) ctx.paint() buf_out = dest_surface.get_data() arr = np.ndarray((new_h, new_w, 4), np.uint8, buffer=buf_out) return arr
[docs] def apply_overlay_to_surface(src_surface: cairo.ImageSurface) -> cairo.Surface: """Return new Cairo surface with overlay applied.""" global overlay_surface_cache width, height = src_surface.get_width(), src_surface.get_height() overlay_surface = src_surface.create_similar( cairo.CONTENT_COLOR_ALPHA, width, height ) overlay_surface.set_device_scale(*src_surface.get_device_scale()) overlay_ctx = cairo.Context(overlay_surface) overlay_ctx.set_operator(cairo.OPERATOR_CLEAR) overlay_ctx.paint() overlay_ctx.set_operator(cairo.OPERATOR_OVER) overlay_ctx.set_antialias(cairo.ANTIALIAS_NONE) num_calls = consume_draw_stack(overlay_ctx) if num_calls == 0 and overlay_surface_cache is not None: overlay = overlay_surface_cache else: overlay_surface_cache = overlay_surface overlay = overlay_surface result_surface = src_surface.create_similar( cairo.CONTENT_COLOR_ALPHA, width, height ) ctx = cairo.Context(result_surface) ctx.set_source_surface(src_surface, 0, 0) ctx.paint() ctx.set_source_surface(overlay, 0, 0) ctx.paint() return result_surface
# ============================================================ # Multi-Process Display Window (shared memory compositor) # ============================================================
[docs] class SharedEmulatorWindow(Gtk.Window): def __init__(self, width: int, height: int, n_cols: int, n_rows: int, renderer: AbstractRenderer, shm_names: list[str]): super().__init__(title="MarI/O Kart — Multi Display") self.connect("destroy", self._on_destroy) self.set_default_size(width, height) self.width = width self.height = height self.n_cols = n_cols self.n_rows = n_rows self.renderer = renderer # Hold references so their buffers remain valid: self._shms = [SharedMemory(name=name) for name in shm_names] self.frames = [ np.ndarray((SCREEN_HEIGHT, SCREEN_WIDTH, 4), dtype=np.uint8, buffer=shm.buf) for shm in self._shms ] self.area = Gtk.DrawingArea() self.area.connect("draw", self.on_draw) self.add(self.area) GLib.timeout_add(33, self.refresh)
[docs] def on_draw(self, widget: Gtk.DrawingArea, ctx: cairo.Context): import math scale = 1.0 tile_w = self.width / self.n_cols tile_h = tile_w * (SCREEN_HEIGHT / SCREEN_WIDTH) for i, frame in enumerate(self.frames): r, c = divmod(i, self.n_cols) surface = cairo.ImageSurface.create_for_data( frame, cairo.FORMAT_RGB24, SCREEN_WIDTH, SCREEN_HEIGHT ) ctx.save() ctx.translate(c * tile_w, r * tile_h) ctx.scale(tile_w / SCREEN_WIDTH, tile_h / SCREEN_HEIGHT) ctx.set_source_surface(surface, 0, 0) ctx.get_source().set_filter(cairo.Filter.NEAREST) # Disable smoothing ctx.paint() ctx.restore()
[docs] def refresh(self): self.area.queue_draw() return True
def _on_destroy(self, *_): # Close (do not unlink) — unlink after procs finish in the trainer for shm in self._shms: try: shm.close() except Exception: pass
# ============================================================ # Standard Single Emulator GTK Window (for interactive mode) # ============================================================
[docs] class EmulatorWindow(Gtk.Window): """Single-emulator interactive window with overlay support.""" def __init__(self, emu: DeSmuME): global renderer renderer = AbstractRenderer.impl(emu) renderer.init() super().__init__(title="MarI/O Kart") self.set_default_size(SCREEN_WIDTH, SCREEN_HEIGHT) drawing_area = Gtk.DrawingArea() drawing_area.set_size_request(SCREEN_WIDTH * SCALE, SCREEN_HEIGHT * SCALE) drawing_area.connect("draw", on_draw_main) drawing_area.connect("configure-event", on_configure_main) self.add(drawing_area) self.drawing_area = drawing_area self.connect("destroy", Gtk.main_quit) self.set_events(Gdk.EventMask.KEY_PRESS_MASK | Gdk.EventMask.KEY_RELEASE_MASK)
[docs] def start(self, func, queue): GLib.timeout_add(16, func) Gtk.main() self.connect("destroy", lambda w: queue.put(None))
[docs] def kill(self): Gtk.main_quit()
# ============================================================ # GTK Draw Helpers (used by EmulatorWindow) # ============================================================
[docs] def on_draw_main(widget: Gtk.DrawingArea, ctx: cairo.Context): global renderer, overlay_surface_cache if renderer is None: return False ctx.scale(SCALE, SCALE) renderer.screen(SCREEN_WIDTH, SCREEN_HEIGHT, ctx, 0) src_surface = ctx.get_target() overlay_surface = src_surface.create_similar( cairo.CONTENT_COLOR_ALPHA, SCREEN_WIDTH, SCREEN_HEIGHT ) overlay_surface.set_device_scale(*src_surface.get_device_scale()) overlay_ctx = cairo.Context(overlay_surface) overlay_ctx.set_operator(cairo.OPERATOR_CLEAR) overlay_ctx.paint() overlay_ctx.set_operator(cairo.OPERATOR_OVER) overlay_ctx.set_antialias(cairo.ANTIALIAS_NONE) num_calls = consume_draw_stack(overlay_ctx) if num_calls == 0 and overlay_surface_cache is not None: to_paint = overlay_surface_cache else: overlay_surface_cache = overlay_surface to_paint = overlay_surface pattern = cairo.SurfacePattern(to_paint) pattern.set_filter(cairo.FILTER_NEAREST) ctx.set_source(pattern) ctx.set_operator(cairo.OPERATOR_OVER) ctx.paint() return False
[docs] def on_configure_main(widget: Gtk.DrawingArea, *args): global renderer if renderer: renderer.reshape(widget, 0) return True