aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/nvim/msgpack_rpc/channel.c13
-rw-r--r--src/nvim/os/event.c98
-rw-r--r--src/nvim/os/event_defs.h13
-rw-r--r--src/nvim/os/input.c18
-rw-r--r--src/nvim/os/job.c55
-rw-r--r--src/nvim/os/rstream.c58
-rw-r--r--src/nvim/os/signal.c81
7 files changed, 65 insertions, 271 deletions
diff --git a/src/nvim/msgpack_rpc/channel.c b/src/nvim/msgpack_rpc/channel.c
index 83e7900a54..d31e404c23 100644
--- a/src/nvim/msgpack_rpc/channel.c
+++ b/src/nvim/msgpack_rpc/channel.c
@@ -126,8 +126,7 @@ void channel_from_stream(uv_stream_t *stream)
// read stream
channel->data.streams.read = rstream_new(parse_msgpack,
rbuffer_new(CHANNEL_BUFFER_SIZE),
- channel,
- NULL);
+ channel);
rstream_set_stream(channel->data.streams.read, stream);
rstream_start(channel->data.streams.read);
// write stream
@@ -201,17 +200,12 @@ Object channel_send_call(uint64_t id,
// Send the msgpack-rpc request
send_request(channel, request_id, method_name, args);
- EventSource channel_source = channel->is_job
- ? job_event_source(channel->data.job)
- : rstream_event_source(channel->data.streams.read);
- EventSource sources[] = {channel_source, NULL};
-
// Push the frame
ChannelCallFrame frame = {request_id, false, false, NIL};
kv_push(ChannelCallFrame *, channel->call_stack, &frame);
do {
- event_poll(-1, sources);
+ event_poll(-1);
} while (!frame.returned);
(void)kv_pop(channel->call_stack);
@@ -286,8 +280,7 @@ static void channel_from_stdio(void)
// read stream
channel->data.streams.read = rstream_new(parse_msgpack,
rbuffer_new(CHANNEL_BUFFER_SIZE),
- channel,
- NULL);
+ channel);
rstream_set_file(channel->data.streams.read, 0);
rstream_start(channel->data.streams.read);
// write stream
diff --git a/src/nvim/os/event.c b/src/nvim/os/event.c
index 43c02b13b2..00920fc5cf 100644
--- a/src/nvim/os/event.c
+++ b/src/nvim/os/event.c
@@ -34,12 +34,7 @@ typedef struct {
#ifdef INCLUDE_GENERATED_DECLARATIONS
# include "os/event.c.generated.h"
#endif
-static klist_t(Event) *deferred_events, *immediate_events;
-// NULL-terminated array of event sources that we should process immediately.
-//
-// Events from sources that are not contained in this array are processed
-// later when `event_process` is called
-static EventSource *immediate_sources = NULL;
+static klist_t(Event) *pending_events;
void event_init(void)
{
@@ -47,8 +42,7 @@ void event_init(void)
msgpack_rpc_init_method_table();
msgpack_rpc_helpers_init();
// Initialize the event queues
- deferred_events = kl_init(Event);
- immediate_events = kl_init(Event);
+ pending_events = kl_init(Event);
// Initialize input events
input_init();
// Timer to wake the event loop if a timeout argument is passed to
@@ -72,8 +66,7 @@ void event_teardown(void)
}
// Wait for some event
-bool event_poll(int32_t ms, EventSource sources[])
- FUNC_ATTR_NONNULL_ARG(2)
+bool event_poll(int32_t ms)
{
uv_run_mode run_mode = UV_RUN_ONCE;
@@ -104,18 +97,7 @@ bool event_poll(int32_t ms, EventSource sources[])
run_mode = UV_RUN_NOWAIT;
}
- size_t processed_events;
-
- do {
- // Run one event loop iteration, blocking for events if run_mode is
- // UV_RUN_ONCE
- processed_events = loop(run_mode, sources);
- } while (
- // Continue running if ...
- !processed_events && // we didn't process any immediate events
- !event_has_deferred() && // no events are waiting to be processed
- run_mode != UV_RUN_NOWAIT && // ms != 0
- !timer_data.timed_out); // we didn't get a timeout
+ loop(run_mode);
if (!(--recursive)) {
// Again, only stop when we leave the top-level invocation
@@ -127,56 +109,31 @@ bool event_poll(int32_t ms, EventSource sources[])
// once more to let libuv perform it's cleanup
uv_close((uv_handle_t *)&timer, NULL);
uv_close((uv_handle_t *)&timer_prepare, NULL);
- processed_events += loop(UV_RUN_NOWAIT, sources);
+ loop(UV_RUN_NOWAIT);
}
- return !timer_data.timed_out && (processed_events || event_has_deferred());
+ return !timer_data.timed_out && event_has_deferred();
}
bool event_has_deferred(void)
{
- return !kl_empty(deferred_events);
+ return !kl_empty(pending_events);
}
// Queue an event
void event_push(Event event)
{
- bool defer = true;
-
- if (immediate_sources) {
- size_t i;
- EventSource src;
-
- for (src = immediate_sources[i = 0]; src; src = immediate_sources[++i]) {
- if (src == event.source) {
- defer = false;
- break;
- }
- }
- }
-
- *kl_pushp(Event, defer ? deferred_events : immediate_events) = event;
+ *kl_pushp(Event, pending_events) = event;
}
-void event_process(void)
-{
- process_from(deferred_events);
-}
-// Runs the appropriate action for each queued event
-static size_t process_from(klist_t(Event) *queue)
+void event_process(void)
{
- size_t count = 0;
Event event;
- while (kl_shift(Event, queue, &event) == 0) {
+ while (kl_shift(Event, pending_events, &event) == 0) {
event.handler(event);
- count++;
}
-
- DLOG("Processed %u events", count);
-
- return count;
}
// Set a flag in the `event_poll` loop for signaling of a timeout
@@ -194,42 +151,9 @@ static void timer_prepare_cb(uv_prepare_t *handle)
uv_prepare_stop(handle);
}
-static void requeue_deferred_events(void)
+static void loop(uv_run_mode run_mode)
{
- size_t remaining = deferred_events->size;
-
- DLOG("Number of deferred events: %u", remaining);
-
- while (remaining--) {
- // Re-push each deferred event to ensure it will be in the right queue
- Event event;
- kl_shift(Event, deferred_events, &event);
- event_push(event);
- DLOG("Re-queueing event");
- }
-
- DLOG("Number of deferred events: %u", deferred_events->size);
-}
-
-static size_t loop(uv_run_mode run_mode, EventSource *sources)
-{
- size_t count;
- immediate_sources = sources;
- // It's possible that some events from the immediate sources are waiting
- // in the deferred queue. If so, move them to the immediate queue so they
- // will be processed in order of arrival by the next `process_from` call.
- requeue_deferred_events();
- count = process_from(immediate_events);
-
- if (count) {
- // No need to enter libuv, events were already processed
- return count;
- }
-
DLOG("Enter event loop");
uv_run(uv_default_loop(), run_mode);
DLOG("Exit event loop");
- immediate_sources = NULL;
- count = process_from(immediate_events);
- return count;
}
diff --git a/src/nvim/os/event_defs.h b/src/nvim/os/event_defs.h
index 553d4e3125..2dd9403d9f 100644
--- a/src/nvim/os/event_defs.h
+++ b/src/nvim/os/event_defs.h
@@ -6,21 +6,12 @@
#include "nvim/os/job_defs.h"
#include "nvim/os/rstream_defs.h"
-typedef void * EventSource;
typedef struct event Event;
typedef void (*event_handler)(Event event);
struct event {
- EventSource source;
+ void *data;
event_handler handler;
- union {
- int signum;
- struct {
- RStream *ptr;
- bool eof;
- } rstream;
- Job *job;
- } data;
-};
+};
#endif // NVIM_OS_EVENT_DEFS_H
diff --git a/src/nvim/os/input.c b/src/nvim/os/input.c
index a18d735ce6..d718bf95da 100644
--- a/src/nvim/os/input.c
+++ b/src/nvim/os/input.c
@@ -7,7 +7,6 @@
#include "nvim/api/private/defs.h"
#include "nvim/os/input.h"
#include "nvim/os/event.h"
-#include "nvim/os/signal.h"
#include "nvim/os/rstream_defs.h"
#include "nvim/os/rstream.h"
#include "nvim/ascii.h"
@@ -48,10 +47,7 @@ void input_init(void)
}
read_buffer = rbuffer_new(READ_BUFFER_SIZE);
- read_stream = rstream_new(read_cb,
- read_buffer,
- NULL,
- NULL);
+ read_stream = rstream_new(read_cb, read_buffer, NULL);
rstream_set_file(read_stream, read_cmd_fd);
}
@@ -170,16 +166,10 @@ void input_buffer_restore(String str)
static bool input_poll(int32_t ms)
{
if (embedded_mode) {
- EventSource input_sources[] = { signal_event_source(), NULL };
- return event_poll(ms, input_sources);
+ return event_poll(ms);
}
- EventSource input_sources[] = {
- rstream_event_source(read_stream),
- NULL
- };
-
- return input_ready() || event_poll(ms, input_sources) || input_ready();
+ return input_ready() || event_poll(ms) || input_ready();
}
// This is a replacement for the old `WaitForChar` function in os_unix.c
@@ -235,7 +225,7 @@ static void read_cb(RStream *rstream, void *data, bool at_eof)
static void convert_input(void)
{
- if (!rbuffer_available(input_buffer)) {
+ if (embedded_mode || !rbuffer_available(input_buffer)) {
// No input buffer space
return;
}
diff --git a/src/nvim/os/job.c b/src/nvim/os/job.c
index d0ac82c047..091da5d213 100644
--- a/src/nvim/os/job.c
+++ b/src/nvim/os/job.c
@@ -14,7 +14,6 @@
#include "nvim/os/event_defs.h"
#include "nvim/os/time.h"
#include "nvim/os/shell.h"
-#include "nvim/os/signal.h"
#include "nvim/vim.h"
#include "nvim/memory.h"
#include "nvim/term.h"
@@ -103,21 +102,24 @@ void job_teardown(void)
// Prepare to start shooting
for (i = 0; i < MAX_RUNNING_JOBS; i++) {
- if ((job = table[i]) == NULL) {
- continue;
- }
+ job = table[i];
// Still alive
- while (is_alive(job) && remaining_tries--) {
+ while (job && is_alive(job) && remaining_tries--) {
os_delay(50, 0);
// Acknowledge child exits
uv_run(uv_default_loop(), UV_RUN_NOWAIT);
+ // It's possible that the uv_run call removed the job from the table,
+ // reset 'job' so the next iteration won't run in that case.
+ job = table[i];
}
- if (is_alive(job)) {
+ if (job && is_alive(job)) {
uv_process_kill(&job->proc, SIGKILL);
}
}
+ // Last run to ensure all children were removed
+ uv_run(uv_default_loop(), UV_RUN_NOWAIT);
}
/// Tries to start a new job.
@@ -213,14 +215,8 @@ Job *job_start(char **argv,
job->in = wstream_new(maxmem);
wstream_set_stream(job->in, (uv_stream_t *)&job->proc_stdin);
// Start the readable streams
- job->out = rstream_new(read_cb,
- rbuffer_new(JOB_BUFFER_SIZE),
- job,
- job_event_source(job));
- job->err = rstream_new(read_cb,
- rbuffer_new(JOB_BUFFER_SIZE),
- job,
- job_event_source(job));
+ job->out = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job);
+ job->err = rstream_new(read_cb, rbuffer_new(JOB_BUFFER_SIZE), job);
rstream_set_stream(job->out, (uv_stream_t *)&job->proc_stdout);
rstream_set_stream(job->err, (uv_stream_t *)&job->proc_stderr);
rstream_start(job->out);
@@ -277,8 +273,6 @@ int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL
int old_mode = cur_tmode;
settmode(TMODE_COOK);
- EventSource sources[] = {job_event_source(job), signal_event_source(), NULL};
-
// keep track of the elapsed time if ms > 0
uint64_t before = (ms > 0) ? os_hrtime() : 0;
@@ -288,7 +282,7 @@ int job_wait(Job *job, int ms) FUNC_ATTR_NONNULL_ALL
break;
}
- event_poll(ms, sources);
+ event_poll(ms);
// we'll assume that a user frantically hitting interrupt doesn't like
// the current job. Signal that it has to be killed.
@@ -369,14 +363,6 @@ bool job_write(Job *job, WBuffer *buffer)
return wstream_write(job->in, buffer);
}
-/// Runs the read callback associated with the job exit event
-///
-/// @param event Object containing data necessary to invoke the callback
-void job_exit_event(Event event)
-{
- job_exit_callback(event.data.job);
-}
-
/// Get the job id
///
/// @param job A pointer to the job
@@ -395,11 +381,6 @@ void *job_data(Job *job)
return job->data;
}
-EventSource job_event_source(Job *job)
-{
- return job;
-}
-
static void job_exit_callback(Job *job)
{
// Free the slot now, 'exit_cb' may want to start another job to replace
@@ -470,7 +451,7 @@ static void read_cb(RStream *rstream, void *data, bool eof)
}
if (eof && --job->pending_refs == 0) {
- emit_exit_event(job);
+ job_exit_callback(job);
}
}
@@ -481,20 +462,10 @@ static void exit_cb(uv_process_t *proc, int64_t status, int term_signal)
job->status = status;
if (--job->pending_refs == 0) {
- emit_exit_event(job);
+ job_exit_callback(job);
}
}
-static void emit_exit_event(Job *job)
-{
- Event event = {
- .source = job_event_source(job),
- .handler = job_exit_event,
- .data.job = job
- };
- event_push(event);
-}
-
static void close_cb(uv_handle_t *handle)
{
Job *job = handle_get_job(handle);
diff --git a/src/nvim/os/rstream.c b/src/nvim/os/rstream.c
index b95098cc52..8cfd9d1b75 100644
--- a/src/nvim/os/rstream.c
+++ b/src/nvim/os/rstream.c
@@ -8,8 +8,6 @@
#include "nvim/os/uv_helpers.h"
#include "nvim/os/rstream_defs.h"
#include "nvim/os/rstream.h"
-#include "nvim/os/event_defs.h"
-#include "nvim/os/event.h"
#include "nvim/ascii.h"
#include "nvim/vim.h"
#include "nvim/memory.h"
@@ -33,7 +31,6 @@ struct rstream {
uv_file fd;
rstream_cb cb;
bool free_handle;
- EventSource source_override;
};
#ifdef INCLUDE_GENERATED_DECLARATIONS
@@ -76,18 +73,13 @@ void rbuffer_consumed(RBuffer *rbuffer, size_t count)
void rbuffer_produced(RBuffer *rbuffer, size_t count)
{
rbuffer->wpos += count;
- DLOG("Received %u bytes from RStream(address: %p, source: %p)",
- (size_t)cnt,
- rbuffer->rstream,
- rstream_event_source(rbuffer->rstream));
+ DLOG("Received %u bytes from RStream(%p)", (size_t)cnt, rbuffer->rstream);
rbuffer_relocate(rbuffer);
if (rbuffer->rstream && rbuffer->wpos == rbuffer->capacity) {
// The last read filled the buffer, stop reading for now
rstream_stop(rbuffer->rstream);
- DLOG("Buffer for RStream(address: %p, source: %p) is full, stopping it",
- rstream,
- rstream_event_source(rstream));
+ DLOG("Buffer for RStream(%p) is full, stopping it", rstream);
}
}
@@ -180,13 +172,8 @@ void rbuffer_free(RBuffer *rbuffer)
/// for reading with `rstream_read`
/// @param buffer RBuffer instance to associate with the RStream
/// @param data Some state to associate with the `RStream` instance
-/// @param source_override Replacement for the default source used in events
-/// emitted by this RStream. If NULL, the default is used.
/// @return The newly-allocated `RStream` instance
-RStream * rstream_new(rstream_cb cb,
- RBuffer *buffer,
- void *data,
- EventSource source_override)
+RStream * rstream_new(rstream_cb cb, RBuffer *buffer, void *data)
{
RStream *rv = xmalloc(sizeof(RStream));
rv->buffer = buffer;
@@ -198,7 +185,6 @@ RStream * rstream_new(rstream_cb cb,
rv->fread_idle = NULL;
rv->free_handle = false;
rv->file_type = UV_UNKNOWN_HANDLE;
- rv->source_override = source_override ? source_override : rv;
return rv;
}
@@ -322,21 +308,6 @@ size_t rstream_read(RStream *rstream, char *buffer, size_t count)
return rbuffer_read(rstream->buffer, buffer, count);
}
-/// Runs the read callback associated with the rstream
-///
-/// @param event Object containing data necessary to invoke the callback
-void rstream_read_event(Event event)
-{
- RStream *rstream = event.data.rstream.ptr;
-
- rstream->cb(rstream, rstream->data, event.data.rstream.eof);
-}
-
-EventSource rstream_event_source(RStream *rstream)
-{
- return rstream->source_override;
-}
-
// Callbacks used by libuv
// Called by libuv to allocate memory for reading.
@@ -357,13 +328,11 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf)
if (cnt <= 0) {
if (cnt != UV_ENOBUFS) {
- DLOG("Closing RStream(address: %p, source: %p)",
- rstream,
- rstream_event_source(rstream));
+ DLOG("Closing RStream(%p)", rstream);
// Read error or EOF, either way stop the stream and invoke the callback
// with eof == true
uv_read_stop(stream);
- emit_read_event(rstream, true);
+ rstream->cb(rstream, rstream->data, true);
}
return;
}
@@ -374,7 +343,7 @@ static void read_cb(uv_stream_t *stream, ssize_t cnt, const uv_buf_t *buf)
// Data was already written, so all we need is to update 'wpos' to reflect
// the space actually used in the buffer.
rbuffer_produced(rstream->buffer, nread);
- emit_read_event(rstream, false);
+ rstream->cb(rstream, rstream->data, false);
}
// Called by the by the 'idle' handle to emulate a reading event
@@ -409,7 +378,6 @@ static void fread_idle_cb(uv_idle_t *handle)
if (req.result <= 0) {
uv_idle_stop(rstream->fread_idle);
- emit_read_event(rstream, true);
return;
}
@@ -417,7 +385,6 @@ static void fread_idle_cb(uv_idle_t *handle)
size_t nread = (size_t) req.result;
rbuffer_produced(rstream->buffer, nread);
rstream->fpos += nread;
- emit_read_event(rstream, false);
}
static void close_cb(uv_handle_t *handle)
@@ -426,19 +393,6 @@ static void close_cb(uv_handle_t *handle)
free(handle);
}
-static void emit_read_event(RStream *rstream, bool eof)
-{
- Event event = {
- .source = rstream_event_source(rstream),
- .handler = rstream_read_event,
- .data.rstream = {
- .ptr = rstream,
- .eof = eof
- }
- };
- event_push(event);
-}
-
static void rbuffer_relocate(RBuffer *rbuffer)
{
// Move data ...
diff --git a/src/nvim/os/signal.c b/src/nvim/os/signal.c
index b330c7f788..36f7b37c48 100644
--- a/src/nvim/os/signal.c
+++ b/src/nvim/os/signal.c
@@ -12,8 +12,6 @@
#include "nvim/memory.h"
#include "nvim/misc1.h"
#include "nvim/misc2.h"
-#include "nvim/os/event_defs.h"
-#include "nvim/os/event.h"
#include "nvim/os/signal.h"
static uv_signal_t sint, spipe, shup, squit, sterm, swinch;
@@ -72,45 +70,6 @@ void signal_accept_deadly(void)
rejecting_deadly = false;
}
-void signal_handle(Event event)
-{
- int signum = event.data.signum;
-
- switch (signum) {
- case SIGINT:
- got_int = true;
- break;
-#ifdef SIGPWR
- case SIGPWR:
- // Signal of a power failure(eg batteries low), flush the swap files to
- // be safe
- ml_sync_all(false, false);
- break;
-#endif
- case SIGPIPE:
- // Ignore
- break;
- case SIGWINCH:
- shell_resized();
- break;
- case SIGTERM:
- case SIGQUIT:
- case SIGHUP:
- if (!rejecting_deadly) {
- deadly_signal(signum);
- }
- break;
- default:
- fprintf(stderr, "Invalid signal %d", signum);
- break;
- }
-}
-
-EventSource signal_event_source(void)
-{
- return &sint;
-}
-
static char * signal_name(int signum)
{
switch (signum) {
@@ -154,20 +113,32 @@ static void deadly_signal(int signum)
static void signal_cb(uv_signal_t *handle, int signum)
{
- if (rejecting_deadly) {
- if (signum == SIGINT) {
+ switch (signum) {
+ case SIGINT:
got_int = true;
- }
-
- return;
+ break;
+#ifdef SIGPWR
+ case SIGPWR:
+ // Signal of a power failure(eg batteries low), flush the swap files to
+ // be safe
+ ml_sync_all(false, false);
+ break;
+#endif
+ case SIGPIPE:
+ // Ignore
+ break;
+ case SIGWINCH:
+ shell_resized();
+ break;
+ case SIGTERM:
+ case SIGQUIT:
+ case SIGHUP:
+ if (!rejecting_deadly) {
+ deadly_signal(signum);
+ }
+ break;
+ default:
+ fprintf(stderr, "Invalid signal %d", signum);
+ break;
}
-
- Event event = {
- .source = signal_event_source(),
- .handler = signal_handle,
- .data = {
- .signum = signum
- }
- };
- event_push(event);
}