[evolution-ews/wip/mcrha/soup3] Fix listen for server change notifications
- From: Milan Crha <mcrha src gnome org>
- To: commits-list gnome org
- Cc:
- Subject: [evolution-ews/wip/mcrha/soup3] Fix listen for server change notifications
- Date: Thu, 2 Jun 2022 12:51:20 +0000 (UTC)
commit 2ea0c180b27ff4b795b9be498d42abb3b8150eb9
Author: Milan Crha <mcrha redhat com>
Date: Thu Jun 2 14:50:56 2022 +0200
Fix listen for server change notifications
src/EWS/common/e-ews-connection.c | 221 ++++++++++--------------------------
src/EWS/common/e-ews-connection.h | 10 +-
src/EWS/common/e-ews-notification.c | 94 +++++----------
src/EWS/common/e-soap-request.c | 19 ----
src/EWS/common/e-soap-request.h | 5 -
5 files changed, 94 insertions(+), 255 deletions(-)
---
diff --git a/src/EWS/common/e-ews-connection.c b/src/EWS/common/e-ews-connection.c
index ab30846b..485d93cc 100644
--- a/src/EWS/common/e-ews-connection.c
+++ b/src/EWS/common/e-ews-connection.c
@@ -32,9 +32,6 @@
/* A chunk size limit when moving items in chunks. */
#define EWS_MOVE_ITEMS_CHUNK_SIZE 500
-/* For network stream reading */
-#define BUFFER_SIZE 16384
-
#define EWS_RETRY_IO_ERROR_SECONDS 3
#define EWS_RETRY_AUTH_ERROR_SECONDS 0.1
@@ -169,37 +166,6 @@ e_ews_soup_log_printer (SoupLogger *logger,
g_debug ("%c %s", direction, e_ews_debug_redact_headers (direction, data));
}
-static gboolean ews_connection_notification_delay_cb (gpointer user_data);
-
-static void
-e_ews_connection_unpause_notifications_locked (EEwsConnection *cnc)
-{
- if (camel_ews_settings_get_listen_notifications (cnc->priv->settings)) {
- e_ews_debug_print ("Unpause notifications for cnc:%p\n", cnc);
-
- if (cnc->priv->notification_delay_id)
- g_source_remove (cnc->priv->notification_delay_id);
-
- cnc->priv->notification_delay_id = g_timeout_add_seconds_full (G_PRIORITY_DEFAULT, 5,
- ews_connection_notification_delay_cb, e_weak_ref_new (cnc), (GDestroyNotify)
e_weak_ref_free);
- }
-}
-
-static void
-e_ews_connection_pause_notifications_locked (EEwsConnection *cnc)
-{
- if (cnc->priv->notification_delay_id) {
- g_source_remove (cnc->priv->notification_delay_id);
- cnc->priv->notification_delay_id = 0;
- }
-
- if (cnc->priv->notification) {
- e_ews_debug_print ("Pause notifications for cnc:%p\n", cnc);
- e_ews_notification_stop_listening_sync (cnc->priv->notification);
- g_clear_object (&cnc->priv->notification);
- }
-}
-
static ESoupSession *
e_ews_connection_create_soup_session (EEwsConnection *cnc)
{
@@ -461,12 +427,6 @@ e_ews_connection_process_request_sync (EEwsConnection *cnc,
pd.out_certificate_pem = out_certificate_pem;
pd.out_certificate_errors = out_certificate_errors;
- if (!e_soap_request_get_is_notification (request)) {
- NOTIFICATION_LOCK (cnc);
- e_ews_connection_pause_notifications_locked (cnc);
- NOTIFICATION_UNLOCK (cnc);
- }
-
g_mutex_lock (&pd.mutex);
source = g_idle_source_new ();
@@ -481,12 +441,6 @@ e_ews_connection_process_request_sync (EEwsConnection *cnc,
g_mutex_unlock (&pd.mutex);
- if (!e_soap_request_get_is_notification (request)) {
- NOTIFICATION_LOCK (cnc);
- e_ews_connection_unpause_notifications_locked (cnc);
- NOTIFICATION_UNLOCK (cnc);
- }
-
persistent_auth = soup_message_headers_get_one (soup_message_get_response_headers (pd.message),
"Persistent-Auth");
if (persistent_auth && g_ascii_strcasecmp (persistent_auth, "false") == 0) {
SoupSessionFeature *feature;
@@ -655,8 +609,10 @@ e_ews_connection_send_request_sync (EEwsConnection *cnc,
diagnostics = soup_message_headers_get_list
(soup_message_get_response_headers (message), "X-MS-DIAGNOSTICS");
if (diagnostics && strstr (diagnostics, "invalid_grant")) {
+ g_clear_error (&local_error2);
g_set_error_literal (&local_error2,
EWS_CONNECTION_ERROR, EWS_CONNECTION_ERROR_ACCESSDENIED, diagnostics);
} else if (diagnostics && *diagnostics) {
+ g_clear_error (&local_error2);
g_set_error_literal (&local_error2,
EWS_CONNECTION_ERROR, EWS_CONNECTION_ERROR_AUTHENTICATION_FAILED, diagnostics);
}
}
@@ -2710,9 +2666,9 @@ ews_autodiscover_response_ready_cb (GObject *source_object,
else
bytes = g_byte_array_new ();
- buffer = g_malloc (BUFFER_SIZE);
+ buffer = g_malloc (EWS_BUFFER_SIZE);
- while (success = g_input_stream_read_all (input_stream, buffer, BUFFER_SIZE, &nread,
ad->cancellable, &local_error),
+ while (success = g_input_stream_read_all (input_stream, buffer, EWS_BUFFER_SIZE, &nread,
ad->cancellable, &local_error),
success && nread > 0) {
g_byte_array_append (bytes, buffer, nread);
}
@@ -3347,9 +3303,9 @@ e_ews_process_download_oal_file_response (ESoapRequest *request,
if (size)
response_size = g_ascii_strtoll (size, NULL, 10);
- buffer = g_malloc (BUFFER_SIZE);
+ buffer = g_malloc (EWS_BUFFER_SIZE);
- while (success = g_input_stream_read_all (input_stream, buffer, BUFFER_SIZE, &nread, cancellable,
error),
+ while (success = g_input_stream_read_all (input_stream, buffer, EWS_BUFFER_SIZE, &nread, cancellable,
error),
success && nread > 0) {
response_received += nread;
@@ -8031,6 +7987,20 @@ ews_connection_notification_delay_cb (gpointer user_data)
return FALSE;
}
+static void
+e_ews_connection_maybe_start_notifications_locked (EEwsConnection *cnc)
+{
+ if (camel_ews_settings_get_listen_notifications (cnc->priv->settings)) {
+ e_ews_debug_print ("Start notifications for cnc:%p\n", cnc);
+
+ if (cnc->priv->notification_delay_id)
+ g_source_remove (cnc->priv->notification_delay_id);
+
+ cnc->priv->notification_delay_id = g_timeout_add_seconds_full (G_PRIORITY_DEFAULT, 5,
+ ews_connection_notification_delay_cb, e_weak_ref_new (cnc), (GDestroyNotify)
e_weak_ref_free);
+ }
+}
+
/*
* Enables server notification on a folder (or a set of folders).
* The events we are listen for notifications are: Copied, Created, Deleted, Modified and Moved.
@@ -8110,7 +8080,7 @@ e_ews_connection_enable_notifications_sync (EEwsConnection *cnc,
g_hash_table_foreach (cnc->priv->subscriptions, ews_connection_build_subscribed_folders_list, cnc);
- e_ews_connection_unpause_notifications_locked (cnc);
+ e_ews_connection_maybe_start_notifications_locked (cnc);
exit:
*subscription_key = notification_key;
@@ -8142,7 +8112,7 @@ e_ews_connection_disable_notifications_sync (EEwsConnection *cnc,
g_hash_table_foreach (cnc->priv->subscriptions, ews_connection_build_subscribed_folders_list, cnc);
if (cnc->priv->subscribed_folders != NULL && !e_ews_connection_get_disconnected_flag (cnc)) {
- e_ews_connection_unpause_notifications_locked (cnc);
+ e_ews_connection_maybe_start_notifications_locked (cnc);
} else {
g_clear_object (&cnc->priv->notification);
}
@@ -9582,8 +9552,6 @@ e_ews_connection_subscribe_sync (EEwsConnection *cnc,
e_ews_request_write_footer (request);
- e_soap_request_set_is_notification (request, TRUE);
-
response = e_ews_connection_send_request_sync (cnc, request, cancellable, error);
if (!response) {
@@ -9631,8 +9599,6 @@ e_ews_connection_unsubscribe_sync (EEwsConnection *cnc,
e_ews_request_write_footer (request);
- e_soap_request_set_is_notification (request, TRUE);
-
response = e_ews_connection_send_request_sync (cnc, request, cancellable, error);
if (!response) {
@@ -9648,98 +9614,23 @@ e_ews_connection_unsubscribe_sync (EEwsConnection *cnc,
return success;
}
-typedef struct _StreamEventsData {
- gint ref_count;
- gboolean reading;
- gpointer buffer;
- EEwsStreamingEventsReadCallback cb_read;
- EEwsStreamingEventsFinishedCallback cb_finished;
- gpointer cb_user_data;
- GCancellable *cancellable;
- GError *error;
-} StreamingEventsData;
-
-static void
-streaming_events_data_unref (StreamingEventsData *sed)
-{
- if (g_atomic_int_dec_and_test (&sed->ref_count)) {
- g_clear_pointer (&sed->buffer, g_free);
- g_clear_object (&sed->cancellable);
- g_clear_error (&sed->error);
- g_slice_free (StreamingEventsData, sed);
- }
-}
-
-static void
-e_ews_get_streaming_events_read_data_cb (GObject *source_object,
- GAsyncResult *result,
- gpointer user_data)
-{
- StreamingEventsData *sed = user_data;
- GInputStream *input_stream = G_INPUT_STREAM (source_object);
- gssize nread;
-
- nread = g_input_stream_read_finish (input_stream, result, &sed->error);
-
- if (nread > 0 &&
- sed->cb_read (sed->buffer, nread, sed->cb_user_data, sed->cancellable, &sed->error)) {
- g_input_stream_read_async (input_stream, sed->buffer, BUFFER_SIZE, G_PRIORITY_HIGH,
sed->cancellable,
- e_ews_get_streaming_events_read_data_cb, sed);
- return;
- }
-
- sed->cb_finished (sed->cb_user_data, sed->error);
-
- streaming_events_data_unref (sed);
-}
-
-static void
-e_ews_get_streaming_events_custom_process (ESoapRequest *request,
- SoupMessage *message,
- GInputStream *input_stream,
- gpointer user_data,
- gboolean *out_repeat,
- GCancellable *cancellable,
- GError **error)
-{
- StreamingEventsData *sed = user_data;
-
- sed->reading = TRUE;
- g_atomic_int_inc (&sed->ref_count);
-
- g_input_stream_read_async (input_stream, sed->buffer, BUFFER_SIZE, G_PRIORITY_HIGH, sed->cancellable,
- e_ews_get_streaming_events_read_data_cb, sed);
-}
-
-/*
- * Only begins reading of the events, which is done asynchronously using @cb_read,
- * until @cb_finished is called. When @cb_read returns FALSE, the read is done,
- * followed by the @cb_finished call. Both callbacks use the same user data.
- * It's guaranteed the @cb_finished is always called when this function returns TRUE.
- *
- * This is to make reading from the input stream from the EWS worker thread,
- * because it cannot be stopped, not read, from another thread, with a different
- * thread default main context than it was created with. A libsoup3 thing.
- */
-gboolean
-e_ews_connection_read_streaming_events_sync (EEwsConnection *cnc,
- gint pri,
- const gchar *subscription_id,
- EEwsStreamingEventsReadCallback cb_read,
- EEwsStreamingEventsFinishedCallback cb_finished,
- gpointer cb_user_data,
- GCancellable *cancellable,
- GError **error)
+GInputStream *
+e_ews_connection_prepare_streaming_events_sync (EEwsConnection *cnc,
+ gint pri,
+ const gchar *subscription_id,
+ ESoupSession **out_session,
+ SoupMessage **out_message,
+ GCancellable *cancellable,
+ GError **error)
{
ESoapRequest *request;
- ESoapResponse *response;
- StreamingEventsData *sed;
- gboolean success;
+ CamelEwsSettings *settings;
+ GInputStream *input_stream;
- g_return_val_if_fail (cnc != NULL, FALSE);
- g_return_val_if_fail (subscription_id != NULL, FALSE);
- g_return_val_if_fail (cb_read != NULL, FALSE);
- g_return_val_if_fail (cb_finished != NULL, FALSE);
+ g_return_val_if_fail (cnc != NULL, NULL);
+ g_return_val_if_fail (subscription_id != NULL, NULL);
+ g_return_val_if_fail (out_session != NULL, NULL);
+ g_return_val_if_fail (out_message != NULL, NULL);
request = e_ews_request_new_with_header (
cnc->priv->uri,
@@ -9753,7 +9644,7 @@ e_ews_connection_read_streaming_events_sync (EEwsConnection *cnc,
error);
if (!request)
- return FALSE;
+ return NULL;
e_soap_request_start_element (request, "SubscriptionIds", "messages", NULL);
e_ews_request_write_string_parameter_with_attribute (request, "SubscriptionId", NULL,
subscription_id, NULL, NULL);
@@ -9763,27 +9654,31 @@ e_ews_connection_read_streaming_events_sync (EEwsConnection *cnc,
e_ews_request_write_footer (request);
- sed = g_slice_new0 (StreamingEventsData);
- sed->ref_count = 1;
- sed->reading = FALSE;
- sed->buffer = g_malloc (BUFFER_SIZE);
- sed->cb_read = cb_read;
- sed->cb_finished = cb_finished;
- sed->cb_user_data = cb_user_data;
- sed->cancellable = cancellable ? g_object_ref (cancellable) : NULL;
+ settings = e_ews_connection_ref_settings (cnc);
- e_soap_request_set_is_notification (request, TRUE);
- e_soap_request_set_custom_process_fn (request, e_ews_get_streaming_events_custom_process, sed);
+ *out_session = e_ews_connection_create_soup_session (cnc);
+ *out_message = e_soap_request_persist (request, *out_session, settings, error);
- response = e_ews_connection_send_request_sync (cnc, request, cancellable, error);
- g_warn_if_fail (response == NULL);
+ g_clear_object (&settings);
- g_clear_object (&request);
- g_clear_object (&response);
+ if (!*out_message) {
+ g_clear_object (out_session);
+ g_clear_object (&request);
+ return NULL;
+ }
- success = sed->reading;
+ g_mutex_lock (&cnc->priv->property_lock);
+ e_soup_session_set_credentials (*out_session, cnc->priv->credentials);
+ g_mutex_unlock (&cnc->priv->property_lock);
- streaming_events_data_unref (sed);
+ input_stream = e_soup_session_send_message_sync (*out_session, *out_message, cancellable, error);
- return success;
+ g_clear_object (&request);
+
+ if (!input_stream) {
+ g_clear_object (out_message);
+ g_clear_object (out_session);
+ }
+
+ return input_stream;
}
diff --git a/src/EWS/common/e-ews-connection.h b/src/EWS/common/e-ews-connection.h
index c077ba13..183dd66c 100644
--- a/src/EWS/common/e-ews-connection.h
+++ b/src/EWS/common/e-ews-connection.h
@@ -23,6 +23,9 @@
#include "e-ews-oof-settings.h"
#include "camel-ews-settings.h"
+/* For network stream reading */
+#define EWS_BUFFER_SIZE 16384
+
/* Standard GObject macros */
#define E_TYPE_EWS_CONNECTION \
(e_ews_connection_get_type ())
@@ -901,13 +904,12 @@ gboolean e_ews_connection_unsubscribe_sync
const gchar *subscription_id,
GCancellable *cancellable,
GError **error);
-gboolean e_ews_connection_read_streaming_events_sync
+GInputStream * e_ews_connection_prepare_streaming_events_sync
(EEwsConnection *cnc,
gint pri,
const gchar *subscription_id,
- EEwsStreamingEventsReadCallback cb_read,
- EEwsStreamingEventsFinishedCallback cb_finished,
- gpointer cb_user_data,
+ ESoupSession **out_session,
+ SoupMessage **out_message,
GCancellable *cancellable,
GError **error);
diff --git a/src/EWS/common/e-ews-notification.c b/src/EWS/common/e-ews-notification.c
index 57b58bee..bbc01cb3 100644
--- a/src/EWS/common/e-ews-notification.c
+++ b/src/EWS/common/e-ews-notification.c
@@ -459,45 +459,6 @@ ews_notification_process_chunk (EEwsNotification *notification,
} while (chunk_len > 0 && !g_cancellable_is_cancelled (cancellable));
}
-typedef struct _ReadindData {
- GMutex mutex;
- GCond cond;
- gboolean finished;
- EEwsNotification *notification;
- GByteArray *chunk_data;
- GError *error;
-} ReadingData;
-
-static gboolean
-e_ews_notification_read_data_cb (gconstpointer buffer,
- gssize nread,
- gpointer user_data,
- GCancellable *cancellable,
- GError **error)
-{
- ReadingData *rd = user_data;
-
- g_byte_array_append (rd->chunk_data, buffer, nread);
- ews_notification_process_chunk (rd->notification, rd->chunk_data, cancellable);
-
- return TRUE;
-}
-
-static void
-e_ews_notification_read_finished_cb (gpointer user_data,
- const GError *error)
-{
- ReadingData *rd = user_data;
-
- if (error)
- rd->error = g_error_copy (error);
-
- g_mutex_lock (&rd->mutex);
- rd->finished = TRUE;
- g_cond_signal (&rd->cond);
- g_mutex_unlock (&rd->mutex);
-}
-
static gboolean
e_ews_notification_get_events_sync (EEwsNotification *notification,
const gchar *subscription_id,
@@ -505,7 +466,10 @@ e_ews_notification_get_events_sync (EEwsNotification *notification,
GCancellable *cancellable)
{
EEwsConnection *cnc;
- ReadingData rd;
+ ESoupSession *session = NULL;
+ SoupMessage *message = NULL;
+ GInputStream *input_stream = NULL;
+ GError *local_error = NULL;
gboolean success;
g_return_val_if_fail (out_fatal_error != NULL, FALSE);
@@ -523,49 +487,51 @@ e_ews_notification_get_events_sync (EEwsNotification *notification,
if (!cnc)
return FALSE;
- g_mutex_init (&rd.mutex);
- g_cond_init (&rd.cond);
- rd.finished = FALSE;
- rd.notification = notification;
- rd.chunk_data = g_byte_array_new ();
- rd.error = NULL;
+ input_stream = e_ews_connection_prepare_streaming_events_sync (cnc, G_PRIORITY_DEFAULT,
subscription_id, &session, &message, cancellable, &local_error);
- if (e_ews_connection_read_streaming_events_sync (cnc, G_PRIORITY_DEFAULT, subscription_id,
- e_ews_notification_read_data_cb,
- e_ews_notification_read_finished_cb,
- &rd, cancellable, &rd.error)) {
+ if (input_stream) {
+ GByteArray *chunk_data;
+ gpointer buffer;
+ gssize nread;
+
+ buffer = g_malloc (EWS_BUFFER_SIZE);
+ chunk_data = g_byte_array_new ();
e_ews_debug_print ("%s: %p: started reading events\n", G_STRFUNC, notification);
/* Unref early, thus it can be freed and will cancel this thread. */
g_clear_object (&cnc);
- g_mutex_lock (&rd.mutex);
- while (!rd.finished) {
- g_cond_wait (&rd.cond, &rd.mutex);
+ while (nread = g_input_stream_read (input_stream, buffer, EWS_BUFFER_SIZE, cancellable,
&local_error),
+ nread > 0) {
+ g_byte_array_append (chunk_data, buffer, nread);
+ ews_notification_process_chunk (notification, chunk_data, cancellable);
}
- g_mutex_unlock (&rd.mutex);
e_ews_debug_print ("%s: %p: finished reading events; cancelled:%d err:%s
is-partial-input:%d\n", G_STRFUNC, notification,
g_cancellable_is_cancelled (cancellable),
- rd.error ? rd.error->message : "no-err",
- g_error_matches (rd.error, G_IO_ERROR, G_IO_ERROR_PARTIAL_INPUT));
+ local_error ? local_error->message : "no-err",
+ g_error_matches (local_error, G_IO_ERROR, G_IO_ERROR_PARTIAL_INPUT));
/* It's okay when read failed on EOF */
- *out_fatal_error = (rd.error != NULL && !g_error_matches (rd.error, G_IO_ERROR,
G_IO_ERROR_PARTIAL_INPUT)) ||
+ *out_fatal_error = (local_error != NULL && !g_error_matches (local_error, G_IO_ERROR,
G_IO_ERROR_PARTIAL_INPUT)) ||
g_cancellable_is_cancelled (cancellable);
- success = !rd.error && !*out_fatal_error;
+ success = !local_error && !*out_fatal_error;
+
+ g_byte_array_unref (chunk_data);
+ g_free (buffer);
} else {
- e_ews_debug_print ("%s: %p: failed to start reading events: %s\n", G_STRFUNC, notification,
rd.error ? rd.error->message : "no-err");
+ e_ews_debug_print ("%s: %p: failed to start reading events: %s\n", G_STRFUNC, notification,
local_error ? local_error->message : "no-err");
- *out_fatal_error = !rd.error || rd.error->domain == E_SOUP_SESSION_ERROR || rd.error->domain
== G_TLS_ERROR;
+ *out_fatal_error = !local_error || local_error->domain == E_SOUP_SESSION_ERROR ||
local_error->domain == G_TLS_ERROR;
success = FALSE;
}
- g_mutex_clear (&rd.mutex);
- g_cond_clear (&rd.cond);
- g_byte_array_unref (rd.chunk_data);
- g_clear_error (&rd.error);
+ g_clear_object (&input_stream);
+ g_clear_object (&message);
+ g_clear_object (&session);
+ g_clear_object (&cnc);
+ g_clear_error (&local_error);
return success;
}
diff --git a/src/EWS/common/e-soap-request.c b/src/EWS/common/e-soap-request.c
index 23e3e83a..b6cf7f17 100644
--- a/src/EWS/common/e-soap-request.c
+++ b/src/EWS/common/e-soap-request.c
@@ -49,8 +49,6 @@ struct _ESoapRequestPrivate {
xmlChar *env_uri;
gboolean body_started;
gchar *action;
-
- gboolean is_notification;
};
G_DEFINE_TYPE_WITH_PRIVATE (ESoapRequest, e_soap_request, G_TYPE_OBJECT)
@@ -1039,23 +1037,6 @@ e_soap_request_get_etag (ESoapRequest *req)
return req->priv->etag;
}
-void
-e_soap_request_set_is_notification (ESoapRequest *req,
- gboolean is_notification)
-{
- g_return_if_fail (E_IS_SOAP_REQUEST (req));
-
- req->priv->is_notification = is_notification;
-}
-
-gboolean
-e_soap_request_get_is_notification (ESoapRequest *req)
-{
- g_return_val_if_fail (E_IS_SOAP_REQUEST (req), FALSE);
-
- return req->priv->is_notification;
-}
-
void
e_soap_request_set_store_node_data (ESoapRequest *req,
const gchar *nodename,
diff --git a/src/EWS/common/e-soap-request.h b/src/EWS/common/e-soap-request.h
index f5825fb7..fddff613 100644
--- a/src/EWS/common/e-soap-request.h
+++ b/src/EWS/common/e-soap-request.h
@@ -176,11 +176,6 @@ gboolean e_soap_request_get_tls_error_details
void e_soap_request_set_etag (ESoapRequest *req,
const gchar *etag);
const gchar * e_soap_request_get_etag (ESoapRequest *req);
-void e_soap_request_set_is_notification
- (ESoapRequest *req,
- gboolean is_notification);
-gboolean e_soap_request_get_is_notification
- (ESoapRequest *req);
SoupMessage * e_soap_request_persist (ESoapRequest *req,
ESoupSession *soup_session,
CamelEwsSettings *settings,
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]