[evolution-ews/wip/mcrha/soup3] Read EWS notifications in the SoupSession worker thread
- From: Milan Crha <mcrha src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [evolution-ews/wip/mcrha/soup3] Read EWS notifications in the SoupSession worker thread
- Date: Thu, 19 May 2022 16:56:51 +0000 (UTC)
commit 855d070399809b0db4d19f29d75973121fb43f32
Author: Milan Crha <mcrha redhat com>
Date: Thu May 19 18:56:23 2022 +0200
Read EWS notifications in the SoupSession worker thread
src/EWS/common/e-ews-connection.c | 103 ++++++++++++++++++++++++++++------
src/EWS/common/e-ews-connection.h | 15 ++++-
src/EWS/common/e-ews-notification.c | 107 ++++++++++++++++++++++++++----------
3 files changed, 176 insertions(+), 49 deletions(-)
---
diff --git a/src/EWS/common/e-ews-connection.c b/src/EWS/common/e-ews-connection.c
index e96954f7..11675039 100644
--- a/src/EWS/common/e-ews-connection.c
+++ b/src/EWS/common/e-ews-connection.c
@@ -307,7 +307,8 @@ e_ews_connection_process_request_ready_cb (GObject *source_object,
/* Need to process the 'input_stream' in this thread */
if (!ews_connection_credentials_failed (pd->cnc, pd->message, FALSE, NULL) &&
- soup_message_get_status (pd->message) != SOUP_STATUS_UNAUTHORIZED) {
+ soup_message_get_status (pd->message) != SOUP_STATUS_UNAUTHORIZED &&
+ input_stream) {
ESoapRequestCustomProcessFn custom_process_fn = NULL;
gpointer custom_process_data = NULL;
@@ -9605,6 +9606,51 @@ e_ews_connection_unsubscribe_sync (EEwsConnection *cnc,
return success;
}
+typedef struct _StreamEventsData {
+ gint ref_count;
+ gboolean reading;
+ gpointer buffer;
+ EEwsStreamingEventsReadCallback cb_read;
+ EEwsStreamingEventsFinishedCallback cb_finished;
+ gpointer cb_user_data;
+ GCancellable *cancellable;
+ GError *error;
+} StreamingEventsData;
+
+static void
+streaming_events_data_unref (StreamingEventsData *sed)
+{
+ if (g_atomic_int_dec_and_test (&sed->ref_count)) {
+ g_clear_pointer (&sed->buffer, g_free);
+ g_clear_object (&sed->cancellable);
+ g_clear_error (&sed->error);
+ g_slice_free (StreamingEventsData, sed);
+ }
+}
+
+static void
+e_ews_get_streaming_events_read_data_cb (GObject *source_object,
+ GAsyncResult *result,
+ gpointer user_data)
+{
+ StreamingEventsData *sed = user_data;
+ GInputStream *input_stream = G_INPUT_STREAM (source_object);
+ gssize nread;
+
+ nread = g_input_stream_read_finish (input_stream, result, &sed->error);
+
+ if (nread > 0 &&
+ sed->cb_read (sed->buffer, nread, sed->cb_user_data, sed->cancellable, &sed->error)) {
+ g_input_stream_read_async (input_stream, sed->buffer, BUFFER_SIZE, G_PRIORITY_HIGH, sed->cancellable,
+ e_ews_get_streaming_events_read_data_cb, sed);
+ return;
+ }
+
+ sed->cb_finished (sed->cb_user_data, sed->error);
+
+ streaming_events_data_unref (sed);
+}
+
static void
e_ews_get_streaming_events_custom_process (ESoapRequest *request,
SoupMessage *message,
@@ -9614,27 +9660,44 @@ e_ews_get_streaming_events_custom_process (ESoapRequest *request,
GCancellable *cancellable,
GError **error)
{
- GInputStream **out_input_stream = user_data;
+ StreamingEventsData *sed = user_data;
+
+ sed->reading = TRUE;
+ g_atomic_int_inc (&sed->ref_count);
- if (out_input_stream && input_stream)
- *out_input_stream = g_object_ref (input_stream);
+ g_input_stream_read_async (input_stream, sed->buffer, BUFFER_SIZE, G_PRIORITY_HIGH, sed->cancellable,
+ e_ews_get_streaming_events_read_data_cb, sed);
}
+/*
+ * Only begins reading of the events, which is done asynchronously using @cb_read,
+ * until @cb_finished is called. When @cb_read returns FALSE, the read is done,
+ * followed by the @cb_finished call. Both callbacks use the same user data.
+ * It's guaranteed the @cb_finished is always called when this function returns TRUE.
+ *
+ * This makes the input stream be read in the EWS worker thread, because it can
+ * be neither stopped nor read from another thread, one with a different thread
+ * default main context than the one it was created with. A libsoup3 thing.
+ */
gboolean
-e_ews_connection_get_streaming_events_sync (EEwsConnection *cnc,
- gint pri,
- const gchar *subscription_id,
- GInputStream **out_input_stream,
- GCancellable *cancellable,
- GError **error)
+e_ews_connection_read_streaming_events_sync (EEwsConnection *cnc,
+ gint pri,
+ const gchar *subscription_id,
+ EEwsStreamingEventsReadCallback cb_read,
+ EEwsStreamingEventsFinishedCallback cb_finished,
+ gpointer cb_user_data,
+ GCancellable *cancellable,
+ GError **error)
{
ESoapRequest *request;
ESoapResponse *response;
- GInputStream *input_stream = NULL;
+ StreamingEventsData *sed;
gboolean success;
g_return_val_if_fail (cnc != NULL, FALSE);
g_return_val_if_fail (subscription_id != NULL, FALSE);
+ g_return_val_if_fail (cb_read != NULL, FALSE);
+ g_return_val_if_fail (cb_finished != NULL, FALSE);
request = e_ews_request_new_with_header (
cnc->priv->uri,
@@ -9658,7 +9721,16 @@ e_ews_connection_get_streaming_events_sync (EEwsConnection *cnc,
e_ews_request_write_footer (request);
- e_soap_request_set_custom_process_fn (request, e_ews_get_streaming_events_custom_process, &input_stream);
+ sed = g_slice_new0 (StreamingEventsData);
+ sed->ref_count = 1;
+ sed->reading = FALSE;
+ sed->buffer = g_malloc (BUFFER_SIZE);
+ sed->cb_read = cb_read;
+ sed->cb_finished = cb_finished;
+ sed->cb_user_data = cb_user_data;
+ sed->cancellable = cancellable ? g_object_ref (cancellable) : NULL;
+
+ e_soap_request_set_custom_process_fn (request, e_ews_get_streaming_events_custom_process, sed);
response = e_ews_connection_send_request_sync (cnc, request, cancellable, error);
g_warn_if_fail (response == NULL);
@@ -9666,12 +9738,9 @@ e_ews_connection_get_streaming_events_sync (EEwsConnection *cnc,
g_clear_object (&request);
g_clear_object (&response);
- success = input_stream != NULL;
+ success = sed->reading;
- if (success && out_input_stream)
- *out_input_stream = input_stream;
- else
- g_clear_object (&input_stream);
+ streaming_events_data_unref (sed);
return success;
}
diff --git a/src/EWS/common/e-ews-connection.h b/src/EWS/common/e-ews-connection.h
index b55fbeb1..c077ba13 100644
--- a/src/EWS/common/e-ews-connection.h
+++ b/src/EWS/common/e-ews-connection.h
@@ -72,6 +72,15 @@ typedef gboolean(*EEwsRequestCreationCallback) (ESoapRequest *request,
GError **error);
typedef void (*EEwsResponseCallback) (ESoapResponse *response,
GSimpleAsyncResult *simple);
+typedef gboolean(*EEwsStreamingEventsReadCallback)
+ (gconstpointer buffer,
+ gssize nread,
+ gpointer user_data,
+ GCancellable *cancellable,
+ GError **error);
+typedef void (*EEwsStreamingEventsFinishedCallback)
+ (gpointer user_data,
+ const GError *error);
typedef enum {
EWS_SEARCH_AD,
@@ -892,11 +901,13 @@ gboolean e_ews_connection_unsubscribe_sync
const gchar *subscription_id,
GCancellable *cancellable,
GError **error);
-gboolean e_ews_connection_get_streaming_events_sync
+gboolean e_ews_connection_read_streaming_events_sync
(EEwsConnection *cnc,
gint pri,
const gchar *subscription_id,
- GInputStream **out_input_stream,
+ EEwsStreamingEventsReadCallback cb_read,
+ EEwsStreamingEventsFinishedCallback cb_finished,
+ gpointer cb_user_data,
GCancellable *cancellable,
GError **error);
diff --git a/src/EWS/common/e-ews-notification.c b/src/EWS/common/e-ews-notification.c
index 37860229..198e1eba 100644
--- a/src/EWS/common/e-ews-notification.c
+++ b/src/EWS/common/e-ews-notification.c
@@ -506,7 +506,44 @@ ews_notification_process_chunk (EEwsNotification *notification,
} while (chunk_len > 0 && !g_cancellable_is_cancelled (cancellable));
}
-#define BUFFER_SIZE 16384
+typedef struct _ReadindData {
+ GMutex mutex;
+ GCond cond;
+ gboolean finished;
+ EEwsNotification *notification;
+ GByteArray *chunk_data;
+ GError *error;
+} ReadingData;
+
+static gboolean
+e_ews_notification_read_data_cb (gconstpointer buffer,
+ gssize nread,
+ gpointer user_data,
+ GCancellable *cancellable,
+ GError **error)
+{
+ ReadingData *rd = user_data;
+
+ g_byte_array_append (rd->chunk_data, buffer, nread);
+ ews_notification_process_chunk (rd->notification, rd->chunk_data, cancellable);
+
+ return TRUE;
+}
+
+static void
+e_ews_notification_read_finished_cb (gpointer user_data,
+ const GError *error)
+{
+ ReadingData *rd = user_data;
+
+ if (error)
+ rd->error = g_error_copy (error);
+
+ g_mutex_lock (&rd->mutex);
+ rd->finished = TRUE;
+ g_cond_signal (&rd->cond);
+ g_mutex_unlock (&rd->mutex);
+}
static gboolean
e_ews_notification_get_events_sync (EEwsNotification *notification,
@@ -515,12 +552,8 @@ e_ews_notification_get_events_sync (EEwsNotification *notification,
GCancellable *cancellable)
{
EEwsConnection *cnc;
- GInputStream *input_stream = NULL;
- GByteArray *chunk_data;
- gpointer buffer;
- gssize nread = 0;
+ ReadingData rd;
gboolean success;
- GError *local_error = NULL;
g_return_val_if_fail (out_fatal_error != NULL, FALSE);
@@ -537,35 +570,49 @@ e_ews_notification_get_events_sync (EEwsNotification *notification,
if (!cnc)
return FALSE;
- if (!e_ews_connection_get_streaming_events_sync (cnc, G_PRIORITY_DEFAULT, subscription_id, &input_stream, cancellable, &local_error) || !input_stream) {
- *out_fatal_error = !local_error || local_error->domain == E_SOUP_SESSION_ERROR || local_error->domain == G_TLS_ERROR;
- g_clear_object (&cnc);
- g_clear_error (&local_error);
- return FALSE;
- }
-
- /* Unref early, thus it can be freed and will cancel this thread. */
- g_clear_object (&cnc);
+ g_mutex_init (&rd.mutex);
+ g_cond_init (&rd.cond);
+ rd.finished = FALSE;
+ rd.notification = notification;
+ rd.chunk_data = g_byte_array_new ();
+ rd.error = NULL;
- chunk_data = g_byte_array_new ();
- buffer = g_malloc (BUFFER_SIZE);
+ if (e_ews_connection_read_streaming_events_sync (cnc, G_PRIORITY_DEFAULT, subscription_id,
+ e_ews_notification_read_data_cb,
+ e_ews_notification_read_finished_cb,
+ &rd, cancellable, &rd.error)) {
- while (nread = g_input_stream_read (input_stream, buffer, BUFFER_SIZE, cancellable, &local_error),
- nread > 0) {
- g_byte_array_append (chunk_data, buffer, nread);
- ews_notification_process_chunk (notification, chunk_data, cancellable);
- }
+ e_ews_debug_print ("%s: %p: started reading events\n", G_STRFUNC, notification);
- g_free (buffer);
- g_byte_array_unref (chunk_data);
- g_clear_object (&input_stream);
+ /* Unref early, thus it can be freed and will cancel this thread. */
+ g_clear_object (&cnc);
- /* It's okay when read failed on EOF */
- *out_fatal_error = (local_error != NULL && !g_error_matches (local_error, G_IO_ERROR, G_IO_ERROR_PARTIAL_INPUT)) ||
- g_cancellable_is_cancelled (cancellable);
- success = !local_error && !*out_fatal_error;
+ g_mutex_lock (&rd.mutex);
+ while (!rd.finished) {
+ g_cond_wait (&rd.cond, &rd.mutex);
+ }
+ g_mutex_unlock (&rd.mutex);
+
+ e_ews_debug_print ("%s: %p: finished reading events; cancelled:%d err:%s is-partial-input:%d\n", G_STRFUNC, notification,
+ g_cancellable_is_cancelled (cancellable),
+ rd.error ? rd.error->message : "no-err",
+ g_error_matches (rd.error, G_IO_ERROR, G_IO_ERROR_PARTIAL_INPUT));
+
+ /* It's okay when read failed on EOF */
+ *out_fatal_error = (rd.error != NULL && !g_error_matches (rd.error, G_IO_ERROR, G_IO_ERROR_PARTIAL_INPUT)) ||
+ g_cancellable_is_cancelled (cancellable);
+ success = !rd.error && !*out_fatal_error;
+ } else {
+ e_ews_debug_print ("%s: %p: failed to start reading events: %s\n", G_STRFUNC, notification, rd.error ? rd.error->message : "no-err");
+
+ *out_fatal_error = !rd.error || rd.error->domain == E_SOUP_SESSION_ERROR || rd.error->domain == G_TLS_ERROR;
+ success = FALSE;
+ }
- g_clear_error (&local_error);
+ g_mutex_clear (&rd.mutex);
+ g_cond_clear (&rd.cond);
+ g_byte_array_unref (rd.chunk_data);
+ g_clear_error (&rd.error);
return success;
}
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]