Compare commits

..

No commits in common. "feature/shared-event-queue" and "main" have entirely different histories.

6 changed files with 33 additions and 224 deletions

View File

@ -2,7 +2,6 @@
use std::ffi::c_void;
use std::mem::MaybeUninit;
use std::sync::{Arc, Mutex};
use crate::napi::bindings as napi;
use crate::raw::{Env, Local};
@ -35,7 +34,6 @@ unsafe impl Sync for Tsfn {}
/// function for scheduling tasks to execute on a JavaScript thread.
pub struct ThreadsafeFunction<T> {
tsfn: Tsfn,
is_finalized: Arc<Mutex<bool>>,
callback: fn(Option<Env>, T),
}
@ -78,7 +76,6 @@ impl<T: Send + 'static> ThreadsafeFunction<T> {
callback: fn(Option<Env>, T),
) -> Self {
let mut result = MaybeUninit::uninit();
let is_finalized = Arc::new(Mutex::new(false));
assert_eq!(
napi::create_threadsafe_function(
@ -90,8 +87,8 @@ impl<T: Send + 'static> ThreadsafeFunction<T> {
// Always set the reference count to 1. Prefer using
// Rust `Arc` to maintain the struct.
1,
Arc::into_raw(is_finalized.clone()) as *mut _,
Some(Self::finalize),
std::ptr::null_mut(),
None,
std::ptr::null_mut(),
Some(Self::callback),
result.as_mut_ptr(),
@ -101,7 +98,6 @@ impl<T: Send + 'static> ThreadsafeFunction<T> {
Self {
tsfn: Tsfn(result.assume_init()),
is_finalized: is_finalized,
callback,
}
}
@ -153,14 +149,6 @@ impl<T: Send + 'static> ThreadsafeFunction<T> {
);
}
// Provides a C ABI wrapper for a napi callback notifying us about tsfn
// being finalized.
unsafe extern "C" fn finalize(_env: Env, data: *mut c_void, _hint: *mut c_void) {
let is_finalized = Arc::from_raw(data as *mut Mutex<bool>);
*is_finalized.lock().unwrap() = true;
}
// Provides a C ABI wrapper for invoking the user supplied function pointer
unsafe extern "C" fn callback(
env: Env,
@ -179,12 +167,6 @@ impl<T: Send + 'static> ThreadsafeFunction<T> {
impl<T> Drop for ThreadsafeFunction<T> {
fn drop(&mut self) {
// tsfn was already finalized by `Environment::CleanupHandles()` in
// Node.js
if *self.is_finalized.lock().unwrap() {
return;
}
unsafe {
napi::release_threadsafe_function(
self.tsfn.0,

View File

@ -154,18 +154,10 @@ use crate::context::internal::Env;
#[cfg(all(feature = "napi-4", feature = "event-queue-api"))]
use crate::event::EventQueue;
use crate::handle::{Handle, Managed};
#[cfg(all(feature = "napi-6", feature = "event-queue-api"))]
use crate::lifecycle::InstanceData;
#[cfg(feature = "legacy-runtime")]
use crate::object::class::Class;
use crate::object::{Object, This};
use crate::result::{JsResult, NeonResult, Throw};
#[cfg(all(
feature = "napi-4",
not(feature = "napi-6"),
feature = "event-queue-api"
))]
use crate::trampoline::ThreadsafeTrampoline;
use crate::types::binary::{JsArrayBuffer, JsBuffer};
#[cfg(feature = "napi-1")]
use crate::types::boxed::{Finalize, JsBox};
@ -186,12 +178,6 @@ use std::convert::Into;
use std::marker::PhantomData;
use std::os::raw::c_void;
use std::panic::UnwindSafe;
#[cfg(all(
feature = "napi-4",
not(feature = "napi-6"),
feature = "event-queue-api"
))]
use std::sync::{Arc, RwLock};
use self::internal::{ContextInternal, Scope, ScopeMetadata};
@ -566,17 +552,7 @@ pub trait Context<'a>: ContextInternal<'a> {
#[cfg(all(feature = "napi-4", feature = "event-queue-api"))]
/// Creates an unbounded queue of events to be executed on a JavaScript thread
fn queue(&mut self) -> EventQueue {
#[cfg(feature = "napi-6")]
let shared_trampoline = InstanceData::threadsafe_trampoline(self);
#[cfg(not(feature = "napi-6"))]
let shared_trampoline = {
let mut trampoline = ThreadsafeTrampoline::new(self.env().to_raw());
trampoline.decrement_references(self.env().to_raw());
Arc::new(RwLock::new(trampoline))
};
EventQueue::with_shared_trampoline(self, shared_trampoline)
EventQueue::new(self)
}
}

View File

@ -1,9 +1,10 @@
use std::sync::{Arc, RwLock};
use neon_runtime::raw::Env;
use neon_runtime::tsfn::ThreadsafeFunction;
use crate::context::internal::ContextInternal;
use crate::context::{Context, TaskContext};
use crate::result::NeonResult;
use crate::trampoline::ThreadsafeTrampoline;
type Callback = Box<dyn FnOnce(Env) + Send + 'static>;
/// Queue for scheduling Rust closures to execute on the JavaScript main thread.
///
@ -49,9 +50,8 @@ use crate::trampoline::ThreadsafeTrampoline;
/// ```
pub struct EventQueue {
trampoline: Arc<RwLock<ThreadsafeTrampoline>>,
tsfn: ThreadsafeFunction<Callback>,
has_ref: bool,
has_shared_trampoline: bool,
}
impl std::fmt::Debug for EventQueue {
@ -64,43 +64,20 @@ impl EventQueue {
/// Creates an unbounded queue for scheduling closures on the JavaScript
/// main thread
pub fn new<'a, C: Context<'a>>(cx: &mut C) -> Self {
let trampoline = ThreadsafeTrampoline::new(cx.env().to_raw());
let tsfn = unsafe { ThreadsafeFunction::new(cx.env().to_raw(), Self::callback) };
Self {
trampoline: Arc::new(RwLock::new(trampoline)),
tsfn,
has_ref: true,
has_shared_trampoline: false,
}
}
pub(crate) fn with_shared_trampoline<'a, C: Context<'a>>(
cx: &mut C,
trampoline: Arc<RwLock<ThreadsafeTrampoline>>,
) -> Self {
trampoline
.write()
.unwrap()
.increment_references(cx.env().to_raw());
Self {
trampoline: trampoline,
has_ref: true,
has_shared_trampoline: true,
}
}
/// Allow the Node event loop to exit while this `EventQueue` exists.
/// _Idempotent_
pub fn unref<'a, C: Context<'a>>(&mut self, cx: &mut C) -> &mut Self {
if !self.has_ref {
return self;
}
self.has_ref = false;
self.trampoline
.write()
.unwrap()
.decrement_references(cx.env().to_raw());
unsafe { self.tsfn.unref(cx.env().to_raw()) }
self
}
@ -108,15 +85,9 @@ impl EventQueue {
/// Prevent the Node event loop from exiting while this `EventQueue` exists. (Default)
/// _Idempotent_
pub fn reference<'a, C: Context<'a>>(&mut self, cx: &mut C) -> &mut Self {
if self.has_ref {
return self;
}
self.has_ref = true;
self.trampoline
.write()
.unwrap()
.increment_references(cx.env().to_raw());
unsafe { self.tsfn.reference(cx.env().to_raw()) }
self
}
@ -136,8 +107,17 @@ impl EventQueue {
where
F: FnOnce(TaskContext) -> NeonResult<()> + Send + 'static,
{
let trampoline = self.trampoline.read().unwrap();
trampoline.try_send(f).map_err(|_| EventQueueError)
let callback = Box::new(move |env| {
let env = unsafe { std::mem::transmute(env) };
// Note: It is sufficient to use `TaskContext`'s `InheritedHandleScope` because
// N-API creates a `HandleScope` before calling the callback.
TaskContext::with_context(env, move |cx| {
let _ = f(cx);
});
});
self.tsfn.call(callback, None).map_err(|_| EventQueueError)
}
/// Returns a boolean indicating if this `EventQueue` will prevent the Node event
@ -145,29 +125,16 @@ impl EventQueue {
pub fn has_ref(&self) -> bool {
self.has_ref
}
}
impl Drop for EventQueue {
fn drop(&mut self) {
if !self.has_ref {
return;
// Monomorphized trampoline funciton for calling the user provided closure
fn callback(env: Option<Env>, callback: Callback) {
if let Some(env) = env {
callback(env);
} else {
crate::context::internal::IS_RUNNING.with(|v| {
*v.borrow_mut() = false;
});
}
// If we own the trampoline - it is going to be dropped as well.
// There is no need to decrement its references.
if !self.has_shared_trampoline {
return;
}
let trampoline = self.trampoline.clone();
self.send(move |cx| {
trampoline
.write()
.unwrap()
.decrement_references(cx.env().to_raw());
Ok(())
});
}
}

View File

@ -106,9 +106,6 @@ pub use neon_macros::*;
#[cfg(feature = "napi-6")]
mod lifecycle;
#[cfg(all(feature = "napi-4", feature = "event-queue-api"))]
mod trampoline;
#[cfg(all(feature = "legacy-runtime", feature = "napi-1"))]
compile_error!("Cannot enable both `legacy-runtime` and `napi-*` features.\n\nTo use `napi-*`, disable `legacy-runtime` by setting `default-features` to `false` in Cargo.toml\nor with cargo's --no-default-features flag.");

View File

@ -10,8 +10,6 @@
use std::mem;
use std::sync::Arc;
#[cfg(feature = "event-queue-api")]
use std::sync::RwLock;
use neon_runtime::raw::Env;
use neon_runtime::reference;
@ -19,8 +17,6 @@ use neon_runtime::tsfn::ThreadsafeFunction;
use crate::context::Context;
use crate::handle::root::NapiRef;
#[cfg(feature = "event-queue-api")]
use crate::trampoline::ThreadsafeTrampoline;
/// `InstanceData` holds Neon data associated with a particular instance of a
/// native module. If a module is loaded multiple times (e.g., worker threads), this
@ -34,10 +30,6 @@ pub(crate) struct InstanceData {
/// given the cost of FFI, this optimization is omitted until the cost of an
/// `Arc` is demonstrated as significant.
drop_queue: Arc<ThreadsafeFunction<NapiRef>>,
/// Used in EventQueue to invoke Rust callbacks with Napi environment.
#[cfg(feature = "event-queue-api")]
threadsafe_trampoline: Arc<RwLock<ThreadsafeTrampoline>>,
}
fn drop_napi_ref(env: Option<Env>, data: NapiRef) {
@ -70,18 +62,8 @@ impl InstanceData {
queue
};
#[cfg(feature = "event-queue-api")]
let threadsafe_trampoline = {
let mut trampoline = ThreadsafeTrampoline::new(env);
trampoline.decrement_references(env);
trampoline
};
let data = InstanceData {
drop_queue: Arc::new(drop_queue),
#[cfg(feature = "event-queue-api")]
threadsafe_trampoline: Arc::new(RwLock::new(threadsafe_trampoline)),
};
unsafe { &mut *neon_runtime::lifecycle::set_instance_data(env, data) }
@ -91,12 +73,4 @@ impl InstanceData {
pub(crate) fn drop_queue<'a, C: Context<'a>>(cx: &mut C) -> Arc<ThreadsafeFunction<NapiRef>> {
Arc::clone(&InstanceData::get(cx).drop_queue)
}
/// Helper to return a reference to the `invoke_callback` field of `InstanceData`
#[cfg(feature = "event-queue-api")]
pub(crate) fn threadsafe_trampoline<'a, C: Context<'a>>(
cx: &mut C,
) -> Arc<RwLock<ThreadsafeTrampoline>> {
Arc::clone(&InstanceData::get(cx).threadsafe_trampoline)
}
}

View File

@ -1,87 +0,0 @@
use neon_runtime::raw::Env;
use neon_runtime::tsfn::{CallError, ThreadsafeFunction};
use crate::context::TaskContext;
use crate::result::NeonResult;
pub(crate) type Callback = Box<dyn FnOnce(Env) + Send + 'static>;
pub(crate) struct ThreadsafeTrampoline {
tsfn: ThreadsafeFunction<Callback>,
ref_count: u32,
}
impl ThreadsafeTrampoline {
/// Creates an unbounded queue for scheduling closures on the JavaScript
/// main thread
pub(crate) fn new(env: Env) -> Self {
let tsfn = unsafe { ThreadsafeFunction::new(env, Self::callback) };
Self {
tsfn: tsfn,
ref_count: 1,
}
}
/// Schedules a closure to execute on the JavaScript thread that created
/// this ThreadsafeTrampoline.
/// Returns an `Error` if the task could not be scheduled.
pub(crate) fn try_send<F>(&self, f: F) -> Result<(), CallError<Callback>>
where
F: FnOnce(TaskContext) -> NeonResult<()> + Send + 'static,
{
let callback = Box::new(move |env| {
let env = unsafe { std::mem::transmute(env) };
// Note: It is sufficient to use `TaskContext`'s `InheritedHandleScope` because
// N-API creates a `HandleScope` before calling the callback.
TaskContext::with_context(env, move |cx| {
let _ = f(cx);
});
});
self.tsfn.call(callback, None)
}
/// References a trampoline to prevent exiting the event loop until it has been dropped. (Default)
/// Safety: `Env` must be valid for the current thread
/// _Not idempotent_
pub(crate) fn increment_references(&mut self, env: Env) {
self.ref_count += 1;
if self.ref_count != 1 {
return;
}
unsafe {
self.tsfn.reference(env);
}
}
/// Unreferences a trampoline to allow exiting the event loop before it has been dropped.
/// Safety: `Env` must be valid for the current thread
/// _Not idempotent_
pub(crate) fn decrement_references(&mut self, env: Env) {
assert!(
self.ref_count > 0,
"ThreadsafeTrampoline reference underflow"
);
self.ref_count -= 1;
if self.ref_count != 0 {
return;
}
unsafe {
self.tsfn.unref(env);
}
}
// Monomorphized trampoline function for calling the user provided closure
fn callback(env: Option<Env>, callback: Callback) {
if let Some(env) = env {
callback(env);
} else {
crate::context::internal::IS_RUNNING.with(|v| {
*v.borrow_mut() = false;
});
}
}
}