libsoup r1178 - in trunk: . libsoup



Author: danw
Date: Fri Oct  3 20:17:56 2008
New Revision: 1178
URL: http://svn.gnome.org/viewvc/libsoup?rev=1178&view=rev

Log:
	* libsoup/soup-message-queue.c: Make this more complicated, with a
	SoupMessageQueueItem to keep track of the session's per-message
	state. (Part of the process of moving session-related state out of
	SoupMessagePrivate.)

	* libsoup/soup-session.c: Update to work in terms of
	SoupMessageQueueItem

	* libsoup/soup-session-async.c:
	* libsoup/soup-session-sync.c: use SoupMessageQueueItem (and get
	rid of SoupSessionAsyncQueueData and SoupSessionSyncAsyncData).


Modified:
   trunk/ChangeLog
   trunk/libsoup/soup-message-queue.c
   trunk/libsoup/soup-message-queue.h
   trunk/libsoup/soup-session-async.c
   trunk/libsoup/soup-session-sync.c
   trunk/libsoup/soup-session.c

Modified: trunk/libsoup/soup-message-queue.c
==============================================================================
--- trunk/libsoup/soup-message-queue.c	(original)
+++ trunk/libsoup/soup-message-queue.c	Fri Oct  3 20:17:56 2008
@@ -2,7 +2,8 @@
 /*
  * soup-message-queue.c: Message queue
  *
- * Copyright (C) 2003, Ximian, Inc.
+ * Copyright (C) 2003 Novell, Inc.
+ * Copyright (C) 2008 Red Hat, Inc.
  */
 
 #ifdef HAVE_CONFIG_H
@@ -11,230 +12,243 @@
 
 #include "soup-message-queue.h"
 
+/**
+ * SECTION:soup-message-queue
+ *
+ * This is an internal structure used by #SoupSession and its
+ * subclasses to keep track of the status of messages currently being
+ * processed.
+ *
+ * The #SoupMessageQueue itself is mostly just a linked list of
+ * #SoupMessageQueueItem, with some added cleverness to allow the list
+ * to be walked safely while other threads / re-entrant loops are
+ * adding items to and removing items from it. In particular, this is
+ * handled by refcounting items and then keeping "removed" items in
+ * the list until their ref_count drops to 0, but skipping over the
+ * "removed" ones when walking the queue.
+ **/
+
 struct SoupMessageQueue {
-	GList *head, *tail;
-	GList *iters;
+	SoupSession *session;
 
 	GMutex *mutex;
+	SoupMessageQueueItem *head, *tail;
 };
 
-/**
- * soup_message_queue_new:
- *
- * Creates a new #SoupMessageQueue
- *
- * Return value: a new #SoupMessageQueue object
- **/
 SoupMessageQueue *
-soup_message_queue_new (void)
+soup_message_queue_new (SoupSession *session)
 {
 	SoupMessageQueue *queue;
 
 	queue = g_slice_new0 (SoupMessageQueue);
+	queue->session = session;
 	queue->mutex = g_mutex_new ();
 	return queue;
 }
 
-/**
- * soup_message_queue_destroy:
- * @queue: a message queue
- *
- * Frees memory associated with @queue, which must be empty.
- **/
 void
 soup_message_queue_destroy (SoupMessageQueue *queue)
 {
 	g_return_if_fail (queue->head == NULL);
 
-	g_list_free (queue->head);
-	g_list_free (queue->iters);
 	g_mutex_free (queue->mutex);
 	g_slice_free (SoupMessageQueue, queue);
 }
 
 /**
  * soup_message_queue_append:
- * @queue: a queue
- * @msg: a message
+ * @queue: a #SoupMessageQueue
+ * @msg: a #SoupMessage
+ * @callback: the callback for @msg
+ * @user_data: the data to pass to @callback
  *
- * Appends @msg to the end of @queue
+ * Creates a new #SoupMessageQueueItem and appends it to @queue.
+ *
+ * Return value: the new item, which you must unref with
+ * soup_message_queue_unref_item() when you are done with.
  **/
-void
-soup_message_queue_append (SoupMessageQueue *queue, SoupMessage *msg)
-{
+SoupMessageQueueItem *
+soup_message_queue_append (SoupMessageQueue *queue, SoupMessage *msg,
+			   SoupSessionCallback callback, gpointer user_data)
+{
+	SoupMessageQueueItem *item;
+
+	item = g_slice_new0 (SoupMessageQueueItem);
+	item->session = queue->session;
+	item->queue = queue;
+	item->msg = g_object_ref (msg);
+	item->callback = callback;
+	item->callback_data = user_data;
+
+	/* Note: the initial ref_count of 1 represents the caller's
+	 * ref; the queue's own ref is indicated by the absence of the
+	 * "removed" flag.
+	 */
+	item->ref_count = 1;
+
 	g_mutex_lock (queue->mutex);
 	if (queue->head) {
-		queue->tail = g_list_append (queue->tail, msg);
-		queue->tail = queue->tail->next;
+		queue->tail->next = item;
+		item->prev = queue->tail;
+		queue->tail = item;
 	} else
-		queue->head = queue->tail = g_list_append (NULL, msg);
+		queue->head = queue->tail = item;
 
-	g_object_add_weak_pointer (G_OBJECT (msg), &queue->tail->data);
 	g_mutex_unlock (queue->mutex);
+	return item;
 }
 
 /**
- * soup_message_queue_first:
- * @queue: a queue
- * @iter: pointer to a #SoupMessageQueueIter
+ * soup_message_queue_item_ref:
+ * @item: a #SoupMessageQueueItem
  *
- * Initializes @iter and returns the first element of @queue. If you
- * do not iterate all the way to the end of the list, you must call
- * soup_message_queue_free_iter() to dispose the iterator when you are
- * done.
- *
- * Return value: the first element of @queue, or %NULL if it is empty.
- **/
-SoupMessage *
-soup_message_queue_first (SoupMessageQueue *queue, SoupMessageQueueIter *iter)
+ * Refs @item.
+ **/ 
+void
+soup_message_queue_item_ref (SoupMessageQueueItem *item)
 {
-	g_mutex_lock (queue->mutex);
-
-	if (!queue->head) {
-		g_mutex_unlock (queue->mutex);
-		return NULL;
-	}
-
-	queue->iters = g_list_prepend (queue->iters, iter);
-
-	iter->cur = NULL;
-	iter->next = queue->head;
-	g_mutex_unlock (queue->mutex);
-
-	return soup_message_queue_next (queue, iter);
+	item->ref_count++;
 }
 
-static SoupMessage *
-queue_remove_internal (SoupMessageQueue *queue, SoupMessageQueueIter *iter)
+/**
+ * soup_message_queue_item_unref:
+ * @item: a #SoupMessageQueueItem
+ *
+ * Unrefs @item; use this on a #SoupMessageQueueItem that you are done
+ * with (but that you aren't passing to
+ * soup_message_queue_item_next()).
+ **/ 
+void
+soup_message_queue_item_unref (SoupMessageQueueItem *item)
 {
-	GList *i;
-	SoupMessageQueueIter *iter2;
-	SoupMessage *msg;
+	g_mutex_lock (item->queue->mutex);
 
-	if (!iter->cur) {
-		/* We're at end of list or this item was already removed */
-		return NULL;
+	/* Decrement the ref_count; if it's still non-zero OR if the
+	 * item is still in the queue, then return.
+	 */
+	if (--item->ref_count || !item->removed) {
+		g_mutex_unlock (item->queue->mutex);
+		return;
 	}
 
-	/* Fix any other iters pointing to iter->cur */
-	for (i = queue->iters; i; i = i->next) {
-		iter2 = i->data;
-		if (iter2 != iter) {
-			if (iter2->cur == iter->cur)
-				iter2->cur = NULL;
-			else if (iter2->next == iter->cur)
-				iter2->next = iter->cur->next;
-		}
-	}
-
-	msg = iter->cur->data;
-	if (msg)
-		g_object_remove_weak_pointer (G_OBJECT (msg), &iter->cur->data);
-
-	/* If deleting the last item, fix tail */
-	if (queue->tail == iter->cur)
-		queue->tail = queue->tail->prev;
-
-	/* Remove the item */
-	queue->head = g_list_delete_link (queue->head, iter->cur);
-	iter->cur = NULL;
-
-	return msg;
+	/* OK, @item is dead. Rewrite @queue around it */
+	if (item->prev)
+		item->prev->next = item->next;
+	else
+		item->queue->head = item->next;
+	if (item->next)
+		item->next->prev = item->prev;
+	else
+		item->queue->tail = item->prev;
+
+	g_mutex_unlock (item->queue->mutex);
+
+	/* And free it */
+	g_object_unref (item->msg);
+	g_slice_free (SoupMessageQueueItem, item);
 }
 
 /**
- * soup_message_queue_next:
- * @queue: a queue
- * @iter: pointer to an initialized #SoupMessageQueueIter
- *
- * Returns the next element of @queue
+ * soup_message_queue_lookup:
+ * @queue: a #SoupMessageQueue
+ * @msg: a #SoupMessage
  *
- * Return value: the next element, or %NULL if there are no more.
- **/
-SoupMessage *
-soup_message_queue_next (SoupMessageQueue *queue, SoupMessageQueueIter *iter)
+ * Finds the #SoupMessageQueueItem for @msg in @queue. You must unref
+ * the item with soup_message_queue_unref_item() when you are done
+ * with it.
+ *
+ * Return value: the queue item for @msg, or %NULL
+ **/ 
+SoupMessageQueueItem *
+soup_message_queue_lookup (SoupMessageQueue *queue, SoupMessage *msg)
 {
-	g_mutex_lock (queue->mutex);
+	SoupMessageQueueItem *item;
 
-	while (iter->next) {
-		iter->cur = iter->next;
-		iter->next = iter->cur->next;
-		if (iter->cur->data) {
-			g_mutex_unlock (queue->mutex);
-			return iter->cur->data;
-		}
+	g_mutex_lock (queue->mutex);
 
-		/* Message was finalized, remove dead queue element */
-		queue_remove_internal (queue, iter);
-	}
+	item = queue->tail;
+	while (item && (item->removed || item->msg != msg))
+		item = item->prev;
 
-	/* Nothing left */
-	iter->cur = NULL;
-	queue->iters = g_list_remove (queue->iters, iter);
+	if (item)
+		item->ref_count++;
 
 	g_mutex_unlock (queue->mutex);
-	return NULL;
+	return item;
 }
 
 /**
- * soup_message_queue_remove:
- * @queue: a queue
- * @iter: pointer to an initialized #SoupMessageQueueIter
- *
- * Removes the queue element pointed to by @iter; that is, the last
- * message returned by soup_message_queue_first() or
- * soup_message_queue_next().
+ * soup_message_queue_first:
+ * @queue: a #SoupMessageQueue
  *
- * Return value: the removed message, or %NULL if the element pointed
- * to by @iter was already removed.
- **/
-SoupMessage *
-soup_message_queue_remove (SoupMessageQueue *queue, SoupMessageQueueIter *iter)
+ * Gets the first item in @queue. You must unref the item by calling
+ * soup_message_queue_unref_item() on it when you are done.
+ * (soup_message_queue_next() does this for you automatically, so you
+ * only need to unref the item yourself if you are not going to
+ * finishing walking the queue.)
+ *
+ * Return value: the first item in @queue.
+ **/ 
+SoupMessageQueueItem *
+soup_message_queue_first (SoupMessageQueue *queue)
 {
-	SoupMessage *msg;
+	SoupMessageQueueItem *item;
 
 	g_mutex_lock (queue->mutex);
-	msg = queue_remove_internal (queue, iter);
-	g_mutex_unlock (queue->mutex);
 
-	return msg;
+	item = queue->head;
+	while (item && item->removed)
+		item = item->next;
+
+	if (item)
+		item->ref_count++;
+
+	g_mutex_unlock (queue->mutex);
+	return item;
 }
 
 /**
- * soup_message_queue_remove_message:
- * @queue: a queue
- * @msg: a #SoupMessage
+ * soup_message_queue_next:
+ * @queue: a #SoupMessageQueue
+ * @item: a #SoupMessageQueueItem
  *
- * Removes the indicated message from @queue.
- **/
-void
-soup_message_queue_remove_message (SoupMessageQueue *queue, SoupMessage *msg)
+ * Unrefs @item and gets the next item after it in @queue. As with
+ * soup_message_queue_first(), you must unref the returned item
+ * yourself with soup_message_queue_unref_item() if you do not finish
+ * walking the queue.
+ *
+ * Return value: the next item in @queue.
+ **/ 
+SoupMessageQueueItem *
+soup_message_queue_next (SoupMessageQueue *queue, SoupMessageQueueItem *item)
 {
-	SoupMessageQueueIter iter;
-	SoupMessage *msg2;
+	SoupMessageQueueItem *next;
 
-	for (msg2 = soup_message_queue_first (queue, &iter); msg2; msg2 = soup_message_queue_next (queue, &iter)) {
-		if (msg2 == msg) {
-			soup_message_queue_remove (queue, &iter);
-			soup_message_queue_free_iter (queue, &iter);
-			return;
-		}
-	}
-}
+	g_mutex_lock (queue->mutex);
 
+	next = item->next;
+	while (next && next->removed)
+		next = next->next;
+	if (next)
+		next->ref_count++;
+
+	g_mutex_unlock (queue->mutex);
+	soup_message_queue_item_unref (item);
+	return next;
+}
 
 /**
- * soup_message_queue_free_iter:
- * @queue: a queue
- * @iter: pointer to an initialized #SoupMessageQueueIter
+ * soup_message_queue_remove:
+ * @queue: a #SoupMessageQueue
+ * @item: a #SoupMessageQueueItem
  *
- * Removes @iter from the list of active iterators in @queue.
- **/
+ * Removes @item from @queue. Note that you probably also need to call
+ * soup_message_queue_unref_item() after this.
+ **/ 
 void
-soup_message_queue_free_iter (SoupMessageQueue *queue,
-			      SoupMessageQueueIter *iter)
+soup_message_queue_remove (SoupMessageQueue *queue, SoupMessageQueueItem *item)
 {
 	g_mutex_lock (queue->mutex);
-	queue->iters = g_list_remove (queue->iters, iter);
+	item->removed = TRUE;
 	g_mutex_unlock (queue->mutex);
 }

Modified: trunk/libsoup/soup-message-queue.h
==============================================================================
--- trunk/libsoup/soup-message-queue.h	(original)
+++ trunk/libsoup/soup-message-queue.h	Fri Oct  3 20:17:56 2008
@@ -1,6 +1,7 @@
 /* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
 /*
- * Copyright (C) 2003, Ximian, Inc.
+ * Copyright (C) 2003 Novell, Inc.
+ * Copyright (C) 2008 Red Hat, Inc.
  */
 
 #ifndef SOUP_MESSAGE_QUEUE_H
@@ -8,33 +9,47 @@
 
 #include <glib.h>
 #include <libsoup/soup-message.h>
+#include <libsoup/soup-session.h>
 
 G_BEGIN_DECLS
 
 typedef struct SoupMessageQueue SoupMessageQueue; 
+typedef struct SoupMessageQueueItem SoupMessageQueueItem;
 
-typedef struct {
-	GList *cur, *next;
-} SoupMessageQueueIter;
-
-SoupMessageQueue *soup_message_queue_new        (void);
-void              soup_message_queue_append     (SoupMessageQueue     *queue,
-						 SoupMessage          *msg);
-
-SoupMessage      *soup_message_queue_first      (SoupMessageQueue     *queue,
-						 SoupMessageQueueIter *iter);
-SoupMessage      *soup_message_queue_next       (SoupMessageQueue     *queue,
-						 SoupMessageQueueIter *iter);
-SoupMessage      *soup_message_queue_remove     (SoupMessageQueue     *queue,
-						 SoupMessageQueueIter *iter);
+struct SoupMessageQueueItem {
+	/*< public >*/
+	SoupSession *session;
+	SoupMessageQueue *queue;
+	SoupMessage *msg;
+	SoupSessionCallback callback;
+	gpointer callback_data;
+
+	/*< private >*/
+	guint removed              : 1;
+	guint ref_count            : 31;
+	SoupMessageQueueItem *prev, *next;
+};
+
+SoupMessageQueue     *soup_message_queue_new        (SoupSession          *session);
+SoupMessageQueueItem *soup_message_queue_append     (SoupMessageQueue     *queue,
+						     SoupMessage          *msg,
+						     SoupSessionCallback   callback,
+						     gpointer              user_data);
+
+SoupMessageQueueItem *soup_message_queue_lookup     (SoupMessageQueue     *queue,
+						     SoupMessage          *msg);
+
+SoupMessageQueueItem *soup_message_queue_first      (SoupMessageQueue     *queue);
+SoupMessageQueueItem *soup_message_queue_next       (SoupMessageQueue     *queue,
+						     SoupMessageQueueItem *item);
 
-void              soup_message_queue_free_iter  (SoupMessageQueue     *queue,
-						 SoupMessageQueueIter *iter);
+void                  soup_message_queue_remove     (SoupMessageQueue     *queue,
+						     SoupMessageQueueItem *item);
 
-void              soup_message_queue_destroy    (SoupMessageQueue     *queue);
+void                  soup_message_queue_item_ref   (SoupMessageQueueItem *item);
+void                  soup_message_queue_item_unref (SoupMessageQueueItem *item);
 
-void              soup_message_queue_remove_message (SoupMessageQueue *queue,
-						     SoupMessage      *msg);
+void                  soup_message_queue_destroy    (SoupMessageQueue     *queue);
 
 G_END_DECLS
 

Modified: trunk/libsoup/soup-session-async.c
==============================================================================
--- trunk/libsoup/soup-session-async.c	(original)
+++ trunk/libsoup/soup-session-async.c	Fri Oct  3 20:17:56 2008
@@ -24,7 +24,7 @@
  **/
 
 static gboolean run_queue (SoupSessionAsync *sa);
-static void do_idle_run_queue (SoupSessionAsync *sa);
+static void do_idle_run_queue (SoupSession *session);
 
 static void  queue_message   (SoupSession *session, SoupMessage *req,
 			      SoupSessionCallback callback, gpointer user_data);
@@ -108,22 +108,22 @@
 
 
 static void
-connection_closed (SoupConnection *conn, gpointer sa)
+connection_closed (SoupConnection *conn, gpointer session)
 {
 	/* Run the queue in case anyone was waiting for a connection
 	 * to be closed.
 	 */
-	do_idle_run_queue (sa);
+	do_idle_run_queue (session);
 }
 
 static void
 got_connection (SoupConnection *conn, guint status, gpointer user_data)
 {
-	SoupSessionAsync *sa = user_data;
+	SoupSession *session = user_data;
 
 	if (status == SOUP_STATUS_OK) {
 		g_signal_connect (conn, "disconnected",
-				  G_CALLBACK (connection_closed), sa);
+				  G_CALLBACK (connection_closed), session);
 
 		/* @conn has been marked reserved by SoupSession, but
 		 * we don't actually have any specific message in mind
@@ -142,8 +142,8 @@
 	 * there may have been messages waiting for the connection
 	 * count to go down.
 	 */
-	do_idle_run_queue (sa);
-	g_object_unref (sa);
+	do_idle_run_queue (session);
+	g_object_unref (session);
 }
 
 static gboolean
@@ -151,7 +151,7 @@
 {
 	SoupSession *session = SOUP_SESSION (sa);
 	SoupMessageQueue *queue = soup_session_get_queue (session);
-	SoupMessageQueueIter iter;
+	SoupMessageQueueItem *item;
 	SoupMessage *msg;
 	SoupConnection *conn;
 	gboolean try_pruning = TRUE, should_prune = FALSE;
@@ -160,9 +160,10 @@
 	/* FIXME: prefer CONNECTING messages */
 
  try_again:
-	for (msg = soup_message_queue_first (queue, &iter);
-	     msg && !should_prune;
-	     msg = soup_message_queue_next (queue, &iter)) {
+	for (item = soup_message_queue_first (queue);
+	     item && !should_prune;
+	     item = soup_message_queue_next (queue, item)) {
+		msg = item->msg;
 
 		if (!SOUP_MESSAGE_IS_STARTING (msg) ||
 		    soup_message_io_in_progress (msg))
@@ -179,6 +180,8 @@
 		} else
 			soup_connection_send_request (conn, msg);
 	}
+	if (item)
+		soup_message_queue_item_unref (item);
 
 	if (try_pruning && should_prune) {
 		/* There is at least one message in the queue that
@@ -186,7 +189,7 @@
 		 * some other server.
 		 */
 		if (soup_session_try_prune_connection (session)) {
-			try_pruning = FALSE;
+			try_pruning = should_prune = FALSE;
 			goto try_again;
 		}
 	}
@@ -200,32 +203,25 @@
 	run_queue (sa);
 }
 
-typedef struct {
-	SoupSessionAsync *sa;
-	SoupSessionCallback callback;
-	gpointer callback_data;
-} SoupSessionAsyncQueueData;
-
 static void
 final_finished (SoupMessage *req, gpointer user_data)
 {
-	SoupSessionAsyncQueueData *saqd = user_data;
-	SoupSessionAsync *sa = saqd->sa;
+	SoupMessageQueueItem *item = user_data;
+	SoupSession *session = item->session;
+
+	g_object_ref (session);
 
-	g_object_ref (sa);
 	if (!SOUP_MESSAGE_IS_STARTING (req)) {
-		g_signal_handlers_disconnect_by_func (req, final_finished, saqd);
-		if (saqd->callback) {
-			saqd->callback ((SoupSession *)sa, req,
-					saqd->callback_data);
-		}
+		g_signal_handlers_disconnect_by_func (req, final_finished, item);
+		if (item->callback)
+			item->callback (session, req, item->callback_data);
 
 		g_object_unref (req);
-		g_slice_free (SoupSessionAsyncQueueData, saqd);
+		soup_message_queue_item_unref (item);
 	}
 
-	do_idle_run_queue (sa);
-	g_object_unref (sa);
+	do_idle_run_queue (session);
+	g_object_unref (session);
 }
 
 static gboolean
@@ -239,14 +235,14 @@
 }
 
 static void
-do_idle_run_queue (SoupSessionAsync *sa)
+do_idle_run_queue (SoupSession *session)
 {
-	SoupSessionAsyncPrivate *priv = SOUP_SESSION_ASYNC_GET_PRIVATE (sa);
+	SoupSessionAsyncPrivate *priv = SOUP_SESSION_ASYNC_GET_PRIVATE (session);
 
 	if (!priv->idle_run_queue_source) {
 		priv->idle_run_queue_source = soup_add_completion (
-			soup_session_get_async_context ((SoupSession *)sa),
-			idle_run_queue, sa);
+			soup_session_get_async_context (session),
+			idle_run_queue, session);
 	}
 }
 
@@ -254,21 +250,19 @@
 queue_message (SoupSession *session, SoupMessage *req,
 	       SoupSessionCallback callback, gpointer user_data)
 {
-	SoupSessionAsync *sa = SOUP_SESSION_ASYNC (session);
-	SoupSessionAsyncQueueData *saqd;
+	SoupMessageQueueItem *item;
 
-	g_signal_connect (req, "restarted",
-			  G_CALLBACK (request_restarted), sa);
+	SOUP_SESSION_CLASS (soup_session_async_parent_class)->queue_message (session, req, callback, user_data);
+
+	item = soup_message_queue_lookup (soup_session_get_queue (session), req);
+	g_return_if_fail (item != NULL);
 
-	saqd = g_slice_new (SoupSessionAsyncQueueData);
-	saqd->sa = sa;
-	saqd->callback = callback;
-	saqd->callback_data = user_data;
+	g_signal_connect (req, "restarted",
+			  G_CALLBACK (request_restarted), session);
 	g_signal_connect_after (req, "finished",
-				G_CALLBACK (final_finished), saqd);
+				G_CALLBACK (final_finished), item);
 
-	SOUP_SESSION_CLASS (soup_session_async_parent_class)->queue_message (session, req, callback, user_data);
-	do_idle_run_queue (sa);
+	do_idle_run_queue (session);
 }
 
 static guint

Modified: trunk/libsoup/soup-session-sync.c
==============================================================================
--- trunk/libsoup/soup-session-sync.c	(original)
+++ trunk/libsoup/soup-session-sync.c	Fri Oct  3 20:17:56 2008
@@ -125,61 +125,6 @@
 	return session;
 }
 
-typedef struct {
-	SoupSession *session;
-	SoupMessage *msg;
-	SoupSessionCallback callback;
-	gpointer user_data;
-} SoupSessionSyncAsyncData;
-
-static void
-async_data_free (SoupSessionSyncAsyncData *sad)
-{
-	g_object_unref (sad->session);
-	g_object_unref (sad->msg);
-	g_slice_free (SoupSessionSyncAsyncData, sad);
-}
-
-static gboolean
-queue_message_callback (gpointer data)
-{
-	SoupSessionSyncAsyncData *sad = data;
-
-	sad->callback (sad->session, sad->msg, sad->user_data);
-	async_data_free (sad);
-	return FALSE;
-}
-
-static gpointer
-queue_message_thread (gpointer data)
-{
-	SoupSessionSyncAsyncData *sad = data;
-
-	soup_session_send_message (sad->session, sad->msg);
-	if (sad->callback) {
-		soup_add_completion (soup_session_get_async_context (sad->session),
-				     queue_message_callback, sad);
-	} else
-		async_data_free (sad);
-
-	return NULL;
-}
-
-static void
-queue_message (SoupSession *session, SoupMessage *msg,
-	       SoupSessionCallback callback, gpointer user_data)
-{
-	SoupSessionSyncAsyncData *sad;
-
-	sad = g_slice_new (SoupSessionSyncAsyncData);
-	sad->session = g_object_ref (session);
-	sad->msg = g_object_ref (msg);
-	sad->callback = callback;
-	sad->user_data = user_data;
-
-	g_thread_create (queue_message_thread, sad, FALSE, NULL);
-}
-
 static SoupConnection *
 wait_for_connection (SoupSession *session, SoupMessage *msg)
 {
@@ -235,25 +180,84 @@
 	goto try_again;
 }
 
-static guint
-send_message (SoupSession *session, SoupMessage *msg)
+static void
+process_queue_item (SoupMessageQueueItem *item)
 {
-	SoupSessionSyncPrivate *priv = SOUP_SESSION_SYNC_GET_PRIVATE (session);
+	SoupSessionSyncPrivate *priv = SOUP_SESSION_SYNC_GET_PRIVATE (item->session);
+	SoupMessage *msg = item->msg;
 	SoupConnection *conn;
 
-	SOUP_SESSION_CLASS (soup_session_sync_parent_class)->queue_message (session, msg, NULL, NULL);
-
 	do {
 		/* Get a connection */
-		conn = wait_for_connection (session, msg);
+		conn = wait_for_connection (item->session, msg);
 		if (!conn)
-			return msg->status_code;
+			break;
 
 		soup_connection_send_request (conn, msg);
 		g_cond_broadcast (priv->cond);
 	} while (soup_message_get_io_status (msg) != SOUP_MESSAGE_IO_STATUS_FINISHED);
 
-	return msg->status_code;
+	soup_message_queue_remove (item->queue, item);
+}
+
+static gboolean
+queue_message_callback (gpointer data)
+{
+	SoupMessageQueueItem *item = data;
+
+	item->callback (item->session, item->msg, item->callback_data);
+	g_object_unref (item->session);
+	soup_message_queue_item_unref (item);
+	return FALSE;
+}
+
+static gpointer
+queue_message_thread (gpointer data)
+{
+	SoupMessageQueueItem *item = data;
+
+	process_queue_item (item);
+	if (item->callback) {
+		soup_add_completion (soup_session_get_async_context (item->session),
+				     queue_message_callback, item);
+	} else {
+		g_object_unref (item->session);
+		soup_message_queue_item_unref (item);
+	}
+
+	return NULL;
+}
+
+static void
+queue_message (SoupSession *session, SoupMessage *msg,
+	       SoupSessionCallback callback, gpointer user_data)
+{
+	SoupMessageQueueItem *item;
+
+	SOUP_SESSION_CLASS (soup_session_sync_parent_class)->
+		queue_message (g_object_ref (session), msg, callback, user_data);
+
+	item = soup_message_queue_lookup (soup_session_get_queue (session), msg);
+	g_return_if_fail (item != NULL);
+
+	g_thread_create (queue_message_thread, item, FALSE, NULL);
+}
+
+static guint
+send_message (SoupSession *session, SoupMessage *msg)
+{
+	SoupMessageQueueItem *item;
+	guint status;
+
+	SOUP_SESSION_CLASS (soup_session_sync_parent_class)->queue_message (session, msg, NULL, NULL);
+
+	item = soup_message_queue_lookup (soup_session_get_queue (session), msg);
+	g_return_val_if_fail (item != NULL, SOUP_STATUS_MALFORMED);
+
+	process_queue_item (item);
+	status = msg->status_code;
+	soup_message_queue_item_unref (item);
+	return status;
 }
 
 static void

Modified: trunk/libsoup/soup-session.c
==============================================================================
--- trunk/libsoup/soup-session.c	(original)
+++ trunk/libsoup/soup-session.c	Fri Oct  3 20:17:56 2008
@@ -158,7 +158,7 @@
 {
 	SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
 
-	priv->queue = soup_message_queue_new ();
+	priv->queue = soup_message_queue_new (session);
 
 	priv->host_lock = g_mutex_new ();
 	priv->hosts = g_hash_table_new (soup_uri_host_hash,
@@ -920,7 +920,7 @@
 	SoupSession *session = user_data;
 	SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
 	SoupSessionHost *host;
-	SoupMessageQueueIter iter;
+	SoupMessageQueueItem *item;
 	SoupMessage *msg;
 
 	g_mutex_lock (priv->host_lock);
@@ -961,7 +961,8 @@
 	 * of luck.
 	 */
 	g_object_ref (session);
-	for (msg = soup_message_queue_first (priv->queue, &iter); msg; msg = soup_message_queue_next (priv->queue, &iter)) {
+	for (item = soup_message_queue_first (priv->queue); item; item = soup_message_queue_next (priv->queue, item)) {
+		msg = item->msg;
 		if (get_host_for_message (session, msg) == host) {
 			if (status == SOUP_STATUS_TRY_AGAIN) {
 				if (soup_message_get_io_status (msg) == SOUP_MESSAGE_IO_STATUS_CONNECTING)
@@ -1101,14 +1102,16 @@
 static void
 message_finished (SoupMessage *msg, gpointer user_data)
 {
-	SoupSession *session = user_data;
+	SoupMessageQueueItem *item = user_data;
+	SoupSession *session = item->session;
 	SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
 
 	if (!SOUP_MESSAGE_IS_STARTING (msg)) {
-		soup_message_queue_remove_message (priv->queue, msg);
+		soup_message_queue_remove (priv->queue, item);
 		g_signal_handlers_disconnect_by_func (msg, message_finished, session);
 		g_signal_handlers_disconnect_by_func (msg, redirect_handler, session);
 		g_signal_emit (session, signals[REQUEST_UNQUEUED], 0, msg);
+		soup_message_queue_item_unref (item);
 	}
 }
 
@@ -1117,9 +1120,13 @@
 	       SoupSessionCallback callback, gpointer user_data)
 {
 	SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
+	SoupMessageQueueItem *item;
+
+	item = soup_message_queue_append (priv->queue, msg, callback, user_data);
+	soup_message_set_io_status (msg, SOUP_MESSAGE_IO_STATUS_QUEUED);
 
 	g_signal_connect_after (msg, "finished",
-				G_CALLBACK (message_finished), session);
+				G_CALLBACK (message_finished), item);
 
 	if (!(soup_message_get_flags (msg) & SOUP_MESSAGE_NO_REDIRECT)) {
 		soup_message_add_header_handler (
@@ -1127,9 +1134,6 @@
 			G_CALLBACK (redirect_handler), session);
 	}
 
-	soup_message_set_io_status (msg, SOUP_MESSAGE_IO_STATUS_QUEUED);
-	soup_message_queue_append (priv->queue, msg);
-
 	g_signal_emit (session, signals[REQUEST_QUEUED], 0, msg);
 }
 
@@ -1263,8 +1267,14 @@
 cancel_message (SoupSession *session, SoupMessage *msg, guint status_code)
 {
 	SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
+	SoupMessageQueueItem *item;
+
+	item = soup_message_queue_lookup (priv->queue, msg);
+	if (item) {
+		soup_message_queue_remove (priv->queue, item);
+		soup_message_queue_item_unref (item);
+	}
 
-	soup_message_queue_remove_message (priv->queue, msg);
 	soup_message_io_stop (msg);
 	soup_message_set_status (msg, status_code);
 	soup_message_finished (msg);
@@ -1297,7 +1307,7 @@
 	SoupConnection *conn = key;
 	GSList **conns = data;
 
-	*conns = g_slist_prepend (*conns, conn);
+	*conns = g_slist_prepend (*conns, g_object_ref (conn));
 }
 
 /**
@@ -1310,17 +1320,16 @@
 soup_session_abort (SoupSession *session)
 {
 	SoupSessionPrivate *priv;
-	SoupMessageQueueIter iter;
-	SoupMessage *msg;
+	SoupMessageQueueItem *item;
 	GSList *conns, *c;
 
 	g_return_if_fail (SOUP_IS_SESSION (session));
 	priv = SOUP_SESSION_GET_PRIVATE (session);
 
-	for (msg = soup_message_queue_first (priv->queue, &iter);
-	     msg;
-	     msg = soup_message_queue_next (priv->queue, &iter)) {
-		soup_session_cancel_message (session, msg,
+	for (item = soup_message_queue_first (priv->queue);
+	     item;
+	     item = soup_message_queue_next (priv->queue, item)) {
+		soup_session_cancel_message (session, item->msg,
 					     SOUP_STATUS_CANCELLED);
 	}
 
@@ -1329,8 +1338,6 @@
 	conns = NULL;
 	g_hash_table_foreach (priv->conns, gather_conns, &conns);
 
-	for (c = conns; c; c = c->next)
-		g_object_ref (c->data);
 	g_mutex_unlock (priv->host_lock);
 	for (c = conns; c; c = c->next) {
 		soup_connection_disconnect (c->data);



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]