aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/memory.c4
-rw-r--r--src/memory.h16
-rw-r--r--src/os/channel.c175
-rw-r--r--src/os/channel.h29
-rw-r--r--src/os/channel_defs.h8
-rw-r--r--src/os/event.c4
-rw-r--r--src/os/job.c6
-rw-r--r--src/os/job.h3
8 files changed, 242 insertions, 3 deletions
diff --git a/src/memory.c b/src/memory.c
index 5bc23284dc..53214d5265 100644
--- a/src/memory.c
+++ b/src/memory.c
@@ -214,6 +214,10 @@ char *xstrndup(const char *str, size_t len)
return xmemdupz(str, p ? (size_t)(p - str) : len);
}
+char *xmemdup(const char *data, size_t len)
+{
+ return memcpy(xmalloc(len), data, len);
+}
/*
* Avoid repeating the error message many times (they take 1 second each).
diff --git a/src/memory.h b/src/memory.h
index 723e7af814..0249b19464 100644
--- a/src/memory.h
+++ b/src/memory.h
@@ -127,6 +127,22 @@ char *xstpcpy(char *restrict dst, const char *restrict src);
/// @param maxlen
char *xstpncpy(char *restrict dst, const char *restrict src, size_t maxlen);
+/// Duplicates a chunk of memory using xmalloc
+///
+/// @see {xmalloc}
+/// @param data pointer to the chunk
+/// @param len size of the chunk
+/// @return a pointer
+char *xmemdup(const char *data, size_t len)
+ FUNC_ATTR_MALLOC FUNC_ATTR_WARN_UNUSED_RESULT FUNC_ATTR_NONNULL_RET;
+
+/// Old low level memory allocation function.
+///
+/// @deprecated use xmalloc() directly instead
+/// @param size
+/// @return pointer to allocated space. Never NULL
+char_u *lalloc(long_u size, int message) FUNC_ATTR_MALLOC FUNC_ATTR_ALLOC_SIZE(1);
+
void do_outofmem_msg(long_u size);
void free_all_mem(void);
diff --git a/src/os/channel.c b/src/os/channel.c
new file mode 100644
index 0000000000..2427db2a14
--- /dev/null
+++ b/src/os/channel.c
@@ -0,0 +1,175 @@
+#include <string.h>
+
+#include <uv.h>
+#include <msgpack.h>
+
+#include "lib/klist.h"
+#include "os/channel.h"
+#include "os/channel_defs.h"
+#include "os/rstream.h"
+#include "os/rstream_defs.h"
+#include "os/wstream.h"
+#include "os/wstream_defs.h"
+#include "os/job.h"
+#include "os/job_defs.h"
+#include "os/msgpack_rpc.h"
+#include "vim.h"
+#include "memory.h"
+
+typedef struct {
+ ChannelProtocol protocol;
+ bool is_job;
+ union {
+ struct {
+ msgpack_unpacker *unpacker;
+ msgpack_sbuffer *sbuffer;
+ } msgpack;
+ } proto;
+ union {
+ int job_id;
+ struct {
+ RStream *read;
+ WStream *write;
+ } streams;
+ } data;
+} Channel;
+
+#define _destroy_channel(x)
+
+KLIST_INIT(Channel, Channel *, _destroy_channel)
+
+static klist_t(Channel) *channels = NULL;
+static void on_job_stdout(RStream *rstream, void *data, bool eof);
+static void on_job_stderr(RStream *rstream, void *data, bool eof);
+static void parse_msgpack(RStream *rstream, void *data, bool eof);
+
+void channel_init()
+{
+ channels = kl_init(Channel);
+}
+
+void channel_teardown()
+{
+ if (!channels) {
+ return;
+ }
+
+ Channel *channel;
+
+ while (kl_shift(Channel, channels, &channel) == 0) {
+
+ switch (channel->protocol) {
+ case kChannelProtocolMsgpack:
+ msgpack_sbuffer_free(channel->proto.msgpack.sbuffer);
+ msgpack_unpacker_free(channel->proto.msgpack.unpacker);
+ break;
+ default:
+ abort();
+ }
+
+ if (channel->is_job) {
+ job_stop(channel->data.job_id);
+ } else {
+ rstream_free(channel->data.streams.read);
+ wstream_free(channel->data.streams.write);
+ }
+ }
+}
+
+void channel_from_job(char **argv, ChannelProtocol prot)
+{
+ Channel *channel = xmalloc(sizeof(Channel));
+ rstream_cb rcb = NULL;
+
+ switch (prot) {
+ case kChannelProtocolMsgpack:
+ rcb = on_job_stdout;
+ channel->proto.msgpack.unpacker =
+ msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
+ channel->proto.msgpack.sbuffer = msgpack_sbuffer_new();
+ break;
+ default:
+ abort();
+ }
+
+ channel->protocol = prot;
+ channel->is_job = true;
+ channel->data.job_id = job_start(argv, channel, rcb, on_job_stderr, NULL);
+ *kl_pushp(Channel, channels) = channel;
+}
+
+void channel_from_stream(uv_stream_t *stream, ChannelProtocol prot)
+{
+ Channel *channel = xmalloc(sizeof(Channel));
+ rstream_cb rcb = NULL;
+
+ switch (prot) {
+ case kChannelProtocolMsgpack:
+ rcb = parse_msgpack;
+ channel->proto.msgpack.unpacker =
+ msgpack_unpacker_new(MSGPACK_UNPACKER_INIT_BUFFER_SIZE);
+ channel->proto.msgpack.sbuffer = msgpack_sbuffer_new();
+ break;
+ default:
+ abort();
+ }
+
+ stream->data = NULL;
+ channel->protocol = prot;
+ channel->is_job = false;
+ // read stream
+ channel->data.streams.read = rstream_new(rcb, 1024, channel, true);
+ rstream_set_stream(channel->data.streams.read, stream);
+ rstream_start(channel->data.streams.read);
+ // write stream
+ channel->data.streams.write = wstream_new(1024 * 1024);
+ wstream_set_stream(channel->data.streams.write, stream);
+ // push to channel list
+ *kl_pushp(Channel, channels) = channel;
+}
+
+static void on_job_stdout(RStream *rstream, void *data, bool eof)
+{
+ Job *job = data;
+ parse_msgpack(rstream, job_data(job), eof);
+}
+
+static void on_job_stderr(RStream *rstream, void *data, bool eof)
+{
+ // TODO(tarruda): plugin error messages should be sent to the error buffer
+}
+
+static void parse_msgpack(RStream *rstream, void *data, bool eof)
+{
+ msgpack_unpacked unpacked;
+ Channel *channel = data;
+ uint32_t count = rstream_available(rstream);
+
+ // Feed the unpacker with data
+ msgpack_unpacker_reserve_buffer(channel->proto.msgpack.unpacker, count);
+ rstream_read(rstream,
+ msgpack_unpacker_buffer(channel->proto.msgpack.unpacker),
+ count);
+ msgpack_unpacker_buffer_consumed(channel->proto.msgpack.unpacker, count);
+
+ msgpack_unpacked_init(&unpacked);
+
+ // Deserialize everything we can.
+ while (msgpack_unpacker_next(channel->proto.msgpack.unpacker, &unpacked)) {
+ // Each object is a new msgpack-rpc request and requires an empty response
+ msgpack_packer response;
+ msgpack_packer_init(&response,
+ channel->proto.msgpack.sbuffer,
+ msgpack_sbuffer_write);
+ // Perform the call
+ msgpack_rpc_call(&unpacked.data, &response);
+ wstream_write(channel->data.streams.write,
+ xmemdup(channel->proto.msgpack.sbuffer->data,
+ channel->proto.msgpack.sbuffer->size),
+ channel->proto.msgpack.sbuffer->size,
+ true);
+
+ // Clear the buffer for future calls
+ msgpack_sbuffer_clear(channel->proto.msgpack.sbuffer);
+ }
+}
diff --git a/src/os/channel.h b/src/os/channel.h
new file mode 100644
index 0000000000..c58e91a0e5
--- /dev/null
+++ b/src/os/channel.h
@@ -0,0 +1,29 @@
+#ifndef NEOVIM_OS_CHANNEL_H
+#define NEOVIM_OS_CHANNEL_H
+
+#include <uv.h>
+
+#include "os/channel_defs.h"
+
+/// Initializes the module
+void channel_init(void);
+
+/// Teardown the module
+void channel_teardown(void);
+
+/// Creates an API channel from a libuv stream representing a tcp or
+/// pipe/socket client connection
+///
+/// @param stream The established connection
+/// @param prot The rpc protocol used
+void channel_from_stream(uv_stream_t *stream, ChannelProtocol prot);
+
+/// Creates an API channel by starting a job and connecting to its
+/// stdin/stdout. stderr is forwarded to the editor error stream.
+///
+/// @param argv The argument vector for the process
+/// @param prot The rpc protocol used
+void channel_from_job(char **argv, ChannelProtocol prot);
+
+#endif // NEOVIM_OS_CHANNEL_H
+
diff --git a/src/os/channel_defs.h b/src/os/channel_defs.h
new file mode 100644
index 0000000000..27ba103fbe
--- /dev/null
+++ b/src/os/channel_defs.h
@@ -0,0 +1,8 @@
+#ifndef NEOVIM_OS_CHANNEL_DEFS_H
+#define NEOVIM_OS_CHANNEL_DEFS_H
+
+typedef enum {
+ kChannelProtocolMsgpack
+} ChannelProtocol;
+
+#endif // NEOVIM_OS_CHANNEL_DEFS_H
diff --git a/src/os/event.c b/src/os/event.c
index 9c9c9c8967..a4b9d1ac5a 100644
--- a/src/os/event.c
+++ b/src/os/event.c
@@ -7,6 +7,7 @@
#include "lib/klist.h"
#include "os/event.h"
#include "os/input.h"
+#include "os/channel.h"
#include "os/signal.h"
#include "os/rstream.h"
#include "os/job.h"
@@ -36,6 +37,8 @@ void event_init()
signal_init();
// Jobs
job_init();
+ // Channels
+ channel_init();
uv_timer_init(uv_default_loop(), &timer);
// This prepare handle that actually starts the timer
uv_prepare_init(uv_default_loop(), &timer_prepare);
@@ -43,6 +46,7 @@ void event_init()
void event_teardown()
{
+ channel_teardown();
job_teardown();
}
diff --git a/src/os/job.c b/src/os/job.c
index f0c64a27a8..7940338b70 100644
--- a/src/os/job.c
+++ b/src/os/job.c
@@ -250,8 +250,10 @@ void job_exit_event(Event event)
// this one
table[job->id - 1] = NULL;
- // Invoke the exit callback
- job->exit_cb(job, job->data);
+ if (job->exit_cb) {
+ // Invoke the exit callback
+ job->exit_cb(job, job->data);
+ }
// Free the job resources
free_job(job);
diff --git a/src/os/job.h b/src/os/job.h
index 37e34700dc..db147f2c24 100644
--- a/src/os/job.h
+++ b/src/os/job.h
@@ -28,7 +28,8 @@ void job_teardown(void);
/// on stdout
/// @param stderr_cb Callback that will be invoked when data is available
/// on stderr
-/// @param exit_cb Callback that will be invoked when the job exits
+/// @param exit_cb Callback that will be invoked when the job exits. This is
+/// optional.
/// @return The job id if the job started successfully. If the the first item /
/// of `argv`(the program) could not be executed, -1 will be returned.
// 0 will be returned if the job table is full.