[libsoup] SoupHTTPRequest: O brave new world!
- From: Dan Winship <danw src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [libsoup] SoupHTTPRequest: O brave new world!
- Date: Wed, 18 Apr 2012 14:58:22 +0000 (UTC)
commit 9effb5ca942412ecde9242c745f2df6da80853a3
Author: Dan Winship <danw gnome org>
Date: Thu Jan 26 16:25:57 2012 -0500
SoupHTTPRequest: O brave new world!
Kill SoupHTTPInputStream, and have SoupHTTPRequest return the
message's body_istream directly (with a little help from SoupSession
and its subclasses). SoupHTTPRequest works synchronously now as well
(though it's still the case that async only works with
SoupSessionAsync and sync only works with SoupSessionSync).
https://bugzilla.gnome.org/show_bug.cgi?id=591739
libsoup/Makefile.am | 4 +-
libsoup/soup-body-input-stream.c | 28 ++
libsoup/soup-client-input-stream.c | 280 +++++++++++++
libsoup/soup-client-input-stream.h | 46 ++
libsoup/soup-http-input-stream.c | 793 ------------------------------------
libsoup/soup-http-input-stream.h | 79 ----
libsoup/soup-message-io.c | 310 +++++++++++---
libsoup/soup-message-private.h | 19 +
libsoup/soup-message-queue.c | 2 +
libsoup/soup-message-queue.h | 2 +
libsoup/soup-request-http.c | 71 +++-
libsoup/soup-session-async.c | 239 +++++++++++-
libsoup/soup-session-private.h | 14 +
libsoup/soup-session-sync.c | 117 ++++++-
libsoup/soup-session.c | 7 +-
po/POTFILES.in | 1 +
16 files changed, 1047 insertions(+), 965 deletions(-)
---
diff --git a/libsoup/Makefile.am b/libsoup/Makefile.am
index ad85b07..8fd8ee5 100644
--- a/libsoup/Makefile.am
+++ b/libsoup/Makefile.am
@@ -101,6 +101,8 @@ libsoup_2_4_la_SOURCES = \
soup-body-output-stream.c \
soup-cache.c \
soup-cache-private.h \
+ soup-client-input-stream.h \
+ soup-client-input-stream.c \
soup-connection.h \
soup-connection.c \
soup-content-decoder.c \
@@ -121,8 +123,6 @@ libsoup_2_4_la_SOURCES = \
soup-filter-input-stream.h \
soup-form.c \
soup-headers.c \
- soup-http-input-stream.h \
- soup-http-input-stream.c \
soup-logger.c \
soup-marshal.h \
soup-marshal.c \
diff --git a/libsoup/soup-body-input-stream.c b/libsoup/soup-body-input-stream.c
index 2c5d16e..a635bd5 100644
--- a/libsoup/soup-body-input-stream.c
+++ b/libsoup/soup-body-input-stream.c
@@ -18,6 +18,7 @@
#include "soup-body-input-stream.h"
#include "soup-enum-types.h"
#include "soup-filter-input-stream.h"
+#include "soup-marshal.h"
#include "soup-message-headers.h"
typedef enum {
@@ -38,6 +39,13 @@ struct _SoupBodyInputStreamPrivate {
};
enum {
+ CLOSED,
+ LAST_SIGNAL
+};
+
+static guint signals[LAST_SIGNAL] = { 0 };
+
+enum {
PROP_0,
PROP_ENCODING,
@@ -270,6 +278,16 @@ soup_body_input_stream_read_fn (GInputStream *stream,
}
static gboolean
+soup_body_input_stream_close_fn (GInputStream *stream,
+ GCancellable *cancellable,
+ GError **error)
+{
+ g_signal_emit (stream, signals[CLOSED], 0);
+
+ return G_INPUT_STREAM_CLASS (soup_body_input_stream_parent_class)->close_fn (stream, cancellable, error);
+}
+
+static gboolean
soup_body_input_stream_is_readable (GPollableInputStream *stream)
{
SoupBodyInputStream *bistream = SOUP_BODY_INPUT_STREAM (stream);
@@ -321,6 +339,16 @@ soup_body_input_stream_class_init (SoupBodyInputStreamClass *stream_class)
object_class->get_property = get_property;
input_stream_class->read_fn = soup_body_input_stream_read_fn;
+ input_stream_class->close_fn = soup_body_input_stream_close_fn;
+
+ signals[CLOSED] =
+ g_signal_new ("closed",
+ G_OBJECT_CLASS_TYPE (object_class),
+ G_SIGNAL_RUN_LAST,
+ 0,
+ NULL, NULL,
+ _soup_marshal_NONE__NONE,
+ G_TYPE_NONE, 0);
g_object_class_install_property (
object_class, PROP_ENCODING,
diff --git a/libsoup/soup-client-input-stream.c b/libsoup/soup-client-input-stream.c
new file mode 100644
index 0000000..8d1a2ea
--- /dev/null
+++ b/libsoup/soup-client-input-stream.c
@@ -0,0 +1,280 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * soup-client-input-stream.c
+ *
+ * Copyright 2010-2012 Red Hat, Inc.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include <config.h>
+#endif
+
+#include <gio/gio.h>
+
+#include "soup-client-input-stream.h"
+#include "soup-marshal.h"
+#include "soup-message.h"
+#include "soup-message-private.h"
+
+struct _SoupClientInputStreamPrivate {
+ SoupMessage *msg;
+};
+
+enum {
+ EOF,
+ LAST_SIGNAL
+};
+
+static guint signals[LAST_SIGNAL] = { 0 };
+
+enum {
+ PROP_0,
+
+ PROP_MESSAGE
+};
+
+static GPollableInputStreamInterface *soup_client_input_stream_parent_pollable_interface;
+static void soup_client_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface, gpointer interface_data);
+
+G_DEFINE_TYPE_WITH_CODE (SoupClientInputStream, soup_client_input_stream, SOUP_TYPE_FILTER_INPUT_STREAM,
+ G_IMPLEMENT_INTERFACE (G_TYPE_POLLABLE_INPUT_STREAM,
+ soup_client_input_stream_pollable_init))
+
+static void
+soup_client_input_stream_init (SoupClientInputStream *stream)
+{
+ stream->priv = G_TYPE_INSTANCE_GET_PRIVATE (stream,
+ SOUP_TYPE_CLIENT_INPUT_STREAM,
+ SoupClientInputStreamPrivate);
+}
+
+static void
+set_property (GObject *object, guint prop_id,
+ const GValue *value, GParamSpec *pspec)
+{
+ SoupClientInputStream *cistream = SOUP_CLIENT_INPUT_STREAM (object);
+
+ switch (prop_id) {
+ case PROP_MESSAGE:
+ cistream->priv->msg = g_value_dup_object (value);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static void
+get_property (GObject *object, guint prop_id,
+ GValue *value, GParamSpec *pspec)
+{
+ SoupClientInputStream *cistream = SOUP_CLIENT_INPUT_STREAM (object);
+
+ switch (prop_id) {
+ case PROP_MESSAGE:
+ g_value_set_object (value, cistream->priv->msg);
+ break;
+ default:
+ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
+ break;
+ }
+}
+
+static gssize
+soup_client_input_stream_read_fn (GInputStream *stream,
+ void *buffer,
+ gsize count,
+ GCancellable *cancellable,
+ GError **error)
+{
+ gssize nread;
+
+ nread = G_INPUT_STREAM_CLASS (soup_client_input_stream_parent_class)->
+ read_fn (stream, buffer, count, cancellable, error);
+
+ if (nread == 0)
+ g_signal_emit (stream, signals[EOF], 0);
+
+ return nread;
+}
+
+static gssize
+soup_client_input_stream_read_nonblocking (GPollableInputStream *stream,
+ void *buffer,
+ gsize count,
+ GError **error)
+{
+ gssize nread;
+
+ nread = soup_client_input_stream_parent_pollable_interface->
+ read_nonblocking (stream, buffer, count, error);
+
+ if (nread == 0)
+ g_signal_emit (stream, signals[EOF], 0);
+
+ return nread;
+}
+
+static gboolean
+soup_client_input_stream_close_fn (GInputStream *stream,
+ GCancellable *cancellable,
+ GError **error)
+{
+ SoupClientInputStream *cistream = SOUP_CLIENT_INPUT_STREAM (stream);
+
+ if (!soup_message_io_run_until_finish (cistream->priv->msg,
+ cancellable, error))
+ return FALSE;
+
+ return G_INPUT_STREAM_CLASS (soup_client_input_stream_parent_class)->close_fn (stream, cancellable, error);
+}
+
+typedef struct {
+ SoupClientInputStream *cistream;
+ gint priority;
+ GCancellable *cancellable;
+ GSimpleAsyncResult *result;
+} CloseAsyncData;
+
+static void
+close_async_data_free (CloseAsyncData *cad)
+{
+ if (cad->cancellable)
+ g_object_unref (cad->cancellable);
+ g_object_unref (cad->result);
+ g_slice_free (CloseAsyncData, cad);
+}
+
+static void
+base_stream_closed (GObject *source, GAsyncResult *result, gpointer user_data)
+{
+ CloseAsyncData *cad = user_data;
+ GError *error = NULL;
+
+ if (G_INPUT_STREAM_CLASS (soup_client_input_stream_parent_class)->
+ close_finish (G_INPUT_STREAM (cad->cistream), result, &error))
+ g_simple_async_result_set_op_res_gboolean (cad->result, TRUE);
+ else
+ g_simple_async_result_take_error (cad->result, error);
+
+ g_simple_async_result_complete_in_idle (cad->result);
+ close_async_data_free (cad);
+}
+
+static gboolean
+close_async_ready (SoupMessage *msg, gpointer user_data)
+{
+ CloseAsyncData *cad = user_data;
+ GError *error = NULL;
+
+ if (soup_message_io_run_until_finish (cad->cistream->priv->msg,
+ cad->cancellable, &error)) {
+ G_INPUT_STREAM_CLASS (soup_client_input_stream_parent_class)->
+ close_async (G_INPUT_STREAM (cad->cistream),
+ cad->priority,
+ cad->cancellable,
+ base_stream_closed,
+ cad);
+ return FALSE;
+ } else if (!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+ g_simple_async_result_take_error (cad->result, error);
+ g_simple_async_result_complete_in_idle (cad->result);
+ close_async_data_free (cad);
+ return FALSE;
+ }
+
+ return TRUE;
+}
+
+static void
+soup_client_input_stream_close_async (GInputStream *stream,
+ gint priority,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ CloseAsyncData *cad;
+ GSource *source;
+
+ cad = g_slice_new (CloseAsyncData);
+ cad->cistream = SOUP_CLIENT_INPUT_STREAM (stream);
+ cad->result = g_simple_async_result_new (G_OBJECT (stream),
+ callback, user_data,
+ soup_client_input_stream_close_async);
+ cad->priority = priority;
+ cad->cancellable = cancellable ? g_object_ref (cancellable) : NULL;
+
+ source = soup_message_io_get_source (cad->cistream->priv->msg,
+ cancellable,
+ close_async_ready, cad);
+ g_source_set_priority (source, priority);
+ g_source_attach (source, g_main_context_get_thread_default ());
+ g_source_unref (source);
+}
+
+static gboolean
+soup_client_input_stream_close_finish (GInputStream *stream,
+ GAsyncResult *result,
+ GError **error)
+{
+ GSimpleAsyncResult *simple = G_SIMPLE_ASYNC_RESULT (result);
+
+ if (g_simple_async_result_propagate_error (simple, error))
+ return FALSE;
+ else
+ return g_simple_async_result_get_op_res_gboolean (simple);
+}
+
+static void
+soup_client_input_stream_class_init (SoupClientInputStreamClass *stream_class)
+{
+ GObjectClass *object_class = G_OBJECT_CLASS (stream_class);
+ GInputStreamClass *input_stream_class = G_INPUT_STREAM_CLASS (stream_class);
+
+ g_type_class_add_private (stream_class, sizeof (SoupClientInputStreamPrivate));
+
+ object_class->set_property = set_property;
+ object_class->get_property = get_property;
+
+ input_stream_class->read_fn = soup_client_input_stream_read_fn;
+ input_stream_class->close_fn = soup_client_input_stream_close_fn;
+ input_stream_class->close_async = soup_client_input_stream_close_async;
+ input_stream_class->close_finish = soup_client_input_stream_close_finish;
+
+ signals[EOF] =
+ g_signal_new ("eof",
+ G_OBJECT_CLASS_TYPE (object_class),
+ G_SIGNAL_RUN_LAST,
+ 0,
+ NULL, NULL,
+ _soup_marshal_NONE__NONE,
+ G_TYPE_NONE, 0);
+
+ g_object_class_install_property (
+ object_class, PROP_MESSAGE,
+ g_param_spec_object ("message",
+ "Message",
+ "Message",
+ SOUP_TYPE_MESSAGE,
+ G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY));
+}
+
+static void
+soup_client_input_stream_pollable_init (GPollableInputStreamInterface *pollable_interface,
+ gpointer interface_data)
+{
+ soup_client_input_stream_parent_pollable_interface =
+ g_type_interface_peek_parent (pollable_interface);
+
+ pollable_interface->read_nonblocking = soup_client_input_stream_read_nonblocking;
+}
+
+GInputStream *
+soup_client_input_stream_new (GInputStream *base_stream,
+ SoupMessage *msg)
+{
+ return g_object_new (SOUP_TYPE_CLIENT_INPUT_STREAM,
+ "base-stream", base_stream,
+ "message", msg,
+ NULL);
+}
diff --git a/libsoup/soup-client-input-stream.h b/libsoup/soup-client-input-stream.h
new file mode 100644
index 0000000..098c607
--- /dev/null
+++ b/libsoup/soup-client-input-stream.h
@@ -0,0 +1,46 @@
+/* -*- Mode: C; tab-width: 8; indent-tabs-mode: t; c-basic-offset: 8 -*- */
+/*
+ * Copyright 2010-2012 Red Hat, Inc.
+ */
+
+#ifndef SOUP_CLIENT_INPUT_STREAM_H
+#define SOUP_CLIENT_INPUT_STREAM_H 1
+
+#include "soup-types.h"
+#include "soup-filter-input-stream.h"
+
+G_BEGIN_DECLS
+
+#define SOUP_TYPE_CLIENT_INPUT_STREAM (soup_client_input_stream_get_type ())
+#define SOUP_CLIENT_INPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), SOUP_TYPE_CLIENT_INPUT_STREAM, SoupClientInputStream))
+#define SOUP_CLIENT_INPUT_STREAM_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), SOUP_TYPE_CLIENT_INPUT_STREAM, SoupClientInputStreamClass))
+#define SOUP_IS_CLIENT_INPUT_STREAM(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), SOUP_TYPE_CLIENT_INPUT_STREAM))
+#define SOUP_IS_CLIENT_INPUT_STREAM_CLASS(klass) (G_TYPE_CHECK_CLASS_TYPE ((obj), SOUP_TYPE_CLIENT_INPUT_STREAM))
+#define SOUP_CLIENT_INPUT_STREAM_GET_CLASS(obj) (G_TYPE_INSTANCE_GET_CLASS ((obj), SOUP_TYPE_CLIENT_INPUT_STREAM, SoupClientInputStreamClass))
+
+typedef struct _SoupClientInputStreamPrivate SoupClientInputStreamPrivate;
+
+typedef struct {
+ SoupFilterInputStream parent;
+
+ SoupClientInputStreamPrivate *priv;
+} SoupClientInputStream;
+
+typedef struct {
+ SoupFilterInputStreamClass parent_class;
+
+ /* Padding for future expansion */
+ void (*_libsoup_reserved1) (void);
+ void (*_libsoup_reserved2) (void);
+ void (*_libsoup_reserved3) (void);
+ void (*_libsoup_reserved4) (void);
+} SoupClientInputStreamClass;
+
+GType soup_client_input_stream_get_type (void);
+
+GInputStream *soup_client_input_stream_new (GInputStream *base_stream,
+ SoupMessage *msg);
+
+G_END_DECLS
+
+#endif /* SOUP_CLIENT_INPUT_STREAM_H */
diff --git a/libsoup/soup-message-io.c b/libsoup/soup-message-io.c
index e71a8ad..4e5c35f 100644
--- a/libsoup/soup-message-io.c
+++ b/libsoup/soup-message-io.c
@@ -12,8 +12,11 @@
#include <stdlib.h>
#include <string.h>
+#include <glib/gi18n-lib.h>
+
#include "soup-body-input-stream.h"
#include "soup-body-output-stream.h"
+#include "soup-client-input-stream.h"
#include "soup-connection.h"
#include "soup-content-sniffer-stream.h"
#include "soup-converter-wrapper.h"
@@ -31,6 +34,7 @@ typedef enum {
typedef enum {
SOUP_MESSAGE_IO_STATE_NOT_STARTED,
+ SOUP_MESSAGE_IO_STATE_ANY = SOUP_MESSAGE_IO_STATE_NOT_STARTED,
SOUP_MESSAGE_IO_STATE_HEADERS,
SOUP_MESSAGE_IO_STATE_BLOCKING,
SOUP_MESSAGE_IO_STATE_BODY_START,
@@ -45,6 +49,9 @@ typedef enum {
(state != SOUP_MESSAGE_IO_STATE_NOT_STARTED && \
state != SOUP_MESSAGE_IO_STATE_BLOCKING && \
state != SOUP_MESSAGE_IO_STATE_DONE)
+#define SOUP_MESSAGE_IO_STATE_POLLABLE(state) \
+ (SOUP_MESSAGE_IO_STATE_ACTIVE (state) && \
+ state != SOUP_MESSAGE_IO_STATE_BODY_DONE)
typedef struct {
SoupMessageQueueItem *item;
@@ -136,11 +143,13 @@ soup_message_io_stop (SoupMessage *msg)
if (io->io_source) {
g_source_destroy (io->io_source);
+ g_source_unref (io->io_source);
io->io_source = NULL;
}
if (io->unpause_source) {
g_source_destroy (io->unpause_source);
+ g_source_unref (io->unpause_source);
io->unpause_source = NULL;
}
@@ -215,8 +224,12 @@ read_headers (SoupMessage *msg, GCancellable *cancellable, GError **error)
&got_lf,
cancellable, error);
io->read_header_buf->len = old_len + MAX (nread, 0);
- if (nread == 0)
- io_error (io->sock, msg, NULL);
+ if (nread == 0) {
+ soup_message_set_status (msg, SOUP_STATUS_MALFORMED);
+ g_set_error_literal (error, G_IO_ERROR,
+ G_IO_ERROR_PARTIAL_INPUT,
+ _("Connection terminated unexpectedly"));
+ }
if (nread <= 0)
return FALSE;
@@ -257,9 +270,10 @@ setup_body_istream (SoupMessage *msg)
GInputStream *filter;
GSList *d;
- io->body_istream = soup_body_input_stream_new (io->istream,
- io->read_encoding,
- io->read_length);
+ io->body_istream =
+ soup_body_input_stream_new (io->istream,
+ io->read_encoding,
+ io->read_length);
for (d = priv->decoders; d; d = d->next) {
decoder = d->data;
@@ -321,11 +335,6 @@ io_write (SoupMessage *msg, GCancellable *cancellable, GError **error)
gssize nwrote;
switch (io->write_state) {
- case SOUP_MESSAGE_IO_STATE_NOT_STARTED:
- case SOUP_MESSAGE_IO_STATE_BLOCKING:
- return FALSE;
-
-
case SOUP_MESSAGE_IO_STATE_HEADERS:
if (!io->write_buf->len) {
io->get_headers_cb (msg, io->write_buf,
@@ -416,6 +425,7 @@ io_write (SoupMessage *msg, GCancellable *cancellable, GError **error)
if (!io->write_chunk) {
io->write_chunk = soup_message_body_get_chunk (io->write_body, io->write_body_offset);
if (!io->write_chunk) {
+ g_return_val_if_fail (!io->item || !io->item->new_api, FALSE);
soup_message_io_pause (msg);
return FALSE;
}
@@ -486,7 +496,6 @@ io_write (SoupMessage *msg, GCancellable *cancellable, GError **error)
break;
- case SOUP_MESSAGE_IO_STATE_DONE:
default:
g_return_val_if_reached (FALSE);
}
@@ -511,11 +520,6 @@ io_read (SoupMessage *msg, GCancellable *cancellable, GError **error)
guint status;
switch (io->read_state) {
- case SOUP_MESSAGE_IO_STATE_NOT_STARTED:
- case SOUP_MESSAGE_IO_STATE_BLOCKING:
- return FALSE;
-
-
case SOUP_MESSAGE_IO_STATE_HEADERS:
if (!read_headers (msg, cancellable, error))
return FALSE;
@@ -628,6 +632,7 @@ io_read (SoupMessage *msg, GCancellable *cancellable, GError **error)
if (priv->chunk_allocator) {
buffer = priv->chunk_allocator (msg, io->read_length, priv->chunk_allocator_data);
if (!buffer) {
+ g_return_val_if_fail (!io->item || !io->item->new_api, FALSE);
soup_message_io_pause (msg);
return FALSE;
}
@@ -675,7 +680,6 @@ io_read (SoupMessage *msg, GCancellable *cancellable, GError **error)
break;
- case SOUP_MESSAGE_IO_STATE_DONE:
default:
g_return_val_if_reached (FALSE);
}
@@ -683,43 +687,160 @@ io_read (SoupMessage *msg, GCancellable *cancellable, GError **error)
return TRUE;
}
-static GSource *
+typedef struct {
+ GSource source;
+ SoupMessage *msg;
+} SoupMessageSource;
+
+static gboolean
+message_source_prepare (GSource *source,
+ gint *timeout)
+{
+ *timeout = -1;
+ return FALSE;
+}
+
+static gboolean
+message_source_check (GSource *source)
+{
+ return FALSE;
+}
+
+static gboolean
+message_source_dispatch (GSource *source,
+ GSourceFunc callback,
+ gpointer user_data)
+{
+ SoupMessageSourceFunc func = (SoupMessageSourceFunc)callback;
+ SoupMessageSource *message_source = (SoupMessageSource *)source;
+
+ return (*func) (message_source->msg, user_data);
+}
+
+static void
+message_source_finalize (GSource *source)
+{
+ SoupMessageSource *message_source = (SoupMessageSource *)source;
+
+ g_object_unref (message_source->msg);
+}
+
+static gboolean
+message_source_closure_callback (SoupMessage *msg,
+ gpointer data)
+{
+ GClosure *closure = data;
+
+ GValue param = G_VALUE_INIT;
+ GValue result_value = G_VALUE_INIT;
+ gboolean result;
+
+ g_value_init (&result_value, G_TYPE_BOOLEAN);
+
+ g_value_init (&param, SOUP_TYPE_MESSAGE);
+ g_value_set_object (&param, msg);
+
+ g_closure_invoke (closure, &result_value, 1, &param, NULL);
+
+ result = g_value_get_boolean (&result_value);
+ g_value_unset (&result_value);
+ g_value_unset (&param);
+
+ return result;
+}
+
+static GSourceFuncs message_source_funcs =
+{
+ message_source_prepare,
+ message_source_check,
+ message_source_dispatch,
+ message_source_finalize,
+ (GSourceFunc)message_source_closure_callback,
+ (GSourceDummyMarshal)g_cclosure_marshal_generic,
+};
+
+GSource *
soup_message_io_get_source (SoupMessage *msg, GCancellable *cancellable,
- GSourceFunc callback, gpointer user_data)
+ SoupMessageSourceFunc callback, gpointer user_data)
{
SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
SoupMessageIOData *io = priv->io_data;
- GSource *source;
-
- if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state)) {
- source = g_pollable_input_stream_create_source (
- G_POLLABLE_INPUT_STREAM (io->istream), cancellable);
- } else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state)) {
- source = g_pollable_output_stream_create_source (
- G_POLLABLE_OUTPUT_STREAM (io->ostream), cancellable);
+ GSource *base_source, *source;
+ SoupMessageSource *message_source;
+
+ if (io && SOUP_MESSAGE_IO_STATE_POLLABLE (io->read_state)) {
+ GPollableInputStream *istream;
+
+ if (io->body_istream)
+ istream = G_POLLABLE_INPUT_STREAM (io->body_istream);
+ else
+ istream = G_POLLABLE_INPUT_STREAM (io->istream);
+ base_source = g_pollable_input_stream_create_source (istream, cancellable);
+ } else if (io && SOUP_MESSAGE_IO_STATE_POLLABLE (io->write_state)) {
+ GPollableOutputStream *ostream;
+
+ if (io->body_ostream)
+ ostream = G_POLLABLE_OUTPUT_STREAM (io->body_ostream);
+ else
+ ostream = G_POLLABLE_OUTPUT_STREAM (io->ostream);
+ base_source = g_pollable_output_stream_create_source (ostream, cancellable);
} else
- g_return_val_if_reached (NULL);
-
- g_source_set_callback (source, callback, user_data, NULL);
+ base_source = g_timeout_source_new (0);
+
+ g_source_set_dummy_callback (base_source);
+ source = g_source_new (&message_source_funcs,
+ sizeof (SoupMessageSource));
+ g_source_set_name (source, "SoupMessageSource");
+ message_source = (SoupMessageSource *)source;
+ message_source->msg = g_object_ref (msg);
+
+ g_source_add_child_source (source, base_source);
+ g_source_unref (base_source);
+ g_source_set_callback (source, (GSourceFunc) callback, user_data, NULL);
return source;
}
-static gboolean io_run (GObject *stream, SoupMessage *msg);
-
-static void
-setup_io_source (SoupMessage *msg)
+static gboolean
+io_run_until (SoupMessage *msg,
+ SoupMessageIOState read_state, SoupMessageIOState write_state,
+ GCancellable *cancellable, GError **error)
{
SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
SoupMessageIOData *io = priv->io_data;
+ gboolean progress = TRUE, done;
+
+ if (g_cancellable_set_error_if_cancelled (cancellable, error))
+ return FALSE;
+ else if (!io) {
+ g_set_error_literal (error, G_IO_ERROR,
+ G_IO_ERROR_CANCELLED,
+ _("Operation was cancelled"));
+ return FALSE;
+ }
+
+ g_object_ref (msg);
- io->io_source = soup_message_io_get_source (msg, NULL,
- (GSourceFunc)io_run, msg);
- g_source_attach (io->io_source, io->async_context);
- g_source_unref (io->io_source);
+ while (progress && priv->io_data == io && !io->paused &&
+ (io->read_state < read_state || io->write_state < write_state)) {
+
+ if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state))
+ progress = io_read (msg, cancellable, error);
+ else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state))
+ progress = io_write (msg, cancellable, error);
+ else
+ progress = FALSE;
+ }
+
+ done = (priv->io_data == io &&
+ io->read_state >= read_state &&
+ io->write_state >= write_state);
+
+ g_object_unref (msg);
+ return done;
}
static gboolean
-io_run (GObject *stream, SoupMessage *msg)
+io_run (SoupMessage *msg, gpointer user_data)
{
SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
SoupMessageIOData *io = priv->io_data;
@@ -727,36 +848,99 @@ io_run (GObject *stream, SoupMessage *msg)
if (io->io_source) {
g_source_destroy (io->io_source);
+ g_source_unref (io->io_source);
io->io_source = NULL;
}
g_object_ref (msg);
- while (priv->io_data == io && !io->paused) {
- gboolean progress = FALSE;
+ if (io_run_until (msg,
+ SOUP_MESSAGE_IO_STATE_DONE,
+ SOUP_MESSAGE_IO_STATE_DONE,
+ io->cancellable, &error)) {
+ soup_message_io_finished (msg);
+ } else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+ g_clear_error (&error);
+ io->io_source = soup_message_io_get_source (msg, NULL, io_run, msg);
+ g_source_attach (io->io_source, io->async_context);
+ } else if (error) {
+ io_error (io->sock, msg, error);
+ }
- if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->read_state))
- progress = io_read (msg, io->cancellable, &error);
- else if (SOUP_MESSAGE_IO_STATE_ACTIVE (io->write_state))
- progress = io_write (msg, io->cancellable, &error);
+ g_object_unref (msg);
+ return FALSE;
+}
- if (!progress)
- break;
- }
+gboolean
+soup_message_io_run_until_write (SoupMessage *msg,
+ GCancellable *cancellable, GError **error)
+{
+ return io_run_until (msg,
+ SOUP_MESSAGE_IO_STATE_ANY,
+ SOUP_MESSAGE_IO_STATE_BODY,
+ cancellable, error);
+}
- if (error) {
- if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
- g_clear_error (&error);
- setup_io_source (msg);
- } else
- io_error (io->sock, msg, error);
- } else if (priv->io_data == io &&
- io->read_state == SOUP_MESSAGE_IO_STATE_DONE &&
- io->write_state == SOUP_MESSAGE_IO_STATE_DONE)
- soup_message_io_finished (msg);
+gboolean
+soup_message_io_run_until_read (SoupMessage *msg,
+ GCancellable *cancellable, GError **error)
+{
+ return io_run_until (msg,
+ SOUP_MESSAGE_IO_STATE_BODY,
+ SOUP_MESSAGE_IO_STATE_ANY,
+ cancellable, error);
+}
+gboolean
+soup_message_io_run_until_finish (SoupMessage *msg,
+ GCancellable *cancellable,
+ GError **error)
+{
+ g_object_ref (msg);
+
+ if (!io_run_until (msg,
+ SOUP_MESSAGE_IO_STATE_DONE,
+ SOUP_MESSAGE_IO_STATE_DONE,
+ cancellable, error))
+ return FALSE;
+
+ soup_message_io_finished (msg);
g_object_unref (msg);
- return FALSE;
+ return TRUE;
+}
+
+static void
+client_stream_eof (SoupClientInputStream *stream, gpointer user_data)
+{
+ SoupMessage *msg = user_data;
+ SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
+ SoupMessageIOData *io = priv->io_data;
+
+ if (io && io->read_state == SOUP_MESSAGE_IO_STATE_BODY)
+ io->read_state = SOUP_MESSAGE_IO_STATE_BODY_DONE;
+}
+
+GInputStream *
+soup_message_io_get_response_istream (SoupMessage *msg,
+ GError **error)
+{
+ SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
+ SoupMessageIOData *io = priv->io_data;
+ GInputStream *client_stream;
+
+ g_return_val_if_fail (io->mode == SOUP_MESSAGE_IO_CLIENT, NULL);
+
+ if (SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code)) {
+ g_set_error_literal (error, SOUP_HTTP_ERROR,
+ msg->status_code, msg->reason_phrase);
+ return NULL;
+ }
+
+ client_stream = soup_client_input_stream_new (io->body_istream, msg);
+ g_signal_connect (client_stream, "eof",
+ G_CALLBACK (client_stream_eof), msg);
+
+ return client_stream;
}
@@ -839,7 +1023,8 @@ soup_message_io_client (SoupMessageQueueItem *item,
io->write_body = item->msg->request_body;
io->write_state = SOUP_MESSAGE_IO_STATE_HEADERS;
- io_run (NULL, item->msg);
+ if (!item->new_api)
+ io_run (item->msg, NULL);
}
void
@@ -860,7 +1045,7 @@ soup_message_io_server (SoupMessage *msg, SoupSocket *sock,
io->write_body = msg->response_body;
io->read_state = SOUP_MESSAGE_IO_STATE_HEADERS;
- io_run (NULL, msg);
+ io_run (msg, NULL);
}
void
@@ -873,6 +1058,7 @@ soup_message_io_pause (SoupMessage *msg)
if (io->io_source) {
g_source_destroy (io->io_source);
+ g_source_unref (io->io_source);
io->io_source = NULL;
}
@@ -897,7 +1083,7 @@ io_unpause_internal (gpointer msg)
if (io->io_source)
return FALSE;
- io_run (NULL, msg);
+ io_run (msg, NULL);
return FALSE;
}
diff --git a/libsoup/soup-message-private.h b/libsoup/soup-message-private.h
index 5625354..0d34833 100644
--- a/libsoup/soup-message-private.h
+++ b/libsoup/soup-message-private.h
@@ -94,6 +94,25 @@ void soup_message_io_pause (SoupMessage *msg);
void soup_message_io_unpause (SoupMessage *msg);
gboolean soup_message_io_in_progress (SoupMessage *msg);
+gboolean soup_message_io_run_until_write (SoupMessage *msg,
+ GCancellable *cancellable,
+ GError **error);
+gboolean soup_message_io_run_until_read (SoupMessage *msg,
+ GCancellable *cancellable,
+ GError **error);
+gboolean soup_message_io_run_until_finish (SoupMessage *msg,
+ GCancellable *cancellable,
+ GError **error);
+
+typedef gboolean (*SoupMessageSourceFunc) (SoupMessage *, gpointer);
+GSource *soup_message_io_get_source (SoupMessage *msg,
+ GCancellable *cancellable,
+ SoupMessageSourceFunc callback,
+ gpointer user_data);
+
+GInputStream *soup_message_io_get_response_istream (SoupMessage *msg,
+ GError **error);
+
gboolean soup_message_disables_feature (SoupMessage *msg,
gpointer feature);
diff --git a/libsoup/soup-message-queue.c b/libsoup/soup-message-queue.c
index 7b1e5dd..9715080 100644
--- a/libsoup/soup-message-queue.c
+++ b/libsoup/soup-message-queue.c
@@ -184,6 +184,8 @@ soup_message_queue_item_unref (SoupMessageQueueItem *item)
g_object_unref (item->proxy_addr);
if (item->proxy_uri)
soup_uri_free (item->proxy_uri);
+ if (item->result)
+ g_object_unref (item->result);
soup_message_queue_item_set_connection (item, NULL);
g_slice_free (SoupMessageQueueItem, item);
}
diff --git a/libsoup/soup-message-queue.h b/libsoup/soup-message-queue.h
index a1ae663..a9242a1 100644
--- a/libsoup/soup-message-queue.h
+++ b/libsoup/soup-message-queue.h
@@ -45,8 +45,10 @@ struct _SoupMessageQueueItem {
SoupAddress *proxy_addr;
SoupURI *proxy_uri;
SoupConnection *conn;
+ GSimpleAsyncResult *result;
guint paused : 1;
+ guint new_api : 1;
guint redirection_count : 31;
SoupMessageQueueItemState state;
diff --git a/libsoup/soup-request-http.c b/libsoup/soup-request-http.c
index 89547e1..264bda0 100644
--- a/libsoup/soup-request-http.c
+++ b/libsoup/soup-request-http.c
@@ -32,9 +32,9 @@
#include "soup-request-http.h"
#include "soup-cache.h"
#include "soup-cache-private.h"
-#include "soup-http-input-stream.h"
#include "soup-message.h"
#include "soup-session.h"
+#include "soup-session-private.h"
#include "soup-uri.h"
G_DEFINE_TYPE (SoupRequestHTTP, soup_request_http, SOUP_TYPE_REQUEST)
@@ -44,6 +44,11 @@ struct _SoupRequestHTTPPrivate {
char *content_type;
};
+static void content_sniffed (SoupMessage *msg,
+ const char *content_type,
+ GHashTable *params,
+ gpointer user_data);
+
static void
soup_request_http_init (SoupRequestHTTP *http)
{
@@ -61,6 +66,8 @@ soup_request_http_check_uri (SoupRequest *request,
return FALSE;
http->priv->msg = soup_message_new_from_uri (SOUP_METHOD_GET, uri);
+ g_signal_connect (http->priv->msg, "content-sniffed",
+ G_CALLBACK (content_sniffed), http);
return TRUE;
}
@@ -69,8 +76,12 @@ soup_request_http_finalize (GObject *object)
{
SoupRequestHTTP *http = SOUP_REQUEST_HTTP (object);
- if (http->priv->msg)
+ if (http->priv->msg) {
+ g_signal_handlers_disconnect_by_func (http->priv->msg,
+ G_CALLBACK (content_sniffed),
+ http);
g_object_unref (http->priv->msg);
+ }
g_free (http->priv->content_type);
@@ -82,17 +93,11 @@ soup_request_http_send (SoupRequest *request,
GCancellable *cancellable,
GError **error)
{
- GInputStream *httpstream;
SoupRequestHTTP *http = SOUP_REQUEST_HTTP (request);
- httpstream = soup_http_input_stream_new (soup_request_get_session (request), http->priv->msg);
- if (!soup_http_input_stream_send (SOUP_HTTP_INPUT_STREAM (httpstream),
- cancellable, error)) {
- g_object_unref (httpstream);
- return NULL;
- }
- http->priv->content_type = g_strdup (soup_http_input_stream_get_content_type (SOUP_HTTP_INPUT_STREAM (httpstream)));
- return httpstream;
+ return soup_session_send_request (soup_request_get_session (request),
+ http->priv->msg,
+ cancellable, error);
}
@@ -124,16 +129,15 @@ free_send_async_data (SendAsyncData *sadata)
static void
http_input_stream_ready_cb (GObject *source, GAsyncResult *result, gpointer user_data)
{
- SoupHTTPInputStream *httpstream = SOUP_HTTP_INPUT_STREAM (source);
SendAsyncData *sadata = user_data;
GError *error = NULL;
+ GInputStream *stream;
- if (soup_http_input_stream_send_finish (httpstream, result, &error)) {
- sadata->http->priv->content_type = g_strdup (soup_http_input_stream_get_content_type (httpstream));
- g_simple_async_result_set_op_res_gpointer (sadata->simple, httpstream, g_object_unref);
+ stream = soup_session_send_request_finish (SOUP_SESSION (source), result, &error);
+ if (stream) {
+ g_simple_async_result_set_op_res_gpointer (sadata->simple, stream, g_object_unref);
} else {
g_simple_async_result_take_error (sadata->simple, error);
- g_object_unref (httpstream);
}
g_simple_async_result_complete (sadata->simple);
free_send_async_data (sadata);
@@ -171,9 +175,8 @@ conditional_get_ready_cb (SoupSession *session, SoupMessage *msg, gpointer user_
/* The resource was modified, or else it mysteriously disappeared
* from our cache. Either way we need to reload it now.
*/
- stream = soup_http_input_stream_new (session, sadata->original);
- soup_http_input_stream_send_async (SOUP_HTTP_INPUT_STREAM (stream), G_PRIORITY_DEFAULT,
- sadata->cancellable, http_input_stream_ready_cb, sadata);
+ soup_session_send_request_async (session, msg, sadata->cancellable,
+ http_input_stream_ready_cb, sadata);
}
static gboolean
@@ -253,10 +256,8 @@ soup_request_http_send_async (SoupRequest *request,
}
}
- stream = soup_http_input_stream_new (session, http->priv->msg);
- soup_http_input_stream_send_async (SOUP_HTTP_INPUT_STREAM (stream),
- G_PRIORITY_DEFAULT, cancellable,
- http_input_stream_ready_cb, sadata);
+ soup_session_send_request_async (session, http->priv->msg, cancellable,
+ http_input_stream_ready_cb, sadata);
}
static GInputStream *
@@ -282,6 +283,30 @@ soup_request_http_get_content_length (SoupRequest *request)
return soup_message_headers_get_content_length (http->priv->msg->response_headers);
}
+/*
+ * Records a sniffed Content-Type on the SoupRequestHTTP passed as
+ * @user_data. The stored string is @content_type followed by one
+ * "; "-separated entry per element of @params (when @params is non-NULL),
+ * each formatted by soup_header_g_string_append_param(). Any previously
+ * stored content type is freed first.
+ *
+ * NOTE(review): the signature matches a "content-sniffed" style handler;
+ * where it is connected is not visible in this hunk -- confirm.
+ */
+static void
+content_sniffed (SoupMessage *msg,
+ const char *content_type,
+ GHashTable *params,
+ gpointer user_data)
+{
+ SoupRequestHTTP *http = user_data;
+ GString *sniffed_type;
+
+ sniffed_type = g_string_new (content_type);
+ if (params) {
+ GHashTableIter iter;
+ gpointer key, value;
+
+ g_hash_table_iter_init (&iter, params);
+ while (g_hash_table_iter_next (&iter, &key, &value)) {
+ g_string_append (sniffed_type, "; ");
+ soup_header_g_string_append_param (sniffed_type, key, value);
+ }
+ }
+ g_free (http->priv->content_type);
+ /* FALSE: keep the character data; ownership moves to http->priv */
+ http->priv->content_type = g_string_free (sniffed_type, FALSE);
+}
+
static const char *
soup_request_http_get_content_type (SoupRequest *request)
{
diff --git a/libsoup/soup-session-async.c b/libsoup/soup-session-async.c
index a58439e..90526dc 100644
--- a/libsoup/soup-session-async.c
+++ b/libsoup/soup-session-async.c
@@ -34,6 +34,10 @@
static void run_queue (SoupSessionAsync *sa);
static void do_idle_run_queue (SoupSession *session);
+static void send_request_running (SoupSession *session, SoupMessageQueueItem *item);
+static void send_request_restarted (SoupSession *session, SoupMessageQueueItem *item);
+static void send_request_finished (SoupSession *session, SoupMessageQueueItem *item);
+
static void queue_message (SoupSession *session, SoupMessage *req,
SoupSessionCallback callback, gpointer user_data);
static guint send_message (SoupSession *session, SoupMessage *req);
@@ -225,9 +229,10 @@ message_completed (SoupMessage *msg, gpointer user_data)
{
SoupMessageQueueItem *item = user_data;
+ do_idle_run_queue (item->session);
+
if (item->state != SOUP_MESSAGE_RESTARTING)
item->state = SOUP_MESSAGE_FINISHING;
- do_idle_run_queue (item->session);
}
static void
@@ -403,11 +408,15 @@ process_queue_item (SoupMessageQueueItem *item,
case SOUP_MESSAGE_READY:
item->state = SOUP_MESSAGE_RUNNING;
soup_session_send_queue_item (session, item, message_completed);
+ if (item->new_api)
+ send_request_running (session, item);
break;
case SOUP_MESSAGE_RESTARTING:
item->state = SOUP_MESSAGE_STARTING;
soup_message_restarted (item->msg);
+ if (item->new_api)
+ send_request_restarted (session, item);
break;
case SOUP_MESSAGE_FINISHING:
@@ -420,6 +429,8 @@ process_queue_item (SoupMessageQueueItem *item,
soup_session_unqueue_item (session, item);
if (item->callback)
item->callback (session, item->msg, item->callback_data);
+ else if (item->new_api)
+ send_request_finished (session, item);
g_object_unref (item->msg);
do_idle_run_queue (session);
g_object_unref (session);
@@ -611,3 +622,229 @@ kick (SoupSession *session)
{
do_idle_run_queue (session);
}
+
+
+/*
+ * Completes the GSimpleAsyncResult pending on @item and detaches it from
+ * the item (item->result is set to NULL before completion).
+ *
+ * Outcome, in priority order:
+ *  - @error non-NULL: the result takes ownership of @error;
+ *  - the message status is a transport error: @stream (if any) is
+ *    unreffed and a SOUP_HTTP_ERROR carrying the status code and reason
+ *    phrase is reported;
+ *  - otherwise @stream becomes the operation result, to be released
+ *    with g_object_unref().
+ */
+static void
+send_request_return_result (SoupMessageQueueItem *item,
+ gpointer stream, GError *error)
+{
+ GSimpleAsyncResult *simple;
+
+ simple = item->result;
+ item->result = NULL;
+
+ if (error)
+ g_simple_async_result_take_error (simple, error);
+ else if (SOUP_STATUS_IS_TRANSPORT_ERROR (item->msg->status_code)) {
+ if (stream)
+ g_object_unref (stream);
+ g_simple_async_result_set_error (simple,
+ SOUP_HTTP_ERROR,
+ item->msg->status_code,
+ "%s",
+ item->msg->reason_phrase);
+ } else
+ g_simple_async_result_set_op_res_gpointer (simple, stream, g_object_unref);
+
+ g_simple_async_result_complete (simple);
+ g_object_unref (simple);
+}
+
+/*
+ * Called from process_queue_item() for new-API items in the RESTARTING
+ * state. Clears the "SoupSessionAsync:ostream" data on the message, i.e.
+ * the memory stream that send_async_maybe_complete() attaches to buffer a
+ * body that might be requeued.
+ */
+static void
+send_request_restarted (SoupSession *session, SoupMessageQueueItem *item)
+{
+ /* We won't be needing this, then. */
+ g_object_set_data (G_OBJECT (item->msg), "SoupSessionAsync:ostream", NULL);
+}
+
+/*
+ * Called from process_queue_item() for new-API items without a callback,
+ * after the item has been unqueued. If the async result is still pending,
+ * completes it: when a buffered body ("SoupSessionAsync:ostream") exists
+ * and the status is not a transport error, the buffered bytes are handed
+ * back wrapped in a memory input stream; otherwise a NULL stream is
+ * passed to send_request_return_result().
+ */
+static void
+send_request_finished (SoupSession *session, SoupMessageQueueItem *item)
+{
+ GMemoryOutputStream *mostream;
+ GInputStream *istream = NULL;
+
+ if (!item->result) {
+ /* Something else already took care of it. */
+ return;
+ }
+
+ mostream = g_object_get_data (G_OBJECT (item->msg), "SoupSessionAsync:ostream");
+ if (mostream && !SOUP_STATUS_IS_TRANSPORT_ERROR (item->msg->status_code)) {
+ gpointer data;
+ gssize size;
+
+ /* We thought it would be requeued, but it wasn't, so
+ * return the original body.
+ */
+ size = g_memory_output_stream_get_data_size (mostream);
+ /* An empty body still gets a g_free()-able buffer */
+ data = size ? g_memory_output_stream_steal_data (mostream) : g_strdup ("");
+ istream = g_memory_input_stream_new_from_data (data, size, g_free);
+ }
+
+ send_request_return_result (item, istream, NULL);
+}
+
+/*
+ * GAsyncReadyCallback for the g_output_stream_splice_async() call started
+ * in send_async_maybe_complete(), which buffers the body of a response
+ * that looks likely to be requeued.
+ *
+ * @source is the memory output stream; the response stream is attached to
+ * it as "istream" data. @user_data is the queue item, on which the splice
+ * operation holds its own reference. That reference is released on every
+ * path out of this function.
+ */
+static void
+send_async_spliced (GObject *source, GAsyncResult *result, gpointer user_data)
+{
+	SoupMessageQueueItem *item = user_data;
+	GInputStream *istream = g_object_get_data (source, "istream");
+	GError *error = NULL;
+
+	/* If the message was cancelled, it will be completed via other means */
+	if (g_cancellable_is_cancelled (item->cancellable) ||
+	    !item->result) {
+		soup_message_queue_item_unref (item);
+		return;
+	}
+
+	if (g_output_stream_splice_finish (G_OUTPUT_STREAM (source),
+					   result, &error) == -1) {
+		send_request_return_result (item, NULL, error);
+		/* The splice op's ref on item must be released on the
+		 * failure path as well, or the item is leaked.
+		 */
+		soup_message_queue_item_unref (item);
+		return;
+	}
+
+	/* Otherwise either restarted or finished will eventually be called.
+	 * It should be safe to call the sync close() method here since
+	 * the message body has already been written.
+	 */
+	g_input_stream_close (istream, NULL, NULL);
+	do_idle_run_queue (item->session);
+	soup_message_queue_item_unref (item);
+}
+
+/*
+ * Decides what to do with the response @stream once it is available.
+ *
+ * If the status is 401/407 or soup_session_would_redirect() says the
+ * message would be redirected, the body is spliced asynchronously into a
+ * memory output stream stored on the message as
+ * "SoupSessionAsync:ostream", and the async result is left pending;
+ * send_async_spliced() runs when the splice ends. Otherwise @stream is
+ * returned to the caller right away via send_request_return_result().
+ */
+static void
+send_async_maybe_complete (SoupMessageQueueItem *item,
+ GInputStream *stream)
+{
+ if (item->msg->status_code == SOUP_STATUS_UNAUTHORIZED ||
+ item->msg->status_code == SOUP_STATUS_PROXY_UNAUTHORIZED ||
+ soup_session_would_redirect (item->session, item->msg)) {
+ GOutputStream *ostream;
+
+ /* Message may be requeued, so gather the current message body... */
+ ostream = g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
+ g_object_set_data_full (G_OBJECT (item->msg), "SoupSessionAsync:ostream",
+ ostream, g_object_unref);
+
+ /* ostream takes over the reference on stream; send_async_spliced()
+ * looks it up again under the "istream" key.
+ */
+ g_object_set_data_full (G_OBJECT (ostream), "istream",
+ stream, g_object_unref);
+
+ /* Give the splice op its own ref on item */
+ soup_message_queue_item_ref (item);
+ g_output_stream_splice_async (ostream, stream,
+ /* We can't use CLOSE_SOURCE because it
+ * might get closed in the wrong thread.
+ */
+ G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
+ G_PRIORITY_DEFAULT,
+ item->cancellable,
+ send_async_spliced, item);
+ return;
+ }
+
+ send_request_return_result (item, stream, NULL);
+}
+
+static void try_run_until_read (SoupMessageQueueItem *item);
+
+/*
+ * Callback for the source created in try_run_until_read(). Reports
+ * cancellation through the pending async result if item->cancellable has
+ * been cancelled; otherwise retries try_run_until_read().
+ *
+ * Always returns FALSE.
+ * NOTE(review): presumably FALSE removes the source, following the usual
+ * GSource callback convention -- confirm against
+ * soup_message_io_get_source().
+ */
+static gboolean
+read_ready_cb (SoupMessage *msg, gpointer user_data)
+{
+ SoupMessageQueueItem *item = user_data;
+ GError *error = NULL;
+
+ if (g_cancellable_set_error_if_cancelled (item->cancellable, &error)) {
+ send_request_return_result (item, NULL, error);
+ return FALSE;
+ }
+
+ try_run_until_read (item);
+ return FALSE;
+}
+
+/*
+ * Tries to advance the message I/O to the point where the response body
+ * can be read and to obtain the response stream.
+ *
+ *  - Stream obtained: hand it to send_async_maybe_complete().
+ *  - Failure other than G_IO_ERROR_WOULD_BLOCK: complete the pending
+ *    async result with that error.
+ *  - G_IO_ERROR_WOULD_BLOCK: attach a source to the session's async
+ *    context so that read_ready_cb() retries later.
+ */
+static void
+try_run_until_read (SoupMessageQueueItem *item)
+{
+ GError *error = NULL;
+ GInputStream *stream = NULL;
+ GSource *source;
+
+ if (soup_message_io_run_until_read (item->msg, item->cancellable, &error))
+ stream = soup_message_io_get_response_istream (item->msg, &error);
+ if (stream) {
+ send_async_maybe_complete (item, stream);
+ return;
+ }
+
+ if (!g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+ send_request_return_result (item, NULL, error);
+ return;
+ }
+
+ /* Would block: drop the error and wait for the I/O source to fire */
+ g_clear_error (&error);
+ source = soup_message_io_get_source (item->msg, item->cancellable,
+ read_ready_cb, item);
+ g_source_attach (source, soup_session_get_async_context (item->session));
+ g_source_unref (source);
+}
+
+/*
+ * Called from process_queue_item() for new-API items right after the
+ * item has been sent (READY -> RUNNING); starts driving the message I/O
+ * towards a readable response via try_run_until_read().
+ */
+static void
+send_request_running (SoupSession *session, SoupMessageQueueItem *item)
+{
+ try_run_until_read (item);
+}
+
+/*
+ * Queues @msg on @session and arranges for @callback to be invoked with
+ * a GAsyncResult once a response stream (or an error) is available;
+ * retrieve the outcome with soup_session_send_request_finish().
+ *
+ * @session must be a SoupSessionAsync whose
+ * SOUP_SESSION_USE_THREAD_CONTEXT property is TRUE; otherwise the call
+ * returns early through g_return_if_fail().
+ *
+ * If @cancellable is non-NULL it replaces the queue item's own
+ * cancellable.
+ */
+void
+soup_session_send_request_async (SoupSession *session,
+ SoupMessage *msg,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data)
+{
+ SoupMessageQueueItem *item;
+ gboolean use_thread_context;
+
+ g_return_if_fail (SOUP_IS_SESSION_ASYNC (session));
+
+ g_object_get (G_OBJECT (session),
+ SOUP_SESSION_USE_THREAD_CONTEXT, &use_thread_context,
+ NULL);
+ g_return_if_fail (use_thread_context);
+
+ /* Balance out the unref that queuing will eventually do */
+ g_object_ref (msg);
+
+ queue_message (session, msg, NULL, NULL);
+
+ item = soup_message_queue_lookup (soup_session_get_queue (session), msg);
+ g_return_if_fail (item != NULL);
+
+ /* Mark the item so process_queue_item() calls the send_request_*() hooks */
+ item->new_api = TRUE;
+ item->result = g_simple_async_result_new (G_OBJECT (session),
+ callback, user_data,
+ soup_session_send_request_async);
+ /* Park item as the op result so that it is unreffed when the result
+ * is replaced or destroyed.
+ */
+ g_simple_async_result_set_op_res_gpointer (item->result, item, (GDestroyNotify) soup_message_queue_item_unref);
+
+ if (cancellable) {
+ g_object_unref (item->cancellable);
+ item->cancellable = g_object_ref (cancellable);
+ }
+}
+
+/*
+ * Finishes an operation started with soup_session_send_request_async().
+ *
+ * Returns a new reference to the response stream stored in @result, or
+ * NULL with @error set if the operation failed. Also returns NULL
+ * (through g_return_val_if_fail) if @session is not a SoupSessionAsync
+ * or @result does not belong to this session and operation.
+ *
+ * NOTE(review): send_request_finished() can pass a NULL stream to
+ * send_request_return_result() with a non-transport-error status, in
+ * which case the g_object_ref() below would be handed NULL -- confirm
+ * whether that path is reachable.
+ */
+GInputStream *
+soup_session_send_request_finish (SoupSession *session,
+ GAsyncResult *result,
+ GError **error)
+{
+ GSimpleAsyncResult *simple;
+
+ g_return_val_if_fail (SOUP_IS_SESSION_ASYNC (session), NULL);
+ g_return_val_if_fail (g_simple_async_result_is_valid (result, G_OBJECT (session), soup_session_send_request_async), NULL);
+
+ simple = G_SIMPLE_ASYNC_RESULT (result);
+ if (g_simple_async_result_propagate_error (simple, error))
+ return NULL;
+ return g_object_ref (g_simple_async_result_get_op_res_gpointer (simple));
+}
diff --git a/libsoup/soup-session-private.h b/libsoup/soup-session-private.h
index 7462c61..a72fb2b 100644
--- a/libsoup/soup-session-private.h
+++ b/libsoup/soup-session-private.h
@@ -31,6 +31,20 @@ void soup_session_set_item_status (SoupSession *s
SoupMessageQueueItem *item,
guint status_code);
+GInputStream *soup_session_send_request (SoupSession *session,
+ SoupMessage *msg,
+ GCancellable *cancellable,
+ GError **error);
+
+void soup_session_send_request_async (SoupSession *session,
+ SoupMessage *msg,
+ GCancellable *cancellable,
+ GAsyncReadyCallback callback,
+ gpointer user_data);
+GInputStream *soup_session_send_request_finish (SoupSession *session,
+ GAsyncResult *result,
+ GError **error);
+
G_END_DECLS
#endif /* SOUP_SESSION_PRIVATE_H */
diff --git a/libsoup/soup-session-sync.c b/libsoup/soup-session-sync.c
index 1a919c7..a09c5b4 100644
--- a/libsoup/soup-session-sync.c
+++ b/libsoup/soup-session-sync.c
@@ -240,6 +240,19 @@ try_again:
item->state = SOUP_MESSAGE_READY;
}
+static void process_queue_item (SoupMessageQueueItem *item);
+
+/*
+ * Completion callback passed to soup_session_send_queue_item() for
+ * new-API items in the sync session. Unless the item is being restarted,
+ * marks it FINISHING and re-enters process_queue_item() to carry out the
+ * remaining state transitions.
+ */
+static void
+new_api_message_completed (SoupMessage *msg, gpointer user_data)
+{
+ SoupMessageQueueItem *item = user_data;
+
+ if (item->state != SOUP_MESSAGE_RESTARTING) {
+ item->state = SOUP_MESSAGE_FINISHING;
+ process_queue_item (item);
+ }
+}
+
static void
process_queue_item (SoupMessageQueueItem *item)
{
@@ -249,7 +262,8 @@ process_queue_item (SoupMessageQueueItem *item)
SoupProxyURIResolver *proxy_resolver;
guint status;
- item->state = SOUP_MESSAGE_STARTING;
+ soup_message_queue_item_ref (item);
+
do {
if (item->paused) {
g_mutex_lock (&priv->lock);
@@ -303,11 +317,22 @@ process_queue_item (SoupMessageQueueItem *item)
case SOUP_MESSAGE_READY:
item->state = SOUP_MESSAGE_RUNNING;
+
+ if (item->new_api) {
+ soup_session_send_queue_item (item->session, item, new_api_message_completed);
+ goto out;
+ }
+
soup_session_send_queue_item (item->session, item, NULL);
if (item->state != SOUP_MESSAGE_RESTARTING)
item->state = SOUP_MESSAGE_FINISHING;
break;
+ case SOUP_MESSAGE_RUNNING:
+ g_warn_if_fail (item->new_api);
+ item->state = SOUP_MESSAGE_FINISHING;
+ break;
+
case SOUP_MESSAGE_RESTARTING:
item->state = SOUP_MESSAGE_STARTING;
soup_message_restarted (item->msg);
@@ -326,6 +351,9 @@ process_queue_item (SoupMessageQueueItem *item)
break;
}
} while (item->state != SOUP_MESSAGE_FINISHED);
+
+ out:
+ soup_message_queue_item_unref (item);
}
static gboolean
@@ -476,3 +504,90 @@ kick (SoupSession *session)
g_cond_broadcast (&priv->cond);
g_mutex_unlock (&priv->lock);
}
+
+
+/*
+ * Synchronously sends @msg on @session (which must be a SoupSessionSync)
+ * and returns a stream from which the response body can be read.
+ *
+ * Responses that look likely to be requeued (401, 407, or a redirect the
+ * session would follow) are read fully into memory first. If the message
+ * is then restarted, the loop runs again; if not, the buffered body is
+ * returned as a memory input stream.
+ *
+ * Returns: the response stream, or NULL with @error set. An error raised
+ * by one of the I/O steps takes priority; failing that, a transport-error
+ * status is reported as a SOUP_HTTP_ERROR carrying the status code and
+ * reason phrase.
+ */
+GInputStream *
+soup_session_send_request (SoupSession *session,
+			   SoupMessage *msg,
+			   GCancellable *cancellable,
+			   GError **error)
+{
+	SoupMessageQueueItem *item;
+	GInputStream *stream = NULL;
+	GOutputStream *ostream;
+	GMemoryOutputStream *mostream;
+	gssize size;
+	/* Errors are collected locally so that @error is set exactly once,
+	 * even when an I/O failure also leaves a transport-error status on
+	 * the message.
+	 */
+	GError *my_error = NULL;
+
+	g_return_val_if_fail (SOUP_IS_SESSION_SYNC (session), NULL);
+
+	SOUP_SESSION_CLASS (soup_session_sync_parent_class)->queue_message (session, msg, NULL, NULL);
+
+	item = soup_message_queue_lookup (soup_session_get_queue (session), msg);
+	g_return_val_if_fail (item != NULL, NULL);
+
+	item->new_api = TRUE;
+
+	while (!stream) {
+		/* Get a connection, etc */
+		process_queue_item (item);
+		if (SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code))
+			break;
+
+		/* Send request, read headers */
+		if (!soup_message_io_run_until_read (msg, cancellable, &my_error))
+			break;
+
+		stream = soup_message_io_get_response_istream (msg, &my_error);
+		if (!stream)
+			break;
+
+		/* Break if the message doesn't look likely-to-be-requeued */
+		if (msg->status_code != SOUP_STATUS_UNAUTHORIZED &&
+		    msg->status_code != SOUP_STATUS_PROXY_UNAUTHORIZED &&
+		    !soup_session_would_redirect (session, msg))
+			break;
+
+		/* Gather the current message body... */
+		ostream = g_memory_output_stream_new (NULL, 0, g_realloc, g_free);
+		if (g_output_stream_splice (ostream, stream,
+					    G_OUTPUT_STREAM_SPLICE_CLOSE_SOURCE |
+					    G_OUTPUT_STREAM_SPLICE_CLOSE_TARGET,
+					    cancellable, &my_error) == -1) {
+			g_object_unref (stream);
+			g_object_unref (ostream);
+			stream = NULL;
+			break;
+		}
+		g_object_unref (stream);
+		stream = NULL;
+
+		/* If the message was requeued, loop */
+		if (item->state == SOUP_MESSAGE_RESTARTING) {
+			g_object_unref (ostream);
+			continue;
+		}
+
+		/* Not requeued, so return the original body */
+		mostream = G_MEMORY_OUTPUT_STREAM (ostream);
+		size = g_memory_output_stream_get_data_size (mostream);
+		stream = g_memory_input_stream_new ();
+		if (size) {
+			g_memory_input_stream_add_data (G_MEMORY_INPUT_STREAM (stream),
+							g_memory_output_stream_steal_data (mostream),
+							size, g_free);
+		}
+		g_object_unref (ostream);
+	}
+
+	if (my_error || SOUP_STATUS_IS_TRANSPORT_ERROR (msg->status_code)) {
+		if (stream) {
+			g_object_unref (stream);
+			stream = NULL;
+		}
+		if (my_error) {
+			/* Same priority as the async path: an explicit
+			 * error wins over the status-derived one.
+			 */
+			g_propagate_error (error, my_error);
+		} else {
+			g_set_error_literal (error, SOUP_HTTP_ERROR, msg->status_code,
+					     msg->reason_phrase);
+		}
+	}
+
+	soup_message_queue_item_unref (item);
+	return stream;
+}
diff --git a/libsoup/soup-session.c b/libsoup/soup-session.c
index fa95f7e..200a25a 100644
--- a/libsoup/soup-session.c
+++ b/libsoup/soup-session.c
@@ -1739,10 +1739,7 @@ redirect_handler (SoupMessage *msg, gpointer user_data)
if (new_uri)
soup_uri_free (new_uri);
- if (invalid) {
- /* Really we should just leave the status as-is,
- * but that would be an API break.
- */
+ if (invalid && !item->new_api) {
soup_message_set_status_full (msg,
SOUP_STATUS_MALFORMED,
"Invalid Redirect URL");
@@ -2247,6 +2244,7 @@ soup_session_pause_message (SoupSession *session,
priv = SOUP_SESSION_GET_PRIVATE (session);
item = soup_message_queue_lookup (priv->queue, msg);
g_return_if_fail (item != NULL);
+ g_return_if_fail (!item->new_api);
item->paused = TRUE;
if (item->state == SOUP_MESSAGE_RUNNING)
@@ -2279,6 +2277,7 @@ soup_session_unpause_message (SoupSession *session,
priv = SOUP_SESSION_GET_PRIVATE (session);
item = soup_message_queue_lookup (priv->queue, msg);
g_return_if_fail (item != NULL);
+ g_return_if_fail (!item->new_api);
item->paused = FALSE;
if (item->state == SOUP_MESSAGE_RUNNING)
diff --git a/po/POTFILES.in b/po/POTFILES.in
index b35ee88..80db055 100644
--- a/po/POTFILES.in
+++ b/po/POTFILES.in
@@ -1,4 +1,5 @@
libsoup/soup-body-input-stream.c
libsoup/soup-converter-wrapper.c
+libsoup/soup-message-io.c
libsoup/soup-request.c
libsoup/soup-requester.c
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]