[byzanz] Rework encoder handling and introduce ByzanzQueue



commit e17558f7d674654e6dc6784b7192208f664e6944
Author: Benjamin Otte <otte gnome org>
Date:   Wed Sep 2 21:24:30 2009 +0200

    Rework encoder handling and introduce ByzanzQueue
    
    ByzanzQueue is a disk-backed memory queue that can be used as an
    intermediate storage for large amounts of data that cannot be processed
    in time.
    Byzanz uses this with the encoder rewrite as an image data cache in
    .byzanz serialization format. This helps a lot when using a slow encoder
    (theora) and lots of changes happening.

 src/Makefile.am               |    6 +
 src/byzanzencoder.c           |   78 ++++++------
 src/byzanzencoder.h           |   15 +--
 src/byzanzencoderbyzanz.c     |   36 +-----
 src/byzanzencoderbyzanz.h     |    2 -
 src/byzanzencodergif.c        |   25 ++---
 src/byzanzencodergif.h        |    2 +-
 src/byzanzencodergstreamer.c  |   23 ++--
 src/byzanzqueue.c             |  150 ++++++++++++++++++++++++
 src/byzanzqueue.h             |   62 ++++++++++
 src/byzanzqueueinputstream.c  |  257 +++++++++++++++++++++++++++++++++++++++++
 src/byzanzqueueinputstream.h  |   52 ++++++++
 src/byzanzqueueoutputstream.c |  179 ++++++++++++++++++++++++++++
 src/byzanzqueueoutputstream.h |   52 ++++++++
 src/byzanzsession.c           |   57 +++++++--
 src/byzanzsession.h           |    3 +
 src/playback.c                |   40 +------
 17 files changed, 883 insertions(+), 156 deletions(-)
---
diff --git a/src/Makefile.am b/src/Makefile.am
index 5f50ccb..769bca3 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -17,6 +17,9 @@ noinst_HEADERS = \
 	byzanzlayer.h \
 	byzanzlayercursor.h \
 	byzanzlayerwindow.h \
+	byzanzqueue.h \
+	byzanzqueueinputstream.h \
+	byzanzqueueoutputstream.h \
 	byzanzrecorder.h \
 	byzanzsession.h \
 	byzanzselect.h \
@@ -35,6 +38,9 @@ libbyzanz_la_SOURCES = \
 	byzanzlayercursor.c \
 	byzanzlayerwindow.c \
 	byzanzmarshal.c \
+	byzanzqueue.c \
+	byzanzqueueinputstream.c \
+	byzanzqueueoutputstream.c \
 	byzanzrecorder.c \
 	byzanzsession.c \
 	byzanzselect.c \
diff --git a/src/byzanzencoder.c b/src/byzanzencoder.c
index b49d213..e45280d 100644
--- a/src/byzanzencoder.c
+++ b/src/byzanzencoder.c
@@ -23,6 +23,8 @@
 
 #include "byzanzencoder.h"
 
+#include "byzanzserialize.h"
+
 typedef struct _ByzanzEncoderJob ByzanzEncoderJob;
 struct _ByzanzEncoderJob {
   GTimeVal		tv;		/* time this job was enqueued */
@@ -71,30 +73,36 @@ byzanz_encoder_run (gpointer enc)
   ByzanzEncoder *encoder = enc;
   ByzanzEncoderClass *klass = BYZANZ_ENCODER_GET_CLASS (enc);
   GError *error = NULL;
+  guint width, height;
+  cairo_surface_t *surface;
+  GdkRegion *region;
+  guint64 msecs;
 
-  if (!klass->setup (encoder, encoder->output_stream, encoder->width, encoder->height,
+  if (!byzanz_deserialize_header (encoder->input_stream, &width, &height, encoder->cancellable, &error) ||
+      !klass->setup (encoder, encoder->output_stream, width, height,
         encoder->cancellable, &error))
-    return error;
+    goto fail;
 
   do {
-    ByzanzEncoderJob *job = g_async_queue_pop (encoder->jobs);
+    if (!byzanz_deserialize (encoder->input_stream, &msecs, &surface, &region,
+          encoder->cancellable, &error))
+      break;
 
     /* quit */
-    if (job->surface == NULL) {
-      if (klass->close (encoder, encoder->output_stream, &job->tv,
-              encoder->cancellable, &error))
+    if (surface == NULL) {
+      if (klass->close (encoder, encoder->output_stream, msecs, encoder->cancellable, &error))
         g_output_stream_close (encoder->output_stream, encoder->cancellable, &error);
-      byzanz_encoder_job_free (job);
       break;
     }
 
     /* decode */
-    klass->process (encoder, encoder->output_stream,
-          job->surface, job->region, &job->tv,
-          encoder->cancellable, &error);
-    byzanz_encoder_job_free (job);
+    klass->process (encoder, encoder->output_stream, msecs,
+          surface, region, encoder->cancellable, &error);
+    cairo_surface_destroy (surface);
+    gdk_region_destroy (region);
   } while (error == NULL);
 
+fail:
   g_idle_add_full (G_PRIORITY_DEFAULT, byzanz_encoder_finished, enc, NULL);
   return error;
 }
@@ -103,10 +111,9 @@ byzanz_encoder_run (gpointer enc)
 
 enum {
   PROP_0,
+  PROP_INPUT,
   PROP_OUTPUT,
   PROP_CANCELLABLE,
-  PROP_WIDTH,
-  PROP_HEIGHT,
   PROP_ERROR,
   PROP_RUNNING
 };
@@ -173,6 +180,9 @@ byzanz_encoder_get_property (GObject *object, guint param_id, GValue *value,
   ByzanzEncoder *encoder = BYZANZ_ENCODER (object);
 
   switch (param_id) {
+    case PROP_INPUT:
+      g_value_set_object (value, encoder->input_stream);
+      break;
     case PROP_OUTPUT:
       g_value_set_object (value, encoder->output_stream);
       break;
@@ -185,12 +195,6 @@ byzanz_encoder_get_property (GObject *object, guint param_id, GValue *value,
     case PROP_RUNNING:
       g_value_set_boolean (value, encoder->thread != NULL);
       break;
-    case PROP_WIDTH:
-      g_value_set_uint (value, encoder->width);
-      break;
-    case PROP_HEIGHT:
-      g_value_set_uint (value, encoder->height);
-      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, param_id, pspec);
       break;
@@ -204,6 +208,10 @@ byzanz_encoder_set_property (GObject *object, guint param_id, const GValue *valu
   ByzanzEncoder *encoder = BYZANZ_ENCODER (object);
 
   switch (param_id) {
+    case PROP_INPUT:
+      encoder->input_stream = g_value_dup_object (value);
+      g_assert (encoder->input_stream != NULL);
+      break;
     case PROP_OUTPUT:
       encoder->output_stream = g_value_dup_object (value);
       g_assert (encoder->output_stream != NULL);
@@ -211,12 +219,6 @@ byzanz_encoder_set_property (GObject *object, guint param_id, const GValue *valu
     case PROP_CANCELLABLE:
       encoder->cancellable = g_value_dup_object (value);
       break;
-    case PROP_WIDTH:
-      encoder->width = g_value_get_uint (value);
-      break;
-    case PROP_HEIGHT:
-      encoder->height = g_value_get_uint (value);
-      break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, param_id, pspec);
       break;
@@ -230,6 +232,7 @@ byzanz_encoder_finalize (GObject *object)
 
   g_assert (encoder->thread == NULL);
 
+  g_object_unref (encoder->input_stream);
   g_object_unref (encoder->output_stream);
   if (encoder->cancellable)
     g_object_unref (encoder->cancellable);
@@ -265,6 +268,9 @@ byzanz_encoder_class_init (ByzanzEncoderClass *klass)
   object_class->finalize = byzanz_encoder_finalize;
   object_class->constructed = byzanz_encoder_constructed;
 
+  g_object_class_install_property (object_class, PROP_INPUT,
+      g_param_spec_object ("input", "input", "stream to read data from",
+	  G_TYPE_INPUT_STREAM, G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY));
   g_object_class_install_property (object_class, PROP_OUTPUT,
       g_param_spec_object ("output", "output", "stream to write data to",
 	  G_TYPE_OUTPUT_STREAM, G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY));
@@ -277,12 +283,6 @@ byzanz_encoder_class_init (ByzanzEncoderClass *klass)
   g_object_class_install_property (object_class, PROP_RUNNING,
       g_param_spec_boolean ("running", "running", "TRUE while the encoding thread is running",
 	  TRUE, G_PARAM_READABLE));
-  g_object_class_install_property (object_class, PROP_WIDTH,
-      g_param_spec_uint ("width", "width", "width of the stream that gets encoded",
-	  1, G_MAXUINT, 1, G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY));
-  g_object_class_install_property (object_class, PROP_HEIGHT,
-      g_param_spec_uint ("height", "height", "height of the stream that gets encoded",
-	  1, G_MAXUINT, 1, G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY));
 }
 
 static void
@@ -295,25 +295,24 @@ byzanz_encoder_init (GTypeInstance *instance, gpointer klass)
 
 ByzanzEncoder *
 byzanz_encoder_new (GType           encoder_type,
-                    GOutputStream * stream,
-                    guint           width,
-                    guint           height,
+                    GInputStream *  input,
+                    GOutputStream * output,
                     GCancellable *  cancellable)
 {
   ByzanzEncoder *encoder;
 
   g_return_val_if_fail (g_type_is_a (encoder_type, BYZANZ_TYPE_ENCODER), NULL);
-  g_return_val_if_fail (G_IS_OUTPUT_STREAM (stream), NULL);
+  g_return_val_if_fail (G_IS_INPUT_STREAM (input), NULL);
+  g_return_val_if_fail (G_IS_OUTPUT_STREAM (output), NULL);
   g_return_val_if_fail (cancellable == NULL || G_IS_CANCELLABLE (cancellable), NULL);
-  g_return_val_if_fail (width > 0, NULL);
-  g_return_val_if_fail (height > 0, NULL);
 
-  encoder = g_object_new (encoder_type, "output", stream,
-      "cancellable", cancellable, "width", width, "height", height, NULL);
+  encoder = g_object_new (encoder_type, "input", input, "output", output,
+      "cancellable", cancellable, NULL);
 
   return encoder;
 }
 
+/*
 void
 byzanz_encoder_process (ByzanzEncoder *	 encoder,
 		        cairo_surface_t *surface,
@@ -357,6 +356,7 @@ byzanz_encoder_close (ByzanzEncoder *encoder,
 
   g_async_queue_push (encoder->jobs, job);
 }
+*/
 
 gboolean
 byzanz_encoder_is_running (ByzanzEncoder *encoder)
diff --git a/src/byzanzencoder.h b/src/byzanzencoder.h
index ed0677f..94df407 100644
--- a/src/byzanzencoder.h
+++ b/src/byzanzencoder.h
@@ -41,11 +41,10 @@ struct _ByzanzEncoder {
   GObject		object;
   
   /*<private >*/
+  GInputStream *        input_stream;           /* stream to read from in byzanzserialize.h format */
   GOutputStream *       output_stream;          /* stream we write to (passed to the vfuncs) */
   GCancellable *        cancellable;            /* cancellable to use in thread */
   GError *              error;                  /* NULL or the encoding error */
-  guint                 width;                  /* width of image */
-  guint                 height;                 /* height of image */
 
   GAsyncQueue *         jobs;                   /* the stuff we still need to encode */
   GThread *             thread;                 /* the encoding thread */
@@ -65,14 +64,14 @@ struct _ByzanzEncoderClass {
 						 GError **		error);
   gboolean		(* process)		(ByzanzEncoder *	encoder,
 						 GOutputStream *	stream,
+                                                 guint64                msecs,
 						 cairo_surface_t *	surface,
 						 const GdkRegion *	region,
-						 const GTimeVal *	total_elapsed,
                                                  GCancellable *         cancellable,
 						 GError **		error);
   gboolean		(* close)		(ByzanzEncoder *	encoder,
 						 GOutputStream *	stream,
-						 const GTimeVal *	total_elapsed,
+                                                 guint64                msecs,
                                                  GCancellable *         cancellable,
 						 GError **		error);
 };
@@ -80,17 +79,17 @@ struct _ByzanzEncoderClass {
 GType		byzanz_encoder_get_type		(void) G_GNUC_CONST;
 
 ByzanzEncoder *	byzanz_encoder_new		(GType                  encoder_type,
-                                                 GOutputStream *        stream,
-                                                 guint                  width,
-                                                 guint                  height,
+                                                 GInputStream *         input,
+                                                 GOutputStream *        output,
                                                  GCancellable *         cancellable);
+/*
 void		byzanz_encoder_process		(ByzanzEncoder *	encoder,
 						 cairo_surface_t *	surface,
 						 const GdkRegion *	region,
 						 const GTimeVal *	total_elapsed);
 void		byzanz_encoder_close		(ByzanzEncoder *	encoder,
 						 const GTimeVal *	total_elapsed);
-
+*/
 gboolean        byzanz_encoder_is_running       (ByzanzEncoder *        encoder);
 const GError *  byzanz_encoder_get_error        (ByzanzEncoder *        encoder);
 
diff --git a/src/byzanzencoderbyzanz.c b/src/byzanzencoderbyzanz.c
index ea45a97..33667a0 100644
--- a/src/byzanzencoderbyzanz.c
+++ b/src/byzanzencoderbyzanz.c
@@ -30,19 +30,6 @@
 
 G_DEFINE_TYPE (ByzanzEncoderByzanz, byzanz_encoder_byzanz, BYZANZ_TYPE_ENCODER)
 
-static guint64
-byzanz_encoder_byzanz_time (ByzanzEncoderByzanz * byzanz,
-                            const GTimeVal *      tv)
-{
-  guint64 result;
-
-  result = tv->tv_sec - byzanz->start_time.tv_sec;
-  result *= 1000;
-  result += (tv->tv_usec - byzanz->start_time.tv_usec) / 1000;
-
-  return result;
-}
-
 static gboolean
 byzanz_encoder_byzanz_setup (ByzanzEncoder * encoder,
                              GOutputStream * stream,
@@ -57,32 +44,23 @@ byzanz_encoder_byzanz_setup (ByzanzEncoder * encoder,
 static gboolean
 byzanz_encoder_byzanz_process (ByzanzEncoder *   encoder,
                                GOutputStream *   stream,
+                               guint64           msecs,
                                cairo_surface_t * surface,
                                const GdkRegion * region,
-                               const GTimeVal *  total_elapsed,
                                GCancellable *    cancellable,
                                GError **	 error)
 {
-  ByzanzEncoderByzanz *byzanz = BYZANZ_ENCODER_BYZANZ (encoder);
-
-  if (byzanz->start_time.tv_sec == 0 && byzanz->start_time.tv_usec)
-    byzanz->start_time = *total_elapsed;
-
-  return byzanz_serialize (stream, byzanz_encoder_byzanz_time (byzanz, total_elapsed),
-      surface, region, cancellable, error);
+  return byzanz_serialize (stream, msecs, surface, region, cancellable, error);
 }
 
 static gboolean
 byzanz_encoder_byzanz_close (ByzanzEncoder *  encoder,
-                          GOutputStream *  stream,
-                          const GTimeVal * total_elapsed,
-                          GCancellable *   cancellable,
-                          GError **	   error)
+                             GOutputStream *  stream,
+                             guint64          msecs,
+                             GCancellable *   cancellable,
+                             GError **	      error)
 {
-  ByzanzEncoderByzanz *byzanz = BYZANZ_ENCODER_BYZANZ (encoder);
-
-  return byzanz_serialize (stream, byzanz_encoder_byzanz_time (byzanz, total_elapsed),
-      NULL, NULL, cancellable, error);
+  return byzanz_serialize (stream, msecs, NULL, NULL, cancellable, error);
 }
 
 static void
diff --git a/src/byzanzencoderbyzanz.h b/src/byzanzencoderbyzanz.h
index 64e5f51..a9da99f 100644
--- a/src/byzanzencoderbyzanz.h
+++ b/src/byzanzencoderbyzanz.h
@@ -34,8 +34,6 @@ typedef struct _ByzanzEncoderByzanzClass ByzanzEncoderByzanzClass;
 
 struct _ByzanzEncoderByzanz {
   ByzanzEncoder         encoder;
-
-  GTimeVal              start_time;
 };
 
 struct _ByzanzEncoderByzanzClass {
diff --git a/src/byzanzencodergif.c b/src/byzanzencodergif.c
index 080ce9a..bb80339 100644
--- a/src/byzanzencodergif.c
+++ b/src/byzanzencodergif.c
@@ -85,9 +85,9 @@ byzanz_encoder_gif_quantize (ByzanzEncoderGif * gif,
 }
 
 static gboolean
-byzanz_encoder_write_image (ByzanzEncoderGif *gif, const GTimeVal *tv, GError **error)
+byzanz_encoder_write_image (ByzanzEncoderGif *gif, guint64 msecs, GError **error)
 {
-  glong msecs;
+  guint elapsed;
   guint width;
 
   g_assert (gif->cached_data != NULL);
@@ -95,19 +95,16 @@ byzanz_encoder_write_image (ByzanzEncoderGif *gif, const GTimeVal *tv, GError **
   g_assert (gif->cached_area.height > 0);
 
   width = gifenc_get_width (gif->gifenc);
-  msecs = (tv->tv_sec - gif->cached_time.tv_sec) * 1000 + 
-	  (tv->tv_usec - gif->cached_time.tv_usec) / 1000 + 5;
-  if (msecs < 10)
-    g_printerr ("<10 msecs for a frame, can this be?\n");
-  msecs = MAX (msecs, 10);
+  elapsed = msecs - gif->cached_time;
+  elapsed = MAX (elapsed, 10);
 
   if (!gifenc_add_image (gif->gifenc, gif->cached_area.x, gif->cached_area.y, 
-            gif->cached_area.width, gif->cached_area.height, msecs,
+            gif->cached_area.width, gif->cached_area.height, elapsed,
             gif->cached_data + width * gif->cached_area.y + gif->cached_area.x,
             width, error))
     return FALSE;
 
-  gif->cached_time = *tv;
+  gif->cached_time = msecs;
   return TRUE;
 }
 
@@ -171,9 +168,9 @@ byzanz_encoder_swap_image (ByzanzEncoderGif * gif,
 static gboolean
 byzanz_encoder_gif_process (ByzanzEncoder *   encoder,
                             GOutputStream *   stream,
+                            guint64           msecs,
                             cairo_surface_t * surface,
                             const GdkRegion * region,
-                            const GTimeVal *  total_elapsed,
                             GCancellable *    cancellable,
                             GError **	      error)
 {
@@ -183,14 +180,14 @@ byzanz_encoder_gif_process (ByzanzEncoder *   encoder,
   if (!gif->has_quantized) {
     if (!byzanz_encoder_gif_quantize (gif, surface, error))
       return FALSE;
-    gif->cached_time = *total_elapsed;
+    gif->cached_time = msecs;
     if (!byzanz_encoder_gif_encode_image (gif, surface, region, &area)) {
       g_assert_not_reached ();
     }
     byzanz_encoder_swap_image (gif, &area);
   } else {
     if (byzanz_encoder_gif_encode_image (gif, surface, region, &area)) {
-      if (!byzanz_encoder_write_image (gif, total_elapsed, error))
+      if (!byzanz_encoder_write_image (gif, msecs, error))
         return FALSE;
       byzanz_encoder_swap_image (gif, &area);
     }
@@ -202,7 +199,7 @@ byzanz_encoder_gif_process (ByzanzEncoder *   encoder,
 static gboolean
 byzanz_encoder_gif_close (ByzanzEncoder *  encoder,
                           GOutputStream *  stream,
-                          const GTimeVal * total_elapsed,
+                          guint64          msecs,
                           GCancellable *   cancellable,
                           GError **	   error)
 {
@@ -213,7 +210,7 @@ byzanz_encoder_gif_close (ByzanzEncoder *  encoder,
     return FALSE;
   }
 
-  if (!byzanz_encoder_write_image (gif, total_elapsed, error) ||
+  if (!byzanz_encoder_write_image (gif, msecs, error) ||
       !gifenc_close (gif->gifenc, error))
     return FALSE;
 
diff --git a/src/byzanzencodergif.h b/src/byzanzencodergif.h
index 31b0033..b4c75f0 100644
--- a/src/byzanzencodergif.h
+++ b/src/byzanzencodergif.h
@@ -43,7 +43,7 @@ struct _ByzanzEncoderGif {
 
   GdkRectangle          cached_area;    /* area that is saved in cached_data */
   guint8 *              cached_data;    /* cached_area.{width x height} sized area of image */
-  GTimeVal              cached_time;    /* timestamp the cached image corresponds to */
+  guint64               cached_time;    /* timestamp the cached image corresponds to */
 
   guint8 *		cached_tmp;	/* temporary data to swap cached_data with */
 };
diff --git a/src/byzanzencodergstreamer.c b/src/byzanzencodergstreamer.c
index cc8a31a..2decca0 100644
--- a/src/byzanzencodergstreamer.c
+++ b/src/byzanzencodergstreamer.c
@@ -102,12 +102,12 @@ byzanz_encoder_gstreamer_got_error (ByzanzEncoderGStreamer *gstreamer, GError **
 
 static gboolean
 byzanz_encoder_gstreamer_process (ByzanzEncoder *   encoder,
-                            GOutputStream *   stream,
-                            cairo_surface_t * surface,
-                            const GdkRegion * region,
-                            const GTimeVal *  total_elapsed,
-                            GCancellable *    cancellable,
-                            GError **	      error)
+                                  GOutputStream *   stream,
+                                  guint64           msecs,
+                                  cairo_surface_t * surface,
+                                  const GdkRegion * region,
+                                  GCancellable *    cancellable,
+                                  GError **	    error)
 {
   ByzanzEncoderGStreamer *gstreamer = BYZANZ_ENCODER_GSTREAMER (encoder);
   GstBuffer *buffer;
@@ -119,7 +119,6 @@ byzanz_encoder_gstreamer_process (ByzanzEncoder *   encoder,
   if (gstreamer->surface == NULL) {
     /* just assume that the size is right and pray */
     gstreamer->surface = cairo_surface_reference (surface);
-    gstreamer->start_time = *total_elapsed;
   } else {
     cairo_t *cr;
 
@@ -149,7 +148,7 @@ byzanz_encoder_gstreamer_process (ByzanzEncoder *   encoder,
       cairo_image_surface_get_stride (gstreamer->surface) * cairo_image_surface_get_height (gstreamer->surface),
       (GstAppBufferFinalizeFunc) cairo_surface_destroy, gstreamer->surface);
   GST_BUFFER_FLAG_SET (buffer, GST_BUFFER_FLAG_READONLY);
-  GST_BUFFER_TIMESTAMP (buffer) = GST_TIMEVAL_TO_TIME (*total_elapsed) - GST_TIMEVAL_TO_TIME (gstreamer->start_time);
+  GST_BUFFER_TIMESTAMP (buffer) = msecs * GST_MSECOND;
   gst_buffer_set_caps (buffer, gstreamer->caps);
   gst_app_src_push_buffer (gstreamer->src, buffer);
 
@@ -158,10 +157,10 @@ byzanz_encoder_gstreamer_process (ByzanzEncoder *   encoder,
 
 static gboolean
 byzanz_encoder_gstreamer_close (ByzanzEncoder *  encoder,
-                          GOutputStream *  stream,
-                          const GTimeVal * total_elapsed,
-                          GCancellable *   cancellable,
-                          GError **	   error)
+                                GOutputStream *  stream,
+                                guint64          msecs,
+                                GCancellable *   cancellable,
+                                GError **	 error)
 {
   ByzanzEncoderGStreamer *gstreamer = BYZANZ_ENCODER_GSTREAMER (encoder);
   GstBus *bus;
diff --git a/src/byzanzqueue.c b/src/byzanzqueue.c
new file mode 100644
index 0000000..02492cd
--- /dev/null
+++ b/src/byzanzqueue.c
@@ -0,0 +1,150 @@
+/* desktop session recorder
+ * Copyright (C) 2009 Benjamin Otte <otte gnome org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "byzanzqueue.h"
+
+#include "byzanzqueueinputstream.h"
+#include "byzanzqueueoutputstream.h"
+
+enum {
+  PROP_0,
+  PROP_INPUT,
+  PROP_OUTPUT,
+};
+
+G_DEFINE_TYPE (ByzanzQueue, byzanz_queue, G_TYPE_OBJECT)
+
+static void
+byzanz_queue_get_property (GObject *object, guint param_id, GValue *value, 
+    GParamSpec * pspec)
+{
+  ByzanzQueue *queue = BYZANZ_QUEUE (object);
+
+  switch (param_id) {
+    case PROP_INPUT:
+      g_value_set_object (value, queue->input);
+      break;
+    case PROP_OUTPUT:
+      g_value_set_object (value, queue->output);
+      break;
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, param_id, pspec);
+      break;
+  }
+}
+
+static void
+byzanz_queue_set_property (GObject *object, guint param_id, const GValue *value, 
+    GParamSpec * pspec)
+{
+  //ByzanzQueue *queue = BYZANZ_QUEUE (object);
+
+  switch (param_id) {
+    default:
+      G_OBJECT_WARN_INVALID_PROPERTY_ID (object, param_id, pspec);
+      break;
+  }
+}
+
+static void
+byzanz_queue_dispose (GObject *object)
+{
+  ByzanzQueue *queue = BYZANZ_QUEUE (object);
+
+  if (queue->input)
+    g_object_unref (queue->input);
+  if (queue->output)
+    g_object_unref (queue->output);
+
+  if (!g_atomic_int_dec_and_test (&queue->shared_count)) {
+    g_object_ref (queue);
+    return;
+  }
+
+  G_OBJECT_CLASS (byzanz_queue_parent_class)->dispose (object);
+}
+
+static void
+byzanz_queue_finalize (GObject *object)
+{
+  ByzanzQueue *queue = BYZANZ_QUEUE (object);
+  GFile *file;
+
+  while ((file = g_async_queue_try_pop (queue->files)))
+    g_file_delete (file, NULL, NULL);
+  g_async_queue_unref (queue->files);
+
+  G_OBJECT_CLASS (byzanz_queue_parent_class)->dispose (object);
+}
+
+static void
+byzanz_queue_class_init (ByzanzQueueClass *klass)
+{
+  GObjectClass *object_class = G_OBJECT_CLASS (klass);
+
+  object_class->get_property = byzanz_queue_get_property;
+  object_class->set_property = byzanz_queue_set_property;
+  object_class->dispose = byzanz_queue_dispose;
+  object_class->finalize = byzanz_queue_finalize;
+
+  g_object_class_install_property (object_class, PROP_INPUT,
+      g_param_spec_object ("inputstream", "input stream", "stream to use for reading from the cache",
+	  G_TYPE_INPUT_STREAM, G_PARAM_READABLE));
+  g_object_class_install_property (object_class, PROP_OUTPUT,
+      g_param_spec_object ("outputstream", "output stream", "stream to use for writing to the cache",
+	  G_TYPE_OUTPUT_STREAM, G_PARAM_READABLE));
+}
+
+static void
+byzanz_queue_init (ByzanzQueue *queue)
+{
+  queue->files = g_async_queue_new ();
+
+  queue->input = byzanz_queue_input_stream_new (queue);
+  queue->output = byzanz_queue_output_stream_new (queue);
+
+  queue->shared_count = 3;
+}
+
+ByzanzQueue *
+byzanz_queue_new (void)
+{
+  return g_object_new (BYZANZ_TYPE_QUEUE, NULL);
+}
+
+GOutputStream *
+byzanz_queue_get_output_stream (ByzanzQueue *queue)
+{
+  g_return_val_if_fail (BYZANZ_IS_QUEUE (queue), NULL);
+
+  return queue->output;
+}
+
+GInputStream *
+byzanz_queue_get_input_stream (ByzanzQueue *queue)
+{
+  g_return_val_if_fail (BYZANZ_IS_QUEUE (queue), NULL);
+
+  return queue->input;
+}
+
diff --git a/src/byzanzqueue.h b/src/byzanzqueue.h
new file mode 100644
index 0000000..9a9ff24
--- /dev/null
+++ b/src/byzanzqueue.h
@@ -0,0 +1,62 @@
+/* desktop session recorder
+ * Copyright (C) 2009 Benjamin Otte <otte gnome org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#include <gio/gio.h>
+
+#ifndef __HAVE_BYZANZ_QUEUE_H__
+#define __HAVE_BYZANZ_QUEUE_H__
+
+typedef struct _ByzanzQueue ByzanzQueue;
+typedef struct _ByzanzQueueClass ByzanzQueueClass;
+
+#define BYZANZ_QUEUE_FILE_SIZE 16 * 1024 * 1024
+
+#define BYZANZ_TYPE_QUEUE                    (byzanz_queue_get_type())
+#define BYZANZ_IS_QUEUE(obj)                 (G_TYPE_CHECK_INSTANCE_TYPE ((obj), BYZANZ_TYPE_QUEUE))
+#define BYZANZ_IS_QUEUE_CLASS(klass)         (G_TYPE_CHECK_CLASS_TYPE ((klass), BYZANZ_TYPE_QUEUE))
+#define BYZANZ_QUEUE(obj)                    (G_TYPE_CHECK_INSTANCE_CAST ((obj), BYZANZ_TYPE_QUEUE, ByzanzQueue))
+#define BYZANZ_QUEUE_CLASS(klass)            (G_TYPE_CHECK_CLASS_CAST ((klass), BYZANZ_TYPE_QUEUE, ByzanzQueueClass))
+#define BYZANZ_QUEUE_GET_CLASS(obj)          (G_TYPE_INSTANCE_GET_CLASS ((obj), BYZANZ_TYPE_QUEUE, ByzanzQueueClass))
+
+struct _ByzanzQueue {
+  GObject		object;
+
+  GOutputStream *	output;		/* stream to use for writing to the queue */
+  GInputStream *	input;		/* stream to use for reading to the queue */
+
+  volatile int		shared_count;	/* shared ref count of queue, output and input stream */
+
+  GAsyncQueue *		files;		/* the files that still need to be processed */
+  guint			output_closed:1;/* the output stream is closed. Must take async queue lock to access */
+  guint			input_closed:1; /* the input stream is closed. Must take async queue lock to access */
+};
+
+struct _ByzanzQueueClass {
+  GObjectClass		object_class;
+};
+
+GType		byzanz_queue_get_type		(void) G_GNUC_CONST;
+
+ByzanzQueue *	byzanz_queue_new		(void);
+
+GOutputStream *	byzanz_queue_get_output_stream	(ByzanzQueue *	queue);
+GInputStream *	byzanz_queue_get_input_stream	(ByzanzQueue *	queue);
+
+
+#endif /* __HAVE_BYZANZ_QUEUE_H__ */
diff --git a/src/byzanzqueueinputstream.c b/src/byzanzqueueinputstream.c
new file mode 100644
index 0000000..0ca6731
--- /dev/null
+++ b/src/byzanzqueueinputstream.c
@@ -0,0 +1,257 @@
+/* desktop session recorder
+ * Copyright (C) 2009 Benjamin Otte <otte gnome org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "byzanzqueueinputstream.h"
+
+G_DEFINE_TYPE (ByzanzQueueInputStream, byzanz_queue_input_stream, G_TYPE_INPUT_STREAM)
+
+static gboolean
+byzanz_queue_input_stream_close_input (ByzanzQueueInputStream *stream,
+				       GCancellable *	       cancellable,
+				       GError **               error)
+{
+  if (stream->input == NULL)
+    return TRUE;
+
+  if (!g_input_stream_close (stream->input, cancellable, error))
+    return FALSE;
+
+  g_object_unref (stream->input);
+  stream->input = NULL;
+  stream->input_bytes = 0;
+  return TRUE;
+}
+
+static void
+byzanz_queue_input_stream_dispose (GObject *object)
+{
+  ByzanzQueueInputStream *stream = BYZANZ_QUEUE_INPUT_STREAM (object);
+
+  if (g_atomic_int_dec_and_test (&stream->queue->shared_count)) {
+    stream->queue->input = NULL;
+    g_object_unref (stream->queue);
+  } else {
+    g_object_ref (stream);
+    return;
+  }
+
+  G_OBJECT_CLASS (byzanz_queue_input_stream_parent_class)->dispose (object);
+}
+
+static void
+byzanz_queue_input_stream_finalize (GObject *object)
+{
+  ByzanzQueueInputStream *stream = BYZANZ_QUEUE_INPUT_STREAM (object);
+
+  if (!byzanz_queue_input_stream_close_input (stream, NULL, NULL))
+    g_object_unref (stream->input);
+
+  G_OBJECT_CLASS (byzanz_queue_input_stream_parent_class)->finalize (object);
+}
+
+static gboolean
+byzanz_queue_input_stream_wait (ByzanzQueueInputStream *stream,
+			        GCancellable *		cancellable,
+				GError **		error)
+{
+  GPollFD fd;
+  guint n_fds;
+  
+  /* FIXME: Use a file monitor here */
+
+  /* Do the same thing that the UNIX tail program does: sleep a second */
+  n_fds = 0;
+  if (cancellable)
+    {
+      g_cancellable_make_pollfd (cancellable, &fd);
+      n_fds++;
+    }
+
+  g_poll (&fd, n_fds, 1000);
+
+  return !g_cancellable_set_error_if_cancelled (cancellable, error);
+}
+
+static gboolean
+byzanz_queue_input_stream_ensure_input (ByzanzQueueInputStream *stream,
+					GCancellable *          cancellable,
+					GError **               error)
+{
+  GFile *file;
+
+  if (stream->input_bytes >= BYZANZ_QUEUE_FILE_SIZE)
+    {
+      if (!byzanz_queue_input_stream_close_input (stream, cancellable, error))
+	return FALSE;
+      stream->input = NULL;
+    }
+
+  if (stream->input != NULL)
+    return TRUE;
+
+  g_async_queue_lock (stream->queue->files);
+  do {
+    file = g_async_queue_try_pop_unlocked (stream->queue->files);
+    if (file != NULL || stream->queue->output_closed)
+      break;
+    
+    g_async_queue_unlock (stream->queue->files);
+    if (!byzanz_queue_input_stream_wait (stream, cancellable, error))
+      return FALSE;
+    g_async_queue_lock (stream->queue->files);
+  
+  } while (TRUE);
+  g_async_queue_unlock (stream->queue->files);
+
+  if (file == NULL)
+    return TRUE;
+
+  stream->input = G_INPUT_STREAM (g_file_read (file, cancellable, error));
+  g_file_delete (file, NULL, NULL);
+  g_object_unref (file);
+
+  return stream->input != NULL;
+}
+
+static gssize
+byzanz_queue_input_stream_read (GInputStream *input_stream,
+				void *	      buffer,
+				gsize         count,
+				GCancellable *cancellable,
+				GError **     error)
+{
+  ByzanzQueueInputStream *stream = BYZANZ_QUEUE_INPUT_STREAM (input_stream);
+  gssize result;
+
+retry:
+  if (!byzanz_queue_input_stream_ensure_input (stream, cancellable, error))
+    return -1;
+
+  /* No more data to read from the queue */
+  if (stream->input == NULL)
+    return 0;
+
+  result = g_input_stream_read (stream->input, buffer, count, cancellable, error);
+  if (result == -1)
+    return -1;
+
+  /* no data in file. Let's wait for more. */
+  if (result == 0) {
+    if (!byzanz_queue_input_stream_wait (stream, cancellable, error))
+      return -1;
+    goto retry;
+  }
+
+  stream->input_bytes += result;
+  return result;
+}
+
+static gssize
+byzanz_queue_input_stream_skip (GInputStream *input_stream,
+				gsize         count,
+				GCancellable *cancellable,
+				GError **     error)
+{
+  ByzanzQueueInputStream *stream = BYZANZ_QUEUE_INPUT_STREAM (input_stream);
+  gssize result;
+
+retry:
+  if (!byzanz_queue_input_stream_ensure_input (stream, cancellable, error))
+    return -1;
+
+  /* No more data to read from the queue */
+  if (stream->input == NULL)
+    return 0;
+
+  result = g_input_stream_skip (stream->input, count, cancellable, error);
+  if (result == -1)
+    return -1;
+
+  /* no data in file. Let's wait for more. */
+  if (result == 0) {
+    if (!byzanz_queue_input_stream_wait (stream, cancellable, error))
+      return -1;
+    goto retry;
+  }
+
+  stream->input_bytes += result;
+  return result;
+}
+
+static gboolean
+byzanz_queue_input_stream_close (GInputStream * input_stream,
+				 GCancellable * cancellable,
+				 GError **      error)
+{
+  ByzanzQueueInputStream *stream = BYZANZ_QUEUE_INPUT_STREAM (input_stream);
+  GFile *file;
+
+  if (!byzanz_queue_input_stream_close_input (stream, cancellable, error))
+    return FALSE;
+
+  g_async_queue_lock (stream->queue->files);
+  stream->queue->input_closed = TRUE;
+  file = g_async_queue_try_pop_unlocked (stream->queue->files);
+  g_async_queue_unlock (stream->queue->files);
+
+  while (file) {
+    g_file_delete (file, NULL, NULL);
+    file = g_async_queue_try_pop (stream->queue->files);
+  }
+
+  return TRUE;
+}
+
+static void
+byzanz_queue_input_stream_class_init (ByzanzQueueInputStreamClass *klass)
+{
+  GObjectClass *object_class = G_OBJECT_CLASS (klass);
+  GInputStreamClass *input_stream_class = G_INPUT_STREAM_CLASS (klass);
+
+  object_class->dispose = byzanz_queue_input_stream_dispose;
+  object_class->finalize = byzanz_queue_input_stream_finalize;
+
+  input_stream_class->read_fn = byzanz_queue_input_stream_read;
+  input_stream_class->skip = byzanz_queue_input_stream_skip;
+  input_stream_class->close_fn = byzanz_queue_input_stream_close;
+  /* FIXME: implement async ops */
+}
+
+static void
+byzanz_queue_input_stream_init (ByzanzQueueInputStream *queue_input_stream)
+{
+}
+
+GInputStream *
+byzanz_queue_input_stream_new (ByzanzQueue *queue)
+{
+  ByzanzQueueInputStream *stream;
+
+  g_return_val_if_fail (BYZANZ_IS_QUEUE (queue), NULL);
+
+  stream = g_object_new (BYZANZ_TYPE_QUEUE_INPUT_STREAM, NULL);
+  stream->queue = queue;
+
+  return G_INPUT_STREAM (stream);
+}
+
diff --git a/src/byzanzqueueinputstream.h b/src/byzanzqueueinputstream.h
new file mode 100644
index 0000000..7dbd640
--- /dev/null
+++ b/src/byzanzqueueinputstream.h
@@ -0,0 +1,52 @@
+/* desktop session recorder
+ * Copyright (C) 2009 Benjamin Otte <otte gnome org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#include "byzanzqueue.h"
+
+#ifndef __HAVE_BYZANZ_QUEUE_INPUT_STREAM_H__
+#define __HAVE_BYZANZ_QUEUE_INPUT_STREAM_H__
+
+typedef struct _ByzanzQueueInputStream ByzanzQueueInputStream;
+typedef struct _ByzanzQueueInputStreamClass ByzanzQueueInputStreamClass;
+
+#define BYZANZ_TYPE_QUEUE_INPUT_STREAM                    (byzanz_queue_input_stream_get_type())
+#define BYZANZ_IS_QUEUE_INPUT_STREAM(obj)                 (G_TYPE_CHECK_INSTANCE_TYPE ((obj), BYZANZ_TYPE_QUEUE_INPUT_STREAM))
+#define BYZANZ_IS_QUEUE_INPUT_STREAM_CLASS(klass)         (G_TYPE_CHECK_CLASS_TYPE ((klass), BYZANZ_TYPE_QUEUE_INPUT_STREAM))
+#define BYZANZ_QUEUE_INPUT_STREAM(obj)                    (G_TYPE_CHECK_INSTANCE_CAST ((obj), BYZANZ_TYPE_QUEUE_INPUT_STREAM, ByzanzQueueInputStream))
+#define BYZANZ_QUEUE_INPUT_STREAM_CLASS(klass)            (G_TYPE_CHECK_CLASS_CAST ((klass), BYZANZ_TYPE_QUEUE_INPUT_STREAM, ByzanzQueueInputStreamClass))
+#define BYZANZ_QUEUE_INPUT_STREAM_GET_CLASS(obj)          (G_TYPE_INSTANCE_GET_CLASS ((obj), BYZANZ_TYPE_QUEUE_INPUT_STREAM, ByzanzQueueInputStreamClass))
+
+struct _ByzanzQueueInputStream {
+  GInputStream  	input_stream;
+
+  ByzanzQueue *		queue;		/* queue we belong to */
+  GInputStream *	input;		/* stream we're reading from or NULL if we need to open one */
+  goffset		input_bytes;	/* bytes we've already read from input */
+};
+
+struct _ByzanzQueueInputStreamClass {
+  GInputStreamClass	input_stream_class;
+};
+
+GType		byzanz_queue_input_stream_get_type		(void) G_GNUC_CONST;
+
+GInputStream *	byzanz_queue_input_stream_new			(ByzanzQueue *	queue);
+
+
+#endif /* __HAVE_BYZANZ_QUEUE_INPUT_STREAM_H__ */
diff --git a/src/byzanzqueueoutputstream.c b/src/byzanzqueueoutputstream.c
new file mode 100644
index 0000000..4c3d3b7
--- /dev/null
+++ b/src/byzanzqueueoutputstream.c
@@ -0,0 +1,179 @@
+/* desktop session recorder
+ * Copyright (C) 2009 Benjamin Otte <otte gnome org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "byzanzqueueoutputstream.h"
+
+#include <unistd.h>
+
+G_DEFINE_TYPE (ByzanzQueueOutputStream, byzanz_queue_output_stream, G_TYPE_OUTPUT_STREAM)
+
+static void
+byzanz_queue_output_stream_dispose (GObject *object)
+{
+  ByzanzQueueOutputStream *stream = BYZANZ_QUEUE_OUTPUT_STREAM (object);
+
+  if (g_atomic_int_dec_and_test (&stream->queue->shared_count)) {
+    stream->queue->output = NULL;
+    g_object_unref (stream->queue);
+  } else {
+    g_object_ref (stream);
+    return;
+  }
+
+  G_OBJECT_CLASS (byzanz_queue_output_stream_parent_class)->dispose (object);
+}
+
+static void
+byzanz_queue_output_stream_finalize (GObject *object)
+{
+  ByzanzQueueOutputStream *stream = BYZANZ_QUEUE_OUTPUT_STREAM (object);
+
+  if (stream->output)
+    g_object_unref (stream->output);
+
+  G_OBJECT_CLASS (byzanz_queue_output_stream_parent_class)->finalize (object);
+}
+
+static gboolean
+byzanz_queue_output_stream_ensure_output (ByzanzQueueOutputStream *stream,
+					  GCancellable *           cancellable,
+					  GError **                error)
+{
+  GFile *file;
+
+  if (stream->output_bytes == 0 && stream->output)
+    {
+      if (!g_output_stream_close (stream->output, cancellable, error))
+	return FALSE;
+      g_object_unref (stream->output);
+      stream->output = NULL;
+    }
+
+  if (stream->output != NULL)
+    return TRUE;
+
+  g_async_queue_lock (stream->queue->files);
+  if (!stream->queue->input_closed) {
+    int fd;
+    char *filename;
+
+    fd = g_file_open_tmp ("byzanzcacheXXXXXX", &filename, error);
+    if (fd < 0) {
+      file = NULL;
+    } else {
+      close (fd);
+      file = g_file_new_for_path (filename);
+      g_free (filename);
+      g_object_ref (file);
+      g_async_queue_push_unlocked (stream->queue->files, file);
+    }
+  }
+  g_async_queue_unlock (stream->queue->files);
+
+  if (file == NULL)
+    return FALSE;
+
+  stream->output = G_OUTPUT_STREAM (g_file_append_to (file, G_FILE_CREATE_PRIVATE, cancellable, error));
+  g_object_unref (file);
+  if (stream->output == NULL)
+    return FALSE;
+
+  stream->output_bytes = BYZANZ_QUEUE_FILE_SIZE;
+  return TRUE;
+}
+
+static gssize
+byzanz_queue_output_stream_write (GOutputStream *output_stream,
+				  const void *   buffer,
+				  gsize          count,
+				  GCancellable * cancellable,
+				  GError **      error)
+{
+  ByzanzQueueOutputStream *stream = BYZANZ_QUEUE_OUTPUT_STREAM (output_stream);
+  gssize result;
+
+  if (!byzanz_queue_output_stream_ensure_output (stream, cancellable, error))
+    return -1;
+
+  /* will happen if input stream is closed, and there's no need to continue writing */
+  if (stream->output == NULL)
+    return count;
+
+  result = g_output_stream_write (stream->output, buffer, 
+      MIN (count, stream->output_bytes), cancellable, error);
+  if (result == -1)
+    return -1;
+
+  stream->output_bytes -= result;
+  return result;
+}
+
+static gboolean
+byzanz_queue_output_stream_close (GOutputStream *output_stream,
+                                  GCancellable * cancellable,
+                                  GError **	 error)
+{
+  ByzanzQueueOutputStream *stream = BYZANZ_QUEUE_OUTPUT_STREAM (output_stream);
+
+  if (stream->output &&
+      !g_output_stream_close (stream->output, cancellable, error))
+    return FALSE;
+
+  g_async_queue_lock (stream->queue->files);
+  stream->queue->output_closed = TRUE;
+  g_async_queue_unlock (stream->queue->files);
+  return TRUE;
+}
+
+static void
+byzanz_queue_output_stream_class_init (ByzanzQueueOutputStreamClass *klass)
+{
+  GObjectClass *object_class = G_OBJECT_CLASS (klass);
+  GOutputStreamClass *output_stream_class = G_OUTPUT_STREAM_CLASS (klass);
+
+  object_class->dispose = byzanz_queue_output_stream_dispose;
+  object_class->finalize = byzanz_queue_output_stream_finalize;
+
+  output_stream_class->write_fn = byzanz_queue_output_stream_write;
+  output_stream_class->close_fn = byzanz_queue_output_stream_close;
+  /* FIXME: implement async ops */
+}
+
+static void
+byzanz_queue_output_stream_init (ByzanzQueueOutputStream *queue_output_stream)
+{
+}
+
+GOutputStream *
+byzanz_queue_output_stream_new (ByzanzQueue *queue)
+{
+  ByzanzQueueOutputStream *stream;
+
+  g_return_val_if_fail (BYZANZ_IS_QUEUE (queue), NULL);
+
+  stream = g_object_new (BYZANZ_TYPE_QUEUE_OUTPUT_STREAM, NULL);
+  stream->queue = queue;
+
+  return G_OUTPUT_STREAM (stream);
+}
+
diff --git a/src/byzanzqueueoutputstream.h b/src/byzanzqueueoutputstream.h
new file mode 100644
index 0000000..c91a14c
--- /dev/null
+++ b/src/byzanzqueueoutputstream.h
@@ -0,0 +1,52 @@
+/* desktop session recorder
+ * Copyright (C) 2009 Benjamin Otte <otte gnome org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Library General Public
+ * License as published by the Free Software Foundation; either
+ * version 3 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Library General Public License for more details.
+ *
+ * You should have received a copy of the GNU Library General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#include "byzanzqueue.h"
+
+#ifndef __HAVE_BYZANZ_QUEUE_OUTPUT_STREAM_H__
+#define __HAVE_BYZANZ_QUEUE_OUTPUT_STREAM_H__
+
+typedef struct _ByzanzQueueOutputStream ByzanzQueueOutputStream;
+typedef struct _ByzanzQueueOutputStreamClass ByzanzQueueOutputStreamClass;
+
+#define BYZANZ_TYPE_QUEUE_OUTPUT_STREAM                    (byzanz_queue_output_stream_get_type())
+#define BYZANZ_IS_QUEUE_OUTPUT_STREAM(obj)                 (G_TYPE_CHECK_INSTANCE_TYPE ((obj), BYZANZ_TYPE_QUEUE_OUTPUT_STREAM))
+#define BYZANZ_IS_QUEUE_OUTPUT_STREAM_CLASS(klass)         (G_TYPE_CHECK_CLASS_TYPE ((klass), BYZANZ_TYPE_QUEUE_OUTPUT_STREAM))
+#define BYZANZ_QUEUE_OUTPUT_STREAM(obj)                    (G_TYPE_CHECK_INSTANCE_CAST ((obj), BYZANZ_TYPE_QUEUE_OUTPUT_STREAM, ByzanzQueueOutputStream))
+#define BYZANZ_QUEUE_OUTPUT_STREAM_CLASS(klass)            (G_TYPE_CHECK_CLASS_CAST ((klass), BYZANZ_TYPE_QUEUE_OUTPUT_STREAM, ByzanzQueueOutputStreamClass))
+#define BYZANZ_QUEUE_OUTPUT_STREAM_GET_CLASS(obj)          (G_TYPE_INSTANCE_GET_CLASS ((obj), BYZANZ_TYPE_QUEUE_OUTPUT_STREAM, ByzanzQueueOutputStreamClass))
+
+struct _ByzanzQueueOutputStream {
+  GOutputStream		output_stream;
+
+  ByzanzQueue *		queue;		/* queue we belong to */
+  GOutputStream *	output;		/* stream we're writing to or %NULL if we need to open one */
+  goffset		output_bytes;	/* bytes we may still write to output */
+};
+
+struct _ByzanzQueueOutputStreamClass {
+  GOutputStreamClass	output_stream_class;
+};
+
+GType		byzanz_queue_output_stream_get_type		(void) G_GNUC_CONST;
+
+GOutputStream *	byzanz_queue_output_stream_new			(ByzanzQueue *	queue);
+
+
+#endif /* __HAVE_BYZANZ_QUEUE_OUTPUT_STREAM_H__ */
diff --git a/src/byzanzsession.c b/src/byzanzsession.c
index 356b9e5..d3f0c80 100644
--- a/src/byzanzsession.c
+++ b/src/byzanzsession.c
@@ -39,6 +39,7 @@
 
 #include "byzanzencoder.h"
 #include "byzanzrecorder.h"
+#include "byzanzserialize.h"
 
 /*** MAIN FUNCTIONS ***/
 
@@ -142,12 +143,12 @@ byzanz_session_encoder_notify_cb (ByzanzEncoder * encoder,
   } else if (g_str_equal (pspec->name, "error")) {
     const GError *error = byzanz_encoder_get_error (encoder);
 
+    /* Delete the file, it's broken after all. Don't throw errors if it fails though. */
+    g_file_delete (session->file, NULL, NULL);
+
     /* Cancellation is not an error, it's been requested via _abort() */
     if (!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CANCELLED))
       byzanz_session_set_error (session, error);
-
-    /* Delete the file, it's broken after all. Don't throw errors if it fails though. */
-    g_file_delete (session->file, NULL, NULL);
   }
 }
 
@@ -159,6 +160,23 @@ byzanz_session_recorder_notify_cb (ByzanzRecorder * recorder,
   g_object_notify (G_OBJECT (session), "recording");
 }
 
+static guint64
+byzanz_session_elapsed (ByzanzSession *session, const GTimeVal *tv)
+{
+  guint elapsed;
+
+  if (session->start_time.tv_sec == 0 && session->start_time.tv_usec == 0) {
+    session->start_time = *tv;
+    return 0;
+  }
+
+  elapsed = tv->tv_sec - session->start_time.tv_sec;
+  elapsed *= 1000;
+  elapsed += (tv->tv_usec - session->start_time.tv_usec) / 1000;
+
+  return elapsed;
+}
+
 static void
 byzanz_session_recorder_image_cb (ByzanzRecorder *  recorder,
                                   cairo_surface_t * surface,
@@ -166,10 +184,14 @@ byzanz_session_recorder_image_cb (ByzanzRecorder *  recorder,
                                   const GTimeVal *  tv,
                                   ByzanzSession *   session)
 {
-  if (session->encoder) {
-    byzanz_encoder_process (session->encoder, surface, region, tv);
-  } else {
-    g_warning ("FIXME: figure out what to do now");
+  GOutputStream *stream;
+  GError *error = NULL;
+
+  stream = byzanz_queue_get_output_stream (session->queue);
+  if (!byzanz_serialize (stream, byzanz_session_elapsed (session, tv), 
+          surface, region, session->cancellable, &error)) {
+    byzanz_session_set_error (session, error);
+    g_error_free (error);
   }
 }
 
@@ -197,6 +219,7 @@ byzanz_session_finalize (GObject *object)
   }
   g_object_unref (session->window);
   g_object_unref (session->file);
+  g_object_unref (session->queue);
 
   if (session->error)
     g_error_free (session->error);
@@ -220,14 +243,17 @@ byzanz_session_constructed (GObject *object)
   stream = G_OUTPUT_STREAM (g_file_replace (session->file, NULL, 
         FALSE, G_FILE_CREATE_REPLACE_DESTINATION, session->cancellable, &session->error));
   if (stream != NULL) {
-    session->encoder = byzanz_encoder_new (session->encoder_type, stream,
-        session->area.width, session->area.height, session->cancellable);
+    session->encoder = byzanz_encoder_new (session->encoder_type, 
+        byzanz_queue_get_input_stream (session->queue),
+        stream, session->cancellable);
     g_signal_connect (session->encoder, "notify", 
         G_CALLBACK (byzanz_session_encoder_notify_cb), session);
     g_object_unref (stream);
     if (byzanz_encoder_get_error (session->encoder))
       byzanz_session_set_error (session, byzanz_encoder_get_error (session->encoder));
   }
+  byzanz_serialize_header (byzanz_queue_get_output_stream (session->queue),
+      session->area.width, session->area.height, session->cancellable, &session->error);
 
   if (G_OBJECT_CLASS (byzanz_session_parent_class)->constructed)
     G_OBJECT_CLASS (byzanz_session_parent_class)->constructed (object);
@@ -271,6 +297,7 @@ static void
 byzanz_session_init (ByzanzSession *session)
 {
   session->cancellable = g_cancellable_new ();
+  session->queue = byzanz_queue_new ();
 }
 
 /**
@@ -318,13 +345,19 @@ byzanz_session_start (ByzanzSession *session)
 void
 byzanz_session_stop (ByzanzSession *session)
 {
+  GOutputStream *stream;
+  GError *error = NULL;
   GTimeVal tv;
 
   g_return_if_fail (BYZANZ_IS_SESSION (session));
 
-  if (session->encoder) {
-    g_get_current_time (&tv);
-    byzanz_encoder_close (session->encoder, &tv);
+  stream = byzanz_queue_get_output_stream (session->queue);
+  g_get_current_time (&tv);
+  if (!byzanz_serialize (stream, byzanz_session_elapsed (session, &tv), 
+          NULL, NULL, session->cancellable, &error) || 
+      !g_output_stream_close (stream, session->cancellable, &error)) {
+    byzanz_session_set_error (session, error);
+    g_error_free (error);
   }
 
   byzanz_recorder_set_recording (session->recorder, FALSE);
diff --git a/src/byzanzsession.h b/src/byzanzsession.h
index 2101a24..db95825 100644
--- a/src/byzanzsession.h
+++ b/src/byzanzsession.h
@@ -21,6 +21,7 @@
 #include <gtk/gtk.h>
 
 #include "byzanzencoder.h"
+#include "byzanzqueue.h"
 #include "byzanzrecorder.h"
 
 #ifndef __HAVE_BYZANZ_SESSION_H__
@@ -45,6 +46,8 @@ struct _ByzanzSession {
   GdkRectangle          area;           /* area of window to record */
   GdkWindow *           window;         /* window to record */
   GType                 encoder_type;   /* type of encoder to use */
+  ByzanzQueue *         queue;          /* queue we use as data cache */
+  GTimeVal              start_time;     /* when we started writing to queue */
 
   /* internal objects */
   GCancellable *        cancellable;    /* cancellable to use for aborting the session */
diff --git a/src/playback.c b/src/playback.c
index b2d29d5..b17195f 100644
--- a/src/playback.c
+++ b/src/playback.c
@@ -47,37 +47,6 @@ typedef struct {
   ByzanzEncoder *       encoder;
 } Operation;
 
-static gboolean
-add_more_data (gpointer data)
-{
-  Operation *op = data;
-  cairo_surface_t *surface;
-  GdkRegion *region;
-  GTimeVal tv;
-  guint64 elapsed;
-  GError *error = NULL;
-
-  if (!byzanz_deserialize (op->instream, &elapsed, &surface, &region, NULL, &error)) {
-    g_print ("%s\n", error->message);
-    g_error_free (error);
-    g_main_loop_quit (op->loop);
-    return FALSE;
-  }
-
-  tv.tv_sec = elapsed / 1000;
-  tv.tv_usec = (elapsed % 1000) * 1000;
-  if (surface) {
-    byzanz_encoder_process (op->encoder, surface, region, &tv);
-    cairo_surface_destroy (surface);
-    gdk_region_destroy (region);
-    return TRUE;
-  } else {
-    byzanz_encoder_close (op->encoder, &tv);
-    return FALSE;
-  }
-  return surface ? TRUE : FALSE;
-}
-
 static void
 encoder_notify (ByzanzEncoder *encoder, GParamSpec *pspec, Operation *op)
 {
@@ -98,7 +67,6 @@ main (int argc, char **argv)
   GOptionContext* context;
   GError *error = NULL;
   Operation op;
-  guint width, height;
   
   g_set_prgname (argv[0]);
 #ifdef GETTEXT_PACKAGE
@@ -137,11 +105,6 @@ main (int argc, char **argv)
     g_error_free (error);
     return 1;
   }
-  if (!byzanz_deserialize_header (op.instream, &width, &height, NULL, &error)) {
-    g_print ("%s\n", error->message);
-    g_error_free (error);
-    return 1;
-  }
   op.outstream = G_OUTPUT_STREAM (g_file_replace (op.outfile, NULL, 
         FALSE, G_FILE_CREATE_REPLACE_DESTINATION, NULL, &error));
   if (op.outstream == NULL) {
@@ -150,10 +113,9 @@ main (int argc, char **argv)
     return 1;
   }
   op.encoder = byzanz_encoder_new (byzanz_encoder_get_type_from_file (op.outfile),
-      op.outstream, width, height, NULL);
+      op.instream, op.outstream, NULL);
   
   g_signal_connect (op.encoder, "notify", G_CALLBACK (encoder_notify), &op);
-  g_idle_add (add_more_data, &op);
   
   g_main_loop_run (op.loop);
 



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]