[gegl/soc-2012-ville] buffer: threaded memory map file backend
- From: Ville Sokk <villesokk src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [gegl/soc-2012-ville] buffer: threaded memory map file backend
- Date: Thu, 2 Aug 2012 09:41:57 +0000 (UTC)
commit 1cbf15ded8497f5cc2de0b24e0e420d820c4a86d
Author: Ville Sokk <ville sokk gmail com>
Date: Thu Aug 2 12:40:21 2012 +0300
buffer: threaded memory map file backend
gegl/buffer/gegl-tile-backend-file-async.c | 92 ++++------
gegl/buffer/gegl-tile-backend-file-mapped.c | 253 ++++++++++++++++++++------
gegl/buffer/gegl-tile-backend-file.c | 1 +
gegl/buffer/gegl-tile-backend-file.h | 16 ++
4 files changed, 245 insertions(+), 117 deletions(-)
---
diff --git a/gegl/buffer/gegl-tile-backend-file-async.c b/gegl/buffer/gegl-tile-backend-file-async.c
index adeb442..2ca5b15 100644
--- a/gegl/buffer/gegl-tile-backend-file-async.c
+++ b/gegl/buffer/gegl-tile-backend-file-async.c
@@ -118,7 +118,7 @@ struct _GeglTileBackendFile
GFileMonitor *monitor;
/* number of write operations in the queue for this file */
- gint pending_writes;
+ gint pending_ops;
/* used for waiting on writes to the file to be finished */
GCond *cond;
@@ -130,24 +130,6 @@ struct _GeglTileBackendFile
int i;
};
-typedef enum
-{
- OP_WRITE,
- OP_TRUNCATE,
- OP_SYNC,
- OP_CLOSE
-} ThreadOp;
-
-typedef struct
-{
- gint length; /* length of data if writing tile or
- length of file if truncating */
- guchar *source;
- goffset offset;
- GeglTileBackendFile *file; /* the file we are operating on */
- ThreadOp operation; /* type of file operation, see above */
-} ThreadParams;
-
static void gegl_tile_backend_file_ensure_exist (GeglTileBackendFile *self);
static gboolean gegl_tile_backend_file_write_block (GeglTileBackendFile *self,
@@ -166,21 +148,21 @@ static gint file_size = 0;
static gint peak_allocs = 0;
static gint peak_file_size = 0;
-static GQueue queue = G_QUEUE_INIT;
-static GMutex *mutex = NULL;
-static GCond *queue_cond = NULL;
+static GQueue queue = G_QUEUE_INIT;
+static GMutex *mutex = NULL;
+static GCond *queue_cond = NULL;
-static inline void
+static void
gegl_tile_backend_file_finish_writing (GeglTileBackendFile *self)
{
g_mutex_lock (mutex);
- while (self->pending_writes != 0)
+ while (self->pending_ops != 0)
g_cond_wait (self->cond, mutex);
g_mutex_unlock (mutex);
}
-static inline void
+static void
gegl_tile_backend_file_push_queue (ThreadParams *params)
{
gboolean was_empty;
@@ -188,7 +170,7 @@ gegl_tile_backend_file_push_queue (ThreadParams *params)
g_mutex_lock (mutex);
was_empty = g_queue_is_empty (&queue);
- params->file->pending_writes += 1;
+ params->file->pending_ops += 1;
g_queue_push_tail (&queue, params);
if (was_empty)
g_cond_signal (queue_cond);
@@ -260,22 +242,13 @@ gegl_tile_backend_file_writer_thread (gpointer ignored)
case OP_SYNC:
fsync (params->file->o);
break;
- case OP_CLOSE:
- if (params->file->o != -1 && params->file->i != -1)
- {
- close (params->file->o);
- close (params->file->i);
- params->file->i = -1;
- params->file->o = -1;
- }
- break;
}
g_mutex_lock (mutex);
g_queue_pop_head (&queue);
- params->file->pending_writes -= 1;
- if (params->file->pending_writes == 0)
+ params->file->pending_ops -= 1;
+ if (params->file->pending_ops == 0)
g_cond_signal (params->file->cond);
if (params->operation == OP_WRITE)
@@ -290,7 +263,7 @@ gegl_tile_backend_file_writer_thread (gpointer ignored)
}
static void
-gegl_tile_backend_file_read_entry (GeglTileBackendFile *self,
+gegl_tile_backend_file_entry_read (GeglTileBackendFile *self,
GeglBufferTile *entry,
guchar *dest)
{
@@ -352,7 +325,7 @@ gegl_tile_backend_file_read_entry (GeglTileBackendFile *self,
}
static inline void
-gegl_tile_backend_file_write_entry (GeglTileBackendFile *self,
+gegl_tile_backend_file_entry_write (GeglTileBackendFile *self,
GeglBufferTile *entry,
guchar *source)
{
@@ -614,7 +587,7 @@ gegl_tile_backend_file_get_tile (GeglTileSource *self,
gegl_tile_set_rev (tile, entry->rev);
gegl_tile_mark_as_stored (tile);
- gegl_tile_backend_file_read_entry (tile_backend_file, entry, gegl_tile_get_data (tile));
+ gegl_tile_backend_file_entry_read (tile_backend_file, entry, gegl_tile_get_data (tile));
return tile;
}
@@ -643,7 +616,7 @@ gegl_tile_backend_file_set_tile (GeglTileSource *self,
}
entry->rev = gegl_tile_get_rev (tile);
- gegl_tile_backend_file_write_entry (tile_backend_file, entry, gegl_tile_get_data (tile));
+ gegl_tile_backend_file_entry_write (tile_backend_file, entry, gegl_tile_get_data (tile));
gegl_tile_mark_as_stored (tile);
return NULL;
}
@@ -829,18 +802,21 @@ gegl_tile_backend_file_finalize (GObject *object)
if (self->exist)
{
- ThreadParams *params = g_new (ThreadParams, 1);
-
- params->operation = OP_CLOSE;
- params->file = (GeglTileBackendFile *) self;
-
- gegl_tile_backend_file_push_queue (params);
+ gegl_tile_backend_file_finish_writing (self);
+ GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "finalizing buffer %s", self->path);
- GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "pushed close of %s", self->path);
+ if (self->i != -1)
+ {
+ close (self->i);
+ self->i = -1;
+ }
+ if (self->o != -1)
+ {
+ close (self->o);
+ self->o = -1;
+ }
}
- gegl_tile_backend_file_finish_writing (self);
-
if (self->path)
g_free (self->path);
@@ -1015,12 +991,12 @@ gegl_tile_backend_file_constructor (GType type,
GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "constructing file backend: %s", self->path);
- self->file = g_file_new_for_commandline_arg (self->path);
- self->i = self->o = -1;
- self->index = g_hash_table_new (gegl_tile_backend_file_hashfunc,
- gegl_tile_backend_file_equalfunc);
- self->pending_writes = 0;
- self->cond = g_cond_new ();
+ self->file = g_file_new_for_commandline_arg (self->path);
+ self->i = self->o = -1;
+ self->index = g_hash_table_new (gegl_tile_backend_file_hashfunc,
+ gegl_tile_backend_file_equalfunc);
+ self->pending_ops = 0;
+ self->cond = g_cond_new ();
/* If the file already exists open it, assuming it is a GeglBuffer. */
if (g_access (self->path, F_OK) != -1)
@@ -1107,7 +1083,7 @@ gegl_tile_backend_file_ensure_exist (GeglTileBackendFile *self)
self->next_pre_alloc = 256; /* reserved space for header */
self->total = 256; /* reserved space for header */
self->in_offset = self->out_offset = 0;
- self->pending_writes = 0;
+ self->pending_ops = 0;
gegl_buffer_header_init (&self->header,
backend->priv->tile_width,
backend->priv->tile_height,
@@ -1161,7 +1137,7 @@ gegl_tile_backend_file_init (GeglTileBackendFile *self)
self->free_list = NULL;
self->next_pre_alloc = 256; /* reserved space for header */
self->total = 256; /* reserved space for header */
- self->pending_writes = 0;
+ self->pending_ops = 0;
}
gboolean
diff --git a/gegl/buffer/gegl-tile-backend-file-mapped.c b/gegl/buffer/gegl-tile-backend-file-mapped.c
index 5d15e9d..1793c68 100644
--- a/gegl/buffer/gegl-tile-backend-file-mapped.c
+++ b/gegl/buffer/gegl-tile-backend-file-mapped.c
@@ -114,6 +114,12 @@ struct _GeglTileBackendFile
/* size of the memory mapped area */
guint total_mapped;
+
+ /* number of write operations in the queue for this file */
+ gint pending_ops;
+
+ /* used for waiting on writes to the file to be finished */
+ GCond *cond;
};
@@ -124,6 +130,93 @@ static void gegl_tile_backend_file_dbg_alloc (int size);
static void gegl_tile_backend_file_dbg_dealloc (int size);
+G_DEFINE_TYPE (GeglTileBackendFile, gegl_tile_backend_file, GEGL_TYPE_TILE_BACKEND)
+static GObjectClass * parent_class = NULL;
+
+/* this debugging is across all buffers */
+static gint allocs = 0;
+static gint file_size = 0;
+static gint peak_allocs = 0;
+static gint peak_file_size = 0;
+
+static GQueue queue = G_QUEUE_INIT;
+static GMutex *mutex = NULL;
+static GCond *queue_cond = NULL;
+
+
+static void
+gegl_tile_backend_file_finish_writing (GeglTileBackendFile *self)
+{
+ g_mutex_lock (mutex);
+ while (self->pending_ops != 0)
+ g_cond_wait (self->cond, mutex);
+ g_mutex_unlock (mutex);
+}
+
+static void
+gegl_tile_backend_file_push_queue (ThreadParams *params)
+{
+ gboolean was_empty;
+
+ g_mutex_lock (mutex);
+
+ was_empty = g_queue_is_empty (&queue);
+ params->file->pending_ops += 1;
+ g_queue_push_tail (&queue, params);
+ if (was_empty)
+ g_cond_signal (queue_cond);
+
+ g_mutex_unlock (mutex);
+}
+
+static void
+gegl_tile_backend_file_truncate (GeglTileBackendFile *self,
+ guint length)
+{
+ ThreadParams *params = g_new (ThreadParams, 1);
+
+ params->operation = OP_TRUNCATE;
+ params->file = self;
+ params->length = length;
+
+ gegl_tile_backend_file_push_queue (params);
+
+ GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "truncate to %u bytes", length);
+}
+
+static void
+gegl_tile_backend_file_write (GeglTileBackendFile *self,
+ goffset offset,
+ gchar *source,
+ gint length)
+{
+ gboolean is_empty;
+
+ /*g_mutex_lock (mutex);*/
+ is_empty = g_queue_is_empty (&queue);
+ /*g_mutex_unlock (mutex);*/
+
+ if (is_empty)
+ {
+ memcpy (self->map + offset, source, length);
+ }
+ else
+ {
+ ThreadParams *params = g_new (ThreadParams, 1);
+ guchar *new_source = g_malloc (length);
+
+ memcpy (new_source, source, length);
+
+ params->operation = OP_WRITE;
+ params->offset = offset;
+ params->file = self;
+ params->length = length;
+ params->source = new_source;
+
+ gegl_tile_backend_file_push_queue (params);
+ }
+}
+
static inline void
gegl_tile_backend_file_map (GeglTileBackendFile *self)
{
@@ -133,7 +226,7 @@ gegl_tile_backend_file_map (GeglTileBackendFile *self)
g_error ("Unable to memory map file %s: %s", self->path, g_strerror (errno));
return;
}
- GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "mapped %i bytes of file %s", self->total_mapped, self->path);
+ GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "mapped %u bytes of file %s", self->total_mapped, self->path);
}
static inline void
@@ -152,23 +245,60 @@ gegl_tile_backend_file_remap (GeglTileBackendFile *self)
{
if (self->total > self->total_mapped)
{
- g_printf ("remap %i\n", self->total * 10);
+ g_mutex_lock (mutex);
gegl_tile_backend_file_unmap (self);
- self->total_mapped = self->total * 10;
+ self->total_mapped = self->total * 5;
gegl_tile_backend_file_map (self);
+
+ g_mutex_unlock (mutex);
}
}
-static inline void
-gegl_tile_backend_file_sync (GeglTileBackendFile *self)
+static gpointer
+gegl_tile_backend_file_writer_thread (gpointer ignored)
{
- if ((msync (self->map, self->total, MS_SYNC|MS_INVALIDATE)) < 0)
+ while (TRUE)
{
- g_warning ("Unable to sync file %s: %s", self->path, g_strerror (errno));
- return;
+ ThreadParams *params;
+
+ g_mutex_lock (mutex);
+
+ while (g_queue_is_empty (&queue))
+ g_cond_wait (queue_cond, mutex);
+
+ params = (ThreadParams *)g_queue_peek_head (&queue);
+ g_mutex_unlock (mutex);
+
+ switch (params->operation)
+ {
+ case OP_WRITE:
+ memcpy (params->file->map + params->offset,
+ params->source, params->length);
+ break;
+ case OP_TRUNCATE:
+ ftruncate (params->file->o, params->length);
+ gegl_tile_backend_file_remap (params->file);
+ break;
+ default: /* OP_SYNC is not necessary for memory mapped files */
+ break;
+ }
+
+ g_mutex_lock (mutex);
+ g_queue_pop_head (&queue);
+
+ params->file->pending_ops -= 1;
+ if (params->file->pending_ops == 0)
+ g_cond_signal (params->file->cond);
+
+ if (params->operation == OP_WRITE)
+ g_free (params->source);
+
+ g_free (params);
+ g_mutex_unlock (mutex);
}
- GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "synced file %s", self->path);
+
+ return NULL;
}
static inline void
@@ -181,24 +311,30 @@ gegl_tile_backend_file_file_entry_read (GeglTileBackendFile *self,
gegl_tile_backend_file_ensure_exist (self);
- memcpy (dest, self->map + offset, tile_size);
-
- GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "read entry %i,%i,%i at %i", entry->x, entry->y, entry->z, (gint)offset);
-}
+ g_mutex_lock (mutex);
+ if (!g_queue_is_empty (&queue))
+ {
+ GList *link = g_queue_peek_tail_link (&queue);
-static inline void
-gegl_tile_backend_file_file_entry_write (GeglTileBackendFile *self,
- GeglBufferTile *entry,
- guchar *source)
-{
- goffset offset = entry->offset;
- gint tile_size = gegl_tile_backend_get_tile_size (GEGL_TILE_BACKEND (self));
+ while (link)
+ {
+ ThreadParams *queued_op = (ThreadParams *)link->data;
- gegl_tile_backend_file_ensure_exist (self);
+ if (queued_op->file == self && queued_op->offset == offset)
+ {
+ memcpy (dest, queued_op->source, tile_size);
+ g_mutex_unlock (mutex);
+ GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "read entry %i,%i,%i from write queue", entry->x, entry->y, entry->z);
+ return;
+ }
+ link = link->prev;
+ }
+ }
+ g_mutex_unlock (mutex);
- memcpy (self->map + offset, source, tile_size);
+ memcpy (dest, self->map + offset, tile_size);
- GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "wrote entry %i,%i,%i at %i", entry->x, entry->y, entry->z, (gint)offset);
+ GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "read entry %i,%i,%i at %i", entry->x, entry->y, entry->z, (gint)offset);
}
static inline GeglBufferTile *
@@ -235,11 +371,7 @@ gegl_tile_backend_file_file_entry_new (GeglTileBackendFile *self)
*/
{
self->total = self->total + 32 * tile_size;
-
- GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "growing file to %i bytes", (gint)self->total);
-
- ftruncate (self->o, self->total);
- gegl_tile_backend_file_remap (self);
+ gegl_tile_backend_file_truncate (self, self->total);
}
}
gegl_tile_backend_file_dbg_alloc (gegl_tile_backend_get_tile_size (GEGL_TILE_BACKEND (self)));
@@ -265,7 +397,7 @@ gegl_tile_backend_file_write_header (GeglTileBackendFile *self)
{
gegl_tile_backend_file_ensure_exist (self);
- memcpy (self->map, &(self->header), 256);
+ gegl_tile_backend_file_write (self, 0, (gchar*)&(self->header), 256);
GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "Wrote header, next=%i", (gint)self->header.next);
return TRUE;
@@ -292,13 +424,8 @@ gegl_tile_backend_file_write_block (GeglTileBackendFile *self,
are not matching..
*/
- if (self->total < next_allocation)
- {
- self->total = next_allocation;
- ftruncate (self->o, next_allocation);
- gegl_tile_backend_file_remap (self);
- }
- memcpy (self->map + offset, self->in_holding, self->in_holding->length);
+ gegl_tile_backend_file_write (self, offset, (gchar*)self->in_holding, self->in_holding->length);
+
self->offset += self->in_holding->length;
GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "Wrote block: length:%i flags:%i next:%i at offset %i",
self->in_holding->length,
@@ -324,16 +451,6 @@ gegl_tile_backend_file_write_block (GeglTileBackendFile *self,
return TRUE;
}
-G_DEFINE_TYPE (GeglTileBackendFile, gegl_tile_backend_file, GEGL_TYPE_TILE_BACKEND)
-static GObjectClass * parent_class = NULL;
-
-/* this debugging is across all buffers */
-
-static gint allocs = 0;
-static gint file_size = 0;
-static gint peak_allocs = 0;
-static gint peak_file_size = 0;
-
void
gegl_tile_backend_file_stats (void)
{
@@ -431,7 +548,11 @@ gegl_tile_backend_file_set_tile (GeglTileSource *self,
}
entry->rev = gegl_tile_get_rev (tile);
- gegl_tile_backend_file_file_entry_write (tile_backend_file, entry, gegl_tile_get_data (tile));
+ gegl_tile_backend_file_write (tile_backend_file, entry->offset,
+ (gchar*)gegl_tile_get_data (tile),
+ gegl_tile_backend_get_tile_size (backend));
+ GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "wrote entry %i, %i, %i at %i", entry->x, entry->y, entry->z, (gint)entry->offset);
+
gegl_tile_mark_as_stored (tile);
return NULL;
}
@@ -618,6 +739,7 @@ gegl_tile_backend_file_finalize (GObject *object)
if (self->exist)
{
+ gegl_tile_backend_file_finish_writing (self);
GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "finalizing buffer %s", self->path);
if (self->o != -1)
@@ -797,9 +919,12 @@ gegl_tile_backend_file_constructor (GType type,
GEGL_NOTE (GEGL_DEBUG_TILE_BACKEND, "constructing file backend: %s", self->path);
- self->file = g_file_new_for_commandline_arg (self->path);
- self->o = -1;
- self->index = g_hash_table_new (gegl_tile_backend_file_hashfunc, gegl_tile_backend_file_equalfunc);
+ self->file = g_file_new_for_commandline_arg (self->path);
+ self->o = -1;
+ self->index = g_hash_table_new (gegl_tile_backend_file_hashfunc,
+ gegl_tile_backend_file_equalfunc);
+ self->pending_ops = 0;
+ self->cond = g_cond_new ();
/* If the file already exists open it, assuming it is a GeglBuffer. */
if (g_access (self->path, F_OK) != -1)
@@ -908,7 +1033,7 @@ gegl_tile_backend_file_ensure_exist (GeglTileBackendFile *self)
static void
gegl_tile_backend_file_class_init (GeglTileBackendFileClass *klass)
{
- GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+ GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
parent_class = g_type_class_peek_parent (klass);
@@ -917,6 +1042,10 @@ gegl_tile_backend_file_class_init (GeglTileBackendFileClass *klass)
gobject_class->constructor = gegl_tile_backend_file_constructor;
gobject_class->finalize = gegl_tile_backend_file_finalize;
+ queue_cond = g_cond_new ();
+ mutex = g_mutex_new ();
+ g_thread_create_full (gegl_tile_backend_file_writer_thread,
+ NULL, 0, TRUE, TRUE, G_THREAD_PRIORITY_NORMAL, NULL);
GEGL_BUFFER_STRUCT_CHECK_PADDING;
@@ -933,14 +1062,15 @@ static void
gegl_tile_backend_file_init (GeglTileBackendFile *self)
{
((GeglTileSource*)self)->command = gegl_tile_backend_file_command;
- self->path = NULL;
- self->file = NULL;
- self->o = -1;
- self->index = NULL;
- self->free_list = NULL;
- self->next_pre_alloc = 256; /* reserved space for header */
- self->total = 256; /* reserved space for header */
- self->map = NULL;
+ self->path = NULL;
+ self->file = NULL;
+ self->o = -1;
+ self->index = NULL;
+ self->free_list = NULL;
+ self->next_pre_alloc = 256; /* reserved space for header */
+ self->total = 256; /* reserved space for header */
+ self->map = NULL;
+ self->pending_ops = 0;
}
gboolean
@@ -968,5 +1098,10 @@ gegl_tile_backend_file_unlock (GeglTileBackendFile *self)
}
self->header.flags -= GEGL_FLAG_LOCKED;
gegl_tile_backend_file_write_header (self);
+
+ /* wait until all writes to this file are finished before handing it over
+ to another process */
+ gegl_tile_backend_file_finish_writing (self);
+
return TRUE;
}
diff --git a/gegl/buffer/gegl-tile-backend-file.c b/gegl/buffer/gegl-tile-backend-file.c
index 90e8109..40bf5b0 100644
--- a/gegl/buffer/gegl-tile-backend-file.c
+++ b/gegl/buffer/gegl-tile-backend-file.c
@@ -47,6 +47,7 @@
#endif
+
struct _GeglTileBackendFile
{
GeglTileBackend parent_instance;
diff --git a/gegl/buffer/gegl-tile-backend-file.h b/gegl/buffer/gegl-tile-backend-file.h
index 018503c..067a243 100644
--- a/gegl/buffer/gegl-tile-backend-file.h
+++ b/gegl/buffer/gegl-tile-backend-file.h
@@ -38,6 +38,22 @@ G_BEGIN_DECLS
typedef struct _GeglTileBackendFile GeglTileBackendFile;
typedef struct _GeglTileBackendFileClass GeglTileBackendFileClass;
+typedef enum
+{
+ OP_WRITE,
+ OP_TRUNCATE,
+ OP_SYNC
+} ThreadOp;
+
+typedef struct
+{
+ gint length; /* length of data if writing tile or
+ length of file if truncating */
+ guchar *source;
+ goffset offset;
+ GeglTileBackendFile *file; /* the file we are operating on */
+ ThreadOp operation; /* type of file operation, see above */
+} ThreadParams;
struct _GeglTileBackendFileClass
{
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]