[evolution-data-server] CamelIMAPXServer: Use a main loop in the parser thread.



commit a2b3c60191021e7dd93f810cbd8f23993fb53a46
Author: Matthew Barnes <mbarnes redhat com>
Date:   Thu Jan 16 08:33:16 2014 -0500

    CamelIMAPXServer: Use a main loop in the parser thread.
    
    The "while" loop previously used in the parser thread just blocked the
    thread until incoming data became available.  Using a GMainLoop allows
    us to add other event sources, like periodic timeouts to help keep the
    connection alive without relying on Evolution.
    
    I'm also hoping this will help eliminate the separate thread for IDLE,
    since it now uses a GMainLoop as well.  But I don't quite have my head
    around all the implications yet.

 camel/providers/imapx/camel-imapx-server.c |  233 ++++++++++++++++++----------
 1 files changed, 151 insertions(+), 82 deletions(-)
---
diff --git a/camel/providers/imapx/camel-imapx-server.c b/camel/providers/imapx/camel-imapx-server.c
index 2346444..6d8a186 100644
--- a/camel/providers/imapx/camel-imapx-server.c
+++ b/camel/providers/imapx/camel-imapx-server.c
@@ -25,6 +25,10 @@
 #include <glib/gi18n-lib.h>
 #include <gio/gnetworking.h>
 
+#ifndef G_OS_WIN32
+#include <glib-unix.h>
+#endif /* G_OS_WIN32 */
+
 #include "camel-imapx-server.h"
 
 #include "camel-imapx-command.h"
@@ -312,8 +316,9 @@ struct _CamelIMAPXServerPrivate {
        GMutex stream_lock;
 
        GThread *parser_thread;
+       GMainLoop *parser_main_loop;
+       GMainContext *parser_main_context;
        GWeakRef parser_cancellable;
-       gboolean parser_quit;
 
        CamelIMAPXNamespaceResponse *namespaces;
        GMutex namespaces_lock;
@@ -1129,7 +1134,7 @@ fail:
        camel_imapx_command_queue_remove (is->active, ic);
 
        /* Break the parser thread out of its loop so it disconnects. */
-       is->priv->parser_quit = TRUE;
+       g_main_loop_quit (is->priv->parser_main_loop);
        g_cancellable_cancel (cancellable);
 
        /* Hand the error off to the command that we failed to start. */
@@ -1268,7 +1273,7 @@ imapx_command_start_next (CamelIMAPXServer *is)
                        camel_imapx_command_unref (ic);
 
                        /* This will terminate the loop. */
-                       if (is->priv->parser_quit)
+                       if (is->state == IMAPX_SHUTDOWN)
                                g_queue_clear (&start);
                }
 
@@ -1430,7 +1435,7 @@ imapx_command_start_next (CamelIMAPXServer *is)
                        imapx_command_start (is, ic);
                        camel_imapx_command_unref (ic);
 
-                       if (is->priv->parser_quit) {
+                       if (is->state == IMAPX_SHUTDOWN) {
                                g_queue_clear (&start);
                                return;
                        }
@@ -1525,7 +1530,7 @@ imapx_command_start_next (CamelIMAPXServer *is)
                        camel_imapx_command_unref (ic);
 
                        /* This will terminate the loop. */
-                       if (is->priv->parser_quit)
+                       if (is->state == IMAPX_SHUTDOWN)
                                g_queue_clear (&start);
                }
        }
@@ -3430,7 +3435,7 @@ imapx_command_idle_stop (CamelIMAPXServer *is,
                g_prefix_error (error, "Unable to issue DONE: ");
                c (is->tagprefix, "Failed to issue DONE to terminate IDLE\n");
                is->state = IMAPX_SHUTDOWN;
-               is->priv->parser_quit = TRUE;
+               g_main_loop_quit (is->priv->parser_main_loop);
        }
 
        g_clear_object (&cancellable);
@@ -4154,6 +4159,8 @@ connect_to_server_process (CamelIMAPXServer *is,
        g_free (host);
        g_free (user);
 
+       /* FIXME Use GSubprocess here as soon as we can. */
+
        cmd_stream = camel_stream_process_new ();
 
        ret = camel_stream_process_connect (
@@ -4290,7 +4297,7 @@ imapx_connect_to_server (CamelIMAPXServer *is,
 
        while (1) {
                // poll ? wait for other stuff? loop?
-               if (camel_application_is_exiting || is->priv->parser_quit) {
+               if (camel_application_is_exiting) {
                        g_set_error (
                                error, G_IO_ERROR,
                                G_IO_ERROR_CANCELLED,
@@ -7149,15 +7156,71 @@ imapx_abort_all_commands (CamelIMAPXServer *is,
 
 /* ********************************************************************** */
 
-static void
-imapx_parse_contents (CamelIMAPXServer *is,
-                      CamelIMAPXStream *stream,
-                      GCancellable *cancellable,
-                      GError **error)
+static gboolean
+imapx_parse_contents (CamelIMAPXServer *is)
 {
-       while (imapx_step (is, stream, cancellable, error))
-               if (camel_imapx_stream_buffered (stream) == 0)
-                       break;
+       CamelIMAPXStream *stream;
+       GCancellable *cancellable;
+       GError *local_error = NULL;
+
+       /* XXX Still need to retrieve the CamelStream until IMAPX can
+        *     be ported to use GInputStream / GOutputStream directly. */
+       stream = camel_imapx_server_ref_stream (is);
+
+       cancellable = g_weak_ref_get (&is->priv->parser_cancellable);
+
+       if (stream != NULL) {
+               while (imapx_step (is, stream, cancellable, &local_error))
+                       if (camel_imapx_stream_buffered (stream) == 0)
+                               break;
+       } else {
+               local_error = g_error_new_literal (
+                       CAMEL_SERVICE_ERROR,
+                       CAMEL_SERVICE_ERROR_NOT_CONNECTED,
+                       _("Lost connection to IMAP server"));
+       }
+
+       if (g_cancellable_is_cancelled (cancellable)) {
+               gboolean active_queue_is_empty;
+
+               QUEUE_LOCK (is);
+               active_queue_is_empty =
+                       camel_imapx_command_queue_is_empty (is->active);
+               QUEUE_UNLOCK (is);
+
+               if (active_queue_is_empty || imapx_in_idle (is)) {
+                       g_cancellable_reset (cancellable);
+                       g_clear_error (&local_error);
+               } else {
+                       /* Cancelled error should be set. */
+                       g_warn_if_fail (local_error != NULL);
+               }
+       }
+
+       g_clear_object (&stream);
+       g_clear_object (&cancellable);
+
+       if (local_error != NULL) {
+               g_main_loop_quit (is->priv->parser_main_loop);
+               imapx_abort_all_commands (is, local_error);
+               g_clear_error (&local_error);
+               return G_SOURCE_REMOVE;
+       }
+
+       return G_SOURCE_CONTINUE;
+}
+
+static gboolean
+imapx_ready_to_read (GInputStream *input_stream,
+                     CamelIMAPXServer *is)
+{
+       return imapx_parse_contents (is);
+}
+
+static gboolean
+imapx_process_ready_to_read (CamelIMAPXServer *is)
+{
+       return imapx_parse_contents (is);
 }
 
 /*
@@ -7172,90 +7235,84 @@ imapx_parser_thread (gpointer user_data)
 {
        CamelIMAPXServer *is;
        CamelIMAPXStore *store;
+       CamelIMAPXStream *stream;
+       CamelStream *source_stream;
        GCancellable *cancellable;
-       GError *local_error = NULL;
 
        is = CAMEL_IMAPX_SERVER (user_data);
 
        cancellable = camel_operation_new ();
        g_weak_ref_set (&is->priv->parser_cancellable, cancellable);
 
-       while (local_error == NULL) {
-               CamelIMAPXStream *stream;
+       stream = camel_imapx_server_ref_stream (is);
+       g_return_val_if_fail (stream != NULL, NULL);
 
-               /* Reacquire the stream on every loop iteration. */
-               stream = camel_imapx_server_ref_stream (is);
-               if (stream == NULL) {
-                       local_error = g_error_new_literal (
-                               CAMEL_SERVICE_ERROR,
-                               CAMEL_SERVICE_ERROR_NOT_CONNECTED,
-                               _("Lost connection to IMAP server"));
-                       break;
-               }
+       source_stream = camel_imapx_stream_ref_source (stream);
 
-               g_cancellable_reset (cancellable);
+       g_main_context_push_thread_default (is->priv->parser_main_context);
 
 #ifndef G_OS_WIN32
-               if (is->is_process_stream) {
-                       GPollFD fds[2] = { {0, 0, 0}, {0, 0, 0} };
-                       CamelStream *source;
-                       gint res;
+       if (is->is_process_stream) {
+               GSource *unix_fd_source;
+               GSource *cancellable_source;
+               gint fd;
 
-                       source = camel_imapx_stream_ref_source (stream);
-
-                       fds[0].fd = CAMEL_STREAM_PROCESS (source)->sockfd;
-                       fds[0].events = G_IO_IN;
-                       fds[1].fd = g_cancellable_get_fd (cancellable);
-                       fds[1].events = G_IO_IN;
-                       res = g_poll (fds, 2, -1);
-                       if (res == -1)
-                               g_usleep (1) /* ?? */ ;
-                       else if (res == 0)
-                               /* timed out */;
-                       else if (fds[0].revents & G_IO_IN)
-                               imapx_parse_contents (
-                                       is, stream, cancellable, &local_error);
-                       g_cancellable_release_fd (cancellable);
-
-                       g_object_unref (source);
-               } else
-#endif
-               {
-                       imapx_parse_contents (
-                               is, stream, cancellable, &local_error);
-               }
+               /* FIXME Use GSubprocess here as soon as we can. */
 
-               if (is->priv->parser_quit)
-                       g_cancellable_cancel (cancellable);
-               else if (g_cancellable_is_cancelled (cancellable)) {
-                       gboolean active_queue_is_empty;
+               fd = CAMEL_STREAM_PROCESS (source_stream)->sockfd;
 
-                       QUEUE_LOCK (is);
-                       active_queue_is_empty =
-                               camel_imapx_command_queue_is_empty (is->active);
-                       QUEUE_UNLOCK (is);
+               cancellable_source = g_cancellable_source_new (cancellable);
+               g_source_set_dummy_callback (cancellable_source);
 
-                       if (active_queue_is_empty || imapx_in_idle (is)) {
-                               g_cancellable_reset (cancellable);
-                               g_clear_error (&local_error);
-                       } else {
-                               /* Cancelled error should be set. */
-                               g_warn_if_fail (local_error != NULL);
-                       }
-               }
+               unix_fd_source = g_unix_fd_source_new (fd, G_IO_IN);
+               g_source_add_child_source (
+                       unix_fd_source, cancellable_source);
+               g_source_set_callback (
+                       unix_fd_source,
+                       (GSourceFunc) imapx_process_ready_to_read,
+                       g_object_ref (is),
+                       (GDestroyNotify) g_object_unref);
+               g_source_attach (
+                       unix_fd_source,
+                       is->priv->parser_main_context);
+               g_source_unref (unix_fd_source);
 
-               g_clear_object (&stream);
-       }
+               g_source_unref (cancellable_source);
 
-       QUEUE_LOCK (is);
-       is->state = IMAPX_SHUTDOWN;
-       QUEUE_UNLOCK (is);
+       } else
+#endif /* G_OS_WIN32 */
+       {
+               GIOStream *base_stream;
+               GInputStream *input_stream;
+               GSource *pollable_source;
+
+               base_stream = camel_stream_ref_base_stream (source_stream);
+               input_stream = g_io_stream_get_input_stream (base_stream);
+
+               pollable_source = g_pollable_input_stream_create_source (
+                       G_POLLABLE_INPUT_STREAM (input_stream), cancellable);
+               g_source_set_callback (
+                       pollable_source,
+                       (GSourceFunc) imapx_ready_to_read,
+                       g_object_ref (is),
+                       (GDestroyNotify) g_object_unref);
+               g_source_attach (
+                       pollable_source,
+                       is->priv->parser_main_context);
+               g_source_unref (pollable_source);
 
-       imapx_abort_all_commands (is, local_error);
+               g_object_unref (base_stream);
+       }
 
        g_clear_object (&cancellable);
+       g_clear_object (&source_stream);
+       g_clear_object (&stream);
 
-       is->priv->parser_quit = FALSE;
+       g_main_loop_run (is->priv->parser_main_loop);
+
+       QUEUE_LOCK (is);
+       is->state = IMAPX_SHUTDOWN;
+       QUEUE_UNLOCK (is);
 
        /* Disconnect the CamelService. */
        store = camel_imapx_server_ref_store (is);
@@ -7263,7 +7320,7 @@ imapx_parser_thread (gpointer user_data)
                CAMEL_SERVICE (store), FALSE, NULL, NULL);
        g_object_unref (store);
 
-       g_clear_error (&local_error);
+       g_main_context_pop_thread_default (is->priv->parser_main_context);
 
        g_object_unref (is);
 
@@ -7334,11 +7391,11 @@ imapx_server_dispose (GObject *object)
        CamelIMAPXServer *server = CAMEL_IMAPX_SERVER (object);
        GCancellable *cancellable;
 
+       g_main_loop_quit (server->priv->parser_main_loop);
+
        QUEUE_LOCK (server);
        server->state = IMAPX_SHUTDOWN;
 
-       server->priv->parser_quit = TRUE;
-
        cancellable = g_weak_ref_get (&server->priv->parser_cancellable);
        g_weak_ref_set (&server->priv->parser_cancellable, NULL);
 
@@ -7385,6 +7442,9 @@ imapx_server_finalize (GObject *object)
        g_rec_mutex_clear (&is->queue_lock);
        g_mutex_clear (&is->priv->select_lock);
 
+       g_main_loop_unref (is->priv->parser_main_loop);
+       g_main_context_unref (is->priv->parser_main_context);
+
        camel_folder_change_info_free (is->priv->changes);
 
        g_free (is->priv->context);
@@ -7630,7 +7690,16 @@ camel_imapx_server_init (CamelIMAPXServer *is)
                (GDestroyNotify) g_free,
                (GDestroyNotify) NULL);
 
-       /* Initialize IDLE structs. */
+       /* Initialize parser thread structs. */
+
+       main_context = g_main_context_new ();
+
+       is->priv->parser_main_loop = g_main_loop_new (main_context, FALSE);
+       is->priv->parser_main_context = g_main_context_ref (main_context);
+
+       g_main_context_unref (main_context);
+
+       /* Initialize IDLE thread structs. */
 
        main_context = g_main_context_new ();
 


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]