Compare commits

...

9 Commits

Author SHA1 Message Date
Fedor Indutny
c22f9321e2 Handle early finalization by Node.js 2021-05-19 17:25:33 -07:00
Fedor Indutny
47ef9adaab Fix build on napi-4 2021-05-19 15:49:31 -07:00
Fedor Indutny
b8ea71824b another fix 2021-05-19 15:26:20 -07:00
Fedor Indutny
43fdcd2d2d Fix build when napi-6 feature is absent 2021-05-19 15:21:43 -07:00
Fedor Indutny
d103220b3c use assert! 2021-05-19 14:17:12 -07:00
Fedor Indutny
4d3f846c60 Address review feedback 2021-05-19 13:13:09 -07:00
Fedor Indutny
84a8eb85c3 EventQueue:new() uses new threadsafe trampoline 2021-05-19 11:17:39 -07:00
Fedor Indutny
0ebf866254 Do ref counting in ThreadsafeTrampoline 2021-05-19 10:17:28 -07:00
Fedor Indutny
76853a9b6c Reuse ThreadsafeFunction in EventQueue
Node.js optimizes subsequent ThreadsafeFunction invocations to happen
during the same event loop tick, but only if the same instance of
ThreadsafeFunction is used. The performance improvement is most
noticeable when used in Electron, because scheduling a new UV tick in
Electron is very costly.

With this change EventQueue will use an
existing instance of ThreadsafeTrampoline (wrapper around
ThreadsafeFunction) if compiled with napi-6 feature, or it will fallback
to creating a new ThreadsafeFunction per EventQueue instance.

Fix: #727
2021-05-18 18:11:16 -07:00
6 changed files with 224 additions and 33 deletions

View File

@ -2,6 +2,7 @@
use std::ffi::c_void;
use std::mem::MaybeUninit;
use std::sync::{Arc, Mutex};
use crate::napi::bindings as napi;
use crate::raw::{Env, Local};
@ -34,6 +35,7 @@ unsafe impl Sync for Tsfn {}
/// function for scheduling tasks to execute on a JavaScript thread.
pub struct ThreadsafeFunction<T> {
tsfn: Tsfn,
is_finalized: Arc<Mutex<bool>>,
callback: fn(Option<Env>, T),
}
@ -76,6 +78,7 @@ impl<T: Send + 'static> ThreadsafeFunction<T> {
callback: fn(Option<Env>, T),
) -> Self {
let mut result = MaybeUninit::uninit();
let is_finalized = Arc::new(Mutex::new(false));
assert_eq!(
napi::create_threadsafe_function(
@ -87,8 +90,8 @@ impl<T: Send + 'static> ThreadsafeFunction<T> {
// Always set the reference count to 1. Prefer using
// Rust `Arc` to maintain the struct.
1,
std::ptr::null_mut(),
None,
Arc::into_raw(is_finalized.clone()) as *mut _,
Some(Self::finalize),
std::ptr::null_mut(),
Some(Self::callback),
result.as_mut_ptr(),
@ -98,6 +101,7 @@ impl<T: Send + 'static> ThreadsafeFunction<T> {
Self {
tsfn: Tsfn(result.assume_init()),
is_finalized: is_finalized,
callback,
}
}
@ -149,6 +153,14 @@ impl<T: Send + 'static> ThreadsafeFunction<T> {
);
}
// Provides a C ABI wrapper for a napi callback notifying us about tsfn
// being finalized.
unsafe extern "C" fn finalize(_env: Env, data: *mut c_void, _hint: *mut c_void) {
let is_finalized = Arc::from_raw(data as *mut Mutex<bool>);
*is_finalized.lock().unwrap() = true;
}
// Provides a C ABI wrapper for invoking the user supplied function pointer
unsafe extern "C" fn callback(
env: Env,
@ -167,6 +179,12 @@ impl<T: Send + 'static> ThreadsafeFunction<T> {
impl<T> Drop for ThreadsafeFunction<T> {
fn drop(&mut self) {
// tsfn was already finalized by `Environment::CleanupHandles()` in
// Node.js
if *self.is_finalized.lock().unwrap() {
return;
}
unsafe {
napi::release_threadsafe_function(
self.tsfn.0,

View File

@ -154,10 +154,18 @@ use crate::context::internal::Env;
#[cfg(all(feature = "napi-4", feature = "event-queue-api"))]
use crate::event::EventQueue;
use crate::handle::{Handle, Managed};
#[cfg(all(feature = "napi-6", feature = "event-queue-api"))]
use crate::lifecycle::InstanceData;
#[cfg(feature = "legacy-runtime")]
use crate::object::class::Class;
use crate::object::{Object, This};
use crate::result::{JsResult, NeonResult, Throw};
#[cfg(all(
feature = "napi-4",
not(feature = "napi-6"),
feature = "event-queue-api"
))]
use crate::trampoline::ThreadsafeTrampoline;
use crate::types::binary::{JsArrayBuffer, JsBuffer};
#[cfg(feature = "napi-1")]
use crate::types::boxed::{Finalize, JsBox};
@ -178,6 +186,12 @@ use std::convert::Into;
use std::marker::PhantomData;
use std::os::raw::c_void;
use std::panic::UnwindSafe;
#[cfg(all(
feature = "napi-4",
not(feature = "napi-6"),
feature = "event-queue-api"
))]
use std::sync::{Arc, RwLock};
use self::internal::{ContextInternal, Scope, ScopeMetadata};
@ -552,7 +566,17 @@ pub trait Context<'a>: ContextInternal<'a> {
#[cfg(all(feature = "napi-4", feature = "event-queue-api"))]
/// Creates an unbounded queue of events to be executed on a JavaScript thread
fn queue(&mut self) -> EventQueue {
EventQueue::new(self)
#[cfg(feature = "napi-6")]
let shared_trampoline = InstanceData::threadsafe_trampoline(self);
#[cfg(not(feature = "napi-6"))]
let shared_trampoline = {
let mut trampoline = ThreadsafeTrampoline::new(self.env().to_raw());
trampoline.decrement_references(self.env().to_raw());
Arc::new(RwLock::new(trampoline))
};
EventQueue::with_shared_trampoline(self, shared_trampoline)
}
}

View File

@ -1,10 +1,9 @@
use neon_runtime::raw::Env;
use neon_runtime::tsfn::ThreadsafeFunction;
use std::sync::{Arc, RwLock};
use crate::context::internal::ContextInternal;
use crate::context::{Context, TaskContext};
use crate::result::NeonResult;
type Callback = Box<dyn FnOnce(Env) + Send + 'static>;
use crate::trampoline::ThreadsafeTrampoline;
/// Queue for scheduling Rust closures to execute on the JavaScript main thread.
///
@ -50,8 +49,9 @@ type Callback = Box<dyn FnOnce(Env) + Send + 'static>;
/// ```
pub struct EventQueue {
tsfn: ThreadsafeFunction<Callback>,
trampoline: Arc<RwLock<ThreadsafeTrampoline>>,
has_ref: bool,
has_shared_trampoline: bool,
}
impl std::fmt::Debug for EventQueue {
@ -64,20 +64,43 @@ impl EventQueue {
/// Creates an unbounded queue for scheduling closures on the JavaScript
/// main thread
pub fn new<'a, C: Context<'a>>(cx: &mut C) -> Self {
let tsfn = unsafe { ThreadsafeFunction::new(cx.env().to_raw(), Self::callback) };
let trampoline = ThreadsafeTrampoline::new(cx.env().to_raw());
Self {
tsfn,
trampoline: Arc::new(RwLock::new(trampoline)),
has_ref: true,
has_shared_trampoline: false,
}
}
pub(crate) fn with_shared_trampoline<'a, C: Context<'a>>(
cx: &mut C,
trampoline: Arc<RwLock<ThreadsafeTrampoline>>,
) -> Self {
trampoline
.write()
.unwrap()
.increment_references(cx.env().to_raw());
Self {
trampoline: trampoline,
has_ref: true,
has_shared_trampoline: true,
}
}
/// Allow the Node event loop to exit while this `EventQueue` exists.
/// _Idempotent_
pub fn unref<'a, C: Context<'a>>(&mut self, cx: &mut C) -> &mut Self {
self.has_ref = false;
if !self.has_ref {
return self;
}
unsafe { self.tsfn.unref(cx.env().to_raw()) }
self.has_ref = false;
self.trampoline
.write()
.unwrap()
.decrement_references(cx.env().to_raw());
self
}
@ -85,9 +108,15 @@ impl EventQueue {
/// Prevent the Node event loop from exiting while this `EventQueue` exists. (Default)
/// _Idempotent_
pub fn reference<'a, C: Context<'a>>(&mut self, cx: &mut C) -> &mut Self {
self.has_ref = true;
if self.has_ref {
return self;
}
unsafe { self.tsfn.reference(cx.env().to_raw()) }
self.has_ref = true;
self.trampoline
.write()
.unwrap()
.increment_references(cx.env().to_raw());
self
}
@ -107,17 +136,8 @@ impl EventQueue {
where
F: FnOnce(TaskContext) -> NeonResult<()> + Send + 'static,
{
let callback = Box::new(move |env| {
let env = unsafe { std::mem::transmute(env) };
// Note: It is sufficient to use `TaskContext`'s `InheritedHandleScope` because
// N-API creates a `HandleScope` before calling the callback.
TaskContext::with_context(env, move |cx| {
let _ = f(cx);
});
});
self.tsfn.call(callback, None).map_err(|_| EventQueueError)
let trampoline = self.trampoline.read().unwrap();
trampoline.try_send(f).map_err(|_| EventQueueError)
}
/// Returns a boolean indicating if this `EventQueue` will prevent the Node event
@ -125,16 +145,29 @@ impl EventQueue {
pub fn has_ref(&self) -> bool {
self.has_ref
}
}
// Monomorphized trampoline funciton for calling the user provided closure
fn callback(env: Option<Env>, callback: Callback) {
if let Some(env) = env {
callback(env);
} else {
crate::context::internal::IS_RUNNING.with(|v| {
*v.borrow_mut() = false;
});
impl Drop for EventQueue {
fn drop(&mut self) {
if !self.has_ref {
return;
}
// If we own the trampoline - it is going to be dropped as well.
// There is no need to decrement its references.
if !self.has_shared_trampoline {
return;
}
let trampoline = self.trampoline.clone();
self.send(move |cx| {
trampoline
.write()
.unwrap()
.decrement_references(cx.env().to_raw());
Ok(())
});
}
}

View File

@ -106,6 +106,9 @@ pub use neon_macros::*;
#[cfg(feature = "napi-6")]
mod lifecycle;
#[cfg(all(feature = "napi-4", feature = "event-queue-api"))]
mod trampoline;
#[cfg(all(feature = "legacy-runtime", feature = "napi-1"))]
compile_error!("Cannot enable both `legacy-runtime` and `napi-*` features.\n\nTo use `napi-*`, disable `legacy-runtime` by setting `default-features` to `false` in Cargo.toml\nor with cargo's --no-default-features flag.");

View File

@ -10,6 +10,8 @@
use std::mem;
use std::sync::Arc;
#[cfg(feature = "event-queue-api")]
use std::sync::RwLock;
use neon_runtime::raw::Env;
use neon_runtime::reference;
@ -17,6 +19,8 @@ use neon_runtime::tsfn::ThreadsafeFunction;
use crate::context::Context;
use crate::handle::root::NapiRef;
#[cfg(feature = "event-queue-api")]
use crate::trampoline::ThreadsafeTrampoline;
/// `InstanceData` holds Neon data associated with a particular instance of a
/// native module. If a module is loaded multiple times (e.g., worker threads), this
@ -30,6 +34,10 @@ pub(crate) struct InstanceData {
/// given the cost of FFI, this optimization is omitted until the cost of an
/// `Arc` is demonstrated as significant.
drop_queue: Arc<ThreadsafeFunction<NapiRef>>,
/// Used in EventQueue to invoke Rust callbacks with Napi environment.
#[cfg(feature = "event-queue-api")]
threadsafe_trampoline: Arc<RwLock<ThreadsafeTrampoline>>,
}
fn drop_napi_ref(env: Option<Env>, data: NapiRef) {
@ -62,8 +70,18 @@ impl InstanceData {
queue
};
#[cfg(feature = "event-queue-api")]
let threadsafe_trampoline = {
let mut trampoline = ThreadsafeTrampoline::new(env);
trampoline.decrement_references(env);
trampoline
};
let data = InstanceData {
drop_queue: Arc::new(drop_queue),
#[cfg(feature = "event-queue-api")]
threadsafe_trampoline: Arc::new(RwLock::new(threadsafe_trampoline)),
};
unsafe { &mut *neon_runtime::lifecycle::set_instance_data(env, data) }
@ -73,4 +91,12 @@ impl InstanceData {
pub(crate) fn drop_queue<'a, C: Context<'a>>(cx: &mut C) -> Arc<ThreadsafeFunction<NapiRef>> {
Arc::clone(&InstanceData::get(cx).drop_queue)
}
/// Helper to return a reference to the `invoke_callback` field of `InstanceData`
#[cfg(feature = "event-queue-api")]
pub(crate) fn threadsafe_trampoline<'a, C: Context<'a>>(
cx: &mut C,
) -> Arc<RwLock<ThreadsafeTrampoline>> {
Arc::clone(&InstanceData::get(cx).threadsafe_trampoline)
}
}

87
src/trampoline.rs Normal file
View File

@ -0,0 +1,87 @@
use neon_runtime::raw::Env;
use neon_runtime::tsfn::{CallError, ThreadsafeFunction};
use crate::context::TaskContext;
use crate::result::NeonResult;
pub(crate) type Callback = Box<dyn FnOnce(Env) + Send + 'static>;
pub(crate) struct ThreadsafeTrampoline {
tsfn: ThreadsafeFunction<Callback>,
ref_count: u32,
}
impl ThreadsafeTrampoline {
/// Creates an unbounded queue for scheduling closures on the JavaScript
/// main thread
pub(crate) fn new(env: Env) -> Self {
let tsfn = unsafe { ThreadsafeFunction::new(env, Self::callback) };
Self {
tsfn: tsfn,
ref_count: 1,
}
}
/// Schedules a closure to execute on the JavaScript thread that created
/// this ThreadsafeTrampoline.
/// Returns an `Error` if the task could not be scheduled.
pub(crate) fn try_send<F>(&self, f: F) -> Result<(), CallError<Callback>>
where
F: FnOnce(TaskContext) -> NeonResult<()> + Send + 'static,
{
let callback = Box::new(move |env| {
let env = unsafe { std::mem::transmute(env) };
// Note: It is sufficient to use `TaskContext`'s `InheritedHandleScope` because
// N-API creates a `HandleScope` before calling the callback.
TaskContext::with_context(env, move |cx| {
let _ = f(cx);
});
});
self.tsfn.call(callback, None)
}
/// References a trampoline to prevent exiting the event loop until it has been dropped. (Default)
/// Safety: `Env` must be valid for the current thread
/// _Not idempotent_
pub(crate) fn increment_references(&mut self, env: Env) {
self.ref_count += 1;
if self.ref_count != 1 {
return;
}
unsafe {
self.tsfn.reference(env);
}
}
/// Unreferences a trampoline to allow exiting the event loop before it has been dropped.
/// Safety: `Env` must be valid for the current thread
/// _Not idempotent_
pub(crate) fn decrement_references(&mut self, env: Env) {
assert!(
self.ref_count > 0,
"ThreadsafeTrampoline reference underflow"
);
self.ref_count -= 1;
if self.ref_count != 0 {
return;
}
unsafe {
self.tsfn.unref(env);
}
}
// Monomorphized trampoline function for calling the user provided closure
fn callback(env: Option<Env>, callback: Callback) {
if let Some(env) = env {
callback(env);
} else {
crate::context::internal::IS_RUNNING.with(|v| {
*v.borrow_mut() = false;
});
}
}
}