[jsonrpc-glib/jsonrpc-glib-3-28] output-stream: use internal bit for busy status



commit 685f84663d4a9af228b43d312a79bbd784522562
Author: Christian Hergert <chergert redhat com>
Date:   Mon Apr 23 23:21:37 2018 -0700

    output-stream: use internal bit for busy status
    
    The stream's pending bit has not been serving us well, so this
    ensures that we maintain our own state, which we control, about
    whether we can write more data to the stream.
    
    This fixes an issue with high-velocity message sending in
    Builder where the socket has reached max buffering.

 src/jsonrpc-output-stream.c | 31 +++++++++++++++++++++++--------
 1 file changed, 23 insertions(+), 8 deletions(-)
---
diff --git a/src/jsonrpc-output-stream.c b/src/jsonrpc-output-stream.c
index e43c1a9..db68bb8 100644
--- a/src/jsonrpc-output-stream.c
+++ b/src/jsonrpc-output-stream.c
@@ -52,6 +52,7 @@ typedef struct
 {
   GQueue queue;
   guint  use_gvariant : 1;
+  guint  processing : 1;
 } JsonrpcOutputStreamPrivate;
 
 G_DEFINE_TYPE_WITH_PRIVATE (JsonrpcOutputStream, jsonrpc_output_stream, G_TYPE_DATA_OUTPUT_STREAM)
@@ -256,8 +257,10 @@ jsonrpc_output_stream_pump (JsonrpcOutputStream *self)
 
   g_assert (JSONRPC_IS_OUTPUT_STREAM (self));
 
-  if (priv->queue.length == 0 ||
-      g_output_stream_has_pending (G_OUTPUT_STREAM (self)))
+  if (priv->queue.length == 0)
+    return;
+
+  if (priv->processing)
     return;
 
   task = g_queue_pop_head (&priv->queue);
@@ -265,6 +268,17 @@ jsonrpc_output_stream_pump (JsonrpcOutputStream *self)
   data = g_bytes_get_data (bytes, &len);
   cancellable = g_task_get_cancellable (task);
 
+  if (g_output_stream_is_closed (G_OUTPUT_STREAM (self)))
+    {
+      g_task_return_new_error (task,
+                               G_IO_ERROR,
+                               G_IO_ERROR_CLOSED,
+                               "Stream has been closed");
+      return;
+    }
+
+  priv->processing = TRUE;
+
   g_output_stream_write_all_async (G_OUTPUT_STREAM (self),
                                    data,
                                    len,
@@ -279,22 +293,23 @@ jsonrpc_output_stream_write_message_async_cb (GObject      *object,
                                               GAsyncResult *result,
                                               gpointer      user_data)
 {
-  GOutputStream *stream = (GOutputStream *)object;
-  JsonrpcOutputStream *self;
+  JsonrpcOutputStream *self = (JsonrpcOutputStream *)object;
+  JsonrpcOutputStreamPrivate *priv = jsonrpc_output_stream_get_instance_private (self);
   g_autoptr(GError) error = NULL;
   g_autoptr(GTask) task = user_data;
   GBytes *bytes;
   gsize n_written;
 
-  g_assert (G_IS_OUTPUT_STREAM (stream));
+  g_assert (JSONRPC_IS_OUTPUT_STREAM (self));
   g_assert (G_IS_ASYNC_RESULT (result));
   g_assert (G_IS_TASK (task));
-  self = g_task_get_source_object (task);
-  g_assert (JSONRPC_IS_OUTPUT_STREAM (self));
 
-  if (!g_output_stream_write_all_finish (stream, result, &n_written, &error))
+  priv->processing = FALSE;
+
+  if (!g_output_stream_write_all_finish (G_OUTPUT_STREAM (self), result, &n_written, &error))
     {
       g_task_return_error (task, g_steal_pointer (&error));
+      jsonrpc_output_stream_fail_pending (self);
       return;
     }
 


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]