[libsoup/wip/http2: 90/90] wip



commit 11e3d519524bb7ff22020e34c70984288b79d1af
Author: Dan Winship <danw gnome org>
Date:   Tue Dec 10 20:30:05 2013 +0100

    wip

 libsoup/soup-message-queue.c |   42 +++++++++-----
 libsoup/soup-message-queue.h |   15 +++--
 libsoup/soup-session-host.c  |   81 ++++++++++++++++++++++++--
 libsoup/soup-session-host.h  |   14 +++-
 libsoup/soup-session.c       |  130 ++++++++++++++++++++++++++----------------
 5 files changed, 204 insertions(+), 78 deletions(-)
---
diff --git a/libsoup/soup-message-queue.c b/libsoup/soup-message-queue.c
index 575f51f..961cd72 100644
--- a/libsoup/soup-message-queue.c
+++ b/libsoup/soup-message-queue.c
@@ -34,12 +34,11 @@ struct _SoupMessageQueue {
 };
 
 SoupMessageQueue *
-soup_message_queue_new (SoupSession *session)
+soup_message_queue_new (void)
 {
        SoupMessageQueue *queue;
 
        queue = g_slice_new0 (SoupMessageQueue);
-       queue->session = session;
        g_mutex_init (&queue->mutex);
        return queue;
 }
@@ -62,29 +61,30 @@ queue_message_restarted (SoupMessage *msg, gpointer user_data)
 }
 
 /**
- * soup_message_queue_append:
- * @queue: a #SoupMessageQueue
+ * soup_message_queue_item_new:
+ * @session: the #SoupSession that will own the item
  * @msg: a #SoupMessage
  * @callback: the callback for @msg
  * @user_data: the data to pass to @callback
  *
- * Creates a new #SoupMessageQueueItem and appends it to @queue.
+ * Creates a new #SoupMessageQueueItem for @msg.
  *
  * Return value: the new item, which you must unref with
  * soup_message_queue_unref_item() when you are done with.
  **/
 SoupMessageQueueItem *
-soup_message_queue_append (SoupMessageQueue *queue, SoupMessage *msg,
-                          SoupSessionCallback callback, gpointer user_data)
+soup_message_queue_item_new (SoupSession         *session,
+                            SoupMessage         *msg,
+                            SoupSessionCallback  callback,
+                            gpointer             user_data)
 {
        SoupMessageQueueItem *item;
 
        item = g_slice_new0 (SoupMessageQueueItem);
-       item->session = g_object_ref (queue->session);
+       item->session = g_object_ref (session);
        item->async_context = soup_session_get_async_context (item->session);
        if (item->async_context)
                g_main_context_ref (item->async_context);
-       item->queue = queue;
        item->msg = g_object_ref (msg);
        item->callback = callback;
        item->callback_data = user_data;
@@ -94,11 +94,26 @@ soup_message_queue_append (SoupMessageQueue *queue, SoupMessage *msg,
        g_signal_connect (msg, "restarted",
                          G_CALLBACK (queue_message_restarted), item);
 
-       /* Note: the initial ref_count of 1 represents the caller's
-        * ref; the queue's own ref is indicated by the absence of the
-        * "removed" flag.
-        */
        item->ref_count = 1;
+       item->removed = TRUE;
+
+       return item;
+}
+
+/**
+ * soup_message_queue_append:
+ * @queue: a #SoupMessageQueue
+ * @item: a #SoupMessageQueueItem
+ *
+ * Appends @item to @queue.
+ **/
+void
+soup_message_queue_append (SoupMessageQueue     *queue,
+                          SoupMessageQueueItem *item)
+{
+       g_return_if_fail (item->removed == TRUE);
+
+       item->removed = FALSE;
 
        g_mutex_lock (&queue->mutex);
        if (queue->head) {
@@ -127,7 +142,6 @@ soup_message_queue_append (SoupMessageQueue *queue, SoupMessage *msg,
                queue->head = queue->tail = item;
 
        g_mutex_unlock (&queue->mutex);
-       return item;
 }
 
 /**
diff --git a/libsoup/soup-message-queue.h b/libsoup/soup-message-queue.h
index d2dfda4..be5689a 100644
--- a/libsoup/soup-message-queue.h
+++ b/libsoup/soup-message-queue.h
@@ -10,6 +10,7 @@
 #include "soup-connection.h"
 #include "soup-message.h"
 #include "soup-session.h"
+#include "soup-session-host.h"
 
 G_BEGIN_DECLS
 
@@ -40,6 +41,7 @@ struct _SoupMessageQueueItem {
        GCancellable *cancellable;
        GError *error;
 
+       SoupSessionHost *host;
        SoupConnection *conn;
        GTask *task;
        GSource *io_source;
@@ -60,12 +62,18 @@ struct _SoupMessageQueueItem {
        SoupMessageQueueItem *related;
 };
 
-SoupMessageQueue     *soup_message_queue_new        (SoupSession          *session);
-SoupMessageQueueItem *soup_message_queue_append     (SoupMessageQueue     *queue,
+SoupMessageQueue     *soup_message_queue_new        (void);
+
+SoupMessageQueueItem *soup_message_queue_item_new   (SoupSession          *session,
                                                     SoupMessage          *msg,
                                                     SoupSessionCallback   callback,
                                                     gpointer              user_data);
 
+void                  soup_message_queue_append     (SoupMessageQueue     *queue,
+                                                    SoupMessageQueueItem *item);
+void                  soup_message_queue_remove     (SoupMessageQueue     *queue,
+                                                    SoupMessageQueueItem *item);
+
 SoupMessageQueueItem *soup_message_queue_lookup     (SoupMessageQueue     *queue,
                                                     SoupMessage          *msg);
 
@@ -73,9 +81,6 @@ SoupMessageQueueItem *soup_message_queue_first      (SoupMessageQueue     *queue
 SoupMessageQueueItem *soup_message_queue_next       (SoupMessageQueue     *queue,
                                                     SoupMessageQueueItem *item);
 
-void                  soup_message_queue_remove     (SoupMessageQueue     *queue,
-                                                    SoupMessageQueueItem *item);
-
 void                  soup_message_queue_destroy    (SoupMessageQueue     *queue);
 
 void soup_message_queue_item_ref            (SoupMessageQueueItem *item);
diff --git a/libsoup/soup-session-host.c b/libsoup/soup-session-host.c
index af64fcf..ce1bc8c 100644
--- a/libsoup/soup-session-host.c
+++ b/libsoup/soup-session-host.c
@@ -12,6 +12,7 @@
 #include "soup-session-host.h"
 #include "soup.h"
 #include "soup-connection.h"
+#include "soup-message-queue.h"
 #include "soup-misc-private.h"
 #include "soup-session-private.h"
 #include "soup-socket-private.h"
@@ -24,6 +25,8 @@ typedef struct {
        SoupURI     *uri;
        SoupAddress *addr;
 
+       SoupMessageQueue *queue;
+
        GSList      *connections;      /* CONTAINS: SoupConnection */
        guint        num_conns;
        guint        max_conns;
@@ -52,6 +55,8 @@ soup_session_host_init (SoupSessionHost *host)
        SoupSessionHostPrivate *priv = SOUP_SESSION_HOST_GET_PRIVATE (host);
 
        g_mutex_init (&priv->mutex);
+
+       priv->queue = soup_message_queue_new ();
 }
 
 static void
@@ -69,6 +74,8 @@ soup_session_host_finalize (GObject *object)
        soup_uri_free (priv->uri);
        g_object_unref (priv->addr);
 
+       soup_message_queue_destroy (priv->queue);
+
        g_mutex_clear (&priv->mutex);
 
        G_OBJECT_CLASS (soup_session_host_parent_class)->finalize (object);
@@ -133,17 +140,79 @@ soup_session_host_get_address (SoupSessionHost *host)
 }
 
 void
-soup_session_host_add_message (SoupSessionHost *host,
-                              SoupMessage     *msg)
+soup_session_host_run_queue (SoupSessionHost *host,
+                            gboolean        *should_cleanup)
 {
-       SOUP_SESSION_HOST_GET_PRIVATE (host)->num_messages++;
+       SoupSessionHostPrivate *priv = SOUP_SESSION_HOST_GET_PRIVATE (host);
+       SoupMessageQueueItem *item;
+       SoupMessage *msg;
+
+       g_object_ref (host);
+       soup_session_host_cleanup_connections (host, FALSE);
+
+       for (item = soup_message_queue_first (priv->queue);
+            item;
+            item = soup_message_queue_next (priv->queue, item)) {
+               msg = item->msg;
+
+               /* CONNECT messages are handled specially */
+               if (msg->method == SOUP_METHOD_CONNECT)
+                       continue;
+
+               if (!item->async ||
+                   item->async_context != soup_session_get_async_context (priv->session))
+                       continue;
+
+               soup_session_process_queue_item (priv->session, item, should_cleanup, TRUE);
+       }
+
+       g_object_unref (host);
+}
+
+GSList *
+soup_session_host_get_queue_items (SoupSessionHost *host)
+{
+       SoupSessionHostPrivate *priv = SOUP_SESSION_HOST_GET_PRIVATE (host);
+       SoupMessageQueueItem *item;
+       GSList *items = NULL;
+
+       for (item = soup_message_queue_first (priv->queue);
+            item;
+            item = soup_message_queue_next (priv->queue, item)) {
+               soup_message_queue_item_ref (item);
+               items = g_slist_prepend (items, item);
+       }
+
+       return items;
 }
 
 void
-soup_session_host_remove_message (SoupSessionHost *host,
-                                 SoupMessage     *msg)
+soup_session_host_add_queue_item (SoupSessionHost      *host,
+                                 SoupMessageQueueItem *item)
+{
+       SoupSessionHostPrivate *priv = SOUP_SESSION_HOST_GET_PRIVATE (host);
+
+       priv->num_messages++;
+       soup_message_queue_append (priv->queue, item);
+}
+
+SoupMessageQueueItem *
+soup_session_host_lookup_queue_item (SoupSessionHost *host,
+                                    SoupMessage     *msg)
 {
-       SOUP_SESSION_HOST_GET_PRIVATE (host)->num_messages--;
+       SoupSessionHostPrivate *priv = SOUP_SESSION_HOST_GET_PRIVATE (host);
+
+       return soup_message_queue_lookup (priv->queue, msg);
+}
+
+void
+soup_session_host_remove_item (SoupSessionHost      *host,
+                              SoupMessageQueueItem *item)
+{
+       SoupSessionHostPrivate *priv = SOUP_SESSION_HOST_GET_PRIVATE (host);
+
+       soup_message_queue_remove (priv->queue, item);
+       priv->num_messages--;
 }
 
 static gboolean
diff --git a/libsoup/soup-session-host.h b/libsoup/soup-session-host.h
index 4e5694d..31a6fa6 100644
--- a/libsoup/soup-session-host.h
+++ b/libsoup/soup-session-host.h
@@ -35,10 +35,16 @@ SoupSessionHost *soup_session_host_new                 (SoupSession          *se
 SoupURI         *soup_session_host_get_uri             (SoupSessionHost      *host);
 SoupAddress     *soup_session_host_get_address         (SoupSessionHost      *host);
 
-void             soup_session_host_add_message         (SoupSessionHost      *host,
-                                                       SoupMessage          *msg);
-void             soup_session_host_remove_message      (SoupSessionHost      *host,
-                                                       SoupMessage          *msg);
+void                  soup_session_host_add_queue_item      (SoupSessionHost      *host,
+                                                            SoupMessageQueueItem *item);
+SoupMessageQueueItem *soup_session_host_lookup_queue_item   (SoupSessionHost      *host,
+                                                            SoupMessage          *msg);
+void                  soup_session_host_run_queue           (SoupSessionHost      *host,
+                                                            gboolean             *should_cleanup);
+GSList               *soup_session_host_get_queue_items     (SoupSessionHost      *host);
+
+void                  soup_session_host_remove_item         (SoupSessionHost      *host,
+                                                            SoupMessageQueueItem *item);
 
 SoupConnection  *soup_session_host_get_connection      (SoupSessionHost      *host,
                                                        gboolean              need_new_connection,
diff --git a/libsoup/soup-session.c b/libsoup/soup-session.c
index 58e6c4e..a70593b 100644
--- a/libsoup/soup-session.c
+++ b/libsoup/soup-session.c
@@ -107,8 +107,6 @@ typedef struct {
 
        SoupSocketProperties *socket_props;
 
-       SoupMessageQueue *queue;
-
        char *user_agent;
        char *accept_language;
        gboolean accept_language_auto;
@@ -217,8 +215,6 @@ soup_session_init (SoupSession *session)
 
        priv->session = session;
 
-       priv->queue = soup_message_queue_new (session);
-
        g_mutex_init (&priv->conn_lock);
        g_cond_init (&priv->conn_cond);
        priv->http_hosts = g_hash_table_new_full (soup_host_uri_hash,
@@ -328,8 +324,6 @@ soup_session_finalize (GObject *object)
        SoupSession *session = SOUP_SESSION (object);
        SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
 
-       soup_message_queue_destroy (priv->queue);
-
        g_mutex_clear (&priv->conn_lock);
        g_cond_clear (&priv->conn_cond);
        g_hash_table_destroy (priv->http_hosts);
@@ -1186,12 +1180,17 @@ re_emit_connection_event (SoupConnection      *conn,
 static void
 soup_session_set_item_connection (SoupSession          *session,
                                  SoupMessageQueueItem *item,
+                                 SoupSessionHost      *host,
                                  SoupConnection       *conn)
 {
        if (item->conn) {
                g_signal_handlers_disconnect_by_func (item->conn, re_emit_connection_event, item);
                g_object_unref (item->conn);
        }
+       if (item->host && item->host != host) {
+               soup_session_host_remove_item (item->host, item);
+               item->host = NULL;
+       }
 
        item->conn = conn;
        soup_message_set_connection (item->msg, conn);
@@ -1201,6 +1200,11 @@ soup_session_set_item_connection (SoupSession          *session,
                g_signal_connect (item->conn, "event",
                                  G_CALLBACK (re_emit_connection_event), item);
        }
+
+       if (!item->host) {
+               item->host = host;
+               soup_session_host_add_queue_item (host, item);
+       }
 }
 
 static void
@@ -1213,7 +1217,7 @@ message_restarted (SoupMessage *msg, gpointer user_data)
             SOUP_STATUS_IS_REDIRECTION (msg->status_code))) {
                if (soup_connection_get_state (item->conn) == SOUP_CONNECTION_IN_USE)
                        soup_connection_set_state (item->conn, SOUP_CONNECTION_IDLE);
-               soup_session_set_item_connection (item->session, item, NULL);
+               soup_session_set_item_connection (item->session, item, NULL, NULL);
        }
 
        soup_message_cleanup_response (msg);
@@ -1230,13 +1234,13 @@ soup_session_append_queue_item (SoupSession *session, SoupMessage *msg,
 
        soup_message_cleanup_response (msg);
 
-       item = soup_message_queue_append (priv->queue, msg, callback, user_data);
+       item = soup_message_queue_item_new (session, msg, callback, user_data);
        item->async = async;
        item->new_api = new_api;
 
        g_mutex_lock (&priv->conn_lock);
        host = get_host_for_message (session, item->msg);
-       soup_session_host_add_message (host, msg);
+       soup_session_host_add_queue_item (host, item);
        g_mutex_unlock (&priv->conn_lock);
 
        if (!(soup_message_get_flags (msg) & SOUP_MESSAGE_NO_REDIRECT)) {
@@ -1406,8 +1410,18 @@ soup_session_lookup_queue_item (SoupSession *session,
                                SoupMessage *msg)
 {
        SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
+       GSList *hosts, *h;
+       SoupMessageQueueItem *item = NULL;
+
+       g_mutex_lock (&priv->conn_lock);
+       hosts = get_all_hosts (session);
+       g_mutex_unlock (&priv->conn_lock);
+
+       for (h = hosts; h && !item; h = h->next)
+               item = soup_session_host_lookup_queue_item (h->data, msg);
 
-       return soup_message_queue_lookup (priv->queue, msg);
+       g_slist_free_full (hosts, g_object_unref);
+       return item;
 }
 
 static void
@@ -1421,7 +1435,7 @@ soup_session_unqueue_item (SoupSession          *session,
                if (item->msg->method != SOUP_METHOD_CONNECT ||
                    !SOUP_STATUS_IS_SUCCESSFUL (item->msg->status_code))
                        soup_connection_set_state (item->conn, SOUP_CONNECTION_IDLE);
-               soup_session_set_item_connection (session, item, NULL);
+               soup_session_set_item_connection (session, item, NULL, NULL);
        }
 
        if (item->state != SOUP_MESSAGE_FINISHED) {
@@ -1429,11 +1443,9 @@ soup_session_unqueue_item (SoupSession          *session,
                return;
        }
 
-       soup_message_queue_remove (priv->queue, item);
-
        g_mutex_lock (&priv->conn_lock);
        host = get_host_for_message (session, item->msg);
-       soup_session_host_remove_message (host, item->msg);
+       soup_session_host_remove_item (host, item);
        g_cond_broadcast (&priv->conn_cond);
        g_mutex_unlock (&priv->conn_lock);
 
@@ -1574,7 +1586,7 @@ tunnel_complete (SoupMessageQueueItem *tunnel_item,
                status = status_from_connect_error (item, error);
        if (!SOUP_STATUS_IS_SUCCESSFUL (status)) {
                soup_connection_disconnect (item->conn);
-               soup_session_set_item_connection (session, item, NULL);
+               soup_session_set_item_connection (session, item, NULL, NULL);
                if (!item->new_api || item->msg->status_code == 0)
                        soup_session_set_item_status (session, item, status, error);
        }
@@ -1659,7 +1671,7 @@ tunnel_connect (SoupMessageQueueItem *item)
        g_object_unref (msg);
        tunnel_item->related = item;
        soup_message_queue_item_ref (item);
-       soup_session_set_item_connection (session, tunnel_item, item->conn);
+       soup_session_set_item_connection (session, tunnel_item, item->host, item->conn);
        tunnel_item->state = SOUP_MESSAGE_RUNNING;
 
        g_signal_emit (session, signals[TUNNELING], 0, tunnel_item->conn);
@@ -1687,7 +1699,7 @@ connect_complete (SoupMessageQueueItem *item, SoupConnection *conn, GError *erro
        if (item->state == SOUP_MESSAGE_CONNECTING) {
                if (!item->new_api || item->msg->status_code == 0)
                        soup_session_set_item_status (session, item, status, error);
-               soup_session_set_item_connection (session, item, NULL);
+               soup_session_set_item_connection (session, item, NULL, NULL);
                item->state = SOUP_MESSAGE_READY;
        }
 }
@@ -1800,7 +1812,7 @@ get_connection (SoupMessageQueueItem *item, gboolean *should_cleanup)
                return FALSE;
        }
 
-       soup_session_set_item_connection (session, item, conn);
+       soup_session_set_item_connection (session, item, host, conn);
 
        if (soup_connection_get_state (item->conn) != SOUP_CONNECTION_NEW) {
                item->state = SOUP_MESSAGE_READY;
@@ -1917,29 +1929,19 @@ static void
 async_run_queue (SoupSession *session)
 {
        SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
-       SoupMessageQueueItem *item;
-       SoupMessage *msg;
        gboolean try_cleanup = TRUE, should_cleanup = FALSE;
+       GSList *hosts, *h;
 
        g_object_ref (session);
        soup_session_cleanup_connections (session, FALSE);
 
- try_again:
-       for (item = soup_message_queue_first (priv->queue);
-            item;
-            item = soup_message_queue_next (priv->queue, item)) {
-               msg = item->msg;
-
-               /* CONNECT messages are handled specially */
-               if (msg->method == SOUP_METHOD_CONNECT)
-                       continue;
-
-               if (!item->async ||
-                   item->async_context != soup_session_get_async_context (session))
-                       continue;
+       g_mutex_lock (&priv->conn_lock);
+       hosts = get_all_hosts (session);
+       g_mutex_unlock (&priv->conn_lock);
 
-               soup_session_process_queue_item (session, item, &should_cleanup, TRUE);
-       }
+ try_again:
+       for (h = hosts; h; h = hosts->next)
+               soup_session_host_run_queue (h->data, &should_cleanup);
 
        if (try_cleanup && should_cleanup) {
                /* There is at least one message in the queue that
@@ -1952,6 +1954,7 @@ async_run_queue (SoupSession *session)
                }
        }
 
+       g_slist_free_full (hosts, g_object_unref);
        g_object_unref (session);
 }
 
@@ -2158,19 +2161,42 @@ soup_session_pause_message (SoupSession *session,
        soup_message_queue_item_unref (item);
 }
 
+static GSList *
+get_all_queue_items (SoupSession *session)
+{
+       GSList *hosts, *h, *items, *host_items;
+
+       hosts = get_all_hosts (session);
+
+       items = NULL;
+       for (h = hosts; h; h = h->next) {
+               host_items = soup_session_host_get_queue_items (h->data);
+               items = g_slist_concat (host_items, items);
+       }
+
+       g_slist_free_full (hosts, g_object_unref);
+
+       return items;
+}
+
 static void
 soup_session_real_kick_queue (SoupSession *session)
 {
        SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
+       GSList *items, *i;
        SoupMessageQueueItem *item;
        gboolean have_sync_items = FALSE;
 
        if (priv->disposed)
                return;
 
-       for (item = soup_message_queue_first (priv->queue);
-            item;
-            item = soup_message_queue_next (priv->queue, item)) {
+       g_mutex_lock (&priv->conn_lock);
+       items = get_all_queue_items (session);
+       g_mutex_unlock (&priv->conn_lock);
+
+       for (i = items; i; i = i->next) {
+               item = i->data;
+
                if (item->async) {
                        GSource *source;
 
@@ -2189,6 +2215,7 @@ soup_session_real_kick_queue (SoupSession *session)
                } else
                        have_sync_items = TRUE;
        }
+       g_slist_free_full (items, (GDestroyNotify) soup_message_queue_item_unref);
 
        if (have_sync_items) {
                g_mutex_lock (&priv->conn_lock);
@@ -2312,26 +2339,29 @@ static void
 soup_session_real_flush_queue (SoupSession *session)
 {
        SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
+       GSList *items, *i;
        SoupMessageQueueItem *item;
        GHashTable *current = NULL;
        gboolean done = FALSE;
 
+       g_mutex_lock (&priv->conn_lock);
+       items = get_all_queue_items (session);
+       g_mutex_unlock (&priv->conn_lock);
+
        if (SOUP_IS_SESSION_SYNC (session)) {
                /* Record the current contents of the queue */
                current = g_hash_table_new (NULL, NULL);
-               for (item = soup_message_queue_first (priv->queue);
-                    item;
-                    item = soup_message_queue_next (priv->queue, item))
-                       g_hash_table_insert (current, item, item);
+               for (i = items; i; i = i->next)
+                       g_hash_table_insert (current, i->data, i->data);
        }
 
        /* Cancel everything */
-       for (item = soup_message_queue_first (priv->queue);
-            item;
-            item = soup_message_queue_next (priv->queue, item)) {
+       for (i = items; i; i = i->next) {
+               item = i->data;
                soup_session_cancel_message (session, item->msg,
                                             SOUP_STATUS_CANCELLED);
        }
+       g_slist_free_full (items, (GDestroyNotify) soup_message_queue_item_unref);
 
        if (SOUP_IS_SESSION_SYNC (session)) {
                /* Wait until all of the items in @current have been
@@ -2345,12 +2375,14 @@ soup_session_real_flush_queue (SoupSession *session)
                g_mutex_lock (&priv->conn_lock);
                do {
                        done = TRUE;
-                       for (item = soup_message_queue_first (priv->queue);
-                            item;
-                            item = soup_message_queue_next (priv->queue, item)) {
-                               if (g_hash_table_lookup (current, item))
+                       items = get_all_queue_items (session);
+                       for (i = items; i; i = i->next) {
+                               if (g_hash_table_lookup (current, i->data)) {
                                        done = FALSE;
+                                       break;
+                               }
                        }
+                       g_slist_free_full (items, (GDestroyNotify) soup_message_queue_item_unref);
 
                        if (!done)
                                g_cond_wait (&priv->conn_cond, &priv->conn_lock);


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]