diff options
author | Thiago de Arruda <tpadilha84@gmail.com> | 2014-07-17 12:06:31 -0300 |
---|---|---|
committer | Thiago de Arruda <tpadilha84@gmail.com> | 2014-07-17 12:06:31 -0300 |
commit | 953d61cbf82d5f1acd68bd1ae2101d92f5ec5492 (patch) | |
tree | d4aa1fe08ad3f0a7e27b6628aad4925cd1fbfb2a /src/nvim/os | |
parent | b92630c2fff7950141630f0d62b11404d0589ece (diff) | |
parent | 4dc642aa622cfac09f2f4752907137d68d8508fe (diff) | |
download | rneovim-953d61cbf82d5f1acd68bd1ae2101d92f5ec5492.tar.gz rneovim-953d61cbf82d5f1acd68bd1ae2101d92f5ec5492.tar.bz2 rneovim-953d61cbf82d5f1acd68bd1ae2101d92f5ec5492.zip |
Merge PR #895 'Core service providers...'
Diffstat (limited to 'src/nvim/os')
-rw-r--r-- | src/nvim/os/channel.c | 131 | ||||
-rw-r--r-- | src/nvim/os/event.c | 101 | ||||
-rw-r--r-- | src/nvim/os/event_defs.h | 3 | ||||
-rw-r--r-- | src/nvim/os/input.c | 9 | ||||
-rw-r--r-- | src/nvim/os/job.c | 32 | ||||
-rw-r--r-- | src/nvim/os/msgpack_rpc.c | 13 | ||||
-rw-r--r-- | src/nvim/os/provider.c | 215 | ||||
-rw-r--r-- | src/nvim/os/provider.h | 11 | ||||
-rw-r--r-- | src/nvim/os/rstream.c | 49 | ||||
-rw-r--r-- | src/nvim/os/signal.c | 8 | ||||
-rw-r--r-- | src/nvim/os/wstream.c | 30 |
11 files changed, 463 insertions, 139 deletions
diff --git a/src/nvim/os/channel.c b/src/nvim/os/channel.c index 504c1ca05b..d5f29aa667 100644 --- a/src/nvim/os/channel.c +++ b/src/nvim/os/channel.c @@ -6,6 +6,7 @@ #include <msgpack.h> #include "nvim/api/private/helpers.h" +#include "nvim/api/vim.h" #include "nvim/os/channel.h" #include "nvim/os/event.h" #include "nvim/os/rstream.h" @@ -17,9 +18,11 @@ #include "nvim/os/msgpack_rpc.h" #include "nvim/os/msgpack_rpc_helpers.h" #include "nvim/vim.h" +#include "nvim/ascii.h" #include "nvim/memory.h" #include "nvim/message.h" #include "nvim/map.h" +#include "nvim/log.h" #include "nvim/lib/kvec.h" typedef struct { @@ -81,7 +84,8 @@ void channel_teardown(void) /// stdin/stdout. stderr is forwarded to the editor error stream. /// /// @param argv The argument vector for the process -bool channel_from_job(char **argv) +/// @return The channel id +uint64_t channel_from_job(char **argv) { Channel *channel = register_channel(); channel->is_job = true; @@ -91,17 +95,17 @@ bool channel_from_job(char **argv) channel, job_out, job_err, - job_exit, + NULL, true, 0, &status); if (status <= 0) { close_channel(channel); - return false; + return 0; } - return true; + return channel->id; } /// Creates an API channel from a libuv stream representing a tcp or @@ -114,7 +118,7 @@ void channel_from_stream(uv_stream_t *stream) stream->data = NULL; channel->is_job = false; // read stream - channel->data.streams.read = rstream_new(parse_msgpack, 1024, channel, true); + channel->data.streams.read = rstream_new(parse_msgpack, 1024, channel, NULL); rstream_set_stream(channel->data.streams.read, stream); rstream_start(channel->data.streams.read); // write stream @@ -123,6 +127,13 @@ void channel_from_stream(uv_stream_t *stream) channel->data.streams.uv = stream; } +bool channel_exists(uint64_t id) +{ + Channel *channel; + return (channel = pmap_get(uint64_t)(channels, id)) != NULL + && channel->enabled; +} + /// Sends event/data to channel /// /// @param id The channel id. If 0, the event will be sent to all @@ -135,7 +146,7 @@ bool channel_send_event(uint64_t id, char *name, Object arg) Channel *channel = NULL; if (id > 0) { - if (!(channel = pmap_get(uint64_t)(channels, id))) { + if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { msgpack_rpc_free_object(arg); return false; } @@ -155,7 +166,7 @@ bool channel_send_call(uint64_t id, { Channel *channel = NULL; - if (!(channel = pmap_get(uint64_t)(channels, id))) { + if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { msgpack_rpc_free_object(arg); return false; } @@ -170,22 +181,18 @@ bool channel_send_call(uint64_t id, "while processing a RPC call", channel->id); *result = STRING_OBJ(cstr_to_string(buf)); + msgpack_rpc_free_object(arg); + return false; } uint64_t request_id = channel->next_request_id++; // Send the msgpack-rpc request send_request(channel, request_id, name, arg); - if (!kv_size(channel->call_stack)) { - // This is the first frame, we must disable event deferral for this - // channel because we won't be returning until the client sends a - // response - if (channel->is_job) { - job_set_defer(channel->data.job, false); - } else { - rstream_set_defer(channel->data.streams.read, false); - } - } + EventSource channel_source = channel->is_job + ? job_event_source(channel->data.job) + : rstream_event_source(channel->data.streams.read); + EventSource sources[] = {channel_source, NULL}; // Push the frame ChannelCallFrame frame = {request_id, false, NIL}; @@ -193,24 +200,18 @@ bool channel_send_call(uint64_t id, size_t size = kv_size(channel->call_stack); do { - event_poll(-1); + event_poll(-1, sources); } while ( // Continue running if ... channel->enabled && // the channel is still enabled kv_size(channel->call_stack) >= size); // the call didn't return - if (!kv_size(channel->call_stack)) { - // Popped last frame, restore event deferral - if (channel->is_job) { - job_set_defer(channel->data.job, true); - } else { - rstream_set_defer(channel->data.streams.read, true); - } - if (!channel->enabled && !channel->rpc_call_level) { + if (!(kv_size(channel->call_stack) + || channel->enabled + || channel->rpc_call_level)) { // Close the channel if it has been disabled and we have not been called // by `parse_msgpack`(It would be unsafe to close the channel otherwise) close_channel(channel); - } } *errored = frame.errored; @@ -227,7 +228,7 @@ void channel_subscribe(uint64_t id, char *event) { Channel *channel; - if (!(channel = pmap_get(uint64_t)(channels, id))) { + if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { abort(); } @@ -249,7 +250,7 @@ void channel_unsubscribe(uint64_t id, char *event) { Channel *channel; - if (!(channel = pmap_get(uint64_t)(channels, id))) { + if (!(channel = pmap_get(uint64_t)(channels, id)) || !channel->enabled) { abort(); } @@ -264,12 +265,15 @@ static void job_out(RStream *rstream, void *data, bool eof) static void job_err(RStream *rstream, void *data, bool eof) { - // TODO(tarruda): plugin error messages should be sent to the error buffer -} - -static void job_exit(Job *job, void *data) -{ - // TODO(tarruda): what should be done here? + size_t count; + char buf[256]; + Channel *channel = job_data(data); + + while ((count = rstream_available(rstream))) { + size_t read = rstream_read(rstream, buf, sizeof(buf) - 1); + buf[read] = NUL; + ELOG("Channel %" PRIu64 " stderr: %s", channel->id, buf); + } } static void parse_msgpack(RStream *rstream, void *data, bool eof) @@ -283,12 +287,15 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) "Before returning from a RPC call, channel %" PRIu64 " was " "closed by the client", channel->id); - disable_channel(channel, buf); + call_set_error(channel, buf); return; } channel->rpc_call_level++; uint32_t count = rstream_available(rstream); + DLOG("Feeding the msgpack parser with %u bytes of data from RStream(%p)", + count, + rstream); // Feed the unpacker with data msgpack_unpacker_reserve_buffer(channel->unpacker, count); @@ -313,7 +320,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) " a matching id for the current RPC call. Ensure the client " " is properly synchronized", channel->id); - call_stack_unwind(channel, buf, 1); + call_set_error(channel, buf); } msgpack_unpacked_destroy(&unpacked); // Bail out from this event loop iteration @@ -366,7 +373,7 @@ static bool channel_write(Channel *channel, WBuffer *buffer) "Before returning from a RPC call, channel %" PRIu64 " was " "closed due to a failed write", channel->id); - disable_channel(channel, buf); + call_set_error(channel, buf); } return success; @@ -383,7 +390,7 @@ static void send_request(Channel *channel, Object arg) { String method = {.size = strlen(name), .data = name}; - channel_write(channel, serialize_request(id, method, arg, &out_buffer)); + channel_write(channel, serialize_request(id, method, arg, &out_buffer, 1)); } static void send_event(Channel *channel, @@ -391,7 +398,7 @@ static void send_event(Channel *channel, Object arg) { String method = {.size = strlen(name), .data = name}; - channel_write(channel, serialize_request(0, method, arg, &out_buffer)); + channel_write(channel, serialize_request(0, method, arg, &out_buffer, 1)); } static void broadcast_event(char *name, Object arg) @@ -412,7 +419,11 @@ static void broadcast_event(char *name, Object arg) } String method = {.size = strlen(name), .data = name}; - WBuffer *buffer = serialize_request(0, method, arg, &out_buffer); + WBuffer *buffer = serialize_request(0, + method, + arg, + &out_buffer, + kv_size(subscribed)); for (size_t i = 0; i < kv_size(subscribed); i++) { channel_write(kv_A(subscribed, i), buffer); @@ -443,6 +454,15 @@ static void close_channel(Channel *channel) pmap_del(uint64_t)(channels, channel->id); msgpack_unpacker_free(channel->unpacker); + // Unsubscribe from all events + char *event_string; + map_foreach_value(channel->subscribed_events, event_string, { + unsubscribe(channel, event_string); + }); + + pmap_free(cstr_t)(channel->subscribed_events); + kv_destroy(channel->call_stack); + if (channel->is_job) { if (channel->data.job) { job_stop(channel->data.job); @@ -453,14 +473,6 @@ static void close_channel(Channel *channel) uv_close((uv_handle_t *)channel->data.streams.uv, close_cb); } - // Unsubscribe from all events - char *event_string; - map_foreach_value(channel->subscribed_events, event_string, { - unsubscribe(channel, event_string); - }); - - pmap_free(cstr_t)(channel->subscribed_events); - kv_destroy(channel->call_stack); free(channel); } @@ -503,10 +515,8 @@ static bool is_valid_rpc_response(msgpack_object *obj, Channel *channel) static void call_stack_pop(msgpack_object *obj, Channel *channel) { - ChannelCallFrame *frame = kv_A(channel->call_stack, - kv_size(channel->call_stack) - 1); + ChannelCallFrame *frame = kv_pop(channel->call_stack); frame->errored = obj->via.array.ptr[2].type != MSGPACK_OBJECT_NIL; - (void)kv_pop(channel->call_stack); if (frame->errored) { msgpack_rpc_to_object(&obj->via.array.ptr[2], &frame->result); @@ -515,24 +525,13 @@ static void call_stack_pop(msgpack_object *obj, Channel *channel) } } -static void call_stack_unwind(Channel *channel, char *msg, int count) +static void call_set_error(Channel *channel, char *msg) { - while (kv_size(channel->call_stack) && count--) { + for (size_t i = 0; i < kv_size(channel->call_stack); i++) { ChannelCallFrame *frame = kv_pop(channel->call_stack); frame->errored = true; frame->result = STRING_OBJ(cstr_to_string(msg)); } -} -static void disable_channel(Channel *channel, char *msg) -{ - if (kv_size(channel->call_stack)) { - // Channel is currently in the middle of a call, remove all frames and mark - // it as "dead" - channel->enabled = false; - call_stack_unwind(channel, msg, -1); - } else { - // Safe to close it now - close_channel(channel); - } + channel->enabled = false; } diff --git a/src/nvim/os/event.c b/src/nvim/os/event.c index 4e091716b2..a460b2db96 100644 --- a/src/nvim/os/event.c +++ b/src/nvim/os/event.c @@ -9,6 +9,7 @@ #include "nvim/os/input.h" #include "nvim/os/channel.h" #include "nvim/os/server.h" +#include "nvim/os/provider.h" #include "nvim/os/signal.h" #include "nvim/os/rstream.h" #include "nvim/os/job.h" @@ -32,6 +33,11 @@ typedef struct { # include "os/event.c.generated.h" #endif static klist_t(Event) *deferred_events, *immediate_events; +// NULL-terminated array of event sources that we should process immediately. +// +// Events from sources that are not contained in this array are processed +// later when `event_process` is called +static EventSource *immediate_sources = NULL; void event_init(void) { @@ -50,6 +56,8 @@ void event_init(void) channel_init(); // Servers server_init(); + // Providers + provider_init(); } void event_teardown(void) @@ -60,7 +68,8 @@ void event_teardown(void) } // Wait for some event -bool event_poll(int32_t ms) +bool event_poll(int32_t ms, EventSource sources[]) + FUNC_ATTR_NONNULL_ARG(2) { uv_run_mode run_mode = UV_RUN_ONCE; @@ -91,16 +100,15 @@ bool event_poll(int32_t ms) run_mode = UV_RUN_NOWAIT; } - bool events_processed; + size_t processed_events; do { // Run one event loop iteration, blocking for events if run_mode is // UV_RUN_ONCE - uv_run(uv_default_loop(), run_mode); - events_processed = event_process(false); + processed_events = loop(run_mode, sources); } while ( // Continue running if ... - !events_processed && // we didn't process any immediate events + !processed_events && // we didn't process any immediate events !event_has_deferred() && // no events are waiting to be processed run_mode != UV_RUN_NOWAIT && // ms != 0 !timer_data.timed_out); // we didn't get a timeout @@ -115,32 +123,49 @@ bool event_poll(int32_t ms) // once more to let libuv perform it's cleanup uv_close((uv_handle_t *)&timer, NULL); uv_close((uv_handle_t *)&timer_prepare, NULL); - uv_run(uv_default_loop(), UV_RUN_NOWAIT); - event_process(false); + processed_events += loop(UV_RUN_NOWAIT, sources); } - return !timer_data.timed_out && (events_processed || event_has_deferred()); + return !timer_data.timed_out && (processed_events || event_has_deferred()); } bool event_has_deferred(void) { - return !kl_empty(get_queue(true)); + return !kl_empty(deferred_events); } -// Push an event to the queue -void event_push(Event event, bool deferred) +// Queue an event +void event_push(Event event) { - *kl_pushp(Event, get_queue(deferred)) = event; + bool defer = true; + + if (immediate_sources) { + size_t i; + EventSource src; + + for (src = immediate_sources[i = 0]; src; src = immediate_sources[++i]) { + if (src == event.source) { + defer = false; + break; + } + } + } + + *kl_pushp(Event, defer ? deferred_events : immediate_events) = event; +} + +void event_process(void) +{ + process_from(deferred_events); } // Runs the appropriate action for each queued event -bool event_process(bool deferred) +static size_t process_from(klist_t(Event) *queue) { - bool processed_events = false; + size_t count = 0; Event event; - while (kl_shift(Event, get_queue(deferred), &event) == 0) { - processed_events = true; + while (kl_shift(Event, queue, &event) == 0) { switch (event.type) { case kEventSignal: signal_handle(event); @@ -154,9 +179,12 @@ bool event_process(bool deferred) default: abort(); } + count++; } - return processed_events; + DLOG("Processed %u events", count); + + return count; } // Set a flag in the `event_poll` loop for signaling of a timeout @@ -174,7 +202,42 @@ static void timer_prepare_cb(uv_prepare_t *handle) uv_prepare_stop(handle); } -static klist_t(Event) *get_queue(bool deferred) +static void requeue_deferred_events(void) { - return deferred ? deferred_events : immediate_events; + size_t remaining = deferred_events->size; + + DLOG("Number of deferred events: %u", remaining); + + while (remaining--) { + // Re-push each deferred event to ensure it will be in the right queue + Event event; + kl_shift(Event, deferred_events, &event); + event_push(event); + DLOG("Re-queueing event"); + } + + DLOG("Number of deferred events: %u", deferred_events->size); +} + +static size_t loop(uv_run_mode run_mode, EventSource *sources) +{ + size_t count; + immediate_sources = sources; + // It's possible that some events from the immediate sources are waiting + // in the deferred queue. If so, move them to the immediate queue so they + // will be processed in order of arrival by the next `process_from` call. + requeue_deferred_events(); + count = process_from(immediate_events); + + if (count) { + // No need to enter libuv, events were already processed + return count; + } + + DLOG("Enter event loop"); + uv_run(uv_default_loop(), run_mode); + DLOG("Exit event loop"); + immediate_sources = NULL; + count = process_from(immediate_events); + return count; } diff --git a/src/nvim/os/event_defs.h b/src/nvim/os/event_defs.h index ca2cabd75a..dbee3e2ba7 100644 --- a/src/nvim/os/event_defs.h +++ b/src/nvim/os/event_defs.h @@ -6,6 +6,8 @@ #include "nvim/os/job_defs.h" #include "nvim/os/rstream_defs.h" +typedef void * EventSource; + typedef enum { kEventSignal, kEventRStreamData, @@ -13,6 +15,7 @@ typedef enum { } EventType; typedef struct { + EventSource source; EventType type; union { int signum; diff --git a/src/nvim/os/input.c b/src/nvim/os/input.c index 58bdf0cf52..15aebdbf3d 100644 --- a/src/nvim/os/input.c +++ b/src/nvim/os/input.c @@ -34,7 +34,7 @@ static bool eof = false, started_reading = false; void input_init(void) { - read_stream = rstream_new(read_cb, READ_BUFFER_SIZE, NULL, false); + read_stream = rstream_new(read_cb, READ_BUFFER_SIZE, NULL, NULL); rstream_set_file(read_stream, read_cmd_fd); } @@ -129,7 +129,12 @@ bool os_isatty(int fd) static bool input_poll(int32_t ms) { - return input_ready() || event_poll(ms) || input_ready(); + EventSource input_sources[] = { + rstream_event_source(read_stream), + NULL + }; + + return input_ready() || event_poll(ms, input_sources) || input_ready(); } // This is a replacement for the old `WaitForChar` function in os_unix.c diff --git a/src/nvim/os/job.c b/src/nvim/os/job.c index d2f9c10981..203aa2c990 100644 --- a/src/nvim/os/job.c +++ b/src/nvim/os/job.c @@ -214,8 +214,8 @@ Job *job_start(char **argv, job->in = wstream_new(maxmem); wstream_set_stream(job->in, (uv_stream_t *)&job->proc_stdin); // Start the readable streams - job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job, defer); - job->err = rstream_new(read_cb, JOB_BUFFER_SIZE, job, defer); + job->out = rstream_new(read_cb, JOB_BUFFER_SIZE, job, job_event_source(job)); + job->err = rstream_new(read_cb, JOB_BUFFER_SIZE, job, job_event_source(job)); rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout); rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr); rstream_start(job->out); @@ -269,18 +269,6 @@ bool job_write(Job *job, WBuffer *buffer) return wstream_write(job->in, buffer); } -/// Sets the `defer` flag for a Job instance -/// -/// @param rstream The Job id -/// @param defer The new value for the flag -void job_set_defer(Job *job, bool defer) -{ - job->defer = defer; - rstream_set_defer(job->out, defer); - rstream_set_defer(job->err, defer); -} - - /// Runs the read callback associated with the job exit event /// /// @param event Object containing data necessary to invoke the callback @@ -307,6 +295,11 @@ void *job_data(Job *job) return job->data; } +EventSource job_event_source(Job *job) +{ + return job; +} + static void job_exit_callback(Job *job) { // Free the slot now, 'exit_cb' may want to start another job to replace @@ -391,10 +384,12 @@ static void exit_cb(uv_process_t *proc, int64_t status, int term_signal) static void emit_exit_event(Job *job) { - Event event; - event.type = kEventJobExit; - event.data.job = job; - event_push(event, true); + Event event = { + .source = job_event_source(job), + .type = kEventJobExit, + .data.job = job + }; + event_push(event); } static void close_cb(uv_handle_t *handle) @@ -408,7 +403,6 @@ static void close_cb(uv_handle_t *handle) rstream_free(job->err); wstream_free(job->in); shell_free_argv(job->proc_opts.args); - free(job->data); free(job); } } diff --git a/src/nvim/os/msgpack_rpc.c b/src/nvim/os/msgpack_rpc.c index 85569372da..c03d8dccca 100644 --- a/src/nvim/os/msgpack_rpc.c +++ b/src/nvim/os/msgpack_rpc.c @@ -1,9 +1,11 @@ #include <stdint.h> #include <stdbool.h> +#include <inttypes.h> #include <msgpack.h> #include "nvim/vim.h" +#include "nvim/log.h" #include "nvim/memory.h" #include "nvim/os/wstream.h" #include "nvim/os/msgpack_rpc.h" @@ -51,9 +53,14 @@ WBuffer *msgpack_rpc_call(uint64_t channel_id, msgpack_packer_init(&response, sbuffer, msgpack_sbuffer_write); if (error.set) { + ELOG("Error dispatching msgpack-rpc call: %s(request: id %" PRIu64 ")", + error.msg, + response_id); return serialize_response(response_id, error.msg, NIL, sbuffer); } + DLOG("Successfully completed mspgack-rpc call(request id: %" PRIu64 ")", + response_id); return serialize_response(response_id, NULL, rv, sbuffer); } @@ -113,7 +120,8 @@ void msgpack_rpc_error(char *msg, msgpack_packer *res) WBuffer *serialize_request(uint64_t request_id, String method, Object arg, - msgpack_sbuffer *sbuffer) + msgpack_sbuffer *sbuffer, + size_t refcount) FUNC_ATTR_NONNULL_ARG(4) { msgpack_packer pac; @@ -130,6 +138,7 @@ WBuffer *serialize_request(uint64_t request_id, msgpack_rpc_from_object(arg, &pac); WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size), sbuffer->size, + refcount, free); msgpack_rpc_free_object(arg); msgpack_sbuffer_clear(sbuffer); @@ -165,6 +174,7 @@ WBuffer *serialize_response(uint64_t response_id, WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size), sbuffer->size, + 1, // responses only go though 1 channel free); msgpack_rpc_free_object(arg); msgpack_sbuffer_clear(sbuffer); @@ -190,6 +200,7 @@ WBuffer *serialize_metadata(uint64_t id, msgpack_pack_raw_body(&pac, msgpack_metadata, msgpack_metadata_size); WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size), sbuffer->size, + 1, free); msgpack_sbuffer_clear(sbuffer); return rv; diff --git a/src/nvim/os/provider.c b/src/nvim/os/provider.c new file mode 100644 index 0000000000..2e42cbb8f2 --- /dev/null +++ b/src/nvim/os/provider.c @@ -0,0 +1,215 @@ +#include <stdint.h> +#include <inttypes.h> +#include <stdbool.h> +#include <assert.h> + +#include "nvim/os/provider.h" +#include "nvim/memory.h" +#include "nvim/api/vim.h" +#include "nvim/api/private/helpers.h" +#include "nvim/api/private/defs.h" +#include "nvim/os/channel.h" +#include "nvim/os/shell.h" +#include "nvim/os/os.h" +#include "nvim/log.h" +#include "nvim/map.h" +#include "nvim/message.h" +#include "nvim/os/msgpack_rpc_helpers.h" + +#define FEATURE_COUNT (sizeof(features) / sizeof(features[0])) + +#define FEATURE(feature_name, provider_bootstrap_command, ...) { \ + .name = feature_name, \ + .bootstrap_command = provider_bootstrap_command, \ + .argv = NULL, \ + .channel_id = 0, \ + .methods = (char *[]){__VA_ARGS__, NULL} \ +} + +static struct feature { + char *name, **bootstrap_command, **argv, **methods; + size_t name_length; + uint64_t channel_id; +} features[] = { + FEATURE("python", + &p_ipy, + "python_execute", + "python_execute_file", + "python_do_range", + "python_eval"), + + FEATURE("clipboard", + &p_icpb, + "clipboard_get", + "clipboard_set") +}; + +static Map(cstr_t, uint64_t) *registered_providers = NULL; + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "os/provider.c.generated.h" +#endif + + +void provider_init(void) +{ + registered_providers = map_new(cstr_t, uint64_t)(); +} + +bool provider_has_feature(char *name) +{ + for (size_t i = 0; i < FEATURE_COUNT; i++) { + struct feature *f = &features[i]; + if (!STRICMP(name, f->name)) { + return f->channel_id || can_execute(f); + } + } + + return false; +} + +bool provider_available(char *method) +{ + return map_has(cstr_t, uint64_t)(registered_providers, method); +} + +bool provider_register(char *method, uint64_t channel_id) +{ + if (map_has(cstr_t, uint64_t)(registered_providers, method)) { + return false; + } + + // First check if this method is part of a feature, and if so, update + // the feature structure with the channel id + struct feature *f = get_feature_for(method); + if (f) { + DLOG("Registering provider for \"%s\" " + "which is part of the \"%s\" feature", + method, + f->name); + f->channel_id = channel_id; + } + + map_put(cstr_t, uint64_t)(registered_providers, xstrdup(method), channel_id); + ILOG("Registered channel %" PRIu64 " as the provider for \"%s\"", + channel_id, + method); + + return true; +} + +Object provider_call(char *method, Object arg) +{ + uint64_t channel_id = get_provider_for(method); + + if (!channel_id) { + char buf[256]; + snprintf(buf, + sizeof(buf), + "Provider for \"%s\" is not available", + method); + report_error(buf); + return NIL; + } + + bool error = false; + Object result = NIL; + channel_send_call(channel_id, method, arg, &result, &error); + + if (error) { + report_error(result.data.string.data); + msgpack_rpc_free_object(result); + return NIL; + } + + return result; +} + +static uint64_t get_provider_for(char *method) +{ + uint64_t channel_id = map_get(cstr_t, uint64_t)(registered_providers, method); + + if (channel_id) { + return channel_id; + } + + // Try to bootstrap if the method is part of a feature + struct feature *f = get_feature_for(method); + + if (!f || !can_execute(f)) { + ELOG("Cannot bootstrap provider for \"%s\"", method); + goto err; + } + + if (f->channel_id) { + ELOG("Already bootstrapped provider for \"%s\"", f->name); + goto err; + } + + f->channel_id = channel_from_job(f->argv); + + if (!f->channel_id) { + ELOG("The provider for \"%s\" failed to bootstrap", f->name); + goto err; + } + + return f->channel_id; + +err: + // Ensure we won't try to restart the provider + f->bootstrap_command = NULL; + f->channel_id = 0; + return 0; +} + +static bool can_execute(struct feature *f) +{ + if (!f->bootstrap_command) { + return false; + } + + char *cmd = *f->bootstrap_command; + + if (!cmd || !strlen(cmd)) { + return false; + } + + if (!f->argv) { + f->argv = shell_build_argv((uint8_t *)cmd, NULL); + } + + return os_can_exe((uint8_t *)f->argv[0]); +} + +static void report_error(char *str) +{ + vim_err_write((String) {.data = str, .size = strlen(str)}); + vim_err_write((String) {.data = "\n", .size = 1}); +} + +static bool feature_has_method(struct feature *f, char *method) +{ + size_t i; + char *m; + + for (m = f->methods[i = 0]; m; m = f->methods[++i]) { + if (!STRCMP(method, m)) { + return true; + } + } + + return false; +} + + +static struct feature *get_feature_for(char *method) +{ + for (size_t i = 0; i < FEATURE_COUNT; i++) { + struct feature *f = &features[i]; + if (feature_has_method(f, method)) { + return f; + } + } + + return NULL; +} diff --git a/src/nvim/os/provider.h b/src/nvim/os/provider.h new file mode 100644 index 0000000000..c6f12e02dd --- /dev/null +++ b/src/nvim/os/provider.h @@ -0,0 +1,11 @@ +#ifndef NVIM_OS_PROVIDER_H +#define NVIM_OS_PROVIDER_H + +#include "nvim/api/private/defs.h" + +#ifdef INCLUDE_GENERATED_DECLARATIONS +# include "os/provider.h.generated.h" +#endif + +#endif // NVIM_OS_PROVIDER_H + diff --git a/src/nvim/os/rstream.c b/src/nvim/os/rstream.c index 5286599586..d7ab5b8a64 100644 --- a/src/nvim/os/rstream.c +++ b/src/nvim/os/rstream.c @@ -26,7 +26,8 @@ struct rstream { uv_file fd; rstream_cb cb; size_t buffer_size, rpos, wpos, fpos; - bool reading, free_handle, defer; + bool reading, free_handle; + EventSource source_override; }; #ifdef INCLUDE_GENERATED_DECLARATIONS @@ -40,25 +41,25 @@ struct rstream { /// for reading with `rstream_read` /// @param buffer_size Size in bytes of the internal buffer. /// @param data Some state to associate with the `RStream` instance -/// @param defer Flag that specifies if callback invocation should be deferred -/// to vim main loop(as a KE_EVENT special key) +/// @param source_override Replacement for the default source used in events +/// emitted by this RStream. If NULL, the default is used. /// @return The newly-allocated `RStream` instance RStream * rstream_new(rstream_cb cb, size_t buffer_size, void *data, - bool defer) + EventSource source_override) { RStream *rv = xmalloc(sizeof(RStream)); rv->buffer = xmalloc(buffer_size); rv->buffer_size = buffer_size; rv->data = data; - rv->defer = defer; rv->cb = cb; rv->rpos = rv->wpos = rv->fpos = 0; rv->stream = NULL; rv->fread_idle = NULL; rv->free_handle = false; rv->file_type = UV_UNKNOWN_HANDLE; + rv->source_override = source_override ? source_override : rv; return rv; } @@ -213,15 +214,6 @@ size_t rstream_available(RStream *rstream) return rstream->wpos - rstream->rpos; } -/// Sets the `defer` flag for a a RStream instance -/// -/// @param rstream The RStream instance -/// @param defer The new value for the flag -void rstream_set_defer(RStream *rstream, bool defer) -{ - rstream->defer = defer; -} - /// Runs the read callback associated with the rstream /// /// @param event Object containing data necessary to invoke the callback @@ -232,6 +224,11 @@ void rstream_read_event(Event event) rstream->cb(rstream, rstream->data, event.data.rstream.eof); } +EventSource rstream_event_source(RStream *rstream) +{ + return rstream->source_override; +} + // Callbacks used by libuv // Called by libuv to allocate memory for reading. @@ -260,6 +257,9 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf) if (cnt <= 0) { if (cnt != UV_ENOBUFS) { + DLOG("Closing RStream(address: %p, source: %p)", + rstream, + rstream_event_source(rstream)); // Read error or EOF, either way stop the stream and invoke the callback // with eof == true uv_read_stop(stream); @@ -274,10 +274,17 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf) // Data was already written, so all we need is to update 'wpos' to reflect // the space actually used in the buffer. rstream->wpos += nread; + DLOG("Received %u bytes from RStream(address: %p, source: %p)", + (size_t)cnt, + rstream, + rstream_event_source(rstream)); if (rstream->wpos == rstream->buffer_size) { // The last read filled the buffer, stop reading for now rstream_stop(rstream); + DLOG("Buffer for RStream(address: %p, source: %p) is full, stopping it", + rstream, + rstream_event_source(rstream)); } rstream->reading = false; @@ -342,9 +349,13 @@ static void close_cb(uv_handle_t *handle) static void emit_read_event(RStream *rstream, bool eof) { - Event event; - event.type = kEventRStreamData; - event.data.rstream.ptr = rstream; - event.data.rstream.eof = eof; - event_push(event, rstream->defer); + Event event = { + .source = rstream_event_source(rstream), + .type = kEventRStreamData, + .data.rstream = { + .ptr = rstream, + .eof = eof + } + }; + event_push(event); } diff --git a/src/nvim/os/signal.c b/src/nvim/os/signal.c index 65657fda9c..17f270a5cc 100644 --- a/src/nvim/os/signal.c +++ b/src/nvim/os/signal.c @@ -103,6 +103,11 @@ void signal_handle(Event event) } } +EventSource signal_event_source(void) +{ + return &sint; +} + static char * signal_name(int signum) { switch (signum) { @@ -155,10 +160,11 @@ static void signal_cb(uv_signal_t *handle, int signum) } Event event = { + .source = signal_event_source(), .type = kEventSignal, .data = { .signum = signum } }; - event_push(event, true); + event_push(event); } diff --git a/src/nvim/os/wstream.c b/src/nvim/os/wstream.c index 3c4b5b6171..0978d33a10 100644 --- a/src/nvim/os/wstream.c +++ b/src/nvim/os/wstream.c @@ -42,7 +42,8 @@ typedef struct { /// Creates a new WStream instance. A WStream encapsulates all the boilerplate /// necessary for writing to a libuv stream. /// -/// @param maxmem Maximum amount memory used by this `WStream` instance. +/// @param maxmem Maximum amount memory used by this `WStream` instance. If 0, +/// a default value of 10mb will be used. /// @return The newly-allocated `WStream` instance WStream * wstream_new(size_t maxmem) { @@ -91,33 +92,33 @@ void wstream_set_stream(WStream *wstream, uv_stream_t *stream) /// @return false if the write failed bool wstream_write(WStream *wstream, WBuffer *buffer) { - WriteData *data; - uv_buf_t uvbuf; - uv_write_t *req; - // This should not be called after a wstream was freed assert(!wstream->freed); - buffer->refcount++; - if (wstream->curmem > wstream->maxmem) { goto err; } wstream->curmem += buffer->size; - data = xmalloc(sizeof(WriteData)); + + WriteData *data = xmalloc(sizeof(WriteData)); data->wstream = wstream; data->buffer = buffer; - req = xmalloc(sizeof(uv_write_t)); + + uv_write_t *req = xmalloc(sizeof(uv_write_t)); req->data = data; + + uv_buf_t uvbuf; uvbuf.base = buffer->data; uvbuf.len = buffer->size; - wstream->pending_reqs++; if (uv_write(req, wstream->stream, &uvbuf, 1, write_cb)) { + free(data); + free(req); goto err; } + wstream->pending_reqs++; return true; err: @@ -132,14 +133,19 @@ err: /// /// @param data Data stored by the WBuffer /// @param size The size of the data array +/// @param refcount The number of references for the WBuffer. This will be used +/// by WStream instances to decide when a WBuffer should be freed. /// @param cb Pointer to function that will be responsible for freeing /// the buffer data(passing 'free' will work as expected). /// @return The allocated WBuffer instance -WBuffer *wstream_new_buffer(char *data, size_t size, wbuffer_data_finalizer cb) +WBuffer *wstream_new_buffer(char *data, + size_t size, + size_t refcount, + wbuffer_data_finalizer cb) { WBuffer *rv = xmalloc(sizeof(WBuffer)); rv->size = size; - rv->refcount = 0; + rv->refcount = refcount; rv->cb = cb; rv->data = data; |