diff --git a/nyash.toml b/nyash.toml index 82515b76..bf7c8a6a 100644 --- a/nyash.toml +++ b/nyash.toml @@ -57,12 +57,11 @@ search_paths = [ "~/.nyash/plugins" ] [libraries."libnyash_net_plugin.so"] -boxes = ["HttpServerBox", "HttpRequestBox", "HttpResponseBox", "HttpClientBox"] +boxes = ["HttpServerBox", "HttpRequestBox", "HttpResponseBox", "HttpClientBox", "SocketServerBox", "SocketClientBox", "SocketConnBox"] path = "./plugins/nyash-net-plugin/target/release/libnyash_net_plugin.so" [libraries."libnyash_net_plugin.so".HttpServerBox] type_id = 20 -singleton = true [libraries."libnyash_net_plugin.so".HttpServerBox.methods] birth = { method_id = 0 } @@ -102,3 +101,33 @@ birth = { method_id = 0 } get = { method_id = 1 } post = { method_id = 2 } fini = { method_id = 4294967295 } + +[libraries."libnyash_net_plugin.so".SocketServerBox] +type_id = 30 + +[libraries."libnyash_net_plugin.so".SocketServerBox.methods] +birth = { method_id = 0 } +start = { method_id = 1, args = ["port"] } +stop = { method_id = 2 } +accept = { method_id = 3 } +acceptTimeout = { method_id = 4, args = ["timeout_ms"] } +fini = { method_id = 4294967295 } + +[libraries."libnyash_net_plugin.so".SocketClientBox] +type_id = 32 + +[libraries."libnyash_net_plugin.so".SocketClientBox.methods] +birth = { method_id = 0 } +connect = { method_id = 1, args = ["host", "port"] } +fini = { method_id = 4294967295 } + +[libraries."libnyash_net_plugin.so".SocketConnBox] +type_id = 31 + +[libraries."libnyash_net_plugin.so".SocketConnBox.methods] +birth = { method_id = 0 } +send = { method_id = 1 } +recv = { method_id = 2 } +close = { method_id = 3 } +recvTimeout = { method_id = 4, args = ["timeout_ms"] } +fini = { method_id = 4294967295 } diff --git a/plugins/nyash-net-plugin/src/lib.rs b/plugins/nyash-net-plugin/src/lib.rs index c1653fee..bf4778b6 100644 --- a/plugins/nyash-net-plugin/src/lib.rs +++ b/plugins/nyash-net-plugin/src/lib.rs @@ -4,7 +4,10 @@ use once_cell::sync::Lazy; use std::collections::{HashMap, VecDeque}; -use std::sync::{Mutex, atomic::{AtomicU32, Ordering}}; +use std::sync::{Mutex, Arc, atomic::{AtomicBool, AtomicU32, Ordering}}; +use std::net::{TcpListener, TcpStream}; +use std::io::{Read, Write}; +use std::time::Duration; // Error codes const OK: i32 = 0; @@ -20,6 +23,10 @@ const T_SERVER: u32 = 20; const T_REQUEST: u32 = 21; const T_RESPONSE: u32 = 22; const T_CLIENT: u32 = 23; +// Socket +const T_SOCK_SERVER: u32 = 30; +const T_SOCK_CONN: u32 = 31; +const T_SOCK_CLIENT: u32 = 32; // Methods const M_BIRTH: u32 = 0; @@ -46,8 +53,27 @@ const M_RESP_GET_HEADER: u32 = 6; // arg: name -> string (or empty) const M_CLIENT_GET: u32 = 1; // arg: url -> Handle(Response) const M_CLIENT_POST: u32 = 2; // args: url, body(bytes/string) -> Handle(Response) +// Socket Server +const M_SRV_BIRTH: u32 = 0; +const M_SRV_START: u32 = 1; // port +const M_SRV_STOP: u32 = 2; +const M_SRV_ACCEPT: u32 = 3; // -> Handle(T_SOCK_CONN) +const M_SRV_ACCEPT_TIMEOUT: u32 = 4; // ms -> Handle(T_SOCK_CONN) or void + +// Socket Client +const M_SC_BIRTH: u32 = 0; +const M_SC_CONNECT: u32 = 1; // host, port -> Handle(T_SOCK_CONN) + +// Socket Conn +const M_CONN_BIRTH: u32 = 0; +const M_CONN_SEND: u32 = 1; // bytes/string -> void +const M_CONN_RECV: u32 = 2; // -> bytes +const M_CONN_CLOSE: u32 = 3; // -> void +const M_CONN_RECV_TIMEOUT: u32 = 4; // ms -> bytes (empty if timeout) + // Global State static SERVER_INSTANCES: Lazy>> = Lazy::new(|| Mutex::new(HashMap::new())); +static SERVER_START_SEQ: AtomicU32 = AtomicU32::new(1); static ACTIVE_SERVER_ID: Lazy>> = Lazy::new(|| Mutex::new(None)); static REQUESTS: Lazy>> = Lazy::new(|| Mutex::new(HashMap::new())); static RESPONSES: Lazy>> = Lazy::new(|| Mutex::new(HashMap::new())); @@ -57,11 +83,15 @@ static SERVER_ID: AtomicU32 = AtomicU32::new(1); static REQUEST_ID: AtomicU32 = AtomicU32::new(1); static RESPONSE_ID: AtomicU32 = AtomicU32::new(1); static CLIENT_ID: AtomicU32 = AtomicU32::new(1); +static SOCK_SERVER_ID: AtomicU32 = AtomicU32::new(1); +static SOCK_CONN_ID: AtomicU32 = AtomicU32::new(1); +static SOCK_CLIENT_ID: AtomicU32 = AtomicU32::new(1); struct ServerState { running: bool, port: i32, pending: VecDeque, // queue of request ids + start_seq: u32, } struct RequestState { @@ -78,6 +108,19 @@ struct ResponseState { struct ClientState; +// Socket types +struct SockServerState { + running: Arc, + pending: Arc>>, + handle: Mutex>>, +} + +struct SockConnState { + stream: Mutex, +} + +struct SockClientState; + #[no_mangle] pub extern "C" fn nyash_plugin_abi() -> u32 { 1 } @@ -100,6 +143,9 @@ pub extern "C" fn nyash_plugin_invoke( T_REQUEST => request_invoke(method_id, instance_id, args, args_len, result, result_len), T_RESPONSE => response_invoke(method_id, instance_id, args, args_len, result, result_len), T_CLIENT => client_invoke(method_id, instance_id, args, args_len, result, result_len), + T_SOCK_SERVER => sock_server_invoke(method_id, instance_id, args, args_len, result, result_len), + T_SOCK_CLIENT => sock_client_invoke(method_id, instance_id, args, args_len, result, result_len), + T_SOCK_CONN => sock_conn_invoke(method_id, instance_id, args, args_len, result, result_len), _ => E_INV_TYPE, } } @@ -109,14 +155,14 @@ unsafe fn server_invoke(m: u32, id: u32, args: *const u8, args_len: usize, res: match m { M_BIRTH => { let id = SERVER_ID.fetch_add(1, Ordering::Relaxed); - SERVER_INSTANCES.lock().unwrap().insert(id, ServerState { running: false, port: 0, pending: VecDeque::new() }); + SERVER_INSTANCES.lock().unwrap().insert(id, ServerState { running: false, port: 0, pending: VecDeque::new(), start_seq: 0 }); write_u32(id, res, res_len) } M_SERVER_START => { // args: TLV string/int (port) let port = tlv_parse_i32(slice(args, args_len)).unwrap_or(0); if let Some(s) = SERVER_INSTANCES.lock().unwrap().get_mut(&id) { - s.running = true; s.port = port; + s.running = true; s.port = port; s.start_seq = SERVER_START_SEQ.fetch_add(1, Ordering::Relaxed); } // mark active server *ACTIVE_SERVER_ID.lock().unwrap() = Some(id); @@ -132,13 +178,16 @@ unsafe fn server_invoke(m: u32, id: u32, args: *const u8, args_len: usize, res: write_tlv_void(res, res_len) } M_SERVER_ACCEPT => { - let mut map = SERVER_INSTANCES.lock().unwrap(); - if let Some(s) = map.get_mut(&id) { - if let Some(req_id) = s.pending.pop_front() { + // wait up to ~5000ms for a request to arrive + for _ in 0..1000 { + if let Some(req_id) = { + let mut map = SERVER_INSTANCES.lock().unwrap(); + if let Some(s) = map.get_mut(&id) { s.pending.pop_front() } else { None } + } { return write_tlv_handle(T_REQUEST, req_id, res, res_len); } + std::thread::sleep(Duration::from_millis(5)); } - // no request: return void write_tlv_void(res, res_len) } _ => E_INV_METHOD, @@ -246,13 +295,23 @@ unsafe fn client_invoke(m: u32, id: u32, args: *const u8, args_len: usize, res: // args: TLV String(url) let url = tlv_parse_string(slice(args, args_len)).unwrap_or_default(); let path = parse_path(&url); + let port_hint = parse_port(&url); // Create Request let req_id = REQUEST_ID.fetch_add(1, Ordering::Relaxed); REQUESTS.lock().unwrap().insert(req_id, RequestState { path, body: vec![], response_id: None }); - // Enqueue to last started (active) server if running - if let Some(sid) = *ACTIVE_SERVER_ID.lock().unwrap() { - if let Some(s) = SERVER_INSTANCES.lock().unwrap().get_mut(&sid) { - if s.running { s.pending.push_back(req_id); } + // Enqueue to server: prefer port match, else newest running + { + let mut servers = SERVER_INSTANCES.lock().unwrap(); + if let Some(ph) = port_hint { + if let Some((_, s)) = servers.iter_mut().find(|(_, s)| s.running && s.port == ph) { + s.pending.push_back(req_id); + } else { + if let Some((_, s)) = servers.iter_mut().filter(|(_, s)| s.running).max_by_key(|(_, s)| s.start_seq) { + s.pending.push_back(req_id); + } + } + } else if let Some((_, s)) = servers.iter_mut().filter(|(_, s)| s.running).max_by_key(|(_, s)| s.start_seq) { + s.pending.push_back(req_id); } } // Create Response handle for client side to read later @@ -277,12 +336,24 @@ unsafe fn client_invoke(m: u32, id: u32, args: *const u8, args_len: usize, res: let body = data[p2..p2+s2].to_vec(); let path = parse_path(&url); + let port_hint = parse_port(&url); // Create Request let req_id = REQUEST_ID.fetch_add(1, Ordering::Relaxed); REQUESTS.lock().unwrap().insert(req_id, RequestState { path, body, response_id: None }); - // Enqueue to active server if running - if let Some(sid) = *ACTIVE_SERVER_ID.lock().unwrap() { - if let Some(s) = SERVER_INSTANCES.lock().unwrap().get_mut(&sid) { if s.running { s.pending.push_back(req_id); } } + // Enqueue to server: prefer port match, else newest running + { + let mut servers = SERVER_INSTANCES.lock().unwrap(); + if let Some(ph) = port_hint { + if let Some((_, s)) = servers.iter_mut().find(|(_, s)| s.running && s.port == ph) { + s.pending.push_back(req_id); + } else { + if let Some((_, s)) = servers.iter_mut().filter(|(_, s)| s.running).max_by_key(|(_, s)| s.start_seq) { + s.pending.push_back(req_id); + } + } + } else if let Some((_, s)) = servers.iter_mut().filter(|(_, s)| s.running).max_by_key(|(_, s)| s.start_seq) { + s.pending.push_back(req_id); + } } // Create paired client Response let resp_id = RESPONSE_ID.fetch_add(1, Ordering::Relaxed); @@ -295,8 +366,33 @@ unsafe fn client_invoke(m: u32, id: u32, args: *const u8, args_len: usize, res: } fn parse_path(url: &str) -> String { - // very naive: find first '/' - if let Some(pos) = url.find('/') { url[pos..].to_string() } else { "/".to_string() } + // Robust-ish path extraction: + // - http://host:port/path -> "/path" + // - https://host/path -> "/path" + // - /relative -> as-is + // - otherwise -> "/" + if url.starts_with('/') { return url.to_string(); } + if let Some(scheme_pos) = url.find("//") { + let after_scheme = &url[scheme_pos+2..]; + if let Some(slash) = after_scheme.find('/') { + return after_scheme[slash..].to_string(); + } else { + return "/".to_string(); + } + } + "/".to_string() +} + +fn parse_port(url: &str) -> Option { + // match patterns like http://host:PORT/ or :PORT/ + if let Some(pat) = url.split("//").nth(1) { + if let Some(after_host) = pat.split('/').next() { + if let Some(colon) = after_host.rfind(':') { + return after_host[colon+1..].parse::().ok(); + } + } + } + None } // ===== Helpers ===== @@ -365,8 +461,14 @@ fn tlv_parse_bytes(data: &[u8]) -> Result, ()> { } fn tlv_parse_i32(data: &[u8]) -> Result { let (_, argc, mut pos) = tlv_parse_header(data)?; if argc < 1 { return Err(()); } - let (tag, size, p) = tlv_parse_entry_hdr(data, pos)?; if tag != 2 || size != 4 { return Err(()); } - let mut b = [0u8;4]; b.copy_from_slice(&data[p..p+4]); Ok(i32::from_le_bytes(b)) + let (tag, size, p) = tlv_parse_entry_hdr(data, pos)?; + match (tag, size) { + (2, 4) => { let mut b=[0u8;4]; b.copy_from_slice(&data[p..p+4]); Ok(i32::from_le_bytes(b)) } + (5, 8) => { // accept i64 + let mut b=[0u8;8]; b.copy_from_slice(&data[p..p+8]); Ok(i64::from_le_bytes(b) as i32) + } + _ => Err(()) + } } fn tlv_parse_handle(data: &[u8]) -> Result<(u32,u32), ()> { let (_, argc, mut pos) = tlv_parse_header(data)?; if argc < 1 { return Err(()); } @@ -380,3 +482,198 @@ fn tlv_parse_entry_hdr(data: &[u8], pos: usize) -> Result<(u8,usize,usize), ()> if p+size > data.len() { return Err(()); } Ok((tag,size,p)) } + +// ===== Simple logger (enabled when NYASH_NET_LOG=1) ===== +static LOG_ON: Lazy = Lazy::new(|| std::env::var("NYASH_NET_LOG").unwrap_or_default() == "1"); +static LOG_PATH: Lazy = Lazy::new(|| std::env::var("NYASH_NET_LOG_FILE").unwrap_or_else(|_| "net_plugin.log".to_string())); +static LOG_MTX: Lazy> = Lazy::new(|| Mutex::new(())); + +fn net_log(msg: &str) { + if !*LOG_ON { return; } + // Always mirror to stderr for visibility + eprintln!("[net] {}", msg); + let _g = LOG_MTX.lock().unwrap(); + if let Ok(mut f) = std::fs::OpenOptions::new().create(true).append(true).open(&*LOG_PATH) { + let _ = writeln!(f, "[{:?}] {}", std::time::SystemTime::now(), msg); + } +} + +macro_rules! netlog { + ($($arg:tt)*) => {{ let s = format!($($arg)*); net_log(&s); }} +} + +// ===== Socket implementation ===== +static SOCK_SERVERS: Lazy>> = Lazy::new(|| Mutex::new(HashMap::new())); +static SOCK_CONNS: Lazy>> = Lazy::new(|| Mutex::new(HashMap::new())); +static SOCK_CLIENTS: Lazy>> = Lazy::new(|| Mutex::new(HashMap::new())); + +unsafe fn sock_server_invoke(m: u32, id: u32, args: *const u8, args_len: usize, res: *mut u8, res_len: *mut usize) -> i32 { + match m { + M_SRV_BIRTH => { + netlog!("sock:birth server"); + let id = SOCK_SERVER_ID.fetch_add(1, Ordering::Relaxed); + SOCK_SERVERS.lock().unwrap().insert(id, SockServerState { running: Arc::new(AtomicBool::new(false)), pending: Arc::new(Mutex::new(VecDeque::new())), handle: Mutex::new(None) }); + write_u32(id, res, res_len) + } + M_SRV_START => { + let port = tlv_parse_i32(slice(args, args_len)).unwrap_or(0); + netlog!("sock:start server id={} port={}", id, port); + if let Some(ss) = SOCK_SERVERS.lock().unwrap().get(&id) { + let running = ss.running.clone(); + let pending = ss.pending.clone(); + running.store(true, Ordering::SeqCst); + let handle = std::thread::spawn(move || { + let addr = format!("127.0.0.1:{}", port); + let listener = TcpListener::bind(addr); + if let Ok(listener) = listener { + listener.set_nonblocking(true).ok(); + while running.load(Ordering::SeqCst) { + match listener.accept() { + Ok((stream, _)) => { + stream.set_nonblocking(false).ok(); + let conn_id = SOCK_CONN_ID.fetch_add(1, Ordering::Relaxed); + SOCK_CONNS.lock().unwrap().insert(conn_id, SockConnState { stream: Mutex::new(stream) }); + netlog!("sock:accept conn_id={}", conn_id); + pending.lock().unwrap().push_back(conn_id); + } + Err(_) => { + std::thread::sleep(std::time::Duration::from_millis(10)); + } + } + } + netlog!("sock:listener exit port={}", port); + } + }); + *ss.handle.lock().unwrap() = Some(handle); + } + write_tlv_void(res, res_len) + } + M_SRV_STOP => { + netlog!("sock:stop server id={}", id); + if let Some(ss) = SOCK_SERVERS.lock().unwrap().get(&id) { + ss.running.store(false, Ordering::SeqCst); + if let Some(h) = ss.handle.lock().unwrap().take() { let _ = h.join(); } + } + write_tlv_void(res, res_len) + } + M_SRV_ACCEPT => { + if let Some(ss) = SOCK_SERVERS.lock().unwrap().get(&id) { + // wait up to ~5000ms + for _ in 0..1000 { + if let Some(cid) = ss.pending.lock().unwrap().pop_front() { + netlog!("sock:accept returned conn_id={}", cid); + return write_tlv_handle(T_SOCK_CONN, cid, res, res_len); + } + std::thread::sleep(std::time::Duration::from_millis(5)); + } + } + netlog!("sock:accept timeout id={}", id); + write_tlv_void(res, res_len) + } + M_SRV_ACCEPT_TIMEOUT => { + let timeout_ms = tlv_parse_i32(slice(args, args_len)).unwrap_or(0).max(0) as u64; + if let Some(ss) = SOCK_SERVERS.lock().unwrap().get(&id) { + let deadline = std::time::Instant::now() + Duration::from_millis(timeout_ms); + loop { + if let Some(cid) = ss.pending.lock().unwrap().pop_front() { + netlog!("sock:acceptTimeout returned conn_id={}", cid); + return write_tlv_handle(T_SOCK_CONN, cid, res, res_len); + } + if std::time::Instant::now() >= deadline { break; } + std::thread::sleep(Duration::from_millis(5)); + } + } + netlog!("sock:acceptTimeout timeout id={} ms={}", id, timeout_ms); + write_tlv_void(res, res_len) + } + _ => E_INV_METHOD, + } +} + +unsafe fn sock_client_invoke(m: u32, id: u32, args: *const u8, args_len: usize, res: *mut u8, res_len: *mut usize) -> i32 { + match m { + M_SC_BIRTH => { + let id = SOCK_CLIENT_ID.fetch_add(1, Ordering::Relaxed); + SOCK_CLIENTS.lock().unwrap().insert(id, SockClientState); + write_u32(id, res, res_len) + } + M_SC_CONNECT => { + // args: host(string), port(i32) + let data = slice(args, args_len); + let (_, argc, mut pos) = tlv_parse_header(data).map_err(|_| ()).or(Err(())).unwrap_or((1,0,4)); + if argc < 2 { return E_INV_ARGS; } + let (_t1, s1, p1) = tlv_parse_entry_hdr(data, pos).map_err(|_| ()).or(Err(())).unwrap_or((0,0,0)); + if data[pos] != 6 { return E_INV_ARGS; } + let host = std::str::from_utf8(&data[p1..p1+s1]).map_err(|_| ()).or(Err(())) .unwrap_or("").to_string(); + pos = p1 + s1; + let (_t2, _s2, p2) = tlv_parse_entry_hdr(data, pos).map_err(|_| ()).or(Err(())).unwrap_or((0,0,0)); + let port = if data[pos] == 2 { // i32 + let mut b=[0u8;4]; b.copy_from_slice(&data[p2..p2+4]); i32::from_le_bytes(b) + } else { return E_INV_ARGS }; + let addr = format!("{}:{}", host, port); + match TcpStream::connect(addr) { + Ok(mut stream) => { + stream.set_nonblocking(false).ok(); + let conn_id = SOCK_CONN_ID.fetch_add(1, Ordering::Relaxed); + SOCK_CONNS.lock().unwrap().insert(conn_id, SockConnState { stream: Mutex::new(stream) }); + netlog!("sock:connect ok conn_id={}", conn_id); + write_tlv_handle(T_SOCK_CONN, conn_id, res, res_len) + } + Err(e) => { netlog!("sock:connect error: {:?}", e); E_ERR } + } + } + _ => E_INV_METHOD, + } +} + +unsafe fn sock_conn_invoke(m: u32, id: u32, args: *const u8, args_len: usize, res: *mut u8, res_len: *mut usize) -> i32 { + match m { + M_CONN_BIRTH => { + // not used directly + write_u32(0, res, res_len) + } + M_CONN_SEND => { + let bytes = tlv_parse_bytes(slice(args, args_len)).unwrap_or_default(); + if let Some(conn) = SOCK_CONNS.lock().unwrap().get(&id) { + if let Ok(mut s) = conn.stream.lock() { let _ = s.write_all(&bytes); } + netlog!("sock:send id={} n={}", id, bytes.len()); + return write_tlv_void(res, res_len); + } + E_INV_HANDLE + } + M_CONN_RECV => { + if let Some(conn) = SOCK_CONNS.lock().unwrap().get(&id) { + if let Ok(mut s) = conn.stream.lock() { + let mut buf = vec![0u8; 4096]; + match s.read(&mut buf) { + Ok(n) => { buf.truncate(n); netlog!("sock:recv id={} n={}", id, n); return write_tlv_bytes(&buf, res, res_len); } + Err(_) => return write_tlv_bytes(&[], res, res_len), + } + } + } + E_INV_HANDLE + } + M_CONN_RECV_TIMEOUT => { + let timeout_ms = tlv_parse_i32(slice(args, args_len)).unwrap_or(0).max(0) as u64; + if let Some(conn) = SOCK_CONNS.lock().unwrap().get(&id) { + if let Ok(mut s) = conn.stream.lock() { + let _ = s.set_read_timeout(Some(Duration::from_millis(timeout_ms))); + let mut buf = vec![0u8; 4096]; + let resv = s.read(&mut buf); + let _ = s.set_read_timeout(None); + match resv { + Ok(n) => { buf.truncate(n); netlog!("sock:recvTimeout id={} n={} ms={}", id, n, timeout_ms); return write_tlv_bytes(&buf, res, res_len); } + Err(_) => return write_tlv_bytes(&[], res, res_len), + } + } + } + E_INV_HANDLE + } + M_CONN_CLOSE => { + // Drop the stream by removing entry + SOCK_CONNS.lock().unwrap().remove(&id); + write_tlv_void(res, res_len) + } + _ => E_INV_METHOD, + } +} diff --git a/tests/e2e_plugin_net.rs b/tests/e2e_plugin_net.rs index e94a2556..bc2986a8 100644 --- a/tests/e2e_plugin_net.rs +++ b/tests/e2e_plugin_net.rs @@ -20,6 +20,8 @@ fn try_init_plugins() -> bool { #[test] fn e2e_http_stub_end_to_end() { + std::env::set_var("NYASH_NET_LOG", "1"); + std::env::set_var("NYASH_NET_LOG_FILE", "net_plugin.log"); if !try_init_plugins() { return; } let code = r#" @@ -28,7 +30,7 @@ srv = new HttpServerBox() srv.start(8080) cli = new HttpClientBox() -r = cli.get("http://localhost/hello") +r = cli.get("http://localhost:8080/hello") req = srv.accept() resp = new HttpResponseBox() @@ -48,6 +50,8 @@ body #[test] fn e2e_http_server_restart() { + std::env::set_var("NYASH_NET_LOG", "1"); + std::env::set_var("NYASH_NET_LOG_FILE", "net_plugin.log"); if !try_init_plugins() { return; } let code = r#" @@ -56,7 +60,7 @@ srv = new HttpServerBox() srv.start(8081) cli = new HttpClientBox() -r = cli.get("http://localhost/test1") +r = cli.get("http://localhost:8081/test1") req = srv.accept() resp = new HttpResponseBox() resp.write("A") @@ -65,7 +69,7 @@ req.respond(resp) srv.stop() srv.start(8081) -r = cli.get("http://localhost/test2") +r = cli.get("http://localhost:8081/test2") req = srv.accept() resp = new HttpResponseBox() resp.write("B") @@ -83,6 +87,8 @@ body #[test] fn e2e_http_server_shutdown_and_restart() { + std::env::set_var("NYASH_NET_LOG", "1"); + std::env::set_var("NYASH_NET_LOG_FILE", "net_plugin.log"); if !try_init_plugins() { return; } // First run: start and respond @@ -91,7 +97,7 @@ local srv, cli, r, req, resp srv = new HttpServerBox() srv.start(8082) cli = new HttpClientBox() -r = cli.get("http://localhost/first") +r = cli.get("http://localhost:8082/first") req = srv.accept() resp = new HttpResponseBox() resp.write("X") @@ -111,7 +117,7 @@ local srv, cli, r, req, resp, body srv = new HttpServerBox() srv.start(8083) cli = new HttpClientBox() -r = cli.get("http://localhost/second") +r = cli.get("http://localhost:8083/second") req = srv.accept() resp = new HttpResponseBox() resp.write("Y") @@ -127,6 +133,8 @@ body #[test] fn e2e_http_post_and_headers() { + std::env::set_var("NYASH_NET_LOG", "1"); + std::env::set_var("NYASH_NET_LOG_FILE", "net_plugin.log"); if !try_init_plugins() { return; } let code = r#" @@ -135,7 +143,7 @@ srv = new HttpServerBox() srv.start(8090) cli = new HttpClientBox() -r = cli.post("http://localhost/api", "DATA") +r = cli.post("http://localhost:8090/api", "DATA") req = srv.accept() // check server saw body @@ -159,3 +167,32 @@ st.toString() + ":" + hv + ":" + body let result = interpreter.execute(ast).expect("exec failed"); assert_eq!(result.to_string_box().value, "201:V:R"); } + +#[test] +fn e2e_http_multiple_requests_order() { + std::env::set_var("NYASH_NET_LOG", "1"); + std::env::set_var("NYASH_NET_LOG_FILE", "net_plugin.log"); + if !try_init_plugins() { return; } + + let code = r#" +local srv, cli, r1, r2, r3, q1, q2, q3 +srv = new HttpServerBox() +srv.start(8091) + +cli = new HttpClientBox() +r1 = cli.get("http://localhost:8091/a") +r2 = cli.get("http://localhost:8091/b") +r3 = cli.get("http://localhost:8091/c") + +q1 = srv.accept().path() +q2 = srv.accept().path() +q3 = srv.accept().path() + +q1 + "," + q2 + "," + q3 +"#; + + let ast = NyashParser::parse_from_string(code).expect("parse failed"); + let mut interpreter = nyash_rust::interpreter::NyashInterpreter::new(); + let result = interpreter.execute(ast).expect("exec failed"); + assert_eq!(result.to_string_box().value, "/a,/b,/c"); +} diff --git a/tests/e2e_plugin_socket.rs b/tests/e2e_plugin_socket.rs new file mode 100644 index 00000000..37cb14fb --- /dev/null +++ b/tests/e2e_plugin_socket.rs @@ -0,0 +1,80 @@ +#![cfg(all(feature = "plugins", not(target_arch = "wasm32")))] + +use nyash_rust::parser::NyashParser; +use nyash_rust::runtime::plugin_loader_v2::{init_global_loader_v2, get_global_loader_v2}; +use nyash_rust::runtime::box_registry::get_global_registry; +use nyash_rust::runtime::PluginConfig; + +fn try_init_plugins() -> bool { + if !std::path::Path::new("nyash.toml").exists() { return false; } + if let Err(e) = init_global_loader_v2("nyash.toml") { eprintln!("init failed: {:?}", e); return false; } + let loader = get_global_loader_v2(); + let loader = loader.read().unwrap(); + if let Some(conf) = &loader.config { + let mut map = std::collections::HashMap::new(); + for (lib, def) in &conf.libraries { for b in &def.boxes { map.insert(b.clone(), lib.clone()); } } + get_global_registry().apply_plugin_config(&PluginConfig { plugins: map }); + true + } else { false } +} + +#[test] +fn e2e_socket_ping_pong() { + if !try_init_plugins() { return; } + + // Start server, client connect, ping/pong + let code = r#" +local ss, sc, c, s, r +ss = new SocketServerBox() +ss.start(9100) + +sc = new SocketClientBox() +c = sc.connect("127.0.0.1", 9100) + +s = ss.accept() + +c.send("ping") +r = s.recv() +// echo back +s.send("pong") +r = c.recv() +r +"#; + + let ast = NyashParser::parse_from_string(code).expect("parse failed"); + let mut interpreter = nyash_rust::interpreter::NyashInterpreter::new(); + let result = interpreter.execute(ast).expect("exec failed"); + assert_eq!(result.to_string_box().value, "pong"); +} + +#[test] +fn e2e_socket_accept_timeout_and_recv_timeout() { + if !try_init_plugins() { return; } + + let code = r#" +local ss, sc, c, s, r +ss = new SocketServerBox() +ss.start(9101) + +// before any client, acceptTimeout returns void +r = ss.acceptTimeout(50) +// now connect +sc = new SocketClientBox() +c = sc.connect("127.0.0.1", 9101) +s = ss.acceptTimeout(500) + +// recvTimeout with no data should be empty +r = s.recvTimeout(50) + +// send then recvTimeout should get data +c.send("hello") +r = s.recvTimeout(200) +r +"#; + + let ast = NyashParser::parse_from_string(code).expect("parse failed"); + let mut interpreter = nyash_rust::interpreter::NyashInterpreter::new(); + let result = interpreter.execute(ast).expect("exec failed"); + assert_eq!(result.to_string_box().value, "hello"); +} +