[ostree] fetcher: Rework reference counting



commit 771075d319ccf68659f1e1db8a925c430788c988
Author: Matthew Barnes <mbarnes redhat com>
Date:   Wed Sep 23 17:11:51 2015 -0400

    fetcher: Rework reference counting
    
    Have OstreeFetcherPendingURI be the GTask's task_data and pass the GTask
    around in queues and callback closures.  The previous reference counting
    was a little confusing, and this helps clarify it, at least to me.
    
    OstreeFetcherPendingURI no longer needs its own reference count.

 src/libostree/ostree-fetcher.c |  154 ++++++++++++++++++++++-----------------
 1 files changed, 87 insertions(+), 67 deletions(-)
---
diff --git a/src/libostree/ostree-fetcher.c b/src/libostree/ostree-fetcher.c
index e88bd1d..fbfc0ae 100644
--- a/src/libostree/ostree-fetcher.c
+++ b/src/libostree/ostree-fetcher.c
@@ -39,10 +39,8 @@ typedef enum {
 } OstreeFetcherState;
 
 typedef struct {
-  guint refcount;
   OstreeFetcher *self;
   SoupURI *uri;
-  int priority;
 
   OstreeFetcherState state;
 
@@ -57,37 +55,30 @@ typedef struct {
   guint64 current_size;
   guint64 content_length;
 
-  GCancellable *cancellable;
   GTask *task;
 } OstreeFetcherPendingURI;
 
 static int
-pending_uri_compare (gconstpointer a,
-                     gconstpointer b,
-                     gpointer unused)
+pending_task_compare (gconstpointer a,
+                      gconstpointer b,
+                      gpointer unused)
 {
-  const OstreeFetcherPendingURI *pending_a = a;
-  const OstreeFetcherPendingURI *pending_b = b;
+  gint priority_a = g_task_get_priority (G_TASK (a));
+  gint priority_b = g_task_get_priority (G_TASK (b));
 
-  return (pending_a->priority == pending_b->priority) ? 0 :
-         (pending_a->priority < pending_b->priority) ? -1 : 1;
+  return (priority_a == priority_b) ? 0 :
+         (priority_a < priority_b) ? -1 : 1;
 }
 
 static void
 pending_uri_free (OstreeFetcherPendingURI *pending)
 {
-  g_assert (pending->refcount > 0);
-  pending->refcount--;
-  if (pending->refcount > 0)
-    return;
-
   soup_uri_free (pending->uri);
   g_clear_object (&pending->self);
   g_clear_object (&pending->request);
   g_clear_object (&pending->request_body);
   g_free (pending->out_tmpfile);
   g_clear_object (&pending->out_stream);
-  g_clear_object (&pending->cancellable);
   g_free (pending);
 }
 
@@ -130,7 +121,8 @@ _ostree_fetcher_finalize (GObject *object)
   g_hash_table_destroy (self->sending_messages);
   g_hash_table_destroy (self->output_stream_set);
 
-  g_queue_clear (&self->pending_queue);
+  while (!g_queue_is_empty (&self->pending_queue))
+    g_object_unref (g_queue_pop_head (&self->pending_queue));
 
   G_OBJECT_CLASS (_ostree_fetcher_parent_class)->finalize (object);
 }
@@ -282,11 +274,22 @@ ostree_fetcher_process_pending_queue (OstreeFetcher *self)
   while (g_queue_peek_head (&self->pending_queue) != NULL &&
          self->outstanding < self->max_outstanding)
     {
-      OstreeFetcherPendingURI *next = g_queue_pop_head (&self->pending_queue);
+      GTask *task;
+      OstreeFetcherPendingURI *pending;
+      GCancellable *cancellable;
+
+      task = g_queue_pop_head (&self->pending_queue);
+
+      pending = g_task_get_task_data (task);
+      cancellable = g_task_get_cancellable (task);
 
       self->outstanding++;
-      soup_request_send_async (next->request, next->cancellable,
-                               on_request_sent, next);
+      soup_request_send_async (pending->request,
+                               cancellable,
+                               on_request_sent,
+                               g_object_ref (task));
+
+      g_object_unref (task);
     }
 }
 
@@ -303,7 +306,7 @@ finish_stream (OstreeFetcherPendingURI *pending,
    */
   if (pending->out_stream)
     {
-      if (!g_output_stream_close (pending->out_stream, pending->cancellable, error))
+      if (!g_output_stream_close (pending->out_stream, cancellable, error))
         goto out;
       g_hash_table_remove (pending->self->output_stream_set, pending->out_stream);
     }
@@ -347,22 +350,32 @@ on_out_splice_complete (GObject        *object,
                         GAsyncResult   *result,
                         gpointer        user_data) 
 {
-  OstreeFetcherPendingURI *pending = user_data;
+  GTask *task = G_TASK (user_data);
+  OstreeFetcherPendingURI *pending;
+  GCancellable *cancellable;
   gssize bytes_written;
   GError *local_error = NULL;
 
+  pending = g_task_get_task_data (task);
+  cancellable = g_task_get_cancellable (task);
+
   bytes_written = g_output_stream_splice_finish ((GOutputStream *)object,
                                                  result,
                                                  &local_error);
   if (bytes_written < 0)
     goto out;
 
-  g_input_stream_read_bytes_async (pending->request_body, 8192, G_PRIORITY_DEFAULT,
-                                   pending->cancellable, on_stream_read, pending);
+  g_input_stream_read_bytes_async (pending->request_body,
+                                   8192, G_PRIORITY_DEFAULT,
+                                   cancellable,
+                                   on_stream_read,
+                                   g_object_ref (task));
 
  out:
   if (local_error)
-    g_task_return_error (pending->task, local_error);
+    g_task_return_error (task, local_error);
+
+  g_object_unref (task);
 }
 
 static void
@@ -370,11 +383,16 @@ on_stream_read (GObject        *object,
                 GAsyncResult   *result,
                 gpointer        user_data) 
 {
-  OstreeFetcherPendingURI *pending = user_data;
+  GTask *task = G_TASK (user_data);
+  OstreeFetcherPendingURI *pending;
+  GCancellable *cancellable;
   g_autoptr(GBytes) bytes = NULL;
   gsize bytes_read;
   GError *local_error = NULL;
 
+  pending = g_task_get_task_data (task);
+  cancellable = g_task_get_cancellable (task);
+
   bytes = g_input_stream_read_bytes_finish ((GInputStream*)object, result, &local_error);
   if (!bytes)
     goto out;
@@ -382,12 +400,11 @@ on_stream_read (GObject        *object,
   bytes_read = g_bytes_get_size (bytes);
   if (bytes_read == 0)
     {
-      if (!finish_stream (pending, pending->cancellable, &local_error))
+      if (!finish_stream (pending, cancellable, &local_error))
         goto out;
-      g_task_return_pointer (pending->task,
+      g_task_return_pointer (task,
                              g_strdup (pending->out_tmpfile),
                              (GDestroyNotify) g_free);
-      g_object_unref (pending->task);
     }
   else
     {
@@ -415,18 +432,17 @@ on_stream_read (GObject        *object,
         g_output_stream_splice_async (pending->out_stream, membuf,
                                       G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE,
                                       G_PRIORITY_DEFAULT,
-                                      pending->cancellable,
+                                      cancellable,
                                       on_out_splice_complete,
-                                      pending);
+                                      g_object_ref (task));
       }
     }
 
  out:
   if (local_error)
-    {
-      g_task_return_error (pending->task, local_error);
-      g_object_unref (pending->task);
-    }
+    g_task_return_error (task, local_error);
+
+  g_object_unref (task);
 }
 
 static void
@@ -434,10 +450,15 @@ on_request_sent (GObject        *object,
                  GAsyncResult   *result,
                  gpointer        user_data) 
 {
-  OstreeFetcherPendingURI *pending = user_data;
+  GTask *task = G_TASK (user_data);
+  OstreeFetcherPendingURI *pending;
+  GCancellable *cancellable;
   GError *local_error = NULL;
   glnx_unref_object SoupMessage *msg = NULL;
 
+  pending = g_task_get_task_data (task);
+  cancellable = g_task_get_cancellable (task);
+
   pending->state = OSTREE_FETCHER_STATE_COMPLETE;
   pending->request_body = soup_request_send_finish ((SoupRequest*) object,
                                                    result, &local_error);
@@ -455,18 +476,17 @@ on_request_sent (GObject        *object,
           (void) g_input_stream_close (pending->request_body, NULL, NULL);
           if (pending->is_stream)
             {
-              g_task_return_pointer (pending->task,
+              g_task_return_pointer (task,
                                      g_object_ref (pending->request_body),
                                      (GDestroyNotify) g_object_unref);
             }
           else
             {
-              g_task_return_pointer (pending->task,
+              g_task_return_pointer (task,
                                      g_strdup (pending->out_tmpfile),
                                      (GDestroyNotify) g_free);
             }
-          g_object_unref (pending->task);
-          return;
+          goto out;
         }
       else if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code))
         {
@@ -513,16 +533,17 @@ on_request_sent (GObject        *object,
         }
       pending->out_stream = g_unix_output_stream_new (fd, TRUE);
       g_hash_table_add (pending->self->output_stream_set, g_object_ref (pending->out_stream));
-      g_input_stream_read_bytes_async (pending->request_body, 8192, G_PRIORITY_DEFAULT,
-                                       pending->cancellable, on_stream_read, pending);
-      
+      g_input_stream_read_bytes_async (pending->request_body,
+                                       8192, G_PRIORITY_DEFAULT,
+                                       cancellable,
+                                       on_stream_read,
+                                       g_object_ref (task));
     }
   else
     {
-      g_task_return_pointer (pending->task,
+      g_task_return_pointer (task,
                              g_object_ref (pending->request_body),
                              (GDestroyNotify) g_object_unref);
-      g_object_unref (pending->task);
     }
   
  out:
@@ -530,12 +551,13 @@ on_request_sent (GObject        *object,
     {
       if (pending->request_body)
         (void) g_input_stream_close (pending->request_body, NULL, NULL);
-      g_task_return_error (pending->task, local_error);
-      g_object_unref (pending->task);
+      g_task_return_error (task, local_error);
     }
+
+  g_object_unref (task);
 }
 
-static OstreeFetcherPendingURI *
+static void
 ostree_fetcher_request_uri_internal (OstreeFetcher         *self,
                                      SoupURI               *uri,
                                      gboolean               is_stream,
@@ -546,29 +568,30 @@ ostree_fetcher_request_uri_internal (OstreeFetcher         *self,
                                      gpointer               user_data,
                                      gpointer               source_tag)
 {
+  GTask *task;
   OstreeFetcherPendingURI *pending = g_new0 (OstreeFetcherPendingURI, 1);
   GError *local_error = NULL;
 
-  pending->refcount = 1;
   pending->request = soup_requester_request_uri (self->requester, uri, &local_error);
 
   pending->self = g_object_ref (self);
   pending->uri = soup_uri_copy (uri);
-  pending->priority = priority;
   pending->max_size = max_size;
   pending->is_stream = is_stream;
-  pending->cancellable = cancellable ? g_object_ref (cancellable) : NULL;
-  pending->task = g_task_new (self,
-                              cancellable,
-                              callback, user_data);
 
-  g_task_set_source_tag (pending->task, source_tag);
-  g_task_set_task_data (pending->task, pending, (GDestroyNotify) pending_uri_free);
+  task = g_task_new (self, cancellable, callback, user_data);
+  g_task_set_source_tag (task, source_tag);
+  g_task_set_task_data (task, pending, (GDestroyNotify) pending_uri_free);
+
+  /* We'll use the GTask priority for our own priority queue. */
+  g_task_set_priority (task, priority);
 
   if (is_stream)
     {
-      soup_request_send_async (pending->request, cancellable,
-                               on_request_sent, pending);
+      soup_request_send_async (pending->request,
+                               cancellable,
+                               on_request_sent,
+                               g_object_ref (task));
     }
   else
     {
@@ -588,7 +611,7 @@ ostree_fetcher_request_uri_internal (OstreeFetcher         *self,
           else
             {
               gs_set_error_from_errno (&local_error, errno);
-              goto fail;
+              goto out;
             }
         }
 
@@ -602,7 +625,9 @@ ostree_fetcher_request_uri_internal (OstreeFetcher         *self,
       pending->out_tmpfile = tmpfile;
       tmpfile = NULL; /* Transfer ownership */
 
-      g_queue_insert_sorted (&self->pending_queue, pending, pending_uri_compare, NULL);
+      g_queue_insert_sorted (&self->pending_queue,
+                             g_object_ref (task),
+                             pending_task_compare, NULL);
       ostree_fetcher_process_pending_queue (self);
     }
 
@@ -610,13 +635,8 @@ ostree_fetcher_request_uri_internal (OstreeFetcher         *self,
 
   self->total_requests++;
 
-  pending->refcount++;
-
-  return pending;
-
- fail:
-  pending_uri_free (pending);
-  return NULL;
+out:
+  g_object_unref (task);
 }
 
 void


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]