[libgit2-glib] Rework synchronization for blob's state machine, effectively unbreaking on OpenBSD



commit 55c649fa4d95e72e530bba1d070242b10d860ad8
Author: Jasper Lievisse Adriaanse <jasper humppa nl>
Date:   Wed Jul 30 23:51:05 2014 +0200

    Rework synchronization for blob's state machine, effectively unbreaking on OpenBSD
    
    Rework synchronization around a state machine instead of trying to treat
    mutexes and condition variables like semaphores.  This should reduce the
    number of low-level locking operations, and it eliminates the attempt to
    lock a mutex in one thread but unlock it in another, which violates POSIX
    requirements.
    
    https://bugzilla.gnome.org/show_bug.cgi?id=734010

 libgit2-glib/ggit-blob-output-stream.c |  119 ++++++++++++++++++++------------
 1 files changed, 76 insertions(+), 43 deletions(-)
---
diff --git a/libgit2-glib/ggit-blob-output-stream.c b/libgit2-glib/ggit-blob-output-stream.c
index 6373653..52179cd 100644
--- a/libgit2-glib/ggit-blob-output-stream.c
+++ b/libgit2-glib/ggit-blob-output-stream.c
@@ -28,6 +28,20 @@
 
 #define GGIT_BLOB_OUTPUT_STREAM_GET_PRIVATE(object)(G_TYPE_INSTANCE_GET_PRIVATE((object), GGIT_TYPE_BLOB_OUTPUT_STREAM, GgitBlobOutputStreamPrivate))
 
+/*
+ * BLOB_CHUNKING_STATE_IDLE: stream's thread is waiting for data
+ * BLOB_CHUNKING_STATE_SENDING: have data, outside thread is waiting for result
+ * BLOB_CHUNKING_STATE_CANCELLED: cancelled, stream's thread to exit and set error result
+ * BLOB_CHUNKING_STATE_CLOSED: closed by outside thread or error, exit and set result
+ */
+typedef enum
+{
+       BLOB_CHUNKING_STATE_IDLE,
+       BLOB_CHUNKING_STATE_SENDING,
+       BLOB_CHUNKING_STATE_CANCELLED,
+       BLOB_CHUNKING_STATE_CLOSED
+} BlobChunkingState;
+
 struct _GgitBlobOutputStreamPrivate
 {
        GgitRepository *repository;
@@ -36,9 +50,6 @@ struct _GgitBlobOutputStreamPrivate
        GMutex mutex;
        GCond cond;
 
-       GMutex reply_mutex;
-       GCond reply_cond;
-
        gssize written;
 
        const gchar *writebuf;
@@ -47,8 +58,7 @@ struct _GgitBlobOutputStreamPrivate
        gint ret;
        GgitOId *oid;
 
-       guint closing : 1;
-       guint cancelled : 1;
+       BlobChunkingState state;
 };
 
 G_DEFINE_TYPE (GgitBlobOutputStream, ggit_blob_output_stream, G_TYPE_OUTPUT_STREAM)
@@ -68,17 +78,22 @@ ggit_blob_output_stream_close (GOutputStream  *object,
 
        if (g_cancellable_set_error_if_cancelled (cancellable, error))
        {
+               g_mutex_lock (&stream->priv->mutex);
+
+               if (stream->priv->state != BLOB_CHUNKING_STATE_CANCELLED)
+               {
+                       stream->priv->state = BLOB_CHUNKING_STATE_CANCELLED;
+                       g_cond_signal (&stream->priv->cond);
+               }
+
+               g_mutex_unlock (&stream->priv->mutex);
                return FALSE;
        }
 
        g_mutex_lock (&stream->priv->mutex);
-       stream->priv->closing = TRUE;
-
+       stream->priv->state = BLOB_CHUNKING_STATE_CLOSED;
        g_cond_signal (&stream->priv->cond);
-       g_mutex_lock (&stream->priv->reply_mutex);
        g_mutex_unlock (&stream->priv->mutex);
-       g_cond_wait (&stream->priv->reply_cond, &stream->priv->reply_mutex);
-       g_mutex_unlock (&stream->priv->reply_mutex);
 
        g_thread_join (stream->priv->thread);
        return TRUE;
@@ -105,38 +120,54 @@ ggit_blob_output_stream_write (GOutputStream  *object,
                                GError        **error)
 {
        GgitBlobOutputStream *stream = GGIT_BLOB_OUTPUT_STREAM (object);
+       gssize written;
+
+       g_mutex_lock (&stream->priv->mutex);
 
-       if (stream->priv->closing)
+       while (stream->priv->state == BLOB_CHUNKING_STATE_SENDING)
        {
-               return 0;
+               g_cond_wait (&stream->priv->cond, &stream->priv->mutex);
        }
 
-       g_mutex_lock (&stream->priv->mutex);
+       if (stream->priv->state == BLOB_CHUNKING_STATE_CLOSED)
+       {
+               g_mutex_unlock (&stream->priv->mutex);
+               return 0;
+       }
 
        if (g_cancellable_is_cancelled (cancellable))
        {
-               stream->priv->cancelled = TRUE;
+               stream->priv->state = BLOB_CHUNKING_STATE_CANCELLED;
        }
 
-       stream->priv->writebuf = buffer;
-       stream->priv->bufsize = count;
-       stream->priv->written = 0;
+       if (stream->priv->state == BLOB_CHUNKING_STATE_IDLE)
+       {
+               stream->priv->writebuf = buffer;
+               stream->priv->bufsize = count;
+               stream->priv->written = 0;
+               stream->priv->state = BLOB_CHUNKING_STATE_SENDING;
+       }
 
        g_cond_signal (&stream->priv->cond);
 
-       g_mutex_lock (&stream->priv->reply_mutex);
-       g_mutex_unlock (&stream->priv->mutex);
-
-       g_cond_wait (&stream->priv->reply_cond, &stream->priv->reply_mutex);
-       g_mutex_unlock (&stream->priv->reply_mutex);
+       while (stream->priv->state == BLOB_CHUNKING_STATE_SENDING)
+       {
+               g_cond_wait (&stream->priv->cond, &stream->priv->mutex);
+       }
 
-       if (stream->priv->cancelled)
+       if (stream->priv->state == BLOB_CHUNKING_STATE_CANCELLED)
        {
                g_cancellable_set_error_if_cancelled (cancellable, error);
-               return -1;
+               written = -1;
+       }
+       else
+       {
+               written = stream->priv->written;
        }
 
-       return stream->priv->written;
+       g_mutex_unlock (&stream->priv->mutex);
+
+       return written;
 }
 
 static void
@@ -151,9 +182,6 @@ ggit_blob_output_stream_finalize (GObject *object)
        g_mutex_clear (&stream->priv->mutex);
        g_cond_clear (&stream->priv->cond);
 
-       g_mutex_clear (&stream->priv->reply_mutex);
-       g_cond_clear (&stream->priv->reply_cond);
-
        if (stream->priv->oid)
        {
                ggit_oid_free (stream->priv->oid);
@@ -233,18 +261,27 @@ blob_chunk_cb (char   *content,
        GgitBlobOutputStream *stream = payload;
        int written = 0;
 
-       g_cond_wait (&stream->priv->cond, &stream->priv->mutex);
+       g_mutex_lock (&stream->priv->mutex);
 
-       if (stream->priv->closing)
+       while (stream->priv->state == BLOB_CHUNKING_STATE_IDLE)
        {
+               g_cond_wait (&stream->priv->cond, &stream->priv->mutex);
+       }
+
+       if (stream->priv->state == BLOB_CHUNKING_STATE_CLOSED)
+       {
+               g_mutex_unlock (&stream->priv->mutex);
                return 0;
        }
 
-       if (stream->priv->cancelled)
+       if (stream->priv->state == BLOB_CHUNKING_STATE_CANCELLED)
        {
+               g_mutex_unlock (&stream->priv->mutex);
                return -1;
        }
 
+       /* state must be BLOB_CHUNKING_STATE_SENDING */
+
        if (stream->priv->bufsize > maxlen)
        {
                stream->priv->written = maxlen;
@@ -260,9 +297,9 @@ blob_chunk_cb (char   *content,
                written = stream->priv->written;
        }
 
-       g_mutex_lock (&stream->priv->reply_mutex);
-       g_cond_signal (&stream->priv->reply_cond);
-       g_mutex_unlock (&stream->priv->reply_mutex);
+       stream->priv->state = BLOB_CHUNKING_STATE_IDLE;
+       g_cond_signal (&stream->priv->cond);
+       g_mutex_unlock (&stream->priv->mutex);
 
        return written;
 }
@@ -280,23 +317,22 @@ chunk_blob_in_thread (gpointer data)
                                          blob_chunk_cb,
                                          data);
 
+       g_mutex_lock (&stream->priv->mutex);
        stream->priv->ret = ret;
-       stream->priv->closing = TRUE;
 
        if (ret == GIT_OK)
        {
+               stream->priv->state = BLOB_CHUNKING_STATE_CLOSED;
                stream->priv->oid = _ggit_oid_wrap (&oid);
        }
        else
        {
-               stream->priv->cancelled = TRUE;
+               stream->priv->state = BLOB_CHUNKING_STATE_CANCELLED;
        }
 
-       g_mutex_lock (&stream->priv->reply_mutex);
-       g_cond_signal (&stream->priv->reply_cond);
+       g_cond_signal (&stream->priv->cond);
 
        g_mutex_unlock (&stream->priv->mutex);
-       g_mutex_unlock (&stream->priv->reply_mutex);
 
        return NULL;
 }
@@ -309,10 +345,7 @@ ggit_blob_output_stream_init (GgitBlobOutputStream *self)
        g_mutex_init (&self->priv->mutex);
        g_cond_init (&self->priv->cond);
 
-       g_mutex_init (&self->priv->reply_mutex);
-       g_cond_init (&self->priv->reply_cond);
-
-       g_mutex_lock (&self->priv->mutex);
+       self->priv->state = BLOB_CHUNKING_STATE_IDLE;
 
        self->priv->thread = g_thread_new ("ggit-blob-output-stream",
                                           chunk_blob_in_thread,


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]