Phase 11.8/12: MIR Core-13 roadmap, Nyash ABI design, async/await enhancements with TaskGroupBox foundation
Major additions:
- Phase 11.8 MIR cleanup specification (Core-15→14→13 roadmap)
- Nyash ABI unified design document (3×u64 structure)
- TaskGroupBox foundation with cancelAll/joinAll methods
- Enhanced async/await with checkpoint auto-insertion
- Structured concurrency preparation (parent-child task relationships)
Documentation:
- docs/development/roadmap/phases/phase-11.8_mir_cleanup/: Complete Core-13 path
- docs/development/roadmap/phases/phase-12/NYASH-ABI-DESIGN.md: Unified ABI spec
- Updated Phase 12 README with AOT/JIT explanation for script performance
- Added async_task_system/ design docs
Implementation progress:
- FutureBox spawn tracking with weak/strong reference management (see the sketch below)
- VM checkpoint integration before/after await
- LLVM backend async support preparation
- Verifier rules for await-checkpoint enforcement
- Result<T,E> normalization for timeout/cancellation
Technical insights:
- MIR as 'atomic instructions', Box as 'molecules' philosophy
- 'Everything is Box' enables full-stack with minimal instructions
- Unified BoxCall as the future consolidation point for array/plugin/async operations
Next steps:
- Complete TaskGroupBox implementation
- Migrate from global to scoped task management
- Implement LIFO cleanup on scope exit
- Continue Core-13 instruction consolidation
🚀 'From 15 atoms to infinite programs: The Nyash Box Theory'
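The weak/strong FutureBox tracking and scope-exit join referenced above (and implemented in the runtime diff below) reduce to a small pattern: keep a weak handle for observation plus a strong handle for ownership, and on scope exit retain only futures that are still pending. A minimal, self-contained Rust sketch of that pattern, using stand-in types rather than the actual FutureBox/TaskGroupBox API:

use std::sync::{Arc, Mutex, Weak};

// Stand-in for FutureBox: a shared slot that a worker fills in later.
#[derive(Clone)]
struct Future(Arc<Mutex<Option<i64>>>);

impl Future {
    fn new() -> Self { Future(Arc::new(Mutex::new(None))) }
    fn set(&self, v: i64) { *self.0.lock().unwrap() = Some(v); }
    fn ready(&self) -> bool { self.0.lock().unwrap().is_some() }
    fn downgrade(&self) -> Weak<Mutex<Option<i64>>> { Arc::downgrade(&self.0) }
}

// Stand-in for the implicit task group: weak handles for observation,
// strong handles for ownership until completion.
struct Group {
    weak: Vec<Weak<Mutex<Option<i64>>>>,
    strong: Vec<Future>,
}

impl Group {
    fn register(&mut self, f: &Future) {
        self.weak.push(f.downgrade());
        self.strong.push(f.clone());
    }
    // Best-effort join at scope exit: drop completed/dead entries, wait for the rest.
    fn join_all(&mut self) {
        loop {
            self.strong.retain(|f| !f.ready());
            self.weak.retain(|w| w.upgrade().is_some());
            if self.strong.is_empty() { break; }
            std::thread::yield_now();
        }
    }
}

fn main() {
    let mut group = Group { weak: Vec::new(), strong: Vec::new() };
    let fut = Future::new();
    group.register(&fut);
    let worker = fut.clone();
    std::thread::spawn(move || worker.set(42));
    group.join_all();          // blocks until the spawned task has filled the slot
    assert!(fut.ready());
}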
@@ -4,20 +4,140 @@ use once_cell::sync::OnceCell;
use std::sync::{Arc, RwLock};

use super::{gc::GcHooks, scheduler::Scheduler};
use super::scheduler::CancellationToken;

static GLOBAL_GC: OnceCell<RwLock<Option<Arc<dyn GcHooks>>>> = OnceCell::new();
static GLOBAL_SCHED: OnceCell<RwLock<Option<Arc<dyn Scheduler>>>> = OnceCell::new();
// Phase 2 scaffold: current task group's cancellation token (no-op default)
static GLOBAL_CUR_TOKEN: OnceCell<RwLock<Option<CancellationToken>>> = OnceCell::new();
// Phase 2 scaffold: current group's child futures registry (best-effort)
static GLOBAL_GROUP_FUTURES: OnceCell<RwLock<Vec<crate::boxes::future::FutureWeak>>> = OnceCell::new();
// Strong ownership list for implicit group (pre-TaskGroup actualization)
static GLOBAL_GROUP_STRONG: OnceCell<RwLock<Vec<crate::boxes::future::FutureBox>>> = OnceCell::new();
// Simple scope depth counter for implicit group (join-at-scope-exit footing)
static TASK_SCOPE_DEPTH: OnceCell<RwLock<usize>> = OnceCell::new();
// TaskGroup scope stack (explicit group ownership per function scope)
static TASK_GROUP_STACK: OnceCell<RwLock<Vec<std::sync::Arc<crate::boxes::task_group_box::TaskGroupInner>>>> = OnceCell::new();

fn gc_cell() -> &'static RwLock<Option<Arc<dyn GcHooks>>> { GLOBAL_GC.get_or_init(|| RwLock::new(None)) }
fn sched_cell() -> &'static RwLock<Option<Arc<dyn Scheduler>>> { GLOBAL_SCHED.get_or_init(|| RwLock::new(None)) }
fn token_cell() -> &'static RwLock<Option<CancellationToken>> { GLOBAL_CUR_TOKEN.get_or_init(|| RwLock::new(None)) }
fn futures_cell() -> &'static RwLock<Vec<crate::boxes::future::FutureWeak>> { GLOBAL_GROUP_FUTURES.get_or_init(|| RwLock::new(Vec::new())) }
fn strong_cell() -> &'static RwLock<Vec<crate::boxes::future::FutureBox>> { GLOBAL_GROUP_STRONG.get_or_init(|| RwLock::new(Vec::new())) }
fn scope_depth_cell() -> &'static RwLock<usize> { TASK_SCOPE_DEPTH.get_or_init(|| RwLock::new(0)) }
fn group_stack_cell() -> &'static RwLock<Vec<std::sync::Arc<crate::boxes::task_group_box::TaskGroupInner>>> { TASK_GROUP_STACK.get_or_init(|| RwLock::new(Vec::new())) }

pub fn set_from_runtime(rt: &crate::runtime::nyash_runtime::NyashRuntime) {
    if let Ok(mut g) = gc_cell().write() { *g = Some(rt.gc.clone()); }
    if let Ok(mut s) = sched_cell().write() { *s = rt.scheduler.as_ref().cloned(); }
    // Optional: initialize a fresh token for the runtime's root group (Phase 2 wiring)
    if let Ok(mut t) = token_cell().write() { if t.is_none() { *t = Some(CancellationToken::new()); } }
    // Reset group futures registry on new runtime
    if let Ok(mut f) = futures_cell().write() { f.clear(); }
    if let Ok(mut s) = strong_cell().write() { s.clear(); }
    if let Ok(mut d) = scope_depth_cell().write() { *d = 0; }
    if let Ok(mut st) = group_stack_cell().write() { st.clear(); }
}

pub fn set_gc(gc: Arc<dyn GcHooks>) { if let Ok(mut g) = gc_cell().write() { *g = Some(gc); } }
pub fn set_scheduler(s: Arc<dyn Scheduler>) { if let Ok(mut w) = sched_cell().write() { *w = Some(s); } }
/// Set the current task group's cancellation token (scaffold).
pub fn set_current_group_token(tok: CancellationToken) { if let Ok(mut w) = token_cell().write() { *w = Some(tok); } }

/// Get the current task group's cancellation token (no-op default).
pub fn current_group_token() -> CancellationToken {
    if let Ok(r) = token_cell().read() {
        if let Some(t) = r.as_ref() { return t.clone(); }
    }
    CancellationToken::new()
}
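// --- Illustrative wiring sketch (not part of this diff) ----------------------
// Assumed call order for a host embedding the runtime; the call sites are an
// assumption, only the functions themselves are defined in this file:
//
//     set_from_runtime(&runtime);                          // install GC + scheduler, reset group state
//     set_current_group_token(CancellationToken::new());   // optional: explicit root-group token
//     let tok = current_group_token();                     // later: cooperative cancel via tok.cancel()
// ------------------------------------------------------------------------------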
/// Register a Future into the current group's registry (best-effort; clones share state)
pub fn register_future_to_current_group(fut: &crate::boxes::future::FutureBox) {
    // Prefer explicit current TaskGroup at top of stack
    if let Ok(st) = group_stack_cell().read() {
        if let Some(inner) = st.last() {
            if let Ok(mut v) = inner.strong.lock() { v.push(fut.clone()); return; }
        }
    }
    // Fallback to implicit global group
    if let Ok(mut list) = futures_cell().write() { list.push(fut.downgrade()); }
    if let Ok(mut s) = strong_cell().write() { s.push(fut.clone()); }
}

/// Join all currently registered futures with a coarse timeout guard.
pub fn join_all_registered_futures(timeout_ms: u64) {
    use std::time::{Duration, Instant};
    let deadline = Instant::now() + Duration::from_millis(timeout_ms);
    loop {
        let mut all_ready = true;
        // purge list of dropped or completed futures opportunistically
        {
            // purge weak list: keep only upgradeable futures
            if let Ok(mut list) = futures_cell().write() { list.retain(|fw| fw.is_ready().is_some()); }
            // purge strong list: remove completed futures to reduce retention
            if let Ok(mut s) = strong_cell().write() { s.retain(|f| !f.ready()); }
        }
        // check readiness
        {
            if let Ok(list) = futures_cell().read() {
                for fw in list.iter() {
                    if let Some(ready) = fw.is_ready() {
                        if !ready { all_ready = false; break; }
                    }
                }
            }
        }
        if all_ready { break; }
        if Instant::now() >= deadline { break; }
        safepoint_and_poll();
        std::thread::yield_now();
    }
    // Final sweep
    if let Ok(mut s) = strong_cell().write() { s.retain(|f| !f.ready()); }
    if let Ok(mut list) = futures_cell().write() { list.retain(|fw| matches!(fw.is_ready(), Some(false))); }
}
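// --- Illustrative note (not part of this diff) --------------------------------
// The purge/readiness loop above treats FutureWeak::is_ready() as a three-state
// probe. This sketch spells out the assumed meaning (an assumption inferred from
// the retain/matches! calls, not confirmed elsewhere in this diff):
//
//     match fw.is_ready() {
//         None        => { /* future was dropped: purge from the weak list   */ }
//         Some(false) => { /* still pending: keep it and keep waiting        */ }
//         Some(true)  => { /* completed: counts as ready for the join        */ }
//     }
// -------------------------------------------------------------------------------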
/// Push a task scope (footing). On pop of the outermost scope, perform a best-effort join.
pub fn push_task_scope() {
    if let Ok(mut d) = scope_depth_cell().write() { *d += 1; }
    // Push a new explicit TaskGroup for this scope
    if let Ok(mut st) = group_stack_cell().write() {
        st.push(std::sync::Arc::new(crate::boxes::task_group_box::TaskGroupInner { strong: std::sync::Mutex::new(Vec::new()) }));
    }
}

/// Pop a task scope. When depth reaches 0, join outstanding futures.
pub fn pop_task_scope() {
    let mut do_join = false;
    let mut popped: Option<std::sync::Arc<crate::boxes::task_group_box::TaskGroupInner>> = None;
    {
        if let Ok(mut d) = scope_depth_cell().write() {
            if *d > 0 { *d -= 1; }
            if *d == 0 { do_join = true; }
        }
    }
    // Pop explicit group for this scope
    if let Ok(mut st) = group_stack_cell().write() { popped = st.pop(); }
    if do_join {
        let ms: u64 = std::env::var("NYASH_TASK_SCOPE_JOIN_MS").ok().and_then(|s| s.parse().ok()).unwrap_or(1000);
        if let Some(inner) = popped {
            // Join this group's outstanding futures
            let deadline = std::time::Instant::now() + std::time::Duration::from_millis(ms);
            loop {
                let mut all_ready = true;
                if let Ok(mut list) = inner.strong.lock() { list.retain(|f| !f.ready()); if !list.is_empty() { all_ready = false; } }
                if all_ready { break; }
                if std::time::Instant::now() >= deadline { break; }
                safepoint_and_poll();
                std::thread::yield_now();
            }
        } else {
            // Fallback to implicit global group
            join_all_registered_futures(ms);
        }
    }
}
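// --- Illustrative usage sketch (not part of this diff) ------------------------
// Expected call pattern for the scope API above, e.g. from the interpreter/VM
// around a function body (the surrounding call sites are an assumption here):
//
//     push_task_scope();                  // open an explicit group for this scope
//     // ... evaluate the body; each spawn registers its FutureBox via
//     //     register_future_to_current_group(&fut) ...
//     pop_task_scope();                   // LIFO close: best-effort join with a
//                                         // NYASH_TASK_SCOPE_JOIN_MS budget (default 1000 ms)
// -------------------------------------------------------------------------------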
/// Perform a runtime safepoint and poll the scheduler if available.
pub fn safepoint_and_poll() {
@@ -30,8 +150,27 @@ pub fn safepoint_and_poll() {
}

/// Try to schedule a task on the global scheduler. Returns true if scheduled.
-pub fn spawn_task(_name: &str, f: Box<dyn FnOnce() + 'static>) -> bool {
-    // Minimal inline execution to avoid Send bounds; upgrade to true scheduling later
+pub fn spawn_task(name: &str, f: Box<dyn FnOnce() + Send + 'static>) -> bool {
+    // If a scheduler is registered, enqueue the task; otherwise run inline.
    if let Ok(s) = sched_cell().read() {
        if let Some(sched) = s.as_ref() {
            sched.spawn(name, f);
            return true;
        }
    }
    // Fallback inline execution
    f();
-    true
+    false
}

/// Spawn a task bound to a cancellation token when available (skeleton).
pub fn spawn_task_with_token(name: &str, token: crate::runtime::scheduler::CancellationToken, f: Box<dyn FnOnce() + Send + 'static>) -> bool {
    if let Ok(s) = sched_cell().read() {
        if let Some(sched) = s.as_ref() {
            sched.spawn_with_token(name, token, f);
            return true;
        }
    }
    f();
    false
}
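A short caller-side sketch of the new contract (illustrative; "do_work" is a placeholder name): after this change, spawn_task and spawn_task_with_token return true only when a registered Scheduler accepted the closure, and false when the closure was executed inline as a fallback, in which case it has already run to completion by the time false is returned.

let token = current_group_token();
let scheduled = spawn_task_with_token("do_work", token.clone(), Box::new(move || {
    if token.is_cancelled() { return; }   // cooperative cancellation checkpoint
    // ... actual work ...
}));
if !scheduled {
    // No scheduler registered: the closure already ran inline on this thread.
}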
@@ -118,6 +118,31 @@ impl PluginHost {
        method_name: &str,
        args: &[Box<dyn crate::box_trait::NyashBox>],
    ) -> BidResult<Option<Box<dyn crate::box_trait::NyashBox>>> {
        // Special-case env.future.await to avoid holding loader RwLock while polling scheduler
        if iface_name == "env.future" && method_name == "await" {
            use crate::boxes::result::NyashResultBox;
            if let Some(arg0) = args.get(0) {
                if let Some(fut) = arg0.as_any().downcast_ref::<crate::boxes::future::FutureBox>() {
                    let max_ms: u64 = std::env::var("NYASH_AWAIT_MAX_MS").ok().and_then(|s| s.parse().ok()).unwrap_or(5000);
                    let start = std::time::Instant::now();
                    let mut spins = 0usize;
                    while !fut.ready() {
                        crate::runtime::global_hooks::safepoint_and_poll();
                        std::thread::yield_now();
                        spins += 1;
                        if spins % 1024 == 0 { std::thread::sleep(std::time::Duration::from_millis(1)); }
                        if start.elapsed() >= std::time::Duration::from_millis(max_ms) {
                            let err = crate::box_trait::StringBox::new("Timeout");
                            return Ok(Some(Box::new(NyashResultBox::new_err(Box::new(err)))));
                        }
                    }
                    return Ok(fut.wait_and_get().ok().map(|v| Box::new(NyashResultBox::new_ok(v)) as Box<dyn crate::box_trait::NyashBox>));
                } else {
                    return Ok(Some(Box::new(NyashResultBox::new_ok(arg0.clone_box()))));
                }
            }
            return Ok(Some(Box::new(NyashResultBox::new_err(Box::new(crate::box_trait::StringBox::new("InvalidArgs"))))));
        }
        let l = self.loader.read().unwrap();
        l.extern_call(iface_name, method_name, args)
    }
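The await fast path above normalizes every outcome into a Result box: Ok(value) on completion, Err("Timeout") once the NYASH_AWAIT_MAX_MS budget (default 5000 ms) is exhausted. A distilled, self-contained model of that loop using plain std types (illustrative only; `ready` and `take_value` are stand-ins for the FutureBox API, and the scheduler safepoint is omitted):

use std::time::{Duration, Instant};

fn await_with_budget(ready: impl Fn() -> bool, take_value: impl FnOnce() -> i64, max_ms: u64) -> Result<i64, String> {
    let start = Instant::now();
    let mut spins = 0usize;
    while !ready() {
        std::thread::yield_now();                  // cooperative: let other tasks run
        spins += 1;
        if spins % 1024 == 0 { std::thread::sleep(Duration::from_millis(1)); }  // gentle back-off
        if start.elapsed() >= Duration::from_millis(max_ms) {
            return Err("Timeout".to_string());     // normalized error, mirrors NyashResultBox::new_err
        }
    }
    Ok(take_value())                               // normalized success, mirrors NyashResultBox::new_ok
}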
@@ -500,18 +500,35 @@ impl PluginLoaderV2 {
            Ok(None)
        }
        ("env.future", "await") => {
-           // await(future) -> value (pass-through if not a FutureBox)
+           // await(future) -> Result.Ok(value) / Result.Err(Timeout|Error)
            use crate::boxes::result::NyashResultBox;
            if let Some(arg) = args.get(0) {
                if let Some(fut) = arg.as_any().downcast_ref::<crate::boxes::future::FutureBox>() {
-                   match fut.wait_and_get() { Ok(v) => return Ok(Some(v)), Err(e) => {
-                       eprintln!("[env.future.await] error: {}", e);
-                       return Ok(None);
-                   } }
+                   let max_ms: u64 = std::env::var("NYASH_AWAIT_MAX_MS").ok().and_then(|s| s.parse().ok()).unwrap_or(5000);
+                   let start = std::time::Instant::now();
+                   let mut spins = 0usize;
+                   while !fut.ready() {
+                       crate::runtime::global_hooks::safepoint_and_poll();
+                       std::thread::yield_now();
+                       spins += 1;
+                       if spins % 1024 == 0 { std::thread::sleep(std::time::Duration::from_millis(1)); }
+                       if start.elapsed() >= std::time::Duration::from_millis(max_ms) {
+                           let err = crate::box_trait::StringBox::new("Timeout");
+                           return Ok(Some(Box::new(NyashResultBox::new_err(Box::new(err)))));
+                       }
+                   }
+                   return match fut.wait_and_get() {
+                       Ok(v) => Ok(Some(Box::new(NyashResultBox::new_ok(v)))),
+                       Err(e) => {
+                           let err = crate::box_trait::StringBox::new(format!("Error: {}", e));
+                           Ok(Some(Box::new(NyashResultBox::new_err(Box::new(err)))))
+                       }
+                   };
                } else {
-                   return Ok(Some(arg.clone_box()));
+                   return Ok(Some(Box::new(NyashResultBox::new_ok(arg.clone_box()))));
                }
            }
-           Ok(None)
+           Ok(Some(Box::new(crate::boxes::result::NyashResultBox::new_err(Box::new(crate::box_trait::StringBox::new("InvalidArgs"))))))
        }
        ("env.future", "spawn_instance") => {
            // spawn_instance(recv, method_name, args...) -> FutureBox
@@ -530,7 +547,9 @@ impl PluginLoaderV2 {
            let method_name_inline = method_name.clone();
            let tail_inline: Vec<Box<dyn NyashBox>> = tail.iter().map(|a| a.clone_box()).collect();
            let fut_setter = fut.clone();
-           let scheduled = crate::runtime::global_hooks::spawn_task("spawn_instance", Box::new(move || {
+           // Phase 2: attempt to bind to current task group's token (no-op if unset)
+           let token = crate::runtime::global_hooks::current_group_token();
+           let scheduled = crate::runtime::global_hooks::spawn_task_with_token("spawn_instance", token, Box::new(move || {
                let host = crate::runtime::get_global_plugin_host();
                let read_res = host.read();
                if let Ok(ro) = read_res {
@@ -551,11 +570,14 @@ impl PluginLoaderV2 {
                    }
                }
            }
+           // Register into current TaskGroup (if any) or implicit group (best-effort)
+           crate::runtime::global_hooks::register_future_to_current_group(&fut);
            return Ok(Some(Box::new(fut)));
        }
    }
    // Fallback: resolved future of first arg
    if let Some(v) = args.get(0) { fut.set_result(v.clone_box()); }
+   crate::runtime::global_hooks::register_future_to_current_group(&fut);
    Ok(Some(Box::new(fut)))
}
("env.canvas", _) => {
@@ -11,6 +11,11 @@ pub trait Scheduler: Send + Sync {
    fn poll(&self) {}
    /// Cooperative yield point (no-op for single-thread).
    fn yield_now(&self) { }

    /// Optional: spawn with a cancellation token. Default delegates to spawn.
    fn spawn_with_token(&self, name: &str, _token: CancellationToken, f: Box<dyn FnOnce() + Send + 'static>) {
        self.spawn(name, f)
    }
}

use std::collections::VecDeque;
@@ -67,3 +72,15 @@ impl Scheduler for SingleThreadScheduler {
        }
    }
}

use std::sync::atomic::{AtomicBool, Ordering};

/// Simple idempotent cancellation token for structured concurrency (skeleton)
#[derive(Clone, Debug)]
pub struct CancellationToken(Arc<AtomicBool>);

impl CancellationToken {
    pub fn new() -> Self { Self(Arc::new(AtomicBool::new(false))) }
    pub fn cancel(&self) { self.0.store(true, Ordering::SeqCst); }
    pub fn is_cancelled(&self) -> bool { self.0.load(Ordering::SeqCst) }
}
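The token is a single shared AtomicBool, so every clone observes the same cancel. A minimal usage sketch of the API defined above:

let token = CancellationToken::new();
let for_task = token.clone();             // clones share the same flag

// A cooperative task body polls the flag at convenient checkpoints.
let task = move || {
    while !for_task.is_cancelled() {
        std::thread::yield_now();          // ... do a slice of work, then yield ...
    }
};

token.cancel();                            // idempotent: repeated calls are harmless
assert!(token.is_cancelled());
task();                                    // returns immediately: the flag is already set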