[libsoup/giobased: 1/11] Use gio instead of soup_socket_write
- From: Dan Winship <danw src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [libsoup/giobased: 1/11] Use gio instead of soup_socket_write
- Date: Tue, 22 Mar 2011 13:06:43 +0000 (UTC)
commit 390d051ea2a4712be9c5a9d7815aaaf033a2794b
Author: Dan Winship <danw gnome org>
Date: Wed Dec 8 15:56:37 2010 +0100
Use gio instead of soup_socket_write
libsoup/soup-message-io.c | 141 +++++++++++++++++++++++++++++----------------
libsoup/soup-socket.c | 8 +++
libsoup/soup-socket.h | 1 +
3 files changed, 101 insertions(+), 49 deletions(-)
---
diff --git a/libsoup/soup-message-io.c b/libsoup/soup-message-io.c
index 290d781..b5d9840 100644
--- a/libsoup/soup-message-io.c
+++ b/libsoup/soup-message-io.c
@@ -44,11 +44,16 @@ typedef enum {
state != SOUP_MESSAGE_IO_STATE_DONE)
typedef struct {
- SoupSocket *sock;
SoupMessageQueueItem *item;
SoupMessageIOMode mode;
GCancellable *cancellable;
+ SoupSocket *sock;
+ GInputStream *istream;
+ GOutputStream *ostream;
+ GMainContext *async_context;
+ gboolean non_blocking;
+
SoupMessageIOState read_state;
SoupEncoding read_encoding;
GByteArray *read_meta_buf;
@@ -68,8 +73,10 @@ typedef struct {
goffset write_length;
goffset written;
- guint read_tag, write_tag, tls_signal_id;
+ guint read_tag, tls_signal_id;
+ GSource *write_source;
GSource *unpause_source;
+ gboolean paused;
SoupMessageGetHeadersFn get_headers_cb;
SoupMessageParseHeadersFn parse_headers_cb;
@@ -84,8 +91,8 @@ typedef struct {
*/
#define dummy_to_make_emacs_happy {
#define SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK { gboolean cancelled; g_object_ref (msg);
-#define SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED cancelled = (priv->io_data != io); g_object_unref (msg); if (cancelled || (!io->read_tag && !io->write_tag)) return; }
-#define SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED(val) cancelled = (priv->io_data != io); g_object_unref (msg); if (cancelled || (!io->read_tag && !io->write_tag)) return val; }
+#define SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED cancelled = (priv->io_data != io); g_object_unref (msg); if (cancelled || io->paused) return; }
+#define SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED(val) cancelled = (priv->io_data != io); g_object_unref (msg); if (cancelled || io->paused) return val; }
#define RESPONSE_BLOCK_SIZE 8192
@@ -106,6 +113,8 @@ soup_message_io_cleanup (SoupMessage *msg)
g_signal_handler_disconnect (io->sock, io->tls_signal_id);
if (io->sock)
g_object_unref (io->sock);
+ if (io->async_context)
+ g_main_context_unref (io->async_context);
if (io->item)
soup_message_queue_item_unref (io->item);
@@ -134,9 +143,9 @@ soup_message_io_stop (SoupMessage *msg)
g_signal_handler_disconnect (io->sock, io->read_tag);
io->read_tag = 0;
}
- if (io->write_tag) {
- g_signal_handler_disconnect (io->sock, io->write_tag);
- io->write_tag = 0;
+ if (io->write_source) {
+ g_source_destroy (io->write_source);
+ io->write_source = NULL;
}
if (io->unpause_source) {
@@ -169,6 +178,7 @@ soup_message_io_finished (SoupMessage *msg)
}
static void io_read (SoupSocket *sock, SoupMessage *msg);
+static void io_write (SoupSocket *sock, SoupMessage *msg);
static gboolean
request_is_idempotent (SoupMessage *msg)
@@ -514,6 +524,19 @@ read_body_chunk (SoupMessage *msg)
return TRUE;
}
+static gboolean
+io_writable (GPollableOutputStream *stream, gpointer msg)
+{
+ SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
+ SoupMessageIOData *io = priv->io_data;
+
+ g_source_destroy (io->write_source);
+ io->write_source = NULL;
+
+ io_write (io->sock, msg);
+ return FALSE;
+}
+
/* Attempts to write @len bytes from @data. See the note at
* read_metadata() for an explanation of the return value.
*/
@@ -522,43 +545,56 @@ write_data (SoupMessage *msg, const char *data, guint len, gboolean body)
{
SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
SoupMessageIOData *io = priv->io_data;
- SoupSocketIOStatus status;
gsize nwrote;
GError *error = NULL;
SoupBuffer *chunk;
const char *start;
+ if (!io->ostream) {
+ io_error (io->sock, msg, NULL);
+ return FALSE;
+ }
+
while (len > io->written) {
- status = soup_socket_write (io->sock,
- data + io->written,
- len - io->written,
- &nwrote,
- io->cancellable, &error);
- switch (status) {
- case SOUP_SOCKET_EOF:
- case SOUP_SOCKET_ERROR:
+ if (io->non_blocking) {
+ nwrote = g_pollable_output_stream_write_nonblocking (
+ G_POLLABLE_OUTPUT_STREAM (io->ostream),
+ data + io->written, len - io->written,
+ io->cancellable, &error);
+ } else {
+ nwrote = g_output_stream_write (io->ostream,
+ data + io->written,
+ len - io->written,
+ io->cancellable, &error);
+ }
+
+ if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+ g_error_free (error);
+ io->write_source = g_pollable_output_stream_create_source (
+ G_POLLABLE_OUTPUT_STREAM (io->ostream),
+ NULL);
+ g_source_set_callback (io->write_source, (GSourceFunc) io_writable, msg, NULL);
+ g_source_attach (io->write_source, io->async_context);
+ g_source_unref (io->write_source);
+ return FALSE;
+ } else if (error) {
io_error (io->sock, msg, error);
return FALSE;
+ }
- case SOUP_SOCKET_WOULD_BLOCK:
- return FALSE;
+ start = data + io->written;
+ io->written += nwrote;
- case SOUP_SOCKET_OK:
- start = data + io->written;
- io->written += nwrote;
-
- if (body) {
- if (io->write_length)
- io->write_length -= nwrote;
-
- chunk = soup_buffer_new (SOUP_MEMORY_TEMPORARY,
- start, nwrote);
- SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
- soup_message_wrote_body_data (msg, chunk);
- soup_buffer_free (chunk);
- SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED (FALSE);
- }
- break;
+ if (body) {
+ if (io->write_length)
+ io->write_length -= nwrote;
+
+ chunk = soup_buffer_new (SOUP_MEMORY_TEMPORARY,
+ start, nwrote);
+ SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
+ soup_message_wrote_body_data (msg, chunk);
+ soup_buffer_free (chunk);
+ SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED (FALSE);
}
}
@@ -809,9 +845,9 @@ io_write (SoupSocket *sock, SoupMessage *msg)
case SOUP_MESSAGE_IO_STATE_FINISHING:
- if (io->write_tag) {
- g_signal_handler_disconnect (io->sock, io->write_tag);
- io->write_tag = 0;
+ if (io->write_source) {
+ g_source_destroy (io->write_source);
+ io->write_source = NULL;
}
io->write_state = SOUP_MESSAGE_IO_STATE_DONE;
@@ -1077,9 +1113,9 @@ new_iostate (SoupMessage *msg, SoupSocket *sock, SoupMessageIOMode mode,
{
SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
SoupMessageIOData *io;
+ GIOStream *iostream;
io = g_slice_new0 (SoupMessageIOData);
- io->sock = g_object_ref (sock);
io->mode = mode;
io->get_headers_cb = get_headers_cb;
io->parse_headers_cb = parse_headers_cb;
@@ -1087,13 +1123,22 @@ new_iostate (SoupMessage *msg, SoupSocket *sock, SoupMessageIOMode mode,
io->completion_cb = completion_cb;
io->completion_data = completion_data;
+ io->sock = g_object_ref (sock);
+ iostream = soup_socket_get_iostream (sock);
+ if (iostream) {
+ io->istream = g_io_stream_get_input_stream (iostream);
+ io->ostream = g_io_stream_get_output_stream (iostream);
+ }
+ g_object_get (io->sock,
+ SOUP_SOCKET_FLAG_NONBLOCKING, &io->non_blocking,
+ SOUP_SOCKET_ASYNC_CONTEXT, &io->async_context,
+ NULL);
+
io->read_meta_buf = g_byte_array_new ();
io->write_buf = g_string_new (NULL);
io->read_tag = g_signal_connect (io->sock, "readable",
G_CALLBACK (io_read), msg);
- io->write_tag = g_signal_connect (io->sock, "writable",
- G_CALLBACK (io_write), msg);
io->read_state = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
io->write_state = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
@@ -1164,9 +1209,9 @@ soup_message_io_pause (SoupMessage *msg)
g_return_if_fail (io != NULL);
- if (io->write_tag) {
- g_signal_handler_disconnect (io->sock, io->write_tag);
- io->write_tag = 0;
+ if (io->write_source) {
+ g_source_destroy (io->write_source);
+ io->write_source = NULL;
}
if (io->read_tag) {
g_signal_handler_disconnect (io->sock, io->read_tag);
@@ -1177,6 +1222,8 @@ soup_message_io_pause (SoupMessage *msg)
g_source_destroy (io->unpause_source);
io->unpause_source = NULL;
}
+
+ io->paused = TRUE;
}
static gboolean
@@ -1187,15 +1234,11 @@ io_unpause_internal (gpointer msg)
g_return_val_if_fail (io != NULL, FALSE);
io->unpause_source = NULL;
+ io->paused = FALSE;
- if (io->write_tag || io->read_tag)
+ if (io->write_source || io->read_tag)
return FALSE;
- if (io->write_state != SOUP_MESSAGE_IO_STATE_DONE) {
- io->write_tag = g_signal_connect (io->sock, "writable",
- G_CALLBACK (io_write), msg);
- }
-
if (io->read_state != SOUP_MESSAGE_IO_STATE_DONE) {
io->read_tag = g_signal_connect (io->sock, "readable",
G_CALLBACK (io_read), msg);
diff --git a/libsoup/soup-socket.c b/libsoup/soup-socket.c
index 3e7d725..35608c0 100644
--- a/libsoup/soup-socket.c
+++ b/libsoup/soup-socket.c
@@ -1128,6 +1128,14 @@ soup_socket_get_remote_address (SoupSocket *sock)
return priv->remote_addr;
}
+GIOStream *
+soup_socket_get_iostream (SoupSocket *sock)
+{
+ g_return_val_if_fail (SOUP_IS_SOCKET (sock), NULL);
+
+ return SOUP_SOCKET_GET_PRIVATE (sock)->conn;
+}
+
static gboolean
socket_read_watch (GObject *pollable, gpointer user_data)
diff --git a/libsoup/soup-socket.h b/libsoup/soup-socket.h
index 8761071..8305ad7 100644
--- a/libsoup/soup-socket.h
+++ b/libsoup/soup-socket.h
@@ -83,6 +83,7 @@ gboolean soup_socket_is_connected (SoupSocket *sock);
SoupAddress *soup_socket_get_local_address (SoupSocket *sock);
SoupAddress *soup_socket_get_remote_address (SoupSocket *sock);
+GIOStream *soup_socket_get_iostream (SoupSocket *sock);
typedef enum {
SOUP_SOCKET_OK,
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]