[libgit2-glib] Rework synchronization for blob's state machine, effectively unbreaking on OpenBSD
- From: Jesse van den Kieboom <jessevdk src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [libgit2-glib] Rework synchronization for blob's state machine, effectively unbreaking on OpenBSD
- Date: Wed, 17 Dec 2014 18:50:25 +0000 (UTC)
commit 55c649fa4d95e72e530bba1d070242b10d860ad8
Author: Jasper Lievisse Adriaanse <jasper humppa nl>
Date: Wed Jul 30 23:51:05 2014 +0200
Rework synchronization for blob's state machine, effectively unbreaking on OpenBSD
Rework synchronization around a state machine instead of trying to treat
mutexes and condition variables like semaphores. This should reduce the
number of low-level locking operations, and it eliminates the attempt to
lock a mutex in one thread but unlock it in another, which violates POSIX
requirements.
https://bugzilla.gnome.org/show_bug.cgi?id=734010
libgit2-glib/ggit-blob-output-stream.c | 119 ++++++++++++++++++++------------
1 files changed, 76 insertions(+), 43 deletions(-)
---
diff --git a/libgit2-glib/ggit-blob-output-stream.c b/libgit2-glib/ggit-blob-output-stream.c
index 6373653..52179cd 100644
--- a/libgit2-glib/ggit-blob-output-stream.c
+++ b/libgit2-glib/ggit-blob-output-stream.c
@@ -28,6 +28,20 @@
#define GGIT_BLOB_OUTPUT_STREAM_GET_PRIVATE(object)(G_TYPE_INSTANCE_GET_PRIVATE((object),
GGIT_TYPE_BLOB_OUTPUT_STREAM, GgitBlobOutputStreamPrivate))
+/*
+ * BLOB_CHUNKING_STATE_IDLE: stream's thread is waiting for data
+ * BLOB_CHUNKING_STATE_SENDING: have data, outside thread is waiting for result
+ * BLOB_CHUNKING_STATE_CANCELLED: cancelled, stream's thread to exit and set error result
+ * BLOB_CHUNKING_STATE_CLOSED: closed by outside thread or error, exit and set result
+ */
+typedef enum
+{
+ BLOB_CHUNKING_STATE_IDLE,
+ BLOB_CHUNKING_STATE_SENDING,
+ BLOB_CHUNKING_STATE_CANCELLED,
+ BLOB_CHUNKING_STATE_CLOSED
+} BlobChunkingState;
+
struct _GgitBlobOutputStreamPrivate
{
GgitRepository *repository;
@@ -36,9 +50,6 @@ struct _GgitBlobOutputStreamPrivate
GMutex mutex;
GCond cond;
- GMutex reply_mutex;
- GCond reply_cond;
-
gssize written;
const gchar *writebuf;
@@ -47,8 +58,7 @@ struct _GgitBlobOutputStreamPrivate
gint ret;
GgitOId *oid;
- guint closing : 1;
- guint cancelled : 1;
+ BlobChunkingState state;
};
G_DEFINE_TYPE (GgitBlobOutputStream, ggit_blob_output_stream, G_TYPE_OUTPUT_STREAM)
@@ -68,17 +78,22 @@ ggit_blob_output_stream_close (GOutputStream *object,
if (g_cancellable_set_error_if_cancelled (cancellable, error))
{
+ g_mutex_lock (&stream->priv->mutex);
+
+ if (stream->priv->state != BLOB_CHUNKING_STATE_CANCELLED)
+ {
+ stream->priv->state = BLOB_CHUNKING_STATE_CANCELLED;
+ g_cond_signal (&stream->priv->cond);
+ }
+
+ g_mutex_unlock (&stream->priv->mutex);
return FALSE;
}
g_mutex_lock (&stream->priv->mutex);
- stream->priv->closing = TRUE;
-
+ stream->priv->state = BLOB_CHUNKING_STATE_CLOSED;
g_cond_signal (&stream->priv->cond);
- g_mutex_lock (&stream->priv->reply_mutex);
g_mutex_unlock (&stream->priv->mutex);
- g_cond_wait (&stream->priv->reply_cond, &stream->priv->reply_mutex);
- g_mutex_unlock (&stream->priv->reply_mutex);
g_thread_join (stream->priv->thread);
return TRUE;
@@ -105,38 +120,54 @@ ggit_blob_output_stream_write (GOutputStream *object,
GError **error)
{
GgitBlobOutputStream *stream = GGIT_BLOB_OUTPUT_STREAM (object);
+ gssize written;
+
+ g_mutex_lock (&stream->priv->mutex);
- if (stream->priv->closing)
+ while (stream->priv->state == BLOB_CHUNKING_STATE_SENDING)
{
- return 0;
+ g_cond_wait (&stream->priv->cond, &stream->priv->mutex);
}
- g_mutex_lock (&stream->priv->mutex);
+ if (stream->priv->state == BLOB_CHUNKING_STATE_CLOSED)
+ {
+ g_mutex_unlock (&stream->priv->mutex);
+ return 0;
+ }
if (g_cancellable_is_cancelled (cancellable))
{
- stream->priv->cancelled = TRUE;
+ stream->priv->state = BLOB_CHUNKING_STATE_CANCELLED;
}
- stream->priv->writebuf = buffer;
- stream->priv->bufsize = count;
- stream->priv->written = 0;
+ if (stream->priv->state == BLOB_CHUNKING_STATE_IDLE)
+ {
+ stream->priv->writebuf = buffer;
+ stream->priv->bufsize = count;
+ stream->priv->written = 0;
+ stream->priv->state = BLOB_CHUNKING_STATE_SENDING;
+ }
g_cond_signal (&stream->priv->cond);
- g_mutex_lock (&stream->priv->reply_mutex);
- g_mutex_unlock (&stream->priv->mutex);
-
- g_cond_wait (&stream->priv->reply_cond, &stream->priv->reply_mutex);
- g_mutex_unlock (&stream->priv->reply_mutex);
+ while (stream->priv->state == BLOB_CHUNKING_STATE_SENDING)
+ {
+ g_cond_wait (&stream->priv->cond, &stream->priv->mutex);
+ }
- if (stream->priv->cancelled)
+ if (stream->priv->state == BLOB_CHUNKING_STATE_CANCELLED)
{
g_cancellable_set_error_if_cancelled (cancellable, error);
- return -1;
+ written = -1;
+ }
+ else
+ {
+ written = stream->priv->written;
}
- return stream->priv->written;
+ g_mutex_unlock (&stream->priv->mutex);
+
+ return written;
}
static void
@@ -151,9 +182,6 @@ ggit_blob_output_stream_finalize (GObject *object)
g_mutex_clear (&stream->priv->mutex);
g_cond_clear (&stream->priv->cond);
- g_mutex_clear (&stream->priv->reply_mutex);
- g_cond_clear (&stream->priv->reply_cond);
-
if (stream->priv->oid)
{
ggit_oid_free (stream->priv->oid);
@@ -233,18 +261,27 @@ blob_chunk_cb (char *content,
GgitBlobOutputStream *stream = payload;
int written = 0;
- g_cond_wait (&stream->priv->cond, &stream->priv->mutex);
+ g_mutex_lock (&stream->priv->mutex);
- if (stream->priv->closing)
+ while (stream->priv->state == BLOB_CHUNKING_STATE_IDLE)
{
+ g_cond_wait (&stream->priv->cond, &stream->priv->mutex);
+ }
+
+ if (stream->priv->state == BLOB_CHUNKING_STATE_CLOSED)
+ {
+ g_mutex_unlock (&stream->priv->mutex);
return 0;
}
- if (stream->priv->cancelled)
+ if (stream->priv->state == BLOB_CHUNKING_STATE_CANCELLED)
{
+ g_mutex_unlock (&stream->priv->mutex);
return -1;
}
+ /* state must be BLOB_CHUNKING_STATE_SENDING */
+
if (stream->priv->bufsize > maxlen)
{
stream->priv->written = maxlen;
@@ -260,9 +297,9 @@ blob_chunk_cb (char *content,
written = stream->priv->written;
}
- g_mutex_lock (&stream->priv->reply_mutex);
- g_cond_signal (&stream->priv->reply_cond);
- g_mutex_unlock (&stream->priv->reply_mutex);
+ stream->priv->state = BLOB_CHUNKING_STATE_IDLE;
+ g_cond_signal (&stream->priv->cond);
+ g_mutex_unlock (&stream->priv->mutex);
return written;
}
@@ -280,23 +317,22 @@ chunk_blob_in_thread (gpointer data)
blob_chunk_cb,
data);
+ g_mutex_lock (&stream->priv->mutex);
stream->priv->ret = ret;
- stream->priv->closing = TRUE;
if (ret == GIT_OK)
{
+ stream->priv->state = BLOB_CHUNKING_STATE_CLOSED;
stream->priv->oid = _ggit_oid_wrap (&oid);
}
else
{
- stream->priv->cancelled = TRUE;
+ stream->priv->state = BLOB_CHUNKING_STATE_CANCELLED;
}
- g_mutex_lock (&stream->priv->reply_mutex);
- g_cond_signal (&stream->priv->reply_cond);
+ g_cond_signal (&stream->priv->cond);
g_mutex_unlock (&stream->priv->mutex);
- g_mutex_unlock (&stream->priv->reply_mutex);
return NULL;
}
@@ -309,10 +345,7 @@ ggit_blob_output_stream_init (GgitBlobOutputStream *self)
g_mutex_init (&self->priv->mutex);
g_cond_init (&self->priv->cond);
- g_mutex_init (&self->priv->reply_mutex);
- g_cond_init (&self->priv->reply_cond);
-
- g_mutex_lock (&self->priv->mutex);
+ self->priv->state = BLOB_CHUNKING_STATE_IDLE;
self->priv->thread = g_thread_new ("ggit-blob-output-stream",
chunk_blob_in_thread,
[Date Prev][Date Next] [Thread Prev][Thread Next]
[Thread Index] [Date Index] [Author Index]