[libsoup/new-io] Add SoupRequest implementation to SoupMessage



commit 7b584c8e02fcd5f013631aebcf6540ae17dc0338
Author: Dan Winship <danw gnome org>
Date:   Sun Dec 20 15:11:55 2009 +0100

    Add SoupRequest implementation to SoupMessage
    
    using gvfs's SoupInputStream (now "SoupHTTPInputStream"). as in gvfs,
    it only works for async

 libsoup/Makefile.am              |    2 +
 libsoup/soup-http-input-stream.c |  901 ++++++++++++++++++++++++++++++++++++++
 libsoup/soup-http-input-stream.h |   77 ++++
 libsoup/soup-message-private.h   |    2 +
 libsoup/soup-message.c           |  117 +++++-
 libsoup/soup-session.c           |    6 +-
 6 files changed, 1099 insertions(+), 6 deletions(-)
---
diff --git a/libsoup/Makefile.am b/libsoup/Makefile.am
index c8d51ef..567fb84 100644
--- a/libsoup/Makefile.am
+++ b/libsoup/Makefile.am
@@ -144,6 +144,8 @@ libsoup_2_4_la_SOURCES =		\
 	soup-ftp-input-stream.c		\
 	soup-gnutls.c			\
 	soup-headers.c			\
+	soup-http-input-stream.h	\
+	soup-http-input-stream.c	\
 	soup-logger.c			\
 	soup-message.c			\
 	soup-message-body.c		\
diff --git a/libsoup/soup-http-input-stream.c b/libsoup/soup-http-input-stream.c
new file mode 100644
index 0000000..565d598
--- /dev/null
+++ b/libsoup/soup-http-input-stream.c
@@ -0,0 +1,901 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/* soup-input-stream.c, based on gsocketinputstream.c
+ *
+ * Copyright (C) 2006-2007 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General
+ * Public License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#include <config.h>
+
+#include <string.h>
+
+#include <glib.h>
+#include <gio/gio.h>
+
+#include <libsoup/soup.h>
+
+#include "soup-http-input-stream.h"
+
+static void soup_http_input_stream_seekable_iface_init (GSeekableIface *seekable_iface);
+
+G_DEFINE_TYPE_WITH_CODE (SoupHTTPInputStream, soup_http_input_stream, G_TYPE_INPUT_STREAM,
+			 G_IMPLEMENT_INTERFACE (G_TYPE_SEEKABLE,
+						soup_http_input_stream_seekable_iface_init))
+
+typedef void (*SoupHTTPInputStreamCallback) (GInputStream *);
+
+typedef struct {
+	SoupSession *session;
+	GMainContext *async_context;
+	SoupMessage *msg;
+	gboolean got_headers, finished;
+	goffset offset;
+
+	GCancellable *cancellable;
+	GSource *cancel_watch;
+	SoupHTTPInputStreamCallback got_headers_cb;
+	SoupHTTPInputStreamCallback got_chunk_cb;
+	SoupHTTPInputStreamCallback finished_cb;
+	SoupHTTPInputStreamCallback cancelled_cb;
+
+	guchar *leftover_buffer;
+	gsize leftover_bufsize, leftover_offset;
+
+	guchar *caller_buffer;
+	gsize caller_bufsize, caller_nread;
+	GAsyncReadyCallback outstanding_callback;
+	GSimpleAsyncResult *result;
+
+} SoupHTTPInputStreamPrivate;
+#define SOUP_HTTP_INPUT_STREAM_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), SOUP_TYPE_HTTP_INPUT_STREAM, SoupHTTPInputStreamPrivate))
+
+
+static gssize   soup_http_input_stream_read         (GInputStream         *stream,
+						     void                 *buffer,
+						     gsize                 count,
+						     GCancellable         *cancellable,
+						     GError              **error);
+static gboolean soup_http_input_stream_close        (GInputStream         *stream,
+						     GCancellable         *cancellable,
+						     GError              **error);
+static void     soup_http_input_stream_read_async   (GInputStream         *stream,
+						     void                 *buffer,
+						     gsize                 count,
+						     int                   io_priority,
+						     GCancellable         *cancellable,
+						     GAsyncReadyCallback   callback,
+						     gpointer              data);
+static gssize   soup_http_input_stream_read_finish  (GInputStream         *stream,
+						     GAsyncResult         *result,
+						     GError              **error);
+static void     soup_http_input_stream_close_async  (GInputStream         *stream,
+						     int                   io_priority,
+						     GCancellable         *cancellable,
+						     GAsyncReadyCallback   callback,
+						     gpointer              data);
+static gboolean soup_http_input_stream_close_finish (GInputStream         *stream,
+						     GAsyncResult         *result,
+						     GError              **error);
+
+static goffset  soup_http_input_stream_tell         (GSeekable            *seekable);
+
+static gboolean soup_http_input_stream_can_seek     (GSeekable            *seekable);
+static gboolean soup_http_input_stream_seek         (GSeekable            *seekable,
+						     goffset               offset,
+						     GSeekType             type,
+						     GCancellable         *cancellable,
+						     GError              **error);
+
+static gboolean soup_http_input_stream_can_truncate (GSeekable            *seekable);
+static gboolean soup_http_input_stream_truncate     (GSeekable            *seekable,
+						     goffset               offset,
+						     GCancellable         *cancellable,
+						     GError              **error);
+
+static void soup_http_input_stream_got_headers (SoupMessage *msg, gpointer stream);
+static void soup_http_input_stream_got_chunk (SoupMessage *msg, SoupBuffer *chunk, gpointer stream);
+static void soup_http_input_stream_finished (SoupMessage *msg, gpointer stream);
+
+static void
+soup_http_input_stream_finalize (GObject *object)
+{
+	SoupHTTPInputStream *stream = SOUP_HTTP_INPUT_STREAM (object);
+	SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
+
+	g_object_unref (priv->session);
+
+	g_signal_handlers_disconnect_by_func (priv->msg, G_CALLBACK (soup_http_input_stream_got_headers), stream);
+	g_signal_handlers_disconnect_by_func (priv->msg, G_CALLBACK (soup_http_input_stream_got_chunk), stream);
+	g_signal_handlers_disconnect_by_func (priv->msg, G_CALLBACK (soup_http_input_stream_finished), stream);
+	g_object_unref (priv->msg);
+	g_free (priv->leftover_buffer);
+
+	if (G_OBJECT_CLASS (soup_http_input_stream_parent_class)->finalize)
+		(*G_OBJECT_CLASS (soup_http_input_stream_parent_class)->finalize) (object);
+}
+
+static void
+soup_http_input_stream_class_init (SoupHTTPInputStreamClass *klass)
+{
+	GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+	GInputStreamClass *stream_class = G_INPUT_STREAM_CLASS (klass);
+
+	g_type_class_add_private (klass, sizeof (SoupHTTPInputStreamPrivate));
+
+	gobject_class->finalize = soup_http_input_stream_finalize;
+
+	stream_class->read_fn = soup_http_input_stream_read;
+	stream_class->close_fn = soup_http_input_stream_close;
+	stream_class->read_async = soup_http_input_stream_read_async;
+	stream_class->read_finish = soup_http_input_stream_read_finish;
+	stream_class->close_async = soup_http_input_stream_close_async;
+	stream_class->close_finish = soup_http_input_stream_close_finish;
+}
+
+static void
+soup_http_input_stream_seekable_iface_init (GSeekableIface *seekable_iface)
+{
+	seekable_iface->tell = soup_http_input_stream_tell;
+	seekable_iface->can_seek = soup_http_input_stream_can_seek;
+	seekable_iface->seek = soup_http_input_stream_seek;
+	seekable_iface->can_truncate = soup_http_input_stream_can_truncate;
+	seekable_iface->truncate_fn = soup_http_input_stream_truncate;
+}
+
+static void
+soup_http_input_stream_init (SoupHTTPInputStream *stream)
+{
+	;
+}
+
+static void
+soup_http_input_stream_queue_message (SoupHTTPInputStream *stream)
+{
+	SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
+
+	priv->got_headers = priv->finished = FALSE;
+
+	/* Add an extra ref since soup_session_queue_message steals one */
+	g_object_ref (priv->msg);
+	soup_session_queue_message (priv->session, priv->msg, NULL, NULL);
+}
+
+/**
+ * soup_http_input_stream_new:
+ * @session: the #SoupSession to use
+ * @msg: the #SoupMessage whose response will be streamed
+ * 
+ * Prepares to send @msg over @session, and returns a #GInputStream
+ * that can be used to read the response.
+ *
+ * @msg may not be sent until the first read call; if you need to look
+ * at the status code or response headers before reading the body, you
+ * can use soup_http_input_stream_send() or soup_http_input_stream_send_async()
+ * to force the message to be sent and the response headers read.
+ *
+ * If @msg gets a non-2xx result, the first read (or send) will return
+ * an error with type %SOUP_HTTP_INPUT_STREAM_HTTP_ERROR.
+ *
+ * Internally, #SoupHTTPInputStream is implemented using asynchronous I/O,
+ * so if you are using the synchronous API (eg,
+ * g_input_stream_read()), you should create a new #GMainContext and
+ * set it as the %SOUP_SESSION_ASYNC_CONTEXT property on @session. (If
+ * you don't, then synchronous #GInputStream calls will cause the main
+ * loop to be run recursively.) The async #GInputStream API works fine
+ * with %SOUP_SESSION_ASYNC_CONTEXT either set or unset.
+ *
+ * Returns: a new #GInputStream.
+ **/
+SoupHTTPInputStream *
+soup_http_input_stream_new (SoupSession *session, SoupMessage *msg)
+{
+	SoupHTTPInputStream *stream;
+	SoupHTTPInputStreamPrivate *priv;
+
+	g_return_val_if_fail (SOUP_IS_MESSAGE (msg), NULL);
+
+	stream = g_object_new (SOUP_TYPE_HTTP_INPUT_STREAM, NULL);
+	priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
+
+	priv->session = g_object_ref (session);
+	priv->async_context = soup_session_get_async_context (session);
+	priv->msg = g_object_ref (msg);
+
+	g_signal_connect (msg, "got_headers",
+			  G_CALLBACK (soup_http_input_stream_got_headers), stream);
+	g_signal_connect (msg, "got_chunk",
+			  G_CALLBACK (soup_http_input_stream_got_chunk), stream);
+	g_signal_connect (msg, "finished",
+			  G_CALLBACK (soup_http_input_stream_finished), stream);
+
+	soup_http_input_stream_queue_message (stream);
+	return stream;
+}
+
+static void
+soup_http_input_stream_got_headers (SoupMessage *msg, gpointer stream)
+{
+	SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
+
+	/* If the status is unsuccessful, we just ignore the signal and let
+	 * libsoup keep going (eventually either it will requeue the request
+	 * (after handling authentication/redirection), or else the
+	 * "finished" handler will run).
+	 */
+	if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code))
+		return;
+
+	priv->got_headers = TRUE;
+	if (!priv->caller_buffer) {
+		/* Not ready to read the body yet */
+		soup_session_pause_message (priv->session, msg);
+	}
+
+	if (priv->got_headers_cb)
+		priv->got_headers_cb (stream);
+}
+
+static void
+soup_http_input_stream_got_chunk (SoupMessage *msg, SoupBuffer *chunk_buffer,
+				  gpointer stream)
+{
+	SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
+	const gchar *chunk = chunk_buffer->data;
+	gsize chunk_size = chunk_buffer->length;
+
+	/* We only pay attention to the chunk if it's part of a successful
+	 * response.
+	 */
+	if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code))
+		return;
+
+	/* Sanity check */
+	if (priv->caller_bufsize == 0 || priv->leftover_bufsize != 0)
+		g_warning ("soup_http_input_stream_got_chunk called again before previous chunk was processed");
+
+	/* Copy what we can into priv->caller_buffer */
+	if (priv->caller_bufsize - priv->caller_nread > 0) {
+		gsize nread = MIN (chunk_size, priv->caller_bufsize - priv->caller_nread);
+
+		memcpy (priv->caller_buffer + priv->caller_nread, chunk, nread);
+		priv->caller_nread += nread;
+		priv->offset += nread;
+		chunk += nread;
+		chunk_size -= nread;
+	}
+
+	if (chunk_size > 0) {
+		/* Copy the rest into priv->leftover_buffer. If
+		 * there's already some data there, realloc and
+		 * append. Otherwise just copy.
+		 */
+		if (priv->leftover_bufsize) {
+			priv->leftover_buffer = g_realloc (priv->leftover_buffer,
+							   priv->leftover_bufsize + chunk_size);
+			memcpy (priv->leftover_buffer + priv->leftover_bufsize,
+				chunk, chunk_size);
+			priv->leftover_bufsize += chunk_size;
+		} else {
+			priv->leftover_bufsize = chunk_size;
+			priv->leftover_buffer = g_memdup (chunk, chunk_size);
+			priv->leftover_offset = 0;
+		}
+	}
+
+	soup_session_pause_message (priv->session, msg);
+	if (priv->got_chunk_cb)
+		priv->got_chunk_cb (stream);
+}
+
+static void
+soup_http_input_stream_finished (SoupMessage *msg, gpointer stream)
+{
+	SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
+
+	priv->finished = TRUE;
+
+	if (priv->finished_cb)
+		priv->finished_cb (stream);
+}
+
+static gboolean
+soup_http_input_stream_cancelled (GIOChannel *chan, GIOCondition condition,
+				  gpointer stream)
+{
+	SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
+
+	priv->cancel_watch = NULL;
+
+	soup_session_pause_message (priv->session, priv->msg);
+	if (priv->cancelled_cb)
+		priv->cancelled_cb (stream);
+
+	return FALSE;
+}
+
+static void
+soup_http_input_stream_prepare_for_io (GInputStream *stream,
+				       GCancellable *cancellable,
+				       guchar       *buffer,
+				       gsize         count)
+{
+	SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
+	int cancel_fd;
+
+	priv->cancellable = cancellable;
+	cancel_fd = g_cancellable_get_fd (cancellable);
+	if (cancel_fd != -1) {
+		GIOChannel *chan = g_io_channel_unix_new (cancel_fd);
+		priv->cancel_watch = soup_add_io_watch (priv->async_context, chan,
+							G_IO_IN | G_IO_ERR | G_IO_HUP,
+							soup_http_input_stream_cancelled,
+							stream);
+		g_io_channel_unref (chan);
+	}
+
+	priv->caller_buffer = buffer;
+	priv->caller_bufsize = count;
+	priv->caller_nread = 0;
+
+	if (priv->got_headers)
+		soup_session_unpause_message (priv->session, priv->msg);
+}
+
+static void
+soup_http_input_stream_done_io (GInputStream *stream)
+{
+	SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
+
+	if (priv->cancel_watch) {
+		g_source_destroy (priv->cancel_watch);
+		priv->cancel_watch = NULL;
+		g_cancellable_release_fd (priv->cancellable);
+	}
+	priv->cancellable = NULL;
+
+	priv->caller_buffer = NULL;
+	priv->caller_bufsize = 0;
+}
+
+static gboolean
+set_error_if_http_failed (SoupMessage *msg, GError **error)
+{
+	if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code)) {
+		g_set_error_literal (error, SOUP_HTTP_ERROR,
+				     msg->status_code, msg->reason_phrase);
+		return TRUE;
+	}
+	return FALSE;
+}
+
+static gsize
+read_from_leftover (SoupHTTPInputStreamPrivate *priv,
+		    gpointer buffer, gsize bufsize)
+{
+	gsize nread;
+
+	if (priv->leftover_bufsize - priv->leftover_offset <= bufsize) {
+		nread = priv->leftover_bufsize - priv->leftover_offset;
+		memcpy (buffer, priv->leftover_buffer + priv->leftover_offset, nread);
+
+		g_free (priv->leftover_buffer);
+		priv->leftover_buffer = NULL;
+		priv->leftover_bufsize = priv->leftover_offset = 0;
+	} else {
+		nread = bufsize;
+		memcpy (buffer, priv->leftover_buffer + priv->leftover_offset, nread);
+		priv->leftover_offset += nread;
+	}
+
+	priv->offset += nread;
+	return nread;
+}
+
+/* This does the work of soup_http_input_stream_send(), assuming that the
+ * GInputStream pending flag has already been set. It is also used by
+ * soup_http_input_stream_send_async() in some circumstances.
+ */
+static gboolean
+soup_http_input_stream_send_internal (GInputStream  *stream,
+				      GCancellable  *cancellable,
+				      GError       **error)
+{
+	SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
+
+	soup_http_input_stream_prepare_for_io (stream, cancellable, NULL, 0);
+	while (!priv->finished && !priv->got_headers &&
+	       !g_cancellable_is_cancelled (cancellable))
+		g_main_context_iteration (priv->async_context, TRUE);
+	soup_http_input_stream_done_io (stream);
+
+	if (g_cancellable_set_error_if_cancelled (cancellable, error))
+		return FALSE;
+	else if (set_error_if_http_failed (priv->msg, error))
+		return FALSE;
+	return TRUE;
+}
+
+/**
+ * soup_http_input_stream_send:
+ * @httpstream: a #SoupHTTPInputStream
+ * @cancellable: optional #GCancellable object, %NULL to ignore.
+ * @error: location to store the error occuring, or %NULL to ignore
+ *
+ * Synchronously sends the HTTP request associated with @stream, and
+ * reads the response headers. Call this after soup_http_input_stream_new()
+ * and before the first g_input_stream_read() if you want to check the
+ * HTTP status code before you start reading.
+ *
+ * Return value: %TRUE if msg has a successful (2xx) status, %FALSE if
+ * not.
+ **/
+gboolean
+soup_http_input_stream_send (SoupHTTPInputStream  *httpstream,
+			     GCancellable         *cancellable,
+			     GError              **error)
+{
+	GInputStream *istream = (GInputStream *)httpstream;
+	gboolean result;
+
+	g_return_val_if_fail (SOUP_IS_HTTP_INPUT_STREAM (httpstream), FALSE);
+
+	if (!g_input_stream_set_pending (istream, error))
+		return FALSE;
+	result = soup_http_input_stream_send_internal (istream, cancellable, error);
+	g_input_stream_clear_pending (istream);
+
+	return result;
+}
+
+static gssize
+soup_http_input_stream_read (GInputStream *stream,
+			     void         *buffer,
+			     gsize         count,
+			     GCancellable *cancellable,
+			     GError      **error)
+{
+	SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
+
+	if (priv->finished)
+		return 0;
+
+	/* If there is data leftover from a previous read, return it. */
+	if (priv->leftover_bufsize)
+		return read_from_leftover (priv, buffer, count);
+
+	/* No leftover data, accept one chunk from the network */
+	soup_http_input_stream_prepare_for_io (stream, cancellable, buffer, count);
+	while (!priv->finished && priv->caller_nread == 0 &&
+	       !g_cancellable_is_cancelled (cancellable))
+		g_main_context_iteration (priv->async_context, TRUE);
+	soup_http_input_stream_done_io (stream);
+
+	if (priv->caller_nread > 0)
+		return priv->caller_nread;
+
+	if (g_cancellable_set_error_if_cancelled (cancellable, error))
+		return -1;
+	else if (set_error_if_http_failed (priv->msg, error))
+		return -1;
+	else
+		return 0;
+}
+
+static gboolean
+soup_http_input_stream_close (GInputStream *stream,
+			      GCancellable *cancellable,
+			      GError      **error)
+{
+	SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
+
+	if (!priv->finished)
+		soup_session_cancel_message (priv->session, priv->msg, SOUP_STATUS_CANCELLED);
+
+	return TRUE;
+}
+
+static void
+wrapper_callback (GObject *source_object, GAsyncResult *res,
+		  gpointer user_data)
+{
+	GInputStream *stream = G_INPUT_STREAM (source_object);
+	SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
+
+	g_input_stream_clear_pending (stream);
+	if (priv->outstanding_callback)
+		(*priv->outstanding_callback) (source_object, res, user_data);
+	priv->outstanding_callback = NULL;
+	g_object_unref (stream);
+}
+
+static void
+send_async_thread (GSimpleAsyncResult *res,
+		   GObject *object,
+		   GCancellable *cancellable)
+{
+	GError *error = NULL;
+	gboolean success;
+
+	success = soup_http_input_stream_send_internal (G_INPUT_STREAM (object),
+							cancellable, &error);
+	g_simple_async_result_set_op_res_gboolean (res, success);
+	if (error) {
+		g_simple_async_result_set_from_error (res, error);
+		g_error_free (error);
+	}
+}
+
+static void
+soup_http_input_stream_send_async_in_thread (GInputStream        *stream,
+					     int                  io_priority,
+					     GCancellable        *cancellable,
+					     GAsyncReadyCallback  callback,
+					     gpointer             user_data)
+{
+	GSimpleAsyncResult *res;
+
+	res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data,
+					 soup_http_input_stream_send_async_in_thread);
+	g_simple_async_result_run_in_thread (res, send_async_thread,
+					     io_priority, cancellable);
+	g_object_unref (res);
+}
+
+static void
+send_async_finished (GInputStream *stream)
+{
+	SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
+	GSimpleAsyncResult *result;
+	GError *error = NULL;
+
+	if (!g_cancellable_set_error_if_cancelled (priv->cancellable, &error))
+		set_error_if_http_failed (priv->msg, &error);
+
+	priv->got_headers_cb = NULL;
+	priv->finished_cb = NULL;
+	soup_http_input_stream_done_io (stream);
+
+	result = priv->result;
+	priv->result = NULL;
+
+	g_simple_async_result_set_op_res_gboolean (result, error == NULL);
+	if (error) {
+		g_simple_async_result_set_from_error (result, error);
+		g_error_free (error);
+	}
+	g_simple_async_result_complete (result);
+}
+
+static void
+soup_http_input_stream_send_async_internal (GInputStream        *stream,
+					    int                  io_priority,
+					    GCancellable        *cancellable,
+					    GAsyncReadyCallback  callback,
+					    gpointer             user_data)
+{
+	SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
+
+	g_object_ref (stream);
+	priv->outstanding_callback = callback;
+
+	/* If the session uses the default GMainContext, then we can do
+	 * async I/O directly. But if it has its own main context, it's
+	 * easier to just run it in another thread.
+	 */
+	if (soup_session_get_async_context (priv->session)) {
+		soup_http_input_stream_send_async_in_thread (stream, io_priority, cancellable,
+							     wrapper_callback, user_data);
+		return;
+	}
+
+	priv->got_headers_cb = send_async_finished;
+	priv->finished_cb = send_async_finished;
+
+	soup_http_input_stream_prepare_for_io (stream, cancellable, NULL, 0);
+	priv->result = g_simple_async_result_new (G_OBJECT (stream),
+						  wrapper_callback, user_data,
+						  soup_http_input_stream_send_async);
+}
+
+/**
+ * soup_http_input_stream_send_async:
+ * @httpstream: a #SoupHTTPInputStream
+ * @io_priority: the io priority of the request.
+ * @cancellable: optional #GCancellable object, %NULL to ignore.
+ * @callback: callback to call when the request is satisfied
+ * @user_data: the data to pass to callback function
+ *
+ * Asynchronously sends the HTTP request associated with @stream, and
+ * reads the response headers. Call this after soup_http_input_stream_new()
+ * and before the first g_input_stream_read_async() if you want to
+ * check the HTTP status code before you start reading.
+ **/
+void
+soup_http_input_stream_send_async (SoupHTTPInputStream *httpstream,
+				   int                  io_priority,
+				   GCancellable        *cancellable,
+				   GAsyncReadyCallback  callback,
+				   gpointer             user_data)
+{
+	GInputStream *istream = (GInputStream *)httpstream;
+	GError *error = NULL;
+
+	g_return_if_fail (SOUP_IS_HTTP_INPUT_STREAM (httpstream));
+
+	if (!g_input_stream_set_pending (istream, &error)) {
+		g_simple_async_report_gerror_in_idle (G_OBJECT (httpstream),
+						      callback,
+						      user_data,
+						      error);
+		g_error_free (error);
+		return;
+	}
+	soup_http_input_stream_send_async_internal (istream, io_priority, cancellable,
+						    callback, user_data);
+}
+
+/**
+ * soup_http_input_stream_send_finish:
+ * @httpstream: a #SoupHTTPInputStream
+ * @result: a #GAsyncResult.
+ * @error: a #GError location to store the error occuring, or %NULL to
+ * ignore.
+ *
+ * Finishes a soup_http_input_stream_send_async() operation.
+ *
+ * Return value: %TRUE if the message was sent successfully and
+ * received a successful status code, %FALSE if not.
+ **/
+gboolean
+soup_http_input_stream_send_finish (SoupHTTPInputStream  *httpstream,
+				    GAsyncResult         *result,
+				    GError              **error)
+{
+	GSimpleAsyncResult *simple;
+
+	g_return_val_if_fail (G_IS_SIMPLE_ASYNC_RESULT (result), FALSE);
+	simple = G_SIMPLE_ASYNC_RESULT (result);
+
+	g_return_val_if_fail (g_simple_async_result_get_source_tag (simple) == soup_http_input_stream_send_async, FALSE);
+
+	if (g_simple_async_result_propagate_error (simple, error))
+		return FALSE;
+
+	return g_simple_async_result_get_op_res_gboolean (simple);
+}
+
+static void
+read_async_done (GInputStream *stream)
+{
+	SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
+	GSimpleAsyncResult *result;
+	GError *error = NULL;
+
+	result = priv->result;
+	priv->result = NULL;
+
+	if (g_cancellable_set_error_if_cancelled (priv->cancellable, &error) ||
+	    set_error_if_http_failed (priv->msg, &error)) {
+		g_simple_async_result_set_from_error (result, error);
+		g_error_free (error);
+	} else
+		g_simple_async_result_set_op_res_gssize (result, priv->caller_nread);
+
+	priv->got_chunk_cb = NULL;
+	priv->finished_cb = NULL;
+	priv->cancelled_cb = NULL;
+	soup_http_input_stream_done_io (stream);
+
+	g_simple_async_result_complete (result);
+	g_object_unref (result);
+}
+
+static void
+soup_http_input_stream_read_async (GInputStream        *stream,
+				   void                *buffer,
+				   gsize                count,
+				   int                  io_priority,
+				   GCancellable        *cancellable,
+				   GAsyncReadyCallback  callback,
+				   gpointer             user_data)
+{
+	SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
+	GSimpleAsyncResult *result;
+
+	/* If the session uses the default GMainContext, then we can do
+	 * async I/O directly. But if it has its own main context, we fall
+	 * back to the async-via-sync-in-another-thread implementation.
+	 */
+	if (soup_session_get_async_context (priv->session)) {
+		G_INPUT_STREAM_CLASS (soup_http_input_stream_parent_class)->
+			read_async (stream, buffer, count, io_priority,
+				    cancellable, callback, user_data);
+		return;
+	}
+
+	result = g_simple_async_result_new (G_OBJECT (stream),
+					    callback, user_data,
+					    soup_http_input_stream_read_async);
+
+	if (priv->finished) {
+		g_simple_async_result_set_op_res_gssize (result, 0);
+		g_simple_async_result_complete_in_idle (result);
+		g_object_unref (result);
+		return;
+	}
+
+	if (priv->leftover_bufsize) {
+		gsize nread = read_from_leftover (priv, buffer, count);
+		g_simple_async_result_set_op_res_gssize (result, nread);
+		g_simple_async_result_complete_in_idle (result);
+		g_object_unref (result);
+		return;
+	}
+
+	priv->result = result;
+
+	priv->got_chunk_cb = read_async_done;
+	priv->finished_cb = read_async_done;
+	priv->cancelled_cb = read_async_done;
+	soup_http_input_stream_prepare_for_io (stream, cancellable, buffer, count);
+}
+
+static gssize
+soup_http_input_stream_read_finish (GInputStream  *stream,
+				    GAsyncResult  *result,
+				    GError       **error)
+{
+	GSimpleAsyncResult *simple;
+
+	g_return_val_if_fail (G_IS_SIMPLE_ASYNC_RESULT (result), -1);
+	simple = G_SIMPLE_ASYNC_RESULT (result);
+	g_return_val_if_fail (g_simple_async_result_get_source_tag (simple) == soup_http_input_stream_read_async, -1);
+
+	return g_simple_async_result_get_op_res_gssize (simple);
+}
+
+static void
+soup_http_input_stream_close_async (GInputStream       *stream,
+				    int                 io_priority,
+				    GCancellable       *cancellable,
+				    GAsyncReadyCallback callback,
+				    gpointer            user_data)
+{
+	GSimpleAsyncResult *result;
+	gboolean success;
+	GError *error = NULL;
+
+	result = g_simple_async_result_new (G_OBJECT (stream),
+					    callback, user_data,
+					    soup_http_input_stream_close_async);
+	success = soup_http_input_stream_close (stream, cancellable, &error);
+	g_simple_async_result_set_op_res_gboolean (result, success);
+	if (error) {
+		g_simple_async_result_set_from_error (result, error);
+		g_error_free (error);
+	}
+
+	g_simple_async_result_complete_in_idle (result);
+	g_object_unref (result);
+}
+
+static gboolean
+soup_http_input_stream_close_finish (GInputStream  *stream,
+				     GAsyncResult  *result,
+				     GError       **error)
+{
+	/* Failures handled in generic close_finish code */
+	return TRUE;
+}
+
+static goffset
+soup_http_input_stream_tell (GSeekable *seekable)
+{
+	SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (seekable);
+
+	return priv->offset;
+}
+
+static gboolean
+soup_http_input_stream_can_seek (GSeekable *seekable)
+{
+	return TRUE;
+}
+
+extern void soup_message_io_cleanup (SoupMessage *msg);
+
+static gboolean
+soup_http_input_stream_seek (GSeekable     *seekable,
+			     goffset        offset,
+			     GSeekType      type,
+			     GCancellable  *cancellable,
+			     GError       **error)
+{
+	GInputStream *stream = G_INPUT_STREAM (seekable);
+	SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (seekable);
+	char *range;
+
+	if (type == G_SEEK_END) {
+		/* FIXME: we could send "bytes=-offset", but unless we
+		 * know the Content-Length, we wouldn't be able to
+		 * answer a tell() properly. We could find the
+		 * Content-Length by doing a HEAD...
+		 */
+
+		g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
+				     "G_SEEK_END not currently supported");
+		return FALSE;
+	}
+
+	if (!g_input_stream_set_pending (stream, error))
+		return FALSE;
+
+	soup_session_cancel_message (priv->session, priv->msg, SOUP_STATUS_CANCELLED);
+	soup_message_io_cleanup (priv->msg);
+
+	switch (type) {
+	case G_SEEK_CUR:
+		offset += priv->offset;
+		/* fall through */
+
+	case G_SEEK_SET:
+		range = g_strdup_printf ("bytes=%"G_GUINT64_FORMAT"-", (guint64)offset);
+		priv->offset = offset;
+		break;
+
+	case G_SEEK_END:
+		range = NULL; /* keep compilers happy */
+		g_return_val_if_reached (FALSE);
+		break;
+
+	default:
+		g_return_val_if_reached (FALSE);
+	}
+
+	soup_message_headers_remove (priv->msg->request_headers, "Range");
+	soup_message_headers_append (priv->msg->request_headers, "Range", range);
+	g_free (range);
+
+	soup_http_input_stream_queue_message (SOUP_HTTP_INPUT_STREAM (stream));
+
+	g_input_stream_clear_pending (stream);
+	return TRUE;
+}
+
+static gboolean
+soup_http_input_stream_can_truncate (GSeekable *seekable)
+{
+	return FALSE;
+}
+
+static gboolean
+soup_http_input_stream_truncate (GSeekable     *seekable,
+				 goffset        offset,
+				 GCancellable  *cancellable,
+				 GError       **error)
+{
+	g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
+			     "Truncate not allowed on input stream");
+	return FALSE;
+}
+
+SoupMessage *
+soup_http_input_stream_get_message (SoupHTTPInputStream *httpstream)
+{
+	SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (httpstream);
+	return priv->msg ? g_object_ref (priv->msg) : NULL;
+}
diff --git a/libsoup/soup-http-input-stream.h b/libsoup/soup-http-input-stream.h
new file mode 100644
index 0000000..4052196
--- /dev/null
+++ b/libsoup/soup-http-input-stream.h
@@ -0,0 +1,77 @@
+/* Copyright (C) 2006, 2007, 2009 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General
+ * Public License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifndef __SOUP_HTTP_INPUT_STREAM_H__
+#define __SOUP_HTTP_INPUT_STREAM_H__
+
+#include <gio/gio.h>
+#include <libsoup/soup-types.h>
+
+G_BEGIN_DECLS
+
+#define SOUP_TYPE_HTTP_INPUT_STREAM         (soup_http_input_stream_get_type ())
+#define SOUP_HTTP_INPUT_STREAM(o)           (G_TYPE_CHECK_INSTANCE_CAST ((o), SOUP_TYPE_HTTP_INPUT_STREAM, SoupHTTPInputStream))
+#define SOUP_HTTP_INPUT_STREAM_CLASS(k)     (G_TYPE_CHECK_CLASS_CAST((k), SOUP_TYPE_HTTP_INPUT_STREAM, SoupHTTPInputStreamClass))
+#define SOUP_IS_HTTP_INPUT_STREAM(o)        (G_TYPE_CHECK_INSTANCE_TYPE ((o), SOUP_TYPE_HTTP_INPUT_STREAM))
+#define SOUP_IS_HTTP_INPUT_STREAM_CLASS(k)  (G_TYPE_CHECK_CLASS_TYPE ((k), SOUP_TYPE_HTTP_INPUT_STREAM))
+#define SOUP_HTTP_INPUT_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), SOUP_TYPE_HTTP_INPUT_STREAM, SoupHTTPInputStreamClass))
+
+typedef struct SoupHTTPInputStream         SoupHTTPInputStream;
+typedef struct SoupHTTPInputStreamClass    SoupHTTPInputStreamClass;
+
+struct SoupHTTPInputStream
+{
+	GInputStream parent;
+
+};
+
+struct SoupHTTPInputStreamClass
+{
+	GInputStreamClass parent_class;
+
+	/* Padding for future expansion */
+	void (*_g_reserved1) (void);
+	void (*_g_reserved2) (void);
+	void (*_g_reserved3) (void);
+	void (*_g_reserved4) (void);
+	void (*_g_reserved5) (void);
+};
+
+GType soup_http_input_stream_get_type (void) G_GNUC_CONST;
+
+SoupHTTPInputStream *soup_http_input_stream_new         (SoupSession         *session,
+							 SoupMessage         *msg);
+
+gboolean             soup_http_input_stream_send        (SoupHTTPInputStream *httpstream,
+							 GCancellable        *cancellable,
+							 GError             **error);
+
+void                 soup_http_input_stream_send_async  (SoupHTTPInputStream *httpstream,
+							 int                  io_priority,
+							 GCancellable        *cancellable,
+							 GAsyncReadyCallback  callback,
+							 gpointer             user_data);
+gboolean             soup_http_input_stream_send_finish (SoupHTTPInputStream *httpstream,
+							 GAsyncResult        *result,
+							 GError             **error);
+
+SoupMessage         *soup_http_input_stream_get_message (SoupHTTPInputStream *httpstream);
+
+G_END_DECLS
+
+#endif /* __SOUP_HTTP_INPUT_STREAM_H__ */
diff --git a/libsoup/soup-message-private.h b/libsoup/soup-message-private.h
index ee6221d..f5ede4b 100644
--- a/libsoup/soup-message-private.h
+++ b/libsoup/soup-message-private.h
@@ -41,6 +41,8 @@ typedef struct {
 
 	GSList            *disabled_features;
 	GSList            *decoders;
+
+	SoupSession       *session;
 } SoupMessagePrivate;
 #define SOUP_MESSAGE_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), SOUP_TYPE_MESSAGE, SoupMessagePrivate))
 
diff --git a/libsoup/soup-message.c b/libsoup/soup-message.c
index 93c2522..16a27ce 100644
--- a/libsoup/soup-message.c
+++ b/libsoup/soup-message.c
@@ -11,10 +11,13 @@
 #include "soup-address.h"
 #include "soup-auth.h"
 #include "soup-enum-types.h"
+#include "soup-http-input-stream.h"
 #include "soup-marshal.h"
 #include "soup-message.h"
 #include "soup-message-private.h"
 #include "soup-misc.h"
+#include "soup-request.h"
+#include "soup-session.h"
 #include "soup-uri.h"
 
 /**
@@ -86,7 +89,11 @@
  * @request_body as appropriate, passing %FALSE.
  **/
 
-G_DEFINE_TYPE (SoupMessage, soup_message, G_TYPE_OBJECT)
+static void soup_message_request_interface_init (SoupRequestInterface *request_interface);
+
+G_DEFINE_TYPE_WITH_CODE (SoupMessage, soup_message, G_TYPE_OBJECT,
+			 G_IMPLEMENT_INTERFACE (SOUP_TYPE_REQUEST,
+						soup_message_request_interface_init))
 
 enum {
 	WROTE_INFORMATIONAL,
@@ -119,6 +126,7 @@ enum {
 	PROP_SERVER_SIDE,
 	PROP_STATUS_CODE,
 	PROP_REASON_PHRASE,
+	PROP_SESSION,
 
 	LAST_PROP
 };
@@ -132,6 +140,17 @@ static void set_property (GObject *object, guint prop_id,
 static void get_property (GObject *object, guint prop_id,
 			  GValue *value, GParamSpec *pspec);
 
+static GInputStream *soup_message_send        (SoupRequest          *request,
+					       GCancellable         *cancellable,
+					       GError              **error);
+static void          soup_message_send_async  (SoupRequest          *request,
+					       GCancellable         *cancellable,
+					       GAsyncReadyCallback   callback,
+					       gpointer              user_data);
+static GInputStream *soup_message_send_finish (SoupRequest          *request,
+					       GAsyncResult         *result,
+					       GError              **error);
+
 static void
 soup_message_init (SoupMessage *msg)
 {
@@ -495,7 +514,7 @@ soup_message_class_init (SoupMessageClass *message_class)
 				     "Method",
 				     "The message's HTTP method",
 				     SOUP_METHOD_GET,
-				     G_PARAM_READWRITE));
+				     G_PARAM_READWRITE | G_PARAM_CONSTRUCT));
 	/**
 	 * SOUP_MESSAGE_URI:
 	 *
@@ -576,6 +595,22 @@ soup_message_class_init (SoupMessageClass *message_class)
 				     "The HTTP response reason phrase",
 				     NULL,
 				     G_PARAM_READWRITE));
+
+	g_object_class_install_property (
+		object_class, PROP_SESSION,
+		g_param_spec_object ("session",
+				     "Session",
+				     "The message's session",
+				     SOUP_TYPE_SESSION,
+				     G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY));
+}
+
+static void
+soup_message_request_interface_init (SoupRequestInterface *request_interface)
+{
+	request_interface->send = soup_message_send;
+	request_interface->send_async = soup_message_send_async;
+	request_interface->send_finish = soup_message_send_finish;
 }
 
 static void
@@ -612,6 +647,11 @@ set_property (GObject *object, guint prop_id,
 		soup_message_set_status_full (msg, msg->status_code,
 					      g_value_get_string (value));
 		break;
+	case PROP_SESSION:
+		if (priv->session)
+			g_object_unref (priv->session);
+		priv->session = g_value_dup_object (value);
+		break;
 	default:
 		G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
 		break;
@@ -647,6 +687,9 @@ get_property (GObject *object, guint prop_id,
 	case PROP_REASON_PHRASE:
 		g_value_set_string (value, msg->reason_phrase);
 		break;
+	case PROP_SESSION:
+		g_value_set_object (value, priv->session);
+		break;
 	default:
 		G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
 		break;
@@ -1664,3 +1707,73 @@ soup_message_disables_feature (SoupMessage *msg, gpointer feature)
 	}
 	return FALSE;
 }
+
+/* SoupRequest interface implementation */
+
+static GInputStream *
+soup_message_send (SoupRequest   *request,
+		   GCancellable  *cancellable,
+		   GError       **error)
+{
+	SoupMessage *msg = SOUP_MESSAGE (request);
+	SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
+	SoupHTTPInputStream *httpstream;
+
+	httpstream = soup_http_input_stream_new (priv->session, msg);
+	if (!soup_http_input_stream_send (httpstream, cancellable, error)) {
+		g_object_unref (httpstream);
+		return NULL;
+	}
+	return (GInputStream *)httpstream;
+}
+
+static void
+sent_async (GObject *source, GAsyncResult *result, gpointer user_data)
+{
+	SoupHTTPInputStream *httpstream = SOUP_HTTP_INPUT_STREAM (source);
+	GSimpleAsyncResult *simple = user_data;
+	GError *error = NULL;
+
+	if (soup_http_input_stream_send_finish (httpstream, result, &error)) {
+		g_simple_async_result_set_op_res_gpointer (simple, httpstream, g_object_unref);
+	} else {
+		g_simple_async_result_set_from_error (simple, error);
+		g_error_free (error);
+		g_object_unref (httpstream);
+	}
+	g_simple_async_result_complete (simple);
+	g_object_unref (simple);
+}
+
+static void
+soup_message_send_async (SoupRequest          *request,
+			 GCancellable         *cancellable,
+			 GAsyncReadyCallback   callback,
+			 gpointer              user_data)
+{
+	SoupMessage *msg = SOUP_MESSAGE (request);
+	SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
+	SoupHTTPInputStream *httpstream;
+	GSimpleAsyncResult *simple;
+
+	simple = g_simple_async_result_new (G_OBJECT (msg), callback, user_data,
+					    soup_message_send_async);
+	httpstream = soup_http_input_stream_new (priv->session, msg);
+	soup_http_input_stream_send_async (httpstream, G_PRIORITY_DEFAULT,
+					   cancellable, sent_async, simple);
+}
+
+static GInputStream *
+soup_message_send_finish (SoupRequest   *request,
+			  GAsyncResult  *result,
+			  GError       **error)
+{
+	GSimpleAsyncResult *simple;
+
+	g_return_val_if_fail (g_simple_async_result_is_valid (result, G_OBJECT (request), soup_message_send_async), NULL);
+
+	simple = G_SIMPLE_ASYNC_RESULT (result);
+	if (g_simple_async_result_propagate_error (simple, error))
+		return NULL;
+	return g_object_ref (g_simple_async_result_get_op_res_gpointer (simple));
+}
diff --git a/libsoup/soup-session.c b/libsoup/soup-session.c
index 97c6d35..0cec47b 100644
--- a/libsoup/soup-session.c
+++ b/libsoup/soup-session.c
@@ -2000,12 +2000,10 @@ init_request_types (SoupSessionPrivate *priv)
 			     GSIZE_TO_POINTER (SOUP_TYPE_REQUEST_FILE));
 	g_hash_table_insert (priv->request_types, "data",
 			     GSIZE_TO_POINTER (SOUP_TYPE_REQUEST_DATA));
-#if 0
 	g_hash_table_insert (priv->request_types, "http",
-			     GSIZE_TO_POINTER (SOUP_TYPE_REQUEST_HTTP));
+			     GSIZE_TO_POINTER (SOUP_TYPE_MESSAGE));
 	g_hash_table_insert (priv->request_types, "https",
-			     GSIZE_TO_POINTER (SOUP_TYPE_REQUEST_HTTP));
-#endif
+			     GSIZE_TO_POINTER (SOUP_TYPE_MESSAGE));
 	g_hash_table_insert (priv->request_types, "ftp",
 			     GSIZE_TO_POINTER (SOUP_TYPE_REQUEST_FTP));
 }



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]