[libsoup/giobased: 1/11] Use gio instead of soup_socket_write



commit 390d051ea2a4712be9c5a9d7815aaaf033a2794b
Author: Dan Winship <danw gnome org>
Date:   Wed Dec 8 15:56:37 2010 +0100

    Use gio instead of soup_socket_write

 libsoup/soup-message-io.c |  141 +++++++++++++++++++++++++++++----------------
 libsoup/soup-socket.c     |    8 +++
 libsoup/soup-socket.h     |    1 +
 3 files changed, 101 insertions(+), 49 deletions(-)
---
diff --git a/libsoup/soup-message-io.c b/libsoup/soup-message-io.c
index 290d781..b5d9840 100644
--- a/libsoup/soup-message-io.c
+++ b/libsoup/soup-message-io.c
@@ -44,11 +44,16 @@ typedef enum {
 	 state != SOUP_MESSAGE_IO_STATE_DONE)
 
 typedef struct {
-	SoupSocket           *sock;
 	SoupMessageQueueItem *item;
 	SoupMessageIOMode     mode;
 	GCancellable         *cancellable;
 
+	SoupSocket           *sock;
+	GInputStream         *istream;
+	GOutputStream        *ostream;
+	GMainContext         *async_context;
+	gboolean              non_blocking;
+
 	SoupMessageIOState    read_state;
 	SoupEncoding          read_encoding;
 	GByteArray           *read_meta_buf;
@@ -68,8 +73,10 @@ typedef struct {
 	goffset               write_length;
 	goffset               written;
 
-	guint read_tag, write_tag, tls_signal_id;
+	guint read_tag, tls_signal_id;
+	GSource *write_source;
 	GSource *unpause_source;
+	gboolean paused;
 
 	SoupMessageGetHeadersFn   get_headers_cb;
 	SoupMessageParseHeadersFn parse_headers_cb;
@@ -84,8 +91,8 @@ typedef struct {
  */
 #define dummy_to_make_emacs_happy {
 #define SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK { gboolean cancelled; g_object_ref (msg);
-#define SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED cancelled = (priv->io_data != io); g_object_unref (msg); if (cancelled || (!io->read_tag && !io->write_tag)) return; }
-#define SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED(val) cancelled = (priv->io_data != io); g_object_unref (msg); if (cancelled || (!io->read_tag && !io->write_tag)) return val; }
+#define SOUP_MESSAGE_IO_RETURN_IF_CANCELLED_OR_PAUSED cancelled = (priv->io_data != io); g_object_unref (msg); if (cancelled || io->paused) return; }
+#define SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED(val) cancelled = (priv->io_data != io); g_object_unref (msg); if (cancelled || io->paused) return val; }
 
 #define RESPONSE_BLOCK_SIZE 8192
 
@@ -106,6 +113,8 @@ soup_message_io_cleanup (SoupMessage *msg)
 		g_signal_handler_disconnect (io->sock, io->tls_signal_id);
 	if (io->sock)
 		g_object_unref (io->sock);
+	if (io->async_context)
+		g_main_context_unref (io->async_context);
 	if (io->item)
 		soup_message_queue_item_unref (io->item);
 
@@ -134,9 +143,9 @@ soup_message_io_stop (SoupMessage *msg)
 		g_signal_handler_disconnect (io->sock, io->read_tag);
 		io->read_tag = 0;
 	}
-	if (io->write_tag) {
-		g_signal_handler_disconnect (io->sock, io->write_tag);
-		io->write_tag = 0;
+	if (io->write_source) {
+		g_source_destroy (io->write_source);
+		io->write_source = NULL;
 	}
 
 	if (io->unpause_source) {
@@ -169,6 +178,7 @@ soup_message_io_finished (SoupMessage *msg)
 }
 
 static void io_read (SoupSocket *sock, SoupMessage *msg);
+static void io_write (SoupSocket *sock, SoupMessage *msg);
 
 static gboolean
 request_is_idempotent (SoupMessage *msg)
@@ -514,6 +524,19 @@ read_body_chunk (SoupMessage *msg)
 	return TRUE;
 }
 
+static gboolean /* GSource callback: write_data() installs it on io->write_source after a write would block */
+io_writable (GPollableOutputStream *stream, gpointer msg)
+{
+	SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
+	SoupMessageIOData *io = priv->io_data;
+
+	g_source_destroy (io->write_source); /* drop this watch before re-entering the writer */
+	io->write_source = NULL; /* cleared so later "if (io->write_source)" checks see no pending watch */
+
+	io_write (io->sock, msg); /* resume writing now that the stream reported writable */
+	return FALSE; /* FALSE: do not re-arm; the source was already destroyed above */
+}
+
 /* Attempts to write @len bytes from @data. See the note at
  * read_metadata() for an explanation of the return value.
  */
@@ -522,43 +545,56 @@ write_data (SoupMessage *msg, const char *data, guint len, gboolean body)
 {
 	SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
 	SoupMessageIOData *io = priv->io_data;
-	SoupSocketIOStatus status;
 	gsize nwrote;
 	GError *error = NULL;
 	SoupBuffer *chunk;
 	const char *start;
 
+	if (!io->ostream) {
+		io_error (io->sock, msg, NULL);
+		return FALSE;
+	}
+
 	while (len > io->written) {
-		status = soup_socket_write (io->sock,
-					    data + io->written,
-					    len - io->written,
-					    &nwrote,
-					    io->cancellable, &error);
-		switch (status) {
-		case SOUP_SOCKET_EOF:
-		case SOUP_SOCKET_ERROR:
+		if (io->non_blocking) {
+			nwrote = g_pollable_output_stream_write_nonblocking (
+				G_POLLABLE_OUTPUT_STREAM (io->ostream),
+				data + io->written, len - io->written,
+				io->cancellable, &error);
+		} else {
+			nwrote = g_output_stream_write (io->ostream,
+							data + io->written,
+							len - io->written,
+							io->cancellable, &error);
+		}
+
+		if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) {
+			g_error_free (error);
+			io->write_source = g_pollable_output_stream_create_source (
+				G_POLLABLE_OUTPUT_STREAM (io->ostream),
+				NULL);
+			g_source_set_callback (io->write_source, (GSourceFunc) io_writable, msg, NULL);
+			g_source_attach (io->write_source, io->async_context);
+			g_source_unref (io->write_source);
+			return FALSE;
+		} else if (error) {
 			io_error (io->sock, msg, error);
 			return FALSE;
+		}
 
-		case SOUP_SOCKET_WOULD_BLOCK:
-			return FALSE;
+		start = data + io->written;
+		io->written += nwrote;
 
-		case SOUP_SOCKET_OK:
-			start = data + io->written;
-			io->written += nwrote;
-
-			if (body) {
-				if (io->write_length)
-					io->write_length -= nwrote;
-
-				chunk = soup_buffer_new (SOUP_MEMORY_TEMPORARY,
-							 start, nwrote);
-				SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
-				soup_message_wrote_body_data (msg, chunk);
-				soup_buffer_free (chunk);
-				SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED (FALSE);
-			}
-			break;
+		if (body) {
+			if (io->write_length)
+				io->write_length -= nwrote;
+
+			chunk = soup_buffer_new (SOUP_MEMORY_TEMPORARY,
+						 start, nwrote);
+			SOUP_MESSAGE_IO_PREPARE_FOR_CALLBACK;
+			soup_message_wrote_body_data (msg, chunk);
+			soup_buffer_free (chunk);
+			SOUP_MESSAGE_IO_RETURN_VAL_IF_CANCELLED_OR_PAUSED (FALSE);
 		}
 	}
 
@@ -809,9 +845,9 @@ io_write (SoupSocket *sock, SoupMessage *msg)
 
 
 	case SOUP_MESSAGE_IO_STATE_FINISHING:
-		if (io->write_tag) {
-			g_signal_handler_disconnect (io->sock, io->write_tag);
-			io->write_tag = 0;
+		if (io->write_source) {
+			g_source_destroy (io->write_source);
+			io->write_source = NULL;
 		}
 		io->write_state = SOUP_MESSAGE_IO_STATE_DONE;
 
@@ -1077,9 +1113,9 @@ new_iostate (SoupMessage *msg, SoupSocket *sock, SoupMessageIOMode mode,
 {
 	SoupMessagePrivate *priv = SOUP_MESSAGE_GET_PRIVATE (msg);
 	SoupMessageIOData *io;
+	GIOStream *iostream;
 
 	io = g_slice_new0 (SoupMessageIOData);
-	io->sock = g_object_ref (sock);
 	io->mode = mode;
 	io->get_headers_cb   = get_headers_cb;
 	io->parse_headers_cb = parse_headers_cb;
@@ -1087,13 +1123,22 @@ new_iostate (SoupMessage *msg, SoupSocket *sock, SoupMessageIOMode mode,
 	io->completion_cb    = completion_cb;
 	io->completion_data  = completion_data;
 
+	io->sock = g_object_ref (sock);
+	iostream = soup_socket_get_iostream (sock);
+	if (iostream) {
+		io->istream = g_io_stream_get_input_stream (iostream);
+		io->ostream = g_io_stream_get_output_stream (iostream);
+	}
+	g_object_get (io->sock,
+		      SOUP_SOCKET_FLAG_NONBLOCKING, &io->non_blocking,
+		      SOUP_SOCKET_ASYNC_CONTEXT, &io->async_context,
+		      NULL);
+
 	io->read_meta_buf    = g_byte_array_new ();
 	io->write_buf        = g_string_new (NULL);
 
 	io->read_tag  = g_signal_connect (io->sock, "readable",
 					  G_CALLBACK (io_read), msg);
-	io->write_tag = g_signal_connect (io->sock, "writable",
-					  G_CALLBACK (io_write), msg);
 
 	io->read_state  = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
 	io->write_state = SOUP_MESSAGE_IO_STATE_NOT_STARTED;
@@ -1164,9 +1209,9 @@ soup_message_io_pause (SoupMessage *msg)
 
 	g_return_if_fail (io != NULL);
 
-	if (io->write_tag) {
-		g_signal_handler_disconnect (io->sock, io->write_tag);
-		io->write_tag = 0;
+	if (io->write_source) {
+		g_source_destroy (io->write_source);
+		io->write_source = NULL;
 	}
 	if (io->read_tag) {
 		g_signal_handler_disconnect (io->sock, io->read_tag);
@@ -1177,6 +1222,8 @@ soup_message_io_pause (SoupMessage *msg)
 		g_source_destroy (io->unpause_source);
 		io->unpause_source = NULL;
 	}
+
+	io->paused = TRUE;
 }
 
 static gboolean
@@ -1187,15 +1234,11 @@ io_unpause_internal (gpointer msg)
 
 	g_return_val_if_fail (io != NULL, FALSE);
 	io->unpause_source = NULL;
+	io->paused = FALSE;
 
-	if (io->write_tag || io->read_tag)
+	if (io->write_source || io->read_tag)
 		return FALSE;
 
-	if (io->write_state != SOUP_MESSAGE_IO_STATE_DONE) {
-		io->write_tag = g_signal_connect (io->sock, "writable",
-						  G_CALLBACK (io_write), msg);
-	}
-
 	if (io->read_state != SOUP_MESSAGE_IO_STATE_DONE) {
 		io->read_tag = g_signal_connect (io->sock, "readable",
 						 G_CALLBACK (io_read), msg);
diff --git a/libsoup/soup-socket.c b/libsoup/soup-socket.c
index 3e7d725..35608c0 100644
--- a/libsoup/soup-socket.c
+++ b/libsoup/soup-socket.c
@@ -1128,6 +1128,14 @@ soup_socket_get_remote_address (SoupSocket *sock)
 	return priv->remote_addr;
 }
 
+GIOStream * /* accessor: hands out the socket's private "conn" stream */
+soup_socket_get_iostream (SoupSocket *sock)
+{
+	g_return_val_if_fail (SOUP_IS_SOCKET (sock), NULL); /* non-SoupSocket argument yields NULL */
+
+	return SOUP_SOCKET_GET_PRIVATE (sock)->conn; /* returned as-is; no g_object_ref() is taken here */
+}
+
 
 static gboolean
 socket_read_watch (GObject *pollable, gpointer user_data)
diff --git a/libsoup/soup-socket.h b/libsoup/soup-socket.h
index 8761071..8305ad7 100644
--- a/libsoup/soup-socket.h
+++ b/libsoup/soup-socket.h
@@ -83,6 +83,7 @@ gboolean       soup_socket_is_connected       (SoupSocket         *sock);
 SoupAddress   *soup_socket_get_local_address  (SoupSocket         *sock);
 SoupAddress   *soup_socket_get_remote_address (SoupSocket         *sock);
 
+GIOStream     *soup_socket_get_iostream       (SoupSocket         *sock);
 
 typedef enum {
 	SOUP_SOCKET_OK,



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]