Files
hakorune/src/llvm_py/pyvm/vm.py

858 lines
40 KiB
Python
Raw Normal View History

"""
Minimal Python VM for Nyash MIR(JSON) parity with llvmlite.
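Supported ops (MVP):
- const/binop/compare/unop/typeop/copy/branch/jump/ret
- phi (select by predecessor block)
- call (by function name; falls back to a unique ".name/arity" suffix match)
- newbox: ConsoleBox, StringBox, ArrayBox, MapBox (minimal semantics); other boxes are opaque
- boxcall: String.length/substring/indexOf/lastIndexOf, Console.print/println/log,
  minimal ArrayBox/MapBox/FileBox/PathBox methods, user-defined Box.method/N dispatch
- externcall: nyash.console.println/log/warn/error, env.console.log/warn/error,
  nyash.debug.trace, env.debug.trace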
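Value model:
- i64 -> Python int (booleans are stored as i64 0/1)
- f64 -> Python float
- string -> Python str
- void/null -> None
- ConsoleBox -> {"__box__":"ConsoleBox"}
- StringBox receiver -> Python str
- ArrayBox -> {"__box__":"ArrayBox","__arr":[...]}
- MapBox -> {"__box__":"MapBox","__map":{...}}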
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
import os
@dataclass
class Block:
    """One basic block of a function: an id plus its instruction list."""

    # Block id; branch/jump targets and phi predecessors refer to this number.
    id: int
    # Instructions in execution order; each is a decoded JSON dict read via its "op" key.
    instructions: List[Dict[str, Any]]
@dataclass
class Function:
    """A function of the loaded program: name, parameter registers and blocks."""

    # Function name, used as the key in PyVM.functions (e.g. "Main.foo/1").
    name: str
    # Register ids that receive the call arguments, in positional order.
    params: List[int]
    # Basic blocks keyed by block id; execution starts at the lowest id.
    blocks: Dict[int, Block]
class PyVM:
def __init__(self, program: Dict[str, Any]):
self.functions: Dict[str, Function] = {}
self._debug = os.environ.get('NYASH_PYVM_DEBUG') in ('1','true','on')
for f in program.get("functions", []):
name = f.get("name")
params = [int(p) for p in f.get("params", [])]
bmap: Dict[int, Block] = {}
for bb in f.get("blocks", []):
bmap[int(bb.get("id"))] = Block(id=int(bb.get("id")), instructions=list(bb.get("instructions", [])))
# Register each function inside the loop (bugfix)
self.functions[name] = Function(name=name, params=params, blocks=bmap)
def _dbg(self, *a):
if self._debug:
try:
import sys as _sys
print(*a, file=_sys.stderr)
except Exception:
pass
def _type_name(self, v: Any) -> str:
"""Pretty type name for debug traces mapped to MIR conventions."""
if v is None:
return "null"
if isinstance(v, bool):
# Booleans are encoded as i64 0/1 in MIR
return "i64"
if isinstance(v, int):
return "i64"
if isinstance(v, float):
return "f64"
if isinstance(v, str):
return "string"
if isinstance(v, dict) and "__box__" in v:
return f"Box({v.get('__box__')})"
return type(v).__name__
# --- Capability helpers (macro sandbox) ---
def _macro_sandbox_active(self) -> bool:
"""Detect if we are running under macro sandbox.
Heuristics:
- Explicit flag NYASH_MACRO_SANDBOX=1
- Macro child default envs (plugins off + macro off)
- Any MACRO_CAP_* enabled
"""
if os.environ.get("NYASH_MACRO_SANDBOX", "0") in ("1", "true", "on"):
return True
if os.environ.get("NYASH_DISABLE_PLUGINS") in ("1", "true", "on") and os.environ.get("NYASH_MACRO_ENABLE") in ("0", "false", "off"):
return True
if self._cap_env() or self._cap_io() or self._cap_net():
return True
return False
def _cap_env(self) -> bool:
return os.environ.get("NYASH_MACRO_CAP_ENV", "0") in ("1", "true", "on")
def _cap_io(self) -> bool:
return os.environ.get("NYASH_MACRO_CAP_IO", "0") in ("1", "true", "on")
def _cap_net(self) -> bool:
return os.environ.get("NYASH_MACRO_CAP_NET", "0") in ("1", "true", "on")
def _read(self, regs: Dict[int, Any], v: Optional[int]) -> Any:
if v is None:
return None
return regs.get(int(v))
def _set(self, regs: Dict[int, Any], dst: Optional[int], val: Any) -> None:
if dst is None:
return
regs[int(dst)] = val
def _truthy(self, v: Any) -> bool:
if isinstance(v, bool):
return v
if isinstance(v, (int, float)):
return v != 0
if isinstance(v, str):
return len(v) != 0
return v is not None
def _is_console(self, v: Any) -> bool:
return isinstance(v, dict) and v.get("__box__") == "ConsoleBox"
def _sandbox_allow_newbox(self, box_type: str) -> bool:
"""Allow-list for constructing boxes under macro sandbox."""
if not self._macro_sandbox_active():
return True
if box_type in ("ConsoleBox", "StringBox", "ArrayBox", "MapBox"):
return True
if box_type in ("FileBox", "PathBox", "DirBox"):
return self._cap_io()
# Simple net-related boxes
if box_type in ("HTTPBox", "HttpBox", "SocketBox"):
return self._cap_net()
# Unknown boxes are denied in sandbox
return False
def _sandbox_allow_boxcall(self, recv: Any, method: Optional[str]) -> bool:
if not self._macro_sandbox_active():
return True
# Console methods are fine
if self._is_console(recv):
return True
# String methods (our VM treats StringBox receiver as Python str)
if isinstance(recv, str):
return method in ("length", "substring", "lastIndexOf", "indexOf")
# File/Path/Dir need IO cap
if isinstance(recv, dict) and recv.get("__box__") in ("FileBox", "PathBox", "DirBox"):
return self._cap_io()
# Other boxes are denied in sandbox
return False
def run(self, entry: str) -> Any:
fn = self.functions.get(entry)
if fn is None:
raise RuntimeError(f"entry function not found: {entry}")
self._dbg(f"[pyvm] run entry={entry}")
return self._exec_function(fn, [])
def run_args(self, entry: str, args: list[Any]) -> Any:
fn = self.functions.get(entry)
if fn is None:
raise RuntimeError(f"entry function not found: {entry}")
self._dbg(f"[pyvm] run entry={entry} argv={args}")
call_args = list(args)
# If entry is a typical main (main / *.main), pack argv into an ArrayBox-like value
# to match Nyash's `main(args)` convention regardless of param count.
try:
if entry == 'main' or entry.endswith('.main'):
call_args = [{"__box__": "ArrayBox", "__arr": list(args)}]
elif fn.params and len(fn.params) == 1:
call_args = [{"__box__": "ArrayBox", "__arr": list(args)}]
except Exception:
pass
return self._exec_function(fn, call_args)
def _exec_function(self, fn: Function, args: List[Any]) -> Any:
self._dbg(f"[pyvm] call {fn.name} args={args}")
# Intrinsic fast path for small helpers used in smokes
ok, ret = self._try_intrinsic(fn.name, args)
if ok:
return ret
# Initialize registers and bind params
regs: Dict[int, Any] = {}
if fn.params:
# If this function was lowered from a method (e.g., Main.foo/N), the first
# parameter is an implicit 'me' and call sites pass only N args.
# Align by detecting off-by-one and shifting args to skip the implicit receiver.
if len(args) + 1 == len(fn.params):
# Fill implicit 'me' (unused by our lowering at runtime) and map the rest
if fn.params:
regs[int(fn.params[0])] = None # placeholder for 'me'
for i, pid in enumerate(fn.params[1:]):
regs[int(pid)] = args[i] if i < len(args) else None
else:
# Direct positional bind
for i, pid in enumerate(fn.params):
regs[int(pid)] = args[i] if i < len(args) else None
else:
# Heuristic: derive param count from name suffix '/N' and bind to vids 0..N-1
n = 0
if "/" in fn.name:
try:
n = int(fn.name.split("/")[-1])
except Exception:
n = 0
for i in range(n):
regs[i] = args[i] if i < len(args) else None
# Choose a deterministic first block (lowest id)
if not fn.blocks:
return 0
cur = min(fn.blocks.keys())
prev: Optional[int] = None
# Simple block execution loop with step budget to avoid infinite hangs
max_steps = 0
try:
max_steps = int(os.environ.get("NYASH_PYVM_MAX_STEPS", "200000"))
except Exception:
max_steps = 200000
steps = 0
while True:
steps += 1
if max_steps and steps > max_steps:
raise RuntimeError(f"pyvm: max steps exceeded ({max_steps}) in function {fn.name}")
block = fn.blocks.get(cur)
if block is None:
raise RuntimeError(f"block not found: {cur}")
# Evaluate instructions sequentially
i = 0
while i < len(block.instructions):
inst = block.instructions[i]
op = inst.get("op")
if op == "phi":
# incoming: prefer [[vid, pred_bid]], but accept [pred_bid, vid] robustly
incoming = inst.get("incoming", [])
chosen: Any = None
dbg = os.environ.get('NYASH_PYVM_DEBUG_PHI') == '1'
if dbg:
print(f"[pyvm.phi] prev={prev} incoming={incoming}")
for pair in incoming:
if not isinstance(pair, (list, tuple)) or len(pair) < 2:
continue
a, b = pair[0], pair[1]
# Case 1: [vid, pred]
if prev is not None and int(b) == int(prev) and int(a) in regs:
chosen = regs.get(int(a))
if dbg:
print(f"[pyvm.phi] case1 match: use v{a} from pred {b} -> {chosen}")
break
if chosen is None and incoming:
# Fallback to first element that resolves to a known vid
for pair in incoming:
if not isinstance(pair, (list, tuple)) or len(pair) < 2:
continue
a, b = pair[0], pair[1]
if int(a) in regs:
chosen = regs.get(int(a)); break
# Do not try to resolve by assuming [pred, vid] — avoid false matches
if dbg:
print(f"[pyvm.phi] chosen={chosen}")
self._set(regs, inst.get("dst"), chosen)
i += 1
continue
if op == "const":
val = inst.get("value", {})
ty = val.get("type")
vv = val.get("value")
if ty == "i64":
out = int(vv)
elif ty == "f64":
out = float(vv)
elif ty == "string":
out = str(vv)
elif isinstance(ty, dict) and ty.get('kind') in ('handle','ptr') and ty.get('box_type') == 'StringBox':
# Treat handle/pointer-typed string constants as Python str for VM semantics
out = str(vv)
else:
out = None
self._set(regs, inst.get("dst"), out)
i += 1
continue
if op == "binop":
operation = inst.get("operation")
a = self._read(regs, inst.get("lhs"))
b = self._read(regs, inst.get("rhs"))
res: Any = None
if operation == "+":
if isinstance(a, str) or isinstance(b, str):
res = (str(a) if a is not None else "") + (str(b) if b is not None else "")
else:
av = 0 if a is None else int(a)
bv = 0 if b is None else int(b)
res = av + bv
elif operation == "-":
av = 0 if a is None else int(a)
bv = 0 if b is None else int(b)
res = av - bv
elif operation == "*":
av = 0 if a is None else int(a)
bv = 0 if b is None else int(b)
res = av * bv
elif operation == "/":
# integer division semantics for now
av = 0 if a is None else int(a)
bv = 1 if b in (None, 0) else int(b)
res = av // bv
elif operation == "%":
av = 0 if a is None else int(a)
bv = 1 if b in (None, 0) else int(b)
res = av % bv
elif operation in ("&", "|", "^"):
# treat as bitwise on ints
ai, bi = (0 if a is None else int(a)), (0 if b is None else int(b))
if operation == "&":
res = ai & bi
elif operation == "|":
res = ai | bi
else:
res = ai ^ bi
elif operation in ("<<", ">>"):
ai, bi = (0 if a is None else int(a)), (0 if b is None else int(b))
res = (ai << bi) if operation == "<<" else (ai >> bi)
else:
raise RuntimeError(f"unsupported binop: {operation}")
self._set(regs, inst.get("dst"), res)
i += 1
continue
if op == "compare":
operation = inst.get("operation")
a = self._read(regs, inst.get("lhs"))
b = self._read(regs, inst.get("rhs"))
res: bool
# For ordering comparisons, be robust to None by coercing to ints
if operation in ("<", "<=", ">", ">="):
try:
ai = 0 if a is None else (int(a) if not isinstance(a, str) else 0)
except Exception:
ai = 0
try:
bi = 0 if b is None else (int(b) if not isinstance(b, str) else 0)
except Exception:
bi = 0
if operation == "<":
res = ai < bi
elif operation == "<=":
res = ai <= bi
elif operation == ">":
res = ai > bi
else:
res = ai >= bi
elif operation == "==":
res = (a == b)
elif operation == "!=":
res = (a != b)
else:
raise RuntimeError(f"unsupported compare: {operation}")
# VM convention: booleans are i64 0/1
self._set(regs, inst.get("dst"), 1 if res else 0)
i += 1
continue
if op == "typeop":
# operation: "check" | "cast" ("as" is treated as cast for MVP)
operation = inst.get("operation") or inst.get("op")
src_vid = inst.get("src")
dst_vid = inst.get("dst")
target = (inst.get("target_type") or "")
src_val = self._read(regs, src_vid)
def is_type(val: Any, ty: str) -> bool:
t = (ty or "").strip()
t = t.lower()
# Normalize aliases
if t in ("stringbox",):
t = "string"
if t in ("integerbox", "int", "i64"):
t = "integer"
if t in ("floatbox", "f64"):
t = "float"
if t in ("boolbox", "boolean"):
t = "bool"
# Check by Python types/our boxed representations
if t == "string":
return isinstance(val, str)
if t == "integer":
# Treat Python ints (including 0/1) as integer
return isinstance(val, int) and not isinstance(val, bool)
if t == "float":
return isinstance(val, float)
if t == "bool":
# Our VM uses 0/1 ints for bool; accept 0 or 1
return isinstance(val, int) and (val == 0 or val == 1)
# Boxed receivers
if t.endswith("box"):
box_name = ty
if isinstance(val, dict) and val.get("__box__") == box_name:
return True
if box_name == "StringBox" and isinstance(val, str):
return True
if box_name == "ConsoleBox" and self._is_console(val):
return True
if box_name == "ArrayBox" and isinstance(val, dict) and val.get("__box__") == "ArrayBox":
return True
if box_name == "MapBox" and isinstance(val, dict) and val.get("__box__") == "MapBox":
return True
return False
return False
if (operation or "").lower() in ("check", "is"):
out = 1 if is_type(src_val, str(target)) else 0
self._set(regs, dst_vid, out)
else:
# cast/as: MVP pass-through
self._set(regs, dst_vid, src_val)
i += 1
continue
if op == "unop":
kind = inst.get("kind")
src = self._read(regs, inst.get("src"))
out: Any
if kind == "neg":
if isinstance(src, (int, float)):
out = -src
elif src is None:
out = 0
else:
try:
out = -int(src)
except Exception:
out = 0
elif kind == "not":
out = 0 if self._truthy(src) else 1
elif kind == "bitnot":
out = ~int(src) if src is not None else -1
else:
out = None
self._set(regs, inst.get("dst"), out)
i += 1
continue
if op == "newbox":
btype = inst.get("type")
# Sandbox gate: only allow minimal boxes when sandbox is active
if not self._sandbox_allow_newbox(str(btype)):
val = {"__box__": str(btype), "__denied__": True}
elif btype == "ConsoleBox":
val = {"__box__": "ConsoleBox"}
elif btype == "StringBox":
# empty string instance
val = ""
elif btype == "ArrayBox":
val = {"__box__": "ArrayBox", "__arr": []}
elif btype == "MapBox":
val = {"__box__": "MapBox", "__map": {}}
else:
# Unknown box -> opaque
val = {"__box__": btype}
self._set(regs, inst.get("dst"), val)
i += 1
continue
if op == "copy":
src = self._read(regs, inst.get("src"))
self._set(regs, inst.get("dst"), src)
i += 1
continue
if op == "boxcall":
recv = self._read(regs, inst.get("box"))
method = inst.get("method")
args = [self._read(regs, a) for a in inst.get("args", [])]
out: Any = None
self._dbg(f"[pyvm] boxcall recv={recv} method={method} args={args}")
# Sandbox gate: disallow unsafe/unknown boxcalls
if not self._sandbox_allow_boxcall(recv, method):
self._set(regs, inst.get("dst"), out)
i += 1
continue
# Special-case: inside a method body, 'me.method(...)' lowers to a
# boxcall with a synthetic receiver marker '__me__'. Resolve it by
# dispatching to the current box's lowered function if available.
if isinstance(recv, str) and recv == "__me__" and isinstance(method, str):
# Derive box name from current function (e.g., 'MiniVm.foo/2' -> 'MiniVm')
box_name = ""
try:
if "." in fn.name:
box_name = fn.name.split(".")[0]
except Exception:
box_name = ""
if box_name:
cand = f"{box_name}.{method}/{len(args)}"
callee = self.functions.get(cand)
if callee is not None:
self._dbg(f"[pyvm] boxcall(__me__) -> {cand} args={args}")
out = self._exec_function(callee, args)
self._set(regs, inst.get("dst"), out)
i += 1
continue
# Fast-path: built-in ArrayBox minimal methods (avoid noisy unresolved logs)
if isinstance(recv, dict) and recv.get("__box__") == "ArrayBox":
arr = recv.get("__arr", [])
if method in ("len", "size"):
out = len(arr)
elif method == "get":
idx = int(args[0]) if args else 0
out = arr[idx] if 0 <= idx < len(arr) else None
elif method == "set":
idx = int(args[0]) if len(args) > 0 else 0
val = args[1] if len(args) > 1 else None
if 0 <= idx < len(arr):
arr[idx] = val
elif idx == len(arr):
arr.append(val)
else:
while len(arr) < idx:
arr.append(None)
arr.append(val)
out = 0
elif method == "push":
val = args[0] if args else None
arr.append(val)
out = len(arr)
elif method == "toString":
out = "[" + ",".join(str(x) for x in arr) + "]"
else:
out = None
recv["__arr"] = arr
self._set(regs, inst.get("dst"), out)
i += 1
continue
# User-defined box: dispatch to lowered function if available (Box.method/N)
if isinstance(recv, dict) and isinstance(method, str) and "__box__" in recv:
box_name = recv.get("__box__")
cand = f"{box_name}.{method}/{len(args)}"
callee = self.functions.get(cand)
if callee is not None:
self._dbg(f"[pyvm] boxcall dispatch -> {cand} args={args}")
out = self._exec_function(callee, args)
self._set(regs, inst.get("dst"), out)
i += 1
continue
else:
if self._debug:
prefix = f"{box_name}.{method}/"
cands = sorted([k for k in self.functions.keys() if k.startswith(prefix)])
if cands:
self._dbg(f"[pyvm] boxcall unresolved: '{cand}' — available: {cands}")
else:
any_for_box = sorted([k for k in self.functions.keys() if k.startswith(f"{box_name}.")])
self._dbg(f"[pyvm] boxcall unresolved: '{cand}' — no candidates; methods for {box_name}: {any_for_box}")
# ConsoleBox methods
if method in ("print", "println", "log") and self._is_console(recv):
s = args[0] if args else ""
if s is None:
s = ""
if method == "println":
print(str(s))
else:
# println is the primary one used by smokes; keep print/log equivalent
print(str(s))
out = 0
# FileBox methods (minimal read-only)
elif isinstance(recv, dict) and recv.get("__box__") == "FileBox":
if method == "open":
path = str(args[0]) if len(args) > 0 else ""
mode = str(args[1]) if len(args) > 1 else "r"
ok = 0
content = None
if mode == "r":
try:
with open(path, "r", encoding="utf-8") as f:
content = f.read()
ok = 1
except Exception:
ok = 0
content = None
recv["__open"] = (ok == 1)
recv["__path"] = path
recv["__content"] = content
out = ok
elif method == "read":
if isinstance(recv.get("__content"), str):
out = recv.get("__content")
else:
out = None
elif method == "close":
recv["__open"] = False
out = 0
else:
out = None
# PathBox methods (posix-like)
elif isinstance(recv, dict) and recv.get("__box__") == "PathBox":
if method == "dirname":
p = str(args[0]) if args else ""
# Normalize to POSIX-style
out = os.path.dirname(p)
if out == "":
out = "."
elif method == "join":
base = str(args[0]) if len(args) > 0 else ""
rel = str(args[1]) if len(args) > 1 else ""
out = os.path.join(base, rel)
else:
out = None
# ArrayBox minimal methods
elif isinstance(recv, dict) and recv.get("__box__") == "ArrayBox":
arr = recv.get("__arr", [])
if method in ("len", "size"):
out = len(arr)
elif method == "get":
idx = int(args[0]) if args else 0
out = arr[idx] if 0 <= idx < len(arr) else None
elif method == "set":
idx = int(args[0]) if len(args) > 0 else 0
val = args[1] if len(args) > 1 else None
if 0 <= idx < len(arr):
arr[idx] = val
elif idx == len(arr):
arr.append(val)
else:
# extend with None up to idx, then set
while len(arr) < idx:
arr.append(None)
arr.append(val)
out = 0
elif method == "push":
val = args[0] if args else None
arr.append(val)
out = len(arr)
elif method == "toString":
out = "[" + ",".join(str(x) for x in arr) + "]"
else:
out = None
recv["__arr"] = arr
# MapBox minimal methods
elif isinstance(recv, dict) and recv.get("__box__") == "MapBox":
m = recv.get("__map", {})
if method == "size":
out = len(m)
elif method == "has":
key = str(args[0]) if args else ""
out = 1 if key in m else 0
elif method == "get":
key = str(args[0]) if args else ""
out = m.get(key)
elif method == "set":
key = str(args[0]) if len(args) > 0 else ""
val = args[1] if len(args) > 1 else None
m[key] = val
out = 0
elif method == "toString":
items = ",".join(f"{k}:{m[k]}" for k in m)
out = "{" + items + "}"
else:
out = None
recv["__map"] = m
elif method == "esc_json":
# Escape backslash and double-quote in the given string argument
s = args[0] if args else ""
s = "" if s is None else str(s)
out_chars = []
for ch in s:
if ch == "\\":
out_chars.append("\\\\")
elif ch == '"':
out_chars.append('\\"')
else:
out_chars.append(ch)
out = "".join(out_chars)
elif method == "length":
out = len(str(recv))
elif method == "substring":
s = str(recv)
start = int(args[0]) if (len(args) > 0 and args[0] is not None) else 0
end = int(args[1]) if (len(args) > 1 and args[1] is not None) else len(s)
out = s[start:end]
elif method == "lastIndexOf":
s = str(recv)
needle = str(args[0]) if args else ""
# Optional start index (ignored by many call sites; support if provided)
if len(args) > 1 and args[1] is not None:
try:
start = int(args[1])
except Exception:
start = 0
out = s.rfind(needle, start)
else:
out = s.rfind(needle)
elif method == "indexOf":
s = str(recv)
needle = str(args[0]) if args else ""
# Support optional start index: indexOf(needle, start)
if len(args) > 1 and args[1] is not None:
try:
start = int(args[1])
except Exception:
start = 0
out = s.find(needle, start)
else:
out = s.find(needle)
else:
# Unimplemented method -> no-op
out = None
self._set(regs, inst.get("dst"), out)
i += 1
continue
if op == "externcall":
func = inst.get("func")
args = [self._read(regs, a) for a in inst.get("args", [])]
out: Any = None
self._dbg(f"[pyvm] externcall func={func} args={args}")
# Normalize known console/debug externs
if isinstance(func, str):
if func in ("nyash.console.println", "nyash.console.log", "env.console.log"):
s = args[0] if args else ""
if s is None:
s = ""
print(str(s))
out = 0
elif func in ("nyash.console.warn", "env.console.warn", "nyash.console.error", "env.console.error", "nyash.debug.trace", "env.debug.trace"):
s = args[0] if args else ""
if s is None:
s = ""
# Write to stderr for warn/error/trace to approximate real consoles
try:
import sys as _sys
print(str(s), file=_sys.stderr)
except Exception:
print(str(s))
out = 0
else:
# Macro sandbox: disallow unknown externcall unless explicitly whitelisted by future caps
# (currently no IO/NET externs are allowed in macro child)
out = 0
# Unknown extern -> no-op with 0/None
self._set(regs, inst.get("dst"), out)
i += 1
continue
if op == "branch":
cond = self._read(regs, inst.get("cond"))
tid = int(inst.get("then"))
eid = int(inst.get("else"))
prev = cur
cur = tid if self._truthy(cond) else eid
self._dbg(f"[pyvm] branch cond={cond} -> next={cur}")
# Restart execution at next block
break
if op == "jump":
tgt = int(inst.get("target"))
prev = cur
cur = tgt
self._dbg(f"[pyvm] jump -> {cur}")
break
if op == "ret":
v = self._read(regs, inst.get("value"))
if self._debug:
self._dbg(f"[pyvm] ret {self._type_name(v)} value={v}")
return v
if op == "call":
# Resolve function name from value or take as literal
fval = inst.get("func")
fname = self._read(regs, fval)
if not isinstance(fname, str):
# Fallback: if JSON encoded a literal name
fname = fval if isinstance(fval, str) else None
call_args = [self._read(regs, a) for a in inst.get("args", [])]
result = None
if isinstance(fname, str):
# Direct hit
if fname in self.functions:
callee = self.functions[fname]
self._dbg(f"[pyvm] call -> {fname} args={call_args}")
result = self._exec_function(callee, call_args)
else:
# Heuristic resolution: match suffix ".name/arity"
arity = len(call_args)
suffix = f".{fname}/{arity}"
candidates = [k for k in self.functions.keys() if k.endswith(suffix)]
if len(candidates) == 1:
callee = self.functions[candidates[0]]
self._dbg(f"[pyvm] call -> {candidates[0]} args={call_args}")
result = self._exec_function(callee, call_args)
elif self._debug and len(candidates) > 1:
self._dbg(f"[pyvm] call unresolved: '{fname}'/{arity} has multiple candidates: {candidates}")
elif self._debug:
# Suggest close candidates across arities using suffix ".name/"
any_cands = sorted([k for k in self.functions.keys() if k.endswith(f".{fname}/") or f".{fname}/" in k])
if any_cands:
self._dbg(f"[pyvm] call unresolved: '{fname}'/{arity} — available: {any_cands}")
else:
self._dbg(f"[pyvm] call unresolved: '{fname}'/{arity} not found")
# Store result if needed
self._set(regs, inst.get("dst"), result)
i += 1
continue
# Unhandled op -> skip
i += 1
else:
# No explicit terminator; finish
return 0
def _try_intrinsic(self, name: str, args: List[Any]) -> Tuple[bool, Any]:
try:
if name == "Main.esc_json/1":
s = "" if not args else ("" if args[0] is None else str(args[0]))
out = []
for ch in s:
if ch == "\\":
out.append("\\\\")
elif ch == '"':
out.append('\\"')
else:
out.append(ch)
return True, "".join(out)
if name == "MiniVm.read_digits/2":
s = "" if not args or args[0] is None else str(args[0])
pos = 0 if len(args) < 2 or args[1] is None else int(args[1])
out_chars = []
while pos < len(s):
ch = s[pos]
if '0' <= ch <= '9':
out_chars.append(ch)
pos += 1
else:
break
return True, "".join(out_chars)
if name == "MiniVm.parse_first_int/1":
js = "" if not args or args[0] is None else str(args[0])
key = '"value":{"type":"int","value":'
idx = js.rfind(key)
if idx < 0:
return True, "0"
start = idx + len(key)
ok, digits = self._try_intrinsic("MiniVm.read_digits/2", [js, start])
return True, digits
if name == "Main.dirname/1":
p = "" if not args else ("" if args[0] is None else str(args[0]))
d = os.path.dirname(p)
if d == "":
d = "."
return True, d
except Exception:
pass
return (False, None)