diff options
-rw-r--r-- | src/memory.c | 4 | ||||
-rw-r--r-- | src/memory.h | 16 | ||||
-rw-r--r-- | src/os/channel.c | 175 | ||||
-rw-r--r-- | src/os/channel.h | 29 | ||||
-rw-r--r-- | src/os/channel_defs.h | 8 | ||||
-rw-r--r-- | src/os/event.c | 4 | ||||
-rw-r--r-- | src/os/job.c | 6 | ||||
-rw-r--r-- | src/os/job.h | 3 |
8 files changed, 242 insertions, 3 deletions
diff --git a/src/memory.c b/src/memory.c index 5bc23284dc..53214d5265 100644 --- a/src/memory.c +++ b/src/memory.c @@ -214,6 +214,10 @@ char *xstrndup(const char *str, size_t len) return xmemdupz(str, p ? (size_t)(p - str) : len); } +char *xmemdup(const char *data, size_t len) +{ + return memcpy(xmalloc(len), data, len); +} /* * Avoid repeating the error message many times (they take 1 second each). diff --git a/src/memory.h b/src/memory.h index 723e7af814..0249b19464 100644 --- a/src/memory.h +++ b/src/memory.h @@ -127,6 +127,22 @@ char *xstpcpy(char *restrict dst, const char *restrict src); /// @param maxlen char *xstpncpy(char *restrict dst, const char *restrict src, size_t maxlen); +/// Duplicates a chunk of memory using xmalloc +/// +/// @see {xmalloc} +/// @param data pointer to the chunk +/// @param len size of the chunk +/// @return a pointer +char *xmemdup(const char *data, size_t len) + FUNC_ATTR_MALLOC FUNC_ATTR_WARN_UNUSED_RESULT FUNC_ATTR_NONNULL_RET; + +/// Old low level memory allocation function. +/// +/// @deprecated use xmalloc() directly instead +/// @param size +/// @return pointer to allocated space. Never NULL +char_u *lalloc(long_u size, int message) FUNC_ATTR_MALLOC FUNC_ATTR_ALLOC_SIZE(1); + void do_outofmem_msg(long_u size); void free_all_mem(void); diff --git a/src/os/channel.c b/src/os/channel.c new file mode 100644 index 0000000000..2427db2a14 --- /dev/null +++ b/src/os/channel.c @@ -0,0 +1,175 @@ +#include <string.h> + +#include <uv.h> +#include <msgpack.h> + +#include "lib/klist.h" +#include "os/channel.h" +#include "os/channel_defs.h" +#include "os/rstream.h" +#include "os/rstream_defs.h" +#include "os/wstream.h" +#include "os/wstream_defs.h" +#include "os/job.h" +#include "os/job_defs.h" +#include "os/msgpack_rpc.h" +#include "vim.h" +#include "memory.h" + +typedef struct { + ChannelProtocol protocol; + bool is_job; + union { + struct { + msgpack_unpacker *unpacker; + msgpack_sbuffer *sbuffer; + } msgpack; + } proto; + union { + int job_id; + struct { + RStream *read; + WStream *write; + } streams; + } data; +} Channel; + +#define _destroy_channel(x) + +KLIST_INIT(Channel, Channel *, _destroy_channel) + +static klist_t(Channel) *channels = NULL; +static void on_job_stdout(RStream *rstream, void *data, bool eof); +static void on_job_stderr(RStream *rstream, void *data, bool eof); +static void parse_msgpack(RStream *rstream, void *data, bool eof); + +void channel_init() +{ + channels = kl_init(Channel); +} + +void channel_teardown() +{ + if (!channels) { + return; + } + + Channel *channel; + + while (kl_shift(Channel, channels, &channel) == 0) { + + switch (channel->protocol) { + case kChannelProtocolMsgpack: + msgpack_sbuffer_free(channel->proto.msgpack.sbuffer); + msgpack_unpacker_free(channel->proto.msgpack.unpacker); + break; + default: + abort(); + } + + if (channel->is_job) { + job_stop(channel->data.job_id); + } else { + rstream_free(channel->data.streams.read); + wstream_free(channel->data.streams.write); + } + } +} + +void channel_from_job(char **argv, ChannelProtocol prot) +{ + Channel *channel = xmalloc(sizeof(Channel)); + rstream_cb rcb = NULL; + + switch (prot) { + case kChannelProtocolMsgpack: + rcb = on_job_stdout; + channel->proto.msgpack.unpacker = + msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); + channel->proto.msgpack.sbuffer = msgpack_sbuffer_new(); + break; + default: + abort(); + } + + channel->protocol = prot; + channel->is_job = true; + channel->data.job_id = job_start(argv, channel, rcb, on_job_stderr, NULL); + *kl_pushp(Channel, channels) = channel; +} + +void channel_from_stream(uv_stream_t *stream, ChannelProtocol prot) +{ + Channel *channel = xmalloc(sizeof(Channel)); + rstream_cb rcb = NULL; + + switch (prot) { + case kChannelProtocolMsgpack: + rcb = parse_msgpack; + channel->proto.msgpack.unpacker = + msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE); + channel->proto.msgpack.sbuffer = msgpack_sbuffer_new(); + break; + default: + abort(); + } + + stream->data = NULL; + channel->protocol = prot; + channel->is_job = false; + // read stream + channel->data.streams.read = rstream_new(rcb, 1024, channel, true); + rstream_set_stream(channel->data.streams.read, stream); + rstream_start(channel->data.streams.read); + // write stream + channel->data.streams.write = wstream_new(1024 * 1024); + wstream_set_stream(channel->data.streams.write, stream); + // push to channel list + *kl_pushp(Channel, channels) = channel; +} + +static void on_job_stdout(RStream *rstream, void *data, bool eof) +{ + Job *job = data; + parse_msgpack(rstream, job_data(job), eof); +} + +static void on_job_stderr(RStream *rstream, void *data, bool eof) +{ + // TODO(tarruda): plugin error messages should be sent to the error buffer +} + +static void parse_msgpack(RStream *rstream, void *data, bool eof) +{ + msgpack_unpacked unpacked; + Channel *channel = data; + uint32_t count = rstream_available(rstream); + + // Feed the unpacker with data + msgpack_unpacker_reserve_buffer(channel->proto.msgpack.unpacker, count); + rstream_read(rstream, + msgpack_unpacker_buffer(channel->proto.msgpack.unpacker), + count); + msgpack_unpacker_buffer_consumed(channel->proto.msgpack.unpacker, count); + + msgpack_unpacked_init(&unpacked); + + // Deserialize everything we can. + while (msgpack_unpacker_next(channel->proto.msgpack.unpacker, &unpacked)) { + // Each object is a new msgpack-rpc request and requires an empty response + msgpack_packer response; + msgpack_packer_init(&response, + channel->proto.msgpack.sbuffer, + msgpack_sbuffer_write); + // Perform the call + msgpack_rpc_call(&unpacked.data, &response); + wstream_write(channel->data.streams.write, + xmemdup(channel->proto.msgpack.sbuffer->data, + channel->proto.msgpack.sbuffer->size), + channel->proto.msgpack.sbuffer->size, + true); + + // Clear the buffer for future calls + msgpack_sbuffer_clear(channel->proto.msgpack.sbuffer); + } +} diff --git a/src/os/channel.h b/src/os/channel.h new file mode 100644 index 0000000000..c58e91a0e5 --- /dev/null +++ b/src/os/channel.h @@ -0,0 +1,29 @@ +#ifndef NEOVIM_OS_CHANNEL_H +#define NEOVIM_OS_CHANNEL_H + +#include <uv.h> + +#include "os/channel_defs.h" + +/// Initializes the module +void channel_init(void); + +/// Teardown the module +void channel_teardown(void); + +/// Creates an API channel from a libuv stream representing a tcp or +/// pipe/socket client connection +/// +/// @param stream The established connection +/// @param prot The rpc protocol used +void channel_from_stream(uv_stream_t *stream, ChannelProtocol prot); + +/// Creates an API channel by starting a job and connecting to its +/// stdin/stdout. stderr is forwarded to the editor error stream. +/// +/// @param argv The argument vector for the process +/// @param prot The rpc protocol used +void channel_from_job(char **argv, ChannelProtocol prot); + +#endif // NEOVIM_OS_CHANNEL_H + diff --git a/src/os/channel_defs.h b/src/os/channel_defs.h new file mode 100644 index 0000000000..27ba103fbe --- /dev/null +++ b/src/os/channel_defs.h @@ -0,0 +1,8 @@ +#ifndef NEOVIM_OS_CHANNEL_DEFS_H +#define NEOVIM_OS_CHANNEL_DEFS_H + +typedef enum { + kChannelProtocolMsgpack +} ChannelProtocol; + +#endif // NEOVIM_OS_CHANNEL_DEFS_H diff --git a/src/os/event.c b/src/os/event.c index 9c9c9c8967..a4b9d1ac5a 100644 --- a/src/os/event.c +++ b/src/os/event.c @@ -7,6 +7,7 @@ #include "lib/klist.h" #include "os/event.h" #include "os/input.h" +#include "os/channel.h" #include "os/signal.h" #include "os/rstream.h" #include "os/job.h" @@ -36,6 +37,8 @@ void event_init() signal_init(); // Jobs job_init(); + // Channels + channel_init(); uv_timer_init(uv_default_loop(), &timer); // This prepare handle that actually starts the timer uv_prepare_init(uv_default_loop(), &timer_prepare); @@ -43,6 +46,7 @@ void event_init() void event_teardown() { + channel_teardown(); job_teardown(); } diff --git a/src/os/job.c b/src/os/job.c index f0c64a27a8..7940338b70 100644 --- a/src/os/job.c +++ b/src/os/job.c @@ -250,8 +250,10 @@ void job_exit_event(Event event) // this one table[job->id - 1] = NULL; - // Invoke the exit callback - job->exit_cb(job, job->data); + if (job->exit_cb) { + // Invoke the exit callback + job->exit_cb(job, job->data); + } // Free the job resources free_job(job); diff --git a/src/os/job.h b/src/os/job.h index 37e34700dc..db147f2c24 100644 --- a/src/os/job.h +++ b/src/os/job.h @@ -28,7 +28,8 @@ void job_teardown(void); /// on stdout /// @param stderr_cb Callback that will be invoked when data is available /// on stderr -/// @param exit_cb Callback that will be invoked when the job exits +/// @param exit_cb Callback that will be invoked when the job exits. This is +/// optional. /// @return The job id if the job started successfully. If the the first item / /// of `argv`(the program) could not be executed, -1 will be returned. // 0 will be returned if the job table is full. |