aboutsummaryrefslogtreecommitdiff
path: root/src/nvim/event/wstream.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/nvim/event/wstream.c')
-rw-r--r--src/nvim/event/wstream.c162
1 files changed, 162 insertions, 0 deletions
diff --git a/src/nvim/event/wstream.c b/src/nvim/event/wstream.c
new file mode 100644
index 0000000000..5fcb724fe3
--- /dev/null
+++ b/src/nvim/event/wstream.c
@@ -0,0 +1,162 @@
+#include <assert.h>
+#include <stdint.h>
+#include <stdbool.h>
+#include <stdlib.h>
+
+#include <uv.h>
+
+#include "nvim/event/loop.h"
+#include "nvim/event/wstream.h"
+#include "nvim/vim.h"
+#include "nvim/memory.h"
+
+#define DEFAULT_MAXMEM 1024 * 1024 * 10
+
+typedef struct {
+ Stream *stream;
+ WBuffer *buffer;
+ uv_write_t uv_req;
+} WRequest;
+
+#ifdef INCLUDE_GENERATED_DECLARATIONS
+# include "event/wstream.c.generated.h"
+#endif
+
+void wstream_init_fd(Loop *loop, Stream *stream, int fd, size_t maxmem,
+ void *data)
+ FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2)
+{
+ stream_init(loop, stream, fd, NULL, data);
+ wstream_init(stream, maxmem);
+}
+
+void wstream_init_stream(Stream *stream, uv_stream_t *uvstream, size_t maxmem,
+ void *data)
+ FUNC_ATTR_NONNULL_ARG(1) FUNC_ATTR_NONNULL_ARG(2)
+{
+ stream_init(NULL, stream, -1, uvstream, data);
+ wstream_init(stream, maxmem);
+}
+
+void wstream_init(Stream *stream, size_t maxmem)
+{
+ stream->maxmem = maxmem ? maxmem : DEFAULT_MAXMEM;
+}
+
+/// Sets a callback that will be called on completion of a write request,
+/// indicating failure/success.
+///
+/// This affects all requests currently in-flight as well. Overwrites any
+/// possible earlier callback.
+///
+/// @note This callback will not fire if the write request couldn't even be
+/// queued properly (i.e.: when `wstream_write() returns an error`).
+///
+/// @param stream The `Stream` instance
+/// @param cb The callback
+void wstream_set_write_cb(Stream *stream, stream_write_cb cb)
+ FUNC_ATTR_NONNULL_ALL
+{
+ stream->write_cb = cb;
+}
+
+/// Queues data for writing to the backing file descriptor of a `Stream`
+/// instance. This will fail if the write would cause the Stream use more
+/// memory than specified by `maxmem`.
+///
+/// @param stream The `Stream` instance
+/// @param buffer The buffer which contains data to be written
+/// @return false if the write failed
+bool wstream_write(Stream *stream, WBuffer *buffer)
+ FUNC_ATTR_NONNULL_ALL
+{
+ assert(stream->maxmem);
+ // This should not be called after a stream was freed
+ assert(!stream->closed);
+
+ if (stream->curmem > stream->maxmem) {
+ goto err;
+ }
+
+ stream->curmem += buffer->size;
+
+ WRequest *data = xmalloc(sizeof(WRequest));
+ data->stream = stream;
+ data->buffer = buffer;
+ data->uv_req.data = data;
+
+ uv_buf_t uvbuf;
+ uvbuf.base = buffer->data;
+ uvbuf.len = buffer->size;
+
+ if (uv_write(&data->uv_req, stream->uvstream, &uvbuf, 1, write_cb)) {
+ xfree(data);
+ goto err;
+ }
+
+ stream->pending_reqs++;
+ return true;
+
+err:
+ wstream_release_wbuffer(buffer);
+ return false;
+}
+
+/// Creates a WBuffer object for holding output data. Instances of this
+/// object can be reused across Stream instances, and the memory is freed
+/// automatically when no longer needed(it tracks the number of references
+/// internally)
+///
+/// @param data Data stored by the WBuffer
+/// @param size The size of the data array
+/// @param refcount The number of references for the WBuffer. This will be used
+/// by Stream instances to decide when a WBuffer should be freed.
+/// @param cb Pointer to function that will be responsible for freeing
+/// the buffer data(passing 'free' will work as expected).
+/// @return The allocated WBuffer instance
+WBuffer *wstream_new_buffer(char *data,
+ size_t size,
+ size_t refcount,
+ wbuffer_data_finalizer cb)
+{
+ WBuffer *rv = xmalloc(sizeof(WBuffer));
+ rv->size = size;
+ rv->refcount = refcount;
+ rv->cb = cb;
+ rv->data = data;
+
+ return rv;
+}
+
+static void write_cb(uv_write_t *req, int status)
+{
+ WRequest *data = req->data;
+
+ data->stream->curmem -= data->buffer->size;
+
+ wstream_release_wbuffer(data->buffer);
+
+ if (data->stream->write_cb) {
+ data->stream->write_cb(data->stream, data->stream->data, status);
+ }
+
+ data->stream->pending_reqs--;
+
+ if (data->stream->closed && data->stream->pending_reqs == 0) {
+ // Last pending write, free the stream;
+ stream_close_handle(data->stream);
+ }
+
+ xfree(data);
+}
+
+void wstream_release_wbuffer(WBuffer *buffer)
+{
+ if (!--buffer->refcount) {
+ if (buffer->cb) {
+ buffer->cb(buffer->data);
+ }
+
+ xfree(buffer);
+ }
+}