[libsoup/websocket: 5/10] soup-session: add soup_session_steal_connection()



commit 3ab39994d0011fdd881e44e65e92d9c9b51db1a9
Author: Dan Winship <danw gnome org>
Date:   Wed Dec 10 15:18:54 2014 +0100

    soup-session: add soup_session_steal_connection()
    
    Add a method to allow stealing a connection from a SoupSession.
    
    Includes a new test using both this and
    soup_client_context_steal_connection() to perform an upgrade on both
    sides of a previously-HTTP connection.

 libsoup/libsoup-2.4.sym |    1 +
 libsoup/soup-session.c  |   68 +++++++++++++++++++
 libsoup/soup-session.h  |    4 +
 tests/misc-test.c       |  168 +++++++++++++++++++++++++++++++++++++++++++++++
 4 files changed, 241 insertions(+), 0 deletions(-)
---
diff --git a/libsoup/libsoup-2.4.sym b/libsoup/libsoup-2.4.sym
index 5edeab3..39f26d1 100644
--- a/libsoup/libsoup-2.4.sym
+++ b/libsoup/libsoup-2.4.sym
@@ -426,6 +426,7 @@ soup_session_send
 soup_session_send_async
 soup_session_send_finish
 soup_session_send_message
+soup_session_steal_connection
 soup_session_sync_get_type
 soup_session_sync_new
 soup_session_sync_new_with_options
diff --git a/libsoup/soup-session.c b/libsoup/soup-session.c
index e6dabb3..885d9e2 100644
--- a/libsoup/soup-session.c
+++ b/libsoup/soup-session.c
@@ -1577,6 +1577,12 @@ message_completed (SoupMessage *msg, SoupMessageIOCompletion completion, gpointe
        if (item->async)
                soup_session_kick_queue (item->session);
 
+       if (completion == SOUP_MESSAGE_IO_STOLEN) {
+               item->state = SOUP_MESSAGE_FINISHED;
+               soup_session_unqueue_item (item->session, item);
+               return;
+       }
+
        if (item->state != SOUP_MESSAGE_RESTARTING) {
                item->state = SOUP_MESSAGE_FINISHING;
 
@@ -4649,3 +4655,65 @@ soup_request_error_quark (void)
                error = g_quark_from_static_string ("soup_request_error_quark");
        return error;
 }
+
+/**
+ * soup_session_steal_connection:
+ * @session: a #SoupSession
+ * @msg: the message whose connection is to be stolen
+ *
+ * "Steals" the HTTP connection associated with @msg from @session.
+ * This happens immediately, regardless of the current state of the
+ * connection, and @msg's callback will not be called. You can steal
+ * the connection from a #SoupMessage signal handler if you need to
+ * wait for part or all of the response to be received first.
+ *
+ * Calling this function may cause @msg to be freed if you are not
+ * holding any other reference to it.
+ *
+ * Return value: (transfer full): the #GIOStream formerly associated
+ *   with @msg (or %NULL if @msg was no longer associated with a
+ *   connection). No guarantees are made about what kind of #GIOStream
+ *   is returned.
+ *
+ * Since: 2.50
+ **/
+GIOStream *
+soup_session_steal_connection (SoupSession *session,
+                              SoupMessage *msg)
+{
+       SoupSessionPrivate *priv = SOUP_SESSION_GET_PRIVATE (session);
+       SoupMessageQueueItem *item;
+       SoupConnection *conn;
+       SoupSocket *sock;
+       SoupSessionHost *host;
+       GIOStream *stream;
+
+       item = soup_message_queue_lookup (priv->queue, msg);
+       if (!item)
+               return NULL;
+       if (!item->conn ||
+           soup_connection_get_state (item->conn) != SOUP_CONNECTION_IN_USE) {
+               soup_message_queue_item_unref (item);
+               return NULL;
+       }
+
+       conn = g_object_ref (item->conn);
+       soup_session_set_item_connection (session, item, NULL);
+
+       g_mutex_lock (&priv->conn_lock);
+       host = get_host_for_message (session, item->msg);
+       g_hash_table_remove (priv->conns, conn);
+       drop_connection (session, host, conn);
+       g_mutex_unlock (&priv->conn_lock);
+
+       sock = soup_connection_get_socket (conn);
+       g_object_set (G_OBJECT (sock),
+                     SOUP_SOCKET_CLOSE_ON_DISPOSE, FALSE,
+                     NULL);
+       g_object_unref (conn);
+
+       stream = soup_message_io_steal (item->msg);
+
+       soup_message_queue_item_unref (item);
+       return stream;
+}
diff --git a/libsoup/soup-session.h b/libsoup/soup-session.h
index eed392d..5dcd747 100644
--- a/libsoup/soup-session.h
+++ b/libsoup/soup-session.h
@@ -206,6 +206,10 @@ typedef enum {
        SOUP_REQUEST_ERROR_ENCODING
 } SoupRequestError;
 
+SOUP_AVAILABLE_IN_2_50
+GIOStream *soup_session_steal_connection (SoupSession *session,
+                                         SoupMessage *msg);
+
 G_END_DECLS
 
 #endif /* SOUP_SESSION_H */
diff --git a/tests/misc-test.c b/tests/misc-test.c
index acf42c3..68095f5 100644
--- a/tests/misc-test.c
+++ b/tests/misc-test.c
@@ -900,6 +900,172 @@ do_pause_abort_test (void)
        g_assert_null (ptr);
 }
 
+static void
+steal_after_upgrade (SoupMessage *msg, gpointer user_data)
+{
+       SoupClientContext *context = user_data;
+       GIOStream *stream;
+       GInputStream *istream;
+       GDataInputStream *distream;
+       GOutputStream *ostream;
+       char *str, *caps;
+       gssize n;
+       GError *error = NULL;
+
+       /* This should not ever be seen. */
+       soup_message_set_status (msg, SOUP_STATUS_INTERNAL_SERVER_ERROR);
+
+       stream = soup_client_context_steal_connection (context);
+
+       istream = g_io_stream_get_input_stream (stream);
+       distream = G_DATA_INPUT_STREAM (g_data_input_stream_new (istream));
+       ostream = g_io_stream_get_output_stream (stream);
+
+       /* Echo until the client disconnects */
+       while (TRUE) {
+               str = g_data_input_stream_read_line (distream, NULL, NULL, &error);
+               g_assert_no_error (error);
+               if (!str)
+                       break;
+
+               caps = g_ascii_strup (str, -1);
+               n = g_output_stream_write (ostream, caps, strlen (caps), NULL, &error);
+               g_assert_no_error (error);
+               g_assert_cmpint (n, ==, strlen (caps)); 
+               n = g_output_stream_write (ostream, "\n", 1, NULL, &error);
+               g_assert_no_error (error);
+               g_assert_cmpint (n, ==, 1);
+               g_free (caps);
+               g_free (str);
+       }
+
+       g_object_unref (distream);
+
+       g_io_stream_close (stream, NULL, &error);
+       g_assert_no_error (error);
+       g_object_unref (stream);
+}
+
+static void
+upgrade_server_callback (SoupServer *server, SoupMessage *msg,
+                        const char *path, GHashTable *query,
+                        SoupClientContext *context, gpointer data)
+{
+       if (msg->method != SOUP_METHOD_GET) {
+               soup_message_set_status (msg, SOUP_STATUS_NOT_IMPLEMENTED);
+               return;
+       }
+
+       soup_message_set_status (msg, SOUP_STATUS_SWITCHING_PROTOCOLS);
+       soup_message_headers_append (msg->request_headers, "Upgrade", "ECHO");
+       soup_message_headers_append (msg->request_headers, "Connection", "upgrade");
+
+       g_signal_connect (msg, "wrote-informational",
+                         G_CALLBACK (steal_after_upgrade), context);
+}
+
+static void
+callback_not_reached (SoupSession *session, SoupMessage *msg, gpointer user_data)
+{
+       g_assert_not_reached ();
+}
+
+static void
+switching_protocols (SoupMessage *msg, gpointer user_data)
+{
+       GIOStream **out_iostream = user_data;
+       SoupSession *session = g_object_get_data (G_OBJECT (msg), "SoupSession");
+
+       *out_iostream = soup_session_steal_connection (session, msg);
+}
+
+static void
+do_stealing_test (gconstpointer data)
+{
+       gboolean sync = GPOINTER_TO_INT (data);
+       SoupServer *server;
+       SoupURI *uri;
+       SoupSession *session;
+       SoupMessage *msg;
+       GIOStream *iostream;
+       GInputStream *istream;
+       GDataInputStream *distream;
+       GOutputStream *ostream;
+       int i;
+       gssize n;
+       char *str, *caps;
+       GError *error = NULL;
+       static const char *strings[] = { "one", "two", "three", "four", "five" };
+
+       server = soup_test_server_new (SOUP_TEST_SERVER_IN_THREAD);
+       uri = soup_test_server_get_uri (server, SOUP_URI_SCHEME_HTTP, "127.0.0.1");
+       soup_server_add_handler (server, NULL, upgrade_server_callback, NULL, NULL);
+
+       session = soup_test_session_new (SOUP_TYPE_SESSION, NULL);
+       msg = soup_message_new_from_uri ("GET", uri);
+       soup_message_headers_append (msg->request_headers, "Upgrade", "echo");
+       soup_message_headers_append (msg->request_headers, "Connection", "upgrade");
+       g_object_set_data (G_OBJECT (msg), "SoupSession", session);
+
+       soup_message_add_status_code_handler (msg, "got-informational",
+                                             SOUP_STATUS_SWITCHING_PROTOCOLS,
+                                             G_CALLBACK (switching_protocols), &iostream);
+
+       iostream = NULL;
+
+       if (sync) {
+               soup_session_send_message (session, msg);
+               soup_test_assert_message_status (msg, SOUP_STATUS_SWITCHING_PROTOCOLS);
+       } else {
+               g_object_ref (msg);
+               soup_session_queue_message (session, msg, callback_not_reached, NULL);
+               while (iostream == NULL)
+                       g_main_context_iteration (NULL, TRUE);
+       }
+
+       g_assert (iostream != NULL);
+
+       g_object_unref (msg);
+       soup_test_session_abort_unref (session);
+       soup_uri_free (uri);
+
+       /* Now iostream connects to a (capitalizing) echo server */
+
+       istream = g_io_stream_get_input_stream (iostream);
+       distream = G_DATA_INPUT_STREAM (g_data_input_stream_new (istream));
+       ostream = g_io_stream_get_output_stream (iostream);
+
+       for (i = 0; i < G_N_ELEMENTS (strings); i++) {
+               n = g_output_stream_write (ostream, strings[i], strlen (strings[i]),
+                                          NULL, &error);
+               g_assert_no_error (error);
+               g_assert_cmpint (n, ==, strlen (strings[i]));
+               n = g_output_stream_write (ostream, "\n", 1, NULL, &error);
+               g_assert_no_error (error);
+               g_assert_cmpint (n, ==, 1);
+       }
+
+       for (i = 0; i < G_N_ELEMENTS (strings); i++) {
+               str = g_data_input_stream_read_line (distream, NULL, NULL, &error);
+               g_assert_no_error (error);
+               caps = g_ascii_strup (strings[i], -1);
+               g_assert_cmpstr (caps, ==, str);
+               g_free (caps);
+               g_free (str);
+       }
+
+       g_object_unref (distream);
+
+       g_io_stream_close (iostream, NULL, &error);
+       g_assert_no_error (error);
+       g_object_unref (iostream);
+
+       /* We can't do this until the end because it's in another thread, and
+        * soup_test_server_quit_unref() will wait for that thread to exit.
+        */ 
+       soup_test_server_quit_unref (server);
+}
+
 int
 main (int argc, char **argv)
 {
@@ -940,6 +1106,8 @@ main (int argc, char **argv)
        g_test_add_func ("/misc/aliases", do_aliases_test);
        g_test_add_func ("/misc/idle-on-dispose", do_idle_on_dispose_test);
        g_test_add_func ("/misc/pause-abort", do_pause_abort_test);
+       g_test_add_data_func ("/misc/stealing/async", GINT_TO_POINTER (FALSE), do_stealing_test);
+       g_test_add_data_func ("/misc/stealing/sync", GINT_TO_POINTER (TRUE), do_stealing_test);
 
        ret = g_test_run ();
 


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]