[libsoup/new-io] Add SoupRequest implementation to SoupMessage
- From: Dan Winship <danw src gnome org>
- To: svn-commits-list gnome org
- Cc:
- Subject: [libsoup/new-io] Add SoupRequest implementation to SoupMessage
- Date: Sun, 20 Dec 2009 14:14:00 +0000 (UTC)
commit 7b584c8e02fcd5f013631aebcf6540ae17dc0338
Author: Dan Winship <danw gnome org>
Date: Sun Dec 20 15:11:55 2009 +0100
Add SoupRequest implementation to SoupMessage
using gvfs's SoupInputStream (now "SoupHTTPInputStream"). as in gvfs,
it only works for async
libsoup/Makefile.am | 2 +
libsoup/soup-http-input-stream.c | 901 ++++++++++++++++++++++++++++++++++++++
libsoup/soup-http-input-stream.h | 77 ++++
libsoup/soup-message-private.h | 2 +
libsoup/soup-message.c | 117 +++++-
libsoup/soup-session.c | 6 +-
6 files changed, 1099 insertions(+), 6 deletions(-)
---
diff --git a/libsoup/Makefile.am b/libsoup/Makefile.am
index c8d51ef..567fb84 100644
--- a/libsoup/Makefile.am
+++ b/libsoup/Makefile.am
@@ -144,6 +144,8 @@ libsoup_2_4_la_SOURCES = \
soup-ftp-input-stream.c \
soup-gnutls.c \
soup-headers.c \
+ soup-http-input-stream.h \
+ soup-http-input-stream.c \
soup-logger.c \
soup-message.c \
soup-message-body.c \
diff --git a/libsoup/soup-http-input-stream.c b/libsoup/soup-http-input-stream.c
new file mode 100644
index 0000000..565d598
--- /dev/null
+++ b/libsoup/soup-http-input-stream.c
@@ -0,0 +1,901 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/* soup-input-stream.c, based on gsocketinputstream.c
+ *
+ * Copyright (C) 2006-2007 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General
+ * Public License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#include <config.h>
+
+#include <string.h>
+
+#include <glib.h>
+#include <gio/gio.h>
+
+#include <libsoup/soup.h>
+
+#include "soup-http-input-stream.h"
+
+static void soup_http_input_stream_seekable_iface_init (GSeekableIface *seekable_iface);
+
+G_DEFINE_TYPE_WITH_CODE (SoupHTTPInputStream, soup_http_input_stream, G_TYPE_INPUT_STREAM,
+ G_IMPLEMENT_INTERFACE (G_TYPE_SEEKABLE,
+ soup_http_input_stream_seekable_iface_init))
+
+typedef void (*SoupHTTPInputStreamCallback) (GInputStream *);
+
+typedef struct {
+ SoupSession *session;
+ GMainContext *async_context;
+ SoupMessage *msg;
+ gboolean got_headers, finished;
+ goffset offset;
+
+ GCancellable *cancellable;
+ GSource *cancel_watch;
+ SoupHTTPInputStreamCallback got_headers_cb;
+ SoupHTTPInputStreamCallback got_chunk_cb;
+ SoupHTTPInputStreamCallback finished_cb;
+ SoupHTTPInputStreamCallback cancelled_cb;
+
+ guchar *leftover_buffer;
+ gsize leftover_bufsize, leftover_offset;
+
+ guchar *caller_buffer;
+ gsize caller_bufsize, caller_nread;
+ GAsyncReadyCallback outstanding_callback;
+ GSimpleAsyncResult *result;
+
+} SoupHTTPInputStreamPrivate;
+#define SOUP_HTTP_INPUT_STREAM_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), SOUP_TYPE_HTTP_INPUT_STREAM, SoupHTTPInputStreamPrivate))
+
+
+static gssize soup_http_input_stream_read (GInputStream *stream,
+ void *buffer,
+ gsize count,
+ GCancellable *cancellable,
+ GError **error);
+static gboolean soup_http_input_stream_close (GInputStream *stream,
+ GCancellable *cancellable,
+ GError **error);
+static void soup_http_input_stream_read_async (GInputStream *stream,
+ void *buffer,
+ gsize count,
+ int io_priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer data);
+static gssize soup_http_input_stream_read_finish (GInputStream *stream,
+ GAsyncResult *result,
+ GError **error);
+static void soup_http_input_stream_close_async (GInputStream *stream,
+ int io_priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer data);
+static gboolean soup_http_input_stream_close_finish (GInputStream *stream,
+ GAsyncResult *result,
+ GError **error);
+
+static goffset soup_http_input_stream_tell (GSeekable *seekable);
+
+static gboolean soup_http_input_stream_can_seek (GSeekable *seekable);
+static gboolean soup_http_input_stream_seek (GSeekable *seekable,
+ goffset offset,
+ GSeekType type,
+ GCancellable *cancellable,
+ GError **error);
+
+static gboolean soup_http_input_stream_can_truncate (GSeekable *seekable);
+static gboolean soup_http_input_stream_truncate (GSeekable *seekable,
+ goffset offset,
+ GCancellable *cancellable,
+ GError **error);
+
+static void soup_http_input_stream_got_headers (SoupMessage *msg, gpointer stream);
+static void soup_http_input_stream_got_chunk (SoupMessage *msg, SoupBuffer *chunk, gpointer stream);
+static void soup_http_input_stream_finished (SoupMessage *msg, gpointer stream);
+
+static void
+soup_http_input_stream_finalize (GObject *object)
+{
+ SoupHTTPInputStream *stream = SOUP_HTTP_INPUT_STREAM (object);
+ SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
+
+ g_object_unref (priv->session);
+
+ g_signal_handlers_disconnect_by_func (priv->msg, G_CALLBACK (soup_http_input_stream_got_headers), stream);
+ g_signal_handlers_disconnect_by_func (priv->msg, G_CALLBACK (soup_http_input_stream_got_chunk), stream);
+ g_signal_handlers_disconnect_by_func (priv->msg, G_CALLBACK (soup_http_input_stream_finished), stream);
+ g_object_unref (priv->msg);
+ g_free (priv->leftover_buffer);
+
+ if (G_OBJECT_CLASS (soup_http_input_stream_parent_class)->finalize)
+ (*G_OBJECT_CLASS (soup_http_input_stream_parent_class)->finalize) (object);
+}
+
+static void
+soup_http_input_stream_class_init (SoupHTTPInputStreamClass *klass)
+{
+ GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
+ GInputStreamClass *stream_class = G_INPUT_STREAM_CLASS (klass);
+
+ g_type_class_add_private (klass, sizeof (SoupHTTPInputStreamPrivate));
+
+ gobject_class->finalize = soup_http_input_stream_finalize;
+
+ stream_class->read_fn = soup_http_input_stream_read;
+ stream_class->close_fn = soup_http_input_stream_close;
+ stream_class->read_async = soup_http_input_stream_read_async;
+ stream_class->read_finish = soup_http_input_stream_read_finish;
+ stream_class->close_async = soup_http_input_stream_close_async;
+ stream_class->close_finish = soup_http_input_stream_close_finish;
+}
+
+static void
+soup_http_input_stream_seekable_iface_init (GSeekableIface *seekable_iface)
+{
+ seekable_iface->tell = soup_http_input_stream_tell;
+ seekable_iface->can_seek = soup_http_input_stream_can_seek;
+ seekable_iface->seek = soup_http_input_stream_seek;
+ seekable_iface->can_truncate = soup_http_input_stream_can_truncate;
+ seekable_iface->truncate_fn = soup_http_input_stream_truncate;
+}
+
+static void
+soup_http_input_stream_init (SoupHTTPInputStream *stream)
+{
+ ;
+}
+
+static void
+soup_http_input_stream_queue_message (SoupHTTPInputStream *stream)
+{
+ SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
+
+ priv->got_headers = priv->finished = FALSE;
+
+ /* Add an extra ref since soup_session_queue_message steals one */
+ g_object_ref (priv->msg);
+ soup_session_queue_message (priv->session, priv->msg, NULL, NULL);
+}
+
+/**
+ * soup_http_input_stream_new:
+ * @session: the #SoupSession to use
+ * @msg: the #SoupMessage whose response will be streamed
+ *
+ * Prepares to send @msg over @session, and returns a #GInputStream
+ * that can be used to read the response.
+ *
+ * @msg may not be sent until the first read call; if you need to look
+ * at the status code or response headers before reading the body, you
+ * can use soup_http_input_stream_send() or soup_http_input_stream_send_async()
+ * to force the message to be sent and the response headers read.
+ *
+ * If @msg gets a non-2xx result, the first read (or send) will return
+ * an error with type %SOUP_HTTP_INPUT_STREAM_HTTP_ERROR.
+ *
+ * Internally, #SoupHTTPInputStream is implemented using asynchronous I/O,
+ * so if you are using the synchronous API (eg,
+ * g_input_stream_read()), you should create a new #GMainContext and
+ * set it as the %SOUP_SESSION_ASYNC_CONTEXT property on @session. (If
+ * you don't, then synchronous #GInputStream calls will cause the main
+ * loop to be run recursively.) The async #GInputStream API works fine
+ * with %SOUP_SESSION_ASYNC_CONTEXT either set or unset.
+ *
+ * Returns: a new #GInputStream.
+ **/
+SoupHTTPInputStream *
+soup_http_input_stream_new (SoupSession *session, SoupMessage *msg)
+{
+ SoupHTTPInputStream *stream;
+ SoupHTTPInputStreamPrivate *priv;
+
+ g_return_val_if_fail (SOUP_IS_MESSAGE (msg), NULL);
+
+ stream = g_object_new (SOUP_TYPE_HTTP_INPUT_STREAM, NULL);
+ priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
+
+ priv->session = g_object_ref (session);
+ priv->async_context = soup_session_get_async_context (session);
+ priv->msg = g_object_ref (msg);
+
+ g_signal_connect (msg, "got_headers",
+ G_CALLBACK (soup_http_input_stream_got_headers), stream);
+ g_signal_connect (msg, "got_chunk",
+ G_CALLBACK (soup_http_input_stream_got_chunk), stream);
+ g_signal_connect (msg, "finished",
+ G_CALLBACK (soup_http_input_stream_finished), stream);
+
+ soup_http_input_stream_queue_message (stream);
+ return stream;
+}
+
+static void
+soup_http_input_stream_got_headers (SoupMessage *msg, gpointer stream)
+{
+ SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
+
+ /* If the status is unsuccessful, we just ignore the signal and let
+ * libsoup keep going (eventually either it will requeue the request
+ * (after handling authentication/redirection), or else the
+ * "finished" handler will run).
+ */
+ if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code))
+ return;
+
+ priv->got_headers = TRUE;
+ if (!priv->caller_buffer) {
+ /* Not ready to read the body yet */
+ soup_session_pause_message (priv->session, msg);
+ }
+
+ if (priv->got_headers_cb)
+ priv->got_headers_cb (stream);
+}
+
+static void
+soup_http_input_stream_got_chunk (SoupMessage *msg, SoupBuffer *chunk_buffer,
+ gpointer stream)
+{
+ SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
+ const gchar *chunk = chunk_buffer->data;
+ gsize chunk_size = chunk_buffer->length;
+
+ /* We only pay attention to the chunk if it's part of a successful
+ * response.
+ */
+ if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code))
+ return;
+
+ /* Sanity check */
+ if (priv->caller_bufsize == 0 || priv->leftover_bufsize != 0)
+ g_warning ("soup_http_input_stream_got_chunk called again before previous chunk was processed");
+
+ /* Copy what we can into priv->caller_buffer */
+ if (priv->caller_bufsize - priv->caller_nread > 0) {
+ gsize nread = MIN (chunk_size, priv->caller_bufsize - priv->caller_nread);
+
+ memcpy (priv->caller_buffer + priv->caller_nread, chunk, nread);
+ priv->caller_nread += nread;
+ priv->offset += nread;
+ chunk += nread;
+ chunk_size -= nread;
+ }
+
+ if (chunk_size > 0) {
+ /* Copy the rest into priv->leftover_buffer. If
+ * there's already some data there, realloc and
+ * append. Otherwise just copy.
+ */
+ if (priv->leftover_bufsize) {
+ priv->leftover_buffer = g_realloc (priv->leftover_buffer,
+ priv->leftover_bufsize + chunk_size);
+ memcpy (priv->leftover_buffer + priv->leftover_bufsize,
+ chunk, chunk_size);
+ priv->leftover_bufsize += chunk_size;
+ } else {
+ priv->leftover_bufsize = chunk_size;
+ priv->leftover_buffer = g_memdup (chunk, chunk_size);
+ priv->leftover_offset = 0;
+ }
+ }
+
+ soup_session_pause_message (priv->session, msg);
+ if (priv->got_chunk_cb)
+ priv->got_chunk_cb (stream);
+}
+
+static void
+soup_http_input_stream_finished (SoupMessage *msg, gpointer stream)
+{
+ SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
+
+ priv->finished = TRUE;
+
+ if (priv->finished_cb)
+ priv->finished_cb (stream);
+}
+
+static gboolean
+soup_http_input_stream_cancelled (GIOChannel *chan, GIOCondition condition,
+ gpointer stream)
+{
+ SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
+
+ priv->cancel_watch = NULL;
+
+ soup_session_pause_message (priv->session, priv->msg);
+ if (priv->cancelled_cb)
+ priv->cancelled_cb (stream);
+
+ return FALSE;
+}
+
+static void
+soup_http_input_stream_prepare_for_io (GInputStream *stream,
+ GCancellable *cancellable,
+ guchar *buffer,
+ gsize count)
+{
+ SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
+ int cancel_fd;
+
+ priv->cancellable = cancellable;
+ cancel_fd = g_cancellable_get_fd (cancellable);
+ if (cancel_fd != -1) {
+ GIOChannel *chan = g_io_channel_unix_new (cancel_fd);
+ priv->cancel_watch = soup_add_io_watch (priv->async_context, chan,
+ G_IO_IN | G_IO_ERR | G_IO_HUP,
+ soup_http_input_stream_cancelled,
+ stream);
+ g_io_channel_unref (chan);
+ }
+
+ priv->caller_buffer = buffer;
+ priv->caller_bufsize = count;
+ priv->caller_nread = 0;
+
+ if (priv->got_headers)
+ soup_session_unpause_message (priv->session, priv->msg);
+}
+
+static void
+soup_http_input_stream_done_io (GInputStream *stream)
+{
+ SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
+
+ if (priv->cancel_watch) {
+ g_source_destroy (priv->cancel_watch);
+ priv->cancel_watch = NULL;
+ g_cancellable_release_fd (priv->cancellable);
+ }
+ priv->cancellable = NULL;
+
+ priv->caller_buffer = NULL;
+ priv->caller_bufsize = 0;
+}
+
+static gboolean
+set_error_if_http_failed (SoupMessage *msg, GError **error)
+{
+ if (!SOUP_STATUS_IS_SUCCESSFUL (msg->status_code)) {
+ g_set_error_literal (error, SOUP_HTTP_ERROR,
+ msg->status_code, msg->reason_phrase);
+ return TRUE;
+ }
+ return FALSE;
+}
+
+static gsize
+read_from_leftover (SoupHTTPInputStreamPrivate *priv,
+ gpointer buffer, gsize bufsize)
+{
+ gsize nread;
+
+ if (priv->leftover_bufsize - priv->leftover_offset <= bufsize) {
+ nread = priv->leftover_bufsize - priv->leftover_offset;
+ memcpy (buffer, priv->leftover_buffer + priv->leftover_offset, nread);
+
+ g_free (priv->leftover_buffer);
+ priv->leftover_buffer = NULL;
+ priv->leftover_bufsize = priv->leftover_offset = 0;
+ } else {
+ nread = bufsize;
+ memcpy (buffer, priv->leftover_buffer + priv->leftover_offset, nread);
+ priv->leftover_offset += nread;
+ }
+
+ priv->offset += nread;
+ return nread;
+}
+
+/* This does the work of soup_http_input_stream_send(), assuming that the
+ * GInputStream pending flag has already been set. It is also used by
+ * soup_http_input_stream_send_async() in some circumstances.
+ */
+static gboolean
+soup_http_input_stream_send_internal (GInputStream *stream,
+ GCancellable *cancellable,
+ GError **error)
+{
+ SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
+
+ soup_http_input_stream_prepare_for_io (stream, cancellable, NULL, 0);
+ while (!priv->finished && !priv->got_headers &&
+ !g_cancellable_is_cancelled (cancellable))
+ g_main_context_iteration (priv->async_context, TRUE);
+ soup_http_input_stream_done_io (stream);
+
+ if (g_cancellable_set_error_if_cancelled (cancellable, error))
+ return FALSE;
+ else if (set_error_if_http_failed (priv->msg, error))
+ return FALSE;
+ return TRUE;
+}
+
+/**
+ * soup_http_input_stream_send:
+ * @httpstream: a #SoupHTTPInputStream
+ * @cancellable: optional #GCancellable object, %NULL to ignore.
+ * @error: location to store the error occuring, or %NULL to ignore
+ *
+ * Synchronously sends the HTTP request associated with @stream, and
+ * reads the response headers. Call this after soup_http_input_stream_new()
+ * and before the first g_input_stream_read() if you want to check the
+ * HTTP status code before you start reading.
+ *
+ * Return value: %TRUE if msg has a successful (2xx) status, %FALSE if
+ * not.
+ **/
+gboolean
+soup_http_input_stream_send (SoupHTTPInputStream *httpstream,
+ GCancellable *cancellable,
+ GError **error)
+{
+ GInputStream *istream = (GInputStream *)httpstream;
+ gboolean result;
+
+ g_return_val_if_fail (SOUP_IS_HTTP_INPUT_STREAM (httpstream), FALSE);
+
+ if (!g_input_stream_set_pending (istream, error))
+ return FALSE;
+ result = soup_http_input_stream_send_internal (istream, cancellable, error);
+ g_input_stream_clear_pending (istream);
+
+ return result;
+}
+
+static gssize
+soup_http_input_stream_read (GInputStream *stream,
+ void *buffer,
+ gsize count,
+ GCancellable *cancellable,
+ GError **error)
+{
+ SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
+
+ if (priv->finished)
+ return 0;
+
+ /* If there is data leftover from a previous read, return it. */
+ if (priv->leftover_bufsize)
+ return read_from_leftover (priv, buffer, count);
+
+ /* No leftover data, accept one chunk from the network */
+ soup_http_input_stream_prepare_for_io (stream, cancellable, buffer, count);
+ while (!priv->finished && priv->caller_nread == 0 &&
+ !g_cancellable_is_cancelled (cancellable))
+ g_main_context_iteration (priv->async_context, TRUE);
+ soup_http_input_stream_done_io (stream);
+
+ if (priv->caller_nread > 0)
+ return priv->caller_nread;
+
+ if (g_cancellable_set_error_if_cancelled (cancellable, error))
+ return -1;
+ else if (set_error_if_http_failed (priv->msg, error))
+ return -1;
+ else
+ return 0;
+}
+
+static gboolean
+soup_http_input_stream_close (GInputStream *stream,
+ GCancellable *cancellable,
+ GError **error)
+{
+ SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
+
+ if (!priv->finished)
+ soup_session_cancel_message (priv->session, priv->msg, SOUP_STATUS_CANCELLED);
+
+ return TRUE;
+}
+
+static void
+wrapper_callback (GObject *source_object, GAsyncResult *res,
+ gpointer user_data)
+{
+ GInputStream *stream = G_INPUT_STREAM (source_object);
+ SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
+
+ g_input_stream_clear_pending (stream);
+ if (priv->outstanding_callback)
+ (*priv->outstanding_callback) (source_object, res, user_data);
+ priv->outstanding_callback = NULL;
+ g_object_unref (stream);
+}
+
+static void
+send_async_thread (GSimpleAsyncResult *res,
+ GObject *object,
+ GCancellable *cancellable)
+{
+ GError *error = NULL;
+ gboolean success;
+
+ success = soup_http_input_stream_send_internal (G_INPUT_STREAM (object),
+ cancellable, &error);
+ g_simple_async_result_set_op_res_gboolean (res, success);
+ if (error) {
+ g_simple_async_result_set_from_error (res, error);
+ g_error_free (error);
+ }
+}
+
+static void
+soup_http_input_stream_send_async_in_thread (GInputStream *stream,
+ int io_priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ GSimpleAsyncResult *res;
+
+ res = g_simple_async_result_new (G_OBJECT (stream), callback, user_data,
+ soup_http_input_stream_send_async_in_thread);
+ g_simple_async_result_run_in_thread (res, send_async_thread,
+ io_priority, cancellable);
+ g_object_unref (res);
+}
+
+static void
+send_async_finished (GInputStream *stream)
+{
+ SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
+ GSimpleAsyncResult *result;
+ GError *error = NULL;
+
+ if (!g_cancellable_set_error_if_cancelled (priv->cancellable, &error))
+ set_error_if_http_failed (priv->msg, &error);
+
+ priv->got_headers_cb = NULL;
+ priv->finished_cb = NULL;
+ soup_http_input_stream_done_io (stream);
+
+ result = priv->result;
+ priv->result = NULL;
+
+ g_simple_async_result_set_op_res_gboolean (result, error == NULL);
+ if (error) {
+ g_simple_async_result_set_from_error (result, error);
+ g_error_free (error);
+ }
+ g_simple_async_result_complete (result);
+}
+
+static void
+soup_http_input_stream_send_async_internal (GInputStream *stream,
+ int io_priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
+
+ g_object_ref (stream);
+ priv->outstanding_callback = callback;
+
+ /* If the session uses the default GMainContext, then we can do
+ * async I/O directly. But if it has its own main context, it's
+ * easier to just run it in another thread.
+ */
+ if (soup_session_get_async_context (priv->session)) {
+ soup_http_input_stream_send_async_in_thread (stream, io_priority, cancellable,
+ wrapper_callback, user_data);
+ return;
+ }
+
+ priv->got_headers_cb = send_async_finished;
+ priv->finished_cb = send_async_finished;
+
+ soup_http_input_stream_prepare_for_io (stream, cancellable, NULL, 0);
+ priv->result = g_simple_async_result_new (G_OBJECT (stream),
+ wrapper_callback, user_data,
+ soup_http_input_stream_send_async);
+}
+
+/**
+ * soup_http_input_stream_send_async:
+ * @httpstream: a #SoupHTTPInputStream
+ * @io_priority: the io priority of the request.
+ * @cancellable: optional #GCancellable object, %NULL to ignore.
+ * @callback: callback to call when the request is satisfied
+ * @user_data: the data to pass to callback function
+ *
+ * Asynchronously sends the HTTP request associated with @stream, and
+ * reads the response headers. Call this after soup_http_input_stream_new()
+ * and before the first g_input_stream_read_async() if you want to
+ * check the HTTP status code before you start reading.
+ **/
+void
+soup_http_input_stream_send_async (SoupHTTPInputStream *httpstream,
+ int io_priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ GInputStream *istream = (GInputStream *)httpstream;
+ GError *error = NULL;
+
+ g_return_if_fail (SOUP_IS_HTTP_INPUT_STREAM (httpstream));
+
+ if (!g_input_stream_set_pending (istream, &error)) {
+ g_simple_async_report_gerror_in_idle (G_OBJECT (httpstream),
+ callback,
+ user_data,
+ error);
+ g_error_free (error);
+ return;
+ }
+ soup_http_input_stream_send_async_internal (istream, io_priority, cancellable,
+ callback, user_data);
+}
+
+/**
+ * soup_http_input_stream_send_finish:
+ * @httpstream: a #SoupHTTPInputStream
+ * @result: a #GAsyncResult.
+ * @error: a #GError location to store the error occuring, or %NULL to
+ * ignore.
+ *
+ * Finishes a soup_http_input_stream_send_async() operation.
+ *
+ * Return value: %TRUE if the message was sent successfully and
+ * received a successful status code, %FALSE if not.
+ **/
+gboolean
+soup_http_input_stream_send_finish (SoupHTTPInputStream *httpstream,
+ GAsyncResult *result,
+ GError **error)
+{
+ GSimpleAsyncResult *simple;
+
+ g_return_val_if_fail (G_IS_SIMPLE_ASYNC_RESULT (result), FALSE);
+ simple = G_SIMPLE_ASYNC_RESULT (result);
+
+ g_return_val_if_fail (g_simple_async_result_get_source_tag (simple) == soup_http_input_stream_send_async, FALSE);
+
+ if (g_simple_async_result_propagate_error (simple, error))
+ return FALSE;
+
+ return g_simple_async_result_get_op_res_gboolean (simple);
+}
+
+static void
+read_async_done (GInputStream *stream)
+{
+ SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
+ GSimpleAsyncResult *result;
+ GError *error = NULL;
+
+ result = priv->result;
+ priv->result = NULL;
+
+ if (g_cancellable_set_error_if_cancelled (priv->cancellable, &error) ||
+ set_error_if_http_failed (priv->msg, &error)) {
+ g_simple_async_result_set_from_error (result, error);
+ g_error_free (error);
+ } else
+ g_simple_async_result_set_op_res_gssize (result, priv->caller_nread);
+
+ priv->got_chunk_cb = NULL;
+ priv->finished_cb = NULL;
+ priv->cancelled_cb = NULL;
+ soup_http_input_stream_done_io (stream);
+
+ g_simple_async_result_complete (result);
+ g_object_unref (result);
+}
+
+static void
+soup_http_input_stream_read_async (GInputStream *stream,
+ void *buffer,
+ gsize count,
+ int io_priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (stream);
+ GSimpleAsyncResult *result;
+
+ /* If the session uses the default GMainContext, then we can do
+ * async I/O directly. But if it has its own main context, we fall
+ * back to the async-via-sync-in-another-thread implementation.
+ */
+ if (soup_session_get_async_context (priv->session)) {
+ G_INPUT_STREAM_CLASS (soup_http_input_stream_parent_class)->
+ read_async (stream, buffer, count, io_priority,
+ cancellable, callback, user_data);
+ return;
+ }
+
+ result = g_simple_async_result_new (G_OBJECT (stream),
+ callback, user_data,
+ soup_http_input_stream_read_async);
+
+ if (priv->finished) {
+ g_simple_async_result_set_op_res_gssize (result, 0);
+ g_simple_async_result_complete_in_idle (result);
+ g_object_unref (result);
+ return;
+ }
+
+ if (priv->leftover_bufsize) {
+ gsize nread = read_from_leftover (priv, buffer, count);
+ g_simple_async_result_set_op_res_gssize (result, nread);
+ g_simple_async_result_complete_in_idle (result);
+ g_object_unref (result);
+ return;
+ }
+
+ priv->result = result;
+
+ priv->got_chunk_cb = read_async_done;
+ priv->finished_cb = read_async_done;
+ priv->cancelled_cb = read_async_done;
+ soup_http_input_stream_prepare_for_io (stream, cancellable, buffer, count);
+}
+
+static gssize
+soup_http_input_stream_read_finish (GInputStream *stream,
+ GAsyncResult *result,
+ GError **error)
+{
+ GSimpleAsyncResult *simple;
+
+ g_return_val_if_fail (G_IS_SIMPLE_ASYNC_RESULT (result), -1);
+ simple = G_SIMPLE_ASYNC_RESULT (result);
+ g_return_val_if_fail (g_simple_async_result_get_source_tag (simple) == soup_http_input_stream_read_async, -1);
+
+ return g_simple_async_result_get_op_res_gssize (simple);
+}
+
+static void
+soup_http_input_stream_close_async (GInputStream *stream,
+ int io_priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ GSimpleAsyncResult *result;
+ gboolean success;
+ GError *error = NULL;
+
+ result = g_simple_async_result_new (G_OBJECT (stream),
+ callback, user_data,
+ soup_http_input_stream_close_async);
+ success = soup_http_input_stream_close (stream, cancellable, &error);
+ g_simple_async_result_set_op_res_gboolean (result, success);
+ if (error) {
+ g_simple_async_result_set_from_error (result, error);
+ g_error_free (error);
+ }
+
+ g_simple_async_result_complete_in_idle (result);
+ g_object_unref (result);
+}
+
+static gboolean
+soup_http_input_stream_close_finish (GInputStream *stream,
+ GAsyncResult *result,
+ GError **error)
+{
+ /* Failures handled in generic close_finish code */
+ return TRUE;
+}
+
+static goffset
+soup_http_input_stream_tell (GSeekable *seekable)
+{
+ SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (seekable);
+
+ return priv->offset;
+}
+
+static gboolean
+soup_http_input_stream_can_seek (GSeekable *seekable)
+{
+ return TRUE;
+}
+
+extern void soup_message_io_cleanup (SoupMessage *msg);
+
+static gboolean
+soup_http_input_stream_seek (GSeekable *seekable,
+ goffset offset,
+ GSeekType type,
+ GCancellable *cancellable,
+ GError **error)
+{
+ GInputStream *stream = G_INPUT_STREAM (seekable);
+ SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (seekable);
+ char *range;
+
+ if (type == G_SEEK_END) {
+ /* FIXME: we could send "bytes=-offset", but unless we
+ * know the Content-Length, we wouldn't be able to
+ * answer a tell() properly. We could find the
+ * Content-Length by doing a HEAD...
+ */
+
+ g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
+ "G_SEEK_END not currently supported");
+ return FALSE;
+ }
+
+ if (!g_input_stream_set_pending (stream, error))
+ return FALSE;
+
+ soup_session_cancel_message (priv->session, priv->msg, SOUP_STATUS_CANCELLED);
+ soup_message_io_cleanup (priv->msg);
+
+ switch (type) {
+ case G_SEEK_CUR:
+ offset += priv->offset;
+ /* fall through */
+
+ case G_SEEK_SET:
+ range = g_strdup_printf ("bytes=%"G_GUINT64_FORMAT"-", (guint64)offset);
+ priv->offset = offset;
+ break;
+
+ case G_SEEK_END:
+ range = NULL; /* keep compilers happy */
+ g_return_val_if_reached (FALSE);
+ break;
+
+ default:
+ g_return_val_if_reached (FALSE);
+ }
+
+ soup_message_headers_remove (priv->msg->request_headers, "Range");
+ soup_message_headers_append (priv->msg->request_headers, "Range", range);
+ g_free (range);
+
+ soup_http_input_stream_queue_message (SOUP_HTTP_INPUT_STREAM (stream));
+
+ g_input_stream_clear_pending (stream);
+ return TRUE;
+}
+
+static gboolean
+soup_http_input_stream_can_truncate (GSeekable *seekable)
+{
+ return FALSE;
+}
+
+static gboolean
+soup_http_input_stream_truncate (GSeekable *seekable,
+ goffset offset,
+ GCancellable *cancellable,
+ GError **error)
+{
+ g_set_error_literal (error, G_IO_ERROR, G_IO_ERROR_NOT_SUPPORTED,
+ "Truncate not allowed on input stream");
+ return FALSE;
+}
+
+SoupMessage *
+soup_http_input_stream_get_message (SoupHTTPInputStream *httpstream)
+{
+ SoupHTTPInputStreamPrivate *priv = SOUP_HTTP_INPUT_STREAM_GET_PRIVATE (httpstream);
+ return priv->msg ? g_object_ref (priv->msg) : NULL;
+}
diff --git a/libsoup/soup-http-input-stream.h b/libsoup/soup-http-input-stream.h
new file mode 100644
index 0000000..4052196
--- /dev/null
+++ b/libsoup/soup-http-input-stream.h
@@ -0,0 +1,77 @@
+/* Copyright (C) 2006, 2007, 2009 Red Hat, Inc.
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General
+ * Public License along with this library; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place, Suite 330,
+ * Boston, MA 02111-1307, USA.
+ */
+
+#ifndef __SOUP_HTTP_INPUT_STREAM_H__
+#define __SOUP_HTTP_INPUT_STREAM_H__
+
+#include <gio/gio.h>
+#include <libsoup/soup-types.h>
+
+G_BEGIN_DECLS
+
+#define SOUP_TYPE_HTTP_INPUT_STREAM (soup_http_input_stream_get_type ())
+#define SOUP_HTTP_INPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), SOUP_TYPE_HTTP_INPUT_STREAM, SoupHTTPInputStream))
+#define SOUP_HTTP_INPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_CAST((k), SOUP_TYPE_HTTP_INPUT_STREAM, SoupHTTPInputStreamClass))
+#define SOUP_IS_HTTP_INPUT_STREAM(o) (G_TYPE_CHECK_INSTANCE_TYPE ((o), SOUP_TYPE_HTTP_INPUT_STREAM))
+#define SOUP_IS_HTTP_INPUT_STREAM_CLASS(k) (G_TYPE_CHECK_CLASS_TYPE ((k), SOUP_TYPE_HTTP_INPUT_STREAM))
+#define SOUP_HTTP_INPUT_STREAM_GET_CLASS(o) (G_TYPE_INSTANCE_GET_CLASS ((o), SOUP_TYPE_HTTP_INPUT_STREAM, SoupHTTPInputStreamClass))
+
+typedef struct SoupHTTPInputStream SoupHTTPInputStream;
+typedef struct SoupHTTPInputStreamClass SoupHTTPInputStreamClass;
+
+struct SoupHTTPInputStream
+{
+ GInputStream parent;
+
+};
+
+struct SoupHTTPInputStreamClass
+{
+ GInputStreamClass parent_class;
+
+ /* Padding for future expansion */
+ void (*_g_reserved1) (void);
+ void (*_g_reserved2) (void);
+ void (*_g_reserved3) (void);
+ void (*_g_reserved4) (void);
+ void (*_g_reserved5) (void);
+};
+
+GType soup_http_input_stream_get_type (void) G_GNUC_CONST;
+
+SoupHTTPInputStream *soup_http_input_stream_new (SoupSession *session,
+ SoupMessage *msg);
+
+gboolean soup_http_input_stream_send (SoupHTTPInputStream *httpstream,
+ GCancellable *cancellable,
+ GError **error);
+
+void soup_http_input_stream_send_async (SoupHTTPInputStream *httpstream,
+ int io_priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data);
+gboolean soup_http_input_stream_send_finish (SoupHTTPInputStream *httpstream,
+ GAsyncResult *result,
+ GError **error);
+
+SoupMessage *soup_http_input_stream_get_message (SoupHTTPInputStream *httpstream);
+
+G_END_DECLS
+
+#endif /* __SOUP_HTTP_INPUT_STREAM_H__ */
diff --git a/libsoup/soup-message-private.h b/libsoup/soup-message-private.h
index ee6221d..f5ede4b 100644
--- a/libsoup/soup-message-private.h
+++ b/libsoup/soup-message-private.h
@@ -41,6 +41,8 @@ typedef struct {
GSList *disabled_features;
GSList *decoders;
+
+ SoupSession *session;
} SoupMessagePrivate;
#define SOUP_MESSAGE_GET_PRIVATE(o) (G_TYPE_INSTANCE_GET_PRIVATE ((o), SOUP_TYPE_MESSAGE, SoupMessagePrivate))
diff --git a/libsoup/soup-message.c b/libsoup/soup-message.c
index 93c2522..16a27ce 100644
--- a/libsoup/soup-message.c
+++ b/libsoup/soup-message.c
@@ -11,10 +11,13 @@
#include "soup-address.h"
#include "soup-auth.h"
#include "soup-enum-types.h"
+#include "soup-http-input-stream.h"
#include "soup-marshal.h"
#include "soup-message.h"
#include "soup-message-private.h"
#include "soup-misc.h"
+#include "soup-request.h"
+#include "soup-session.h"
#include "soup-uri.h"
/**
@@ -86,7 +89,11 @@
* @request_body as appropriate, passing %FALSE.
**/
-G_DEFINE_TYPE (SoupMessage, soup_message, G_TYPE_OBJECT)
+static void soup_message_request_interface_init (SoupRequestInterface *request_interface);
+
+G_DEFINE_TYPE_WITH_CODE (SoupMessage, soup_message, G_TYPE_OBJECT,
+ G_IMPLEMENT_INTERFACE (SOUP_TYPE_REQUEST,
+ soup_message_request_interface_init))
enum {
WROTE_INFORMATIONAL,
@@ -119,6 +126,7 @@ enum {
PROP_SERVER_SIDE,
PROP_STATUS_CODE,
PROP_REASON_PHRASE,
+ PROP_SESSION,
LAST_PROP
};
@@ -132,6 +140,17 @@ static void set_property (GObject *object, guint prop_id,
static void get_property (GObject *object, guint prop_id,
GValue *value, GParamSpec *pspec);
+static GInputStream *soup_message_send (SoupRequest *request,
+ GCancellable *cancellable,
+ GError **error);
+static void soup_message_send_async (SoupRequest *request,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data);
+static GInputStream *soup_message_send_finish (SoupRequest *request,
+ GAsyncResult *result,
+ GError **error);
+
static void
soup_message_init (SoupMessage *msg)
{
@@ -495,7 +514,7 @@ soup_message_class_init (SoupMessageClass *message_class)
"Method",
"The message's HTTP method",
SOUP_METHOD_GET,
- G_PARAM_READWRITE));
+ G_PARAM_READWRITE | G_PARAM_CONSTRUCT));
/**
* SOUP_MESSAGE_URI:
*
@@ -576,6 +595,22 @@ soup_message_class_init (SoupMessageClass *message_class)
"The HTTP response reason phrase",
NULL,
G_PARAM_READWRITE));
+
+ g_object_class_install_property (
+ object_class, PROP_SESSION,
+ g_param_spec_object ("session",
+ "Session",
+ "The message's session",
+ SOUP_TYPE_SESSION,
+ G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY));
+}
+
+static void
+soup_message_request_interface_init (SoupRequestInterface *request_interface)
+{
+ request_interface->send = soup_message_send;
+ request_interface->send_async = soup_message_send_async;
+ request_interface->send_finish = soup_message_send_finish;
}
static void
@@ -612,6 +647,11 @@ set_property (GObject *object, guint prop_id,
soup_message_set_status_full (msg, msg->status_code,
g_value_get_string (value));
break;
+ case PROP_SESSION:
+ if (priv->session)
+ g_object_unref (priv->session);
+ priv->session = g_value_dup_object (value);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@@ -647,6 +687,9 @@ get_property (GObject *object, guint prop_id,
case PROP_REASON_PHRASE:
g_value_set_string (value, msg->reason_phrase);
break;
+ case PROP_SESSION:
+ g_value_set_object (value, priv->session);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
@@ -1664,3 +1707,73 @@ soup_message_disables_feature (SoupMessage *msg, gpointer feature)
}
return FALSE;
}
+
+/* SoupRequest interface implementation */
+
+static GInputStream *
+soup_message_send (SoupRequest *request,
+ GCancellable *cancellable,
+ GError **error)
+{
+ SoupMessage *msg = SOUP_MESSAGE (request);
+ SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
+ SoupHTTPInputStream *httpstream;
+
+ httpstream = soup_http_input_stream_new (priv->session, msg);
+ if (!soup_http_input_stream_send (httpstream, cancellable, error)) {
+ g_object_unref (httpstream);
+ return NULL;
+ }
+ return (GInputStream *)httpstream;
+}
+
+static void
+sent_async (GObject *source, GAsyncResult *result, gpointer user_data)
+{
+ SoupHTTPInputStream *httpstream = SOUP_HTTP_INPUT_STREAM (source);
+ GSimpleAsyncResult *simple = user_data;
+ GError *error = NULL;
+
+ if (soup_http_input_stream_send_finish (httpstream, result, &error)) {
+ g_simple_async_result_set_op_res_gpointer (simple, httpstream, g_object_unref);
+ } else {
+ g_simple_async_result_set_from_error (simple, error);
+ g_error_free (error);
+ g_object_unref (httpstream);
+ }
+ g_simple_async_result_complete (simple);
+ g_object_unref (simple);
+}
+
+static void
+soup_message_send_async (SoupRequest *request,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ SoupMessage *msg = SOUP_MESSAGE (request);
+ SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
+ SoupHTTPInputStream *httpstream;
+ GSimpleAsyncResult *simple;
+
+ simple = g_simple_async_result_new (G_OBJECT (msg), callback, user_data,
+ soup_message_send_async);
+ httpstream = soup_http_input_stream_new (priv->session, msg);
+ soup_http_input_stream_send_async (httpstream, G_PRIORITY_DEFAULT,
+ cancellable, sent_async, simple);
+}
+
+static GInputStream *
+soup_message_send_finish (SoupRequest *request,
+ GAsyncResult *result,
+ GError **error)
+{
+ GSimpleAsyncResult *simple;
+
+ g_return_val_if_fail (g_simple_async_result_is_valid (result, G_OBJECT (request), soup_message_send_async), NULL);
+
+ simple = G_SIMPLE_ASYNC_RESULT (result);
+ if (g_simple_async_result_propagate_error (simple, error))
+ return NULL;
+ return g_object_ref (g_simple_async_result_get_op_res_gpointer (simple));
+}
diff --git a/libsoup/soup-session.c b/libsoup/soup-session.c
index 97c6d35..0cec47b 100644
--- a/libsoup/soup-session.c
+++ b/libsoup/soup-session.c
@@ -2000,12 +2000,10 @@ init_request_types (SoupSessionPrivate *priv)
GSIZE_TO_POINTER (SOUP_TYPE_REQUEST_FILE));
g_hash_table_insert (priv->request_types, "data",
GSIZE_TO_POINTER (SOUP_TYPE_REQUEST_DATA));
-#if 0
g_hash_table_insert (priv->request_types, "http",
- GSIZE_TO_POINTER (SOUP_TYPE_REQUEST_HTTP));
+ GSIZE_TO_POINTER (SOUP_TYPE_MESSAGE));
g_hash_table_insert (priv->request_types, "https",
- GSIZE_TO_POINTER (SOUP_TYPE_REQUEST_HTTP));
-#endif
+ GSIZE_TO_POINTER (SOUP_TYPE_MESSAGE));
g_hash_table_insert (priv->request_types, "ftp",
GSIZE_TO_POINTER (SOUP_TYPE_REQUEST_FTP));
}
[
Date Prev][
Date Next] [
Thread Prev][
Thread Next]
[
Thread Index]
[
Date Index]
[
Author Index]