[libsoup] SoupCacheInputStream: new input stream filter that writes to the cache
- From: Sergio Villar Senin <svillar src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [libsoup] SoupCacheInputStream: new input stream filter that writes to the cache
- Date: Tue, 18 Dec 2012 15:49:40 +0000 (UTC)
commit d12d25e3515b329f65da8f276965fe46c7c628ac
Author: Sergio Villar Senin <svillar igalia com>
Date: Mon Jul 30 12:40:32 2012 +0200
SoupCacheInputStream: new input stream filter that writes to the cache
The SoupCacheInputStream will be added to the stream stack as any other
GPollableInputStream. It will transparently read data from the underlying
stream and will pass it unmodified to the upper level.
Apart from that the stream writes everything it reads to a local file. Once
the caching finishes a callback will be called. Caching may be cancelled at
any point by providing a GCancellable.
https://bugzilla.gnome.org/show_bug.cgi?id=682112
libsoup/Makefile.am | 2 +
libsoup/soup-cache-input-stream.c | 333 +++++++++++++++++++++++++++++++++++++
libsoup/soup-cache-input-stream.h | 52 ++++++
po/POTFILES.in | 1 +
4 files changed, 388 insertions(+), 0 deletions(-)
---
diff --git a/libsoup/Makefile.am b/libsoup/Makefile.am
index 87629ed..59553f0 100644
--- a/libsoup/Makefile.am
+++ b/libsoup/Makefile.am
@@ -114,6 +114,8 @@ libsoup_2_4_la_SOURCES = \
soup-body-output-stream.h \
soup-body-output-stream.c \
soup-cache.c \
+ soup-cache-input-stream.h \
+ soup-cache-input-stream.c \
soup-cache-private.h \
soup-client-input-stream.h \
soup-client-input-stream.c \
diff --git a/libsoup/soup-cache-input-stream.c b/libsoup/soup-cache-input-stream.c
new file mode 100644
index 0000000..a44652a
--- /dev/null
+++ b/libsoup/soup-cache-input-stream.c
@@ -0,0 +1,333 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * Copyright (C) 2012 Igalia, S.L.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <glib/gi18n-lib.h>
+#include "soup-cache-input-stream.h"
+#include "soup-message-body.h"
+
+static void soup_cache_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, gpointer interface_data);
+
+G_DEFINE_TYPE_WITH_CODE (SoupCacheInputStream, soup_cache_input_stream, SOUP_TYPE_FILTER_INPUT_STREAM,
+ G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
+ soup_cache_input_stream_pollable_init))
+
+/* properties */
+/*
+ * Property identifiers for SoupCacheInputStream. PROP_0 occupies the value 0,
+ * so PROP_OUTPUT_STREAM (the "output-stream" property) is 1. LAST_PROP marks
+ * the end of the list.
+ */
+enum {
+ PROP_0,
+
+ PROP_OUTPUT_STREAM,
+
+ LAST_PROP
+};
+
+/* Per-instance private state of a SoupCacheInputStream. */
+struct _SoupCacheInputStreamPrivate
+{
+ /* Stream the cached data is written to; set from the "output-stream"
+ * property or once the file replace operation finishes. */
+ GOutputStream *output_stream;
+ /* Running total of bytes successfully written to output_stream. */
+ gsize bytes_written;
+
+ /* Set to TRUE once a read from the base stream returns 0 bytes. */
+ gboolean read_finished;
+ /* Buffer handed to the asynchronous write currently in flight, or NULL. */
+ SoupBuffer *current_writing_buffer;
+ /* SoupBuffer copies of the data read so far that still await writing. */
+ GQueue *buffer_queue;
+
+ /* Task created by soup_cache_input_stream_cache(); NULL when no caching
+ * operation is in progress. */
+ GTask *task;
+};
+
+static void soup_cache_input_stream_write_next_buffer (SoupCacheInputStream *istream);
+
+/*
+ * Completes the pending caching task and drops what was held for it.
+ *
+ * When @error is non-NULL it is handed to the task as its result; otherwise
+ * the task returns the number of bytes written so far. In both cases the
+ * output stream and the task are then released and reset to NULL.
+ */
+static inline void
+notify_and_clear (SoupCacheInputStream *istream, GError *error)
+{
+	SoupCacheInputStreamPrivate *priv = istream->priv;
+	GTask *pending_task = priv->task;
+
+	if (error != NULL) {
+		g_task_return_error (pending_task, error);
+	} else {
+		g_task_return_int (pending_task, priv->bytes_written);
+	}
+
+	g_clear_object (&priv->output_stream);
+	g_clear_object (&priv->task);
+}
+
+/*
+ * Finishes a caching operation started with soup_cache_input_stream_cache().
+ *
+ * @istream: the stream the operation was started on (not used here)
+ * @result: the GAsyncResult passed to the completion callback; must be a GTask
+ * @error: return location for the error the task finished with, if any
+ *
+ * Returns the integer the task completed with, converted to gsize.
+ */
+gsize
+soup_cache_input_stream_cache_finish (SoupCacheInputStream *istream,
+				      GAsyncResult *result,
+				      GError **error)
+{
+	GTask *finished_task = G_TASK (result);
+
+	return g_task_propagate_int (finished_task, error);
+}
+
+/*
+ * Decides what to do once the writer may be idle:
+ *  - if no write is in flight and buffers are queued, start writing the next one;
+ *  - otherwise, if the read side already finished, complete the task successfully;
+ *  - otherwise, if the input stream has been closed, complete the task with a
+ *    G_IO_ERROR_CLOSED error.
+ * In any other situation nothing happens.
+ */
+static inline void
+try_write_next_buffer (SoupCacheInputStream *istream)
+{
+	SoupCacheInputStreamPrivate *priv = istream->priv;
+	GError *closed_error = NULL;
+
+	if (priv->current_writing_buffer == NULL && priv->buffer_queue->length > 0) {
+		soup_cache_input_stream_write_next_buffer (istream);
+		return;
+	}
+
+	if (priv->read_finished) {
+		notify_and_clear (istream, NULL);
+		return;
+	}
+
+	if (!g_input_stream_is_closed (G_INPUT_STREAM (istream)))
+		return;
+
+	g_set_error_literal (&closed_error, G_IO_ERROR, G_IO_ERROR_CLOSED,
+			     _("Network stream unexpectedly closed"));
+	notify_and_clear (istream, closed_error);
+}
+
+/*
+ * Completion callback for the g_file_replace_async() call issued by
+ * soup_cache_input_stream_cache(). @source is the GFile being replaced and
+ * @user_data is the SoupCacheInputStream that requested the caching.
+ *
+ * On success the new output stream is stored and any buffers that were queued
+ * while the file was being opened start being written. On failure the caching
+ * task is completed with the error through notify_and_clear(), which also
+ * resets priv->task. Leaving the task set after it has returned would make
+ * read_internal() keep queueing a copy of every chunk read even though no
+ * output stream will ever drain the queue, and the finished task would never
+ * be released.
+ */
+static void
+file_replaced_cb (GObject *source,
+		  GAsyncResult *res,
+		  gpointer user_data)
+{
+	SoupCacheInputStream *istream = SOUP_CACHE_INPUT_STREAM (user_data);
+	SoupCacheInputStreamPrivate *priv = istream->priv;
+	GError *error = NULL;
+
+	priv->output_stream = (GOutputStream *) g_file_replace_finish (G_FILE (source), res, &error);
+
+	/* istream must not be touched after either call below: both may end up
+	 * clearing the task. */
+	if (error)
+		notify_and_clear (istream, error);
+	else
+		try_write_next_buffer (istream);
+}
+
+/*
+ * Starts caching everything read through @istream into @file.
+ *
+ * @istream: the stream whose data must be cached
+ * @file: destination file; it is replaced asynchronously
+ * @cancellable: used both for the task and for the file replace operation
+ * @callback: invoked when the caching task completes
+ * @user_data: data passed to @callback
+ *
+ * A task is created and stored in the private data, and the output file is
+ * opened asynchronously; file_replaced_cb() continues from there.
+ */
+void
+soup_cache_input_stream_cache (SoupCacheInputStream *istream,
+			       GFile *file,
+			       GCancellable *cancellable,
+			       GAsyncReadyCallback callback,
+			       gpointer user_data)
+{
+	const GFileCreateFlags create_flags =
+		G_FILE_CREATE_PRIVATE | G_FILE_CREATE_REPLACE_DESTINATION;
+
+	istream->priv->task = g_task_new (istream, cancellable, callback, user_data);
+
+	g_file_replace_async (file,
+			      NULL,
+			      FALSE,
+			      create_flags,
+			      G_PRIORITY_LOW,
+			      cancellable,
+			      file_replaced_cb,
+			      istream);
+}
+
+/*
+ * Instance initializer: hooks up the private data area and creates the
+ * (initially empty) queue of buffers waiting to be written.
+ */
+static void
+soup_cache_input_stream_init (SoupCacheInputStream *self)
+{
+	self->priv = G_TYPE_INSTANCE_GET_PRIVATE (self, SOUP_TYPE_CACHE_INPUT_STREAM,
+						  SoupCacheInputStreamPrivate);
+	self->priv->buffer_queue = g_queue_new ();
+}
+
+/*
+ * GObject get_property implementation. Only PROP_OUTPUT_STREAM is known; any
+ * other id triggers the standard invalid-property warning.
+ */
+static void
+soup_cache_input_stream_get_property (GObject *object,
+				      guint property_id, GValue *value, GParamSpec *pspec)
+{
+	SoupCacheInputStreamPrivate *priv = SOUP_CACHE_INPUT_STREAM (object)->priv;
+
+	if (property_id == PROP_OUTPUT_STREAM) {
+		g_value_set_object (value, priv->output_stream);
+	} else {
+		G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+	}
+}
+
+/*
+ * GObject set_property implementation. PROP_OUTPUT_STREAM stores a new
+ * reference to the object held by @value; any other id triggers the standard
+ * invalid-property warning.
+ */
+static void
+soup_cache_input_stream_set_property (GObject *object,
+				      guint property_id, const GValue *value, GParamSpec *pspec)
+{
+	SoupCacheInputStreamPrivate *priv = SOUP_CACHE_INPUT_STREAM (object)->priv;
+
+	if (property_id == PROP_OUTPUT_STREAM) {
+		priv->output_stream = g_value_dup_object (value);
+	} else {
+		G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec);
+	}
+}
+
+/*
+ * GObject dispose implementation: releases the output stream and the task
+ * (if still set) and then chains up to the parent class.
+ */
+static void
+soup_cache_input_stream_dispose (GObject *object)
+{
+	SoupCacheInputStream *self = SOUP_CACHE_INPUT_STREAM (object);
+
+	g_clear_object (&self->priv->output_stream);
+	g_clear_object (&self->priv->task);
+
+	G_OBJECT_CLASS (soup_cache_input_stream_parent_class)->dispose (object);
+}
+
+/*
+ * GObject finalize implementation: frees the buffer of the write in flight
+ * (if any), frees the queue together with every buffer still in it, and then
+ * chains up to the parent class.
+ */
+static void
+soup_cache_input_stream_finalize (GObject *object)
+{
+	SoupCacheInputStreamPrivate *priv = ((SoupCacheInputStream *) object)->priv;
+	SoupBuffer *in_flight = priv->current_writing_buffer;
+
+	priv->current_writing_buffer = NULL;
+	if (in_flight != NULL)
+		soup_buffer_free (in_flight);
+
+	g_queue_free_full (priv->buffer_queue, (GDestroyNotify) soup_buffer_free);
+
+	G_OBJECT_CLASS (soup_cache_input_stream_parent_class)->finalize (object);
+}
+
+/*
+ * Completion callback of the asynchronous write started by
+ * soup_cache_input_stream_write_next_buffer(). @istream carries the reference
+ * taken when the write was issued; it is dropped before returning.
+ *
+ * On error the caching task is completed with that error. Otherwise the byte
+ * counter is updated, the unwritten tail of a partial write is put back at the
+ * head of the queue, and the next step is chosen by try_write_next_buffer().
+ */
+static void
+write_ready_cb (GObject *source, GAsyncResult *result, SoupCacheInputStream *istream)
+{
+	SoupCacheInputStreamPrivate *priv = istream->priv;
+	GError *write_error = NULL;
+	gssize written;
+	gsize remaining;
+
+	written = g_output_stream_write_finish (G_OUTPUT_STREAM (source), result, &write_error);
+
+	if (write_error != NULL) {
+		notify_and_clear (istream, write_error);
+	} else {
+		/* A short write leaves part of the buffer behind: requeue it first in line */
+		remaining = priv->current_writing_buffer->length - written;
+		if (remaining != 0) {
+			g_queue_push_head (priv->buffer_queue,
+					   soup_buffer_new_subbuffer (priv->current_writing_buffer,
+								      written, remaining));
+		}
+
+		priv->bytes_written += written;
+		g_clear_pointer (&priv->current_writing_buffer, soup_buffer_free);
+
+		try_write_next_buffer (istream);
+	}
+
+	g_object_unref (istream);
+}
+
+/*
+ * Takes the buffer at the head of the queue, makes it the buffer being
+ * written and starts an asynchronous write of it to the output stream.
+ *
+ * Requires an open output stream and a pending task (both asserted). The
+ * write uses G_PRIORITY_DEFAULT instead of G_PRIORITY_LOW when more than 10
+ * buffers remain queued. A reference to @istream is taken for the callback,
+ * write_ready_cb(), which releases it.
+ */
+static void
+soup_cache_input_stream_write_next_buffer (SoupCacheInputStream *istream)
+{
+	SoupCacheInputStreamPrivate *priv = istream->priv;
+	SoupBuffer *next = g_queue_pop_head (priv->buffer_queue);
+	GCancellable *cancellable;
+	int io_priority;
+
+	g_assert (priv->output_stream && !g_output_stream_is_closed (priv->output_stream));
+	g_assert (priv->task);
+
+	g_clear_pointer (&priv->current_writing_buffer, soup_buffer_free);
+	priv->current_writing_buffer = next;
+
+	io_priority = (priv->buffer_queue->length > 10) ? G_PRIORITY_DEFAULT : G_PRIORITY_LOW;
+	cancellable = g_task_get_cancellable (priv->task);
+
+	g_output_stream_write_async (priv->output_stream,
+				     next->data, next->length,
+				     io_priority, cancellable,
+				     (GAsyncReadyCallback) write_ready_cb,
+				     g_object_ref (istream));
+}
+
+/*
+ * Shared implementation of the blocking and non-blocking read paths.
+ *
+ * Reads up to @count bytes from the base stream into @buffer and returns the
+ * result unchanged to the caller. If the read failed (-1), the end of the
+ * data was already seen, or no caching task is set, nothing else happens.
+ *
+ * Otherwise a 0-byte read marks the read side as finished, and any other
+ * result appends a copy of the data to the queue of buffers to be written.
+ * When no write is in flight and an output stream exists, the task is then
+ * completed (after a 0-byte read) or the next write is started.
+ */
+static gssize
+read_internal (GInputStream *stream,
+	       void *buffer,
+	       gsize count,
+	       gboolean blocking,
+	       GCancellable *cancellable,
+	       GError **error)
+{
+	SoupCacheInputStream *istream = SOUP_CACHE_INPUT_STREAM (stream);
+	SoupCacheInputStreamPrivate *priv = istream->priv;
+	GInputStream *base;
+	gssize nread;
+
+	base = g_filter_input_stream_get_base_stream (G_FILTER_INPUT_STREAM (stream));
+	nread = g_pollable_stream_read (base, buffer, count, blocking, cancellable, error);
+
+	if (G_UNLIKELY (nread == -1 || priv->read_finished || !priv->task))
+		return nread;
+
+	if (nread == 0)
+		priv->read_finished = TRUE;
+	else
+		g_queue_push_tail (priv->buffer_queue,
+				   soup_buffer_new (SOUP_MEMORY_COPY, buffer, nread));
+
+	/* With a write in flight or no output stream yet, there is nothing more to do here */
+	if (priv->current_writing_buffer != NULL || !priv->output_stream)
+		return nread;
+
+	if (nread == 0)
+		notify_and_clear (istream, NULL);
+	else
+		soup_cache_input_stream_write_next_buffer (istream);
+
+	return nread;
+}
+
+/*
+ * GInputStream read_fn implementation: a blocking read through
+ * read_internal().
+ */
+static gssize
+soup_cache_input_stream_read_fn (GInputStream *stream,
+				 void *buffer,
+				 gsize count,
+				 GCancellable *cancellable,
+				 GError **error)
+{
+	const gboolean blocking = TRUE;
+
+	return read_internal (stream, buffer, count, blocking, cancellable, error);
+}
+
+/*
+ * GPollableInputStream read_nonblocking implementation: a non-blocking read
+ * through read_internal(), without a cancellable.
+ */
+static gssize
+soup_cache_input_stream_read_nonblocking (GPollableInputStream *stream,
+					  void *buffer,
+					  gsize count,
+					  GError **error)
+{
+	GInputStream *input = G_INPUT_STREAM (stream);
+	const gboolean blocking = FALSE;
+
+	return read_internal (input, buffer, count, blocking, NULL, error);
+}
+
+/*
+ * GPollableInputStream interface initializer. Only read_nonblocking is
+ * overridden; the other interface members are left as they are.
+ */
+static void
+soup_cache_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface,
+				       gpointer interface_data)
+{
+	GPollableInputStreamInterface *iface = pollable_interface;
+
+	iface->read_nonblocking = soup_cache_input_stream_read_nonblocking;
+}
+
+/*
+ * Class initializer: reserves the private data area, installs the GObject
+ * and GInputStream virtual methods implemented in this file and registers
+ * the construct-only "output-stream" property.
+ */
+static void
+soup_cache_input_stream_class_init (SoupCacheInputStreamClass *klass)
+{
+	GObjectClass *object_class = G_OBJECT_CLASS (klass);
+	GInputStreamClass *input_class = G_INPUT_STREAM_CLASS (klass);
+	GParamSpec *output_stream_spec;
+
+	g_type_class_add_private (klass, sizeof (SoupCacheInputStreamPrivate));
+
+	object_class->get_property = soup_cache_input_stream_get_property;
+	object_class->set_property = soup_cache_input_stream_set_property;
+	object_class->dispose = soup_cache_input_stream_dispose;
+	object_class->finalize = soup_cache_input_stream_finalize;
+
+	input_class->read_fn = soup_cache_input_stream_read_fn;
+
+	output_stream_spec = g_param_spec_object ("output-stream", "Output stream",
+						  "the output stream where to write.",
+						  G_TYPE_OUTPUT_STREAM,
+						  G_PARAM_READWRITE |
+						  G_PARAM_CONSTRUCT_ONLY |
+						  G_PARAM_STATIC_STRINGS);
+	g_object_class_install_property (object_class, PROP_OUTPUT_STREAM, output_stream_spec);
+}
+
+/*
+ * Creates a SoupCacheInputStream wrapping @base_stream. The new stream is
+ * created with "close-base-stream" set to FALSE.
+ */
+GInputStream *
+soup_cache_input_stream_new (GInputStream *base_stream)
+{
+	gpointer cache_stream;
+
+	cache_stream = g_object_new (SOUP_TYPE_CACHE_INPUT_STREAM,
+				     "base-stream", base_stream,
+				     "close-base-stream", FALSE,
+				     NULL);
+
+	return cache_stream;
+}
diff --git a/libsoup/soup-cache-input-stream.h b/libsoup/soup-cache-input-stream.h
new file mode 100644
index 0000000..c999d10
--- /dev/null
+++ b/libsoup/soup-cache-input-stream.h
@@ -0,0 +1,52 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * soup-cache-input-stream.h - Header for SoupCacheInputStream
+ */
+
+#ifndef __SOUP_CACHE_INPUT_STREAM_H__
+#define __SOUP_CACHE_INPUT_STREAM_H__
+
+#include "soup-filter-input-stream.h"
+
+G_BEGIN_DECLS
+
+#define SOUP_TYPE_CACHE_INPUT_STREAM (soup_cache_input_stream_get_type())
+#define SOUP_CACHE_INPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), SOUP_TYPE_CACHE_INPUT_STREAM, SoupCacheInputStream))
+#define SOUP_CACHE_INPUT_STREAM_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), SOUP_TYPE_CACHE_INPUT_STREAM, SoupCacheInputStreamClass))
+#define SOUP_IS_CACHE_INPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), SOUP_TYPE_CACHE_INPUT_STREAM))
+#define SOUP_IS_CACHE_INPUT_STREAM_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((klass), SOUP_TYPE_CACHE_INPUT_STREAM))
+#define SOUP_CACHE_INPUT_STREAM_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), SOUP_TYPE_CACHE_INPUT_STREAM, SoupCacheInputStreamClass))
+
+typedef struct _SoupCacheInputStream SoupCacheInputStream;
+typedef struct _SoupCacheInputStreamClass SoupCacheInputStreamClass;
+typedef struct _SoupCacheInputStreamPrivate SoupCacheInputStreamPrivate;
+
+/* Class structure of SoupCacheInputStream; it only embeds the parent class. */
+struct _SoupCacheInputStreamClass
+{
+ SoupFilterInputStreamClass parent_class;
+};
+
+/* Instance structure of SoupCacheInputStream. */
+struct _SoupCacheInputStream
+{
+ SoupFilterInputStream parent;
+
+ /* Private state; its layout is not part of this header. */
+ SoupCacheInputStreamPrivate *priv;
+};
+
+GType soup_cache_input_stream_get_type (void) G_GNUC_CONST;
+
+GInputStream *soup_cache_input_stream_new (GInputStream *base_stream);
+
+void soup_cache_input_stream_cache (SoupCacheInputStream *istream,
+ GFile *file,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data);
+
+gsize soup_cache_input_stream_cache_finish (SoupCacheInputStream *istream,
+ GAsyncResult *result,
+ GError **error);
+
+G_END_DECLS
+
+#endif /* __SOUP_CACHE_INPUT_STREAM_H__ */
diff --git a/po/POTFILES.in b/po/POTFILES.in
index fff1f0e..21c70d4 100644
--- a/po/POTFILES.in
+++ b/po/POTFILES.in
@@ -1,4 +1,5 @@
libsoup/soup-body-input-stream.c
+libsoup/soup-cache-input-stream.c
libsoup/soup-converter-wrapper.c
libsoup/soup-message-client-io.c
libsoup/soup-message-io.c
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]