docs/ci: selfhost bootstrap/exe-first workflows; add ny-llvmc scaffolding + JSON v0 schema validation; plan: unify to Nyash ABI v2 (no backwards compat)
src/runtime/gc.rs
@@ -11,11 +11,13 @@ pub enum BarrierKind {
 /// GC hooks that execution engines may call at key points.
 /// Implementations must be Send + Sync for multi-thread preparation.
-pub trait GcHooks: Send + Sync {
+pub trait GcHooks: Send + Sync + std::any::Any {
     /// Safe point for cooperative GC (e.g., poll or yield).
     fn safepoint(&self) {}
     /// Memory barrier hint for loads/stores.
     fn barrier(&self, _kind: BarrierKind) {}
+    /// Allocation accounting (bytes are best-effort; may be 0 when unknown)
+    fn alloc(&self, _bytes: u64) {}
     /// Optional counters snapshot for diagnostics. Default: None.
     fn snapshot_counters(&self) -> Option<(u64, u64, u64)> {
         None
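The widened bound (Send + Sync + std::any::Any) keeps hook objects downcastable while every method retains a default body, so any thread-safe type can opt in to only the hooks it cares about. A minimal sketch of a third-party implementation (the SafepointCounter name is hypothetical, not part of this commit):

    use std::sync::atomic::{AtomicU64, Ordering};

    // Hypothetical GcHooks impl: counts safepoints only; other hooks keep defaults.
    struct SafepointCounter {
        safepoints: AtomicU64,
    }

    impl GcHooks for SafepointCounter {
        fn safepoint(&self) {
            self.safepoints.fetch_add(1, Ordering::Relaxed);
        }
        fn snapshot_counters(&self) -> Option<(u64, u64, u64)> {
            // Tuple layout mirrors CountingGc: (safepoints, barrier_reads, barrier_writes).
            Some((self.safepoints.load(Ordering::Relaxed), 0, 0))
        }
    }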
@@ -27,48 +29,35 @@ pub struct NullGc;

 impl GcHooks for NullGc {}

 use std::sync::atomic::{AtomicU64, Ordering};

 /// Simple counting GC (PoC): counts safepoints and barriers.
-/// Useful to validate hook frequency without affecting semantics.
+/// CountingGc is now a thin wrapper around the unified GcController.
 pub struct CountingGc {
-    pub safepoints: AtomicU64,
-    pub barrier_reads: AtomicU64,
-    pub barrier_writes: AtomicU64,
+    inner: crate::runtime::gc_controller::GcController,
 }

 impl CountingGc {
     pub fn new() -> Self {
+        // Default to rc+cycle mode for development metrics
+        let mode = crate::runtime::gc_mode::GcMode::RcCycle;
         Self {
-            safepoints: AtomicU64::new(0),
-            barrier_reads: AtomicU64::new(0),
-            barrier_writes: AtomicU64::new(0),
+            inner: crate::runtime::gc_controller::GcController::new(mode),
         }
     }
     pub fn snapshot(&self) -> (u64, u64, u64) {
-        (
-            self.safepoints.load(Ordering::Relaxed),
-            self.barrier_reads.load(Ordering::Relaxed),
-            self.barrier_writes.load(Ordering::Relaxed),
-        )
+        self.inner.snapshot()
     }
 }

 impl GcHooks for CountingGc {
     fn safepoint(&self) {
-        self.safepoints.fetch_add(1, Ordering::Relaxed);
+        self.inner.safepoint();
     }
     fn barrier(&self, kind: BarrierKind) {
-        match kind {
-            BarrierKind::Read => {
-                self.barrier_reads.fetch_add(1, Ordering::Relaxed);
-            }
-            BarrierKind::Write => {
-                self.barrier_writes.fetch_add(1, Ordering::Relaxed);
-            }
-        }
+        self.inner.barrier(kind);
+    }
+    fn alloc(&self, bytes: u64) {
+        self.inner.alloc(bytes);
     }
     fn snapshot_counters(&self) -> Option<(u64, u64, u64)> {
-        Some(self.snapshot())
+        Some(self.inner.snapshot())
     }
 }
src/runtime/gc_controller.rs (new file, 207 lines)
@@ -0,0 +1,207 @@
//! Unified GC controller (skeleton)
//! Implements GcHooks and centralizes mode selection and metrics.

use std::sync::atomic::{AtomicU64, Ordering};

use super::gc::{BarrierKind, GcHooks};
use super::gc_mode::GcMode;
use crate::config::env;
use crate::runtime::gc_trace;
use std::collections::{HashSet, VecDeque};

pub struct GcController {
    mode: GcMode,
    safepoints: AtomicU64,
    barrier_reads: AtomicU64,
    barrier_writes: AtomicU64,
    alloc_bytes: AtomicU64,
    alloc_count: AtomicU64,
    sp_since_last: AtomicU64,
    bytes_since_last: AtomicU64,
    collect_sp_interval: Option<u64>,
    collect_alloc_bytes: Option<u64>,
    // Diagnostics: last trial reachability counters
    trial_nodes_last: AtomicU64,
    trial_edges_last: AtomicU64,
    // Diagnostics: collection counters and last duration/flags
    collect_count_total: AtomicU64,
    collect_by_sp: AtomicU64,
    collect_by_alloc: AtomicU64,
    trial_duration_last_ms: AtomicU64,
    trial_reason_last: AtomicU64, // bitflags: 1=sp, 2=alloc
}

impl GcController {
    pub fn new(mode: GcMode) -> Self {
        Self {
            mode,
            safepoints: AtomicU64::new(0),
            barrier_reads: AtomicU64::new(0),
            barrier_writes: AtomicU64::new(0),
            alloc_bytes: AtomicU64::new(0),
            alloc_count: AtomicU64::new(0),
            sp_since_last: AtomicU64::new(0),
            bytes_since_last: AtomicU64::new(0),
            collect_sp_interval: env::gc_collect_sp_interval(),
            collect_alloc_bytes: env::gc_collect_alloc_bytes(),
            trial_nodes_last: AtomicU64::new(0),
            trial_edges_last: AtomicU64::new(0),
            collect_count_total: AtomicU64::new(0),
            collect_by_sp: AtomicU64::new(0),
            collect_by_alloc: AtomicU64::new(0),
            trial_duration_last_ms: AtomicU64::new(0),
            trial_reason_last: AtomicU64::new(0),
        }
    }
    pub fn mode(&self) -> GcMode {
        self.mode
    }
    pub fn snapshot(&self) -> (u64, u64, u64) {
        (
            self.safepoints.load(Ordering::Relaxed),
            self.barrier_reads.load(Ordering::Relaxed),
            self.barrier_writes.load(Ordering::Relaxed),
        )
    }
}

impl GcHooks for GcController {
    fn safepoint(&self) {
        // Off mode: minimal overhead but still callable
        if self.mode != GcMode::Off {
            self.safepoints.fetch_add(1, Ordering::Relaxed);
            let sp = self.sp_since_last.fetch_add(1, Ordering::Relaxed) + 1;
            // Opportunistic collection trigger
            let sp_hit = self
                .collect_sp_interval
                .map(|n| n > 0 && sp >= n)
                .unwrap_or(false);
            let bytes = self.bytes_since_last.load(Ordering::Relaxed);
            let bytes_hit = self
                .collect_alloc_bytes
                .map(|n| n > 0 && bytes >= n)
                .unwrap_or(false);
            if sp_hit || bytes_hit {
                // Record reason flags for diagnostics
                let mut flags: u64 = 0;
                if sp_hit { flags |= 1; self.collect_by_sp.fetch_add(1, Ordering::Relaxed); }
                if bytes_hit { flags |= 2; self.collect_by_alloc.fetch_add(1, Ordering::Relaxed); }
                self.trial_reason_last.store(flags, Ordering::Relaxed);
                self.run_trial_collection();
            }
        }
        // Future: per-mode collection/cooperation hooks
    }
    fn barrier(&self, kind: BarrierKind) {
        if self.mode == GcMode::Off {
            return;
        }
        match kind {
            BarrierKind::Read => {
                self.barrier_reads.fetch_add(1, Ordering::Relaxed);
            }
            BarrierKind::Write => {
                self.barrier_writes.fetch_add(1, Ordering::Relaxed);
            }
        }
    }
    fn snapshot_counters(&self) -> Option<(u64, u64, u64)> {
        Some(self.snapshot())
    }
    fn alloc(&self, bytes: u64) {
        if self.mode == GcMode::Off {
            return;
        }
        self.alloc_count.fetch_add(1, Ordering::Relaxed);
        self.alloc_bytes.fetch_add(bytes, Ordering::Relaxed);
        self.bytes_since_last.fetch_add(bytes, Ordering::Relaxed);
    }
}

impl GcController {
    pub fn alloc_totals(&self) -> (u64, u64) {
        (
            self.alloc_count.load(Ordering::Relaxed),
            self.alloc_bytes.load(Ordering::Relaxed),
        )
    }
}

impl GcController {
    fn run_trial_collection(&self) {
        // Reset windows
        self.sp_since_last.store(0, Ordering::Relaxed);
        self.bytes_since_last.store(0, Ordering::Relaxed);
        // PoC: no object graph; report current handles as leak candidates and return.
        if self.mode == GcMode::Off {
            return;
        }
        // Only run for rc/rc+cycle/stw; rc+cycle is default.
        match self.mode {
            GcMode::Rc | GcMode::RcCycle | GcMode::STW => {
                let started = std::time::Instant::now();
                // Roots: JIT/AOT handle registry snapshot
                let roots = crate::jit::rt::handles::snapshot_arcs();
                let mut visited: HashSet<u64> = HashSet::new();
                let mut q: VecDeque<std::sync::Arc<dyn crate::box_trait::NyashBox>> =
                    VecDeque::new();
                for r in roots.into_iter() {
                    let id = r.box_id();
                    if visited.insert(id) {
                        q.push_back(r);
                    }
                }
                let mut nodes: u64 = visited.len() as u64;
                let mut edges: u64 = 0;
                while let Some(cur) = q.pop_front() {
                    gc_trace::trace_children(&*cur, &mut |child| {
                        edges += 1;
                        let id = child.box_id();
                        if visited.insert(id) {
                            nodes += 1;
                            q.push_back(child);
                        }
                    });
                }
                // Store last diagnostics (available for JSON metrics)
                self.trial_nodes_last.store(nodes, Ordering::Relaxed);
                self.trial_edges_last.store(edges, Ordering::Relaxed);
                if (nodes + edges) > 0 && crate::config::env::gc_metrics() {
                    eprintln!(
                        "[GC] trial: reachable nodes={} edges={} (roots=jit_handles)",
                        nodes, edges
                    );
                }
                // Update counters
                let dur = started.elapsed();
                let ms = dur.as_millis() as u64;
                self.trial_duration_last_ms.store(ms, Ordering::Relaxed);
                self.collect_count_total.fetch_add(1, Ordering::Relaxed);
                // Reason flags derive from current env thresholds vs last windows reaching triggers
                // Note: we set flags in safepoint() where triggers were decided.
            }
            _ => {}
        }
    }
}

impl GcController {
    pub fn trial_reachability_last(&self) -> (u64, u64) {
        (
            self.trial_nodes_last.load(Ordering::Relaxed),
            self.trial_edges_last.load(Ordering::Relaxed),
        )
    }
    pub fn collection_totals(&self) -> (u64, u64, u64) {
        (
            self.collect_count_total.load(Ordering::Relaxed),
            self.collect_by_sp.load(Ordering::Relaxed),
            self.collect_by_alloc.load(Ordering::Relaxed),
        )
    }
    pub fn trial_duration_last_ms(&self) -> u64 {
        self.trial_duration_last_ms.load(Ordering::Relaxed)
    }
    pub fn trial_reason_last_bits(&self) -> u64 {
        self.trial_reason_last.load(Ordering::Relaxed)
    }
}
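Collection triggers are driven entirely by the two thresholds captured from config::env at construction (collect_sp_interval and collect_alloc_bytes); when either window fills, the next safepoint() call runs a trial reachability pass and resets both windows. An illustrative driver loop under those assumptions (the function, loop bounds, and byte count below are made up for the sketch):

    use crate::runtime::gc::GcHooks; // hook methods come from the trait
    use crate::runtime::gc_controller::GcController;
    use crate::runtime::gc_mode::GcMode;

    // Illustrative only; thresholds are read from config::env at construction.
    fn drive_gc_for_a_while() {
        let gc = GcController::new(GcMode::from_env());
        for _ in 0..10_000 {
            gc.alloc(64);   // accounting only; may fill the alloc-bytes window
            gc.safepoint(); // runs a trial collection when a window fills
        }
        let (sp, reads, writes) = gc.snapshot();
        let (total, by_sp, by_alloc) = gc.collection_totals();
        eprintln!(
            "sp={} rd={} wr={} collects={} (by_sp={}, by_alloc={})",
            sp, reads, writes, total, by_sp, by_alloc
        );
    }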
src/runtime/gc_mode.rs (new file, 34 lines)
@@ -0,0 +1,34 @@
//! GC mode selection (user-facing)
use crate::config::env;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GcMode {
    RcCycle,
    Minorgen,
    STW,
    Rc,
    Off,
}

impl GcMode {
    pub fn from_env() -> Self {
        match env::gc_mode().to_ascii_lowercase().as_str() {
            "auto" | "rc+cycle" => GcMode::RcCycle,
            "minorgen" => GcMode::Minorgen,
            "stw" => GcMode::STW,
            "rc" => GcMode::Rc,
            "off" => GcMode::Off,
            _ => GcMode::RcCycle,
        }
    }
    pub fn as_str(&self) -> &'static str {
        match self {
            GcMode::RcCycle => "rc+cycle",
            GcMode::Minorgen => "minorgen",
            GcMode::STW => "stw",
            GcMode::Rc => "rc",
            GcMode::Off => "off",
        }
    }
}
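Note that from_env() is total: any unrecognized string falls back to RcCycle, and "auto" is an alias for "rc+cycle", so as_str() round-trips for every variant. A small test sketch of that property (the test itself is not part of this commit):

    #[test]
    fn gc_mode_as_str_round_trips() {
        for m in [GcMode::RcCycle, GcMode::Minorgen, GcMode::STW, GcMode::Rc, GcMode::Off] {
            // Re-parse with the same match arms from_env applies to the env string.
            let parsed = match m.as_str() {
                "auto" | "rc+cycle" => GcMode::RcCycle,
                "minorgen" => GcMode::Minorgen,
                "stw" => GcMode::STW,
                "rc" => GcMode::Rc,
                "off" => GcMode::Off,
                _ => GcMode::RcCycle,
            };
            assert_eq!(parsed, m);
        }
    }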
src/runtime/gc_trace.rs (new file, 35 lines)
@@ -0,0 +1,35 @@
//! Minimal GC tracing helpers (skeleton)
//!
//! Downcast-based child edge enumeration for builtin containers.
//! This is a non-invasive helper to support diagnostics and future collectors.

use std::sync::Arc;

use crate::box_trait::NyashBox;

/// Visit child boxes of a given object and invoke `visit(child)` for each.
/// This function recognizes builtin containers (ArrayBox/MapBox) and is a no-op otherwise.
pub fn trace_children(obj: &dyn NyashBox, visit: &mut dyn FnMut(Arc<dyn NyashBox>)) {
    // ArrayBox
    if let Some(arr) = obj.as_any().downcast_ref::<crate::boxes::array::ArrayBox>() {
        if let Ok(items) = arr.items.read() {
            for it in items.iter() {
                // Convert Box<dyn NyashBox> to Arc<dyn NyashBox>
                let arc: Arc<dyn NyashBox> = Arc::from(it.clone_box());
                visit(arc);
            }
        }
        return;
    }
    // MapBox
    if let Some(map) = obj.as_any().downcast_ref::<crate::boxes::map_box::MapBox>() {
        if let Ok(data) = map.get_data().read() {
            for (_k, v) in data.iter() {
                let arc: Arc<dyn NyashBox> = Arc::from(v.clone_box());
                visit(arc);
            }
        }
        return;
    }
}
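Because trace_children enumerates only one level of edges, callers that need the full reachable set must run their own worklist, exactly as run_trial_collection does above. A one-level sketch (ArrayBox::new() with no arguments is an assumption; adjust to the real constructor):

    use crate::box_trait::NyashBox;

    // Hypothetical: count only the direct children of one container.
    fn count_direct_children(obj: &dyn NyashBox) -> usize {
        let mut n = 0usize;
        trace_children(obj, &mut |_child| n += 1);
        n
    }

    // An empty array would contribute no edges:
    // assert_eq!(count_direct_children(&crate::boxes::array::ArrayBox::new()), 0);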
src/runtime/global_hooks.rs
@@ -4,97 +4,74 @@ use once_cell::sync::OnceCell;
 use std::sync::{Arc, RwLock};

 use super::scheduler::CancellationToken;
-use super::{gc::GcHooks, scheduler::Scheduler};
+use super::{gc::BarrierKind, gc::GcHooks, scheduler::Scheduler};

-static GLOBAL_GC: OnceCell<RwLock<Option<Arc<dyn GcHooks>>>> = OnceCell::new();
-static GLOBAL_SCHED: OnceCell<RwLock<Option<Arc<dyn Scheduler>>>> = OnceCell::new();
-// Phase 2 scaffold: current task group's cancellation token (no-op default)
-static GLOBAL_CUR_TOKEN: OnceCell<RwLock<Option<CancellationToken>>> = OnceCell::new();
-// Phase 2 scaffold: current group's child futures registry (best-effort)
-static GLOBAL_GROUP_FUTURES: OnceCell<RwLock<Vec<crate::boxes::future::FutureWeak>>> =
-    OnceCell::new();
-// Strong ownership list for implicit group (pre-TaskGroup actualization)
-static GLOBAL_GROUP_STRONG: OnceCell<RwLock<Vec<crate::boxes::future::FutureBox>>> =
-    OnceCell::new();
-// Simple scope depth counter for implicit group (join-at-scope-exit footing)
-static TASK_SCOPE_DEPTH: OnceCell<RwLock<usize>> = OnceCell::new();
-// TaskGroup scope stack (explicit group ownership per function scope)
-static TASK_GROUP_STACK: OnceCell<
-    RwLock<Vec<std::sync::Arc<crate::boxes::task_group_box::TaskGroupInner>>>,
-> = OnceCell::new();
-
-fn gc_cell() -> &'static RwLock<Option<Arc<dyn GcHooks>>> {
-    GLOBAL_GC.get_or_init(|| RwLock::new(None))
-}
-fn sched_cell() -> &'static RwLock<Option<Arc<dyn Scheduler>>> {
-    GLOBAL_SCHED.get_or_init(|| RwLock::new(None))
-}
-fn token_cell() -> &'static RwLock<Option<CancellationToken>> {
-    GLOBAL_CUR_TOKEN.get_or_init(|| RwLock::new(None))
-}
-fn futures_cell() -> &'static RwLock<Vec<crate::boxes::future::FutureWeak>> {
-    GLOBAL_GROUP_FUTURES.get_or_init(|| RwLock::new(Vec::new()))
-}
-fn strong_cell() -> &'static RwLock<Vec<crate::boxes::future::FutureBox>> {
-    GLOBAL_GROUP_STRONG.get_or_init(|| RwLock::new(Vec::new()))
-}
-fn scope_depth_cell() -> &'static RwLock<usize> {
-    TASK_SCOPE_DEPTH.get_or_init(|| RwLock::new(0))
-}
-fn group_stack_cell(
-) -> &'static RwLock<Vec<std::sync::Arc<crate::boxes::task_group_box::TaskGroupInner>>> {
-    TASK_GROUP_STACK.get_or_init(|| RwLock::new(Vec::new()))
-}
+// Unified global runtime hooks state (single lock for consistency)
+struct GlobalHooksState {
+    gc: Option<Arc<dyn GcHooks>>,
+    sched: Option<Arc<dyn Scheduler>>,
+    cur_token: Option<CancellationToken>,
+    futures: Vec<crate::boxes::future::FutureWeak>,
+    strong: Vec<crate::boxes::future::FutureBox>,
+    scope_depth: usize,
+    group_stack: Vec<std::sync::Arc<crate::boxes::task_group_box::TaskGroupInner>>,
+}
+
+impl GlobalHooksState {
+    fn new() -> Self {
+        Self {
+            gc: None,
+            sched: None,
+            cur_token: None,
+            futures: Vec::new(),
+            strong: Vec::new(),
+            scope_depth: 0,
+            group_stack: Vec::new(),
+        }
+    }
+}
+
+static GLOBAL_STATE: OnceCell<RwLock<GlobalHooksState>> = OnceCell::new();
+
+fn state() -> &'static RwLock<GlobalHooksState> {
+    GLOBAL_STATE.get_or_init(|| RwLock::new(GlobalHooksState::new()))
+}

 pub fn set_from_runtime(rt: &crate::runtime::nyash_runtime::NyashRuntime) {
-    if let Ok(mut g) = gc_cell().write() {
-        *g = Some(rt.gc.clone());
-    }
-    if let Ok(mut s) = sched_cell().write() {
-        *s = rt.scheduler.as_ref().cloned();
-    }
-    // Optional: initialize a fresh token for the runtime's root group (Phase 2 wiring)
-    if let Ok(mut t) = token_cell().write() {
-        if t.is_none() {
-            *t = Some(CancellationToken::new());
-        }
-    }
-    // Reset group futures registry on new runtime
-    if let Ok(mut f) = futures_cell().write() {
-        f.clear();
-    }
-    if let Ok(mut s) = strong_cell().write() {
-        s.clear();
-    }
-    if let Ok(mut d) = scope_depth_cell().write() {
-        *d = 0;
-    }
-    if let Ok(mut st) = group_stack_cell().write() {
-        st.clear();
-    }
+    if let Ok(mut st) = state().write() {
+        st.gc = Some(rt.gc.clone());
+        st.sched = rt.scheduler.as_ref().cloned();
+        if st.cur_token.is_none() {
+            st.cur_token = Some(CancellationToken::new());
+        }
+        // Reset group futures registry on new runtime
+        st.futures.clear();
+        st.strong.clear();
+        st.scope_depth = 0;
+        st.group_stack.clear();
+    }
 }

 pub fn set_gc(gc: Arc<dyn GcHooks>) {
-    if let Ok(mut g) = gc_cell().write() {
-        *g = Some(gc);
+    if let Ok(mut st) = state().write() {
+        st.gc = Some(gc);
     }
 }
 pub fn set_scheduler(s: Arc<dyn Scheduler>) {
-    if let Ok(mut w) = sched_cell().write() {
-        *w = Some(s);
+    if let Ok(mut st) = state().write() {
+        st.sched = Some(s);
     }
 }
 /// Set the current task group's cancellation token (scaffold).
 pub fn set_current_group_token(tok: CancellationToken) {
-    if let Ok(mut w) = token_cell().write() {
-        *w = Some(tok);
+    if let Ok(mut st) = state().write() {
+        st.cur_token = Some(tok);
     }
 }

 /// Get the current task group's cancellation token (no-op default).
 pub fn current_group_token() -> CancellationToken {
-    if let Ok(r) = token_cell().read() {
-        if let Some(t) = r.as_ref() {
+    if let Ok(st) = state().read() {
+        if let Some(t) = st.cur_token.as_ref() {
             return t.clone();
         }
     }
@@ -103,21 +80,17 @@ pub fn current_group_token() -> CancellationToken {

 /// Register a Future into the current group's registry (best-effort; clones share state)
 pub fn register_future_to_current_group(fut: &crate::boxes::future::FutureBox) {
-    // Prefer explicit current TaskGroup at top of stack
-    if let Ok(st) = group_stack_cell().read() {
-        if let Some(inner) = st.last() {
+    if let Ok(mut st) = state().write() {
+        // Prefer explicit current TaskGroup at top of stack
+        if let Some(inner) = st.group_stack.last() {
             if let Ok(mut v) = inner.strong.lock() {
                 v.push(fut.clone());
                 return;
             }
         }
-    }
-    // Fallback to implicit global group
-    if let Ok(mut list) = futures_cell().write() {
-        list.push(fut.downgrade());
-    }
-    if let Ok(mut s) = strong_cell().write() {
-        s.push(fut.clone());
+        // Fallback to implicit global group
+        st.futures.push(fut.downgrade());
+        st.strong.push(fut.clone());
     }
 }
@@ -127,26 +100,15 @@ pub fn join_all_registered_futures(timeout_ms: u64) {
     let deadline = Instant::now() + Duration::from_millis(timeout_ms);
     loop {
         let mut all_ready = true;
-        // purge list of dropped or completed futures opportunistically
-        {
-            // purge weak list: keep only upgradeable futures
-            if let Ok(mut list) = futures_cell().write() {
-                list.retain(|fw| fw.is_ready().is_some());
-            }
-            // purge strong list: remove completed futures to reduce retention
-            if let Ok(mut s) = strong_cell().write() {
-                s.retain(|f| !f.ready());
-            }
-        }
-        // check readiness
-        {
-            if let Ok(list) = futures_cell().read() {
-                for fw in list.iter() {
-                    if let Some(ready) = fw.is_ready() {
-                        if !ready {
-                            all_ready = false;
-                            break;
-                        }
+        // purge + readiness check under single state lock (short critical sections)
+        if let Ok(mut st) = state().write() {
+            st.futures.retain(|fw| fw.is_ready().is_some());
+            st.strong.retain(|f| !f.ready());
+            for fw in st.futures.iter() {
+                if let Some(ready) = fw.is_ready() {
+                    if !ready {
+                        all_ready = false;
+                        break;
                     }
                 }
             }
@@ -161,22 +123,18 @@ pub fn join_all_registered_futures(timeout_ms: u64) {
         std::thread::yield_now();
     }
     // Final sweep
-    if let Ok(mut s) = strong_cell().write() {
-        s.retain(|f| !f.ready());
-    }
-    if let Ok(mut list) = futures_cell().write() {
-        list.retain(|fw| matches!(fw.is_ready(), Some(false)));
+    if let Ok(mut st) = state().write() {
+        st.strong.retain(|f| !f.ready());
+        st.futures.retain(|fw| matches!(fw.is_ready(), Some(false)));
     }
 }

 /// Push a task scope (footing). On pop of the outermost scope, perform a best-effort join.
 pub fn push_task_scope() {
-    if let Ok(mut d) = scope_depth_cell().write() {
-        *d += 1;
-    }
-    // Push a new explicit TaskGroup for this scope
-    if let Ok(mut st) = group_stack_cell().write() {
-        st.push(std::sync::Arc::new(
+    if let Ok(mut st) = state().write() {
+        st.scope_depth += 1;
+        // Push a new explicit TaskGroup for this scope
+        st.group_stack.push(std::sync::Arc::new(
             crate::boxes::task_group_box::TaskGroupInner {
                 strong: std::sync::Mutex::new(Vec::new()),
             },
@@ -190,19 +148,13 @@ pub fn push_task_scope() {
 pub fn pop_task_scope() {
     let mut do_join = false;
     let mut popped: Option<std::sync::Arc<crate::boxes::task_group_box::TaskGroupInner>> = None;
-    {
-        if let Ok(mut d) = scope_depth_cell().write() {
-            if *d > 0 {
-                *d -= 1;
-            }
-            if *d == 0 {
-                do_join = true;
-            }
+    if let Ok(mut st) = state().write() {
+        if st.scope_depth > 0 {
+            st.scope_depth -= 1;
         }
-    }
-    // Pop explicit group for this scope
-    if let Ok(mut st) = group_stack_cell().write() {
-        popped = st.pop();
+        if st.scope_depth == 0 { do_join = true; }
+        // Pop explicit group for this scope
+        popped = st.group_stack.pop();
     }
     if do_join {
         let ms: u64 = std::env::var("NYASH_TASK_SCOPE_JOIN_MS")
@@ -240,13 +192,11 @@ pub fn pop_task_scope() {

 /// Perform a runtime safepoint and poll the scheduler if available.
 pub fn safepoint_and_poll() {
-    if let Ok(g) = gc_cell().read() {
-        if let Some(gc) = g.as_ref() {
+    if let Ok(st) = state().read() {
+        if let Some(gc) = st.gc.as_ref() {
             gc.safepoint();
         }
-    }
-    if let Ok(s) = sched_cell().read() {
-        if let Some(sched) = s.as_ref() {
+        if let Some(sched) = st.sched.as_ref() {
             sched.poll();
         }
     }
@@ -255,8 +205,8 @@ pub fn safepoint_and_poll() {
 /// Try to schedule a task on the global scheduler. Returns true if scheduled.
 pub fn spawn_task(name: &str, f: Box<dyn FnOnce() + Send + 'static>) -> bool {
     // If a scheduler is registered, enqueue the task; otherwise run inline.
-    if let Ok(s) = sched_cell().read() {
-        if let Some(sched) = s.as_ref() {
+    if let Ok(st) = state().read() {
+        if let Some(sched) = st.sched.as_ref() {
             sched.spawn(name, f);
             return true;
         }
@@ -272,8 +222,8 @@ pub fn spawn_task_with_token(
     token: crate::runtime::scheduler::CancellationToken,
     f: Box<dyn FnOnce() + Send + 'static>,
 ) -> bool {
-    if let Ok(s) = sched_cell().read() {
-        if let Some(sched) = s.as_ref() {
+    if let Ok(st) = state().read() {
+        if let Some(sched) = st.sched.as_ref() {
             sched.spawn_with_token(name, token, f);
             return true;
         }
@@ -284,8 +234,8 @@ pub fn spawn_task_with_token(

 /// Spawn a delayed task via scheduler if available; returns true if scheduled.
 pub fn spawn_task_after(delay_ms: u64, name: &str, f: Box<dyn FnOnce() + Send + 'static>) -> bool {
-    if let Ok(s) = sched_cell().read() {
-        if let Some(sched) = s.as_ref() {
+    if let Ok(st) = state().read() {
+        if let Some(sched) = st.sched.as_ref() {
             sched.spawn_after(delay_ms, name, f);
             return true;
         }
@@ -297,3 +247,21 @@ pub fn spawn_task_after(delay_ms: u64, name: &str, f: Box<dyn FnOnce() + Send +
     });
     false
 }
+
+/// Forward a GC barrier event to the currently registered GC hooks (if any).
+pub fn gc_barrier(kind: BarrierKind) {
+    if let Ok(st) = state().read() {
+        if let Some(gc) = st.gc.as_ref() {
+            gc.barrier(kind);
+        }
+    }
+}
+
+/// Report an allocation to the current GC hooks (best-effort)
+pub fn gc_alloc(bytes: u64) {
+    if let Ok(st) = state().read() {
+        if let Some(gc) = st.gc.as_ref() {
+            gc.alloc(bytes);
+        }
+    }
+}
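Taken together, this refactor collapses seven independent OnceCell globals into a single RwLock<GlobalHooksState>: every mutator takes one write lock and every reader one read lock, so callers always see a mutually consistent view of gc, scheduler, token, and group state, and there is no cross-lock ordering to get wrong. The shape of the pattern in isolation (a standalone sketch, not code from this commit):

    use once_cell::sync::OnceCell;
    use std::sync::RwLock;

    // Standalone sketch of the single-lock global-state pattern.
    #[derive(Default)]
    struct Hooks {
        gc_installed: bool,
        sched_installed: bool,
    }

    static HOOKS: OnceCell<RwLock<Hooks>> = OnceCell::new();

    fn hooks() -> &'static RwLock<Hooks> {
        HOOKS.get_or_init(|| RwLock::new(Hooks::default()))
    }

    fn install_all() {
        // One write lock: readers can never observe gc without sched.
        if let Ok(mut h) = hooks().write() {
            h.gc_installed = true;
            h.sched_installed = true;
        }
    }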
src/runtime/mod.rs
@@ -4,6 +4,9 @@
 pub mod box_registry;
 pub mod gc;
+pub mod gc_controller;
+pub mod gc_mode;
+pub mod gc_trace;
 pub mod global_hooks;
 pub mod leak_tracker;
 pub mod nyash_runtime;
@@ -1,4 +1,4 @@
-use super::types::{LoadedPluginV2, PluginBoxMetadata, PluginBoxV2, PluginHandleInner};
+use super::types::{LoadedPluginV2, NyashTypeBoxFfi, PluginBoxMetadata, PluginBoxV2, PluginHandleInner};
 use crate::bid::{BidError, BidResult};
 use crate::box_trait::NyashBox;
 use crate::config::nyash_toml_v2::{LibraryDefinition, NyashConfigV2};
@@ -12,11 +12,15 @@ fn dbg_on() -> bool {
     std::env::var("NYASH_DEBUG_PLUGIN").unwrap_or_default() == "1"
 }

+type BoxInvokeFn = extern "C" fn(u32, u32, *const u8, usize, *mut u8, *mut usize) -> i32;
+
 #[derive(Debug, Clone, Default)]
 struct LoadedBoxSpec {
     type_id: Option<u32>,
     methods: HashMap<String, MethodSpec>,
     fini_method_id: Option<u32>,
+    // Optional Nyash ABI v2 per-box invoke entry (not yet used for calls)
+    invoke_id: Option<BoxInvokeFn>,
 }
 #[derive(Debug, Clone, Copy)]
 struct MethodSpec {
@@ -124,7 +128,7 @@ impl PluginLoaderV2 {
         let lib = unsafe { Library::new(&lib_path) }.map_err(|_| BidError::PluginError)?;
         let lib_arc = Arc::new(lib);

-        // Resolve required invoke symbol (TypeBox v2: nyash_plugin_invoke)
+        // Resolve required invoke symbol (legacy library-level): nyash_plugin_invoke
         unsafe {
             let invoke_sym: Symbol<
                 unsafe extern "C" fn(u32, u32, u32, *const u8, usize, *mut u8, *mut usize) -> i32,
@@ -152,6 +156,35 @@ impl PluginLoaderV2 {
                 .insert(lib_name.to_string(), Arc::new(loaded));
         }

+        // Try to resolve Nyash ABI v2 per-box TypeBox symbols and record invoke_id
+        // Symbol pattern: nyash_typebox_<BoxType>
+        for box_type in &lib_def.boxes {
+            let sym_name = format!("nyash_typebox_{}\0", box_type);
+            unsafe {
+                if let Ok(tb_sym) = lib_arc.get::<Symbol<&NyashTypeBoxFfi>>(sym_name.as_bytes()) {
+                    let st: &NyashTypeBoxFfi = &*tb_sym;
+                    // Validate ABI tag 'TYBX' (0x54594258) and basic invariants
+                    let abi_ok = st.abi_tag == 0x5459_4258
+                        && st.struct_size as usize >= std::mem::size_of::<NyashTypeBoxFfi>();
+                    if !abi_ok {
+                        continue;
+                    }
+                    // Remember invoke_id in box_specs for (lib_name, box_type)
+                    if let Some(invoke_id) = st.invoke_id {
+                        let key = (lib_name.to_string(), box_type.to_string());
+                        let mut map = self.box_specs.write().map_err(|_| BidError::PluginError)?;
+                        let entry = map.entry(key).or_insert(LoadedBoxSpec {
+                            type_id: None,
+                            methods: HashMap::new(),
+                            fini_method_id: None,
+                            invoke_id: None,
+                        });
+                        entry.invoke_id = Some(invoke_id);
+                    }
+                }
+            }
+        }
+
         Ok(())
 }
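On the plugin side, each exported box type would provide a static matching the nyash_typebox_<BoxType> symbol pattern. A hedged sketch of what such an export could look like, assuming NyashTypeBoxFfi carries at least the three fields this loader reads (abi_tag, struct_size, invoke_id); any further fields of the real struct, the CounterBox name, and the invoke parameter meanings are assumptions:

    // Hypothetical plugin-side export; the real NyashTypeBoxFfi layout may have
    // more fields than the three the loader validates above.
    #[repr(C)]
    pub struct NyashTypeBoxFfi {
        pub abi_tag: u32,     // must be 0x5459_4258 ('TYBX')
        pub struct_size: u32, // size of this struct, for forward compatibility
        pub invoke_id: Option<extern "C" fn(u32, u32, *const u8, usize, *mut u8, *mut usize) -> i32>,
    }

    extern "C" fn counter_invoke(
        _instance_id: u32,
        _method_id: u32,
        _args: *const u8,
        _args_len: usize,
        _out: *mut u8,
        _out_len: *mut usize,
    ) -> i32 {
        0 // success; a real plugin dispatches on method_id here
    }

    #[no_mangle]
    #[allow(non_upper_case_globals)]
    pub static nyash_typebox_CounterBox: NyashTypeBoxFfi = NyashTypeBoxFfi {
        abi_tag: 0x5459_4258,
        struct_size: std::mem::size_of::<NyashTypeBoxFfi>() as u32,
        invoke_id: Some(counter_invoke),
    };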