node/test/node-api/test_threadsafe_function/binding.c

316 lines
11 KiB
C

// For the purpose of this test we use libuv's threading library. When deciding
// on a threading library for a new project it bears remembering that in the
// future libuv may introduce API changes which may render it non-ABI-stable,
// which, in turn, may affect the ABI stability of the project despite its use
// of N-API.
#include <uv.h>
#include <node_api.h>
#include "../../js-native-api/common.h"
// Number of elements in the `ints` array; also exported to JS as
// ARRAY_LENGTH by Init.
#define ARRAY_LENGTH 10
// Exported to JS as MAX_QUEUE_SIZE by Init.
#define MAX_QUEUE_SIZE 2
// Thread handles: [0] is the data source thread started by
// StartThreadInternal, [1] is the optional secondary thread started by the
// data source thread.
static uv_thread_t uv_threads[2];
// The thread-safe function under test. NULL before the first start and again
// after StopThread has released it.
static napi_threadsafe_function ts_fn;
// Per-run settings and state. A pointer to the single instance is installed
// as the context of the thread-safe function and read back by the data source
// thread and by join_the_threads.
typedef struct {
// Call mode handed to napi_call_threadsafe_function by the data source thread.
napi_threadsafe_function_call_mode block_on_full;
// napi_tsfn_abort when the run is expected to be aborted; the data source
// thread then requires that it observed napi_closing at least once.
napi_threadsafe_function_release_mode abort;
// Whether the data source thread starts the secondary thread.
bool start_secondary;
// Reference to the JS function passed to StopThread; called and then deleted
// in join_the_threads.
napi_ref js_finalize_cb;
// Queue size given to napi_create_threadsafe_function. A value of 0 also makes
// the data source thread busy-wait after every call.
uint32_t max_queue_size;
} ts_fn_hint;
// The one ts_fn_hint instance; its address is passed as the context of the
// thread-safe function in StartThreadInternal.
static ts_fn_hint ts_info;
// Thread data to transmit to JS. Init fills it with 0..ARRAY_LENGTH-1 and the
// data source thread passes pointers to its elements.
static int ints[ARRAY_LENGTH];
static void secondary_thread(void* data) {
napi_threadsafe_function ts_fn = data;
if (napi_release_threadsafe_function(ts_fn, napi_tsfn_release) != napi_ok) {
napi_fatal_error("secondary_thread", NAPI_AUTO_LENGTH,
"napi_release_threadsafe_function failed", NAPI_AUTO_LENGTH);
}
}
// Source thread producing the data
static void data_source_thread(void* data) {
napi_threadsafe_function ts_fn = data;
int index;
void* hint;
ts_fn_hint *ts_fn_info;
napi_status status;
bool queue_was_full = false;
bool queue_was_closing = false;
if (napi_get_threadsafe_function_context(ts_fn, &hint) != napi_ok) {
napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
"napi_get_threadsafe_function_context failed", NAPI_AUTO_LENGTH);
}
ts_fn_info = (ts_fn_hint *)hint;
if (ts_fn_info != &ts_info) {
napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
"thread-safe function hint is not as expected", NAPI_AUTO_LENGTH);
}
if (ts_fn_info->start_secondary) {
if (napi_acquire_threadsafe_function(ts_fn) != napi_ok) {
napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
"napi_acquire_threadsafe_function failed", NAPI_AUTO_LENGTH);
}
if (uv_thread_create(&uv_threads[1], secondary_thread, ts_fn) != 0) {
napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
"failed to start secondary thread", NAPI_AUTO_LENGTH);
}
}
for (index = ARRAY_LENGTH - 1; index > -1 && !queue_was_closing; index--) {
status = napi_call_threadsafe_function(ts_fn, &ints[index],
ts_fn_info->block_on_full);
if (ts_fn_info->max_queue_size == 0) {
// Let's make this thread really busy for 200 ms to give the main thread a
// chance to abort.
uint64_t start = uv_hrtime();
for (; uv_hrtime() - start < 200000000;);
}
switch (status) {
case napi_queue_full:
queue_was_full = true;
index++;
// fall through
case napi_ok:
continue;
case napi_closing:
queue_was_closing = true;
break;
default:
napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
"napi_call_threadsafe_function failed", NAPI_AUTO_LENGTH);
}
}
// Assert that the enqueuing of a value was refused at least once, if this is
// a non-blocking test run.
if (!ts_fn_info->block_on_full && !queue_was_full) {
napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
"queue was never full", NAPI_AUTO_LENGTH);
}
// Assert that the queue was marked as closing at least once, if this is an
// aborting test run.
if (ts_fn_info->abort == napi_tsfn_abort && !queue_was_closing) {
napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
"queue was never closing", NAPI_AUTO_LENGTH);
}
if (!queue_was_closing &&
napi_release_threadsafe_function(ts_fn, napi_tsfn_release) != napi_ok) {
napi_fatal_error("data_source_thread", NAPI_AUTO_LENGTH,
"napi_release_threadsafe_function failed", NAPI_AUTO_LENGTH);
}
}
// Getting the data into JS
// call_js callback for the thread-safe function: wraps the int that `data`
// points to in a JS number and calls `cb` with it, using undefined as the
// receiver. Does nothing when `env` or `cb` is NULL; `hint` is unused.
static void call_js(napi_env env, napi_value cb, void* hint, void* data) {
  if (env == NULL || cb == NULL) {
    return;
  }

  napi_value js_number;
  napi_value js_undefined;

  NAPI_CALL_RETURN_VOID(env,
      napi_create_int32(env, *(int*)data, &js_number));
  NAPI_CALL_RETURN_VOID(env, napi_get_undefined(env, &js_undefined));
  NAPI_CALL_RETURN_VOID(env,
      napi_call_function(env, js_undefined, cb, 1, &js_number, NULL));
}
// Reference to the JS function that call_ref invokes. Created by
// StartThreadInternal when alt_ref_js_cb is true, deleted and reset to NULL in
// join_the_threads.
static napi_ref alt_ref;
// Getting the data into JS with the alternative reference
// call_js callback that ignores the function argument it is given and calls
// the function stored in `alt_ref` instead, passing the int that `data` points
// to as a JS number. Does nothing when `env` or `alt_ref` is NULL; `hint` is
// unused.
static void call_ref(napi_env env, napi_value _, void* hint, void* data) {
  if (env == NULL || alt_ref == NULL) {
    return;
  }

  napi_value target;
  napi_value js_number;
  napi_value js_undefined;

  NAPI_CALL_RETURN_VOID(env,
      napi_get_reference_value(env, alt_ref, &target));
  NAPI_CALL_RETURN_VOID(env,
      napi_create_int32(env, *(int*)data, &js_number));
  NAPI_CALL_RETURN_VOID(env, napi_get_undefined(env, &js_undefined));
  NAPI_CALL_RETURN_VOID(env,
      napi_call_function(env, js_undefined, target, 1, &js_number, NULL));
}
// Cleanup
// JS binding StopThread(finalizeCallback, abort).
// Stores a reference to `finalizeCallback` in ts_info so join_the_threads can
// call it, releases the thread-safe function in abort or release mode
// depending on the boolean `abort`, and clears `ts_fn`. Always returns NULL; a
// failing NAPI_CALL or NAPI_ASSERT returns early.
static napi_value StopThread(napi_env env, napi_callback_info info) {
  napi_value argv[2];
  size_t argc = 2;
  NAPI_CALL(env, napi_get_cb_info(env, info, &argc, argv, NULL, NULL));

  // The first argument must be the JS function to call once finalized.
  napi_valuetype value_type;
  NAPI_CALL(env, napi_typeof(env, argv[0], &value_type));
  NAPI_ASSERT(env, value_type == napi_function,
      "StopThread argument is a function");
  NAPI_ASSERT(env, (ts_fn != NULL), "Existing threadsafe function");

  NAPI_CALL(env,
      napi_create_reference(env, argv[0], 1, &(ts_info.js_finalize_cb)));

  bool should_abort;
  NAPI_CALL(env, napi_get_value_bool(env, argv[1], &should_abort));

  napi_threadsafe_function_release_mode mode =
      should_abort ? napi_tsfn_abort : napi_tsfn_release;
  NAPI_CALL(env, napi_release_threadsafe_function(ts_fn, mode));

  ts_fn = NULL;
  return NULL;
}
// Join the thread and inform JS that we're done.
// Finalize callback of the thread-safe function. `data` is the array of thread
// handles and `hint` the ts_fn_hint context, as registered in
// StartThreadInternal. Joins the data source thread, and the secondary thread
// if one was requested, then calls the JS function saved by StopThread with no
// arguments and deletes the references this test holds.
static void join_the_threads(napi_env env, void *data, void *hint) {
  uv_thread_t *threads = data;
  ts_fn_hint *settings = hint;
  napi_value finalize_fn;
  napi_value js_undefined;

  uv_thread_join(&threads[0]);
  if (settings->start_secondary) {
    uv_thread_join(&threads[1]);
  }

  NAPI_CALL_RETURN_VOID(env,
      napi_get_reference_value(env, settings->js_finalize_cb, &finalize_fn));
  NAPI_CALL_RETURN_VOID(env, napi_get_undefined(env, &js_undefined));
  NAPI_CALL_RETURN_VOID(env,
      napi_call_function(env, js_undefined, finalize_fn, 0, NULL, NULL));
  NAPI_CALL_RETURN_VOID(env,
      napi_delete_reference(env, settings->js_finalize_cb));

  // The alternative reference only exists for runs started with alt_ref_js_cb.
  if (alt_ref == NULL) {
    return;
  }
  NAPI_CALL_RETURN_VOID(env, napi_delete_reference(env, alt_ref));
  alt_ref = NULL;
}
// Shared implementation of the StartThread* bindings.
//
// JS arguments: argv[0] is the function to call (kept in `alt_ref` instead of
// being handed to the thread-safe function when `alt_ref_js_cb` is true),
// argv[1] says whether the run is expected to be aborted, argv[2] whether a
// secondary thread is started, and argv[3] is the maximum queue size.
//
// `cb` is the native call_js callback (may be NULL) and `block_on_full`
// selects blocking or non-blocking calls from the data source thread.
// Returns NULL; a failing NAPI_CALL or NAPI_ASSERT returns early.
static napi_value StartThreadInternal(napi_env env,
                                      napi_callback_info info,
                                      napi_threadsafe_function_call_js cb,
                                      bool block_on_full,
                                      bool alt_ref_js_cb) {
  size_t argc = 4;
  napi_value argv[4];

  // Reject a second start before touching any shared state. Checking this
  // only after `alt_ref` and `ts_info` have been written would leak the
  // previous reference and change settings that the existing run still reads.
  NAPI_ASSERT(env, (ts_fn == NULL), "Existing thread-safe function");

  NAPI_CALL(env, napi_get_cb_info(env, info, &argc, argv, NULL, NULL));

  // Parse every argument into locals before creating anything, so an invalid
  // argument cannot leave behind a thread-safe function that has no thread.
  uint32_t max_queue_size;
  bool abort_expected;
  bool start_secondary;
  NAPI_CALL(env, napi_get_value_uint32(env, argv[3], &max_queue_size));
  NAPI_CALL(env, napi_get_value_bool(env, argv[1], &abort_expected));
  NAPI_CALL(env, napi_get_value_bool(env, argv[2], &start_secondary));

  if (alt_ref_js_cb) {
    // call_ref calls the function through this reference, so the thread-safe
    // function itself is created without a JS function.
    NAPI_CALL(env, napi_create_reference(env, argv[0], 1, &alt_ref));
    argv[0] = NULL;
  }

  ts_info.block_on_full =
      (block_on_full ? napi_tsfn_blocking : napi_tsfn_nonblocking);
  ts_info.abort = abort_expected ? napi_tsfn_abort : napi_tsfn_release;
  ts_info.start_secondary = start_secondary;
  ts_info.max_queue_size = max_queue_size;

  napi_value async_name;
  NAPI_CALL(env, napi_create_string_utf8(env, "N-API Thread-safe Function Test",
      NAPI_AUTO_LENGTH, &async_name));

  // Initial thread count is 2; the thread handles are the finalize data and
  // &ts_info is the context.
  NAPI_CALL(env, napi_create_threadsafe_function(env,
                                                 argv[0],
                                                 NULL,
                                                 async_name,
                                                 ts_info.max_queue_size,
                                                 2,
                                                 uv_threads,
                                                 join_the_threads,
                                                 &ts_info,
                                                 cb,
                                                 &ts_fn));

  NAPI_ASSERT(env,
      (uv_thread_create(&uv_threads[0], data_source_thread, ts_fn) == 0),
      "Thread creation");
  return NULL;
}
// JS binding Unref(): calls napi_unref_threadsafe_function on the current
// thread-safe function. The NAPI_ASSERT fails when `ts_fn` is NULL. Returns
// NULL in every case.
static napi_value Unref(napi_env env, napi_callback_info info) {
NAPI_ASSERT(env, ts_fn != NULL, "No existing thread-safe function");
NAPI_CALL(env, napi_unref_threadsafe_function(env, ts_fn));
return NULL;
}
// JS binding Release(): releases the current thread-safe function once in
// napi_tsfn_release mode. Unlike StopThread it leaves `ts_fn` set. The
// NAPI_ASSERT fails when `ts_fn` is NULL. Returns NULL in every case.
static napi_value Release(napi_env env, napi_callback_info info) {
NAPI_ASSERT(env, ts_fn != NULL, "No existing thread-safe function");
NAPI_CALL(env, napi_release_threadsafe_function(ts_fn, napi_tsfn_release));
return NULL;
}
// Startup
// JS binding StartThread: blocking calls, delivered through call_js to the JS
// function given to the thread-safe function.
static napi_value StartThread(napi_env env, napi_callback_info info) {
  const bool block_on_full = true;
  const bool alt_ref_js_cb = false;

  return StartThreadInternal(env, info, call_js, block_on_full, alt_ref_js_cb);
}
// JS binding StartThreadNonblocking: same as StartThread, except that the data
// source thread makes non-blocking calls.
static napi_value StartThreadNonblocking(napi_env env,
                                         napi_callback_info info) {
  const bool block_on_full = false;
  const bool alt_ref_js_cb = false;

  return StartThreadInternal(env, info, call_js, block_on_full, alt_ref_js_cb);
}
// JS binding StartThreadNoNative: blocking calls, with no native call_js
// callback supplied (NULL is passed instead).
static napi_value StartThreadNoNative(napi_env env, napi_callback_info info) {
  const bool block_on_full = true;
  const bool alt_ref_js_cb = false;

  return StartThreadInternal(env, info, NULL, block_on_full, alt_ref_js_cb);
}
// JS binding StartThreadNoJsFunc: blocking calls, with the JS function kept in
// `alt_ref` and invoked by call_ref rather than being given to the thread-safe
// function.
static napi_value StartThreadNoJsFunc(napi_env env, napi_callback_info info) {
  const bool block_on_full = true;
  const bool alt_ref_js_cb = true;

  return StartThreadInternal(env, info, call_ref, block_on_full, alt_ref_js_cb);
}
// Module init
// Module initializer. Fills `ints` with 0..ARRAY_LENGTH-1, then defines the
// ARRAY_LENGTH and MAX_QUEUE_SIZE constants and the test entry points on
// `exports`. Returns `exports`, or returns early through NAPI_CALL when an
// N-API call fails.
static napi_value Init(napi_env env, napi_value exports) {
  // An int index avoids an implicit size_t-to-int conversion on the store.
  for (int index = 0; index < ARRAY_LENGTH; index++) {
    ints[index] = index;
  }

  // These values go straight into the descriptors below, so a failed creation
  // must not be ignored: the napi_value would be left uninitialized.
  napi_value js_array_length, js_max_queue_size;
  NAPI_CALL(env, napi_create_uint32(env, ARRAY_LENGTH, &js_array_length));
  NAPI_CALL(env, napi_create_uint32(env, MAX_QUEUE_SIZE, &js_max_queue_size));

  napi_property_descriptor properties[] = {
    {
      "ARRAY_LENGTH",
      NULL,
      NULL,
      NULL,
      NULL,
      js_array_length,
      napi_enumerable,
      NULL
    },
    {
      "MAX_QUEUE_SIZE",
      NULL,
      NULL,
      NULL,
      NULL,
      js_max_queue_size,
      napi_enumerable,
      NULL
    },
    DECLARE_NAPI_PROPERTY("StartThread", StartThread),
    DECLARE_NAPI_PROPERTY("StartThreadNoNative", StartThreadNoNative),
    DECLARE_NAPI_PROPERTY("StartThreadNonblocking", StartThreadNonblocking),
    DECLARE_NAPI_PROPERTY("StartThreadNoJsFunc", StartThreadNoJsFunc),
    DECLARE_NAPI_PROPERTY("StopThread", StopThread),
    DECLARE_NAPI_PROPERTY("Unref", Unref),
    DECLARE_NAPI_PROPERTY("Release", Release),
  };

  NAPI_CALL(env, napi_define_properties(env, exports,
      sizeof(properties)/sizeof(properties[0]), properties));

  return exports;
}
// Register Init as the initialization function of this addon.
NAPI_MODULE(NODE_GYP_MODULE_NAME, Init)