Patch: implementation of a stream cache



	Hi,

	Here's a patch implementing a simple stream cache in tinymail. The goal
is being able to cache some streams (specifically the remote uri images
referenced in email messages) into a local folder.

	We provide a simple interface (TnyStreamCache) that's the simple access
point for the cache services, and an implementation caching files to
disk (TnyFsStreamCache).

	The filesystem cache works very simply. When we request a cached file,
it offers a stream accessing the cache file; if the file is not available
yet, it begins fetching it and the stream provides the data as it is
being received.

	It hasn't been tested much, but, as the implementation doesn't affect
any other part of tinymail, I think it should be safe to commit (once the
typical style/obvious issues are detected and fixed). I suppose the main
issues that I'll need to face are related to threading ;) as usual.

The changelog would be:

* libtinymail/tny-shared.h:
        * Added stream cache classes and handlers
* libtinymail/tny-stream-cache.[ch]:
        * Interface for stream caches. It's mostly intended for remote
          image streams.
* libtinymail/tny-fs-stream-cache.[ch]:
        * Implementation of TnyStreamCache interface using a local 
          folder for storage. This implementation tries to keep the
          size of the cached streams lower than 1 MB.
* libtinymail/tny-cached-file.[ch]:
        * Data object to store status information of currently cached 
          files. It also controls the process of fetching the remote
          streams, and creation of streams reading from cache.
* libtinymail/tny-cached-file-stream.[ch]:
        * Streams wrapping TnyFsStream, that access the cache files
          waiting when they're not completely fetched yet.
* libtinymail/Makefile.am:
        * Added stream cache implementation files.


-- 
José Dapena Paz <jdapena igalia com>
Igalia
Index: libtinymail/tny-cached-file.h
===================================================================
--- libtinymail/tny-cached-file.h	(revision 0)
+++ libtinymail/tny-cached-file.h	(revision 0)
@@ -0,0 +1,84 @@
+#ifndef TNY_FS_CACHED_FILE_H
+#define TNY_FS_CACHED_FILE_H
+
+/* libtinymail- The Tiny Mail base library
+ * Copyright (C) 2008 Jose Dapena Paz <jdapena igalia com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#include <glib.h>
+#include <glib-object.h>
+
+#include <fcntl.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+#include <tny-fs-stream-cache.h>
+#include <tny-cached-file-stream.h>
+
+G_BEGIN_DECLS
+
+/* Standard GObject type, cast and type-check macros for TnyCachedFile. */
+#define TNY_TYPE_CACHED_FILE                 (tny_cached_file_get_type ())
+#define TNY_CACHED_FILE(obj)             (G_TYPE_CHECK_INSTANCE_CAST ((obj), TNY_TYPE_CACHED_FILE, TnyCachedFile))
+#define TNY_CACHED_FILE_CLASS(vtable)    (G_TYPE_CHECK_CLASS_CAST ((vtable), TNY_TYPE_CACHED_FILE, TnyCachedFileClass))
+#define TNY_IS_CACHED_FILE(obj)          (G_TYPE_CHECK_INSTANCE_TYPE ((obj), TNY_TYPE_CACHED_FILE))
+#define TNY_IS_CACHED_FILE_CLASS(vtable) (G_TYPE_CHECK_CLASS_TYPE ((vtable), TNY_TYPE_CACHED_FILE))
+#define TNY_CACHED_FILE_GET_CLASS(inst)  (G_TYPE_INSTANCE_GET_CLASS ((inst), TNY_TYPE_CACHED_FILE, TnyCachedFileClass))
+
+/* tny-shared.h provides the same typedefs; only declare them when it was
+ * not included first. */
+#ifndef TNY_SHARED_H
+typedef struct _TnyCachedFile TnyCachedFile;
+typedef struct _TnyCachedFileClass TnyCachedFileClass;
+#endif
+
+/* Instance struct: no public fields besides the GObject parent. */
+struct _TnyCachedFile
+{
+	GObject parent;
+};
+
+/* Class struct: one virtual per public tny_cached_file_* accessor below. */
+struct _TnyCachedFileClass 
+{
+	GObjectClass parent;
+
+	/* virtuals */
+	gint64 (*get_expected_size) (TnyCachedFile *self);
+	const gchar * (*get_id) (TnyCachedFile *self);
+	void (*remove) (TnyCachedFile *self);
+	gboolean (*is_active) (TnyCachedFile *self);
+	gboolean (*is_finished) (TnyCachedFile *self);
+	time_t (*get_timestamp) (TnyCachedFile *self);
+	TnyStream * (*get_stream) (TnyCachedFile *self);
+	gint64 (*wait_fetchable) (TnyCachedFile *self, gint64 offset);
+	void (*unregister_stream) (TnyCachedFile *self, TnyCachedFileStream *stream);
+};
+
+GType  tny_cached_file_get_type (void);
+/* TnyFsStreamCache passes @stream == NULL and @expected_size == 0 when it
+ * re-registers a file that is already in the cache folder. */
+TnyCachedFile* tny_cached_file_new (TnyFsStreamCache *stream_cache, const gchar *id, gint64 expected_size, TnyStream *stream);
+
+/* Size used by TnyFsStreamCache for its space accounting and by
+ * TnyCachedFileStream to detect end of stream. */
+gint64 tny_cached_file_get_expected_size (TnyCachedFile *self);
+/* Id of the file; TnyFsStreamCache uses it as hash table key. */
+const gchar *tny_cached_file_get_id (TnyCachedFile *self);
+/* Called by TnyFsStreamCache when it evicts the file.
+ * NOTE(review): presumably deletes the backing file - confirm in the .c. */
+void tny_cached_file_remove (TnyCachedFile *self);
+/* Active files are never evicted by TnyFsStreamCache. */
+gboolean tny_cached_file_is_active (TnyCachedFile *self);
+/* Unfinished files are evicted before finished ones. */
+gboolean tny_cached_file_is_finished (TnyCachedFile *self);
+TnyStream *tny_cached_file_get_stream (TnyCachedFile *self);
+/* Among files with equal finished state, lower timestamps are evicted first. */
+time_t tny_cached_file_get_timestamp (TnyCachedFile *self);
+/* TnyCachedFileStream calls this before reading or seeking at @offset and
+ * treats -1 (seek) or a value below @offset (read) as failure.
+ * NOTE(review): presumably blocks until @offset is fetched - confirm. */
+gint64 tny_cached_file_wait_fetchable (TnyCachedFile *self, gint64 offset);
+/* Called by TnyCachedFileStream the first time the stream is closed. */
+void tny_cached_file_unregister_stream (TnyCachedFile *self, TnyCachedFileStream *stream);
+
+G_END_DECLS
+
+#endif
+
Index: libtinymail/tny-fs-stream-cache.c
===================================================================
--- libtinymail/tny-fs-stream-cache.c	(revision 0)
+++ libtinymail/tny-fs-stream-cache.c	(revision 0)
@@ -0,0 +1,417 @@
+/* libtinymail - The Tiny Mail base library
+ * Copyright (C) 2008 Jose Dapena Paz <jdapena igalia com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * TnyFsStreamCache:
+ *
+ * Stream cache storing cached streams in a folder as files.
+ *
+ * free-function: g_object_unref
+ **/
+
+#include <config.h>
+
+#include <string.h>
+#include <stdio.h>
+#include <errno.h>
+#include <glib/gstdio.h>
+
+#include <tny-fs-stream.h>
+#include <tny-fs-stream-cache.h>
+#include <tny-cached-file.h>
+
+static GObjectClass *parent_class = NULL;
+
+/* Private, per-instance state of a TnyFsStreamCache. */
+typedef struct _TnyFsStreamCachePriv TnyFsStreamCachePriv;
+
+struct _TnyFsStreamCachePriv
+{
+	/* cache folder; copied in tny_fs_stream_cache_new, freed in finalize */
+	gchar *path;
+	/* size budget that get_available_size / get_free_space subtract from */
+	gint64 max_size;
+	/* sum of the expected sizes of the files found on disk by
+	 * fill_from_cache; not updated anywhere else in this file */
+	gint64 current_size;
+	/* id string -> TnyCachedFile; the table owns both key and value */
+	GHashTable *cached_files;
+};
+
+/* Returns the TnyFsStreamCachePriv registered for the type in class_init. */
+#define TNY_FS_STREAM_CACHE_GET_PRIVATE(o)	\
+	(G_TYPE_INSTANCE_GET_PRIVATE ((o), TNY_TYPE_FS_STREAM_CACHE, TnyFsStreamCachePriv))
+
+/* GHFunc: adds the expected size of @cached_file to *@count, but only for
+ * files that report being active. @id is not used. */
+static void
+count_active_files_size (gchar *id, TnyCachedFile *cached_file, gint64* count)
+{
+	if (!tny_cached_file_is_active (cached_file))
+		return;
+
+	*count = *count + tny_cached_file_get_expected_size (cached_file);
+}
+
+/* GHFunc: adds the expected size of @cached_file to *@count, whatever the
+ * state of the file. @id is not used. */
+static void
+count_files_size (gchar *id, TnyCachedFile *cached_file, gint64* count)
+{
+	gint64 file_size = tny_cached_file_get_expected_size (cached_file);
+
+	*count = *count + file_size;
+}
+
+/*
+ * Returns max_size minus the expected sizes of all files that report being
+ * active, i.e. the room a new file can get once every inactive file has
+ * been evicted. The result is negative when the cache was shrunk below the
+ * size the active files already take.
+ */
+static gint64
+get_available_size (TnyFsStreamCache *self)
+{
+	TnyFsStreamCachePriv *priv;
+	/* Signed on purpose: count_active_files_size() writes through a
+	 * gint64 *, and the subtraction below has to be able to go negative
+	 * without a detour through unsigned arithmetic. */
+	gint64 active_files_size = 0;
+	priv = TNY_FS_STREAM_CACHE_GET_PRIVATE (self);
+
+	g_hash_table_foreach (priv->cached_files, (GHFunc) count_active_files_size, &active_files_size);
+
+	/* if available size < 0 it means we shrank the cache and we need to drop files */
+	return priv->max_size - active_files_size;
+}
+
+/*
+ * Returns max_size minus the expected sizes of every file in the cache,
+ * active or not. The result is negative when the files registered in the
+ * cache add up to more than max_size.
+ */
+static gint64
+get_free_space (TnyFsStreamCache *self)
+{
+	TnyFsStreamCachePriv *priv;
+	/* Signed on purpose: count_files_size() writes through a gint64 *, and
+	 * the subtraction below has to be able to go negative. */
+	gint64 files_size = 0;
+	priv = TNY_FS_STREAM_CACHE_GET_PRIVATE (self);
+
+	g_hash_table_foreach (priv->cached_files, (GHFunc) count_files_size, &files_size);
+
+	/* if free space < 0 the cache is over budget and files have to be dropped */
+	return priv->max_size - files_size;
+}
+
+/* GHFunc: prepends a new reference to @cached_file to *@list when the file
+ * is not active. The receiver of the list owns those references. */
+static void
+get_inactive_files_list (gchar *id, TnyCachedFile *cached_file, GList **list)
+{
+	if (tny_cached_file_is_active (cached_file))
+		return;
+
+	*list = g_list_prepend (*list, g_object_ref (cached_file));
+}
+
+/*
+ * GCompareFunc used to sort eviction candidates: whatever sorts first is
+ * removed first.
+ *
+ *  - files that are not finished go before finished ones;
+ *  - among files in the same state, the lower timestamp goes first.
+ *
+ * Returns a negative value if @cf1 must be removed before @cf2, a positive
+ * value for the opposite, 0 if they are equivalent.
+ */
+static gint 
+remove_priority (TnyCachedFile *cf1, TnyCachedFile *cf2)
+{
+	gboolean unfinished1 = !tny_cached_file_is_finished (cf1);
+	gboolean unfinished2 = !tny_cached_file_is_finished (cf2);
+	time_t stamp1, stamp2;
+
+	/* non finished has priority to be removed */
+	if (unfinished1 != unfinished2)
+		return unfinished1 ? -1 : 1;
+
+	/* older has priority to be removed. The timestamps are compared
+	 * rather than subtracted: a time_t difference does not have to fit
+	 * in a gint, and a truncated difference can get the wrong sign. */
+	stamp1 = tny_cached_file_get_timestamp (cf1);
+	stamp2 = tny_cached_file_get_timestamp (cf2);
+	if (stamp1 < stamp2)
+		return -1;
+	if (stamp1 > stamp2)
+		return 1;
+
+	return 0;
+}
+
+/*
+ * Evicts inactive files until get_free_space() plus the evicted sizes
+ * reaches @required_size, or until there is no inactive file left.
+ *
+ * 1. we obtain a list of non active files
+ * 2. we sort the list (first incomplete, then old)
+ * 3. we get items in list and remove them until we have the required size
+ */
+static void
+remove_old_files (TnyFsStreamCache *self, gint64 required_size)
+{
+	GList *cached_files_list;
+	TnyFsStreamCachePriv *priv = TNY_FS_STREAM_CACHE_GET_PRIVATE (self);
+	gint64 available_size;
+
+	/* 1. we obtain a list of non active files (the list owns a reference
+	 *    to each of them) */
+	cached_files_list = NULL;
+	g_hash_table_foreach (priv->cached_files, (GHFunc) get_inactive_files_list, &cached_files_list);
+
+	/* 2. we sort the list (first incomplete, then old) */
+	cached_files_list = g_list_sort (cached_files_list, (GCompareFunc) remove_priority);
+
+	/* 3. we get items in list and remove them until we have the required
+	 *    size. The list can run out first (e.g. everything left is
+	 *    active), so it has to be checked before touching its head. */
+	available_size = get_free_space (self);
+	while (cached_files_list != NULL && available_size < required_size) {
+		TnyCachedFile *cached_file = (TnyCachedFile *) cached_files_list->data;
+		available_size += tny_cached_file_get_expected_size (cached_file);
+		/* the list reference keeps cached_file alive after the table
+		 * drops its own */
+		g_hash_table_remove (priv->cached_files, tny_cached_file_get_id (cached_file));
+		tny_cached_file_remove (cached_file);
+		cached_files_list = g_list_delete_link (cached_files_list, cached_files_list);
+		g_object_unref (cached_file);
+	}
+
+	/* release the candidates that did not need to be evicted */
+	g_list_foreach (cached_files_list, (GFunc) g_object_unref, NULL);
+	g_list_free (cached_files_list);
+}
+
+/*
+ * TnyStreamCache::get_stream implementation.
+ *
+ * On a cache hit, returns the stream offered by the TnyCachedFile registered
+ * under @id; @fetcher is not called. On a miss, calls @fetcher to obtain the
+ * input stream and its expected size; if that size fits in what
+ * get_available_size() reports, older inactive files are evicted, a new
+ * TnyCachedFile is registered under @id and its stream is returned.
+ *
+ * Returns NULL when @self or @id are invalid, when @fetcher is missing or
+ * returns NULL, or when the file does not fit in the cache.
+ */
+static TnyStream *
+tny_fs_stream_cache_get_stream (TnyStreamCache *self, const gchar *id,
+				TnyStreamCacheOpenStreamFetcher fetcher, gpointer userdata)
+{
+	TnyFsStreamCachePriv *priv;
+	TnyCachedFile *cached_file;
+	TnyStream *input_stream;
+	TnyStream *result = NULL;
+	gint64 expected_size = 0;
+
+	g_return_val_if_fail (TNY_IS_FS_STREAM_CACHE (self), NULL);
+	g_return_val_if_fail (id, NULL);
+	priv = TNY_FS_STREAM_CACHE_GET_PRIVATE (self);
+
+	cached_file = g_hash_table_lookup (priv->cached_files, id);
+	if (cached_file)
+		return tny_cached_file_get_stream (cached_file);
+
+	/* only a cache miss needs the fetcher, so it is only required here */
+	g_return_val_if_fail (fetcher != NULL, NULL);
+
+	input_stream = fetcher (self, &expected_size, userdata);
+	if (input_stream == NULL)
+		return NULL;
+
+	if (expected_size <= get_available_size (TNY_FS_STREAM_CACHE (self))) {
+		remove_old_files (TNY_FS_STREAM_CACHE (self), expected_size);
+		cached_file = tny_cached_file_new (TNY_FS_STREAM_CACHE (self), id, expected_size, input_stream);
+		/* the table takes over the reference returned by _new */
+		g_hash_table_insert (priv->cached_files, g_strdup (id), cached_file);
+		result = tny_cached_file_get_stream (cached_file);
+	}
+	/* NOTE(review): when the file does not fit, input_stream is simply
+	 * dropped. If the fetcher hands over a reference this leaks it (and
+	 * the caller gets nothing); confirm the ownership contract of
+	 * TnyStreamCacheOpenStreamFetcher and either unref the stream or
+	 * return it uncached. */
+
+	return result;
+}
+
+/* TnyStreamCache::get_max_size implementation: returns the configured size
+ * budget, or 0 when @self is not a TnyFsStreamCache. */
+static gint64
+tny_fs_stream_cache_get_max_size (TnyStreamCache *self)
+{
+	g_return_val_if_fail (TNY_IS_FS_STREAM_CACHE (self), 0);
+
+	return TNY_FS_STREAM_CACHE_GET_PRIVATE (self)->max_size;
+}
+
+/* TnyStreamCache::set_max_size implementation: stores the new size budget
+ * and evicts inactive files until the free space is no longer negative. */
+static void
+tny_fs_stream_cache_set_max_size (TnyStreamCache *self, gint64 max_size)
+{
+	TnyFsStreamCache *cache;
+
+	g_return_if_fail (TNY_IS_FS_STREAM_CACHE (self));
+
+	cache = TNY_FS_STREAM_CACHE (self);
+	TNY_FS_STREAM_CACHE_GET_PRIVATE (self)->max_size = max_size;
+
+	/* if we shrink the cache, then we have to drop old files */
+	remove_old_files (cache, 0);
+}
+
+/* Closure handed to remove_filter() through g_hash_table_foreach_remove(). */
+typedef struct {
+	TnyFsStreamCache *self;            /* cache being filtered */
+	TnyStreamCacheRemoveFilter filter; /* user predicate, called once per id */
+	gpointer userdata;                 /* passed verbatim to filter */
+} RemoveFilterData;
+
+/* GHRFunc: asks the user filter whether the entry @id has to go. When it
+ * does, the cached file is removed here and the returned value makes the
+ * hash table drop the entry. */
+static gboolean
+remove_filter (gchar *id, TnyCachedFile *cached_file, RemoveFilterData *remove_filter_data)
+{
+	TnyStreamCache *cache = TNY_STREAM_CACHE (remove_filter_data->self);
+	gboolean drop_entry = remove_filter_data->filter (cache, id, remove_filter_data->userdata);
+
+	if (drop_entry)
+		tny_cached_file_remove (cached_file);
+
+	return drop_entry;
+}
+
+/*
+ * TnyStreamCache::remove implementation: calls @filter once for every id in
+ * the cache and removes the entries for which it returns TRUE.
+ *
+ * Does nothing (besides the g_return_if_fail warning) when @self is not a
+ * TnyFsStreamCache or @filter is NULL.
+ */
+static void
+tny_fs_stream_cache_remove (TnyStreamCache *self, TnyStreamCacheRemoveFilter filter, gpointer userdata)
+{
+	TnyFsStreamCachePriv *priv;
+	/* only needed for the duration of the foreach, so it lives on the stack */
+	RemoveFilterData remove_filter_data;
+
+	g_return_if_fail (TNY_IS_FS_STREAM_CACHE (self));
+	/* remove_filter() calls the filter unconditionally */
+	g_return_if_fail (filter != NULL);
+	priv = TNY_FS_STREAM_CACHE_GET_PRIVATE (self);
+
+	remove_filter_data.self = TNY_FS_STREAM_CACHE (self);
+	remove_filter_data.filter = filter;
+	remove_filter_data.userdata = userdata;
+
+	g_hash_table_foreach_remove (priv->cached_files, (GHRFunc) remove_filter, (gpointer) &remove_filter_data);
+}
+
+/*
+ * Populates the cache from the contents of priv->path: files ending in
+ * ".tmp" are deleted, every other entry is registered as a TnyCachedFile
+ * under its file name, and its expected size is added to current_size.
+ *
+ * Does nothing when the folder cannot be opened.
+ */
+static void
+fill_from_cache (TnyFsStreamCache *self)
+{
+	TnyFsStreamCachePriv *priv;
+	GDir *dir;
+	const gchar *filename;
+
+	priv = TNY_FS_STREAM_CACHE_GET_PRIVATE (self);
+	dir = g_dir_open (priv->path, 0, NULL);
+	if (dir == NULL) {
+		/* folder missing or unreadable: start with an empty cache */
+		return;
+	}
+
+	while ((filename = g_dir_read_name (dir)) != NULL) {
+		if (g_str_has_suffix (filename, ".tmp")) {
+			/* the full path is only needed to delete the file */
+			gchar *fullname;
+
+			fullname = g_build_filename (priv->path, filename, NULL);
+			g_unlink (fullname);
+			g_free (fullname);
+		} else {
+			TnyCachedFile *cached_file;
+
+			/* no input stream and no size: the file is already on disk */
+			cached_file = tny_cached_file_new (self, filename, 0, NULL);
+			priv->current_size += tny_cached_file_get_expected_size (cached_file);
+			g_hash_table_insert (priv->cached_files, g_strdup (filename), cached_file);
+		}
+	}
+
+	g_dir_close (dir);
+}
+
+/**
+ * tny_fs_stream_cache_new:
+ * @path: a folder path
+ * @max_size: size budget of the cache, in the same unit as the expected
+ *  sizes reported by the cached files
+ *
+ * Creates a stream cache that stores cached streams as files in a filesystem
+ * folder specified by @path. The folder is created if it does not exist, and
+ * the files it already contains are registered in the cache.
+ *
+ * returns: (caller-owns): a new #TnyStreamCache instance, or NULL if @path
+ * is NULL
+ * since: 1.0
+ * audience: application-developer
+ **/
+TnyStreamCache*
+tny_fs_stream_cache_new (const char *path, guint64 max_size)
+{
+	TnyFsStreamCachePriv *priv;
+	TnyFsStreamCache *self;
+
+	g_return_val_if_fail (path != NULL, NULL);
+
+	self = g_object_new (TNY_TYPE_FS_STREAM_CACHE, NULL);
+	priv = TNY_FS_STREAM_CACHE_GET_PRIVATE (self);
+
+	priv->path = g_strdup (path);
+	priv->current_size = 0;
+	priv->max_size = max_size;
+	if (!g_file_test (path, G_FILE_TEST_IS_DIR)) {
+		if (g_mkdir_with_parents (path, 0777) != 0)
+			g_warning ("Could not create stream cache folder %s: %s",
+				   path, g_strerror (errno));
+	}
+	/* there is nothing to load when the folder could not be created */
+	if (g_file_test (path, G_FILE_TEST_IS_DIR))
+		fill_from_cache (TNY_FS_STREAM_CACHE (self));
+	return TNY_STREAM_CACHE (self);
+}
+
+/* Returns the folder used by @self to store its files. The string belongs
+ * to the cache. Returns NULL when @self is not a TnyFsStreamCache. */
+const gchar *
+tny_fs_stream_cache_get_path (TnyFsStreamCache *self)
+{
+	g_return_val_if_fail (TNY_IS_FS_STREAM_CACHE (self), NULL);
+
+	return TNY_FS_STREAM_CACHE_GET_PRIVATE (self)->path;
+}
+
+/* GInstanceInitFunc: starts with no path, no size budget and an empty
+ * id -> TnyCachedFile table that frees its keys and unrefs its values. */
+static void
+tny_fs_stream_cache_instance_init (GTypeInstance *instance, gpointer g_class)
+{
+	TnyFsStreamCachePriv *priv;
+
+	priv = TNY_FS_STREAM_CACHE_GET_PRIVATE ((TnyFsStreamCache *) instance);
+
+	priv->cached_files = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, g_object_unref);
+	priv->max_size = 0;
+	priv->path = NULL;
+}
+
+/* GObject::finalize: releases the table of cached files (and with it the
+ * references it holds) and the folder path, then chains up. */
+static void
+tny_fs_stream_cache_finalize (GObject *object)
+{
+	TnyFsStreamCache *self = (TnyFsStreamCache *)object;
+	TnyFsStreamCachePriv *priv = TNY_FS_STREAM_CACHE_GET_PRIVATE (self);
+
+	/* created in instance_init; destroying it frees every key and
+	 * unrefs every TnyCachedFile through the table's destroy notifiers */
+	if (priv->cached_files) {
+		g_hash_table_destroy (priv->cached_files);
+		priv->cached_files = NULL;
+	}
+
+	g_free (priv->path);
+	priv->path = NULL;
+
+	(*parent_class->finalize) (object);
+}
+
+
+/* GInterfaceInitFunc for TnyStreamCache: plugs the filesystem
+ * implementations into the interface vtable. */
+static void
+tny_stream_cache_init (gpointer g, gpointer iface_data)
+{
+	TnyStreamCacheIface *iface = (TnyStreamCacheIface *) g;
+
+	iface->get_max_size = tny_fs_stream_cache_get_max_size;
+	iface->set_max_size = tny_fs_stream_cache_set_max_size;
+	iface->remove = tny_fs_stream_cache_remove;
+	iface->get_stream = tny_fs_stream_cache_get_stream;
+}
+
+/* GClassInitFunc: remembers the parent class for chaining up, installs the
+ * finalizer and registers the private struct. */
+static void 
+tny_fs_stream_cache_class_init (TnyFsStreamCacheClass *class)
+{
+	GObjectClass *gobject_class = (GObjectClass*) class;
+
+	parent_class = g_type_class_peek_parent (class);
+	gobject_class->finalize = tny_fs_stream_cache_finalize;
+	g_type_class_add_private (gobject_class, sizeof (TnyFsStreamCachePriv));
+}
+
+/* GOnce callback: registers TnyFsStreamCache as a GObject subclass that
+ * implements TnyStreamCache. Returns the GType packed in a pointer. */
+static gpointer
+tny_fs_stream_cache_register_type (gpointer notused)
+{
+	GType type = 0;
+
+	static const GTypeInfo info = 
+		{
+			sizeof (TnyFsStreamCacheClass),
+			NULL,   /* base_init */
+			NULL,   /* base_finalize */
+			(GClassInitFunc) tny_fs_stream_cache_class_init,   /* class_init */
+			NULL,   /* class_finalize */
+			NULL,   /* class_data */
+			sizeof (TnyFsStreamCache),
+			0,      /* n_preallocs */
+			tny_fs_stream_cache_instance_init,   /* instance_init */
+			NULL
+		};
+	
+	static const GInterfaceInfo tny_stream_cache_info = 
+		{
+			(GInterfaceInitFunc) tny_stream_cache_init, /* interface_init */
+			NULL,         /* interface_finalize */
+			NULL          /* interface_data */
+		};
+	
+	type = g_type_register_static (G_TYPE_OBJECT,
+				       "TnyFsStreamCache",
+				       &info, 0);
+
+	g_type_add_interface_static (type, TNY_TYPE_STREAM_CACHE, 
+				     &tny_stream_cache_info);
+	
+	/* GType is a gsize: the *_SIZE_* macros keep every bit of it, the
+	 * *_UINT_* ones go through a 32 bit guint */
+	return GSIZE_TO_POINTER (type);
+}
+
+/* Returns the GType of TnyFsStreamCache, registering it on first use. */
+GType 
+tny_fs_stream_cache_get_type (void)
+{
+	static GOnce once = G_ONCE_INIT;
+	g_once (&once, tny_fs_stream_cache_register_type, NULL);
+	return (GType) GPOINTER_TO_SIZE (once.retval);
+}
Index: libtinymail/tny-fs-stream-cache.h
===================================================================
--- libtinymail/tny-fs-stream-cache.h	(revision 0)
+++ libtinymail/tny-fs-stream-cache.h	(revision 0)
@@ -0,0 +1,63 @@
+#ifndef TNY_FS_STREAM_CACHE_H
+#define TNY_FS_STREAM_CACHE_H
+
+/* libtinymail- The Tiny Mail base library
+ * Copyright (C) 2008 Jose Dapena Paz <jdapena igalia com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#include <glib.h>
+#include <glib-object.h>
+
+#include <fcntl.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+#include <tny-stream-cache.h>
+
+G_BEGIN_DECLS
+
+/* Standard GObject type, cast and type-check macros for TnyFsStreamCache. */
+#define TNY_TYPE_FS_STREAM_CACHE             (tny_fs_stream_cache_get_type ())
+#define TNY_FS_STREAM_CACHE(obj)             (G_TYPE_CHECK_INSTANCE_CAST ((obj), TNY_TYPE_FS_STREAM_CACHE, TnyFsStreamCache))
+#define TNY_FS_STREAM_CACHE_CLASS(vtable)    (G_TYPE_CHECK_CLASS_CAST ((vtable), TNY_TYPE_FS_STREAM_CACHE, TnyFsStreamCacheClass))
+#define TNY_IS_FS_STREAM_CACHE(obj)          (G_TYPE_CHECK_INSTANCE_TYPE ((obj), TNY_TYPE_FS_STREAM_CACHE))
+#define TNY_IS_FS_STREAM_CACHE_CLASS(vtable) (G_TYPE_CHECK_CLASS_TYPE ((vtable), TNY_TYPE_FS_STREAM_CACHE))
+#define TNY_FS_STREAM_CACHE_GET_CLASS(inst)  (G_TYPE_INSTANCE_GET_CLASS ((inst), TNY_TYPE_FS_STREAM_CACHE, TnyFsStreamCacheClass))
+
+/* tny-shared.h provides the same typedefs; only declare them when it was
+ * not included first. */
+#ifndef TNY_SHARED_H
+typedef struct _TnyFsStreamCache TnyFsStreamCache;
+typedef struct _TnyFsStreamCacheClass TnyFsStreamCacheClass;
+#endif
+
+/* Instance struct: the state lives in the private struct of the .c file. */
+struct _TnyFsStreamCache
+{
+	GObject parent;
+};
+
+/* Class struct: adds no virtuals of its own. */
+struct _TnyFsStreamCacheClass 
+{
+	GObjectClass parent;
+};
+
+GType  tny_fs_stream_cache_get_type (void);
+/* Creates a cache storing its files in the folder @path, with @max_size as
+ * size budget. */
+TnyStreamCache* tny_fs_stream_cache_new (const gchar *path, guint64 max_size);
+/* Returns the folder used by @self; the string is owned by the cache. */
+const gchar *tny_fs_stream_cache_get_path (TnyFsStreamCache *self);
+
+G_END_DECLS
+
+#endif
+
Index: libtinymail/tny-shared.h
===================================================================
--- libtinymail/tny-shared.h	(revision 3699)
+++ libtinymail/tny-shared.h	(working copy)
@@ -104,6 +104,14 @@
 typedef struct _TnyConnectionPolicyIface TnyConnectionPolicyIface;
 typedef struct _TnySeekable TnySeekable;
 typedef struct _TnySeekableIface TnySeekableIface;
+/* stream cache: interface, filesystem implementation and its helpers */
+typedef struct _TnyStreamCache TnyStreamCache;
+typedef struct _TnyStreamCacheIface TnyStreamCacheIface;
+typedef struct _TnyFsStreamCache TnyFsStreamCache;
+typedef struct _TnyFsStreamCacheClass TnyFsStreamCacheClass;
+typedef struct _TnyCachedFile TnyCachedFile;
+typedef struct _TnyCachedFileClass TnyCachedFileClass;
+typedef struct _TnyCachedFileStream TnyCachedFileStream;
+typedef struct _TnyCachedFileStreamClass TnyCachedFileStreamClass;
 
 
 /** 
@@ -127,6 +135,9 @@
 typedef void (*TnyFolderCallback) (TnyFolder *self, gboolean cancelled, GError *err, gpointer user_data);
 typedef void (*TnyGetHeadersCallback) (TnyFolder *self, gboolean cancelled, TnyList *headers, GError *err, gpointer user_data);
 
+/** 
+ * TnyStreamCacheOpenStreamFetcher:
+ * @self: the #TnyStreamCache asking for the stream
+ * @expected_size: (out): where the callback stores the expected size of
+ *  the stream
+ * @userdata: user data passed to tny_stream_cache_get_stream()
+ *
+ * Called by the cache when the requested id is not cached. Returns the
+ * #TnyStream to fetch the data from, or NULL if it cannot be obtained.
+ **/
+typedef TnyStream* (*TnyStreamCacheOpenStreamFetcher) (TnyStreamCache *self, gint64 *expected_size, gpointer userdata);
+/** 
+ * TnyStreamCacheRemoveFilter:
+ * @self: the #TnyStreamCache being filtered
+ * @id: id of a cached stream
+ * @userdata: user data passed along with the filter
+ *
+ * Returns TRUE if the stream cached as @id has to be removed.
+ **/
+typedef gboolean (*TnyStreamCacheRemoveFilter) (TnyStreamCache *self, const gchar *id, gpointer userdata);
+
 /** 
  * TnyGetMsgCallback:
  * @folder: a #TnyFolder that caused the callback
Index: libtinymail/tny-cached-file-stream.c
===================================================================
--- libtinymail/tny-cached-file-stream.c	(revision 0)
+++ libtinymail/tny-cached-file-stream.c	(revision 0)
@@ -0,0 +1,295 @@
+/* libtinymail - The Tiny Mail base library
+ * Copyright (C) 2006-2007 Philip Van Hoof <pvanhoof gnome org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * TnyCachedFileStream:
+ *
+ * A stream that accesses a cached file stream taking into account when
+ * specific parts are available.
+ *
+ * free-function: g_object_unref
+ **/
+
+#include <config.h>
+
+#include <string.h>
+#include <stdio.h>
+#include <errno.h>
+
+#include <tny-cached-file-stream.h>
+
+/* Parent class and parent interface vtables, looked up in class_init and
+ * used to chain up to the TnyFsStream implementations. */
+static GObjectClass *parent_class = NULL;
+static GTypeInterface *parent_stream_iface = NULL;
+static GTypeInterface *parent_seekable_iface = NULL;
+
+/* Private, per-instance state of a TnyCachedFileStream. */
+typedef struct _TnyCachedFileStreamPriv TnyCachedFileStreamPriv;
+
+struct _TnyCachedFileStreamPriv
+{
+	/* cached file this stream reads from; referenced in
+	 * tny_cached_file_stream_new */
+	TnyCachedFile *cached_file;
+	/* current read offset; set to -1 once the stream has been closed */
+	gssize position;
+	/* guards position */
+	GMutex *position_mutex;
+};
+
+/* Returns the TnyCachedFileStreamPriv registered for the type in class_init. */
+#define TNY_CACHED_FILE_STREAM_GET_PRIVATE(o)	\
+	(G_TYPE_INSTANCE_GET_PRIVATE ((o), TNY_TYPE_CACHED_FILE_STREAM, TnyCachedFileStreamPriv))
+
+
+/*
+ * TnyStream::read implementation.
+ *
+ * Asks the cached file how far the data can be fetched from the current
+ * position, limits @n to that range and reads through the parent
+ * (TnyFsStream) implementation.
+ *
+ * Returns the number of bytes read, or -1 when the stream is closed (before
+ * or during the read), when the cached file reports less data than the
+ * current position, or when the parent read fails.
+ */
+static gssize
+tny_cached_file_stream_read (TnyStream *self, char *buffer, gsize n)
+{
+	TnyCachedFileStreamPriv *priv = TNY_CACHED_FILE_STREAM_GET_PRIVATE (self);
+	gssize nread;
+	gssize start;
+	gint64 fetchable;
+
+	/* snapshot the position; the mutex is not kept while waiting so
+	 * that close() can still get through */
+	g_mutex_lock (priv->position_mutex);
+	start = priv->position;
+	g_mutex_unlock (priv->position_mutex);
+
+	if (start == -1) {
+		/* already closed: nothing to wait for */
+		return -1;
+	}
+
+	fetchable = tny_cached_file_wait_fetchable (priv->cached_file, start);
+	if (fetchable < start)
+		return -1;
+
+	/* never read past what has been fetched; fetchable - start is >= 0 */
+	if ((guint64) (fetchable - start) < (guint64) n)
+		n = (gsize) (fetchable - start);
+
+	nread = (*((TnyStreamIface *) parent_stream_iface)->read) (self, buffer, n);
+
+	g_mutex_lock (priv->position_mutex);
+	if (priv->position == -1) {
+		/* closed while reading */
+		nread = -1;
+	} else if (nread > 0) {
+		/* a failed read (-1) must not move the position backwards */
+		priv->position += nread;
+	}
+	g_mutex_unlock (priv->position_mutex);
+
+	return nread;
+}
+
+/* TnyStream::write implementation: cached file streams are read only, so
+ * this only warns and reports failure (-1). */
+static gssize
+tny_cached_file_stream_write (TnyStream *self, const char *buffer, gsize n)
+{
+	const gssize not_supported = -1;
+
+	g_warning ("You can't use the tny_stream_write API on cached file "
+		   "streams. This problem indicates a bug in the software");
+
+	return not_supported;
+}
+
+
+/* TnyStream::close implementation: marks the stream as closed (position
+ * -1), unregisters it from its cached file the first time it is closed,
+ * and then closes through the parent implementation. */
+static gint
+tny_cached_file_stream_close (TnyStream *self)
+{
+	TnyCachedFileStreamPriv *priv = TNY_CACHED_FILE_STREAM_GET_PRIVATE (self);
+	gboolean was_open;
+
+	g_mutex_lock (priv->position_mutex);
+	was_open = (priv->position > -1);
+	if (was_open)
+		priv->position = -1;
+	g_mutex_unlock (priv->position_mutex);
+
+	/* done outside the lock */
+	if (was_open)
+		tny_cached_file_unregister_stream (priv->cached_file, TNY_CACHED_FILE_STREAM (self));
+
+	return (*((TnyStreamIface *) parent_stream_iface)->close) (self);
+}
+
+/**
+ * tny_cached_file_stream_new:
+ * @cached_file: the #TnyCachedFile the stream reads from
+ * @fd: file descriptor handed to the underlying #TnyFsStream
+ *
+ * Creates a stream on @fd that keeps a reference to @cached_file.
+ *
+ * returns: (caller-owns): a new #TnyStream instance
+ * since: 1.0
+ * audience: tinymail-developer
+ **/
+TnyStream*
+tny_cached_file_stream_new (TnyCachedFile *cached_file, gint fd)
+{
+	TnyCachedFileStream *stream;
+	TnyCachedFileStreamPriv *priv;
+
+	stream = g_object_new (TNY_TYPE_CACHED_FILE_STREAM, NULL);
+	priv = TNY_CACHED_FILE_STREAM_GET_PRIVATE (stream);
+
+	tny_fs_stream_set_fd (TNY_FS_STREAM (stream), fd);
+	priv->cached_file = g_object_ref (cached_file);
+
+	return TNY_STREAM (stream);
+}
+
+/* GInstanceInitFunc: creates the position mutex and starts at offset 0. */
+static void
+tny_cached_file_stream_instance_init (GTypeInstance *instance, gpointer g_class)
+{
+	TnyCachedFileStreamPriv *priv;
+
+	priv = TNY_CACHED_FILE_STREAM_GET_PRIVATE ((TnyCachedFileStream *) instance);
+
+	priv->position_mutex = g_mutex_new ();
+	priv->position = 0;
+}
+
+/* GObject::finalize: releases the reference to the cached file taken in
+ * tny_cached_file_stream_new() and the position mutex, then chains up. */
+static void
+tny_cached_file_stream_finalize (GObject *object)
+{
+	TnyCachedFileStream *self = (TnyCachedFileStream *)object;
+	TnyCachedFileStreamPriv *priv = TNY_CACHED_FILE_STREAM_GET_PRIVATE (self);
+
+	/* NULL when the instance was not built by tny_cached_file_stream_new */
+	if (priv->cached_file) {
+		g_object_unref (priv->cached_file);
+		priv->cached_file = NULL;
+	}
+
+	g_mutex_free (priv->position_mutex);
+	priv->position = -1;
+
+	(*parent_class->finalize) (object);
+}
+
+/* TnyStream::is_eos implementation: the end is reached once the position
+ * is at or beyond the expected size of the cached file. */
+static gboolean 
+tny_cached_file_stream_is_eos (TnyStream *self)
+{
+	TnyCachedFileStreamPriv *priv = TNY_CACHED_FILE_STREAM_GET_PRIVATE (self);
+	gint64 expected_size;
+
+	expected_size = tny_cached_file_get_expected_size (TNY_CACHED_FILE (priv->cached_file));
+
+	return (priv->position >= expected_size);
+}
+
+/* TnyStream::reset implementation: moves the tracked position back to 0,
+ * unless the stream has been closed, and resets through the parent
+ * implementation. Returns whatever the parent reset returns. */
+static gint 
+tny_cached_file_stream_reset (TnyStream *self)
+{
+	TnyCachedFileStreamPriv *priv = TNY_CACHED_FILE_STREAM_GET_PRIVATE (self);
+
+	/* position is shared with read/seek/close, which all lock it */
+	g_mutex_lock (priv->position_mutex);
+	if (priv->position != -1) {
+		/* -1 marks a closed stream: leave it closed, otherwise close()
+		 * would unregister the stream a second time */
+		priv->position = 0;
+	}
+	g_mutex_unlock (priv->position_mutex);
+
+	return (*((TnyStreamIface *) parent_stream_iface)->reset) (self);
+}
+
+/*
+ * TnySeekable::seek implementation.
+ *
+ * Works out the absolute offset the seek refers to, waits until the cached
+ * file can provide it, seeks through the parent (TnyFsStream)
+ * implementation and records the resulting position.
+ *
+ * @policy is SEEK_SET, SEEK_CUR or SEEK_END, as for lseek().
+ *
+ * Returns the new position, or -1 when the stream is closed (before or
+ * during the seek), when the cached file reports -1 for the target offset,
+ * or when the parent seek returns -1.
+ */
+static off_t 
+tny_cached_file_seek (TnySeekable *self, off_t offset, int policy)
+{
+	TnyCachedFileStreamPriv *priv = TNY_CACHED_FILE_STREAM_GET_PRIVATE (self);
+	gssize current;
+	gint64 target = 0;
+	off_t final_pos;
+
+	/* The mutex is not recursive and must not be held while waiting, so
+	 * it is only taken for the time needed to copy the position. */
+	g_mutex_lock (priv->position_mutex);
+	current = priv->position;
+	g_mutex_unlock (priv->position_mutex);
+
+	if (current == -1) {
+		/* already closed */
+		return -1;
+	}
+
+	switch (policy) {
+	case SEEK_SET:
+		target = offset;
+		break;
+	case SEEK_CUR:
+		target = current + offset;
+		break;
+	case SEEK_END:
+		/* wait for the whole file, whatever the offset.
+		 * NOTE(review): presumably because the parent resolves
+		 * SEEK_END against the file on disk, which is shorter
+		 * while the fetch is in progress - confirm. */
+		target = tny_cached_file_get_expected_size (priv->cached_file);
+		break;
+	default:
+		/* unknown policy: nothing to wait for, the parent seek
+		 * decides what to do with it */
+		break;
+	}
+
+	if (tny_cached_file_wait_fetchable (priv->cached_file, target) == -1)
+		return -1;
+
+	final_pos = (*((TnySeekableIface *) parent_seekable_iface)->seek) (self, offset, policy);
+
+	g_mutex_lock (priv->position_mutex);
+	if (priv->position == -1) {
+		/* closed while seeking */
+		final_pos = -1;
+	} else if (final_pos != -1) {
+		/* a failed parent seek leaves the tracked position alone */
+		priv->position = final_pos;
+	}
+	g_mutex_unlock (priv->position_mutex);
+
+	return final_pos;
+}
+
+/* GInterfaceInitFunc for TnyStream: overrides the entries that have to
+ * track the position or the state of the fetch. Entries not set here are
+ * left as they are in the vtable received in @g. */
+static void
+tny_stream_init (gpointer g, gpointer iface_data)
+{
+	TnyStreamIface *klass = (TnyStreamIface *)g;
+
+	klass->is_eos= tny_cached_file_stream_is_eos;
+	klass->read= tny_cached_file_stream_read;
+	klass->write= tny_cached_file_stream_write;
+	klass->close= tny_cached_file_stream_close;
+	/* without this entry tny_cached_file_stream_reset is dead code and
+	 * a reset leaves the tracked position where it was */
+	klass->reset= tny_cached_file_stream_reset;
+
+	return;
+}
+
+/* GInterfaceInitFunc for TnySeekable: installs the cache-aware seek. */
+static void
+tny_seekable_init (gpointer g, gpointer iface_data)
+{
+	TnySeekableIface *iface = (TnySeekableIface *) g;
+
+	iface->seek = tny_cached_file_seek;
+}
+
+/* GClassInitFunc: remembers the parent class and the parent's TnyStream
+ * and TnySeekable vtables for chaining up, installs the finalizer and
+ * registers the private struct. */
+static void 
+tny_cached_file_stream_class_init (TnyCachedFileStreamClass *class)
+{
+	GObjectClass *gobject_class = (GObjectClass*) class;
+
+	parent_class = g_type_class_peek_parent (class);
+	parent_stream_iface = g_type_interface_peek (parent_class, TNY_TYPE_STREAM);
+	parent_seekable_iface = g_type_interface_peek (parent_class, TNY_TYPE_SEEKABLE);
+
+	gobject_class->finalize = tny_cached_file_stream_finalize;
+	g_type_class_add_private (gobject_class, sizeof (TnyCachedFileStreamPriv));
+}
+
+/* GOnce callback: registers TnyCachedFileStream as a TnyFsStream subclass
+ * that re-implements TnyStream and TnySeekable. Returns the GType packed in
+ * a pointer. */
+static gpointer
+tny_cached_file_stream_register_type (gpointer notused)
+{
+	GType type = 0;
+
+	static const GTypeInfo info = 
+		{
+			sizeof (TnyCachedFileStreamClass),
+			NULL,   /* base_init */
+			NULL,   /* base_finalize */
+			(GClassInitFunc) tny_cached_file_stream_class_init,   /* class_init */
+			NULL,   /* class_finalize */
+			NULL,   /* class_data */
+			sizeof (TnyCachedFileStream),
+			0,      /* n_preallocs */
+			tny_cached_file_stream_instance_init,   /* instance_init */
+			NULL
+		};
+	
+	static const GInterfaceInfo tny_stream_info = 
+		{
+			(GInterfaceInitFunc) tny_stream_init, /* interface_init */
+			NULL,         /* interface_finalize */
+			NULL          /* interface_data */
+		};
+	
+	static const GInterfaceInfo tny_seekable_info = 
+		{
+			(GInterfaceInitFunc) tny_seekable_init, /* interface_init */
+			NULL,         /* interface_finalize */
+			NULL          /* interface_data */
+		};
+	
+	type = g_type_register_static (TNY_TYPE_FS_STREAM,
+				       "TnyCachedFileStream",
+				       &info, 0);
+
+	g_type_add_interface_static (type, TNY_TYPE_STREAM, 
+				     &tny_stream_info);
+	
+	g_type_add_interface_static (type, TNY_TYPE_SEEKABLE, 
+				     &tny_seekable_info);
+	
+	/* GType is a gsize: the *_SIZE_* macros keep every bit of it, the
+	 * *_UINT_* ones go through a 32 bit guint */
+	return GSIZE_TO_POINTER (type);
+}
+
+/* Returns the GType of TnyCachedFileStream, registering it on first use. */
+GType 
+tny_cached_file_stream_get_type (void)
+{
+	static GOnce once = G_ONCE_INIT;
+	g_once (&once, tny_cached_file_stream_register_type, NULL);
+	return (GType) GPOINTER_TO_SIZE (once.retval);
+}
Index: libtinymail/tny-cached-file-stream.h
===================================================================
--- libtinymail/tny-cached-file-stream.h	(revision 0)
+++ libtinymail/tny-cached-file-stream.h	(revision 0)
@@ -0,0 +1,63 @@
+#ifndef TNY_CACHED_FILE_STREAM_H
+#define TNY_CACHED_FILE_STREAM_H
+
+/* libtinymail- The Tiny Mail base library
+ * Copyright (C) 2006-2007 Philip Van Hoof <pvanhoof gnome org>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with self library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#include <glib.h>
+#include <glib-object.h>
+
+#include <fcntl.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+#include <tny-fs-stream.h>
+#include <tny-cached-file.h>
+
+G_BEGIN_DECLS
+
+/* Standard GObject type, cast and type-check macros for TnyCachedFileStream. */
+#define TNY_TYPE_CACHED_FILE_STREAM             (tny_cached_file_stream_get_type ())
+#define TNY_CACHED_FILE_STREAM(obj)             (G_TYPE_CHECK_INSTANCE_CAST ((obj), TNY_TYPE_CACHED_FILE_STREAM, TnyCachedFileStream))
+#define TNY_CACHED_FILE_STREAM_CLASS(vtable)    (G_TYPE_CHECK_CLASS_CAST ((vtable), TNY_TYPE_CACHED_FILE_STREAM, TnyCachedFileStreamClass))
+#define TNY_IS_CACHED_FILE_STREAM(obj)          (G_TYPE_CHECK_INSTANCE_TYPE ((obj), TNY_TYPE_CACHED_FILE_STREAM))
+#define TNY_IS_CACHED_FILE_STREAM_CLASS(vtable) (G_TYPE_CHECK_CLASS_TYPE ((vtable), TNY_TYPE_CACHED_FILE_STREAM))
+#define TNY_CACHED_FILE_STREAM_GET_CLASS(inst)  (G_TYPE_INSTANCE_GET_CLASS ((inst), TNY_TYPE_CACHED_FILE_STREAM, TnyCachedFileStreamClass))
+
+/* tny-shared.h provides the same typedefs; only declare them when it was
+ * not included first. */
+#ifndef TNY_SHARED_H
+typedef struct _TnyCachedFileStream TnyCachedFileStream;
+typedef struct _TnyCachedFileStreamClass TnyCachedFileStreamClass;
+#endif
+
+/* Instance struct: a TnyFsStream; the extra state is private to the .c. */
+struct _TnyCachedFileStream
+{
+	TnyFsStream parent;
+};
+
+/* Class struct: adds no virtuals of its own. */
+struct _TnyCachedFileStreamClass 
+{
+	TnyFsStreamClass parent;
+};
+
+GType  tny_cached_file_stream_get_type (void);
+/* Creates a stream on @fd that keeps a reference to @cached_file. */
+TnyStream* tny_cached_file_stream_new (TnyCachedFile *cached_file, int fd);
+
+G_END_DECLS
+
+#endif
+
Index: libtinymail/tny-stream-cache.c
===================================================================
--- libtinymail/tny-stream-cache.c	(revision 0)
+++ libtinymail/tny-stream-cache.c	(revision 0)
@@ -0,0 +1,173 @@
+/* libtinymail - The Tiny Mail base library
+ * Copyright (C) 2008 Jose Dapena Paz <jdapena igalia com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * TnyStreamCache:
+ *
+ * A file-oriented cache for streams.
+ *
+ * free-function: g_object_unref
+ **/
+
+#include <config.h>
+#include <tny-stream-cache.h>
+
+/**
+ * tny_stream_cache_get_stream:
+ * @self: a #TnyStreamCache
+ * @id: a string, the unique id used to identify the stream
+ * @fetcher: a #TnyStreamCacheOpenStreamFetcher, function to obtain a
+ *  #TnyStream for fetching the file in case it's not in cache.
+ * @userdata: a #gpointer, handed to the implementation together with @fetcher
+ * 
+ * Obtain a #TnyStream with the file referred to by @id from the cache. If it's
+ * not in cache, it sets up a stream resource to fetch the file from internet
+ * and store it in cache while sending information to the user.
+ *
+ * This function only dispatches to the get_stream method of the
+ * #TnyStreamCacheIface implemented by @self.
+ *
+ * Returns: a #TnyStream
+ * since: 1.0
+ * audience: application-developer
+ */
+TnyStream *
+tny_stream_cache_get_stream (TnyStreamCache *self, const gchar *id,
+			     TnyStreamCacheOpenStreamFetcher fetcher, gpointer userdata)
+{
+#ifdef DBC /* require */
+	/* Note: there is no expected_size parameter in this API, so the
+	 * contract only checks the arguments that really exist. */
+	g_assert (TNY_IS_STREAM_CACHE (self));
+	g_assert (id);
+	g_assert (TNY_STREAM_CACHE_GET_IFACE (self)->get_stream != NULL);
+#endif
+
+	return TNY_STREAM_CACHE_GET_IFACE (self)->get_stream (self, id, fetcher, userdata);
+}
+
+/**
+ * tny_stream_cache_set_max_size:
+ * @self: a #TnyStreamCache
+ * @max_size: a #gint64
+ *
+ * sets the new maximum size the cache can host.
+ *
+ * since: 1.0
+ * audience: application-developer
+ */
+void
+tny_stream_cache_set_max_size (TnyStreamCache *self, gint64 max_size)
+{
+	TnyStreamCacheIface *iface;
+
+#ifdef DBC /* require */
+	g_assert (TNY_IS_STREAM_CACHE (self));
+	g_assert (TNY_STREAM_CACHE_GET_IFACE (self)->set_max_size != NULL);
+#endif
+
+	/* Forward the request to the method installed by the implementation. */
+	iface = TNY_STREAM_CACHE_GET_IFACE (self);
+	iface->set_max_size (self, max_size);
+}
+
+/**
+ * tny_stream_cache_get_max_size:
+ * @self: a #TnyStreamCache
+ *
+ * obtain the maximum size the cache can host.
+ *
+ * Returns: a #gint64
+ *
+ * since: 1.0
+ * audience: application-developer
+ */
+gint64
+tny_stream_cache_get_max_size (TnyStreamCache *self)
+{
+	TnyStreamCacheIface *iface;
+
+#ifdef DBC /* require */
+	g_assert (TNY_IS_STREAM_CACHE (self));
+	g_assert (TNY_STREAM_CACHE_GET_IFACE (self)->get_max_size != NULL);
+#endif
+
+	/* The value comes straight from the implementation's get_max_size. */
+	iface = TNY_STREAM_CACHE_GET_IFACE (self);
+
+	return iface->get_max_size (self);
+}
+
+/**
+ * tny_stream_cache_remove:
+ * @self: a #TnyStreamCache
+ * @filter: a #TnyStreamCacheRemoveFilter
+ * @userdata: a #gpointer
+ * 
+ * Removes all the cached files matching filter.
+ *
+ * since: 1.0
+ * audience: application-developer
+ */
+void
+tny_stream_cache_remove (TnyStreamCache *self, TnyStreamCacheRemoveFilter filter, gpointer userdata)
+{
+	TnyStreamCacheIface *iface;
+
+#ifdef DBC /* require */
+	g_assert (TNY_IS_STREAM_CACHE (self));
+	g_assert (filter);
+	g_assert (TNY_STREAM_CACHE_GET_IFACE (self)->remove!= NULL);
+#endif
+
+	/* The implementation decides how @filter and @userdata are applied. */
+	iface = TNY_STREAM_CACHE_GET_IFACE (self);
+	iface->remove (self, filter, userdata);
+}
+
+
+/* base_init of the TnyStreamCache interface. The static flag makes the body
+ * run its one-time section only on the first call; currently that section is
+ * just a placeholder for interface signals. */
+static void
+tny_stream_cache_base_init (gpointer g_class)
+{
+	static gboolean initialized = FALSE;
+
+	if (initialized)
+		return;
+
+	/* create interface signals here. */
+	initialized = TRUE;
+}
+
+/* GOnce callback: registers the TnyStreamCache interface type and returns
+ * its GType packed into a pointer. */
+static gpointer
+tny_stream_cache_register_type (gpointer notused)
+{
+	GType type = 0;
+
+	static const GTypeInfo info = 
+		{
+			sizeof (TnyStreamCacheIface),
+			tny_stream_cache_base_init,   /* base_init */
+			NULL,   /* base_finalize */
+			NULL,   /* class_init */
+			NULL,   /* class_finalize */
+			NULL,   /* class_data */
+			0,
+			0,      /* n_preallocs */
+			NULL,   /* instance_init */
+			NULL
+		};
+	/* The registered name matches the C type name (TnyStreamCache). */
+	type = g_type_register_static (G_TYPE_INTERFACE, 
+				       "TnyStreamCache", &info, 0);
+	
+	/* GType is as wide as gsize, so use the gsize conversion macros to
+	 * avoid truncating it where guint is narrower than a pointer. */
+	return GSIZE_TO_POINTER (type);
+}
+
+/* Returns the GType of the TnyStreamCache interface, registering it on the
+ * first call (g_once guarantees the registration runs only once). */
+GType
+tny_stream_cache_get_type (void)
+{
+	static GOnce once = G_ONCE_INIT;
+	g_once (&once, tny_stream_cache_register_type, NULL);
+	return (GType) GPOINTER_TO_SIZE (once.retval);
+}
Index: libtinymail/tny-stream-cache.h
===================================================================
--- libtinymail/tny-stream-cache.h	(revision 0)
+++ libtinymail/tny-stream-cache.h	(revision 0)
@@ -0,0 +1,66 @@
+#ifndef TNY_STREAM_CACHE_H
+#define TNY_STREAM_CACHE_H
+
+/* libtinymail- The Tiny Mail base library
+ * Copyright (C) 2008 Jose Dapena Paz <jdapena igalia com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#include <glib.h>
+#include <glib-object.h>
+
+#include <fcntl.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <tny-stream.h>
+#include <tny-shared.h>
+
+G_BEGIN_DECLS
+
+#define TNY_TYPE_STREAM_CACHE             (tny_stream_cache_get_type ())
+#define TNY_STREAM_CACHE(obj)             (G_TYPE_CHECK_INSTANCE_CAST ((obj), TNY_TYPE_STREAM_CACHE, TnyStreamCache))
+#define TNY_IS_STREAM_CACHE(obj)          (G_TYPE_CHECK_INSTANCE_TYPE ((obj), TNY_TYPE_STREAM_CACHE))
+#define TNY_STREAM_CACHE_GET_IFACE(inst)  (G_TYPE_INSTANCE_GET_INTERFACE ((inst), TNY_TYPE_STREAM_CACHE, TnyStreamCacheIface))
+
+#ifndef TNY_SHARED_H
+typedef struct _TnyStreamCache TnyStreamCache;
+typedef struct _TnyStreamCacheIface TnyStreamCacheIface;
+#endif
+
+/* Virtual method table of the TnyStreamCache interface. Each entry is the
+ * implementation hook called by the public function of the same name
+ * (tny_stream_cache_get_stream, tny_stream_cache_get_max_size, ...). */
+struct _TnyStreamCacheIface 
+{
+	GTypeInterface parent;
+	
+	/* Returns a stream for the cached item @id; @fetcher and @userdata are
+	 * passed through unchanged from tny_stream_cache_get_stream. */
+	TnyStream*       (*get_stream)   (TnyStreamCache *self, const char *id,
+					  TnyStreamCacheOpenStreamFetcher fetcher, gpointer userdata);
+	/* Returns the maximum size the cache can host. */
+	gint64           (*get_max_size) (TnyStreamCache *self);
+	/* Sets the maximum size the cache can host. */
+	void             (*set_max_size) (TnyStreamCache *self, gint64 max_size);
+	/* Removes the cached files selected by @filter; @data is the userdata
+	 * given to tny_stream_cache_remove. */
+	void             (*remove)       (TnyStreamCache *self, TnyStreamCacheRemoveFilter filter, gpointer data);
+};
+
+GType      tny_stream_cache_get_type (void);
+
+TnyStream* tny_stream_cache_get_stream (TnyStreamCache *cache, const char *id,
+					TnyStreamCacheOpenStreamFetcher fetcher, gpointer userdata);
+void       tny_stream_cache_remove (TnyStreamCache *cache, TnyStreamCacheRemoveFilter filter, gpointer userdata);
+void       tny_stream_cache_set_max_size (TnyStreamCache *cache, gint64 max_size);
+gint64    tny_stream_cache_get_max_size (TnyStreamCache *cache);
+
+G_END_DECLS
+
+#endif
+
Index: libtinymail/Makefile.am
===================================================================
--- libtinymail/Makefile.am	(revision 3699)
+++ libtinymail/Makefile.am	(working copy)
@@ -49,6 +49,10 @@
 	tny-send-queue.h \
 	tny-password-getter.h \
 	tny-merge-folder.h \
+	tny-stream-cache.h \
+	tny-fs-stream-cache.h \
+	tny-cached-file.h \
+	tny-cached-file-stream.h \
 	tny-combined-account.h \
 	tny-connection-policy.h 
 
@@ -91,6 +95,10 @@
 	tny-msg-receive-strategy.c \
 	tny-send-queue.c \
 	tny-password-getter.c \
+	tny-stream-cache.c \
+	tny-fs-stream-cache.c \
+	tny-cached-file.c \
+	tny-cached-file-stream.c \
 	tny-merge-folder.c \
 	tny-combined-account.c \
 	tny-idle-stopper.c \
Index: libtinymail/tny-cached-file.c
===================================================================
--- libtinymail/tny-cached-file.c	(revision 0)
+++ libtinymail/tny-cached-file.c	(revision 0)
@@ -0,0 +1,566 @@
+/* libtinymail - The Tiny Mail base library
+ * Copyright (C) 2008 Jose Dapena Paz <jdapena igalia com>
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.	 See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
+ */
+
+/**
+ * TnyCachedFile:
+ *
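+ * Status information of one file stored in a #TnyFsStreamCache. It controls
+ * fetching the remote stream into the cache folder and creates the streams
+ * that read from the cached file.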
+ *
+ * free-function: g_object_unref
+ **/
+
+#include <config.h>
+
+#include <string.h>
+#include <stdio.h>
+#include <errno.h>
+#include <glib/gstdio.h>
+
+#include <tny-fs-stream.h>
+#include <tny-fs-stream-cache.h>
+#include <tny-cached-file.h>
+
+static GObjectClass *parent_class = NULL;
+
+typedef struct _TnyCachedFilePriv TnyCachedFilePriv;
+
+/* Private state of a TnyCachedFile. Two mutexes are used: fetched_mutex
+ * (with fetched_cond) is held by the fetch thread and by wait_fetchable
+ * around fetched_size/invalid, and active_ops_mutex is held around the
+ * reader counter. */
+struct _TnyCachedFilePriv
+{
+	/* stream cache the file belongs to (a reference is held) */
+	TnyStreamCache *stream_cache;
+	/* id in cache of the file; also used as the file name inside the
+	 * cache folder (with a ".tmp" suffix while fetching) */
+	gchar *id;
+
+	/* expected size the file will get; replaced by the real size once
+	 * the fetch ends successfully or when the file is found on disk */
+	gint64 expected_size;
+	/* timestamp of last access to stream */
+	time_t timestamp;
+	
+	/* Input stream we use to feed the cache; NULL when no fetch is in
+	 * progress (this is what is_finished tests) */
+	TnyStream *fetch_stream;
+	/* Currently fetched size in cache */
+	gint64 fetched_size;
+	/* is the stream invalid? Due to errors reading, or closing the stream */
+	gboolean invalid;
+	/* Close is pending when we run remove and we're still fetching the
+	 * stream; the fetch thread polls this flag and aborts */
+	gboolean pending_close;
+	/* mutex and gcond for fetched size current size; the condition is
+	 * broadcast whenever fetched_size grows or the file becomes invalid */
+	GMutex *fetched_mutex;
+	GCond *fetched_cond;
+
+	/* active read operations: incremented when a stream is handed out by
+	 * get_stream, decremented by unregister_stream */
+	gint active_ops;
+	GMutex *active_ops_mutex;
+
+};
+
+#define TNY_CACHED_FILE_GET_PRIVATE(o)	\
+	(G_TYPE_INSTANCE_GET_PRIVATE ((o), TNY_TYPE_CACHED_FILE, TnyCachedFilePriv))
+
+/* Public entry point: dispatches to the get_expected_size class method. */
+gint64
+tny_cached_file_get_expected_size (TnyCachedFile *self)
+{
+	TnyCachedFileClass *klass = TNY_CACHED_FILE_GET_CLASS (self);
+
+	return klass->get_expected_size (self);
+}
+
+/* Default get_expected_size: returns the size stored in the private data. */
+static gint64
+tny_cached_file_get_expected_size_default (TnyCachedFile *self)
+{
+	return TNY_CACHED_FILE_GET_PRIVATE (self)->expected_size;
+}
+
+/* Public entry point: dispatches to the get_id class method. */
+const gchar *
+tny_cached_file_get_id (TnyCachedFile *self)
+{
+	TnyCachedFileClass *klass = TNY_CACHED_FILE_GET_CLASS (self);
+
+	return klass->get_id (self);
+}
+
+/* Default get_id: returns the id string owned by the object (not a copy). */
+static const gchar *
+tny_cached_file_get_id_default (TnyCachedFile *self)
+{
+	return TNY_CACHED_FILE_GET_PRIVATE (self)->id;
+}
+
+/* Public entry point: dispatches to the is_active class method. */
+gboolean
+tny_cached_file_is_active (TnyCachedFile *self)
+{
+	TnyCachedFileClass *klass = TNY_CACHED_FILE_GET_CLASS (self);
+
+	return klass->is_active (self);
+}
+
+/* Default is_active: TRUE while at least one read operation is registered.
+ * The counter is sampled under active_ops_mutex. */
+static gboolean
+tny_cached_file_is_active_default (TnyCachedFile *self)
+{
+	TnyCachedFilePriv *priv;
+	gboolean has_readers;
+
+	priv = TNY_CACHED_FILE_GET_PRIVATE (self);
+
+	g_mutex_lock (priv->active_ops_mutex);
+	has_readers = (priv->active_ops > 0);
+	g_mutex_unlock (priv->active_ops_mutex);
+
+	return has_readers;
+}
+
+/* Public entry point: dispatches to the is_finished class method. */
+gboolean
+tny_cached_file_is_finished (TnyCachedFile *self)
+{
+	TnyCachedFileClass *klass = TNY_CACHED_FILE_GET_CLASS (self);
+
+	return klass->is_finished (self);
+}
+
+/* Default is_finished: the file counts as finished when there is no fetch
+ * stream attached to it. */
+static gboolean
+tny_cached_file_is_finished_default (TnyCachedFile *self)
+{
+	return (TNY_CACHED_FILE_GET_PRIVATE (self)->fetch_stream == NULL);
+}
+
+/* Public entry point: dispatches to the get_timestamp class method. */
+time_t
+tny_cached_file_get_timestamp (TnyCachedFile *self)
+{
+	TnyCachedFileClass *klass = TNY_CACHED_FILE_GET_CLASS (self);
+
+	return klass->get_timestamp (self);
+}
+
+/* Default get_timestamp: returns the time of the last access. While read
+ * operations are active the file is considered "in use right now", so the
+ * stored timestamp is refreshed to the current time first.
+ *
+ * active_ops is read under active_ops_mutex, like in is_active_default,
+ * because streams are registered/unregistered under that mutex. */
+static time_t
+tny_cached_file_get_timestamp_default (TnyCachedFile *self)
+{
+	TnyCachedFilePriv *priv = TNY_CACHED_FILE_GET_PRIVATE (self);
+	time_t retval;
+
+	g_mutex_lock (priv->active_ops_mutex);
+	if (priv->active_ops > 0)
+		priv->timestamp = time (NULL);
+	retval = priv->timestamp;
+	g_mutex_unlock (priv->active_ops_mutex);
+
+	return retval;
+}
+
+/* Public entry point: dispatches to the remove class method.
+ * (No "return <expr>;" here: ISO C does not allow returning an expression
+ * from a function whose return type is void.) */
+void
+tny_cached_file_remove (TnyCachedFile *self)
+{
+	TNY_CACHED_FILE_GET_CLASS (self)->remove (self);
+}
+
+/* Default remove: marks the file invalid and deletes it from the cache
+ * folder. If a fetch is still running the deletion is deferred: only
+ * pending_close is raised, and the fetch thread is expected to notice it.
+ * Must not be called while read operations are active. */
+static void
+tny_cached_file_remove_default (TnyCachedFile *self)
+{
+	TnyCachedFilePriv *priv = TNY_CACHED_FILE_GET_PRIVATE (self);
+	gchar *cache_file;
+
+	g_return_if_fail (priv->active_ops == 0);
+
+	priv->invalid = TRUE;
+
+	if (priv->fetch_stream) {
+		priv->pending_close = TRUE;
+		return;
+	}
+
+	cache_file = g_build_filename (tny_fs_stream_cache_get_path (TNY_FS_STREAM_CACHE (priv->stream_cache)), priv->id, NULL);
+	g_unlink (cache_file);
+	g_free (cache_file);
+}
+
+/* Opens the cache file backing @self and returns a new file descriptor, or
+ * -1 if g_open fails.
+ *
+ * @creation: TRUE for the writer side. The file is created if needed and
+ *   truncated, so leftovers of an earlier, longer file with the same name
+ *   cannot end up appended to the new contents. FALSE opens an existing
+ *   file read-only.
+ * @temp: TRUE to address the "<id>.tmp" file used while fetching, FALSE for
+ *   the final "<id>" file.
+ */
+static gint
+get_new_fd_of_stream (TnyCachedFile *self, gboolean creation, gboolean temp)
+{
+	TnyCachedFilePriv *priv;
+	gint new_fd;
+	gint flags;
+	gchar *filename;
+	gchar *fullpath;
+
+	priv = TNY_CACHED_FILE_GET_PRIVATE (self);
+
+	/* O_RDONLY is 0 on common platforms, so OR-ing it into O_RDWR does
+	 * not make the descriptor read-only; pick the access mode outright. */
+	if (creation) {
+		flags = O_CREAT | O_TRUNC | O_RDWR;
+	} else {
+		flags = O_RDONLY;
+	}
+
+	if (temp) {
+		filename = g_strconcat (priv->id, ".tmp", NULL);
+	} else {
+		filename = g_strdup (priv->id);
+	}
+	fullpath = g_build_filename (tny_fs_stream_cache_get_path (TNY_FS_STREAM_CACHE (priv->stream_cache)),
+				     filename, NULL);
+	g_free (filename);
+
+	new_fd = g_open (fullpath, flags, 0660);
+	g_free (fullpath);
+
+	return new_fd;
+}
+
+/* Public entry point: dispatches to the get_stream class method. */
+TnyStream *
+tny_cached_file_get_stream (TnyCachedFile *self)
+{
+	TnyCachedFileClass *klass = TNY_CACHED_FILE_GET_CLASS (self);
+
+	return klass->get_stream (self);
+}
+
+/* Default get_stream: opens the cache file (the ".tmp" one while a fetch is
+ * still in progress) and wraps the descriptor in a TnyCachedFileStream.
+ *
+ * Returns NULL if the file has been invalidated or cannot be opened. The
+ * read operation is only counted in active_ops once the file is really
+ * open, so a failed open does not leave the file marked as active. */
+static TnyStream *
+tny_cached_file_get_stream_default (TnyCachedFile *self)
+{
+	TnyStream *stream;
+	TnyCachedFilePriv *priv;
+	gint new_fd;
+
+	priv = TNY_CACHED_FILE_GET_PRIVATE (self);
+
+	if (priv->invalid)
+		return NULL;
+
+	new_fd = get_new_fd_of_stream (self, FALSE, priv->fetch_stream != NULL);
+	if (new_fd == -1)
+		return NULL;
+
+	lseek (new_fd, 0, SEEK_SET);
+	g_mutex_lock (priv->active_ops_mutex);
+	priv->active_ops ++;
+	g_mutex_unlock (priv->active_ops_mutex);
+
+	stream = tny_cached_file_stream_new (self, new_fd);
+
+	return stream;
+}
+
+/* Public entry point: dispatches to the wait_fetchable class method. */
+gint64
+tny_cached_file_wait_fetchable (TnyCachedFile *self, gint64 offset)
+{
+	TnyCachedFileClass *klass = TNY_CACHED_FILE_GET_CLASS (self);
+
+	return klass->wait_fetchable (self, offset);
+}
+
+/* Default wait_fetchable: blocks until at least @offset bytes of the file
+ * are available in cache.
+ *
+ * The wait also ends when the file becomes invalid or when no fetch is
+ * attached to the file: in the latter case fetched_size will not grow any
+ * more, so waiting for an offset beyond it could never finish.
+ *
+ * Returns -1 if the file is invalid, otherwise the number of bytes fetched
+ * so far (which can be smaller than @offset once the fetch is over). */
+static gint64
+tny_cached_file_wait_fetchable_default (TnyCachedFile *self, gint64 offset)
+{
+	TnyCachedFilePriv *priv;
+	gint64 retval;
+
+	priv = TNY_CACHED_FILE_GET_PRIVATE (self);
+
+	g_mutex_lock (priv->fetched_mutex);
+
+	while ((!priv->invalid) &&
+	       (priv->fetch_stream != NULL) &&
+	       (offset > priv->fetched_size)) {
+		g_cond_wait(priv->fetched_cond, priv->fetched_mutex);
+	}
+
+	if (priv->invalid) {
+		retval = -1;
+	} else {
+		retval = priv->fetched_size;
+	}
+
+	g_mutex_unlock (priv->fetched_mutex);
+
+	return retval;
+}
+
+/* Public entry point: dispatches to the unregister_stream class method. */
+void
+tny_cached_file_unregister_stream (TnyCachedFile *self, TnyCachedFileStream *stream)
+{
+	TnyCachedFileClass *klass = TNY_CACHED_FILE_GET_CLASS (self);
+
+	klass->unregister_stream (self, stream);
+}
+
+/* Default unregister_stream: one read operation less on this file. The
+ * @stream argument itself is not inspected. */
+static void
+tny_cached_file_unregister_stream_default (TnyCachedFile *self, TnyCachedFileStream *stream)
+{
+	TnyCachedFilePriv *priv = TNY_CACHED_FILE_GET_PRIVATE (self);
+
+	g_mutex_lock (priv->active_ops_mutex);
+	priv->active_ops -= 1;
+	g_mutex_unlock (priv->active_ops_mutex);
+}
+
+
+/* Argument block handed to the async_fetch_stream thread. */
+typedef struct {
+	/* the cached file being fetched; set with g_object_ref by the creator */
+	TnyCachedFile *self;
+	/* NOTE(review): not assigned or read by the code visible in this
+	 * file section (the thread uses a local write stream) -- confirm
+	 * whether this field is still needed */
+	TnyStream *write_stream;
+} AsyncFetchStreamData;
+
+/* Thread body that copies priv->fetch_stream into the "<id>.tmp" file of the
+ * cache folder and, on success, renames it to "<id>".
+ *
+ * @userdata is an AsyncFetchStreamData; it is freed here, and the reference
+ * on the cached file stored in it is released before returning.
+ *
+ * After every chunk written, fetched_size is updated and fetched_cond is
+ * broadcast so waiting readers can go on. Any read/write/open/rename error,
+ * or a pending_close request, aborts the copy, marks the file invalid and
+ * removes the temporary file. The end of the fetch (fetch_stream set to
+ * NULL) is published under fetched_mutex, after the final file is in place.
+ */
+static gpointer
+async_fetch_stream (gpointer userdata)
+{
+	AsyncFetchStreamData *afs_data = (AsyncFetchStreamData *) userdata;
+	TnyCachedFile *self;
+	TnyCachedFilePriv *priv;
+	TnyStream *write_stream = NULL;
+	TnyStream *fetch_stream;
+	gchar *tmp_filename;
+	gchar *tmp_fullname;
+	gboolean failed = FALSE;
+	int write_fd;
+
+	self = afs_data->self;
+	priv = TNY_CACHED_FILE_GET_PRIVATE (self);
+
+	/* First we dump the read stream contents to file cache */
+	write_fd = get_new_fd_of_stream (self, TRUE, TRUE);
+	if (write_fd == -1) {
+		failed = TRUE;
+	} else {
+		lseek (write_fd, 0, SEEK_SET);
+		write_stream = tny_fs_stream_new (write_fd);
+	}
+
+	while (!failed && !tny_stream_is_eos (priv->fetch_stream)) {
+		gssize readed, written;
+		char buffer[1024];
+
+		readed = tny_stream_read (priv->fetch_stream, buffer, sizeof (buffer));
+		if (readed == -1 || priv->pending_close) {
+			failed = TRUE;
+			break;
+		}
+		written = 0;
+		while (written < readed) {
+			gssize written_now;
+
+			written_now = tny_stream_write (write_stream, buffer + written, readed - written);
+			if (written_now == -1 || priv->pending_close) {
+				failed = TRUE;
+				break;
+			}
+			written += written_now;
+		}
+		/* a failed write must stop the whole copy, not only this chunk */
+		if (failed)
+			break;
+
+		tny_stream_flush (write_stream);
+		g_mutex_lock (priv->fetched_mutex);
+		priv->fetched_size += written;
+		g_cond_broadcast (priv->fetched_cond);
+		g_mutex_unlock (priv->fetched_mutex);
+	}
+
+	/* the file may also have been invalidated from outside meanwhile */
+	if (priv->invalid)
+		failed = TRUE;
+
+	if (failed) {
+		/* wake up readers right away, they have nothing to wait for */
+		g_mutex_lock (priv->fetched_mutex);
+		priv->invalid = TRUE;
+		g_cond_broadcast (priv->fetched_cond);
+		g_mutex_unlock (priv->fetched_mutex);
+	}
+
+	if (write_stream) {
+		tny_stream_close (write_stream);
+		g_object_unref (write_stream);
+	}
+
+	tmp_filename = g_strconcat (priv->id, ".tmp", NULL);
+	tmp_fullname = g_build_filename (tny_fs_stream_cache_get_path (TNY_FS_STREAM_CACHE (priv->stream_cache)), tmp_filename, NULL);
+	g_free (tmp_filename);
+
+	if (!failed) {
+		gchar *final_fullname;
+
+		final_fullname = g_build_filename (tny_fs_stream_cache_get_path (TNY_FS_STREAM_CACHE (priv->stream_cache)), priv->id, NULL);
+		if (g_rename (tmp_fullname, final_fullname) == -1)
+			failed = TRUE;
+		g_free (final_fullname);
+	}
+
+	if (failed)
+		g_unlink (tmp_fullname);
+	g_free (tmp_fullname);
+
+	/* Publish the end of the fetch only now, when the final file name is
+	 * already valid (or the file is known to be invalid). */
+	tny_stream_close (priv->fetch_stream);
+	fetch_stream = priv->fetch_stream;
+
+	g_mutex_lock (priv->fetched_mutex);
+	if (failed) {
+		priv->invalid = TRUE;
+	} else {
+		priv->expected_size = priv->fetched_size;
+	}
+	priv->fetch_stream = NULL;
+	g_cond_broadcast (priv->fetched_cond);
+	g_mutex_unlock (priv->fetched_mutex);
+
+	g_object_unref (fetch_stream);
+
+	g_slice_free (AsyncFetchStreamData, afs_data);
+	/* release the reference that kept the file alive for this thread */
+	g_object_unref (self);
+
+	return NULL;
+}
+
+/**
+ * tny_cached_file_new:
+ * @stream_cache: the cache the file will be added to
+ * @id: the stream unique-id
+ * @expected_size: the size the file is expected to have, must be >= 0
+ * @input_stream: a #TnyStream to fetch the file from, or %NULL
+ *
+ * Creates a cached file information object. This object maintains the stream
+ * that fetches the file @id into @stream_cache, and gives valid TnySeekable
+ * streams for users.
+ *
+ * If @input_stream is not %NULL, a thread is started that dumps it into the
+ * cache folder. If @input_stream is %NULL, then it assumes the file is
+ * already in @stream_cache: its size and access time are read from disk, and
+ * %NULL is returned if the file cannot be found.
+ *
+ * returns: (caller-owns): a new #TnyCachedFile instance, or %NULL on failure
+ * since: 1.0
+ * audience: application-developer
+ **/
+TnyCachedFile*
+tny_cached_file_new (TnyFsStreamCache *stream_cache, const char *id, gint64 expected_size, TnyStream *input_stream)
+{
+	TnyCachedFile *result;
+	TnyCachedFilePriv *priv;
+
+	g_return_val_if_fail (id, NULL);
+	g_return_val_if_fail (expected_size >= 0, NULL);
+	g_return_val_if_fail (TNY_IS_FS_STREAM_CACHE (stream_cache), NULL);
+
+	result = g_object_new (TNY_TYPE_CACHED_FILE, NULL);
+	priv = TNY_CACHED_FILE_GET_PRIVATE (result);
+
+	priv->expected_size = expected_size;
+	priv->id = g_strdup (id);
+	priv->stream_cache = g_object_ref (stream_cache);
+
+	if (input_stream) {
+		AsyncFetchStreamData *afs_data;
+
+		priv->fetch_stream = g_object_ref (input_stream);
+		/* Creation counts as the last access; otherwise the timestamp
+		 * would stay at the 0 set in instance_init until a reader is
+		 * active. */
+		priv->timestamp = time (NULL);
+
+		/* we begin retrieval of the stream to disk. This should be cancellable */
+		afs_data = g_slice_new0 (AsyncFetchStreamData);
+		afs_data->self = g_object_ref (result);
+		if (g_thread_create (async_fetch_stream, afs_data, FALSE, NULL) == NULL) {
+			/* no thread will ever consume afs_data: undo everything */
+			g_object_unref (afs_data->self);
+			g_slice_free (AsyncFetchStreamData, afs_data);
+			g_object_unref (result);
+			return NULL;
+		}
+	} else {
+		gchar *fullpath;
+		struct stat filestat;
+		fullpath = g_build_filename (tny_fs_stream_cache_get_path (TNY_FS_STREAM_CACHE (priv->stream_cache)),
+					     priv->id, NULL);
+		if (g_stat (fullpath, &filestat)== -1) {
+			g_free (fullpath);
+			g_object_unref (result);
+			return NULL;
+		}
+		priv->timestamp = filestat.st_atime;
+		priv->expected_size = priv->fetched_size = filestat.st_size;
+		g_free (fullpath);
+	}
+
+	return result;
+}
+
+/* Instance initializer: puts every private field in its "empty" state and
+ * allocates the two mutexes and the condition used for synchronization. */
+static void
+tny_cached_file_instance_init (GTypeInstance *instance, gpointer g_class)
+{
+	TnyCachedFilePriv *priv;
+
+	priv = TNY_CACHED_FILE_GET_PRIVATE ((TnyCachedFile *) instance);
+
+	/* identity and size information */
+	priv->expected_size = 0;
+	priv->id = NULL;
+
+	/* reader accounting */
+	priv->active_ops = 0;
+	priv->active_ops_mutex = g_mutex_new ();
+
+	/* fetch state */
+	priv->fetch_stream = NULL;
+	priv->stream_cache = NULL;
+	priv->timestamp = 0;
+	priv->pending_close = FALSE;
+	priv->invalid = FALSE;
+	priv->fetched_size = 0;
+	priv->fetched_mutex = g_mutex_new ();
+	priv->fetched_cond = g_cond_new ();
+}
+
+/* Finalizer: drops the references held on the fetch stream and on the
+ * stream cache, frees the id string and the synchronization primitives, and
+ * chains up to the parent class. */
+static void
+tny_cached_file_finalize (GObject *object)
+{
+	TnyCachedFilePriv *priv;
+
+	priv = TNY_CACHED_FILE_GET_PRIVATE ((TnyCachedFile *) object);
+
+	if (priv->fetch_stream != NULL) {
+		g_object_unref (priv->fetch_stream);
+		priv->fetch_stream = NULL;
+	}
+
+	/* g_free accepts NULL, no guard needed */
+	g_free (priv->id);
+	priv->id = NULL;
+
+	if (priv->stream_cache != NULL) {
+		g_object_unref (priv->stream_cache);
+		priv->stream_cache = NULL;
+	}
+
+	if (priv->active_ops_mutex != NULL) {
+		g_mutex_free (priv->active_ops_mutex);
+		priv->active_ops_mutex = NULL;
+	}
+
+	if (priv->fetched_mutex != NULL) {
+		g_mutex_free (priv->fetched_mutex);
+		priv->fetched_mutex = NULL;
+	}
+
+	if (priv->fetched_cond != NULL) {
+		g_cond_free (priv->fetched_cond);
+		priv->fetched_cond = NULL;
+	}
+
+	parent_class->finalize (object);
+}
+
+
+/* Class initializer: remembers the parent class, installs the finalizer and
+ * the default implementation of every virtual method, and reserves room for
+ * the private instance data. */
+static void
+tny_cached_file_class_init (TnyCachedFileClass *class)
+{
+	GObjectClass *object_class = (GObjectClass*) class;
+
+	parent_class = g_type_class_peek_parent (class);
+
+	/* GObject overrides */
+	object_class->finalize = tny_cached_file_finalize;
+
+	/* TnyCachedFile virtual methods */
+	class->get_expected_size = tny_cached_file_get_expected_size_default;
+	class->get_id = tny_cached_file_get_id_default;
+	class->remove = tny_cached_file_remove_default;
+	class->get_stream = tny_cached_file_get_stream_default;
+	class->is_finished = tny_cached_file_is_finished_default;
+	class->is_active = tny_cached_file_is_active_default;
+	class->wait_fetchable = tny_cached_file_wait_fetchable_default;
+	class->get_timestamp = tny_cached_file_get_timestamp_default;
+	class->unregister_stream = tny_cached_file_unregister_stream_default;
+
+	g_type_class_add_private (object_class, sizeof (TnyCachedFilePriv));
+}
+
+/* GOnce callback: registers the TnyCachedFile type (a plain GObject
+ * subclass) and returns its GType packed into a pointer. */
+static gpointer
+tny_cached_file_register_type (gpointer notused)
+{
+	GType type = 0;
+
+	static const GTypeInfo info = 
+		{
+			sizeof (TnyCachedFileClass),
+			NULL,   /* base_init */
+			NULL,   /* base_finalize */
+			(GClassInitFunc) tny_cached_file_class_init,   /* class_init */
+			NULL,   /* class_finalize */
+			NULL,   /* class_data */
+			sizeof (TnyCachedFile),
+			0,      /* n_preallocs */
+			tny_cached_file_instance_init,   /* instance_init */
+			NULL
+		};
+	
+	type = g_type_register_static (G_TYPE_OBJECT,
+				       "TnyCachedFile",
+				       &info, 0);
+
+	/* GType is as wide as gsize, so use the gsize conversion macros to
+	 * avoid truncating it where guint is narrower than a pointer. */
+	return GSIZE_TO_POINTER (type);
+}
+
+/* Returns the GType of TnyCachedFile, registering it on the first call
+ * (g_once guarantees the registration runs only once). */
+GType 
+tny_cached_file_get_type (void)
+{
+	static GOnce once = G_ONCE_INIT;
+	g_once (&once, tny_cached_file_register_type, NULL);
+	return (GType) GPOINTER_TO_SIZE (once.retval);
+}


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]