Barretenberg
The ZK-SNARK library at the core of Aztec
Loading...
Searching...
No Matches
msgpack_client_async.cpp
Go to the documentation of this file.
#include "napi.h"
#include <cstdint>
#include <thread>
#include <vector>
6
7using namespace bb::nodejs::msgpack_client;
8
10 : ObjectWrap(info)
11{
12 Napi::Env env = info.Env();
13
14 // Arg 0: shared memory base name (string)
15 if (info.Length() < 1 || !info[0].IsString()) {
16 throw Napi::TypeError::New(env, "First argument must be a string (shared memory name)");
17 }
18 std::string shm_name = info[0].As<Napi::String>();
19
20 // Arg 1 (optional): client ID for MPSC mode (number)
21 if (info.Length() >= 2 && info[1].IsNumber()) {
22 size_t client_id = info[1].As<Napi::Number>().Uint32Value();
23 client_ = bb::ipc::IpcClient::create_mpsc_shm(shm_name, client_id);
24 } else {
26 }
27
28 // Connect to bb server
29 if (!client_->connect()) {
30 throw Napi::Error::New(env, "Failed to connect to shared memory server");
31 }
32}
33
34Napi::Value MsgpackClientAsync::setResponseCallback(const Napi::CallbackInfo& info)
35{
36 Napi::Env env = info.Env();
37
38 // Arg 0: JavaScript callback function
39 if (info.Length() < 1 || !info[0].IsFunction()) {
40 throw Napi::TypeError::New(env, "First argument must be a function");
41 }
42
43 // Store the callback for lazy TSFN creation
44 // Don't create TSFN yet - it will be created on first acquire()
45 js_callback_ = Napi::Persistent(info[0].As<Napi::Function>());
46
47 // Start background polling thread now that callback is registered
49
50 // Detach the thread - it will run until process exits
51 // No need for explicit shutdown or join
52 poll_thread_.detach();
53
54 return env.Undefined();
55}
56
// poll_responses: background loop that forwards responses from the IPC client to JavaScript.
// Each iteration waits up to TIMEOUT_NS on client_->receive(), copies a non-empty response
// into a heap-allocated vector, calls client_->release() for the consumed bytes, and queues
// the copy to tsfn_ with NonBlockingCall. The loop has no exit condition.
// NOTE(review): the signature line (original line 57) and the statement that followed the
// "Lock mutex" comment (original line 78) are absent from this listing, so the inner braces
// below open a scope with no visible lock. Restore both from the upstream file.
58{
59 constexpr uint64_t TIMEOUT_NS = 1000000000; // 1s
60
61 while (true) { // Run forever until process exits
62 // Poll for response (blocks with timeout using futex)
63 std::span<const uint8_t> response = client_->receive(TIMEOUT_NS);
64
65 if (response.empty()) {
66 // Timeout - just continue polling
67 continue;
68 }
69
70 // Copy response data before releasing (span is invalidated by release())
// Ownership of this raw pointer passes to the queued lambda on success, or is reclaimed
// by the status check further down when queuing fails.
71 auto* response_data = new std::vector<uint8_t>(response.begin(), response.end());
72
73 // Release the message in ring buffer to free space
74 client_->release(response.size());
75
76 // Lock mutex to safely access TSFN
77 {
79
80 // TSFN is active - invoke JavaScript callback
81 // The callback will handle matching this response to the correct promise
82 auto status = tsfn_.NonBlockingCall(
83 response_data, [](Napi::Env env, Napi::Function js_callback, std::vector<uint8_t>* data) {
84 // This lambda runs on the JavaScript main thread!
85 // Safe to create JS objects and call functions here
86
87 // Create Buffer with response data
88 auto js_buffer = Napi::Buffer<uint8_t>::Copy(env, data->data(), data->size());
89
90 // Call the registered JavaScript callback with the response
91 // TypeScript will pop its queue and resolve the appropriate promise
// NOTE(review): if this call throws, the delete below is skipped and data leaks;
// presumably a std::unique_ptr taking ownership at the top of the lambda would fix it.
92 js_callback.Call({ js_buffer });
93
94 // Clean up response data
95 delete data;
96 });
97
// The lambda never runs when queuing fails, so the copy is freed here instead.
98 if (status != napi_ok) {
99 // Failed to queue callback - likely process is exiting
100 // Just clean up and continue (process will exit soon anyway)
101 delete response_data;
102 }
103 }
104 }
105}
106
107Napi::Value MsgpackClientAsync::call(const Napi::CallbackInfo& info)
108{
109 Napi::Env env = info.Env();
110
111 // Arg 0: msgpack buffer to send
112 if (info.Length() < 1 || !info[0].IsBuffer()) {
113 throw Napi::TypeError::New(env, "First argument must be a Buffer");
114 }
115
116 auto input_buffer = info[0].As<Napi::Buffer<uint8_t>>();
117 const uint8_t* input_data = input_buffer.Data();
118 size_t input_len = input_buffer.Length();
119
120 // Send request (non-blocking write to ring buffer with no timeout)
121 // TypeScript will handle promise creation and queueing
122 if (!client_->send(input_data, input_len, 0)) {
123 throw Napi::Error::New(env, "Failed to send request, ring buffer full. Make it bigger?");
124 }
125
126 // Return undefined - TypeScript manages promises
127 return env.Undefined();
128}
129
// acquire: take one reference on the response-callback machinery.
// When ref_count_ is 0 on entry, a Napi::ThreadSafeFunction wrapping js_callback_ is created
// and stored in tsfn_; ref_count_ is then incremented. Returns undefined to JavaScript.
// NOTE(review): original line 134 is absent from this listing. poll_responses() mentions a
// mutex guarding the TSFN, so presumably a lock was taken here - confirm against upstream.
// NOTE(review): js_callback_.Value() is read without checking that setResponseCallback()
// has stored a callback first.
130Napi::Value MsgpackClientAsync::acquire(const Napi::CallbackInfo& info)
131{
132 Napi::Env env = info.Env();
133
135
136 if (ref_count_ == 0) {
137 // Lazily create TSFN when first needed (0 → 1)
138 tsfn_ = Napi::ThreadSafeFunction::New(env,
139 js_callback_.Value(), // The actual JS function to call
140 "ShmResponseCallback", // Resource name for debugging
141 0, // Unlimited queue size
142 1 // Initial thread count (must be >= 1)
143 );
144 }
145
146 ref_count_++;
147 return env.Undefined();
148}
149
// release: drop one reference taken by acquire().
// Decrements ref_count_ and, when it reaches 0, calls tsfn_.Release().
// Returns undefined to JavaScript.
// NOTE(review): original line 152 is absent from this listing. poll_responses() mentions a
// mutex guarding the TSFN, so presumably a lock was taken here - confirm against upstream.
// NOTE(review): ref_count_ is decremented without checking that it is non-zero, so a
// release() with no matching acquire() moves the count past zero.
150Napi::Value MsgpackClientAsync::release(const Napi::CallbackInfo& info)
151{
153
154 ref_count_--;
155
156 if (ref_count_ == 0) {
157 // Destroy TSFN when no longer needed (1 → 0)
158 // This releases the initial reference, bringing ref count to 0
159 tsfn_.Release();
160 }
161
162 return info.Env().Undefined();
163}
164
165Napi::Function MsgpackClientAsync::get_class(Napi::Env env)
166{
167 return DefineClass(
168 env,
169 "MsgpackClientAsync",
170 {
171 MsgpackClientAsync::InstanceMethod("setResponseCallback", &MsgpackClientAsync::setResponseCallback),
172 MsgpackClientAsync::InstanceMethod("call", &MsgpackClientAsync::call),
173 MsgpackClientAsync::InstanceMethod("acquire", &MsgpackClientAsync::acquire),
174 MsgpackClientAsync::InstanceMethod("release", &MsgpackClientAsync::release),
175 });
176}
static std::unique_ptr< IpcClient > create_mpsc_shm(const std::string &base_name, size_t client_id)
static std::unique_ptr< IpcClient > create_shm(const std::string &base_name)
static Napi::Function get_class(Napi::Env env)
void poll_responses()
Background thread function that polls for responses.
Napi::Value call(const Napi::CallbackInfo &info)
Send a msgpack buffer asynchronously.
Napi::Value release(const Napi::CallbackInfo &info)
Release a reference to allow the event loop to exit. Called by TypeScript when there are no pending callbacks...
Napi::Value setResponseCallback(const Napi::CallbackInfo &info)
Set the JavaScript callback to be invoked when responses arrive.
std::unique_ptr< bb::ipc::IpcClient > client_
Napi::Value acquire(const Napi::CallbackInfo &info)
Acquire a reference to keep the event loop alive. Called by TypeScript when there are pending callback...
#define info(...)
Definition log.hpp:93
const std::vector< MemoryValue > data
constexpr decltype(auto) get(::tuplet::tuple< T... > &&t) noexcept
Definition tuple.hpp:13