aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/os
diff options
context:
space:
mode:
authorThiago de Arruda <tpadilha84@gmail.com>2014-07-17 12:06:31 -0300
committerThiago de Arruda <tpadilha84@gmail.com>2014-07-17 12:06:31 -0300
commit953d61cbf82d5f1acd68bd1ae2101d92f5ec5492 (patch)
treed4aa1fe08ad3f0a7e27b6628aad4925cd1fbfb2a /src/nvim/os
parentb92630c2fff7950141630f0d62b11404d0589ece (diff)
parent4dc642aa622cfac09f2f4752907137d68d8508fe (diff)
downloadrneovim-953d61cbf82d5f1acd68bd1ae2101d92f5ec5492.tar.gz
rneovim-953d61cbf82d5f1acd68bd1ae2101d92f5ec5492.tar.bz2
rneovim-953d61cbf82d5f1acd68bd1ae2101d92f5ec5492.zip
Merge PR #895 'Core service providers...'
Diffstat (limited to 'src/nvim/os')
-rw-r--r--src/nvim/os/channel.c131
-rw-r--r--src/nvim/os/event.c101
-rw-r--r--src/nvim/os/event_defs.h3
-rw-r--r--src/nvim/os/input.c9
-rw-r--r--src/nvim/os/job.c32
-rw-r--r--src/nvim/os/msgpack_rpc.c13
-rw-r--r--src/nvim/os/provider.c215
-rw-r--r--src/nvim/os/provider.h11
-rw-r--r--src/nvim/os/rstream.c49
-rw-r--r--src/nvim/os/signal.c8
-rw-r--r--src/nvim/os/wstream.c30
11 files changed, 463 insertions, 139 deletions
diff --git a/src/nvim/os/channel.c b/src/nvim/os/channel.c
index 504c1ca05b..d5f29aa667 100644
--- a/src/nvim/os/channel.c
+++ b/src/nvim/os/channel.c
@@ -6,6 +6,7 @@
#include <msgpack.h>
#include "nvim/api/private/helpers.h"
+#include "nvim/api/vim.h"
#include "nvim/os/channel.h"
#include "nvim/os/event.h"
#include "nvim/os/rstream.h"
@@ -17,9 +18,11 @@
#include "nvim/os/msgpack_rpc.h"
#include "nvim/os/msgpack_rpc_helpers.h"
#include "nvim/vim.h"
+#include "nvim/ascii.h"
#include "nvim/memory.h"
#include "nvim/message.h"
#include "nvim/map.h"
+#include "nvim/log.h"
#include "nvim/lib/kvec.h"
typedef struct {
@@ -81,7 +84,8 @@ void channel_teardown(void)
/// stdin/stdout. stderr is forwarded to the editor error stream.
///
/// @param argv The argument vector for the process
-bool channel_from_job(char **argv)
+/// @return The channel id
+uint64_t channel_from_job(char **argv)
{
Channel *channel = register_channel();
channel->is_job = true;
@@ -91,17 +95,17 @@ bool channel_from_job(char **argv)
channel,
job_out,
job_err,
- job_exit,
+ NULL,
true,
0,
&status);
if (status <= 0) {
close_channel(channel);
- return false;
+ return 0;
}
- return true;
+ return channel->id;
}
/// Creates an API channel from a libuv stream representing a tcp or
@@ -114,7 +118,7 @@ void channel_from_stream(uv_stream_t *stream)
stream->data = NULL;
channel->is_job = false;
// read stream
- channel->data.streams.read = rstream_new(parse_msgpack, 1024, channel, true);
+ channel->data.streams.read = rstream_new(parse_msgpack, 1024, channel, NULL);
rstream_set_stream(channel->data.streams.read, stream);
rstream_start(channel->data.streams.read);
// write stream
@@ -123,6 +127,13 @@ void channel_from_stream(uv_stream_t *stream)
channel->data.streams.uv = stream;
}
+bool channel_exists(uint64_t id)
+{
+ Channel *channel;
+ return (channel = pmap_get(uint64_t)(channels, id)) != NULL
+ && channel->enabled;
+}
+
/// Sends event/data to channel
///
/// @param id The channel id. If 0, the event will be sent to all
@@ -135,7 +146,7 @@ bool channel_send_event(uint64_t id, char *name, Object arg)
Channel *channel = NULL;
if (id > 0) {
- if (!(channel = pmap_get(uint64_t)(channels, id))) {
+ if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
msgpack_rpc_free_object(arg);
return false;
}
@@ -155,7 +166,7 @@ bool channel_send_call(uint64_t id,
{
Channel *channel = NULL;
- if (!(channel = pmap_get(uint64_t)(channels, id))) {
+ if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
msgpack_rpc_free_object(arg);
return false;
}
@@ -170,22 +181,18 @@ bool channel_send_call(uint64_t id,
"while processing a RPC call",
channel->id);
*result = STRING_OBJ(cstr_to_string(buf));
+ msgpack_rpc_free_object(arg);
+ return false;
}
uint64_t request_id = channel->next_request_id++;
// Send the msgpack-rpc request
send_request(channel, request_id, name, arg);
- if (!kv_size(channel->call_stack)) {
- // This is the first frame, we must disable event deferral for this
- // channel because we won't be returning until the client sends a
- // response
- if (channel->is_job) {
- job_set_defer(channel->data.job, false);
- } else {
- rstream_set_defer(channel->data.streams.read, false);
- }
- }
+ EventSource channel_source = channel->is_job
+ ? job_event_source(channel->data.job)
+ : rstream_event_source(channel->data.streams.read);
+ EventSource sources[] = {channel_source, NULL};
// Push the frame
ChannelCallFrame frame = {request_id, false, NIL};
@@ -193,24 +200,18 @@ bool channel_send_call(uint64_t id,
size_t size = kv_size(channel->call_stack);
do {
- event_poll(-1);
+ event_poll(-1, sources);
} while (
// Continue running if ...
channel->enabled && // the channel is still enabled
kv_size(channel->call_stack) >= size); // the call didn't return
- if (!kv_size(channel->call_stack)) {
- // Popped last frame, restore event deferral
- if (channel->is_job) {
- job_set_defer(channel->data.job, true);
- } else {
- rstream_set_defer(channel->data.streams.read, true);
- }
- if (!channel->enabled && !channel->rpc_call_level) {
+ if (!(kv_size(channel->call_stack)
+ || channel->enabled
+ || channel->rpc_call_level)) {
// Close the channel if it has been disabled and we have not been called
// by `parse_msgpack`(It would be unsafe to close the channel otherwise)
close_channel(channel);
- }
}
*errored = frame.errored;
@@ -227,7 +228,7 @@ void channel_subscribe(uint64_t id, char *event)
{
Channel *channel;
- if (!(channel = pmap_get(uint64_t)(channels, id))) {
+ if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
abort();
}
@@ -249,7 +250,7 @@ void channel_unsubscribe(uint64_t id, char *event)
{
Channel *channel;
- if (!(channel = pmap_get(uint64_t)(channels, id))) {
+ if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) {
abort();
}
@@ -264,12 +265,15 @@ static void job_out(RStream *rstream, void *data, bool eof)
static void job_err(RStream *rstream, void *data, bool eof)
{
- // TODO(tarruda): plugin error messages should be sent to the error buffer
-}
-
-static void job_exit(Job *job, void *data)
-{
- // TODO(tarruda): what should be done here?
+ size_t count;
+ char buf[256];
+ Channel *channel = job_data(data);
+
+ while ((count = rstream_available(rstream))) {
+ size_t read = rstream_read(rstream, buf, sizeof(buf) - 1);
+ buf[read] = NUL;
+ ELOG("Channel %" PRIu64 " stderr: %s", channel->id, buf);
+ }
}
static void parse_msgpack(RStream *rstream, void *data, bool eof)
@@ -283,12 +287,15 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
"Before returning from a RPC call, channel %" PRIu64 " was "
"closed by the client",
channel->id);
- disable_channel(channel, buf);
+ call_set_error(channel, buf);
return;
}
channel->rpc_call_level++;
uint32_t count = rstream_available(rstream);
+ DLOG("Feeding the msgpack parser with %u bytes of data from RStream(%p)",
+ count,
+ rstream);
// Feed the unpacker with data
msgpack_unpacker_reserve_buffer(channel->unpacker, count);
@@ -313,7 +320,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
" a matching id for the current RPC call. Ensure the client "
" is properly synchronized",
channel->id);
- call_stack_unwind(channel, buf, 1);
+ call_set_error(channel, buf);
}
msgpack_unpacked_destroy(&unpacked);
// Bail out from this event loop iteration
@@ -366,7 +373,7 @@ static bool channel_write(Channel *channel, WBuffer *buffer)
"Before returning from a RPC call, channel %" PRIu64 " was "
"closed due to a failed write",
channel->id);
- disable_channel(channel, buf);
+ call_set_error(channel, buf);
}
return success;
@@ -383,7 +390,7 @@ static void send_request(Channel *channel,
Object arg)
{
String method = {.size = strlen(name), .data = name};
- channel_write(channel, serialize_request(id, method, arg, &out_buffer));
+ channel_write(channel, serialize_request(id, method, arg, &out_buffer, 1));
}
static void send_event(Channel *channel,
@@ -391,7 +398,7 @@ static void send_event(Channel *channel,
Object arg)
{
String method = {.size = strlen(name), .data = name};
- channel_write(channel, serialize_request(0, method, arg, &out_buffer));
+ channel_write(channel, serialize_request(0, method, arg, &out_buffer, 1));
}
static void broadcast_event(char *name, Object arg)
@@ -412,7 +419,11 @@ static void broadcast_event(char *name, Object arg)
}
String method = {.size = strlen(name), .data = name};
- WBuffer *buffer = serialize_request(0, method, arg, &out_buffer);
+ WBuffer *buffer = serialize_request(0,
+ method,
+ arg,
+ &out_buffer,
+ kv_size(subscribed));
for (size_t i = 0; i < kv_size(subscribed); i++) {
channel_write(kv_A(subscribed, i), buffer);
@@ -443,6 +454,15 @@ static void close_channel(Channel *channel)
pmap_del(uint64_t)(channels, channel->id);
msgpack_unpacker_free(channel->unpacker);
+ // Unsubscribe from all events
+ char *event_string;
+ map_foreach_value(channel->subscribed_events, event_string, {
+ unsubscribe(channel, event_string);
+ });
+
+ pmap_free(cstr_t)(channel->subscribed_events);
+ kv_destroy(channel->call_stack);
+
if (channel->is_job) {
if (channel->data.job) {
job_stop(channel->data.job);
@@ -453,14 +473,6 @@ static void close_channel(Channel *channel)
uv_close((uv_handle_t *)channel->data.streams.uv, close_cb);
}
- // Unsubscribe from all events
- char *event_string;
- map_foreach_value(channel->subscribed_events, event_string, {
- unsubscribe(channel, event_string);
- });
-
- pmap_free(cstr_t)(channel->subscribed_events);
- kv_destroy(channel->call_stack);
free(channel);
}
@@ -503,10 +515,8 @@ static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel)
static void call_stack_pop(msgpack_object *obj, Channel *channel)
{
- ChannelCallFrame *frame = kv_A(channel->call_stack,
- kv_size(channel->call_stack) - 1);
+ ChannelCallFrame *frame = kv_pop(channel->call_stack);
frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL;
- (void)kv_pop(channel->call_stack);
if (frame->errored) {
msgpack_rpc_to_object(&obj->via.array.ptr[2], &frame->result);
@@ -515,24 +525,13 @@ static void call_stack_pop(msgpack_object *obj, Channel *channel)
}
}
-static void call_stack_unwind(Channel *channel, char *msg, int count)
+static void call_set_error(Channel *channel, char *msg)
{
- while (kv_size(channel->call_stack) && count--) {
+ for (size_t i = 0; i < kv_size(channel->call_stack); i++) {
ChannelCallFrame *frame = kv_pop(channel->call_stack);
frame->errored = true;
frame->result = STRING_OBJ(cstr_to_string(msg));
}
-}
-static void disable_channel(Channel *channel, char *msg)
-{
- if (kv_size(channel->call_stack)) {
- // Channel is currently in the middle of a call, remove all frames and mark
- // it as "dead"
- channel->enabled = false;
- call_stack_unwind(channel, msg, -1);
- } else {
- // Safe to close it now
- close_channel(channel);
- }
+ channel->enabled = false;
}
diff --git a/src/nvim/os/event.c b/src/nvim/os/event.c
index 4e091716b2..a460b2db96 100644
--- a/src/nvim/os/event.c
+++ b/src/nvim/os/event.c
@@ -9,6 +9,7 @@
#include "nvim/os/input.h"
#include "nvim/os/channel.h"
#include "nvim/os/server.h"
+#include "nvim/os/provider.h"
#include "nvim/os/signal.h"
#include "nvim/os/rstream.h"
#include "nvim/os/job.h"
@@ -32,6 +33,11 @@ typedef struct {
# include "os/event.c.generated.h"
#endif
static klist_t(Event) *deferred_events, *immediate_events;
+// NULL-terminated array of event sources that we should process immediately.
+//
+// Events from sources that are not contained in this array are processed
+// later when `event_process` is called
+static EventSource *immediate_sources = NULL;
void event_init(void)
{
@@ -50,6 +56,8 @@ void event_init(void)
channel_init();
// Servers
server_init();
+ // Providers
+ provider_init();
}
void event_teardown(void)
@@ -60,7 +68,8 @@ void event_teardown(void)
}
// Wait for some event
-bool event_poll(int32_t ms)
+bool event_poll(int32_t ms, EventSource sources[])
+ FUNC_ATTR_NONNULL_ARG(2)
{
uv_run_mode run_mode = UV_RUN_ONCE;
@@ -91,16 +100,15 @@ bool event_poll(int32_t ms)
run_mode = UV_RUN_NOWAIT;
}
- bool events_processed;
+ size_t processed_events;
do {
// Run one event loop iteration, blocking for events if run_mode is
// UV_RUN_ONCE
- uv_run(uv_default_loop(), run_mode);
- events_processed = event_process(false);
+ processed_events = loop(run_mode, sources);
} while (
// Continue running if ...
- !events_processed && // we didn't process any immediate events
+ !processed_events && // we didn't process any immediate events
!event_has_deferred() && // no events are waiting to be processed
run_mode != UV_RUN_NOWAIT && // ms != 0
!timer_data.timed_out); // we didn't get a timeout
@@ -115,32 +123,49 @@ bool event_poll(int32_t ms)
// once more to let libuv perform it's cleanup
uv_close((uv_handle_t *)&timer, NULL);
uv_close((uv_handle_t *)&timer_prepare, NULL);
- uv_run(uv_default_loop(), UV_RUN_NOWAIT);
- event_process(false);
+ processed_events += loop(UV_RUN_NOWAIT, sources);
}
- return !timer_data.timed_out && (events_processed || event_has_deferred());
+ return !timer_data.timed_out && (processed_events || event_has_deferred());
}
bool event_has_deferred(void)
{
- return !kl_empty(get_queue(true));
+ return !kl_empty(deferred_events);
}
-// Push an event to the queue
-void event_push(Event event, bool deferred)
+// Queue an event
+void event_push(Event event)
{
- *kl_pushp(Event, get_queue(deferred)) = event;
+ bool defer = true;
+
+ if (immediate_sources) {
+ size_t i;
+ EventSource src;
+
+ for (src = immediate_sources[i = 0]; src; src = immediate_sources[++i]) {
+ if (src == event.source) {
+ defer = false;
+ break;
+ }
+ }
+ }
+
+ *kl_pushp(Event, defer ? deferred_events : immediate_events) = event;
+}
+
+void event_process(void)
+{
+ process_from(deferred_events);
}
// Runs the appropriate action for each queued event
-bool event_process(bool deferred)
+static size_t process_from(klist_t(Event) *queue)
{
- bool processed_events = false;
+ size_t count = 0;
Event event;
- while (kl_shift(Event, get_queue(deferred), &event) == 0) {
- processed_events = true;
+ while (kl_shift(Event, queue, &event) == 0) {
switch (event.type) {
case kEventSignal:
signal_handle(event);
@@ -154,9 +179,12 @@ bool event_process(bool deferred)
default:
abort();
}
+ count++;
}
- return processed_events;
+ DLOG("Processed %u events", count);
+
+ return count;
}
// Set a flag in the `event_poll` loop for signaling of a timeout
@@ -174,7 +202,42 @@ static void timer_prepare_cb(uv_prepare_t *handle)
uv_prepare_stop(handle);
}
-static klist_t(Event) *get_queue(bool deferred)
+static void requeue_deferred_events(void)
{
- return deferred ? deferred_events : immediate_events;
+ size_t remaining = deferred_events->size;
+
+ DLOG("Number of deferred events: %u", remaining);
+
+ while (remaining--) {
+ // Re-push each deferred event to ensure it will be in the right queue
+ Event event;
+ kl_shift(Event, deferred_events, &event);
+ event_push(event);
+ DLOG("Re-queueing event");
+ }
+
+ DLOG("Number of deferred events: %u", deferred_events->size);
+}
+
+static size_t loop(uv_run_mode run_mode, EventSource *sources)
+{
+ size_t count;
+ immediate_sources = sources;
+ // It's possible that some events from the immediate sources are waiting
+ // in the deferred queue. If so, move them to the immediate queue so they
+ // will be processed in order of arrival by the next `process_from` call.
+ requeue_deferred_events();
+ count = process_from(immediate_events);
+
+ if (count) {
+ // No need to enter libuv, events were already processed
+ return count;
+ }
+
+ DLOG("Enter event loop");
+ uv_run(uv_default_loop(), run_mode);
+ DLOG("Exit event loop");
+ immediate_sources = NULL;
+ count = process_from(immediate_events);
+ return count;
}
diff --git a/src/nvim/os/event_defs.h b/src/nvim/os/event_defs.h
index ca2cabd75a..dbee3e2ba7 100644
--- a/src/nvim/os/event_defs.h
+++ b/src/nvim/os/event_defs.h
@@ -6,6 +6,8 @@
#include "nvim/os/job_defs.h"
#include "nvim/os/rstream_defs.h"
+typedef void * EventSource;
+
typedef enum {
kEventSignal,
kEventRStreamData,
@@ -13,6 +15,7 @@ typedef enum {
} EventType;
typedef struct {
+ EventSource source;
EventType type;
union {
int signum;
diff --git a/src/nvim/os/input.c b/src/nvim/os/input.c
index 58bdf0cf52..15aebdbf3d 100644
--- a/src/nvim/os/input.c
+++ b/src/nvim/os/input.c
@@ -34,7 +34,7 @@ static bool eof = false, started_reading = false;
void input_init(void)
{
- read_stream = rstream_new(read_cb, READ_BUFFER_SIZE, NULL, false);
+ read_stream = rstream_new(read_cb, READ_BUFFER_SIZE, NULL, NULL);
rstream_set_file(read_stream, read_cmd_fd);
}
@@ -129,7 +129,12 @@ bool os_isatty(int fd)
static bool input_poll(int32_t ms)
{
- return input_ready() || event_poll(ms) || input_ready();
+ EventSource input_sources[] = {
+ rstream_event_source(read_stream),
+ NULL
+ };
+
+ return input_ready() || event_poll(ms, input_sources) || input_ready();
}
// This is a replacement for the old `WaitForChar` function in os_unix.c
diff --git a/src/nvim/os/job.c b/src/nvim/os/job.c
index d2f9c10981..203aa2c990 100644
--- a/src/nvim/os/job.c
+++ b/src/nvim/os/job.c
@@ -214,8 +214,8 @@ Job *job_start(char **argv,
job->in = wstream_new(maxmem);
wstream_set_stream(job->in, (uv_stream_t *)&job->proc_stdin);
// Start the readable streams
- job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job, defer);
- job->err = rstream_new(read_cb, JOB_BUFFER_SIZE, job, defer);
+ job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job, job_event_source(job));
+ job->err = rstream_new(read_cb, JOB_BUFFER_SIZE, job, job_event_source(job));
rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout);
rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr);
rstream_start(job->out);
@@ -269,18 +269,6 @@ bool job_write(Job *job, WBuffer *buffer)
return wstream_write(job->in, buffer);
}
-/// Sets the `defer` flag for a Job instance
-///
-/// @param rstream The Job id
-/// @param defer The new value for the flag
-void job_set_defer(Job *job, bool defer)
-{
- job->defer = defer;
- rstream_set_defer(job->out, defer);
- rstream_set_defer(job->err, defer);
-}
-
-
/// Runs the read callback associated with the job exit event
///
/// @param event Object containing data necessary to invoke the callback
@@ -307,6 +295,11 @@ void *job_data(Job *job)
return job->data;
}
+EventSource job_event_source(Job *job)
+{
+ return job;
+}
+
static void job_exit_callback(Job *job)
{
// Free the slot now, 'exit_cb' may want to start another job to replace
@@ -391,10 +384,12 @@ static void exit_cb(uv_process_t *proc, int64_t status, int term_signal)
static void emit_exit_event(Job *job)
{
- Event event;
- event.type = kEventJobExit;
- event.data.job = job;
- event_push(event, true);
+ Event event = {
+ .source = job_event_source(job),
+ .type = kEventJobExit,
+ .data.job = job
+ };
+ event_push(event);
}
static void close_cb(uv_handle_t *handle)
@@ -408,7 +403,6 @@ static void close_cb(uv_handle_t *handle)
rstream_free(job->err);
wstream_free(job->in);
shell_free_argv(job->proc_opts.args);
- free(job->data);
free(job);
}
}
diff --git a/src/nvim/os/msgpack_rpc.c b/src/nvim/os/msgpack_rpc.c
index 85569372da..c03d8dccca 100644
--- a/src/nvim/os/msgpack_rpc.c
+++ b/src/nvim/os/msgpack_rpc.c
@@ -1,9 +1,11 @@
#include <stdint.h>
#include <stdbool.h>
+#include <inttypes.h>
#include <msgpack.h>
#include "nvim/vim.h"
+#include "nvim/log.h"
#include "nvim/memory.h"
#include "nvim/os/wstream.h"
#include "nvim/os/msgpack_rpc.h"
@@ -51,9 +53,14 @@ WBuffer *msgpack_rpc_call(uint64_t channel_id,
msgpack_packer_init(&response, sbuffer, msgpack_sbuffer_write);
if (error.set) {
+ ELOG("Error dispatching msgpack-rpc call: %s(request: id %" PRIu64 ")",
+ error.msg,
+ response_id);
return serialize_response(response_id, error.msg, NIL, sbuffer);
}
+ DLOG("Successfully completed mspgack-rpc call(request id: %" PRIu64 ")",
+ response_id);
return serialize_response(response_id, NULL, rv, sbuffer);
}
@@ -113,7 +120,8 @@ void msgpack_rpc_error(char *msg, msgpack_packer *res)
WBuffer *serialize_request(uint64_t request_id,
String method,
Object arg,
- msgpack_sbuffer *sbuffer)
+ msgpack_sbuffer *sbuffer,
+ size_t refcount)
FUNC_ATTR_NONNULL_ARG(4)
{
msgpack_packer pac;
@@ -130,6 +138,7 @@ WBuffer *serialize_request(uint64_t request_id,
msgpack_rpc_from_object(arg, &pac);
WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
sbuffer->size,
+ refcount,
free);
msgpack_rpc_free_object(arg);
msgpack_sbuffer_clear(sbuffer);
@@ -165,6 +174,7 @@ WBuffer *serialize_response(uint64_t response_id,
WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
sbuffer->size,
+ 1, // responses only go though 1 channel
free);
msgpack_rpc_free_object(arg);
msgpack_sbuffer_clear(sbuffer);
@@ -190,6 +200,7 @@ WBuffer *serialize_metadata(uint64_t id,
msgpack_pack_raw_body(&pac, msgpack_metadata, msgpack_metadata_size);
WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
sbuffer->size,
+ 1,
free);
msgpack_sbuffer_clear(sbuffer);
return rv;
diff --git a/src/nvim/os/provider.c b/src/nvim/os/provider.c
new file mode 100644
index 0000000000..2e42cbb8f2
--- /dev/null
+++ b/src/nvim/os/provider.c
@@ -0,0 +1,215 @@
+#include <stdint.h>
+#include <inttypes.h>
+#include <stdbool.h>
+#include <assert.h>
+
+#include "nvim/os/provider.h"
+#include "nvim/memory.h"
+#include "nvim/api/vim.h"
+#include "nvim/api/private/helpers.h"
+#include "nvim/api/private/defs.h"
+#include "nvim/os/channel.h"
+#include "nvim/os/shell.h"
+#include "nvim/os/os.h"
+#include "nvim/log.h"
+#include "nvim/map.h"
+#include "nvim/message.h"
+#include "nvim/os/msgpack_rpc_helpers.h"
+
+#define FEATURE_COUNT (sizeof(features) / sizeof(features[0]))
+
+#define FEATURE(feature_name, provider_bootstrap_command, ...) { \
+ .name = feature_name, \
+ .bootstrap_command = provider_bootstrap_command, \
+ .argv = NULL, \
+ .channel_id = 0, \
+ .methods = (char *[]){__VA_ARGS__, NULL} \
+}
+
+static struct feature {
+ char *name, **bootstrap_command, **argv, **methods;
+ size_t name_length;
+ uint64_t channel_id;
+} features[] = {
+ FEATURE("python",
+ &p_ipy,
+ "python_execute",
+ "python_execute_file",
+ "python_do_range",
+ "python_eval"),
+
+ FEATURE("clipboard",
+ &p_icpb,
+ "clipboard_get",
+ "clipboard_set")
+};
+
+static Map(cstr_t, uint64_t) *registered_providers = NULL;
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "os/provider.c.generated.h"
+#endif
+
+
+void provider_init(void)
+{
+ registered_providers = map_new(cstr_t, uint64_t)();
+}
+
+bool provider_has_feature(char *name)
+{
+ for (size_t i = 0; i < FEATURE_COUNT; i++) {
+ struct feature *f = &features[i];
+ if (!STRICMP(name, f->name)) {
+ return f->channel_id || can_execute(f);
+ }
+ }
+
+ return false;
+}
+
+bool provider_available(char *method)
+{
+ return map_has(cstr_t, uint64_t)(registered_providers, method);
+}
+
+bool provider_register(char *method, uint64_t channel_id)
+{
+ if (map_has(cstr_t, uint64_t)(registered_providers, method)) {
+ return false;
+ }
+
+ // First check if this method is part of a feature, and if so, update
+ // the feature structure with the channel id
+ struct feature *f = get_feature_for(method);
+ if (f) {
+ DLOG("Registering provider for \"%s\" "
+ "which is part of the \"%s\" feature",
+ method,
+ f->name);
+ f->channel_id = channel_id;
+ }
+
+ map_put(cstr_t, uint64_t)(registered_providers, xstrdup(method), channel_id);
+ ILOG("Registered channel %" PRIu64 " as the provider for \"%s\"",
+ channel_id,
+ method);
+
+ return true;
+}
+
+Object provider_call(char *method, Object arg)
+{
+ uint64_t channel_id = get_provider_for(method);
+
+ if (!channel_id) {
+ char buf[256];
+ snprintf(buf,
+ sizeof(buf),
+ "Provider for \"%s\" is not available",
+ method);
+ report_error(buf);
+ return NIL;
+ }
+
+ bool error = false;
+ Object result = NIL;
+ channel_send_call(channel_id, method, arg, &result, &error);
+
+ if (error) {
+ report_error(result.data.string.data);
+ msgpack_rpc_free_object(result);
+ return NIL;
+ }
+
+ return result;
+}
+
+static uint64_t get_provider_for(char *method)
+{
+ uint64_t channel_id = map_get(cstr_t, uint64_t)(registered_providers, method);
+
+ if (channel_id) {
+ return channel_id;
+ }
+
+ // Try to bootstrap if the method is part of a feature
+ struct feature *f = get_feature_for(method);
+
+ if (!f || !can_execute(f)) {
+ ELOG("Cannot bootstrap provider for \"%s\"", method);
+ goto err;
+ }
+
+ if (f->channel_id) {
+ ELOG("Already bootstrapped provider for \"%s\"", f->name);
+ goto err;
+ }
+
+ f->channel_id = channel_from_job(f->argv);
+
+ if (!f->channel_id) {
+ ELOG("The provider for \"%s\" failed to bootstrap", f->name);
+ goto err;
+ }
+
+ return f->channel_id;
+
+err:
+ // Ensure we won't try to restart the provider
+ f->bootstrap_command = NULL;
+ f->channel_id = 0;
+ return 0;
+}
+
+static bool can_execute(struct feature *f)
+{
+ if (!f->bootstrap_command) {
+ return false;
+ }
+
+ char *cmd = *f->bootstrap_command;
+
+ if (!cmd || !strlen(cmd)) {
+ return false;
+ }
+
+ if (!f->argv) {
+ f->argv = shell_build_argv((uint8_t *)cmd, NULL);
+ }
+
+ return os_can_exe((uint8_t *)f->argv[0]);
+}
+
+static void report_error(char *str)
+{
+ vim_err_write((String) {.data = str, .size = strlen(str)});
+ vim_err_write((String) {.data = "\n", .size = 1});
+}
+
+static bool feature_has_method(struct feature *f, char *method)
+{
+ size_t i;
+ char *m;
+
+ for (m = f->methods[i = 0]; m; m = f->methods[++i]) {
+ if (!STRCMP(method, m)) {
+ return true;
+ }
+ }
+
+ return false;
+}
+
+
+static struct feature *get_feature_for(char *method)
+{
+ for (size_t i = 0; i < FEATURE_COUNT; i++) {
+ struct feature *f = &features[i];
+ if (feature_has_method(f, method)) {
+ return f;
+ }
+ }
+
+ return NULL;
+}
diff --git a/src/nvim/os/provider.h b/src/nvim/os/provider.h
new file mode 100644
index 0000000000..c6f12e02dd
--- /dev/null
+++ b/src/nvim/os/provider.h
@@ -0,0 +1,11 @@
+#ifndef NVIM_OS_PROVIDER_H
+#define NVIM_OS_PROVIDER_H
+
+#include "nvim/api/private/defs.h"
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "os/provider.h.generated.h"
+#endif
+
+#endif // NVIM_OS_PROVIDER_H
+
diff --git a/src/nvim/os/rstream.c b/src/nvim/os/rstream.c
index 5286599586..d7ab5b8a64 100644
--- a/src/nvim/os/rstream.c
+++ b/src/nvim/os/rstream.c
@@ -26,7 +26,8 @@ struct rstream {
uv_file fd;
rstream_cb cb;
size_t buffer_size, rpos, wpos, fpos;
- bool reading, free_handle, defer;
+ bool reading, free_handle;
+ EventSource source_override;
};
#ifdef INCLUDE_GENERATED_DECLARATIONS
@@ -40,25 +41,25 @@ struct rstream {
/// for reading with `rstream_read`
/// @param buffer_size Size in bytes of the internal buffer.
/// @param data Some state to associate with the `RStream` instance
-/// @param defer Flag that specifies if callback invocation should be deferred
-/// to vim main loop(as a KE_EVENT special key)
+/// @param source_override Replacement for the default source used in events
+/// emitted by this RStream. If NULL, the default is used.
/// @return The newly-allocated `RStream` instance
RStream * rstream_new(rstream_cb cb,
size_t buffer_size,
void *data,
- bool defer)
+ EventSource source_override)
{
RStream *rv = xmalloc(sizeof(RStream));
rv->buffer = xmalloc(buffer_size);
rv->buffer_size = buffer_size;
rv->data = data;
- rv->defer = defer;
rv->cb = cb;
rv->rpos = rv->wpos = rv->fpos = 0;
rv->stream = NULL;
rv->fread_idle = NULL;
rv->free_handle = false;
rv->file_type = UV_UNKNOWN_HANDLE;
+ rv->source_override = source_override ? source_override : rv;
return rv;
}
@@ -213,15 +214,6 @@ size_t rstream_available(RStream *rstream)
return rstream->wpos - rstream->rpos;
}
-/// Sets the `defer` flag for a a RStream instance
-///
-/// @param rstream The RStream instance
-/// @param defer The new value for the flag
-void rstream_set_defer(RStream *rstream, bool defer)
-{
- rstream->defer = defer;
-}
-
/// Runs the read callback associated with the rstream
///
/// @param event Object containing data necessary to invoke the callback
@@ -232,6 +224,11 @@ void rstream_read_event(Event event)
rstream->cb(rstream, rstream->data, event.data.rstream.eof);
}
+EventSource rstream_event_source(RStream *rstream)
+{
+ return rstream->source_override;
+}
+
// Callbacks used by libuv
// Called by libuv to allocate memory for reading.
@@ -260,6 +257,9 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf)
if (cnt <= 0) {
if (cnt != UV_ENOBUFS) {
+ DLOG("Closing RStream(address: %p, source: %p)",
+ rstream,
+ rstream_event_source(rstream));
// Read error or EOF, either way stop the stream and invoke the callback
// with eof == true
uv_read_stop(stream);
@@ -274,10 +274,17 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf)
// Data was already written, so all we need is to update 'wpos' to reflect
// the space actually used in the buffer.
rstream->wpos += nread;
+ DLOG("Received %u bytes from RStream(address: %p, source: %p)",
+ (size_t)cnt,
+ rstream,
+ rstream_event_source(rstream));
if (rstream->wpos == rstream->buffer_size) {
// The last read filled the buffer, stop reading for now
rstream_stop(rstream);
+ DLOG("Buffer for RStream(address: %p, source: %p) is full, stopping it",
+ rstream,
+ rstream_event_source(rstream));
}
rstream->reading = false;
@@ -342,9 +349,13 @@ static void close_cb(uv_handle_t *handle)
static void emit_read_event(RStream *rstream, bool eof)
{
- Event event;
- event.type = kEventRStreamData;
- event.data.rstream.ptr = rstream;
- event.data.rstream.eof = eof;
- event_push(event, rstream->defer);
+ Event event = {
+ .source = rstream_event_source(rstream),
+ .type = kEventRStreamData,
+ .data.rstream = {
+ .ptr = rstream,
+ .eof = eof
+ }
+ };
+ event_push(event);
}
diff --git a/src/nvim/os/signal.c b/src/nvim/os/signal.c
index 65657fda9c..17f270a5cc 100644
--- a/src/nvim/os/signal.c
+++ b/src/nvim/os/signal.c
@@ -103,6 +103,11 @@ void signal_handle(Event event)
}
}
+EventSource signal_event_source(void)
+{
+ return &sint;
+}
+
static char * signal_name(int signum)
{
switch (signum) {
@@ -155,10 +160,11 @@ static void signal_cb(uv_signal_t *handle, int signum)
}
Event event = {
+ .source = signal_event_source(),
.type = kEventSignal,
.data = {
.signum = signum
}
};
- event_push(event, true);
+ event_push(event);
}
diff --git a/src/nvim/os/wstream.c b/src/nvim/os/wstream.c
index 3c4b5b6171..0978d33a10 100644
--- a/src/nvim/os/wstream.c
+++ b/src/nvim/os/wstream.c
@@ -42,7 +42,8 @@ typedef struct {
/// Creates a new WStream instance. A WStream encapsulates all the boilerplate
/// necessary for writing to a libuv stream.
///
-/// @param maxmem Maximum amount memory used by this `WStream` instance.
+/// @param maxmem Maximum amount memory used by this `WStream` instance. If 0,
+/// a default value of 10mb will be used.
/// @return The newly-allocated `WStream` instance
WStream * wstream_new(size_t maxmem)
{
@@ -91,33 +92,33 @@ void wstream_set_stream(WStream *wstream, uv_stream_t *stream)
/// @return false if the write failed
bool wstream_write(WStream *wstream, WBuffer *buffer)
{
- WriteData *data;
- uv_buf_t uvbuf;
- uv_write_t *req;
-
// This should not be called after a wstream was freed
assert(!wstream->freed);
- buffer->refcount++;
-
if (wstream->curmem > wstream->maxmem) {
goto err;
}
wstream->curmem += buffer->size;
- data = xmalloc(sizeof(WriteData));
+
+ WriteData *data = xmalloc(sizeof(WriteData));
data->wstream = wstream;
data->buffer = buffer;
- req = xmalloc(sizeof(uv_write_t));
+
+ uv_write_t *req = xmalloc(sizeof(uv_write_t));
req->data = data;
+
+ uv_buf_t uvbuf;
uvbuf.base = buffer->data;
uvbuf.len = buffer->size;
- wstream->pending_reqs++;
if (uv_write(req, wstream->stream, &uvbuf, 1, write_cb)) {
+ free(data);
+ free(req);
goto err;
}
+ wstream->pending_reqs++;
return true;
err:
@@ -132,14 +133,19 @@ err:
///
/// @param data Data stored by the WBuffer
/// @param size The size of the data array
+/// @param refcount The number of references for the WBuffer. This will be used
+/// by WStream instances to decide when a WBuffer should be freed.
/// @param cb Pointer to function that will be responsible for freeing
/// the buffer data(passing 'free' will work as expected).
/// @return The allocated WBuffer instance
-WBuffer *wstream_new_buffer(char *data, size_t size, wbuffer_data_finalizer cb)
+WBuffer *wstream_new_buffer(char *data,
+ size_t size,
+ size_t refcount,
+ wbuffer_data_finalizer cb)
{
WBuffer *rv = xmalloc(sizeof(WBuffer));
rv->size = size;
- rv->refcount = 0;
+ rv->refcount = refcount;
rv->cb = cb;
rv->data = data;