diff options
author | Thiago de Arruda <tpadilha84@gmail.com> | 2014-10-23 22:19:18 -0300 |
---|---|---|
committer | Thiago de Arruda <tpadilha84@gmail.com> | 2014-10-23 22:19:18 -0300 |
commit | 65942d3a8d84510fcee2dd1c6306a5af13296c84 (patch) | |
tree | cbb3ce006567835f4ed64582cfa4c0181bd2d7bb /src | |
parent | d1e063a8af57661c37da31954346b2b9f73dd9b5 (diff) | |
parent | d696fa51f8a295942da8952d7f85c44b9afac67e (diff) | |
download | rneovim-65942d3a8d84510fcee2dd1c6306a5af13296c84.tar.gz rneovim-65942d3a8d84510fcee2dd1c6306a5af13296c84.tar.bz2 rneovim-65942d3a8d84510fcee2dd1c6306a5af13296c84.zip |
Merge PR #1331 'Fixes to job and channel modules'
Diffstat (limited to 'src')
-rw-r--r-- | src/nvim/api/private/helpers.c | 97 | ||||
-rw-r--r-- | src/nvim/log.c | 24 | ||||
-rw-r--r-- | src/nvim/log.h | 29 | ||||
-rw-r--r-- | src/nvim/msgpack_rpc/channel.c | 155 | ||||
-rw-r--r-- | src/nvim/msgpack_rpc/helpers.c | 66 | ||||
-rw-r--r-- | src/nvim/os/job.c | 106 |
6 files changed, 238 insertions, 239 deletions
diff --git a/src/nvim/api/private/helpers.c b/src/nvim/api/private/helpers.c index 784915cd16..a7b48f3b7e 100644 --- a/src/nvim/api/private/helpers.c +++ b/src/nvim/api/private/helpers.c @@ -554,103 +554,6 @@ Dictionary api_metadata(void) return copy_object(DICTIONARY_OBJ(metadata)).data.dictionary; } -char *api_stringify(Object obj) -{ - Array array = ARRAY_DICT_INIT; - print_to_array(obj, &array); - size_t size = 0; - for (size_t i = 0; i < array.size; i++) { - size += array.items[i].data.string.size; - } - - char *rv = xmalloc(size + 1); - size_t pos = 0; - for (size_t i = 0; i < array.size; i++) { - String str = array.items[i].data.string; - memcpy(rv + pos, str.data, str.size); - pos += str.size; - free(str.data); - } - rv[pos] = NUL; - free(array.items); - return rv; -} - -static void print_to_array(Object obj, Array *array) -{ - char buf[32]; - - switch (obj.type) { - case kObjectTypeNil: - ADD(*array, STRING_OBJ(cstr_to_string("nil"))); - break; - - case kObjectTypeBoolean: - ADD(*array, - STRING_OBJ(cstr_to_string(obj.data.boolean ? "true" : "false"))); - break; - - case kObjectTypeInteger: - snprintf(buf, sizeof(buf), "%" PRId64, obj.data.integer); - ADD(*array, STRING_OBJ(cstr_to_string(buf))); - break; - - case kObjectTypeFloat: - snprintf(buf, sizeof(buf), "%f", obj.data.floating); - ADD(*array, STRING_OBJ(cstr_to_string(buf))); - break; - - case kObjectTypeBuffer: - snprintf(buf, sizeof(buf), "Buffer(%" PRIu64 ")", obj.data.buffer); - ADD(*array, STRING_OBJ(cstr_to_string(buf))); - break; - - case kObjectTypeWindow: - snprintf(buf, sizeof(buf), "Window(%" PRIu64 ")", obj.data.window); - ADD(*array, STRING_OBJ(cstr_to_string(buf))); - break; - - case kObjectTypeTabpage: - snprintf(buf, sizeof(buf), "Tabpage(%" PRIu64 ")", obj.data.tabpage); - ADD(*array, STRING_OBJ(cstr_to_string(buf))); - break; - - case kObjectTypeString: - ADD(*array, STRING_OBJ(cstr_to_string("\""))); - ADD(*array, STRING_OBJ(cstr_to_string(obj.data.string.data))); - ADD(*array, STRING_OBJ(cstr_to_string("\""))); - break; - - case kObjectTypeArray: - ADD(*array, STRING_OBJ(cstr_to_string("["))); - for (size_t i = 0; i < obj.data.array.size; i++) { - print_to_array(obj.data.array.items[i], array); - if (i < obj.data.array.size - 1) { - ADD(*array, STRING_OBJ(cstr_to_string(", "))); - } - } - ADD(*array, STRING_OBJ(cstr_to_string("]"))); - break; - - case kObjectTypeDictionary: - ADD(*array, STRING_OBJ(cstr_to_string("{"))); - for (size_t i = 0; i < obj.data.dictionary.size; i++) { - ADD(*array, - STRING_OBJ(cstr_to_string(obj.data.dictionary.items[i].key.data))); - ADD(*array, STRING_OBJ(cstr_to_string(": "))); - print_to_array(obj.data.dictionary.items[i].value, array); - if (i < obj.data.array.size - 1) { - ADD(*array, STRING_OBJ(cstr_to_string(", "))); - } - } - ADD(*array, STRING_OBJ(cstr_to_string("}"))); - break; - - default: - ADD(*array, STRING_OBJ(cstr_to_string("INVALID"))); - } -} - static void init_error_type_metadata(Dictionary *metadata) { Dictionary types = ARRAY_DICT_INIT; diff --git a/src/nvim/log.c b/src/nvim/log.c index da28a18509..e8e0c9bbb9 100644 --- a/src/nvim/log.c +++ b/src/nvim/log.c @@ -20,7 +20,7 @@ # include "log.c.generated.h" #endif -bool do_log(int log_level, const char *func_name, int line_num, +bool do_log(int log_level, const char *func_name, int line_num, bool eol, const char* fmt, ...) FUNC_ATTR_UNUSED { FILE *log_file = open_log_file(); @@ -31,8 +31,8 @@ bool do_log(int log_level, const char *func_name, int line_num, va_list args; va_start(args, fmt); - bool ret = v_do_log_to_file(log_file, log_level, func_name, line_num, fmt, - args); + bool ret = v_do_log_to_file(log_file, log_level, func_name, line_num, eol, + fmt, args); va_end(args); if (log_file != stderr && log_file != stdout) { @@ -45,13 +45,13 @@ bool do_log(int log_level, const char *func_name, int line_num, /// /// @return The FILE* specified by the USR_LOG_FILE path or stderr in case of /// error -static FILE *open_log_file(void) +FILE *open_log_file(void) { static bool opening_log_file = false; // check if it's a recursive call if (opening_log_file) { - do_log_to_file(stderr, ERROR_LOG_LEVEL, __func__, __LINE__, + do_log_to_file(stderr, ERROR_LOG_LEVEL, __func__, __LINE__, true, "Trying to LOG() recursively! Please fix it."); return stderr; } @@ -81,7 +81,7 @@ static FILE *open_log_file(void) open_log_file_error: opening_log_file = false; - do_log_to_file(stderr, ERROR_LOG_LEVEL, __func__, __LINE__, + do_log_to_file(stderr, ERROR_LOG_LEVEL, __func__, __LINE__, true, "Couldn't open USR_LOG_FILE, logging to stderr! This may be " "caused by attempting to LOG() before initialization " "functions are called (e.g. init_homedir())."); @@ -89,20 +89,20 @@ open_log_file_error: } static bool do_log_to_file(FILE *log_file, int log_level, - const char *func_name, int line_num, + const char *func_name, int line_num, bool eol, const char* fmt, ...) { va_list args; va_start(args, fmt); - bool ret = v_do_log_to_file(log_file, log_level, func_name, line_num, fmt, - args); + bool ret = v_do_log_to_file(log_file, log_level, func_name, line_num, eol, + fmt, args); va_end(args); return ret; } static bool v_do_log_to_file(FILE *log_file, int log_level, - const char *func_name, int line_num, + const char *func_name, int line_num, bool eol, const char* fmt, va_list args) { static const char *log_levels[] = { @@ -133,7 +133,9 @@ static bool v_do_log_to_file(FILE *log_file, int log_level, if (vfprintf(log_file, fmt, args) < 0) { return false; } - fputc('\n', log_file); + if (eol) { + fputc('\n', log_file); + } if (fflush(log_file) == EOF) { return false; } diff --git a/src/nvim/log.h b/src/nvim/log.h index f1ee63a4e2..152e90760e 100644 --- a/src/nvim/log.h +++ b/src/nvim/log.h @@ -1,6 +1,7 @@ #ifndef NVIM_LOG_H #define NVIM_LOG_H +#include <stdio.h> #include <stdbool.h> #define DEBUG_LOG_LEVEL 0 @@ -9,9 +10,13 @@ #define ERROR_LOG_LEVEL 3 #define DLOG(...) +#define DLOGN(...) #define ILOG(...) +#define ILOGN(...) #define WLOG(...) +#define WLOGN(...) #define ELOG(...) +#define ELOGN(...) // Logging is disabled if NDEBUG or DISABLE_LOG is defined. #ifdef NDEBUG @@ -28,22 +33,38 @@ # if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL # undef DLOG -# define DLOG(...) do_log(DEBUG_LOG_LEVEL, __func__, __LINE__, __VA_ARGS__) +# undef DLOGN +# define DLOG(...) do_log(DEBUG_LOG_LEVEL, __func__, __LINE__, true, \ + __VA_ARGS__) +# define DLOGN(...) do_log(DEBUG_LOG_LEVEL, __func__, __LINE__, false, \ + __VA_ARGS__) # endif # if MIN_LOG_LEVEL <= INFO_LOG_LEVEL # undef ILOG -# define ILOG(...) do_log(INFO_LOG_LEVEL, __func__, __LINE__, __VA_ARGS__) +# undef ILOGN +# define ILOG(...) do_log(INFO_LOG_LEVEL, __func__, __LINE__, true, \ + __VA_ARGS__) +# define ILOGN(...) do_log(INFO_LOG_LEVEL, __func__, __LINE__, false, \ + __VA_ARGS__) # endif # if MIN_LOG_LEVEL <= WARNING_LOG_LEVEL # undef WLOG -# define WLOG(...) do_log(WARNING_LOG_LEVEL, __func__, __LINE__, __VA_ARGS__) +# undef WLOGN +# define WLOG(...) do_log(WARNING_LOG_LEVEL, __func__, __LINE__, true, \ + __VA_ARGS__) +# define WLOGN(...) do_log(WARNING_LOG_LEVEL, __func__, __LINE__, false, \ + __VA_ARGS__) # endif # if MIN_LOG_LEVEL <= ERROR_LOG_LEVEL # undef ELOG -# define ELOG(...) do_log(ERROR_LOG_LEVEL, __func__, __LINE__, __VA_ARGS__) +# undef ELOGN +# define ELOG(...) do_log(ERROR_LOG_LEVEL, __func__, __LINE__, true, \ + __VA_ARGS__) +# define ELOGN(...) do_log(ERROR_LOG_LEVEL, __func__, __LINE__, false, \ + __VA_ARGS__) # endif #endif diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c index 94605c37e9..3325b294dd 100644 --- a/src/nvim/msgpack_rpc/channel.c +++ b/src/nvim/msgpack_rpc/channel.c @@ -31,6 +31,11 @@ #define CHANNEL_BUFFER_SIZE 0xffff +#if MIN_LOG_LEVEL > DEBUG_LOG_LEVEL +#define log_client_msg(...) +#define log_server_msg(...) +#endif + typedef struct { uint64_t request_id; bool returned, errored; @@ -225,6 +230,10 @@ Object channel_send_call(uint64_t id, return NIL; } + if (channel->closed && !kv_size(channel->call_stack)) { + free_channel(channel); + } + return frame.result; } @@ -328,7 +337,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) Channel *channel = data; if (eof) { - goto end; + close_channel(channel); } size_t count = rstream_pending(rstream); @@ -348,7 +357,10 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) // Deserialize everything we can. while ((result = msgpack_unpacker_next(channel->unpacker, &unpacked)) == MSGPACK_UNPACK_SUCCESS) { - if (kv_size(channel->call_stack) && is_rpc_response(&unpacked.data)) { + bool is_response = is_rpc_response(&unpacked.data); + log_client_msg(channel->id, !is_response, unpacked.data); + + if (kv_size(channel->call_stack) && is_response) { if (is_valid_rpc_response(&unpacked.data, channel)) { complete_call(&unpacked.data, channel); } else { @@ -363,7 +375,7 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) } msgpack_unpacked_destroy(&unpacked); // Bail out from this event loop iteration - goto end; + return; } handle_request(channel, &unpacked.data); @@ -386,14 +398,6 @@ static void parse_msgpack(RStream *rstream, void *data, bool eof) "This error can also happen when deserializing " "an object with high level of nesting"); } - -end: - if (eof && !channel->is_job && !kv_size(channel->call_stack)) { - // The free_channel call is deferred for jobs because it's possible that - // job_stderr will called after this. For non-job channels, this is the - // last callback so it must be freed now. - free_channel(channel); - } } static void handle_request(Channel *channel, msgpack_object *request) @@ -406,7 +410,11 @@ static void handle_request(Channel *channel, msgpack_object *request) if (error.set) { // Validation failed, send response with error channel_write(channel, - serialize_response(request_id, &error, NIL, &out_buffer)); + serialize_response(channel->id, + request_id, + &error, + NIL, + &out_buffer)); return; } @@ -459,14 +467,13 @@ static void call_request_handler(Channel *channel, // send the response msgpack_packer response; msgpack_packer_init(&response, &out_buffer, msgpack_sbuffer_write); - - if (error.set) { - channel_write(channel, - serialize_response(request_id, &error, NIL, &out_buffer)); - } - - channel_write(channel, - serialize_response(request_id, &error, result, &out_buffer)); + channel_write(channel, serialize_response(channel->id, + request_id, + &error, + result, + &out_buffer)); + // All arguments were freed already, but we still need to free the array + free(args.items); } static bool channel_write(Channel *channel, WBuffer *buffer) @@ -497,7 +504,11 @@ static void send_error(Channel *channel, uint64_t id, char *err) { Error e = ERROR_INIT; api_set_error(&e, Exception, "%s", err); - channel_write(channel, serialize_response(id, &e, NIL, &out_buffer)); + channel_write(channel, serialize_response(channel->id, + id, + &e, + NIL, + &out_buffer)); } static void send_request(Channel *channel, @@ -506,7 +517,12 @@ static void send_request(Channel *channel, Array args) { String method = {.size = strlen(name), .data = name}; - channel_write(channel, serialize_request(id, method, args, &out_buffer, 1)); + channel_write(channel, serialize_request(channel->id, + id, + method, + args, + &out_buffer, + 1)); } static void send_event(Channel *channel, @@ -514,7 +530,12 @@ static void send_event(Channel *channel, Array args) { String method = {.size = strlen(name), .data = name}; - channel_write(channel, serialize_request(0, method, args, &out_buffer, 1)); + channel_write(channel, serialize_request(channel->id, + 0, + method, + args, + &out_buffer, + 1)); } static void broadcast_event(char *name, Array args) @@ -536,6 +557,7 @@ static void broadcast_event(char *name, Array args) String method = {.size = strlen(name), .data = name}; WBuffer *buffer = serialize_request(0, + 0, method, args, &out_buffer, @@ -569,6 +591,10 @@ static void unsubscribe(Channel *channel, char *event) /// free_channel later. static void close_channel(Channel *channel) { + if (channel->closed) { + return; + } + channel->closed = true; if (channel->is_job) { if (channel->data.job) { @@ -577,10 +603,10 @@ static void close_channel(Channel *channel) } else { rstream_free(channel->data.streams.read); wstream_free(channel->data.streams.write); - if (channel->data.streams.uv) { - uv_close((uv_handle_t *)channel->data.streams.uv, close_cb); + uv_handle_t *handle = (uv_handle_t *)channel->data.streams.uv; + if (handle) { + uv_close(handle, close_cb); } else { - // When the stdin channel closes, it's time to go mch_exit(0); } } @@ -663,3 +689,80 @@ static void call_set_error(Channel *channel, char *msg) close_channel(channel); } + +static WBuffer *serialize_request(uint64_t channel_id, + uint64_t request_id, + String method, + Array args, + msgpack_sbuffer *sbuffer, + size_t refcount) +{ + msgpack_packer pac; + msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write); + msgpack_rpc_serialize_request(request_id, method, args, &pac); + log_server_msg(channel_id, sbuffer); + WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size), + sbuffer->size, + refcount, + free); + msgpack_sbuffer_clear(sbuffer); + api_free_array(args); + return rv; +} + +static WBuffer *serialize_response(uint64_t channel_id, + uint64_t response_id, + Error *err, + Object arg, + msgpack_sbuffer *sbuffer) +{ + msgpack_packer pac; + msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write); + msgpack_rpc_serialize_response(response_id, err, arg, &pac); + log_server_msg(channel_id, sbuffer); + WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size), + sbuffer->size, + 1, // responses only go though 1 channel + free); + msgpack_sbuffer_clear(sbuffer); + api_free_object(arg); + return rv; +} + +#if MIN_LOG_LEVEL <= DEBUG_LOG_LEVEL +#define REQ "[response] " +#define RES "[response] " +#define NOT "[notification] " + +static void log_server_msg(uint64_t channel_id, + msgpack_sbuffer *packed) +{ + msgpack_unpacked unpacked; + msgpack_unpacked_init(&unpacked); + msgpack_unpack_next(&unpacked, packed->data, packed->size, NULL); + uint64_t type = unpacked.data.via.array.ptr[0].via.u64; + DLOGN("[msgpack-rpc] nvim -> client(%" PRIu64 ") ", channel_id); + FILE *f = open_log_file(); + fprintf(f, type ? (type == 1 ? RES : NOT) : REQ); + log_msg_close(f, unpacked.data); + msgpack_unpacked_destroy(&unpacked); +} + +static void log_client_msg(uint64_t channel_id, + bool is_request, + msgpack_object msg) +{ + DLOGN("[msgpack-rpc] client(%" PRIu64 ") -> nvim ", channel_id); + FILE *f = open_log_file(); + fprintf(f, is_request ? REQ : RES); + log_msg_close(f, msg); +} + +static void log_msg_close(FILE *f, msgpack_object msg) +{ + msgpack_object_print(f, msg); + fputc('\n', f); + fflush(f); + fclose(f); +} +#endif diff --git a/src/nvim/msgpack_rpc/helpers.c b/src/nvim/msgpack_rpc/helpers.c index 6be221b912..4414aadb15 100644 --- a/src/nvim/msgpack_rpc/helpers.c +++ b/src/nvim/msgpack_rpc/helpers.c @@ -323,68 +323,48 @@ Object msgpack_rpc_handle_missing_method(uint64_t channel_id, } /// Serializes a msgpack-rpc request or notification(id == 0) -WBuffer *serialize_request(uint64_t request_id, - String method, - Array args, - msgpack_sbuffer *sbuffer, - size_t refcount) +void msgpack_rpc_serialize_request(uint64_t request_id, + String method, + Array args, + msgpack_packer *pac) FUNC_ATTR_NONNULL_ARG(4) { - msgpack_packer pac; - msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write); - msgpack_pack_array(&pac, request_id ? 4 : 3); - msgpack_pack_int(&pac, request_id ? 0 : 2); + msgpack_pack_array(pac, request_id ? 4 : 3); + msgpack_pack_int(pac, request_id ? 0 : 2); if (request_id) { - msgpack_pack_uint64(&pac, request_id); + msgpack_pack_uint64(pac, request_id); } - msgpack_pack_bin(&pac, method.size); - msgpack_pack_bin_body(&pac, method.data, method.size); - msgpack_rpc_from_array(args, &pac); - WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size), - sbuffer->size, - refcount, - free); - api_free_array(args); - msgpack_sbuffer_clear(sbuffer); - return rv; + msgpack_pack_bin(pac, method.size); + msgpack_pack_bin_body(pac, method.data, method.size); + msgpack_rpc_from_array(args, pac); } /// Serializes a msgpack-rpc response -WBuffer *serialize_response(uint64_t response_id, - Error *err, - Object arg, - msgpack_sbuffer *sbuffer) +void msgpack_rpc_serialize_response(uint64_t response_id, + Error *err, + Object arg, + msgpack_packer *pac) FUNC_ATTR_NONNULL_ARG(2, 4) { - msgpack_packer pac; - msgpack_packer_init(&pac, sbuffer, msgpack_sbuffer_write); - msgpack_pack_array(&pac, 4); - msgpack_pack_int(&pac, 1); - msgpack_pack_uint64(&pac, response_id); + msgpack_pack_array(pac, 4); + msgpack_pack_int(pac, 1); + msgpack_pack_uint64(pac, response_id); if (err->set) { // error represented by a [type, message] array - msgpack_pack_array(&pac, 2); - msgpack_rpc_from_integer(err->type, &pac); - msgpack_rpc_from_string(cstr_as_string(err->msg), &pac); + msgpack_pack_array(pac, 2); + msgpack_rpc_from_integer(err->type, pac); + msgpack_rpc_from_string(cstr_as_string(err->msg), pac); // Nil result - msgpack_pack_nil(&pac); + msgpack_pack_nil(pac); } else { // Nil error - msgpack_pack_nil(&pac); + msgpack_pack_nil(pac); // Return value - msgpack_rpc_from_object(arg, &pac); + msgpack_rpc_from_object(arg, pac); } - - WBuffer *rv = wstream_new_buffer(xmemdup(sbuffer->data, sbuffer->size), - sbuffer->size, - 1, // responses only go though 1 channel - free); - api_free_object(arg); - msgpack_sbuffer_clear(sbuffer); - return rv; } void msgpack_rpc_validate(uint64_t *response_id, diff --git a/src/nvim/os/job.c b/src/nvim/os/job.c index 4c01829159..caada5616b 100644 --- a/src/nvim/os/job.c +++ b/src/nvim/os/job.c @@ -21,6 +21,21 @@ #define MAX_RUNNING_JOBS 100 #define JOB_BUFFER_SIZE 0xFFFF +#define close_job_stream(job, stream, type) \ + do { \ + if (job->stream) { \ + type##stream_free(job->stream); \ + job->stream = NULL; \ + if (!uv_is_closing((uv_handle_t *)&job->proc_std##stream)) { \ + uv_close((uv_handle_t *)&job->proc_std##stream, close_cb); \ + } \ + } \ + } while (0) + +#define close_job_in(job) close_job_stream(job, in, w) +#define close_job_out(job) close_job_stream(job, out, r) +#define close_job_err(job) close_job_stream(job, err, r) + struct job { // Job id the index in the job table plus one. int id; @@ -28,13 +43,9 @@ struct job { int64_t status; // Number of polls after a SIGTERM that will trigger a SIGKILL int exit_timeout; - // exit_cb may be called while there's still pending data from stdout/stderr. - // We use this reference count to ensure the JobExit event is only emitted - // when stdout/stderr are drained - int pending_refs; - // Same as above, but for freeing the job memory which contains - // libuv handles. Only after all are closed the job can be safely freed. - int pending_closes; + // Number of references to the job. The job resources will only be freed by + // close_cb when this is 0 + int refcount; // If the job was already stopped bool stopped; // Data associated with the job @@ -164,8 +175,7 @@ Job *job_start(char **argv, job->id = i + 1; *status = job->id; job->status = -1; - job->pending_refs = 3; - job->pending_closes = 4; + job->refcount = 4; job->data = data; job->stdout_cb = stdout_cb; job->stderr_cb = stderr_cb; @@ -206,7 +216,9 @@ Job *job_start(char **argv, // Spawn the job if (uv_spawn(uv_default_loop(), &job->proc, &job->proc_opts) != 0) { - free_job(job); + close_job_in(job); + close_job_out(job); + close_job_err(job); *status = -1; return NULL; } @@ -272,15 +284,13 @@ int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL int old_mode = cur_tmode; settmode(TMODE_COOK); - // Increase pending_refs to stop the exit_cb from being called, which - // could result in the job being freed before we have a chance - // to get the status. - job->pending_refs++; + // Increase refcount to stop the job from being freed before we have a + // chance to get the status. + job->refcount++; event_poll_until(ms, // Until... got_int || // interrupted by the user - job->pending_refs == 1); // job exited - job->pending_refs--; + job->refcount == 1); // job exited // we'll assume that a user frantically hitting interrupt doesn't like // the current job. Signal that it has to be killed. @@ -291,9 +301,10 @@ int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL settmode(old_mode); - if (!job->pending_refs) { + if (!--job->refcount) { int status = (int) job->status; - job_exit_callback(job); + // Manually invoke close_cb to free the job resources + close_cb((uv_handle_t *)&job->proc); return status; } @@ -312,15 +323,7 @@ int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL /// @param job The job instance void job_close_in(Job *job) FUNC_ATTR_NONNULL_ALL { - if (!job->in) { - return; - } - - // let other functions in the job module know that the in pipe is no more - wstream_free(job->in); - job->in = NULL; - - uv_close((uv_handle_t *)&job->proc_stdin, close_cb); + close_job_in(job); } /// All writes that complete after calling this function will be reported @@ -379,9 +382,6 @@ static void job_exit_callback(Job *job) job->exit_cb(job, job->data); } - // Free the job resources - free_job(job); - // Stop polling job status if this was the last job_count--; if (job_count == 0) { @@ -394,16 +394,6 @@ static bool is_alive(Job *job) return uv_process_kill(&job->proc, 0) == 0; } -static void free_job(Job *job) -{ - uv_close((uv_handle_t *)&job->proc_stdout, close_cb); - if (job->in) { - uv_close((uv_handle_t *)&job->proc_stdin, close_cb); - } - uv_close((uv_handle_t *)&job->proc_stderr, close_cb); - uv_close((uv_handle_t *)&job->proc, close_cb); -} - /// Iterates the table, sending SIGTERM to stopped jobs and SIGKILL to those /// that didn't die from SIGTERM after a while(exit_timeout is 0). static void job_prepare_cb(uv_prepare_t *handle) @@ -433,12 +423,14 @@ static void read_cb(RStream *rstream, void *data, bool eof) if (rstream == job->out) { job->stdout_cb(rstream, data, eof); + if (eof) { + close_job_out(job); + } } else { job->stderr_cb(rstream, data, eof); - } - - if (eof && --job->pending_refs == 0) { - job_exit_callback(job); + if (eof) { + close_job_err(job); + } } } @@ -448,31 +440,29 @@ static void exit_cb(uv_process_t *proc, int64_t status, int term_signal) Job *job = handle_get_job((uv_handle_t *)proc); job->status = status; - if (--job->pending_refs == 0) { - job_exit_callback(job); - } + uv_close((uv_handle_t *)&job->proc, close_cb); } static void close_cb(uv_handle_t *handle) { Job *job = handle_get_job(handle); - if (--job->pending_closes == 0) { - // Only free the job memory after all the associated handles are properly - // closed by libuv - rstream_free(job->out); - rstream_free(job->err); - if (job->in) { - wstream_free(job->in); - } + if (handle == (uv_handle_t *)&job->proc) { + // Make sure all streams are properly closed to trigger callback invocation + // when job->proc is closed + close_job_in(job); + close_job_out(job); + close_job_err(job); + } - // Free data memory of process and pipe handles, that was allocated - // by handle_set_job in job_start. + if (--job->refcount == 0) { + // Invoke the exit_cb + job_exit_callback(job); + // Free all memory allocated for the job free(job->proc.data); free(job->proc_stdin.data); free(job->proc_stdout.data); free(job->proc_stderr.data); - shell_free_argv(job->proc_opts.args); free(job); } |