[evolution-ews/wip/mcrha/soup3] Read EWS notifications in the SoupSession worker thread



commit 855d070399809b0db4d19f29d75973121fb43f32
Author: Milan Crha <mcrha redhat com>
Date:   Thu May 19 18:56:23 2022 +0200

    Read EWS notifications in the SoupSession worker thread

 src/EWS/common/e-ews-connection.c   | 103 ++++++++++++++++++++++++++++------
 src/EWS/common/e-ews-connection.h   |  15 ++++-
 src/EWS/common/e-ews-notification.c | 107 ++++++++++++++++++++++++++----------
 3 files changed, 176 insertions(+), 49 deletions(-)
---
diff --git a/src/EWS/common/e-ews-connection.c b/src/EWS/common/e-ews-connection.c
index e96954f7..11675039 100644
--- a/src/EWS/common/e-ews-connection.c
+++ b/src/EWS/common/e-ews-connection.c
@@ -307,7 +307,8 @@ e_ews_connection_process_request_ready_cb (GObject *source_object,
 
        /* Need to process the 'input_stream' in this thread */
        if (!ews_connection_credentials_failed (pd->cnc, pd->message, FALSE, NULL) &&
-           soup_message_get_status (pd->message) != SOUP_STATUS_UNAUTHORIZED) {
+           soup_message_get_status (pd->message) != SOUP_STATUS_UNAUTHORIZED &&
+           input_stream) {
                ESoapRequestCustomProcessFn custom_process_fn = NULL;
                gpointer custom_process_data = NULL;
 
@@ -9605,6 +9606,51 @@ e_ews_connection_unsubscribe_sync (EEwsConnection *cnc,
        return success;
 }
 
+typedef struct _StreamEventsData {
+       gint ref_count;
+       gboolean reading;
+       gpointer buffer;
+       EEwsStreamingEventsReadCallback cb_read;
+       EEwsStreamingEventsFinishedCallback cb_finished;
+       gpointer cb_user_data;
+       GCancellable *cancellable;
+       GError *error;
+} StreamingEventsData;
+
+static void
+streaming_events_data_unref (StreamingEventsData *sed)
+{
+       if (g_atomic_int_dec_and_test (&sed->ref_count)) {
+               g_clear_pointer (&sed->buffer, g_free);
+               g_clear_object (&sed->cancellable);
+               g_clear_error (&sed->error);
+               g_slice_free (StreamingEventsData, sed);
+       }
+}
+
+static void
+e_ews_get_streaming_events_read_data_cb (GObject *source_object,
+                                        GAsyncResult *result,
+                                        gpointer user_data)
+{
+       StreamingEventsData *sed = user_data;
+       GInputStream *input_stream = G_INPUT_STREAM (source_object);
+       gssize nread;
+
+       nread = g_input_stream_read_finish (input_stream, result, &sed->error);
+
+       if (nread > 0 &&
+           sed->cb_read (sed->buffer, nread, sed->cb_user_data, sed->cancellable, &sed->error)) {
+               g_input_stream_read_async (input_stream, sed->buffer, BUFFER_SIZE, G_PRIORITY_HIGH, sed->cancellable,
+                       e_ews_get_streaming_events_read_data_cb, sed);
+               return;
+       }
+
+       sed->cb_finished (sed->cb_user_data, sed->error);
+
+       streaming_events_data_unref (sed);
+}
+
 static void
 e_ews_get_streaming_events_custom_process (ESoapRequest *request,
                                           SoupMessage *message,
@@ -9614,27 +9660,44 @@ e_ews_get_streaming_events_custom_process (ESoapRequest *request,
                                           GCancellable *cancellable,
                                           GError **error)
 {
-       GInputStream **out_input_stream = user_data;
+       StreamingEventsData *sed = user_data;
+
+       sed->reading = TRUE;
+       g_atomic_int_inc (&sed->ref_count);
 
-       if (out_input_stream && input_stream)
-               *out_input_stream = g_object_ref (input_stream);
+       g_input_stream_read_async (input_stream, sed->buffer, BUFFER_SIZE, G_PRIORITY_HIGH, sed->cancellable,
+               e_ews_get_streaming_events_read_data_cb, sed);
 }
 
+/*
+ * Only begins reading of the events, which is done asynchronously using @cb_read,
+ * until @cb_finished is called. When @cb_read returns FALSE, the read is done,
+ * followed by the @cb_finished call. Both callbacks use the same user data.
+ * It's guaranteed the @cb_finished is always called when this function returns TRUE.
+ *
+ * This is to make reading from the input stream from the EWS worker thread,
+ * because it cannot be stopped, not read, from another thread, with a different
+ * thread default main context than it was created with. A libsoup3 thing.
+ */
 gboolean
-e_ews_connection_get_streaming_events_sync (EEwsConnection *cnc,
-                                           gint pri,
-                                           const gchar *subscription_id,
-                                           GInputStream **out_input_stream,
-                                           GCancellable *cancellable,
-                                           GError **error)
+e_ews_connection_read_streaming_events_sync (EEwsConnection *cnc,
+                                            gint pri,
+                                            const gchar *subscription_id,
+                                            EEwsStreamingEventsReadCallback cb_read,
+                                            EEwsStreamingEventsFinishedCallback cb_finished,
+                                            gpointer cb_user_data,
+                                            GCancellable *cancellable,
+                                            GError **error)
 {
        ESoapRequest *request;
        ESoapResponse *response;
-       GInputStream *input_stream = NULL;
+       StreamingEventsData *sed;
        gboolean success;
 
        g_return_val_if_fail (cnc != NULL, FALSE);
        g_return_val_if_fail (subscription_id != NULL, FALSE);
+       g_return_val_if_fail (cb_read != NULL, FALSE);
+       g_return_val_if_fail (cb_finished != NULL, FALSE);
 
        request = e_ews_request_new_with_header (
                cnc->priv->uri,
@@ -9658,7 +9721,16 @@ e_ews_connection_get_streaming_events_sync (EEwsConnection *cnc,
 
        e_ews_request_write_footer (request);
 
-       e_soap_request_set_custom_process_fn (request, e_ews_get_streaming_events_custom_process, &input_stream);
+       sed = g_slice_new0 (StreamingEventsData);
+       sed->ref_count = 1;
+       sed->reading = FALSE;
+       sed->buffer = g_malloc (BUFFER_SIZE);
+       sed->cb_read = cb_read;
+       sed->cb_finished = cb_finished;
+       sed->cb_user_data = cb_user_data;
+       sed->cancellable = cancellable ? g_object_ref (cancellable) : NULL;
+
+       e_soap_request_set_custom_process_fn (request, e_ews_get_streaming_events_custom_process, sed);
 
        response = e_ews_connection_send_request_sync (cnc, request, cancellable, error);
        g_warn_if_fail (response == NULL);
@@ -9666,12 +9738,9 @@ e_ews_connection_get_streaming_events_sync (EEwsConnection *cnc,
        g_clear_object (&request);
        g_clear_object (&response);
 
-       success = input_stream != NULL;
+       success = sed->reading;
 
-       if (success && out_input_stream)
-               *out_input_stream = input_stream;
-       else
-               g_clear_object (&input_stream);
+       streaming_events_data_unref (sed);
 
        return success;
 }
diff --git a/src/EWS/common/e-ews-connection.h b/src/EWS/common/e-ews-connection.h
index b55fbeb1..c077ba13 100644
--- a/src/EWS/common/e-ews-connection.h
+++ b/src/EWS/common/e-ews-connection.h
@@ -72,6 +72,15 @@ typedef gboolean(*EEwsRequestCreationCallback)       (ESoapRequest *request,
                                                 GError **error);
 typedef void   (*EEwsResponseCallback)         (ESoapResponse *response,
                                                 GSimpleAsyncResult *simple);
+typedef gboolean(*EEwsStreamingEventsReadCallback)
+                                               (gconstpointer buffer,
+                                                gssize nread,
+                                                gpointer user_data,
+                                                GCancellable *cancellable,
+                                                GError **error);
+typedef void   (*EEwsStreamingEventsFinishedCallback)
+                                               (gpointer user_data,
+                                                const GError *error);
 
 typedef enum {
        EWS_SEARCH_AD,
@@ -892,11 +901,13 @@ gboolean  e_ews_connection_unsubscribe_sync
                                                 const gchar *subscription_id,
                                                 GCancellable *cancellable,
                                                 GError **error);
-gboolean       e_ews_connection_get_streaming_events_sync
+gboolean       e_ews_connection_read_streaming_events_sync
                                                (EEwsConnection *cnc,
                                                 gint pri,
                                                 const gchar *subscription_id,
-                                                GInputStream **out_input_stream,
+                                                EEwsStreamingEventsReadCallback cb_read,
+                                                EEwsStreamingEventsFinishedCallback cb_finished,
+                                                gpointer cb_user_data,
                                                 GCancellable *cancellable,
                                                 GError **error);
 
diff --git a/src/EWS/common/e-ews-notification.c b/src/EWS/common/e-ews-notification.c
index 37860229..198e1eba 100644
--- a/src/EWS/common/e-ews-notification.c
+++ b/src/EWS/common/e-ews-notification.c
@@ -506,7 +506,44 @@ ews_notification_process_chunk (EEwsNotification *notification,
        } while (chunk_len > 0 && !g_cancellable_is_cancelled (cancellable));
 }
 
-#define BUFFER_SIZE 16384
+typedef struct _ReadindData {
+       GMutex mutex;
+       GCond cond;
+       gboolean finished;
+       EEwsNotification *notification;
+       GByteArray *chunk_data;
+       GError *error;
+} ReadingData;
+
+static gboolean
+e_ews_notification_read_data_cb (gconstpointer buffer,
+                                gssize nread,
+                                gpointer user_data,
+                                GCancellable *cancellable,
+                                GError **error)
+{
+       ReadingData *rd = user_data;
+
+       g_byte_array_append (rd->chunk_data, buffer, nread);
+       ews_notification_process_chunk (rd->notification, rd->chunk_data, cancellable);
+
+       return TRUE;
+}
+
+static void
+e_ews_notification_read_finished_cb (gpointer user_data,
+                                    const GError *error)
+{
+       ReadingData *rd = user_data;
+
+       if (error)
+               rd->error = g_error_copy (error);
+
+       g_mutex_lock (&rd->mutex);
+       rd->finished = TRUE;
+       g_cond_signal (&rd->cond);
+       g_mutex_unlock (&rd->mutex);
+}
 
 static gboolean
 e_ews_notification_get_events_sync (EEwsNotification *notification,
@@ -515,12 +552,8 @@ e_ews_notification_get_events_sync (EEwsNotification *notification,
                                    GCancellable *cancellable)
 {
        EEwsConnection *cnc;
-       GInputStream *input_stream = NULL;
-       GByteArray *chunk_data;
-       gpointer buffer;
-       gssize nread = 0;
+       ReadingData rd;
        gboolean success;
-       GError *local_error = NULL;
 
        g_return_val_if_fail (out_fatal_error != NULL, FALSE);
 
@@ -537,35 +570,49 @@ e_ews_notification_get_events_sync (EEwsNotification *notification,
        if (!cnc)
                return FALSE;
 
-       if (!e_ews_connection_get_streaming_events_sync (cnc, G_PRIORITY_DEFAULT, subscription_id, &input_stream, cancellable, &local_error) || !input_stream) {
-               *out_fatal_error = !local_error || local_error->domain == E_SOUP_SESSION_ERROR || local_error->domain == G_TLS_ERROR;
-               g_clear_object (&cnc);
-               g_clear_error (&local_error);
-               return FALSE;
-       }
-
-       /* Unref early, thus it can be freed and will cancel this thread. */
-       g_clear_object (&cnc);
+       g_mutex_init (&rd.mutex);
+       g_cond_init (&rd.cond);
+       rd.finished = FALSE;
+       rd.notification = notification;
+       rd.chunk_data = g_byte_array_new ();
+       rd.error = NULL;
 
-       chunk_data = g_byte_array_new ();
-       buffer = g_malloc (BUFFER_SIZE);
+       if (e_ews_connection_read_streaming_events_sync (cnc, G_PRIORITY_DEFAULT, subscription_id,
+               e_ews_notification_read_data_cb,
+               e_ews_notification_read_finished_cb,
+               &rd, cancellable, &rd.error)) {
 
-       while (nread = g_input_stream_read (input_stream, buffer, BUFFER_SIZE, cancellable, &local_error),
-              nread > 0) {
-               g_byte_array_append (chunk_data, buffer, nread);
-               ews_notification_process_chunk (notification, chunk_data, cancellable);
-       }
+               e_ews_debug_print ("%s: %p: started reading events\n", G_STRFUNC, notification);
 
-       g_free (buffer);
-       g_byte_array_unref (chunk_data);
-       g_clear_object (&input_stream);
+               /* Unref early, thus it can be freed and will cancel this thread. */
+               g_clear_object (&cnc);
 
-       /* It's okay when read failed on EOF */
-       *out_fatal_error = (local_error != NULL && !g_error_matches (local_error, G_IO_ERROR, G_IO_ERROR_PARTIAL_INPUT)) ||
-               g_cancellable_is_cancelled (cancellable);
-       success = !local_error && !*out_fatal_error;
+               g_mutex_lock (&rd.mutex);
+               while (!rd.finished) {
+                       g_cond_wait (&rd.cond, &rd.mutex);
+               }
+               g_mutex_unlock (&rd.mutex);
+
+               e_ews_debug_print ("%s: %p: finished reading events; cancelled:%d err:%s is-partial-input:%d\n", G_STRFUNC, notification,
+                       g_cancellable_is_cancelled (cancellable),
+                       rd.error ? rd.error->message : "no-err",
+                       g_error_matches (rd.error, G_IO_ERROR, G_IO_ERROR_PARTIAL_INPUT));
+
+               /* It's okay when read failed on EOF */
+               *out_fatal_error = (rd.error != NULL && !g_error_matches (rd.error, G_IO_ERROR, G_IO_ERROR_PARTIAL_INPUT)) ||
+                       g_cancellable_is_cancelled (cancellable);
+               success = !rd.error && !*out_fatal_error;
+       } else {
+               e_ews_debug_print ("%s: %p: failed to start reading events: %s\n", G_STRFUNC, notification, rd.error ? rd.error->message : "no-err");
+
+               *out_fatal_error = !rd.error || rd.error->domain == E_SOUP_SESSION_ERROR || rd.error->domain == G_TLS_ERROR;
+               success = FALSE;
+       }
 
-       g_clear_error (&local_error);
+       g_mutex_clear (&rd.mutex);
+       g_cond_clear (&rd.cond);
+       g_byte_array_unref (rd.chunk_data);
+       g_clear_error (&rd.error);
 
        return success;
 }


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]