pyvm: split op handlers into ops_core/ops_box/ops_ctrl; add ops_flow + intrinsic; delegate vm.py without behavior change

net-plugin: modularize constants (consts.rs) and sockets (sockets.rs); remove legacy commented socket code; fix unused imports
mir: move instruction unit tests to tests/mir_instruction_unit.rs (file lean-up); no semantic changes
runner/pyvm: ensure using pre-strip; misc docs updates

Build: cargo build ok; legacy cfg warnings remain as before
This commit is contained in:
Selfhosting Dev
2025-09-21 08:53:00 +09:00
parent ee17cfd979
commit c8063c9e41
247 changed files with 10187 additions and 23124 deletions

View File

@ -0,0 +1,53 @@
"""
Intrinsic helpers for PyVM. Keep logic identical to the inline version.
"""
from __future__ import annotations
from typing import Any, List, Tuple
import os
def try_intrinsic(name: str, args: List[Any]) -> Tuple[bool, Any]:
try:
if name == "Main.esc_json/1":
s = "" if not args else ("" if args[0] is None else str(args[0]))
out = []
for ch in s:
if ch == "\\":
out.append("\\\\")
elif ch == '"':
out.append('\\"')
else:
out.append(ch)
return True, "".join(out)
if name == "MiniVm.read_digits/2":
s = "" if not args or args[0] is None else str(args[0])
pos = 0 if len(args) < 2 or args[1] is None else int(args[1])
out_chars: list[str] = []
while pos < len(s):
ch = s[pos]
if '0' <= ch <= '9':
out_chars.append(ch)
pos += 1
else:
break
return True, "".join(out_chars)
if name == "MiniVm.parse_first_int/1":
js = "" if not args or args[0] is None else str(args[0])
key = '"value":{"type":"int","value":'
idx = js.rfind(key)
if idx < 0:
return True, "0"
start = idx + len(key)
ok, digits = try_intrinsic("MiniVm.read_digits/2", [js, start])
return True, digits
if name == "Main.dirname/1":
p = "" if not args else ("" if args[0] is None else str(args[0]))
d = os.path.dirname(p)
if d == "":
d = "."
return True, d
except Exception:
pass
return (False, None)

239
src/llvm_py/pyvm/ops_box.py Normal file
View File

@ -0,0 +1,239 @@
"""
Box-related operations for the Nyash PyVM: newbox and boxcall.
Kept behaviorally identical to the original vm.py code.
"""
from __future__ import annotations
from typing import Any, Dict, List
import os
def op_newbox(owner, inst: Dict[str, Any], regs: Dict[int, Any]) -> None:
btype = inst.get("type")
# Sandbox gate: only allow minimal boxes when sandbox is active
if not owner._sandbox_allow_newbox(str(btype)):
val = {"__box__": str(btype), "__denied__": True}
elif btype == "ConsoleBox":
val = {"__box__": "ConsoleBox"}
elif btype == "StringBox":
# empty string instance
val = ""
elif btype == "ArrayBox":
val = {"__box__": "ArrayBox", "__arr": []}
elif btype == "MapBox":
val = {"__box__": "MapBox", "__map": {}}
else:
# Unknown box -> opaque
val = {"__box__": btype}
owner._set(regs, inst.get("dst"), val)
def op_boxcall(owner, fn, inst: Dict[str, Any], regs: Dict[int, Any]) -> None:
recv = owner._read(regs, inst.get("box"))
method = inst.get("method")
args: List[Any] = [owner._read(regs, a) for a in inst.get("args", [])]
out: Any = None
owner._dbg(f"[pyvm] boxcall recv={recv} method={method} args={args}")
# Sandbox gate: disallow unsafe/unknown boxcalls
if not owner._sandbox_allow_boxcall(recv, method):
owner._set(regs, inst.get("dst"), out)
return
# Special-case: inside a method body, 'me.method(...)' lowers to a
# boxcall with a synthetic receiver marker '__me__'. Resolve it by
# dispatching to the current box's lowered function if available.
if isinstance(recv, str) and recv == "__me__" and isinstance(method, str):
box_name = ""
try:
if "." in fn.name:
box_name = fn.name.split(".")[0]
except Exception:
box_name = ""
if box_name:
cand = f"{box_name}.{method}/{len(args)}"
callee = owner.functions.get(cand)
if callee is not None:
owner._dbg(f"[pyvm] boxcall(__me__) -> {cand} args={args}")
out = owner._exec_function(callee, args)
owner._set(regs, inst.get("dst"), out)
return
# User-defined box: dispatch to lowered function if available (Box.method/N)
if isinstance(recv, dict) and isinstance(method, str) and "__box__" in recv:
box_name = recv.get("__box__")
cand = f"{box_name}.{method}/{len(args)}"
callee = owner.functions.get(cand)
if callee is not None:
owner._dbg(f"[pyvm] boxcall dispatch -> {cand} args={args}")
out = owner._exec_function(callee, args)
owner._set(regs, inst.get("dst"), out)
return
else:
if owner._debug:
prefix = f"{box_name}.{method}/"
cands = sorted([k for k in owner.functions.keys() if k.startswith(prefix)])
if cands:
owner._dbg(f"[pyvm] boxcall unresolved: '{cand}' — available: {cands}")
else:
any_for_box = sorted([k for k in owner.functions.keys() if k.startswith(f"{box_name}.")])
owner._dbg(f"[pyvm] boxcall unresolved: '{cand}' — no candidates; methods for {box_name}: {any_for_box}")
# ConsoleBox methods
if method in ("print", "println", "log") and owner._is_console(recv):
s = args[0] if args else ""
if s is None:
s = ""
# println is the primary one used by smokes; keep print/log equivalent
print(str(s))
out = 0
# FileBox methods (minimal read-only)
elif isinstance(recv, dict) and recv.get("__box__") == "FileBox":
if method == "open":
path = str(args[0]) if len(args) > 0 else ""
mode = str(args[1]) if len(args) > 1 else "r"
ok = 0
content = None
if mode == "r":
try:
with open(path, "r", encoding="utf-8") as f:
content = f.read()
ok = 1
except Exception:
ok = 0
content = None
recv["__open"] = (ok == 1)
recv["__path"] = path
recv["__content"] = content
out = ok
elif method == "read":
if isinstance(recv.get("__content"), str):
out = recv.get("__content")
else:
out = None
elif method == "close":
recv["__open"] = False
out = 0
else:
out = None
# PathBox methods (posix-like)
elif isinstance(recv, dict) and recv.get("__box__") == "PathBox":
if method == "dirname":
p = str(args[0]) if args else ""
out = os.path.dirname(p)
if out == "":
out = "."
elif method == "join":
base = str(args[0]) if len(args) > 0 else ""
rel = str(args[1]) if len(args) > 1 else ""
out = os.path.join(base, rel)
else:
out = None
# ArrayBox minimal methods
elif isinstance(recv, dict) and recv.get("__box__") == "ArrayBox":
arr = recv.get("__arr", [])
if method in ("len", "size"):
out = len(arr)
elif method == "get":
idx = int(args[0]) if args else 0
out = arr[idx] if 0 <= idx < len(arr) else None
elif method == "set":
idx = int(args[0]) if len(args) > 0 else 0
val = args[1] if len(args) > 1 else None
if 0 <= idx < len(arr):
arr[idx] = val
elif idx == len(arr):
arr.append(val)
else:
while len(arr) < idx:
arr.append(None)
arr.append(val)
out = 0
elif method == "push":
val = args[0] if args else None
arr.append(val)
out = len(arr)
elif method == "toString":
out = "[" + ",".join(str(x) for x in arr) + "]"
else:
out = None
recv["__arr"] = arr
# MapBox minimal methods
elif isinstance(recv, dict) and recv.get("__box__") == "MapBox":
m = recv.get("__map", {})
if method == "size":
out = len(m)
elif method == "has":
key = str(args[0]) if args else ""
out = 1 if key in m else 0
elif method == "get":
key = str(args[0]) if args else ""
out = m.get(key)
elif method == "set":
key = str(args[0]) if len(args) > 0 else ""
val = args[1] if len(args) > 1 else None
m[key] = val
out = 0
elif method == "toString":
items = ",".join(f"{k}:{m[k]}" for k in m)
out = "{" + items + "}"
else:
out = None
recv["__map"] = m
elif method == "esc_json":
s = args[0] if args else ""
s = "" if s is None else str(s)
out_chars: List[str] = []
for ch in s:
if ch == "\\":
out_chars.append("\\\\")
elif ch == '"':
out_chars.append('\\"')
else:
out_chars.append(ch)
out = "".join(out_chars)
elif method == "length":
out = len(str(recv))
elif method == "substring":
s = str(recv)
start = int(args[0]) if (len(args) > 0 and args[0] is not None) else 0
end = int(args[1]) if (len(args) > 1 and args[1] is not None) else len(s)
out = s[start:end]
elif method == "lastIndexOf":
s = str(recv)
needle = str(args[0]) if args else ""
if len(args) > 1 and args[1] is not None:
try:
start = int(args[1])
except Exception:
start = 0
out = s.rfind(needle, start)
else:
out = s.rfind(needle)
elif method == "indexOf":
s = str(recv)
needle = str(args[0]) if args else ""
if len(args) > 1 and args[1] is not None:
try:
start = int(args[1])
except Exception:
start = 0
out = s.find(needle, start)
else:
out = s.find(needle)
else:
# Unimplemented method -> no-op
out = None
owner._set(regs, inst.get("dst"), out)

View File

@ -0,0 +1,220 @@
"""
Core operation handlers for the Nyash PyVM.
These helpers are pure functions that take the VM instance (owner),
the instruction dict, and the current register file, then update regs
via owner._set as needed. They return nothing; control flow is handled
by the caller in vm.py.
"""
from __future__ import annotations
from typing import Any, Dict, Optional
def op_phi(owner, inst: Dict[str, Any], regs: Dict[int, Any], prev: Optional[int]) -> None:
incoming = inst.get("incoming", [])
chosen: Any = None
dbg = owner and getattr(owner, "_debug", False) and (owner and (owner.__class__.__name__ == "PyVM"))
# Use dedicated env flag for phi trace (matches existing behavior)
import os
if os.environ.get("NYASH_PYVM_DEBUG_PHI") == "1":
print(f"[pyvm.phi] prev={prev} incoming={incoming}")
dbg = True
# Prefer [vid, pred] that matches prev
for pair in incoming:
if not isinstance(pair, (list, tuple)) or len(pair) < 2:
continue
a, b = pair[0], pair[1]
if prev is not None and int(b) == int(prev) and int(a) in regs:
chosen = regs.get(int(a))
if dbg:
print(f"[pyvm.phi] case1 match: use v{a} from pred {b} -> {chosen}")
break
# Fallback: first resolvable vid
if chosen is None and incoming:
for pair in incoming:
if not isinstance(pair, (list, tuple)) or len(pair) < 2:
continue
a, _b = pair[0], pair[1]
if int(a) in regs:
chosen = regs.get(int(a))
break
if os.environ.get("NYASH_PYVM_DEBUG_PHI") == "1":
print(f"[pyvm.phi] chosen={chosen}")
owner._set(regs, inst.get("dst"), chosen)
def op_const(owner, inst: Dict[str, Any], regs: Dict[int, Any]) -> None:
val = inst.get("value", {})
ty = val.get("type")
vv = val.get("value")
if ty == "i64":
out = int(vv)
elif ty == "f64":
out = float(vv)
elif ty == "string":
out = str(vv)
elif isinstance(ty, dict) and ty.get("kind") in ("handle", "ptr") and ty.get("box_type") == "StringBox":
out = str(vv)
else:
out = None
owner._set(regs, inst.get("dst"), out)
def op_binop(owner, inst: Dict[str, Any], regs: Dict[int, Any]) -> None:
operation = inst.get("operation")
a = owner._read(regs, inst.get("lhs"))
b = owner._read(regs, inst.get("rhs"))
res: Any = None
if operation == "+":
if isinstance(a, str) or isinstance(b, str):
res = (str(a) if a is not None else "") + (str(b) if b is not None else "")
else:
av = 0 if a is None else int(a)
bv = 0 if b is None else int(b)
res = av + bv
elif operation == "-":
av = 0 if a is None else int(a)
bv = 0 if b is None else int(b)
res = av - bv
elif operation == "*":
av = 0 if a is None else int(a)
bv = 0 if b is None else int(b)
res = av * bv
elif operation == "/":
av = 0 if a is None else int(a)
bv = 1 if b in (None, 0) else int(b)
res = av // bv
elif operation == "%":
av = 0 if a is None else int(a)
bv = 1 if b in (None, 0) else int(b)
res = av % bv
elif operation in ("&", "|", "^"):
ai, bi = (0 if a is None else int(a)), (0 if b is None else int(b))
if operation == "&":
res = ai & bi
elif operation == "|":
res = ai | bi
else:
res = ai ^ bi
elif operation in ("<<", ">>"):
ai, bi = (0 if a is None else int(a)), (0 if b is None else int(b))
res = (ai << bi) if operation == "<<" else (ai >> bi)
else:
raise RuntimeError(f"unsupported binop: {operation}")
owner._set(regs, inst.get("dst"), res)
def op_compare(owner, inst: Dict[str, Any], regs: Dict[int, Any]) -> None:
operation = inst.get("operation")
a = owner._read(regs, inst.get("lhs"))
b = owner._read(regs, inst.get("rhs"))
res: bool
# For ordering comparisons, be robust to None by coercing to ints
if operation in ("<", "<=", ">", ">="):
try:
ai = 0 if a is None else (int(a) if not isinstance(a, str) else 0)
except Exception:
ai = 0
try:
bi = 0 if b is None else (int(b) if not isinstance(b, str) else 0)
except Exception:
bi = 0
if operation == "<":
res = ai < bi
elif operation == "<=":
res = ai <= bi
elif operation == ">":
res = ai > bi
else:
res = ai >= bi
elif operation == "==":
res = (a == b)
elif operation == "!=":
res = (a != b)
else:
raise RuntimeError(f"unsupported compare: {operation}")
owner._set(regs, inst.get("dst"), 1 if res else 0)
def op_typeop(owner, inst: Dict[str, Any], regs: Dict[int, Any]) -> None:
# operation: "check" | "cast" ("as" is treated as cast for MVP)
operation = inst.get("operation") or inst.get("op")
src_vid = inst.get("src")
dst_vid = inst.get("dst")
target = (inst.get("target_type") or "")
src_val = owner._read(regs, src_vid)
def is_type(val: Any, ty: str) -> bool:
t = (ty or "").strip()
t = t.lower()
# Normalize aliases
if t in ("stringbox",):
t = "string"
if t in ("integerbox", "int", "i64"):
t = "integer"
if t in ("floatbox", "f64"):
t = "float"
if t in ("boolbox", "boolean"):
t = "bool"
# Check by Python types/our boxed representations
if t == "string":
return isinstance(val, str)
if t == "integer":
# Treat Python ints (including 0/1) as integer (bools are ints in Python; original code excluded bool)
return isinstance(val, int) and not isinstance(val, bool)
if t == "float":
return isinstance(val, float)
if t == "bool":
# Our VM uses 0/1 ints for bool; accept 0 or 1
return isinstance(val, int) and (val == 0 or val == 1)
# Boxed receivers
if t.endswith("box"):
box_name = ty
if isinstance(val, dict) and val.get("__box__") == box_name:
return True
if box_name == "StringBox" and isinstance(val, str):
return True
if box_name == "ConsoleBox" and owner._is_console(val):
return True
if box_name == "ArrayBox" and isinstance(val, dict) and val.get("__box__") == "ArrayBox":
return True
if box_name == "MapBox" and isinstance(val, dict) and val.get("__box__") == "MapBox":
return True
return False
return False
if (operation or "").lower() in ("check", "is"):
out = 1 if is_type(src_val, str(target)) else 0
owner._set(regs, dst_vid, out)
else:
# cast/as: MVP pass-through
owner._set(regs, dst_vid, src_val)
def op_unop(owner, inst: Dict[str, Any], regs: Dict[int, Any]) -> None:
kind = inst.get("kind")
src = owner._read(regs, inst.get("src"))
out: Any
if kind == "neg":
if isinstance(src, (int, float)):
out = -src
elif src is None:
out = 0
else:
try:
out = -int(src)
except Exception:
out = 0
elif kind == "not":
out = 0 if owner._truthy(src) else 1
elif kind == "bitnot":
out = ~int(src) if src is not None else -1
else:
out = None
owner._set(regs, inst.get("dst"), out)
def op_copy(owner, inst: Dict[str, Any], regs: Dict[int, Any]) -> None:
src = owner._read(regs, inst.get("src"))
owner._set(regs, inst.get("dst"), src)

View File

@ -0,0 +1,44 @@
"""
Control/side-effect ops for the Nyash PyVM that don't affect block control
flow directly: externcall normalization. Branch/jump/ret/call stay in vm.py.
"""
from __future__ import annotations
from typing import Any, Dict
def op_externcall(owner, inst: Dict[str, Any], regs: Dict[int, Any]) -> None:
func = inst.get("func")
args = [owner._read(regs, a) for a in inst.get("args", [])]
out: Any = None
owner._dbg(f"[pyvm] externcall func={func} args={args}")
# Normalize known console/debug externs
if isinstance(func, str):
if func in ("nyash.console.println", "nyash.console.log", "env.console.log"):
s = args[0] if args else ""
if s is None:
s = ""
print(str(s))
out = 0
elif func in (
"nyash.console.warn",
"env.console.warn",
"nyash.console.error",
"env.console.error",
"nyash.debug.trace",
"env.debug.trace",
):
s = args[0] if args else ""
if s is None:
s = ""
try:
import sys as _sys
print(str(s), file=_sys.stderr)
except Exception:
print(str(s))
out = 0
else:
# Macro sandbox: disallow unknown externcall unless explicitly whitelisted by future caps
out = 0
owner._set(regs, inst.get("dst"), out)

View File

@ -0,0 +1,71 @@
"""
Flow/control-related ops for Nyash PyVM: branch, jump, ret, call.
These mutate the control flow (cur/prev) or return from the function.
"""
from __future__ import annotations
from typing import Any, Dict, List, Tuple
def op_branch(owner, inst: Dict[str, Any], regs: Dict[int, Any], cur: int, prev: int | None) -> Tuple[int | None, int]:
cond = owner._read(regs, inst.get("cond"))
tid = int(inst.get("then"))
eid = int(inst.get("else"))
prev = cur
cur = tid if owner._truthy(cond) else eid
owner._dbg(f"[pyvm] branch cond={cond} -> next={cur}")
return prev, cur
def op_jump(owner, inst: Dict[str, Any], _regs: Dict[int, Any], cur: int, prev: int | None) -> Tuple[int | None, int]:
tgt = int(inst.get("target"))
prev = cur
cur = tgt
owner._dbg(f"[pyvm] jump -> {cur}")
return prev, cur
def op_ret(owner, inst: Dict[str, Any], regs: Dict[int, Any]) -> Any:
v = owner._read(regs, inst.get("value"))
if getattr(owner, "_debug", False):
owner._dbg(f"[pyvm] ret {owner._type_name(v)} value={v}")
return v
def op_call(owner, fn, inst: Dict[str, Any], regs: Dict[int, Any]) -> Any:
# Resolve function name from value or take as literal
fval = inst.get("func")
if isinstance(fval, str):
fname = fval
else:
fname = owner._read(regs, fval)
if not isinstance(fname, str):
# Fallback: if JSON encoded a literal name
fname = fval if isinstance(fval, str) else None
call_args = [owner._read(regs, a) for a in inst.get("args", [])]
result = None
if isinstance(fname, str):
# Direct hit
if fname in owner.functions:
callee = owner.functions[fname]
owner._dbg(f"[pyvm] call -> {fname} args={call_args}")
result = owner._exec_function(callee, call_args)
else:
# Heuristic resolution: match suffix ".name/arity"
arity = len(call_args)
suffix = f".{fname}/{arity}"
candidates = [k for k in owner.functions.keys() if k.endswith(suffix)]
if len(candidates) == 1:
callee = owner.functions[candidates[0]]
owner._dbg(f"[pyvm] call -> {candidates[0]} args={call_args}")
result = owner._exec_function(callee, call_args)
elif getattr(owner, "_debug", False) and len(candidates) > 1:
owner._dbg(f"[pyvm] call unresolved: '{fname}'/{arity} has multiple candidates: {candidates}")
elif getattr(owner, "_debug", False):
# Suggest close candidates across arities using suffix ".name/"
any_cands = sorted([k for k in owner.functions.keys() if k.endswith(f".{fname}/") or f".{fname}/" in k])
if any_cands:
owner._dbg(f"[pyvm] call unresolved: '{fname}'/{arity} — available: {any_cands}")
else:
owner._dbg(f"[pyvm] call unresolved: '{fname}'/{arity} not found")
return result

View File

@ -21,6 +21,19 @@ from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
import os
from .ops_core import (
op_phi,
op_const,
op_binop,
op_compare,
op_typeop,
op_unop,
op_copy,
)
from .ops_box import op_newbox, op_boxcall
from .ops_ctrl import op_externcall
from .ops_flow import op_branch, op_jump, op_ret, op_call
from .intrinsic import try_intrinsic as _intrinsic_try
@dataclass
@ -234,573 +247,68 @@ class PyVM:
op = inst.get("op")
if op == "phi":
# incoming: prefer [[vid, pred_bid]], but accept [pred_bid, vid] robustly
incoming = inst.get("incoming", [])
chosen: Any = None
dbg = os.environ.get('NYASH_PYVM_DEBUG_PHI') == '1'
if dbg:
print(f"[pyvm.phi] prev={prev} incoming={incoming}")
for pair in incoming:
if not isinstance(pair, (list, tuple)) or len(pair) < 2:
continue
a, b = pair[0], pair[1]
# Case 1: [vid, pred]
if prev is not None and int(b) == int(prev) and int(a) in regs:
chosen = regs.get(int(a))
if dbg:
print(f"[pyvm.phi] case1 match: use v{a} from pred {b} -> {chosen}")
break
if chosen is None and incoming:
# Fallback to first element that resolves to a known vid
for pair in incoming:
if not isinstance(pair, (list, tuple)) or len(pair) < 2:
continue
a, b = pair[0], pair[1]
if int(a) in regs:
chosen = regs.get(int(a)); break
# Do not try to resolve by assuming [pred, vid] — avoid false matches
if dbg:
print(f"[pyvm.phi] chosen={chosen}")
self._set(regs, inst.get("dst"), chosen)
op_phi(self, inst, regs, prev)
i += 1
continue
if op == "const":
val = inst.get("value", {})
ty = val.get("type")
vv = val.get("value")
if ty == "i64":
out = int(vv)
elif ty == "f64":
out = float(vv)
elif ty == "string":
out = str(vv)
elif isinstance(ty, dict) and ty.get('kind') in ('handle','ptr') and ty.get('box_type') == 'StringBox':
# Treat handle/pointer-typed string constants as Python str for VM semantics
out = str(vv)
else:
out = None
self._set(regs, inst.get("dst"), out)
op_const(self, inst, regs)
i += 1
continue
if op == "binop":
operation = inst.get("operation")
a = self._read(regs, inst.get("lhs"))
b = self._read(regs, inst.get("rhs"))
res: Any = None
if operation == "+":
if isinstance(a, str) or isinstance(b, str):
res = (str(a) if a is not None else "") + (str(b) if b is not None else "")
else:
av = 0 if a is None else int(a)
bv = 0 if b is None else int(b)
res = av + bv
elif operation == "-":
av = 0 if a is None else int(a)
bv = 0 if b is None else int(b)
res = av - bv
elif operation == "*":
av = 0 if a is None else int(a)
bv = 0 if b is None else int(b)
res = av * bv
elif operation == "/":
# integer division semantics for now
av = 0 if a is None else int(a)
bv = 1 if b in (None, 0) else int(b)
res = av // bv
elif operation == "%":
av = 0 if a is None else int(a)
bv = 1 if b in (None, 0) else int(b)
res = av % bv
elif operation in ("&", "|", "^"):
# treat as bitwise on ints
ai, bi = (0 if a is None else int(a)), (0 if b is None else int(b))
if operation == "&":
res = ai & bi
elif operation == "|":
res = ai | bi
else:
res = ai ^ bi
elif operation in ("<<", ">>"):
ai, bi = (0 if a is None else int(a)), (0 if b is None else int(b))
res = (ai << bi) if operation == "<<" else (ai >> bi)
else:
raise RuntimeError(f"unsupported binop: {operation}")
self._set(regs, inst.get("dst"), res)
op_binop(self, inst, regs)
i += 1
continue
if op == "compare":
operation = inst.get("operation")
a = self._read(regs, inst.get("lhs"))
b = self._read(regs, inst.get("rhs"))
res: bool
# For ordering comparisons, be robust to None by coercing to ints
if operation in ("<", "<=", ">", ">="):
try:
ai = 0 if a is None else (int(a) if not isinstance(a, str) else 0)
except Exception:
ai = 0
try:
bi = 0 if b is None else (int(b) if not isinstance(b, str) else 0)
except Exception:
bi = 0
if operation == "<":
res = ai < bi
elif operation == "<=":
res = ai <= bi
elif operation == ">":
res = ai > bi
else:
res = ai >= bi
elif operation == "==":
res = (a == b)
elif operation == "!=":
res = (a != b)
else:
raise RuntimeError(f"unsupported compare: {operation}")
# VM convention: booleans are i64 0/1
self._set(regs, inst.get("dst"), 1 if res else 0)
op_compare(self, inst, regs)
i += 1
continue
if op == "typeop":
# operation: "check" | "cast" ("as" is treated as cast for MVP)
operation = inst.get("operation") or inst.get("op")
src_vid = inst.get("src")
dst_vid = inst.get("dst")
target = (inst.get("target_type") or "")
src_val = self._read(regs, src_vid)
def is_type(val: Any, ty: str) -> bool:
t = (ty or "").strip()
t = t.lower()
# Normalize aliases
if t in ("stringbox",):
t = "string"
if t in ("integerbox", "int", "i64"):
t = "integer"
if t in ("floatbox", "f64"):
t = "float"
if t in ("boolbox", "boolean"):
t = "bool"
# Check by Python types/our boxed representations
if t == "string":
return isinstance(val, str)
if t == "integer":
# Treat Python ints (including 0/1) as integer
return isinstance(val, int) and not isinstance(val, bool)
if t == "float":
return isinstance(val, float)
if t == "bool":
# Our VM uses 0/1 ints for bool; accept 0 or 1
return isinstance(val, int) and (val == 0 or val == 1)
# Boxed receivers
if t.endswith("box"):
box_name = ty
if isinstance(val, dict) and val.get("__box__") == box_name:
return True
if box_name == "StringBox" and isinstance(val, str):
return True
if box_name == "ConsoleBox" and self._is_console(val):
return True
if box_name == "ArrayBox" and isinstance(val, dict) and val.get("__box__") == "ArrayBox":
return True
if box_name == "MapBox" and isinstance(val, dict) and val.get("__box__") == "MapBox":
return True
return False
return False
if (operation or "").lower() in ("check", "is"):
out = 1 if is_type(src_val, str(target)) else 0
self._set(regs, dst_vid, out)
else:
# cast/as: MVP pass-through
self._set(regs, dst_vid, src_val)
op_typeop(self, inst, regs)
i += 1
continue
if op == "unop":
kind = inst.get("kind")
src = self._read(regs, inst.get("src"))
out: Any
if kind == "neg":
if isinstance(src, (int, float)):
out = -src
elif src is None:
out = 0
else:
try:
out = -int(src)
except Exception:
out = 0
elif kind == "not":
out = 0 if self._truthy(src) else 1
elif kind == "bitnot":
out = ~int(src) if src is not None else -1
else:
out = None
self._set(regs, inst.get("dst"), out)
op_unop(self, inst, regs)
i += 1
continue
if op == "newbox":
btype = inst.get("type")
# Sandbox gate: only allow minimal boxes when sandbox is active
if not self._sandbox_allow_newbox(str(btype)):
val = {"__box__": str(btype), "__denied__": True}
elif btype == "ConsoleBox":
val = {"__box__": "ConsoleBox"}
elif btype == "StringBox":
# empty string instance
val = ""
elif btype == "ArrayBox":
val = {"__box__": "ArrayBox", "__arr": []}
elif btype == "MapBox":
val = {"__box__": "MapBox", "__map": {}}
else:
# Unknown box -> opaque
val = {"__box__": btype}
self._set(regs, inst.get("dst"), val)
op_newbox(self, inst, regs)
i += 1
continue
if op == "copy":
src = self._read(regs, inst.get("src"))
self._set(regs, inst.get("dst"), src)
op_copy(self, inst, regs)
i += 1
continue
if op == "boxcall":
recv = self._read(regs, inst.get("box"))
method = inst.get("method")
args = [self._read(regs, a) for a in inst.get("args", [])]
out: Any = None
self._dbg(f"[pyvm] boxcall recv={recv} method={method} args={args}")
# Sandbox gate: disallow unsafe/unknown boxcalls
if not self._sandbox_allow_boxcall(recv, method):
self._set(regs, inst.get("dst"), out)
i += 1
continue
# Special-case: inside a method body, 'me.method(...)' lowers to a
# boxcall with a synthetic receiver marker '__me__'. Resolve it by
# dispatching to the current box's lowered function if available.
if isinstance(recv, str) and recv == "__me__" and isinstance(method, str):
# Derive box name from current function (e.g., 'MiniVm.foo/2' -> 'MiniVm')
box_name = ""
try:
if "." in fn.name:
box_name = fn.name.split(".")[0]
except Exception:
box_name = ""
if box_name:
cand = f"{box_name}.{method}/{len(args)}"
callee = self.functions.get(cand)
if callee is not None:
self._dbg(f"[pyvm] boxcall(__me__) -> {cand} args={args}")
out = self._exec_function(callee, args)
self._set(regs, inst.get("dst"), out)
i += 1
continue
# Fast-path: built-in ArrayBox minimal methods (avoid noisy unresolved logs)
if isinstance(recv, dict) and recv.get("__box__") == "ArrayBox":
arr = recv.get("__arr", [])
if method in ("len", "size"):
out = len(arr)
elif method == "get":
idx = int(args[0]) if args else 0
out = arr[idx] if 0 <= idx < len(arr) else None
elif method == "set":
idx = int(args[0]) if len(args) > 0 else 0
val = args[1] if len(args) > 1 else None
if 0 <= idx < len(arr):
arr[idx] = val
elif idx == len(arr):
arr.append(val)
else:
while len(arr) < idx:
arr.append(None)
arr.append(val)
out = 0
elif method == "push":
val = args[0] if args else None
arr.append(val)
out = len(arr)
elif method == "toString":
out = "[" + ",".join(str(x) for x in arr) + "]"
else:
out = None
recv["__arr"] = arr
self._set(regs, inst.get("dst"), out)
i += 1
continue
# User-defined box: dispatch to lowered function if available (Box.method/N)
if isinstance(recv, dict) and isinstance(method, str) and "__box__" in recv:
box_name = recv.get("__box__")
cand = f"{box_name}.{method}/{len(args)}"
callee = self.functions.get(cand)
if callee is not None:
self._dbg(f"[pyvm] boxcall dispatch -> {cand} args={args}")
out = self._exec_function(callee, args)
self._set(regs, inst.get("dst"), out)
i += 1
continue
else:
if self._debug:
prefix = f"{box_name}.{method}/"
cands = sorted([k for k in self.functions.keys() if k.startswith(prefix)])
if cands:
self._dbg(f"[pyvm] boxcall unresolved: '{cand}' — available: {cands}")
else:
any_for_box = sorted([k for k in self.functions.keys() if k.startswith(f"{box_name}.")])
self._dbg(f"[pyvm] boxcall unresolved: '{cand}' — no candidates; methods for {box_name}: {any_for_box}")
# ConsoleBox methods
if method in ("print", "println", "log") and self._is_console(recv):
s = args[0] if args else ""
if s is None:
s = ""
if method == "println":
print(str(s))
else:
# println is the primary one used by smokes; keep print/log equivalent
print(str(s))
out = 0
# FileBox methods (minimal read-only)
elif isinstance(recv, dict) and recv.get("__box__") == "FileBox":
if method == "open":
path = str(args[0]) if len(args) > 0 else ""
mode = str(args[1]) if len(args) > 1 else "r"
ok = 0
content = None
if mode == "r":
try:
with open(path, "r", encoding="utf-8") as f:
content = f.read()
ok = 1
except Exception:
ok = 0
content = None
recv["__open"] = (ok == 1)
recv["__path"] = path
recv["__content"] = content
out = ok
elif method == "read":
if isinstance(recv.get("__content"), str):
out = recv.get("__content")
else:
out = None
elif method == "close":
recv["__open"] = False
out = 0
else:
out = None
# PathBox methods (posix-like)
elif isinstance(recv, dict) and recv.get("__box__") == "PathBox":
if method == "dirname":
p = str(args[0]) if args else ""
# Normalize to POSIX-style
out = os.path.dirname(p)
if out == "":
out = "."
elif method == "join":
base = str(args[0]) if len(args) > 0 else ""
rel = str(args[1]) if len(args) > 1 else ""
out = os.path.join(base, rel)
else:
out = None
# ArrayBox minimal methods
elif isinstance(recv, dict) and recv.get("__box__") == "ArrayBox":
arr = recv.get("__arr", [])
if method in ("len", "size"):
out = len(arr)
elif method == "get":
idx = int(args[0]) if args else 0
out = arr[idx] if 0 <= idx < len(arr) else None
elif method == "set":
idx = int(args[0]) if len(args) > 0 else 0
val = args[1] if len(args) > 1 else None
if 0 <= idx < len(arr):
arr[idx] = val
elif idx == len(arr):
arr.append(val)
else:
# extend with None up to idx, then set
while len(arr) < idx:
arr.append(None)
arr.append(val)
out = 0
elif method == "push":
val = args[0] if args else None
arr.append(val)
out = len(arr)
elif method == "toString":
out = "[" + ",".join(str(x) for x in arr) + "]"
else:
out = None
recv["__arr"] = arr
# MapBox minimal methods
elif isinstance(recv, dict) and recv.get("__box__") == "MapBox":
m = recv.get("__map", {})
if method == "size":
out = len(m)
elif method == "has":
key = str(args[0]) if args else ""
out = 1 if key in m else 0
elif method == "get":
key = str(args[0]) if args else ""
out = m.get(key)
elif method == "set":
key = str(args[0]) if len(args) > 0 else ""
val = args[1] if len(args) > 1 else None
m[key] = val
out = 0
elif method == "toString":
items = ",".join(f"{k}:{m[k]}" for k in m)
out = "{" + items + "}"
else:
out = None
recv["__map"] = m
elif method == "esc_json":
# Escape backslash and double-quote in the given string argument
s = args[0] if args else ""
s = "" if s is None else str(s)
out_chars = []
for ch in s:
if ch == "\\":
out_chars.append("\\\\")
elif ch == '"':
out_chars.append('\\"')
else:
out_chars.append(ch)
out = "".join(out_chars)
elif method == "length":
out = len(str(recv))
elif method == "substring":
s = str(recv)
start = int(args[0]) if (len(args) > 0 and args[0] is not None) else 0
end = int(args[1]) if (len(args) > 1 and args[1] is not None) else len(s)
out = s[start:end]
elif method == "lastIndexOf":
s = str(recv)
needle = str(args[0]) if args else ""
# Optional start index (ignored by many call sites; support if provided)
if len(args) > 1 and args[1] is not None:
try:
start = int(args[1])
except Exception:
start = 0
out = s.rfind(needle, start)
else:
out = s.rfind(needle)
elif method == "indexOf":
s = str(recv)
needle = str(args[0]) if args else ""
# Support optional start index: indexOf(needle, start)
if len(args) > 1 and args[1] is not None:
try:
start = int(args[1])
except Exception:
start = 0
out = s.find(needle, start)
else:
out = s.find(needle)
else:
# Unimplemented method -> no-op
out = None
self._set(regs, inst.get("dst"), out)
op_boxcall(self, fn, inst, regs)
i += 1
continue
if op == "externcall":
func = inst.get("func")
args = [self._read(regs, a) for a in inst.get("args", [])]
out: Any = None
self._dbg(f"[pyvm] externcall func={func} args={args}")
# Normalize known console/debug externs
if isinstance(func, str):
if func in ("nyash.console.println", "nyash.console.log", "env.console.log"):
s = args[0] if args else ""
if s is None:
s = ""
print(str(s))
out = 0
elif func in ("nyash.console.warn", "env.console.warn", "nyash.console.error", "env.console.error", "nyash.debug.trace", "env.debug.trace"):
s = args[0] if args else ""
if s is None:
s = ""
# Write to stderr for warn/error/trace to approximate real consoles
try:
import sys as _sys
print(str(s), file=_sys.stderr)
except Exception:
print(str(s))
out = 0
else:
# Macro sandbox: disallow unknown externcall unless explicitly whitelisted by future caps
# (currently no IO/NET externs are allowed in macro child)
out = 0
# Unknown extern -> no-op with 0/None
self._set(regs, inst.get("dst"), out)
op_externcall(self, inst, regs)
i += 1
continue
if op == "branch":
cond = self._read(regs, inst.get("cond"))
tid = int(inst.get("then"))
eid = int(inst.get("else"))
prev = cur
cur = tid if self._truthy(cond) else eid
self._dbg(f"[pyvm] branch cond={cond} -> next={cur}")
# Restart execution at next block
prev, cur = op_branch(self, inst, regs, cur, prev)
break
if op == "jump":
tgt = int(inst.get("target"))
prev = cur
cur = tgt
self._dbg(f"[pyvm] jump -> {cur}")
prev, cur = op_jump(self, inst, regs, cur, prev)
break
if op == "ret":
v = self._read(regs, inst.get("value"))
if self._debug:
self._dbg(f"[pyvm] ret {self._type_name(v)} value={v}")
return v
return op_ret(self, inst, regs)
if op == "call":
# Resolve function name from value or take as literal
fval = inst.get("func")
fname = self._read(regs, fval)
if not isinstance(fname, str):
# Fallback: if JSON encoded a literal name
fname = fval if isinstance(fval, str) else None
call_args = [self._read(regs, a) for a in inst.get("args", [])]
result = None
if isinstance(fname, str):
# Direct hit
if fname in self.functions:
callee = self.functions[fname]
self._dbg(f"[pyvm] call -> {fname} args={call_args}")
result = self._exec_function(callee, call_args)
else:
# Heuristic resolution: match suffix ".name/arity"
arity = len(call_args)
suffix = f".{fname}/{arity}"
candidates = [k for k in self.functions.keys() if k.endswith(suffix)]
if len(candidates) == 1:
callee = self.functions[candidates[0]]
self._dbg(f"[pyvm] call -> {candidates[0]} args={call_args}")
result = self._exec_function(callee, call_args)
elif self._debug and len(candidates) > 1:
self._dbg(f"[pyvm] call unresolved: '{fname}'/{arity} has multiple candidates: {candidates}")
elif self._debug:
# Suggest close candidates across arities using suffix ".name/"
any_cands = sorted([k for k in self.functions.keys() if k.endswith(f".{fname}/") or f".{fname}/" in k])
if any_cands:
self._dbg(f"[pyvm] call unresolved: '{fname}'/{arity} — available: {any_cands}")
else:
self._dbg(f"[pyvm] call unresolved: '{fname}'/{arity} not found")
# Store result if needed
result = op_call(self, fn, inst, regs)
self._set(regs, inst.get("dst"), result)
i += 1
continue
@ -813,45 +321,4 @@ class PyVM:
return 0
def _try_intrinsic(self, name: str, args: List[Any]) -> Tuple[bool, Any]:
try:
if name == "Main.esc_json/1":
s = "" if not args else ("" if args[0] is None else str(args[0]))
out = []
for ch in s:
if ch == "\\":
out.append("\\\\")
elif ch == '"':
out.append('\\"')
else:
out.append(ch)
return True, "".join(out)
if name == "MiniVm.read_digits/2":
s = "" if not args or args[0] is None else str(args[0])
pos = 0 if len(args) < 2 or args[1] is None else int(args[1])
out_chars = []
while pos < len(s):
ch = s[pos]
if '0' <= ch <= '9':
out_chars.append(ch)
pos += 1
else:
break
return True, "".join(out_chars)
if name == "MiniVm.parse_first_int/1":
js = "" if not args or args[0] is None else str(args[0])
key = '"value":{"type":"int","value":'
idx = js.rfind(key)
if idx < 0:
return True, "0"
start = idx + len(key)
ok, digits = self._try_intrinsic("MiniVm.read_digits/2", [js, start])
return True, digits
if name == "Main.dirname/1":
p = "" if not args else ("" if args[0] is None else str(args[0]))
d = os.path.dirname(p)
if d == "":
d = "."
return True, d
except Exception:
pass
return (False, None)
return _intrinsic_try(name, args)