[libsoup] SoupCacheInputStream: new input stream filter that writes to the cache



commit d12d25e3515b329f65da8f276965fe46c7c628ac
Author: Sergio Villar Senin <svillar igalia com>
Date:   Mon Jul 30 12:40:32 2012 +0200

    SoupCacheInputStream: new input stream filter that writes to the cache
    
    The SoupCacheInputStream will be added to the stream stack as any other
    GPollableInputStream. It will transparently read data from the underlying
    stream and will pass it unmodified to the upper level.
    
    Apart from that, the stream writes everything it reads to a local file. Once
    the caching finishes, a callback will be called. Caching may be cancelled at
    any point by providing a GCancellable.
    
    https://bugzilla.gnome.org/show_bug.cgi?id=682112

 libsoup/Makefile.am               |    2 +
 libsoup/soup-cache-input-stream.c |  333 +++++++++++++++++++++++++++++++++++++
 libsoup/soup-cache-input-stream.h |   52 ++++++
 po/POTFILES.in                    |    1 +
 4 files changed, 388 insertions(+), 0 deletions(-)
---
diff --git a/libsoup/Makefile.am b/libsoup/Makefile.am
index 87629ed..59553f0 100644
--- a/libsoup/Makefile.am
+++ b/libsoup/Makefile.am
@@ -114,6 +114,8 @@ libsoup_2_4_la_SOURCES =		\
 	soup-body-output-stream.h	\
 	soup-body-output-stream.c	\
 	soup-cache.c			\
+	soup-cache-input-stream.h	\
+	soup-cache-input-stream.c	\
 	soup-cache-private.h		\
 	soup-client-input-stream.h	\
 	soup-client-input-stream.c	\
diff --git a/libsoup/soup-cache-input-stream.c b/libsoup/soup-cache-input-stream.c
new file mode 100644
index 0000000..a44652a
--- /dev/null
+++ b/libsoup/soup-cache-input-stream.c
@@ -0,0 +1,333 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * Copyright (C) 2012 Igalia, S.L.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <glib/gi18n-lib.h>
+#include "soup-cache-input-stream.h"
+#include "soup-message-body.h"
+
+/* Declared ahead of G_DEFINE_TYPE_WITH_CODE because the macro below names it
+ * as the GPollableInputStream interface initializer; it is defined later in
+ * this file. */
+static void soup_cache_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, gpointer interface_data);
+
+/* Registers SoupCacheInputStream as a subtype of SoupFilterInputStream that
+ * also implements the GPollableInputStream interface. */
+G_DEFINE_TYPE_WITH_CODE (SoupCacheInputStream, soup_cache_input_stream, SOUP_TYPE_FILTER_INPUT_STREAM,
+			 G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
+						soup_cache_input_stream_pollable_init))
+
+/* GObject property IDs handled by the get_property/set_property vfuncs. */
+enum {
+	PROP_0,
+
+	/* "output-stream": the GOutputStream the cached data is written to. */
+	PROP_OUTPUT_STREAM,
+
+	LAST_PROP
+};
+
+/* Per-instance private state of a SoupCacheInputStream. */
+struct _SoupCacheInputStreamPrivate
+{
+	/* Destination of the cached data. Set from the "output-stream"
+	 * property or by file_replaced_cb(); released by notify_and_clear()
+	 * and dispose(). */
+	GOutputStream *output_stream;
+	/* Running total of bytes written so far; reported as the task result. */
+	gsize bytes_written;
+
+	/* Set once a read on the base stream has returned 0. */
+	gboolean read_finished;
+	/* Buffer handed to the write currently in flight, or NULL when no
+	 * write is pending. */
+	SoupBuffer *current_writing_buffer;
+	/* SoupBuffer copies of the data read so far that still await writing. */
+	GQueue *buffer_queue;
+
+	/* GTask created by soup_cache_input_stream_cache(); NULL while no
+	 * caching operation is active. */
+	GTask *task;
+};
+
+/* Forward declaration: try_write_next_buffer() calls this before its definition. */
+static void soup_cache_input_stream_write_next_buffer (SoupCacheInputStream *istream);
+
+/*
+ * Completes the caching task and releases the resources tied to it.
+ *
+ * When @error is non-NULL the task is completed with that error; otherwise
+ * it is completed with the number of bytes written so far. Afterwards the
+ * references to the output stream and to the task itself are dropped.
+ */
+static inline void
+notify_and_clear (SoupCacheInputStream *istream, GError *error)
+{
+	SoupCacheInputStreamPrivate *priv = istream->priv;
+
+	if (error == NULL)
+		g_task_return_int (priv->task, priv->bytes_written);
+	else
+		g_task_return_error (priv->task, error);
+
+	g_clear_object (&priv->output_stream);
+	g_clear_object (&priv->task);
+}
+
+/**
+ * soup_cache_input_stream_cache_finish:
+ * @istream: the stream soup_cache_input_stream_cache() was called on
+ * @result: the #GAsyncResult passed to the completion callback
+ * @error: return location for a #GError, or %NULL
+ *
+ * Finishes a caching operation started with soup_cache_input_stream_cache().
+ *
+ * Returns: the number of bytes written to the cache file, or 0 if the
+ * operation failed, in which case @error is set.
+ */
+gsize
+soup_cache_input_stream_cache_finish (SoupCacheInputStream  *istream,
+				      GAsyncResult          *result,
+				      GError               **error)
+{
+	gssize bytes_written;
+
+	g_return_val_if_fail (g_task_is_valid (result, istream), 0);
+
+	/* g_task_propagate_int() signals failure with -1. Passing that straight
+	 * through the unsigned return type would report a huge byte count, so
+	 * map it to 0 and let @error carry the failure. */
+	bytes_written = g_task_propagate_int (G_TASK (result), error);
+
+	return bytes_written < 0 ? 0 : (gsize) bytes_written;
+}
+
+/*
+ * Decides what to do once the writer is idle:
+ *  - if no write is in flight and buffers are queued, start writing the next;
+ *  - otherwise, if reading has finished, complete the task successfully;
+ *  - otherwise, if the input stream has been closed, complete the task with
+ *    a G_IO_ERROR_CLOSED error;
+ *  - otherwise do nothing.
+ */
+static inline void
+try_write_next_buffer (SoupCacheInputStream *istream)
+{
+	SoupCacheInputStreamPrivate *priv = istream->priv;
+	GError *closed_error = NULL;
+
+	if (priv->current_writing_buffer == NULL && priv->buffer_queue->length) {
+		soup_cache_input_stream_write_next_buffer (istream);
+		return;
+	}
+
+	if (priv->read_finished) {
+		notify_and_clear (istream, NULL);
+		return;
+	}
+
+	if (!g_input_stream_is_closed (G_INPUT_STREAM (istream)))
+		return;
+
+	g_set_error_literal (&closed_error, G_IO_ERROR, G_IO_ERROR_CLOSED,
+			     _("Network stream unexpectedly closed"));
+	notify_and_clear (istream, closed_error);
+}
+
+/*
+ * Completion callback for the g_file_replace_async() call issued by
+ * soup_cache_input_stream_cache().
+ *
+ * @source is the GFile being replaced and @user_data is the
+ * SoupCacheInputStream. On success the new output stream is stored and any
+ * data queued so far starts being written. On failure the caching task is
+ * completed with the error.
+ */
+static void
+file_replaced_cb (GObject      *source,
+		  GAsyncResult *res,
+		  gpointer      user_data)
+{
+	SoupCacheInputStream *istream = SOUP_CACHE_INPUT_STREAM (user_data);
+	SoupCacheInputStreamPrivate *priv = istream->priv;
+	GError *error = NULL;
+
+	priv->output_stream = (GOutputStream *) g_file_replace_finish (G_FILE (source), res, &error);
+
+	if (error) {
+		/* Complete the task and also drop it. If priv->task stayed set,
+		 * read_internal() would keep queuing a copy of every read that
+		 * can never be written (there is no output stream), and the
+		 * task would keep its reference to the stream. */
+		notify_and_clear (istream, error);
+		return;
+	}
+
+	try_write_next_buffer (istream);
+}
+
+/**
+ * soup_cache_input_stream_cache:
+ * @istream: a #SoupCacheInputStream
+ * @file: the #GFile to write the cached data to
+ * @cancellable: a #GCancellable, or %NULL
+ * @callback: callback invoked when the caching task completes
+ * @user_data: data passed to @callback
+ *
+ * Creates the task that tracks the caching operation and asynchronously
+ * opens @file for replacement; file_replaced_cb() continues from there.
+ */
+void
+soup_cache_input_stream_cache (SoupCacheInputStream  *istream,
+			       GFile                 *file,
+			       GCancellable          *cancellable,
+			       GAsyncReadyCallback    callback,
+			       gpointer               user_data)
+{
+	const GFileCreateFlags create_flags =
+		G_FILE_CREATE_PRIVATE | G_FILE_CREATE_REPLACE_DESTINATION;
+
+	istream->priv->task = g_task_new (istream, cancellable, callback, user_data);
+
+	g_file_replace_async (file, NULL, FALSE, create_flags,
+			      G_PRIORITY_LOW, cancellable,
+			      file_replaced_cb, istream);
+}
+
+/* Instance initializer: binds the private data and creates the empty queue
+ * of buffers waiting to be written. */
+static void
+soup_cache_input_stream_init (SoupCacheInputStream *self)
+{
+	self->priv = G_TYPE_INSTANCE_GET_PRIVATE (self, SOUP_TYPE_CACHE_INPUT_STREAM,
+						  SoupCacheInputStreamPrivate);
+	self->priv->buffer_queue = g_queue_new ();
+}
+
+/* GObject get_property vfunc: exposes the current output stream through the
+ * "output-stream" property and warns about any other property ID. */
+static void
+soup_cache_input_stream_get_property (GObject *object,
+				      guint property_id, GValue *value, GParamSpec *pspec)
+{
+	SoupCacheInputStreamPrivate *priv = SOUP_CACHE_INPUT_STREAM (object)->priv;
+
+	if (property_id == PROP_OUTPUT_STREAM) {
+		g_value_set_object (value, priv->output_stream);
+		return;
+	}
+
+	G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+}
+
+/* GObject set_property vfunc: stores a new reference to the stream supplied
+ * through "output-stream" and warns about any other property ID. */
+static void
+soup_cache_input_stream_set_property (GObject *object,
+				      guint property_id, const GValue *value, GParamSpec *pspec)
+{
+	SoupCacheInputStreamPrivate *priv = SOUP_CACHE_INPUT_STREAM (object)->priv;
+
+	if (property_id == PROP_OUTPUT_STREAM) {
+		priv->output_stream = g_value_dup_object (value);
+		return;
+	}
+
+	G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+}
+
+/* GObject dispose vfunc: drops the references to the output stream and the
+ * caching task, then chains up to the parent class. */
+static void
+soup_cache_input_stream_dispose (GObject *object)
+{
+	SoupCacheInputStream *self = SOUP_CACHE_INPUT_STREAM (object);
+
+	g_clear_object (&self->priv->output_stream);
+	g_clear_object (&self->priv->task);
+
+	G_OBJECT_CLASS (soup_cache_input_stream_parent_class)->dispose (object);
+}
+
+/* GObject finalize vfunc: frees the buffer of any in-flight write and every
+ * buffer still queued, then chains up to the parent class. */
+static void
+soup_cache_input_stream_finalize (GObject *object)
+{
+	SoupCacheInputStreamPrivate *priv = ((SoupCacheInputStream *) object)->priv;
+
+	g_clear_pointer (&priv->current_writing_buffer, soup_buffer_free);
+	g_queue_free_full (priv->buffer_queue, (GDestroyNotify) soup_buffer_free);
+
+	G_OBJECT_CLASS (soup_cache_input_stream_parent_class)->finalize (object);
+}
+
+/*
+ * Completion callback for the write started by
+ * soup_cache_input_stream_write_next_buffer().
+ *
+ * On failure the caching task is completed with the error. On success any
+ * unwritten tail of the current buffer is put back at the head of the queue,
+ * the byte counter is advanced, the current buffer is released and the next
+ * step is scheduled. Either way, the stream reference taken when the write
+ * was issued is released last.
+ */
+static void
+write_ready_cb (GObject *source, GAsyncResult *result, SoupCacheInputStream *istream)
+{
+	SoupCacheInputStreamPrivate *priv = istream->priv;
+	GError *write_error = NULL;
+	gssize written;
+
+	written = g_output_stream_write_finish (G_OUTPUT_STREAM (source), result, &write_error);
+
+	if (write_error != NULL) {
+		notify_and_clear (istream, write_error);
+	} else {
+		/* A short write leaves a tail that must go out before anything
+		 * else, hence it is pushed to the head of the queue. */
+		gsize remaining = priv->current_writing_buffer->length - written;
+
+		if (remaining) {
+			SoupBuffer *tail = soup_buffer_new_subbuffer (priv->current_writing_buffer,
+								      written, remaining);
+			g_queue_push_head (priv->buffer_queue, tail);
+		}
+
+		priv->bytes_written += written;
+		g_clear_pointer (&priv->current_writing_buffer, soup_buffer_free);
+
+		try_write_next_buffer (istream);
+	}
+
+	g_object_unref (istream);
+}
+
+/*
+ * Takes the buffer at the head of the queue, makes it the current writing
+ * buffer and starts an asynchronous write of it to the output stream.
+ *
+ * The write runs at G_PRIORITY_LOW unless more than 10 buffers remain
+ * queued, in which case it runs at G_PRIORITY_DEFAULT. A reference to
+ * the stream is taken for the duration of the write and released in
+ * write_ready_cb().
+ */
+static void
+soup_cache_input_stream_write_next_buffer (SoupCacheInputStream *istream)
+{
+	SoupCacheInputStreamPrivate *priv = istream->priv;
+	SoupBuffer *next = g_queue_pop_head (priv->buffer_queue);
+	int io_priority;
+
+	g_assert (priv->output_stream && !g_output_stream_is_closed (priv->output_stream));
+	g_assert (priv->task);
+
+	g_clear_pointer (&priv->current_writing_buffer, soup_buffer_free);
+	priv->current_writing_buffer = next;
+
+	io_priority = (priv->buffer_queue->length > 10) ? G_PRIORITY_DEFAULT
+							: G_PRIORITY_LOW;
+
+	g_output_stream_write_async (priv->output_stream, next->data, next->length,
+				     io_priority, g_task_get_cancellable (priv->task),
+				     (GAsyncReadyCallback) write_ready_cb,
+				     g_object_ref (istream));
+}
+
+/*
+ * Shared implementation of the blocking and non-blocking read paths.
+ *
+ * Reads from the base stream and returns the result unmodified. While a
+ * caching task is active and reading has not finished, a copy of the data
+ * read is queued for writing. A read of 0 bytes marks reading as finished.
+ * If an output stream exists and no write is in flight, the next write is
+ * started (or, at end of data, the task is completed) right away.
+ */
+static gssize
+read_internal (GInputStream  *stream,
+	       void          *buffer,
+	       gsize          count,
+	       gboolean       blocking,
+	       GCancellable  *cancellable,
+	       GError       **error)
+{
+	SoupCacheInputStream *istream = SOUP_CACHE_INPUT_STREAM (stream);
+	SoupCacheInputStreamPrivate *priv = istream->priv;
+	GInputStream *base_stream =
+		g_filter_input_stream_get_base_stream (G_FILTER_INPUT_STREAM (stream));
+	gssize nread = g_pollable_stream_read (base_stream, buffer, count, blocking,
+					       cancellable, error);
+	gboolean writer_idle;
+
+	/* Nothing to cache on error, after EOF, or when no task is active. */
+	if (G_UNLIKELY (nread == -1 || priv->read_finished || !priv->task))
+		return nread;
+
+	if (nread == 0)
+		priv->read_finished = TRUE;
+	else
+		g_queue_push_tail (priv->buffer_queue,
+				   soup_buffer_new (SOUP_MEMORY_COPY, buffer, nread));
+
+	writer_idle = priv->current_writing_buffer == NULL && priv->output_stream;
+	if (writer_idle) {
+		if (nread == 0)
+			notify_and_clear (istream, NULL);
+		else
+			soup_cache_input_stream_write_next_buffer (istream);
+	}
+
+	return nread;
+}
+
+/* GInputStream read_fn vfunc: blocking read through read_internal(). */
+static gssize
+soup_cache_input_stream_read_fn (GInputStream  *stream,
+				 void          *buffer,
+				 gsize          count,
+				 GCancellable  *cancellable,
+				 GError       **error)
+{
+	const gboolean blocking = TRUE;
+
+	return read_internal (stream, buffer, count, blocking, cancellable, error);
+}
+
+/* GPollableInputStream read_nonblocking vfunc: non-blocking read through
+ * read_internal(), without a cancellable. */
+static gssize
+soup_cache_input_stream_read_nonblocking (GPollableInputStream  *stream,
+					  void                  *buffer,
+					  gsize                  count,
+					  GError               **error)
+{
+	const gboolean blocking = FALSE;
+
+	return read_internal (G_INPUT_STREAM (stream), buffer, count, blocking, NULL, error);
+}
+
+/* GPollableInputStream interface initializer: installs this class's
+ * read_nonblocking implementation. No other interface member is set here. */
+static void
+soup_cache_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface,
+				       gpointer interface_data)
+{
+	pollable_interface->read_nonblocking = soup_cache_input_stream_read_nonblocking;
+}
+
+/* Class initializer: reserves the private struct, installs the GObject and
+ * GInputStream vfuncs and registers the construct-only "output-stream"
+ * property. */
+static void
+soup_cache_input_stream_class_init (SoupCacheInputStreamClass *klass)
+{
+	GObjectClass *object_class = G_OBJECT_CLASS (klass);
+	GParamSpec *output_stream_spec;
+
+	g_type_class_add_private (klass, sizeof (SoupCacheInputStreamPrivate));
+
+	object_class->get_property = soup_cache_input_stream_get_property;
+	object_class->set_property = soup_cache_input_stream_set_property;
+	object_class->dispose = soup_cache_input_stream_dispose;
+	object_class->finalize = soup_cache_input_stream_finalize;
+
+	G_INPUT_STREAM_CLASS (klass)->read_fn = soup_cache_input_stream_read_fn;
+
+	output_stream_spec = g_param_spec_object ("output-stream", "Output stream",
+						  "the output stream where to write.",
+						  G_TYPE_OUTPUT_STREAM,
+						  G_PARAM_READWRITE |
+						  G_PARAM_CONSTRUCT_ONLY |
+						  G_PARAM_STATIC_STRINGS);
+	g_object_class_install_property (object_class, PROP_OUTPUT_STREAM,
+					 output_stream_spec);
+}
+
+/**
+ * soup_cache_input_stream_new:
+ * @base_stream: the stream to read from
+ *
+ * Creates a #SoupCacheInputStream wrapping @base_stream. The new stream is
+ * created with "close-base-stream" set to %FALSE.
+ *
+ * Returns: the new stream, as a #GInputStream
+ */
+GInputStream *
+soup_cache_input_stream_new (GInputStream *base_stream)
+{
+	GInputStream *cache_stream;
+
+	cache_stream = g_object_new (SOUP_TYPE_CACHE_INPUT_STREAM,
+				     "base-stream", base_stream,
+				     "close-base-stream", FALSE,
+				     NULL);
+	return cache_stream;
+}
diff --git a/libsoup/soup-cache-input-stream.h b/libsoup/soup-cache-input-stream.h
new file mode 100644
index 0000000..c999d10
--- /dev/null
+++ b/libsoup/soup-cache-input-stream.h
@@ -0,0 +1,52 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * soup-cache-input-stream.h - Header for SoupCacheInputStream
+ */
+
+#ifndef __SOUP_CACHE_INPUT_STREAM_H__
+#define __SOUP_CACHE_INPUT_STREAM_H__
+
+#include "soup-filter-input-stream.h"
+
+G_BEGIN_DECLS
+
+/* GObject type macros for SoupCacheInputStream: type ID, checked instance
+ * and class casts, instance and class type tests, and class lookup. */
+#define SOUP_TYPE_CACHE_INPUT_STREAM		(soup_cache_input_stream_get_type())
+#define SOUP_CACHE_INPUT_STREAM(obj)		(G_TYPE_CHECK_INSTANCE_CAST ((obj), SOUP_TYPE_CACHE_INPUT_STREAM, SoupCacheInputStream))
+#define SOUP_CACHE_INPUT_STREAM_CLASS(klass)	(G_TYPE_CHECK_CLASS_CAST ((klass), SOUP_TYPE_CACHE_INPUT_STREAM, SoupCacheInputStreamClass))
+#define SOUP_IS_CACHE_INPUT_STREAM(obj)		(G_TYPE_CHECK_INSTANCE_TYPE ((obj), SOUP_TYPE_CACHE_INPUT_STREAM))
+#define SOUP_IS_CACHE_INPUT_STREAM_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), SOUP_TYPE_CACHE_INPUT_STREAM))
+#define SOUP_CACHE_INPUT_STREAM_GET_CLASS(obj)  (G_TYPE_INSTANCE_GET_CLASS ((obj), SOUP_TYPE_CACHE_INPUT_STREAM, SoupCacheInputStreamClass))
+
+typedef struct _SoupCacheInputStream      SoupCacheInputStream;
+typedef struct _SoupCacheInputStreamClass SoupCacheInputStreamClass;
+typedef struct _SoupCacheInputStreamPrivate SoupCacheInputStreamPrivate;
+
+/* Class structure; adds no members beyond the SoupFilterInputStream parent class. */
+struct _SoupCacheInputStreamClass
+{
+	SoupFilterInputStreamClass parent_class;
+};
+
+/* Instance structure: the parent instance plus a pointer to the private
+ * data, whose layout is only declared here as an opaque type. */
+struct _SoupCacheInputStream
+{
+	SoupFilterInputStream parent;
+
+	SoupCacheInputStreamPrivate *priv;
+};
+
+/* Returns the GType of SoupCacheInputStream. */
+GType soup_cache_input_stream_get_type (void) G_GNUC_CONST;
+
+/* Creates a cache input stream that reads from @base_stream. */
+GInputStream *soup_cache_input_stream_new               (GInputStream *base_stream);
+
+/* Starts asynchronously caching what is read from @istream into @file;
+ * @callback is invoked when the caching operation completes. */
+void          soup_cache_input_stream_cache             (SoupCacheInputStream  *istream,
+							 GFile                 *file,
+							 GCancellable          *cancellable,
+							 GAsyncReadyCallback    callback,
+							 gpointer               user_data);
+
+/* Retrieves the outcome of soup_cache_input_stream_cache() from @result:
+ * the number of bytes written, with @error set on failure. */
+gsize         soup_cache_input_stream_cache_finish      (SoupCacheInputStream  *istream,
+							 GAsyncResult          *result,
+							 GError               **error);
+
+G_END_DECLS
+
+#endif /* __SOUP_CACHE_INPUT_STREAM_H__ */
diff --git a/po/POTFILES.in b/po/POTFILES.in
index fff1f0e..21c70d4 100644
--- a/po/POTFILES.in
+++ b/po/POTFILES.in
@@ -1,4 +1,5 @@
 libsoup/soup-body-input-stream.c
+libsoup/soup-cache-input-stream.c
 libsoup/soup-converter-wrapper.c
 libsoup/soup-message-client-io.c
 libsoup/soup-message-io.c



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]