[epiphany/wip/sync-batch-upload: 3/4] sync-service: Implement batch upload



commit 8969027196ef3b20b24a9f4fccec091f6db4de76
Author: Gabriel Ivascu <gabrielivascu gnome org>
Date:   Fri Dec 1 21:53:59 2017 +0200

    sync-service: Implement batch upload

 lib/ephy-sync-utils.h                  |    3 +
 lib/sync/ephy-history-manager.c        |    2 +-
 lib/sync/ephy-open-tabs-manager.c      |    2 +-
 lib/sync/ephy-password-manager.c       |    2 +-
 lib/sync/ephy-sync-service.c           |  243 ++++++++++++++++++++++++++++++--
 lib/sync/ephy-synchronizable-manager.h |    2 +-
 src/bookmarks/ephy-bookmarks-manager.c |    2 +-
 src/ephy-shell.c                       |   10 +-
 8 files changed, 242 insertions(+), 24 deletions(-)
---
diff --git a/lib/ephy-sync-utils.h b/lib/ephy-sync-utils.h
index cad54f4..a124dd0 100644
--- a/lib/ephy-sync-utils.h
+++ b/lib/ephy-sync-utils.h
@@ -37,6 +37,9 @@ const SecretSchema *ephy_sync_utils_get_secret_schema (void) G_GNUC_CONST;
 #define EPHY_SYNC_DEVICE_ID_LEN   32
 #define EPHY_SYNC_BSO_ID_LEN      12
 
+/* Maximum number of records sent in a single POST while uploading a batch. */
+#define EPHY_SYNC_BATCH_SIZE    80
+/* Maximum number of POSTs grouped into one server-side batch, i.e. one batch
+ * carries at most EPHY_SYNC_NUM_BATCHES * EPHY_SYNC_BATCH_SIZE records. */
+#define EPHY_SYNC_NUM_BATCHES   80
+
 char     *ephy_sync_utils_encode_hex                    (const guint8 *data,
                                                          gsize         data_len);
 guint8   *ephy_sync_utils_decode_hex                    (const char   *hex);
diff --git a/lib/sync/ephy-history-manager.c b/lib/sync/ephy-history-manager.c
index 6b99b6c..3c12c6f 100644
--- a/lib/sync/ephy-history-manager.c
+++ b/lib/sync/ephy-history-manager.c
@@ -505,7 +505,7 @@ merge_history_cb (EphyHistoryService    *service,
                                                            data->remotes_updated);
 
 out:
-  data->callback (to_upload, TRUE, data->user_data);
+  data->callback (to_upload, data->user_data);
 
   g_list_free_full (urls, (GDestroyNotify)ephy_history_url_free);
   if (records_ht_id)
diff --git a/lib/sync/ephy-open-tabs-manager.c b/lib/sync/ephy-open-tabs-manager.c
index 4b58a23..e16b1ff 100644
--- a/lib/sync/ephy-open-tabs-manager.c
+++ b/lib/sync/ephy-open-tabs-manager.c
@@ -280,7 +280,7 @@ synchronizable_manager_merge (EphySynchronizableManager              *manager,
 
   g_free (device_bso_id);
 
-  callback (to_upload, TRUE, user_data);
+  callback (to_upload, user_data);
 }
 
 static void
diff --git a/lib/sync/ephy-password-manager.c b/lib/sync/ephy-password-manager.c
index 9d4ca54..fb10005 100644
--- a/lib/sync/ephy-password-manager.c
+++ b/lib/sync/ephy-password-manager.c
@@ -1065,7 +1065,7 @@ merge_cb (GList    *records,
                                                             data->remotes_deleted,
                                                             data->remotes_updated);
 
-  data->callback (to_upload, FALSE, data->user_data);
+  data->callback (to_upload, data->user_data);
 
   g_list_free_full (records, g_object_unref);
   merge_passwords_async_data_free (data);
diff --git a/lib/sync/ephy-sync-service.c b/lib/sync/ephy-sync-service.c
index 99c9cb3..242f8fc 100644
--- a/lib/sync/ephy-sync-service.c
+++ b/lib/sync/ephy-sync-service.c
@@ -130,6 +130,16 @@ typedef struct {
   EphySynchronizable        *synchronizable;
 } SyncAsyncData;
 
+/* State carried through the asynchronous batch upload requests
+ * (open batch -> upload records -> commit). */
+typedef struct {
+  EphySyncService           *service;          /* strong reference */
+  EphySynchronizableManager *manager;          /* strong reference; provides the collection name */
+  GPtrArray                 *synchronizables;  /* strong reference to all records being uploaded */
+  guint                      start;            /* first index of this request's range (inclusive) */
+  guint                      end;              /* end of this request's range (exclusive) */
+  char                      *batch_id;         /* URI-encoded server batch id; NULL until the batch is opened */
+  gboolean                   batch_is_last;    /* TRUE on the final upload POST; that one queues the commit */
+} BatchUploadAsyncData;
+
 static StorageRequestAsyncData *
 storage_request_async_data_new (const char          *endpoint,
                                 const char          *method,
@@ -268,6 +278,51 @@ sync_async_data_free (SyncAsyncData *data)
   g_slice_free (SyncAsyncData, data);
 }
 
+/* Allocates the per-request state shared by the batch upload callbacks.
+ * Takes new references on @service, @manager and @synchronizables and keeps
+ * a private copy of @batch_id (which may be NULL). */
+static inline BatchUploadAsyncData *
+batch_upload_async_data_new (EphySyncService           *service,
+                             EphySynchronizableManager *manager,
+                             GPtrArray                 *synchronizables,
+                             guint                      start,
+                             guint                      end,
+                             const char                *batch_id,
+                             gboolean                   batch_is_last)
+{
+  BatchUploadAsyncData *result = g_slice_new (BatchUploadAsyncData);
+
+  /* Plain values first, then the owned resources. */
+  result->start = start;
+  result->end = end;
+  result->batch_is_last = batch_is_last;
+  result->batch_id = g_strdup (batch_id);
+  result->synchronizables = g_ptr_array_ref (synchronizables);
+  result->manager = g_object_ref (manager);
+  result->service = g_object_ref (service);
+
+  return result;
+}
+
+/* Returns an independent copy of @data: fresh references to the same service,
+ * manager and record array, plus its own copy of the batch id. */
+static inline BatchUploadAsyncData *
+batch_upload_async_data_dup (BatchUploadAsyncData *data)
+{
+  BatchUploadAsyncData *copy;
+
+  g_assert (data);
+
+  copy = batch_upload_async_data_new (data->service,
+                                      data->manager,
+                                      data->synchronizables,
+                                      data->start,
+                                      data->end,
+                                      data->batch_id,
+                                      data->batch_is_last);
+  return copy;
+}
+
+/* Releases the references and the batch id copy taken by
+ * batch_upload_async_data_new(), then frees the struct itself. */
+static inline void
+batch_upload_async_data_free (BatchUploadAsyncData *data)
+{
+  g_assert (data);
+
+  g_object_unref (data->service);
+  g_object_unref (data->manager);
+  g_ptr_array_unref (data->synchronizables);
+  g_free (data->batch_id);
+  g_slice_free (BatchUploadAsyncData, data);
+}
+
 static void
 ephy_sync_service_set_property (GObject      *object,
                                 guint         prop_id,
@@ -476,7 +531,7 @@ ephy_sync_service_send_storage_request (EphySyncService         *self,
                               data->request_body, strlen (data->request_body));
   }
 
-  if (!g_strcmp0 (data->method, SOUP_METHOD_PUT))
+  if (!g_strcmp0 (data->method, SOUP_METHOD_PUT) || !g_strcmp0 (data->method, SOUP_METHOD_POST))
     soup_message_headers_append (msg->request_headers, "content-type", content_type);
 
   if (data->modified_since >= 0) {
@@ -1244,27 +1299,192 @@ ephy_sync_service_upload_synchronizable (EphySyncService           *self,
   ephy_sync_crypto_key_bundle_free (bundle);
 }
 
+/* Serializes synchronizables[start, end) into JSON arrays of BSOs holding at
+ * most EPHY_SYNC_BATCH_SIZE records each. Returns a new GPtrArray of JSON
+ * strings; the strings are released with g_free() together with the array. */
+static GPtrArray *
+ephy_sync_service_split_into_batches (EphySyncService           *self,
+                                      EphySynchronizableManager *manager,
+                                      GPtrArray                 *synchronizables,
+                                      guint                      start,
+                                      guint                      end)
+{
+  SyncCryptoKeyBundle *bundle;
+  GPtrArray *result;
+  guint first = start;
+
+  g_assert (EPHY_IS_SYNC_SERVICE (self));
+  g_assert (EPHY_IS_SYNCHRONIZABLE_MANAGER (manager));
+  g_assert (synchronizables);
+
+  result = g_ptr_array_new_with_free_func (g_free);
+  bundle = ephy_sync_service_get_key_bundle (self,
+                                             ephy_synchronizable_manager_get_collection_name (manager));
+
+  while (first < end) {
+    guint last = MIN (first + EPHY_SYNC_BATCH_SIZE, end);
+    JsonArray *records = json_array_new ();
+    JsonNode *root;
+
+    for (guint k = first; k < last; k++) {
+      JsonNode *bso = ephy_synchronizable_to_bso (g_ptr_array_index (synchronizables, k), bundle);
+
+      /* The array takes over the extra object reference; the wrapper node goes. */
+      json_array_add_object_element (records, json_object_ref (json_node_get_object (bso)));
+      json_node_unref (bso);
+    }
+
+    root = json_node_new (JSON_NODE_ARRAY);
+    json_node_take_array (root, records);
+    g_ptr_array_add (result, json_to_string (root, FALSE));
+    json_node_unref (root);
+
+    first += EPHY_SYNC_BATCH_SIZE;
+  }
+
+  ephy_sync_crypto_key_bundle_free (bundle);
+
+  return result;
+}
+
+/* Completion handler for the final "commit=true" POST of a batch upload.
+ * On HTTP 200, stores the server's X-Last-Modified value as the manager's
+ * new sync time. Always frees @user_data. */
+static void
+commit_batch_cb (SoupSession *session,
+                 SoupMessage *msg,
+                 gpointer     user_data)
+{
+  BatchUploadAsyncData *data = user_data;
+  const char *last_modified;
+
+  if (msg->status_code != 200) {
+    g_warning ("Failed to commit batch. Status code: %u, response: %s",
+               msg->status_code, msg->response_body->data);
+    goto out;
+  }
+
+  LOG ("Successfully committed batches");
+
+  /* Update sync time. The header may be absent, and g_ascii_strtod() must
+   * not be handed NULL, so keep the previous sync time in that case. */
+  last_modified = soup_message_headers_get_one (msg->response_headers, "X-Last-Modified");
+  if (last_modified)
+    ephy_synchronizable_manager_set_sync_time (data->manager, g_ascii_strtod (last_modified, NULL));
+  else
+    g_warning ("Commit response has no X-Last-Modified header, keeping previous sync time");
+
+out:
+  batch_upload_async_data_free (data);
+}
+
+/* Completion handler for one POST of records into an open server-side batch.
+ * When this was the last POST of the batch (data->batch_is_last), queues the
+ * "commit=true" request. Always frees @user_data.
+ *
+ * NOTE(review): the commit is queued even when some uploads failed, which may
+ * commit a partial batch — confirm this is intended. */
+static void
+upload_batch_cb (SoupSession *session,
+                 SoupMessage *msg,
+                 gpointer     user_data)
+{
+  BatchUploadAsyncData *data = user_data;
+  const char *collection;
+  char *endpoint = NULL;
+
+  /* Note: "202 Accepted" status code. */
+  if (msg->status_code != 202) {
+    g_warning ("Failed to upload batch. Status code: %u, response: %s",
+               msg->status_code, msg->response_body->data);
+  } else {
+    LOG ("Successfully uploaded batch");
+  }
+
+  if (!data->batch_is_last)
+    goto out;
+
+  collection = ephy_synchronizable_manager_get_collection_name (data->manager);
+  endpoint = g_strdup_printf ("storage/%s?commit=true&batch=%s", collection, data->batch_id);
+  ephy_sync_service_queue_storage_request (data->service, endpoint,
+                                           SOUP_METHOD_POST, "[]", -1, -1,
+                                           commit_batch_cb,
+                                           batch_upload_async_data_dup (data));
+
+out:
+  g_free (endpoint);
+  /* Only drop the reference owned by this request's data. The caller's own
+   * reference to the records array is released by merge_collection_finished_cb();
+   * releasing it here too would over-unref the array once per chunk. */
+  batch_upload_async_data_free (data);
+}
+
+/* Completion handler for the "batch=true" POST that opens a server-side batch.
+ * Reads the batch id from the response, splits
+ * data->synchronizables[start, end) into POST-sized chunks and queues one upload
+ * request per chunk, flagging the last one so that it queues the commit.
+ * Always frees @user_data. */
+static void
+start_batch_upload_cb (SoupSession *session,
+                       SoupMessage *msg,
+                       gpointer     user_data)
+{
+  BatchUploadAsyncData *data = user_data;
+  GPtrArray *batches = NULL;
+  JsonNode *node = NULL;
+  JsonNode *batch_member = NULL;
+  GError *error = NULL;
+  const char *collection;
+  char *endpoint = NULL;
+
+  /* Note: "202 Accepted" status code. */
+  if (msg->status_code != 202) {
+    g_warning ("Failed to start batch upload. Status code: %u, response: %s",
+               msg->status_code, msg->response_body->data);
+    goto out;
+  }
+
+  node = json_from_string (msg->response_body->data, &error);
+  if (error) {
+    g_warning ("Response is not a valid JSON: %s", error->message);
+    g_error_free (error);
+    goto out;
+  }
+
+  /* The server response is external input: json_from_string() may return
+   * NULL, and "batch" may be missing or not a string. Without these checks
+   * soup_uri_encode() would be handed NULL. */
+  if (node && JSON_NODE_HOLDS_OBJECT (node))
+    batch_member = json_object_get_member (json_node_get_object (node), "batch");
+  if (!batch_member || !JSON_NODE_HOLDS_VALUE (batch_member) ||
+      json_node_get_value_type (batch_member) != G_TYPE_STRING) {
+    g_warning ("Response does not contain a valid batch id");
+    goto out;
+  }
+
+  data->batch_id = soup_uri_encode (json_node_get_string (batch_member), NULL);
+  collection = ephy_synchronizable_manager_get_collection_name (data->manager);
+  endpoint = g_strdup_printf ("storage/%s?batch=%s", collection, data->batch_id);
+
+  batches = ephy_sync_service_split_into_batches (data->service, data->manager,
+                                                  data->synchronizables,
+                                                  data->start, data->end);
+  for (guint i = 0; i < batches->len; i++) {
+    BatchUploadAsyncData *data_dup = batch_upload_async_data_dup (data);
+
+    if (i == batches->len - 1)
+      data_dup->batch_is_last = TRUE;
+
+    ephy_sync_service_queue_storage_request (data->service, endpoint, SOUP_METHOD_POST,
+                                             g_ptr_array_index (batches, i), -1, -1,
+                                             upload_batch_cb, data_dup);
+  }
+
+out:
+  g_free (endpoint);
+  if (node)
+    json_node_unref (node);
+  if (batches)
+    g_ptr_array_unref (batches);
+  batch_upload_async_data_free (data);
+}
+
+/* Merge-completion callback for one collection. Opens one server-side batch
+ * per chunk of at most EPHY_SYNC_NUM_BATCHES * EPHY_SYNC_BATCH_SIZE records of
+ * @to_upload (continued in start_batch_upload_cb()). Every queued request holds
+ * its own reference to @to_upload, so the reference passed in is dropped here. */
 static void
 merge_collection_finished_cb (GPtrArray *to_upload,
-                              gboolean   should_force,
                               gpointer   user_data)
 {
-  SyncCollectionAsyncData *data = (SyncCollectionAsyncData *)user_data;
+  SyncCollectionAsyncData *data = user_data;
+  BatchUploadAsyncData *bdata;
+  guint step = EPHY_SYNC_NUM_BATCHES * EPHY_SYNC_BATCH_SIZE;
+  const char *collection;
+  char *endpoint = NULL;
 
+  /* NOTE(review): after this change the sync time appears to be advanced only
+   * by commit_batch_cb(), so it is not updated when there is nothing to
+   * upload — confirm this is intended. */
   if (!to_upload || to_upload->len == 0)
     goto out;
 
-  for (guint i = 0; i < to_upload->len; i++) {
-    ephy_sync_service_upload_synchronizable (data->service, data->manager,
-                                             g_ptr_array_index (to_upload, i),
-                                             should_force);
+  collection = ephy_synchronizable_manager_get_collection_name (data->manager);
+  endpoint = g_strdup_printf ("storage/%s?batch=true", collection);
+
+  /* Upload in chunks, each in its own server-side batch. See
+   * http://moz-services-docs.readthedocs.io/en/latest/storage/apis-1.5.html#example-uploading-a-large-batch-of-items
+   */
+  for (guint i = 0; i < to_upload->len; i += step) {
+    bdata = batch_upload_async_data_new (data->service, data->manager,
+                                         to_upload, i,
+                                         MIN (i + step, to_upload->len),
+                                         NULL, FALSE);
+    ephy_sync_service_queue_storage_request (data->service, endpoint,
+                                             SOUP_METHOD_POST, "[]", -1, -1,
+                                             start_batch_upload_cb, bdata);
   }
 
 out:
+  g_free (endpoint);
+  /* NOTE(review): emitted once the uploads are queued, not once they finish. */
   if (data->is_last)
     g_signal_emit (data->service, signals[SYNC_FINISHED], 0);
+  /* Queued requests took their own references; drop ours on every path,
+   * including when nothing was uploaded. */
   if (to_upload)
     g_ptr_array_unref (to_upload);
   sync_collection_async_data_free (data);
 }
 
@@ -1281,7 +1501,6 @@ sync_collection_cb (SoupSession *session,
   GError *error = NULL;
   GType type;
   const char *collection;
-  const char *last_modified;
   gboolean is_deleted;
 
   collection = ephy_synchronizable_manager_get_collection_name (data->manager);
@@ -1322,11 +1541,7 @@ sync_collection_cb (SoupSession *session,
        g_list_length (data->remotes_updated),
        collection);
 
-  /* Update sync time. */
-  last_modified = soup_message_headers_get_one (msg->response_headers, "X-Last-Modified");
-  ephy_synchronizable_manager_set_sync_time (data->manager, g_ascii_strtod (last_modified, NULL));
   ephy_synchronizable_manager_set_is_initial_sync (data->manager, FALSE);
-
   ephy_synchronizable_manager_merge (data->manager, data->is_initial,
                                      data->remotes_deleted, data->remotes_updated,
                                      merge_collection_finished_cb, data);
diff --git a/lib/sync/ephy-synchronizable-manager.h b/lib/sync/ephy-synchronizable-manager.h
index 8c83e52..04243d4 100644
--- a/lib/sync/ephy-synchronizable-manager.h
+++ b/lib/sync/ephy-synchronizable-manager.h
@@ -30,7 +30,7 @@ G_BEGIN_DECLS
 
 G_DECLARE_INTERFACE (EphySynchronizableManager, ephy_synchronizable_manager, EPHY, SYNCHRONIZABLE_MANAGER, 
GObject)
 
-typedef void (*EphySynchronizableManagerMergeCallback) (GPtrArray *to_upload, gboolean should_force, 
gpointer user_data);
+/* Invoked by ephy_synchronizable_manager_merge() implementations when the
+ * merge is done. @to_upload holds the records that must be sent to the
+ * server; the sync service's handler tolerates NULL or an empty array.
+ * NOTE(review): the receiver appears to take over the @to_upload reference — confirm. */
+typedef void (*EphySynchronizableManagerMergeCallback) (GPtrArray *to_upload, gpointer user_data);
 
 struct _EphySynchronizableManagerInterface {
   GTypeInterface parent_iface;
diff --git a/src/bookmarks/ephy-bookmarks-manager.c b/src/bookmarks/ephy-bookmarks-manager.c
index 0abec5b..b39310b 100644
--- a/src/bookmarks/ephy-bookmarks-manager.c
+++ b/src/bookmarks/ephy-bookmarks-manager.c
@@ -916,7 +916,7 @@ synchronizable_manager_merge (EphySynchronizableManager              *manager,
   else
     to_upload = ephy_bookmarks_manager_handle_regular_merge (self, remotes_updated, remotes_deleted);
 
-  callback (to_upload, FALSE, user_data);
+  callback (to_upload, user_data);
 }
 
 static void
diff --git a/src/ephy-shell.c b/src/ephy-shell.c
index a715725..d91fbcb 100644
--- a/src/ephy-shell.c
+++ b/src/ephy-shell.c
@@ -320,6 +320,11 @@ register_synchronizable_managers (EphyShell       *shell,
   g_assert (EPHY_IS_SYNC_SERVICE (service));
   g_assert (EPHY_IS_SHELL (shell));
 
+  if (ephy_sync_utils_history_sync_is_enabled ()) {
+    manager = EPHY_SYNCHRONIZABLE_MANAGER (ephy_shell_get_history_manager (shell));
+    ephy_sync_service_register_manager (service, manager);
+  }
+
   if (ephy_sync_utils_bookmarks_sync_is_enabled ()) {
     manager = EPHY_SYNCHRONIZABLE_MANAGER (ephy_shell_get_bookmarks_manager (shell));
     ephy_sync_service_register_manager (service, manager);
@@ -330,11 +335,6 @@ register_synchronizable_managers (EphyShell       *shell,
     ephy_sync_service_register_manager (service, manager);
   }
 
-  if (ephy_sync_utils_history_sync_is_enabled ()) {
-    manager = EPHY_SYNCHRONIZABLE_MANAGER (ephy_shell_get_history_manager (shell));
-    ephy_sync_service_register_manager (service, manager);
-  }
-
   if (ephy_sync_utils_open_tabs_sync_is_enabled ()) {
     manager = EPHY_SYNCHRONIZABLE_MANAGER (ephy_shell_get_open_tabs_manager (shell));
     ephy_sync_service_register_manager (service, manager);


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]