""" Minimal Python VM for Nyash MIR(JSON) parity with llvmlite. Supported ops (MVP): - const/binop/compare/branch/jump/ret - phi (select by predecessor block) - newbox: ConsoleBox, StringBox (minimal semantics) - boxcall: String.length/substring/lastIndexOf, Console.print/println/log - externcall: nyash.console.println Value model: - i64 -> Python int - f64 -> Python float - string -> Python str - void/null -> None - ConsoleBox -> {"__box__":"ConsoleBox"} - StringBox receiver -> Python str """ from __future__ import annotations from dataclasses import dataclass from typing import Any, Dict, List, Optional, Tuple import os @dataclass class Block: id: int instructions: List[Dict[str, Any]] @dataclass class Function: name: str params: List[int] blocks: Dict[int, Block] class PyVM: def __init__(self, program: Dict[str, Any]): self.functions: Dict[str, Function] = {} for f in program.get("functions", []): name = f.get("name") params = [int(p) for p in f.get("params", [])] bmap: Dict[int, Block] = {} for bb in f.get("blocks", []): bmap[int(bb.get("id"))] = Block(id=int(bb.get("id")), instructions=list(bb.get("instructions", []))) self.functions[name] = Function(name=name, params=params, blocks=bmap) def _read(self, regs: Dict[int, Any], v: Optional[int]) -> Any: if v is None: return None return regs.get(int(v)) def _set(self, regs: Dict[int, Any], dst: Optional[int], val: Any) -> None: if dst is None: return regs[int(dst)] = val def _truthy(self, v: Any) -> bool: if isinstance(v, bool): return v if isinstance(v, (int, float)): return v != 0 if isinstance(v, str): return len(v) != 0 return v is not None def _is_console(self, v: Any) -> bool: return isinstance(v, dict) and v.get("__box__") == "ConsoleBox" def run(self, entry: str) -> Any: fn = self.functions.get(entry) if fn is None: raise RuntimeError(f"entry function not found: {entry}") return self._exec_function(fn, []) def _exec_function(self, fn: Function, args: List[Any]) -> Any: # Intrinsic fast path for small helpers used in smokes ok, ret = self._try_intrinsic(fn.name, args) if ok: return ret # Initialize registers and bind params regs: Dict[int, Any] = {} if fn.params: # If this function was lowered from a method (e.g., Main.foo/N), the first # parameter is an implicit 'me' and call sites pass only N args. # Align by detecting off-by-one and shifting args to skip the implicit receiver. if len(args) + 1 == len(fn.params): # Fill implicit 'me' (unused by our lowering at runtime) and map the rest if fn.params: regs[int(fn.params[0])] = None # placeholder for 'me' for i, pid in enumerate(fn.params[1:]): regs[int(pid)] = args[i] if i < len(args) else None else: # Direct positional bind for i, pid in enumerate(fn.params): regs[int(pid)] = args[i] if i < len(args) else None else: # Heuristic: derive param count from name suffix '/N' and bind to vids 0..N-1 n = 0 if "/" in fn.name: try: n = int(fn.name.split("/")[-1]) except Exception: n = 0 for i in range(n): regs[i] = args[i] if i < len(args) else None # Choose a deterministic first block (lowest id) if not fn.blocks: return 0 cur = min(fn.blocks.keys()) prev: Optional[int] = None # Simple block execution loop with step budget to avoid infinite hangs max_steps = 0 try: max_steps = int(os.environ.get("NYASH_PYVM_MAX_STEPS", "200000")) except Exception: max_steps = 200000 steps = 0 while True: steps += 1 if max_steps and steps > max_steps: raise RuntimeError(f"pyvm: max steps exceeded ({max_steps}) in function {fn.name}") block = fn.blocks.get(cur) if block is None: raise RuntimeError(f"block not found: {cur}") # Evaluate instructions sequentially i = 0 while i < len(block.instructions): inst = block.instructions[i] op = inst.get("op") if op == "phi": # incoming: prefer [[vid, pred_bid]], but accept [pred_bid, vid] robustly incoming = inst.get("incoming", []) chosen: Any = None dbg = os.environ.get('NYASH_PYVM_DEBUG_PHI') == '1' if dbg: print(f"[pyvm.phi] prev={prev} incoming={incoming}") for pair in incoming: if not isinstance(pair, (list, tuple)) or len(pair) < 2: continue a, b = pair[0], pair[1] # Case 1: [vid, pred] if prev is not None and int(b) == int(prev) and int(a) in regs: chosen = regs.get(int(a)) if dbg: print(f"[pyvm.phi] case1 match: use v{a} from pred {b} -> {chosen}") break if chosen is None and incoming: # Fallback to first element that resolves to a known vid for pair in incoming: if not isinstance(pair, (list, tuple)) or len(pair) < 2: continue a, b = pair[0], pair[1] if int(a) in regs: chosen = regs.get(int(a)); break # Do not try to resolve by assuming [pred, vid] — avoid false matches if dbg: print(f"[pyvm.phi] chosen={chosen}") self._set(regs, inst.get("dst"), chosen) i += 1 continue if op == "const": val = inst.get("value", {}) ty = val.get("type") vv = val.get("value") if ty == "i64": out = int(vv) elif ty == "f64": out = float(vv) elif ty == "string": out = str(vv) elif isinstance(ty, dict) and ty.get('kind') in ('handle','ptr') and ty.get('box_type') == 'StringBox': # Treat handle/pointer-typed string constants as Python str for VM semantics out = str(vv) else: out = None self._set(regs, inst.get("dst"), out) i += 1 continue if op == "binop": operation = inst.get("operation") a = self._read(regs, inst.get("lhs")) b = self._read(regs, inst.get("rhs")) res: Any = None if operation == "+": if isinstance(a, str) or isinstance(b, str): res = (str(a) if a is not None else "") + (str(b) if b is not None else "") else: av = 0 if a is None else int(a) bv = 0 if b is None else int(b) res = av + bv elif operation == "-": av = 0 if a is None else int(a) bv = 0 if b is None else int(b) res = av - bv elif operation == "*": av = 0 if a is None else int(a) bv = 0 if b is None else int(b) res = av * bv elif operation == "/": # integer division semantics for now av = 0 if a is None else int(a) bv = 1 if b in (None, 0) else int(b) res = av // bv elif operation == "%": av = 0 if a is None else int(a) bv = 1 if b in (None, 0) else int(b) res = av % bv elif operation in ("&", "|", "^"): # treat as bitwise on ints ai, bi = (0 if a is None else int(a)), (0 if b is None else int(b)) if operation == "&": res = ai & bi elif operation == "|": res = ai | bi else: res = ai ^ bi elif operation in ("<<", ">>"): ai, bi = (0 if a is None else int(a)), (0 if b is None else int(b)) res = (ai << bi) if operation == "<<" else (ai >> bi) else: raise RuntimeError(f"unsupported binop: {operation}") self._set(regs, inst.get("dst"), res) i += 1 continue if op == "compare": operation = inst.get("operation") a = self._read(regs, inst.get("lhs")) b = self._read(regs, inst.get("rhs")) res: bool # For ordering comparisons, be robust to None by coercing to ints if operation in ("<", "<=", ">", ">="): try: ai = 0 if a is None else (int(a) if not isinstance(a, str) else 0) except Exception: ai = 0 try: bi = 0 if b is None else (int(b) if not isinstance(b, str) else 0) except Exception: bi = 0 if operation == "<": res = ai < bi elif operation == "<=": res = ai <= bi elif operation == ">": res = ai > bi else: res = ai >= bi elif operation == "==": res = (a == b) elif operation == "!=": res = (a != b) else: raise RuntimeError(f"unsupported compare: {operation}") # VM convention: booleans are i64 0/1 self._set(regs, inst.get("dst"), 1 if res else 0) i += 1 continue if op == "typeop": # operation: "check" | "cast" ("as" is treated as cast for MVP) operation = inst.get("operation") or inst.get("op") src_vid = inst.get("src") dst_vid = inst.get("dst") target = (inst.get("target_type") or "") src_val = self._read(regs, src_vid) def is_type(val: Any, ty: str) -> bool: t = (ty or "").strip() t = t.lower() # Normalize aliases if t in ("stringbox",): t = "string" if t in ("integerbox", "int", "i64"): t = "integer" if t in ("floatbox", "f64"): t = "float" if t in ("boolbox", "boolean"): t = "bool" # Check by Python types/our boxed representations if t == "string": return isinstance(val, str) if t == "integer": # Treat Python ints (including 0/1) as integer return isinstance(val, int) and not isinstance(val, bool) if t == "float": return isinstance(val, float) if t == "bool": # Our VM uses 0/1 ints for bool; accept 0 or 1 return isinstance(val, int) and (val == 0 or val == 1) # Boxed receivers if t.endswith("box"): box_name = ty if isinstance(val, dict) and val.get("__box__") == box_name: return True if box_name == "StringBox" and isinstance(val, str): return True if box_name == "ConsoleBox" and self._is_console(val): return True if box_name == "ArrayBox" and isinstance(val, dict) and val.get("__box__") == "ArrayBox": return True if box_name == "MapBox" and isinstance(val, dict) and val.get("__box__") == "MapBox": return True return False return False if (operation or "").lower() in ("check", "is"): out = 1 if is_type(src_val, str(target)) else 0 self._set(regs, dst_vid, out) else: # cast/as: MVP pass-through self._set(regs, dst_vid, src_val) i += 1 continue if op == "unop": kind = inst.get("kind") src = self._read(regs, inst.get("src")) out: Any if kind == "neg": if isinstance(src, (int, float)): out = -src elif src is None: out = 0 else: try: out = -int(src) except Exception: out = 0 elif kind == "not": out = 0 if self._truthy(src) else 1 elif kind == "bitnot": out = ~int(src) if src is not None else -1 else: out = None self._set(regs, inst.get("dst"), out) i += 1 continue if op == "newbox": btype = inst.get("type") if btype == "ConsoleBox": val = {"__box__": "ConsoleBox"} elif btype == "StringBox": # empty string instance val = "" elif btype == "ArrayBox": val = {"__box__": "ArrayBox", "__arr": []} elif btype == "MapBox": val = {"__box__": "MapBox", "__map": {}} else: # Unknown box -> opaque val = {"__box__": btype} self._set(regs, inst.get("dst"), val) i += 1 continue if op == "copy": src = self._read(regs, inst.get("src")) self._set(regs, inst.get("dst"), src) i += 1 continue if op == "boxcall": recv = self._read(regs, inst.get("box")) method = inst.get("method") args = [self._read(regs, a) for a in inst.get("args", [])] out: Any = None # ConsoleBox methods if method in ("print", "println", "log") and self._is_console(recv): s = args[0] if args else "" if s is None: s = "" if method == "println": print(str(s)) else: # println is the primary one used by smokes; keep print/log equivalent print(str(s)) out = 0 # FileBox methods (minimal read-only) elif isinstance(recv, dict) and recv.get("__box__") == "FileBox": if method == "open": path = str(args[0]) if len(args) > 0 else "" mode = str(args[1]) if len(args) > 1 else "r" ok = 0 content = None if mode == "r": try: with open(path, "r", encoding="utf-8") as f: content = f.read() ok = 1 except Exception: ok = 0 content = None recv["__open"] = (ok == 1) recv["__path"] = path recv["__content"] = content out = ok elif method == "read": if isinstance(recv.get("__content"), str): out = recv.get("__content") else: out = None elif method == "close": recv["__open"] = False out = 0 else: out = None # PathBox methods (posix-like) elif isinstance(recv, dict) and recv.get("__box__") == "PathBox": if method == "dirname": p = str(args[0]) if args else "" # Normalize to POSIX-style out = os.path.dirname(p) if out == "": out = "." elif method == "join": base = str(args[0]) if len(args) > 0 else "" rel = str(args[1]) if len(args) > 1 else "" out = os.path.join(base, rel) else: out = None # ArrayBox minimal methods elif isinstance(recv, dict) and recv.get("__box__") == "ArrayBox": arr = recv.get("__arr", []) if method in ("len", "size"): out = len(arr) elif method == "get": idx = int(args[0]) if args else 0 out = arr[idx] if 0 <= idx < len(arr) else None elif method == "set": idx = int(args[0]) if len(args) > 0 else 0 val = args[1] if len(args) > 1 else None if 0 <= idx < len(arr): arr[idx] = val elif idx == len(arr): arr.append(val) else: # extend with None up to idx, then set while len(arr) < idx: arr.append(None) arr.append(val) out = 0 elif method == "push": val = args[0] if args else None arr.append(val) out = len(arr) elif method == "toString": out = "[" + ",".join(str(x) for x in arr) + "]" else: out = None recv["__arr"] = arr # MapBox minimal methods elif isinstance(recv, dict) and recv.get("__box__") == "MapBox": m = recv.get("__map", {}) if method == "size": out = len(m) elif method == "has": key = str(args[0]) if args else "" out = 1 if key in m else 0 elif method == "get": key = str(args[0]) if args else "" out = m.get(key) elif method == "set": key = str(args[0]) if len(args) > 0 else "" val = args[1] if len(args) > 1 else None m[key] = val out = 0 elif method == "toString": items = ",".join(f"{k}:{m[k]}" for k in m) out = "{" + items + "}" else: out = None recv["__map"] = m elif method == "esc_json": # Escape backslash and double-quote in the given string argument s = args[0] if args else "" s = "" if s is None else str(s) out_chars = [] for ch in s: if ch == "\\": out_chars.append("\\\\") elif ch == '"': out_chars.append('\\"') else: out_chars.append(ch) out = "".join(out_chars) elif method == "length": out = len(str(recv)) elif method == "substring": s = str(recv) start = int(args[0]) if len(args) > 0 else 0 end = int(args[1]) if len(args) > 1 else len(s) out = s[start:end] elif method == "lastIndexOf": s = str(recv) needle = str(args[0]) if args else "" out = s.rfind(needle) else: # Unimplemented method -> no-op out = None self._set(regs, inst.get("dst"), out) i += 1 continue if op == "externcall": func = inst.get("func") args = [self._read(regs, a) for a in inst.get("args", [])] out: Any = None # Normalize known console/debug externs if isinstance(func, str): if func in ("nyash.console.println", "nyash.console.log", "env.console.log"): s = args[0] if args else "" if s is None: s = "" print(str(s)) out = 0 elif func in ("nyash.console.warn", "env.console.warn", "nyash.console.error", "env.console.error", "nyash.debug.trace", "env.debug.trace"): s = args[0] if args else "" if s is None: s = "" # Write to stderr for warn/error/trace to approximate real consoles try: import sys as _sys print(str(s), file=_sys.stderr) except Exception: print(str(s)) out = 0 # Unknown extern -> no-op with 0/None self._set(regs, inst.get("dst"), out) i += 1 continue if op == "branch": cond = self._read(regs, inst.get("cond")) tid = int(inst.get("then")) eid = int(inst.get("else")) prev = cur cur = tid if self._truthy(cond) else eid # Restart execution at next block break if op == "jump": tgt = int(inst.get("target")) prev = cur cur = tgt break if op == "ret": v = self._read(regs, inst.get("value")) return v if op == "call": # Resolve function name from value or take as literal fval = inst.get("func") fname = self._read(regs, fval) if not isinstance(fname, str): # Fallback: if JSON encoded a literal name fname = fval if isinstance(fval, str) else None call_args = [self._read(regs, a) for a in inst.get("args", [])] result = None if isinstance(fname, str) and fname in self.functions: callee = self.functions[fname] result = self._exec_function(callee, call_args) # Store result if needed self._set(regs, inst.get("dst"), result) i += 1 continue # Unhandled op -> skip i += 1 else: # No explicit terminator; finish return 0 def _try_intrinsic(self, name: str, args: List[Any]) -> Tuple[bool, Any]: try: if name == "Main.esc_json/1": s = "" if not args else ("" if args[0] is None else str(args[0])) out = [] for ch in s: if ch == "\\": out.append("\\\\") elif ch == '"': out.append('\\"') else: out.append(ch) return True, "".join(out) if name == "Main.dirname/1": p = "" if not args else ("" if args[0] is None else str(args[0])) d = os.path.dirname(p) if d == "": d = "." return True, d except Exception: pass return (False, None)