[tracker] libtracker-miner: Do not break ordering with bulk operations



commit c0310dbb3da32865ec43732de6121d1def7aaf52
Author: Carlos Garnacho <carlosg gnome org>
Date:   Wed Feb 24 13:41:01 2016 +0100

    libtracker-miner: Do not break ordering with bulk operations
    
    The way bulk operations break the ordering of the sparql updates
    may bring certain glitches. eg. if a delete (bulk) operation
    happens on a folder while the sparql buffer already contains
    updates corresponding to files within that folder, the way we
    order operations results in the delete happening first, and
    the inserts/updates happening afterwards. This may leave the
    database in inconsistent states.
    
    So make bulk operations a 1 file thing, this could maybe be
    smarter and try to compress consecutive similar operations,
    but there's not much added value in it. Plus, the bulk operations
    we issue in the miner would never match that criteria.
    
    Because we no longer break the ordering with bulk operations,
    things like the error map are no longer necessary, so we can
    remove some icky code here.
    
    BulkOperationMerge and its functions could be further simplified,
    that's left for a later cleanup.

 src/libtracker-miner/tracker-sparql-buffer.c |  158 ++++++--------------------
 1 files changed, 37 insertions(+), 121 deletions(-)
---
diff --git a/src/libtracker-miner/tracker-sparql-buffer.c b/src/libtracker-miner/tracker-sparql-buffer.c
index 761ad2f..5d0be93 100644
--- a/src/libtracker-miner/tracker-sparql-buffer.c
+++ b/src/libtracker-miner/tracker-sparql-buffer.c
@@ -79,15 +79,11 @@ struct _UpdateArrayData {
        TrackerSparqlBuffer *buffer;
        GPtrArray *tasks;
        GArray *sparql_array;
-       GArray *error_map;
-       GPtrArray *bulk_ops;
-       gint n_bulk_operations;
 };
 
 struct _BulkOperationMerge {
        const gchar *bulk_operation;
        GList *tasks;
-       gchar *sparql;
 };
 
 
@@ -236,18 +232,11 @@ update_array_data_free (UpdateArrayData *update_data)
                g_array_free (update_data->sparql_array, TRUE);
        }
 
-       if (update_data->bulk_ops) {
-               /* The BulkOperationMerge structs which contain the sparql strings
-                * are deallocated here */
-               g_ptr_array_free (update_data->bulk_ops, TRUE);
-       }
-
        g_ptr_array_foreach (update_data->tasks,
                             (GFunc) remove_task_foreach,
                             update_data->buffer);
        g_ptr_array_free (update_data->tasks, TRUE);
 
-       g_array_free (update_data->error_map, TRUE);
        g_slice_free (UpdateArrayData, update_data);
 }
 
@@ -290,18 +279,9 @@ tracker_sparql_buffer_update_array_cb (GObject      *object,
                if (global_error) {
                        error = global_error;
                } else {
-                       gint error_pos;
-
-                       error_pos = g_array_index (update_data->error_map, gint, i);
-
-                       /* Find the corresponing error according to the passed map,
-                        * numbers >= 0 are non-bulk tasks, and < 0 are bulk tasks,
-                        * so the number of bulk operations must be added, as these
-                        * tasks are prepended.
-                        */
-                       error_pos += update_data->n_bulk_operations;
-                       error = g_ptr_array_index (sparql_array_errors, error_pos);
+                       error = g_ptr_array_index (sparql_array_errors, i);
                        if (error) {
+                               const gchar *sparql = NULL;
                                GFile *file;
                                gchar *uri;
 
@@ -311,51 +291,26 @@ tracker_sparql_buffer_update_array_cb (GObject      *object,
                                            i, uri, error->message);
                                g_free (uri);
 
-                               if (error_pos < update_data->n_bulk_operations) {
-                                       BulkOperationMerge *bulk;
-                                       GList *tasks;
-                                       gint bulk_pos;
-
-                                       bulk_pos = ABS (error_pos - update_data->n_bulk_operations + 1);
-                                       bulk = g_ptr_array_index (update_data->bulk_ops, bulk_pos);
-                                       tasks = bulk->tasks;
-
-                                       g_debug ("    Affected files:");
-
-                                       while (tasks) {
-                                               TrackerTask *task = tasks->data;
-                                               gchar *uri;
-
-                                               uri = g_file_get_uri (tracker_task_get_file (task));
-                                               g_debug ("      - %s", uri);
-                                               g_free (uri);
+                               uri = g_file_get_uri (tracker_task_get_file (task));
+                               g_debug ("    Affected file: %s", uri);
+                               g_free (uri);
 
-                                               tasks = tasks->next;
-                                       }
+                               switch (task_data->type) {
+                               case TASK_TYPE_SPARQL_STR:
+                                       sparql = task_data->data.str;
+                                       break;
+                               case TASK_TYPE_SPARQL:
+                                       sparql = tracker_sparql_builder_get_result (task_data->data.builder);
+                                       break;
+                               case TASK_TYPE_BULK:
+                                       sparql = task_data->data.bulk.str;
+                                       break;
+                               default:
+                                       break;
+                               }
 
-                                       g_debug ("    Sparql: %s", bulk->sparql);
-                               } else {
-                                       const gchar *sparql = NULL;
-                                       gchar *uri;
-
-                                       uri = g_file_get_uri (tracker_task_get_file (task));
-                                       g_debug ("    Affected file: %s", uri);
-                                       g_free (uri);
-
-                                       switch (task_data->type) {
-                                       case TASK_TYPE_SPARQL_STR:
-                                               sparql = task_data->data.str;
-                                               break;
-                                       case TASK_TYPE_SPARQL:
-                                               sparql = tracker_sparql_builder_get_result 
(task_data->data.builder);
-                                               break;
-                                       default:
-                                               break;
-                                       }
-
-                                       if (sparql) {
-                                               g_debug ("    Sparql: %s", sparql);
-                                       }
+                               if (sparql) {
+                                       g_debug ("    Sparql: %s", sparql);
                                }
                        }
                }
@@ -385,14 +340,9 @@ tracker_sparql_buffer_update_array_cb (GObject      *object,
        }
 }
 
-static void
+static gchar *
 bulk_operation_merge_finish (BulkOperationMerge *merge)
 {
-       if (merge->sparql) {
-               g_free (merge->sparql);
-               merge->sparql = NULL;
-       }
-
        if (merge->bulk_operation && merge->tasks) {
                GString *equals_string = NULL, *children_string = NULL, *sparql;
                gint n_equals = 0;
@@ -484,8 +434,10 @@ bulk_operation_merge_finish (BulkOperationMerge *merge)
                        g_string_append_printf (sparql, "} ");
                }
 
-               merge->sparql = g_string_free (sparql, FALSE);
+               return g_string_free (sparql, FALSE);
        }
+
+       return NULL;
 }
 
 static BulkOperationMerge *
@@ -506,7 +458,6 @@ bulk_operation_merge_free (BulkOperationMerge *operation)
                        (GFunc) tracker_task_unref,
                        NULL);
        g_list_free (operation->tasks);
-       g_free (operation->sparql);
        g_slice_free (BulkOperationMerge, operation);
 }
 
@@ -515,10 +466,10 @@ tracker_sparql_buffer_flush (TrackerSparqlBuffer *buffer,
                              const gchar         *reason)
 {
        TrackerSparqlBufferPrivate *priv;
-       GPtrArray *bulk_ops = NULL;
-       GArray *sparql_array, *error_map;
+       GArray *sparql_array;
+       GPtrArray *bulk_sparql;
        UpdateArrayData *update_data;
-       gint i, j;
+       gint i;
 
        priv = buffer->priv;
 
@@ -540,79 +491,41 @@ tracker_sparql_buffer_flush (TrackerSparqlBuffer *buffer,
 
        /* Loop buffer and construct array of strings */
        sparql_array = g_array_new (FALSE, TRUE, sizeof (gchar *));
-       error_map = g_array_new (TRUE, TRUE, sizeof (gint));
+       bulk_sparql = g_ptr_array_new_with_free_func ((GDestroyNotify) g_free);
 
        for (i = 0; i < priv->tasks->len; i++) {
                SparqlTaskData *task_data;
                TrackerTask *task;
-               gint pos;
 
                task = g_ptr_array_index (priv->tasks, i);
                task_data = tracker_task_get_data (task);
 
                if (task_data->type == TASK_TYPE_SPARQL_STR) {
                        g_array_append_val (sparql_array, task_data->data.str);
-                       pos = sparql_array->len - 1;
                } else if (task_data->type == TASK_TYPE_SPARQL) {
                        const gchar *str;
 
                        str = tracker_sparql_builder_get_result (task_data->data.builder);
                        g_array_append_val (sparql_array, str);
-                       pos = sparql_array->len - 1;
                } else if (task_data->type == TASK_TYPE_BULK) {
                        BulkOperationMerge *bulk = NULL;
-                       gint j;
-
-                       if (G_UNLIKELY (!bulk_ops)) {
-                               bulk_ops = g_ptr_array_new_with_free_func ((GDestroyNotify) 
bulk_operation_merge_free);
-                       }
-
-                       for (j = 0; j < bulk_ops->len; j++) {
-                               BulkOperationMerge *cur;
-
-                               cur = g_ptr_array_index (bulk_ops, j);
-
-                               /* This is a comparison of intern strings */
-                               if (cur->bulk_operation == task_data->data.bulk.str) {
-                                       bulk = cur;
-                                       pos = - 1 - j;
-                                       break;
-                               }
-                       }
-
-                       if (!bulk) {
-                               bulk = bulk_operation_merge_new (task_data->data.bulk.str);
-                               g_ptr_array_add (bulk_ops, bulk);
-                               pos = - bulk_ops->len;
-                       }
+                       gchar *str;
 
+                       bulk = bulk_operation_merge_new (task_data->data.bulk.str);
                        bulk->tasks = g_list_prepend (bulk->tasks,
                                                      tracker_task_ref (task));
-               }
-
-               g_array_append_val (error_map, pos);
-       }
 
-       if (bulk_ops) {
-               for (j = 0; j < bulk_ops->len; j++) {
-                       BulkOperationMerge *bulk;
-
-                       bulk = g_ptr_array_index (bulk_ops, j);
-                       bulk_operation_merge_finish (bulk);
+                       str = bulk_operation_merge_finish (bulk);
+                       g_ptr_array_add (bulk_sparql, str);
+                       g_array_append_val (sparql_array, str);
 
-                       if (bulk->sparql) {
-                               g_array_prepend_val (sparql_array,
-                                                    bulk->sparql);
-                       }
+                       bulk_operation_merge_free (bulk);
                }
        }
 
        update_data = g_slice_new0 (UpdateArrayData);
        update_data->buffer = buffer;
        update_data->tasks = g_ptr_array_ref (priv->tasks);
-       update_data->bulk_ops = bulk_ops;
-       update_data->n_bulk_operations = bulk_ops ? bulk_ops->len : 0;
-       update_data->error_map = error_map;
        update_data->sparql_array = sparql_array;
 
        /* Empty pool, update_data will keep
@@ -632,6 +545,9 @@ tracker_sparql_buffer_flush (TrackerSparqlBuffer *buffer,
                                                      tracker_sparql_buffer_update_array_cb,
                                                      update_data);
 
+       /* These strings we generated here can be freed now */
+       g_ptr_array_free (bulk_sparql, TRUE);
+
        return TRUE;
 }
 


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]