Source code for src.main

from __future__ import annotations
import cairo
import threading
import queue
import torch
import os
import gi
from pynput import keyboard
from desmume.emulator import DeSmuME
from desmume.controls import Keys, keymask
from src.visualization.draw import consume_draw_stack, draw_text, draw_paragraph
from src.core.memory import (
    read_clock,
    load_current_kcl,
    load_current_nkm,
    read_model_view,
    read_forward_distance_checkpoint,
    read_forward_distance_obstacle,
)
from src.utils.vector import get_mps_device
from src.visualization.overlay import (
    player_overlay,
    raycasting_overlay,
    collision_overlay,
    checkpoint_overlay_1,
    checkpoint_overlay_2,
)

os.environ["PKG_CONFIG_PATH"] = "/opt/homebrew/lib/pkgconfig:$PKG_CONFIG_PATH"
os.environ["DYLD_FALLBACK_LIBRARY_PATH"] = (
    "/opt/homebrew/lib:$DYLD_FALLBACK_LIBRARY_PATH"
)

gi.require_version("Gtk", "3.0")
gi.require_version("Gdk", "3.0")
from gi.repository import Gtk, Gdk, GLib

# ----------------------------
# CONSTANTS
# ----------------------------
#device = get_mps_device()
device = None
SCALE = 3
SCREEN_WIDTH, SCREEN_HEIGHT = 256, 192
SAVE_STATE_ID = 0


# ----------------------------
# GLOBAL VARIABLES
# ----------------------------
input_state = set()
is_running = False
scene_next = None
renderer = None
emu_global: DeSmuME | None = None
callback = None
worker_queue: queue.Queue[DeSmuME | None] = queue.Queue()

# ----------------------------
# KEY MAPPING
# ----------------------------
KEY_MAP = {
    "w": Keys.KEY_UP,
    "s": Keys.KEY_DOWN,
    "a": Keys.KEY_LEFT,
    "d": Keys.KEY_RIGHT,
    "z": Keys.KEY_B,
    "x": Keys.KEY_A,
    "u": Keys.KEY_X,
    "i": Keys.KEY_Y,
    "q": Keys.KEY_L,
    "e": Keys.KEY_R,
    " ": Keys.KEY_START,
    "left": Keys.KEY_LEFT,
    "right": Keys.KEY_RIGHT,
    "up": Keys.KEY_UP,
    "down": Keys.KEY_DOWN,
}


# ----------------------------
# ASYNC KEYBOARD HANDLER
# ----------------------------
[docs] def start_keyboard_listener(): """Starts a non-blocking keyboard listener in a separate thread.""" def on_press(key): try: name = key.char.lower() if hasattr(key, "char") else key.name if name in KEY_MAP: input_state.add(name) except Exception: pass def on_release(key): try: name = key.char.lower() if hasattr(key, "char") else key.name if name in KEY_MAP: input_state.discard(name) except Exception: pass listener = keyboard.Listener(on_press=on_press, on_release=on_release) listener.daemon = True listener.start() return listener
# ---------------------------- # DRAW CALLBACK # ---------------------------- overlay_surface_cache: cairo.Surface | None = None
[docs] def on_draw_main(widget: Gtk.DrawingArea, ctx: cairo.Context): global renderer, scene_current, emu_global, overlay_surface_cache if renderer is None: return False ctx.scale(SCALE, SCALE) renderer.screen(SCREEN_WIDTH, SCREEN_HEIGHT, ctx, 0) src_surface: cairo.ImageSurface = ctx.get_target() overlay_surface = src_surface.create_similar( cairo.CONTENT_COLOR_ALPHA, SCREEN_WIDTH, SCREEN_HEIGHT ) overlay_surface.set_device_scale(*src_surface.get_device_scale()) overlay_ctx = cairo.Context(overlay_surface) overlay_ctx.set_operator(cairo.OPERATOR_CLEAR) overlay_ctx.paint() overlay_ctx.set_operator(cairo.OPERATOR_OVER) overlay_ctx.set_antialias(cairo.ANTIALIAS_NONE) num_calls = consume_draw_stack(overlay_ctx) if num_calls == 0 and overlay_surface_cache is not None: to_paint = overlay_surface_cache else: overlay_surface_cache = overlay_surface to_paint = overlay_surface pattern = cairo.SurfacePattern(to_paint) pattern.set_filter(cairo.FILTER_NEAREST) ctx.set_source(pattern) ctx.set_operator(cairo.OPERATOR_OVER) ctx.paint() return False
[docs] def on_configure_main(widget: Gtk.DrawingArea, *args): global renderer if renderer: renderer.reshape(widget, 0) return True
# ---------------------------- # GTK WINDOW # ----------------------------
[docs] class EmulatorWindow(Gtk.Window): def __init__(self): super().__init__(title="MarI/O Kart") self.set_default_size(SCREEN_WIDTH, SCREEN_HEIGHT) drawing_area = Gtk.DrawingArea() drawing_area.set_size_request(SCREEN_WIDTH * SCALE, SCREEN_HEIGHT * SCALE) drawing_area.connect("draw", on_draw_main) drawing_area.connect("configure-event", on_configure_main) self.add(drawing_area) self.drawing_area = drawing_area self.connect("destroy", Gtk.main_quit) self.set_events(Gdk.EventMask.KEY_PRESS_MASK | Gdk.EventMask.KEY_RELEASE_MASK)
# ---------------------------- # EMULATION WORKER # ----------------------------
[docs] def worker(): global scene_next, callback, is_running assert callback is not None while True: emu_instance = worker_queue.get() if emu_instance is None: break try: callback(emu_instance) except Exception as e: raise RuntimeError( f"error on thread {threading.current_thread().name}: {e}" ) worker_queue.task_done() pass
# ---------------------------- # MAIN LOOP # ----------------------------
[docs] def run_emulator(overlays): global renderer, callback, emu_global, is_running emu = DeSmuME() emu.open("mariokart_ds.nds") emu.savestate.load(4) emu_global = emu emu.volume_set(0) n = load_current_nkm(emu, device=device) k = load_current_kcl(emu, device=device) from desmume.frontend.gtk_drawing_area_desmume import AbstractRenderer renderer = AbstractRenderer.impl(emu) renderer.init() window = EmulatorWindow() window.show_all() def main_callback(emu: DeSmuME): global device for overlay in overlays: overlay(emu, device) callback = main_callback thread = threading.Thread(target=worker, daemon=True) thread.start() is_running = True def tick(): global scene_current, scene_next emu.cycle() emu.input.keypad_update(0) for key in input_state: emu.input.keypad_add_key(keymask(KEY_MAP[key])) try: pass # next(_generator) except StopIteration: pass if not is_running: Gtk.main_quit() window.drawing_area.queue_draw() worker_queue.put(emu) return True # run at 60fps, non-blocking GLib.timeout_add(16, tick) Gtk.main() window.connect("destroy", lambda w: worker_queue.put(None))
# ---------------------------- # ENTRY POINT # ---------------------------- if __name__ == "__main__": device = get_mps_device() start_keyboard_listener() run_emulator( [ # player_overlay, collision_overlay, checkpoint_overlay_1, checkpoint_overlay_2, ], )