[libsoup/wip/async-close] soup-message-io: do an async close when doing non-blocking I/O [WIP]



commit e0415303f19610e30f92aabd2bc924c34e1348f8
Author: Dan Winship <danw gnome org>
Date:   Mon Jun 9 08:52:38 2014 -0400

    soup-message-io: do an async close when doing non-blocking I/O [WIP]
    
    When using chunked encoding, SoupBodyOutputStream needs to write the
    final "0" chunk when it's closed, and thus may block. So we have to do
    an async close in the non-blocking case.
    
    (Does not currently pass "make check".)
    
    https://bugzilla.gnome.org/show_bug.cgi?id=727138

 libsoup/soup-message-io.c |   66 ++++++++++++++++++++++++++++++++++++++++----
 1 files changed, 60 insertions(+), 6 deletions(-)
---
diff --git a/libsoup/soup-message-io.c b/libsoup/soup-message-io.c
index 600cd85..2fa3492 100644
--- a/libsoup/soup-message-io.c
+++ b/libsoup/soup-message-io.c
@@ -80,6 +80,9 @@ typedef struct {
        GSource *unpause_source;
        gboolean paused;
 
+       GCancellable *async_close_wait;
+       GError       *async_close_error;
+
        SoupMessageGetHeadersFn   get_headers_cb;
        SoupMessageParseHeadersFn parse_headers_cb;
        gpointer                  header_data;
@@ -87,6 +90,7 @@ typedef struct {
        gpointer                  completion_data;
 } SoupMessageIOData;
        
+static void io_run (SoupMessage *msg, gboolean blocking);
 
 #define RESPONSE_BLOCK_SIZE 8192
 
@@ -275,6 +279,33 @@ soup_message_setup_body_istream (GInputStream *body_stream,
        return istream;
 }
 
+static void
+closed_async (GObject      *source,
+             GAsyncResult *result,
+             gpointer      user_data)
+{
+       GOutputStream *body_ostream = G_OUTPUT_STREAM (source);
+       SoupMessage *msg = user_data;
+       SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
+       SoupMessageIOData *io = priv->io_data;
+       GCancellable *async_close_wait;
+
+       if (!io || !io->async_close_wait || io->body_ostream != body_ostream) {
+               g_object_unref (msg);
+               return;
+       }
+
+       g_output_stream_close_finish (body_ostream, result, &io->async_close_error);
+       g_clear_object (&io->body_ostream);
+
+       async_close_wait = io->async_close_wait;
+       io->async_close_wait = NULL;
+       g_cancellable_cancel (async_close_wait);
+       g_object_unref (async_close_wait);
+
+       g_object_unref (msg);
+}
+
 /*
  * There are two request/response formats: the basic request/response,
  * possibly with one or more unsolicited informational responses (such
@@ -316,6 +347,17 @@ io_write (SoupMessage *msg, gboolean blocking,
        SoupBuffer *chunk;
        gssize nwrote;
 
+       if (io->async_close_error) {
+               g_propagate_error (error, io->async_close_error);
+               io->async_close_error = NULL;
+               return FALSE;
+       } else if (io->async_close_wait) {
+               g_set_error_literal (error, G_IO_ERROR,
+                                    G_IO_ERROR_WOULD_BLOCK,
+                                    _("Operation would block"));
+               return FALSE;
+       }
+
        switch (io->write_state) {
        case SOUP_MESSAGE_IO_STATE_HEADERS:
                if (!io->write_buf->len) {
@@ -460,9 +502,21 @@ io_write (SoupMessage *msg, gboolean blocking,
 
        case SOUP_MESSAGE_IO_STATE_BODY_DONE:
                if (io->body_ostream) {
-                       if (!g_output_stream_close (io->body_ostream, cancellable, error))
-                               return FALSE;
-                       g_clear_object (&io->body_ostream);
+                       if (blocking) {
+                               if (!g_output_stream_close (io->body_ostream, cancellable, error))
+                                       return FALSE;
+                               g_clear_object (&io->body_ostream);
+                       } else {
+                               io->async_close_wait = g_cancellable_new ();
+                               if (io->async_context)
+                                       g_main_context_push_thread_default (io->async_context);
+                               g_output_stream_close_async (io->body_ostream,
+                                                            G_PRIORITY_DEFAULT, cancellable,
+                                                            closed_async, g_object_ref (msg));
+                               if (io->async_context)
+                                       g_main_context_pop_thread_default (io->async_context);
+                               return TRUE;
+                       }
                }
 
                io->write_state = SOUP_MESSAGE_IO_STATE_FINISHING;
@@ -782,6 +836,8 @@ soup_message_io_get_source (SoupMessage *msg, GCancellable *cancellable,
                base_source = g_timeout_source_new (0);
        } else if (io->paused) {
                base_source = NULL;
+       } else if (io->async_close_wait) {
+               base_source = g_cancellable_source_new (io->async_close_wait);
        } else if (SOUP_MESSAGE_IO_STATE_POLLABLE (io->read_state)) {
                GPollableInputStream *istream;
 
@@ -857,7 +913,7 @@ io_run_until (SoupMessage *msg, gboolean blocking,
 
        g_object_ref (msg);
 
-       while (progress && priv->io_data == io && !io->paused &&
+       while (progress && priv->io_data == io && !io->paused && !io->async_close_wait &&
               (io->read_state < read_state || io->write_state < write_state)) {
 
                if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state))
@@ -907,8 +963,6 @@ io_run_until (SoupMessage *msg, gboolean blocking,
        return done;
 }
 
-static void io_run (SoupMessage *msg, gboolean blocking);
-
 static gboolean
 io_run_ready (SoupMessage *msg, gpointer user_data)
 {


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]