aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorThiago de Arruda <tpadilha84@gmail.com>2014-10-23 22:19:18 -0300
committerThiago de Arruda <tpadilha84@gmail.com>2014-10-23 22:19:18 -0300
commit65942d3a8d84510fcee2dd1c6306a5af13296c84 (patch)
treecbb3ce006567835f4ed64582cfa4c0181bd2d7bb /src
parentd1e063a8af57661c37da31954346b2b9f73dd9b5 (diff)
parentd696fa51f8a295942da8952d7f85c44b9afac67e (diff)
downloadrneovim-65942d3a8d84510fcee2dd1c6306a5af13296c84.tar.gz
rneovim-65942d3a8d84510fcee2dd1c6306a5af13296c84.tar.bz2
rneovim-65942d3a8d84510fcee2dd1c6306a5af13296c84.zip
Merge PR #1331 'Fixes to job and channel modules'
Diffstat (limited to 'src')
-rw-r--r--src/nvim/api/private/helpers.c97
-rw-r--r--src/nvim/log.c24
-rw-r--r--src/nvim/log.h29
-rw-r--r--src/nvim/msgpack_rpc/channel.c155
-rw-r--r--src/nvim/msgpack_rpc/helpers.c66
-rw-r--r--src/nvim/os/job.c106
6 files changed, 238 insertions, 239 deletions
diff --git a/src/nvim/api/private/helpers.c b/src/nvim/api/private/helpers.c
index 784915cd16..a7b48f3b7e 100644
--- a/src/nvim/api/private/helpers.c
+++ b/src/nvim/api/private/helpers.c
@@ -554,103 +554,6 @@ Dictionary api_metadata(void)
return copy_object(DICTIONARY_OBJ(metadata)).data.dictionary;
}
-char *api_stringify(Object obj)
-{
- Array array = ARRAY_DICT_INIT;
- print_to_array(obj, &array);
- size_t size = 0;
- for (size_t i = 0; i < array.size; i++) {
- size += array.items[i].data.string.size;
- }
-
- char *rv = xmalloc(size + 1);
- size_t pos = 0;
- for (size_t i = 0; i < array.size; i++) {
- String str = array.items[i].data.string;
- memcpy(rv + pos, str.data, str.size);
- pos += str.size;
- free(str.data);
- }
- rv[pos] = NUL;
- free(array.items);
- return rv;
-}
-
-static void print_to_array(Object obj, Array *array)
-{
- char buf[32];
-
- switch (obj.type) {
- case kObjectTypeNil:
- ADD(*array, STRING_OBJ(cstr_to_string("nil")));
- break;
-
- case kObjectTypeBoolean:
- ADD(*array,
- STRING_OBJ(cstr_to_string(obj.data.boolean ? "true" : "false")));
- break;
-
- case kObjectTypeInteger:
- snprintf(buf, sizeof(buf), "%" PRId64, obj.data.integer);
- ADD(*array, STRING_OBJ(cstr_to_string(buf)));
- break;
-
- case kObjectTypeFloat:
- snprintf(buf, sizeof(buf), "%f", obj.data.floating);
- ADD(*array, STRING_OBJ(cstr_to_string(buf)));
- break;
-
- case kObjectTypeBuffer:
- snprintf(buf, sizeof(buf), "Buffer(%" PRIu64 ")", obj.data.buffer);
- ADD(*array, STRING_OBJ(cstr_to_string(buf)));
- break;
-
- case kObjectTypeWindow:
- snprintf(buf, sizeof(buf), "Window(%" PRIu64 ")", obj.data.window);
- ADD(*array, STRING_OBJ(cstr_to_string(buf)));
- break;
-
- case kObjectTypeTabpage:
- snprintf(buf, sizeof(buf), "Tabpage(%" PRIu64 ")", obj.data.tabpage);
- ADD(*array, STRING_OBJ(cstr_to_string(buf)));
- break;
-
- case kObjectTypeString:
- ADD(*array, STRING_OBJ(cstr_to_string("\"")));
- ADD(*array, STRING_OBJ(cstr_to_string(obj.data.string.data)));
- ADD(*array, STRING_OBJ(cstr_to_string("\"")));
- break;
-
- case kObjectTypeArray:
- ADD(*array, STRING_OBJ(cstr_to_string("[")));
- for (size_t i = 0; i < obj.data.array.size; i++) {
- print_to_array(obj.data.array.items[i], array);
- if (i < obj.data.array.size - 1) {
- ADD(*array, STRING_OBJ(cstr_to_string(", ")));
- }
- }
- ADD(*array, STRING_OBJ(cstr_to_string("]")));
- break;
-
- case kObjectTypeDictionary:
- ADD(*array, STRING_OBJ(cstr_to_string("{")));
- for (size_t i = 0; i < obj.data.dictionary.size; i++) {
- ADD(*array,
- STRING_OBJ(cstr_to_string(obj.data.dictionary.items[i].key.data)));
- ADD(*array, STRING_OBJ(cstr_to_string(": ")));
- print_to_array(obj.data.dictionary.items[i].value, array);
- if (i < obj.data.array.size - 1) {
- ADD(*array, STRING_OBJ(cstr_to_string(", ")));
- }
- }
- ADD(*array, STRING_OBJ(cstr_to_string("}")));
- break;
-
- default:
- ADD(*array, STRING_OBJ(cstr_to_string("INVALID")));
- }
-}
-
static void init_error_type_metadata(Dictionary *metadata)
{
Dictionary types = ARRAY_DICT_INIT;
diff --git a/src/nvim/log.c b/src/nvim/log.c
index da28a18509..e8e0c9bbb9 100644
--- a/src/nvim/log.c
+++ b/src/nvim/log.c
@@ -20,7 +20,7 @@
# include "log.c.generated.h"
#endif
-bool do_log(int log_level, const char *func_name, int line_num,
+bool do_log(int log_level, const char *func_name, int line_num, bool eol,
const char* fmt, ...) FUNC_ATTR_UNUSED
{
FILE *log_file = open_log_file();
@@ -31,8 +31,8 @@ bool do_log(int log_level, const char *func_name, int line_num,
va_list args;
va_start(args, fmt);
- bool ret = v_do_log_to_file(log_file, log_level, func_name, line_num, fmt,
- args);
+ bool ret = v_do_log_to_file(log_file, log_level, func_name, line_num, eol,
+ fmt, args);
va_end(args);
if (log_file != stderr && log_file != stdout) {
@@ -45,13 +45,13 @@ bool do_log(int log_level, const char *func_name, int line_num,
///
/// @return The FILE* specified by the USR_LOG_FILE path or stderr in case of
/// error
-static FILE *open_log_file(void)
+FILE *open_log_file(void)
{
static bool opening_log_file = false;
// check if it's a recursive call
if (opening_log_file) {
- do_log_to_file(stderr, ERROR_LOG_LEVEL, __func__, __LINE__,
+ do_log_to_file(stderr, ERROR_LOG_LEVEL, __func__, __LINE__, true,
"Trying to LOG() recursively! Please fix it.");
return stderr;
}
@@ -81,7 +81,7 @@ static FILE *open_log_file(void)
open_log_file_error:
opening_log_file = false;
- do_log_to_file(stderr, ERROR_LOG_LEVEL, __func__, __LINE__,
+ do_log_to_file(stderr, ERROR_LOG_LEVEL, __func__, __LINE__, true,
"Couldn't open USR_LOG_FILE, logging to stderr! This may be "
"caused by attempting to LOG() before initialization "
"functions are called (e.g. init_homedir()).");
@@ -89,20 +89,20 @@ open_log_file_error:
}
static bool do_log_to_file(FILE *log_file, int log_level,
- const char *func_name, int line_num,
+ const char *func_name, int line_num, bool eol,
const char* fmt, ...)
{
va_list args;
va_start(args, fmt);
- bool ret = v_do_log_to_file(log_file, log_level, func_name, line_num, fmt,
- args);
+ bool ret = v_do_log_to_file(log_file, log_level, func_name, line_num, eol,
+ fmt, args);
va_end(args);
return ret;
}
static bool v_do_log_to_file(FILE *log_file, int log_level,
- const char *func_name, int line_num,
+ const char *func_name, int line_num, bool eol,
const char* fmt, va_list args)
{
static const char *log_levels[] = {
@@ -133,7 +133,9 @@ static bool v_do_log_to_file(FILE *log_file, int log_level,
if (vfprintf(log_file, fmt, args) < 0) {
return false;
}
- fputc('\n', log_file);
+ if (eol) {
+ fputc('\n', log_file);
+ }
if (fflush(log_file) == EOF) {
return false;
}
diff --git a/src/nvim/log.h b/src/nvim/log.h
index f1ee63a4e2..152e90760e 100644
--- a/src/nvim/log.h
+++ b/src/nvim/log.h
@@ -1,6 +1,7 @@
#ifndef NVIM_LOG_H
#define NVIM_LOG_H
+#include <stdio.h>
#include <stdbool.h>
#define DEBUG_LOG_LEVEL 0
@@ -9,9 +10,13 @@
#define ERROR_LOG_LEVEL 3
#define DLOG(...)
+#define DLOGN(...)
#define ILOG(...)
+#define ILOGN(...)
#define WLOG(...)
+#define WLOGN(...)
#define ELOG(...)
+#define ELOGN(...)
// Logging is disabled if NDEBUG or DISABLE_LOG is defined.
#ifdef NDEBUG
@@ -28,22 +33,38 @@
# if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL
# undef DLOG
-# define DLOG(...) do_log(DEBUG_LOG_LEVEL, __func__, __LINE__, __VA_ARGS__)
+# undef DLOGN
+# define DLOG(...) do_log(DEBUG_LOG_LEVEL, __func__, __LINE__, true, \
+ __VA_ARGS__)
+# define DLOGN(...) do_log(DEBUG_LOG_LEVEL, __func__, __LINE__, false, \
+ __VA_ARGS__)
# endif
# if MIN_LOG_LEVEL <= INFO_LOG_LEVEL
# undef ILOG
-# define ILOG(...) do_log(INFO_LOG_LEVEL, __func__, __LINE__, __VA_ARGS__)
+# undef ILOGN
+# define ILOG(...) do_log(INFO_LOG_LEVEL, __func__, __LINE__, true, \
+ __VA_ARGS__)
+# define ILOGN(...) do_log(INFO_LOG_LEVEL, __func__, __LINE__, false, \
+ __VA_ARGS__)
# endif
# if MIN_LOG_LEVEL <= WARNING_LOG_LEVEL
# undef WLOG
-# define WLOG(...) do_log(WARNING_LOG_LEVEL, __func__, __LINE__, __VA_ARGS__)
+# undef WLOGN
+# define WLOG(...) do_log(WARNING_LOG_LEVEL, __func__, __LINE__, true, \
+ __VA_ARGS__)
+# define WLOGN(...) do_log(WARNING_LOG_LEVEL, __func__, __LINE__, false, \
+ __VA_ARGS__)
# endif
# if MIN_LOG_LEVEL <= ERROR_LOG_LEVEL
# undef ELOG
-# define ELOG(...) do_log(ERROR_LOG_LEVEL, __func__, __LINE__, __VA_ARGS__)
+# undef ELOGN
+# define ELOG(...) do_log(ERROR_LOG_LEVEL, __func__, __LINE__, true, \
+ __VA_ARGS__)
+# define ELOGN(...) do_log(ERROR_LOG_LEVEL, __func__, __LINE__, false, \
+ __VA_ARGS__)
# endif
#endif
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index 94605c37e9..3325b294dd 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -31,6 +31,11 @@
#define CHANNEL_BUFFER_SIZE 0xffff
+#if MIN_LOG_LEVEL > DEBUG_LOG_LEVEL
+#define log_client_msg(...)
+#define log_server_msg(...)
+#endif
+
typedef struct {
uint64_t request_id;
bool returned, errored;
@@ -225,6 +230,10 @@ Object channel_send_call(uint64_t id,
return NIL;
}
+ if (channel->closed && !kv_size(channel->call_stack)) {
+ free_channel(channel);
+ }
+
return frame.result;
}
@@ -328,7 +337,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
Channel *channel = data;
if (eof) {
- goto end;
+ close_channel(channel);
}
size_t count = rstream_pending(rstream);
@@ -348,7 +357,10 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
// Deserialize everything we can.
while ((result = msgpack_unpacker_next(channel->unpacker, &unpacked)) ==
MSGPACK_UNPACK_SUCCESS) {
- if (kv_size(channel->call_stack) && is_rpc_response(&unpacked.data)) {
+ bool is_response = is_rpc_response(&unpacked.data);
+ log_client_msg(channel->id, !is_response, unpacked.data);
+
+ if (kv_size(channel->call_stack) && is_response) {
if (is_valid_rpc_response(&unpacked.data, channel)) {
complete_call(&unpacked.data, channel);
} else {
@@ -363,7 +375,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
}
msgpack_unpacked_destroy(&unpacked);
// Bail out from this event loop iteration
- goto end;
+ return;
}
handle_request(channel, &unpacked.data);
@@ -386,14 +398,6 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof)
"This error can also happen when deserializing "
"an object with high level of nesting");
}
-
-end:
- if (eof && !channel->is_job && !kv_size(channel->call_stack)) {
- // The free_channel call is deferred for jobs because it's possible that
- // job_stderr will called after this. For non-job channels, this is the
- // last callback so it must be freed now.
- free_channel(channel);
- }
}
static void handle_request(Channel *channel, msgpack_object *request)
@@ -406,7 +410,11 @@ static void handle_request(Channel *channel, msgpack_object *request)
if (error.set) {
// Validation failed, send response with error
channel_write(channel,
- serialize_response(request_id, &error, NIL, &out_buffer));
+ serialize_response(channel->id,
+ request_id,
+ &error,
+ NIL,
+ &out_buffer));
return;
}
@@ -459,14 +467,13 @@ static void call_request_handler(Channel *channel,
// send the response
msgpack_packer response;
msgpack_packer_init(&response, &out_buffer, msgpack_sbuffer_write);
-
- if (error.set) {
- channel_write(channel,
- serialize_response(request_id, &error, NIL, &out_buffer));
- }
-
- channel_write(channel,
- serialize_response(request_id, &error, result, &out_buffer));
+ channel_write(channel, serialize_response(channel->id,
+ request_id,
+ &error,
+ result,
+ &out_buffer));
+ // All arguments were freed already, but we still need to free the array
+ free(args.items);
}
static bool channel_write(Channel *channel, WBuffer *buffer)
@@ -497,7 +504,11 @@ static void send_error(Channel *channel, uint64_t id, char *err)
{
Error e = ERROR_INIT;
api_set_error(&e, Exception, "%s", err);
- channel_write(channel, serialize_response(id, &e, NIL, &out_buffer));
+ channel_write(channel, serialize_response(channel->id,
+ id,
+ &e,
+ NIL,
+ &out_buffer));
}
static void send_request(Channel *channel,
@@ -506,7 +517,12 @@ static void send_request(Channel *channel,
Array args)
{
String method = {.size = strlen(name), .data = name};
- channel_write(channel, serialize_request(id, method, args, &out_buffer, 1));
+ channel_write(channel, serialize_request(channel->id,
+ id,
+ method,
+ args,
+ &out_buffer,
+ 1));
}
static void send_event(Channel *channel,
@@ -514,7 +530,12 @@ static void send_event(Channel *channel,
Array args)
{
String method = {.size = strlen(name), .data = name};
- channel_write(channel, serialize_request(0, method, args, &out_buffer, 1));
+ channel_write(channel, serialize_request(channel->id,
+ 0,
+ method,
+ args,
+ &out_buffer,
+ 1));
}
static void broadcast_event(char *name, Array args)
@@ -536,6 +557,7 @@ static void broadcast_event(char *name, Array args)
String method = {.size = strlen(name), .data = name};
WBuffer *buffer = serialize_request(0,
+ 0,
method,
args,
&out_buffer,
@@ -569,6 +591,10 @@ static void unsubscribe(Channel *channel, char *event)
/// free_channel later.
static void close_channel(Channel *channel)
{
+ if (channel->closed) {
+ return;
+ }
+
channel->closed = true;
if (channel->is_job) {
if (channel->data.job) {
@@ -577,10 +603,10 @@ static void close_channel(Channel *channel)
} else {
rstream_free(channel->data.streams.read);
wstream_free(channel->data.streams.write);
- if (channel->data.streams.uv) {
- uv_close((uv_handle_t *)channel->data.streams.uv, close_cb);
+ uv_handle_t *handle = (uv_handle_t *)channel->data.streams.uv;
+ if (handle) {
+ uv_close(handle, close_cb);
} else {
- // When the stdin channel closes, it's time to go
mch_exit(0);
}
}
@@ -663,3 +689,80 @@ static void call_set_error(Channel *channel, char *msg)
close_channel(channel);
}
+
+static WBuffer *serialize_request(uint64_t channel_id,
+ uint64_t request_id,
+ String method,
+ Array args,
+ msgpack_sbuffer *sbuffer,
+ size_t refcount)
+{
+ msgpack_packer pac;
+ msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write);
+ msgpack_rpc_serialize_request(request_id, method, args, &pac);
+ log_server_msg(channel_id, sbuffer);
+ WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
+ sbuffer->size,
+ refcount,
+ free);
+ msgpack_sbuffer_clear(sbuffer);
+ api_free_array(args);
+ return rv;
+}
+
+static WBuffer *serialize_response(uint64_t channel_id,
+ uint64_t response_id,
+ Error *err,
+ Object arg,
+ msgpack_sbuffer *sbuffer)
+{
+ msgpack_packer pac;
+ msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write);
+ msgpack_rpc_serialize_response(response_id, err, arg, &pac);
+ log_server_msg(channel_id, sbuffer);
+ WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
+ sbuffer->size,
+ 1, // responses only go though 1 channel
+ free);
+ msgpack_sbuffer_clear(sbuffer);
+ api_free_object(arg);
+ return rv;
+}
+
+#if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL
+#define REQ "[response] "
+#define RES "[response] "
+#define NOT "[notification] "
+
+static void log_server_msg(uint64_t channel_id,
+ msgpack_sbuffer *packed)
+{
+ msgpack_unpacked unpacked;
+ msgpack_unpacked_init(&unpacked);
+ msgpack_unpack_next(&unpacked, packed->data, packed->size, NULL);
+ uint64_t type = unpacked.data.via.array.ptr[0].via.u64;
+ DLOGN("[msgpack-rpc] nvim -> client(%" PRIu64 ") ", channel_id);
+ FILE *f = open_log_file();
+ fprintf(f, type ? (type == 1 ? RES : NOT) : REQ);
+ log_msg_close(f, unpacked.data);
+ msgpack_unpacked_destroy(&unpacked);
+}
+
+static void log_client_msg(uint64_t channel_id,
+ bool is_request,
+ msgpack_object msg)
+{
+ DLOGN("[msgpack-rpc] client(%" PRIu64 ") -> nvim ", channel_id);
+ FILE *f = open_log_file();
+ fprintf(f, is_request ? REQ : RES);
+ log_msg_close(f, msg);
+}
+
+static void log_msg_close(FILE *f, msgpack_object msg)
+{
+ msgpack_object_print(f, msg);
+ fputc('\n', f);
+ fflush(f);
+ fclose(f);
+}
+#endif
diff --git a/src/nvim/msgpack_rpc/helpers.c b/src/nvim/msgpack_rpc/helpers.c
index 6be221b912..4414aadb15 100644
--- a/src/nvim/msgpack_rpc/helpers.c
+++ b/src/nvim/msgpack_rpc/helpers.c
@@ -323,68 +323,48 @@ Object msgpack_rpc_handle_missing_method(uint64_t channel_id,
}
/// Serializes a msgpack-rpc request or notification(id == 0)
-WBuffer *serialize_request(uint64_t request_id,
- String method,
- Array args,
- msgpack_sbuffer *sbuffer,
- size_t refcount)
+void msgpack_rpc_serialize_request(uint64_t request_id,
+ String method,
+ Array args,
+ msgpack_packer *pac)
FUNC_ATTR_NONNULL_ARG(4)
{
- msgpack_packer pac;
- msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write);
- msgpack_pack_array(&pac, request_id ? 4 : 3);
- msgpack_pack_int(&pac, request_id ? 0 : 2);
+ msgpack_pack_array(pac, request_id ? 4 : 3);
+ msgpack_pack_int(pac, request_id ? 0 : 2);
if (request_id) {
- msgpack_pack_uint64(&pac, request_id);
+ msgpack_pack_uint64(pac, request_id);
}
- msgpack_pack_bin(&pac, method.size);
- msgpack_pack_bin_body(&pac, method.data, method.size);
- msgpack_rpc_from_array(args, &pac);
- WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
- sbuffer->size,
- refcount,
- free);
- api_free_array(args);
- msgpack_sbuffer_clear(sbuffer);
- return rv;
+ msgpack_pack_bin(pac, method.size);
+ msgpack_pack_bin_body(pac, method.data, method.size);
+ msgpack_rpc_from_array(args, pac);
}
/// Serializes a msgpack-rpc response
-WBuffer *serialize_response(uint64_t response_id,
- Error *err,
- Object arg,
- msgpack_sbuffer *sbuffer)
+void msgpack_rpc_serialize_response(uint64_t response_id,
+ Error *err,
+ Object arg,
+ msgpack_packer *pac)
FUNC_ATTR_NONNULL_ARG(2, 4)
{
- msgpack_packer pac;
- msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write);
- msgpack_pack_array(&pac, 4);
- msgpack_pack_int(&pac, 1);
- msgpack_pack_uint64(&pac, response_id);
+ msgpack_pack_array(pac, 4);
+ msgpack_pack_int(pac, 1);
+ msgpack_pack_uint64(pac, response_id);
if (err->set) {
// error represented by a [type, message] array
- msgpack_pack_array(&pac, 2);
- msgpack_rpc_from_integer(err->type, &pac);
- msgpack_rpc_from_string(cstr_as_string(err->msg), &pac);
+ msgpack_pack_array(pac, 2);
+ msgpack_rpc_from_integer(err->type, pac);
+ msgpack_rpc_from_string(cstr_as_string(err->msg), pac);
// Nil result
- msgpack_pack_nil(&pac);
+ msgpack_pack_nil(pac);
} else {
// Nil error
- msgpack_pack_nil(&pac);
+ msgpack_pack_nil(pac);
// Return value
- msgpack_rpc_from_object(arg, &pac);
+ msgpack_rpc_from_object(arg, pac);
}
-
- WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size),
- sbuffer->size,
- 1, // responses only go though 1 channel
- free);
- api_free_object(arg);
- msgpack_sbuffer_clear(sbuffer);
- return rv;
}
void msgpack_rpc_validate(uint64_t *response_id,
diff --git a/src/nvim/os/job.c b/src/nvim/os/job.c
index 4c01829159..caada5616b 100644
--- a/src/nvim/os/job.c
+++ b/src/nvim/os/job.c
@@ -21,6 +21,21 @@
#define MAX_RUNNING_JOBS 100
#define JOB_BUFFER_SIZE 0xFFFF
+#define close_job_stream(job, stream, type) \
+ do { \
+ if (job->stream) { \
+ type##stream_free(job->stream); \
+ job->stream = NULL; \
+ if (!uv_is_closing((uv_handle_t *)&job->proc_std##stream)) { \
+ uv_close((uv_handle_t *)&job->proc_std##stream, close_cb); \
+ } \
+ } \
+ } while (0)
+
+#define close_job_in(job) close_job_stream(job, in, w)
+#define close_job_out(job) close_job_stream(job, out, r)
+#define close_job_err(job) close_job_stream(job, err, r)
+
struct job {
// Job id the index in the job table plus one.
int id;
@@ -28,13 +43,9 @@ struct job {
int64_t status;
// Number of polls after a SIGTERM that will trigger a SIGKILL
int exit_timeout;
- // exit_cb may be called while there's still pending data from stdout/stderr.
- // We use this reference count to ensure the JobExit event is only emitted
- // when stdout/stderr are drained
- int pending_refs;
- // Same as above, but for freeing the job memory which contains
- // libuv handles. Only after all are closed the job can be safely freed.
- int pending_closes;
+ // Number of references to the job. The job resources will only be freed by
+ // close_cb when this is 0
+ int refcount;
// If the job was already stopped
bool stopped;
// Data associated with the job
@@ -164,8 +175,7 @@ Job *job_start(char **argv,
job->id = i + 1;
*status = job->id;
job->status = -1;
- job->pending_refs = 3;
- job->pending_closes = 4;
+ job->refcount = 4;
job->data = data;
job->stdout_cb = stdout_cb;
job->stderr_cb = stderr_cb;
@@ -206,7 +216,9 @@ Job *job_start(char **argv,
// Spawn the job
if (uv_spawn(uv_default_loop(), &job->proc, &job->proc_opts) != 0) {
- free_job(job);
+ close_job_in(job);
+ close_job_out(job);
+ close_job_err(job);
*status = -1;
return NULL;
}
@@ -272,15 +284,13 @@ int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL
int old_mode = cur_tmode;
settmode(TMODE_COOK);
- // Increase pending_refs to stop the exit_cb from being called, which
- // could result in the job being freed before we have a chance
- // to get the status.
- job->pending_refs++;
+ // Increase refcount to stop the job from being freed before we have a
+ // chance to get the status.
+ job->refcount++;
event_poll_until(ms,
// Until...
got_int || // interrupted by the user
- job->pending_refs == 1); // job exited
- job->pending_refs--;
+ job->refcount == 1); // job exited
// we'll assume that a user frantically hitting interrupt doesn't like
// the current job. Signal that it has to be killed.
@@ -291,9 +301,10 @@ int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL
settmode(old_mode);
- if (!job->pending_refs) {
+ if (!--job->refcount) {
int status = (int) job->status;
- job_exit_callback(job);
+ // Manually invoke close_cb to free the job resources
+ close_cb((uv_handle_t *)&job->proc);
return status;
}
@@ -312,15 +323,7 @@ int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL
/// @param job The job instance
void job_close_in(Job *job) FUNC_ATTR_NONNULL_ALL
{
- if (!job->in) {
- return;
- }
-
- // let other functions in the job module know that the in pipe is no more
- wstream_free(job->in);
- job->in = NULL;
-
- uv_close((uv_handle_t *)&job->proc_stdin, close_cb);
+ close_job_in(job);
}
/// All writes that complete after calling this function will be reported
@@ -379,9 +382,6 @@ static void job_exit_callback(Job *job)
job->exit_cb(job, job->data);
}
- // Free the job resources
- free_job(job);
-
// Stop polling job status if this was the last
job_count--;
if (job_count == 0) {
@@ -394,16 +394,6 @@ static bool is_alive(Job *job)
return uv_process_kill(&job->proc, 0) == 0;
}
-static void free_job(Job *job)
-{
- uv_close((uv_handle_t *)&job->proc_stdout, close_cb);
- if (job->in) {
- uv_close((uv_handle_t *)&job->proc_stdin, close_cb);
- }
- uv_close((uv_handle_t *)&job->proc_stderr, close_cb);
- uv_close((uv_handle_t *)&job->proc, close_cb);
-}
-
/// Iterates the table, sending SIGTERM to stopped jobs and SIGKILL to those
/// that didn't die from SIGTERM after a while(exit_timeout is 0).
static void job_prepare_cb(uv_prepare_t *handle)
@@ -433,12 +423,14 @@ static void read_cb(RStream *rstream, void *data, bool eof)
if (rstream == job->out) {
job->stdout_cb(rstream, data, eof);
+ if (eof) {
+ close_job_out(job);
+ }
} else {
job->stderr_cb(rstream, data, eof);
- }
-
- if (eof && --job->pending_refs == 0) {
- job_exit_callback(job);
+ if (eof) {
+ close_job_err(job);
+ }
}
}
@@ -448,31 +440,29 @@ static void exit_cb(uv_process_t *proc, int64_t status, int term_signal)
Job *job = handle_get_job((uv_handle_t *)proc);
job->status = status;
- if (--job->pending_refs == 0) {
- job_exit_callback(job);
- }
+ uv_close((uv_handle_t *)&job->proc, close_cb);
}
static void close_cb(uv_handle_t *handle)
{
Job *job = handle_get_job(handle);
- if (--job->pending_closes == 0) {
- // Only free the job memory after all the associated handles are properly
- // closed by libuv
- rstream_free(job->out);
- rstream_free(job->err);
- if (job->in) {
- wstream_free(job->in);
- }
+ if (handle == (uv_handle_t *)&job->proc) {
+ // Make sure all streams are properly closed to trigger callback invocation
+ // when job->proc is closed
+ close_job_in(job);
+ close_job_out(job);
+ close_job_err(job);
+ }
- // Free data memory of process and pipe handles, that was allocated
- // by handle_set_job in job_start.
+ if (--job->refcount == 0) {
+ // Invoke the exit_cb
+ job_exit_callback(job);
+ // Free all memory allocated for the job
free(job->proc.data);
free(job->proc_stdin.data);
free(job->proc_stdout.data);
free(job->proc_stderr.data);
-
shell_free_argv(job->proc_opts.args);
free(job);
}