[jsonrpc-glib] output-stream: use internal bit for busy status



commit 23b548ed6a3b5cd2b8bcc5407a6c5445b9203fca
Author: Christian Hergert <chergert redhat com>
Date:   Mon Apr 23 23:21:37 2018 -0700

    output-stream: use internal bit for busy status
    
    The pending bit has not been serving us well, so this ensures
    that we maintain state that we control about if we can write
    more data to the stream.
    
    This fixes an issue with high-velocity message sending in
    Builder where the socket has reached max buffering.

 src/jsonrpc-output-stream.c | 31 +++++++++++++++++++++++--------
 1 file changed, 23 insertions(+), 8 deletions(-)
---
diff --git a/src/jsonrpc-output-stream.c b/src/jsonrpc-output-stream.c
index e43c1a9..db68bb8 100644
--- a/src/jsonrpc-output-stream.c
+++ b/src/jsonrpc-output-stream.c
@@ -52,6 +52,7 @@ typedef struct
 {
   GQueue queue;
   guint  use_gvariant : 1;
+  guint  processing : 1;
 } JsonrpcOutputStreamPrivate;
 
 G_DEFINE_TYPE_WITH_PRIVATE (JsonrpcOutputStream, jsonrpc_output_stream, G_TYPE_DATA_OUTPUT_STREAM)
@@ -256,8 +257,10 @@ jsonrpc_output_stream_pump (JsonrpcOutputStream *self)
 
   g_assert (JSONRPC_IS_OUTPUT_STREAM (self));
 
-  if (priv->queue.length == 0 ||
-      g_output_stream_has_pending (G_OUTPUT_STREAM (self)))
+  if (priv->queue.length == 0)
+    return;
+
+  if (priv->processing)
     return;
 
   task = g_queue_pop_head (&priv->queue);
@@ -265,6 +268,17 @@ jsonrpc_output_stream_pump (JsonrpcOutputStream *self)
   data = g_bytes_get_data (bytes, &len);
   cancellable = g_task_get_cancellable (task);
 
+  if (g_output_stream_is_closed (G_OUTPUT_STREAM (self)))
+    {
+      g_task_return_new_error (task,
+                               G_IO_ERROR,
+                               G_IO_ERROR_CLOSED,
+                               "Stream has been closed");
+      return;
+    }
+
+  priv->processing = TRUE;
+
   g_output_stream_write_all_async (G_OUTPUT_STREAM (self),
                                    data,
                                    len,
@@ -279,22 +293,23 @@ jsonrpc_output_stream_write_message_async_cb (GObject      *object,
                                               GAsyncResult *result,
                                               gpointer      user_data)
 {
-  GOutputStream *stream = (GOutputStream *)object;
-  JsonrpcOutputStream *self;
+  JsonrpcOutputStream *self = (JsonrpcOutputStream *)object;
+  JsonrpcOutputStreamPrivate *priv = jsonrpc_output_stream_get_instance_private (self);
   g_autoptr(GError) error = NULL;
   g_autoptr(GTask) task = user_data;
   GBytes *bytes;
   gsize n_written;
 
-  g_assert (G_IS_OUTPUT_STREAM (stream));
+  g_assert (JSONRPC_IS_OUTPUT_STREAM (self));
   g_assert (G_IS_ASYNC_RESULT (result));
   g_assert (G_IS_TASK (task));
-  self = g_task_get_source_object (task);
-  g_assert (JSONRPC_IS_OUTPUT_STREAM (self));
 
-  if (!g_output_stream_write_all_finish (stream, result, &n_written, &error))
+  priv->processing = FALSE;
+
+  if (!g_output_stream_write_all_finish (G_OUTPUT_STREAM (self), result, &n_written, &error))
     {
       g_task_return_error (task, g_steal_pointer (&error));
+      jsonrpc_output_stream_fail_pending (self);
       return;
     }
 


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]