[gnome-documents/wip/flickr: 4/12] Flickr-miner: Synchronous version with pooling



commit c641f790ed5f37b3345851144ed560baa1d87db9
Author: Marek Chalupa <mchalupa redhat com>
Date:   Fri May 17 07:45:20 2013 +0200

    Flickr-miner: Synchronous version with pooling
    
    Since the GdMiner object frees the job after the query_* function returns,
    the query needs to do everything synchronously. There was a problem with
    doing it all synchronously because sources are added asynchronously. This
    is still not solved. This version uses GThreadPool for browsing
    containers. It hangs when browsing non-root containers, which did not
    happen with the asynchronous version or when the functions were used
    standalone.

 src/miner/gd-flickr-miner.c |  383 ++++++++++++++++++++++++++-----------------
 1 files changed, 234 insertions(+), 149 deletions(-)
---
diff --git a/src/miner/gd-flickr-miner.c b/src/miner/gd-flickr-miner.c
index 17a95ff..92472e7 100644
--- a/src/miner/gd-flickr-miner.c
+++ b/src/miner/gd-flickr-miner.c
@@ -18,18 +18,46 @@
  * Author: Marek Chalupa <mchalupa redhat com>
  */
 
+/* DEV:   ------------------------------------------------
+ * to make gd-miner call this miner, the flickr account has to be added
+   to doc_objects in gd_miner_refresh_db_real. */
+/* ------------------------------------------------------- */
+
 #define GOA_API_IS_SUBJECT_TO_CHANGE
 #include <goa/goa.h>
 #include <grilo.h>
 
+
 #include "gd-flickr-miner.h"
 
 /** FIXME find out how to create this identifier */
 #define MINER_IDENTIFIER "gd:flickr:miner:30058620-777c-47a3-a19c-a6cdf4a315c4"
 
 #define GRILO_TARGET_PLUGIN "grl-flickr"
+#define FLICKR_MINER_MAX_BROWSE_THREADS 1
+#define POOL_WAIT_SEC 3
 
 /* ==================== DECLARATIONS ==================== */
+
+/*
+ * GrlMedia with its source and parent
+ */
+struct entry {
+  GrlSource *source;
+  GrlMedia  *folder;
+  GrlMedia  *parent;
+};
+
+struct pool_data
+{
+  GThreadPool       *pool;
+  gboolean          active;
+ // GMutex            *mutex;   /* used when manipulating with entries */
+  GHashTable        *entries; /* data given to pool --> need to be freed */
+  gint              sources_no;
+  GdAccountMinerJob *job;
+};
+
 static void
 query_flickr (GdAccountMinerJob *job,
               GError **error);
@@ -38,25 +66,21 @@ static GObject *
 create_service (GdMiner *self,
                 GoaObject *object);
 
+/* FIXME -> can delete job argument since it's present in pool_data struct */
 static void
 account_miner_job_browse_container (GdAccountMinerJob *job,
-                                    GrlSource         *source,
-                                    GrlMedia          *container);
+                                    struct entry      *entry,
+                                    struct pool_data  *pool);
 static gboolean
 account_miner_job_process_entry (GdAccountMinerJob *job,
                                  GrlMedia *entry,
                                  GError   **error);
+
 static void
-browse_container_cb (GrlSource *  source,
-                     guint        operation_id,
-                     GrlMedia *   media,
-                     guint        remaining,
-                     gpointer     user_data,
-                     const GError *error);
+source_added_cb (GrlRegistry *registry, GrlSource *source, gpointer user_data);
+
 static void
-source_added_cb (GrlRegistry *registry,
-                 GrlSource   *source,
-                 gpointer     user_data);
+pool_push (gpointer data, gpointer user_data);
 
 /* ==================== GOBJECT ==================== */
 
@@ -76,7 +100,7 @@ gd_flickr_miner_class_init (GdFlickrMinerClass *klass)
   GError *error = NULL;
 
   /* TODO get and assign provider type from plugin */
-  miner_class->goa_provider_type = "flickr";
+  miner_class->goa_provider_type = "flickr"; /* leave blank - we dont need intf to search something for us */
   miner_class->miner_identifier = MINER_IDENTIFIER;
   miner_class->version = 1;
 
@@ -94,76 +118,212 @@ gd_flickr_miner_class_init (GdFlickrMinerClass *klass)
 
   registry = grl_registry_get_default();
 
-
-
-  /* FIXME: make the path relative and find universal way */
-  /* we can use load_all, since create service returns only flickr plugin */
-  if (! grl_registry_load_plugin(registry,
-                                  "/home/marek/local/lib64/grilo-0.2/libgrlflickr.so",
-                                  &error))
+  if (! grl_registry_load_plugin_by_id (registry, "grl-flickr", &error))
   {
     g_error ("Flickr Miner cannot be loaded. Cannot load flickr "
              "plugin (Grilo) :: dbg error = %s\n", error->message);
   }
+  
 }
 
+
 /* ==================== "EXPORTED" FUNCTIONS ==================== */
+
 static void
 query_flickr (GdAccountMinerJob *job,
               GError **error)
 {
-  g_debug ("Querying flickr");
-
   GrlRegistry *registry;
   GList *m, *sources;
- 
-  registry = grl_registry_get_default ();
 
-  /* enable asyncronous adding of grilo sources */
-  /* TODO - solve possible multiple browsing of sources */
-  g_signal_connect (registry, "source-added",
-                    G_CALLBACK (source_added_cb), job);
+  struct entry    *ent;
+  struct pool_data pooldata;
 
-  /* TODO - dont do that, do it all via source-added */
-  sources = grl_plugin_get_sources (GRL_PLUGIN (job->service));
+  gulong sig_handler;
+
+  if (GPOINTER_TO_INT (job->service) == 0)
+    return;
+
+  GThreadPool *p = g_thread_pool_new (pool_push, &pooldata,
+                                      FLICKR_MINER_MAX_BROWSE_THREADS,
+                                      /*FALSE, NULL); // FALSE ==> no errors */
+                                      TRUE, error);
+
+  if (*error != NULL)
+  {
+    g_warning ("Pool: %s", (*error)->message);
+    return;
+  }
+
+  pooldata.pool = p;
+  pooldata.active = TRUE;
+ // pooldata.mutex = g_mutex_new ();
+  pooldata.entries = g_hash_table_new (NULL, NULL);
+  //pooldata.sources_no = GPOINTER_TO_INT (job->service);
+  pooldata.job = job;
+
+  registry = grl_registry_get_default ();
+
+  sources = grl_registry_get_sources (registry, FALSE);
 
   for (m = sources; m != NULL; m = g_list_next (m))
   {
-    /* TODO what to do with the error? */
-    account_miner_job_browse_container (job,
-                                        GRL_SOURCE (m->data),
-                                        NULL);
+    g_debug ("Got source: %s", grl_source_get_name (GRL_SOURCE (m->data)));
+
+    ent = g_slice_alloc (sizeof (struct entry));
+
+    ent->source = GRL_SOURCE (m->data);
+    ent->folder = NULL;
+    ent->parent = NULL;
+
+    g_hash_table_add (pooldata.entries, ent);
+    //pooldata.sources_no--;
+
+    if (g_thread_pool_push (pooldata.pool, (gpointer) ent, error) == FALSE)
+    {
+      /* warn but continue */
+      g_warning ("Pool push: %s", (*error)->message);
+      g_error_free (*error);
+      *error = NULL;
+    }
+  }
+
+  /* Wait for pending threads */
+  while (1)
+  {
+    // dont hurry, wait POOL_WAIT_SEC before asking for state
+    g_usleep (G_USEC_PER_SEC * POOL_WAIT_SEC);
+
+    if (g_hash_table_size (pooldata.entries) == 0)
+    {
+      pooldata.active = FALSE;
+
+      g_debug ("No pending job. Quiting query..");
+      break;
+    }
   }
+  
+
+  /* ==========  Clean up ========== */
+
+  g_thread_pool_free (pooldata.pool, FALSE, TRUE);
+  pooldata.pool = NULL;
+
+  g_hash_table_destroy (pooldata.entries);
+
+  g_debug ("Ending query_flickr");
 }
 
-/* Fix me - in generialized version return GrlRegistry and
- * in create service just configure it (and return) */
 static GObject *
 create_service (GdMiner *self,
                 GoaObject *object)
-{ 
-  GrlRegistry *registry;
-  GrlPlugin *plugin;
+{  
+  static gint s = 0;
 
-  registry = grl_registry_get_default ();
-  plugin = grl_registry_lookup_plugin (registry, GRILO_TARGET_PLUGIN);
 
-  if (plugin == NULL)
-    g_error ("Could not find services (grilo plugin: %s)", GRILO_TARGET_PLUGIN);
-
-  return G_OBJECT (g_object_ref (plugin));
+  if (s == 0)
+  {
+    s = 1;
+    return GINT_TO_POINTER (1);
+  }
+  else
+    return GINT_TO_POINTER (0);
 }
 
 /* ==================== PRIVATE FUNCTIONS ==================== */
+static void
+pool_push (gpointer data, gpointer user_data)
+{
+  struct entry     *ent   = (struct entry *)      data;
+  struct pool_data *pool  = (struct pool_data *) user_data;
+
+  account_miner_job_browse_container (pool->job, ent, pool);
+}
+
+
+/* FIXME --> delete job argument (is in pool) */
+static void
+account_miner_job_browse_container (GdAccountMinerJob *job,
+                                    struct entry      *entry,
+                                    struct pool_data  *pool)
+{
+  g_return_if_fail (entry != NULL);
+  g_return_if_fail (entry->folder == NULL || GRL_IS_MEDIA (entry->folder));
+  g_return_if_fail (entry->parent == NULL || GRL_IS_MEDIA (entry->parent));
+  g_return_if_fail (GRL_IS_SOURCE (entry->source));
+
+  g_debug ("Browsing container %s of %s (%s)", entry->folder ? grl_media_get_title (entry->folder) : 
"[root]",
+                                          entry->parent ? grl_media_get_title (entry->parent) : "[root]",
+                                          grl_source_get_name (entry->source));
+
+  /* Skip public source */
+  if (g_strcmp0 (grl_source_get_name (entry->source), "Flickr") == 0) {
+    g_debug ("Skipping public source");
+    g_hash_table_remove (pool->entries, entry);
+    return;
+  }
+ 
+  GrlOperationOptions *ops;
+  const GList *keys;
+  GError *err = NULL;
+  GList *result, *m;
+
+  GrlMedia *media;
+  struct entry *ent;
+
+  /* get possiblly all */
+  keys = grl_source_supported_keys (entry->source);
+  ops = grl_operation_options_new (grl_source_get_caps (entry->source, GRL_OP_BROWSE));
+
+  result = grl_source_browse_sync (entry->source, entry->folder, keys, ops, NULL);
+
+  for (m = result; m != NULL; m = g_list_next (m))
+  {
+    media = GRL_MEDIA (m->data);//grl_source_resolve_sync (source, GRL_MEDIA (m->data), keys, ops, NULL);
+    account_miner_job_process_entry (pool->job, media, NULL); 
+
+    if (GRL_IS_MEDIA_BOX (media) /* && public == FALSE */)
+    {
+      ent = g_slice_alloc (sizeof (struct entry));
+
+      ent->source = entry->source;
+      ent->folder = g_object_ref (media); 
+      ent->parent = (entry->folder == NULL) ?
+                      NULL : g_object_ref (entry->folder);
+    
+      g_hash_table_add (pool->entries, ent);
+
+      account_miner_job_browse_container(pool->job, ent, pool);
+      //if (g_thread_pool_push (pool->pool, ent, &err) == FALSE)
+      //  g_warning ("Pooling container: %s", err->message);
+    }
+  }
+
+  g_list_free_full (result, g_object_unref);
+
+  if (entry->folder != NULL)
+    g_object_unref (entry->folder);
+  if (entry->parent != NULL)
+    g_object_unref (entry->parent);
+
+  gpointer mem = g_hash_table_lookup (pool->entries, entry);
+  g_slice_free1 (sizeof (struct entry), mem);
+
+  g_hash_table_remove (pool->entries, entry);
+
+
+  g_object_unref (ops);
+}
 
 static gboolean
 account_miner_job_process_entry (GdAccountMinerJob *job,
                                  GrlMedia *entry,
                                  GError   **error)
 {
-  g_debug ("Got media %s from source %s", grl_media_get_title (entry),
-                                          grl_media_get_source (entry));
-  /*
+  g_debug ("Got %s %s from source %s", GRL_IS_MEDIA_BOX (entry) ? "box" : "media",
+                                        grl_media_get_title (entry),
+                                        grl_media_get_source (entry));
+/*
   GDateTime *created_time, *updated_time;
   gchar *contact_resource;
   gchar *resource = NULL;
@@ -172,22 +332,24 @@ account_miner_job_process_entry (GdAccountMinerJob *job,
   gboolean resource_exists, mtime_changed;
   gint64 new_mtime;
 
-  id = zpj_skydrive_entry_get_id (entry);
+  id = grl_media_get_id (entry);
 
-  identifier = g_strdup_printf ("%swindows-live:skydrive:%s",
-                                ZPJ_IS_SKYDRIVE_FOLDER (entry) ? "gd:collection:" : "",
+  identifier = g_strdup_printf ("%sflickr:%s",
+                                GRL_IS_MEDIA_BOX (entry) ? "gd:collection:" : "",
                                 id);
-
+  
   // remove from the list of the previous resources
-  g_hash_table_remove (job->previous_resources, identifier);
+  //g_hash_table_remove (job->previous_resources, identifier);
 
-  name = zpj_skydrive_entry_get_name (entry);
+  name = grl_media_get_title (entry);
 
-  if (ZPJ_IS_SKYDRIVE_FILE (entry))
-    class = gd_filename_to_rdf_type (name);
-  else if (ZPJ_IS_SKYDRIVE_FOLDER (entry))
-    class = "nfo:DataContainer";
 
+  if (GRL_IS_MEDIA_BOX (entry))
+    class = "nfo:DataContainer";
+  else
+    class = gd_filename_to_rdf_type (name);
+*/
+  /*
   resource = gd_miner_tracker_sparql_connection_ensure_resource
     (job->connection,
      job->cancellable, error,
@@ -198,6 +360,7 @@ account_miner_job_process_entry (GdAccountMinerJob *job,
   if (*error != NULL)
     goto out;
 
+
   gd_miner_tracker_update_datasource (job->connection, job->datasource_urn,
                                       resource_exists, identifier, resource,
                                       job->cancellable, error);
@@ -205,7 +368,7 @@ account_miner_job_process_entry (GdAccountMinerJob *job,
   if (*error != NULL)
     goto out;
 
-  updated_time = zpj_skydrive_entry_get_updated_time (entry);
+  updated_time = grl_media_get_modification_date (entry);
   new_mtime = g_date_time_to_unix (updated_time);
   mtime_changed = gd_miner_tracker_update_mtime (job->connection, new_mtime,
                                                  resource_exists, identifier, resource,
@@ -230,8 +393,10 @@ account_miner_job_process_entry (GdAccountMinerJob *job,
   if (*error != NULL)
     goto out;
 
-  if (ZPJ_IS_SKYDRIVE_FILE (entry))
-    {
+
+  if (! GRL_IS_MEDIA_BOX (entry))
+    { 
+      g_warning ("isPartOf undefined!!");
       gchar *parent_resource_urn, *parent_identifier;
       const gchar *parent_id, *mime;
 
@@ -271,15 +436,17 @@ account_miner_job_process_entry (GdAccountMinerJob *job,
         }
     }
 
+  // insert description 
   gd_miner_tracker_sparql_connection_insert_or_replace_triple
     (job->connection,
      job->cancellable, error,
      job->datasource_urn, resource,
-     "nie:description", zpj_skydrive_entry_get_description (entry));
+     "nie:description", grl_media_get_description (entry));
 
   if (*error != NULL)
     goto out;
 
+  // insert filename 
   gd_miner_tracker_sparql_connection_insert_or_replace_triple
     (job->connection,
      job->cancellable, error,
@@ -289,14 +456,16 @@ account_miner_job_process_entry (GdAccountMinerJob *job,
   if (*error != NULL)
     goto out;
 
+  // DEV: why? 
   contact_resource = gd_miner_tracker_utils_ensure_contact_resource
     (job->connection,
      job->cancellable, error,
-     job->datasource_urn, zpj_skydrive_entry_get_from_name (entry));
+     job->datasource_urn, grl_media_get_author (entry));
 
   if (*error != NULL)
     goto out;
 
+  // insert author 
   gd_miner_tracker_sparql_connection_insert_or_replace_triple
     (job->connection,
      job->cancellable, error,
@@ -307,7 +476,8 @@ account_miner_job_process_entry (GdAccountMinerJob *job,
   if (*error != NULL)
     goto out;
 
-  created_time = zpj_skydrive_entry_get_created_time (entry);
+  // get and insert creation date 
+  created_time = grl_media_get_creation_date (entry);
   date = gd_iso8601_from_timestamp (g_date_time_to_unix (created_time));
   gd_miner_tracker_sparql_connection_insert_or_replace_triple
     (job->connection,
@@ -325,94 +495,9 @@ account_miner_job_process_entry (GdAccountMinerJob *job,
 
   if (*error != NULL)
     return FALSE;
-*/
   
   g_object_unref (entry);
-
+*/
   return TRUE;
 }
 
-static void
-account_miner_job_browse_container (GdAccountMinerJob *job,
-                                    GrlSource *source,
-                                    GrlMedia  *container)
-{
-  g_return_if_fail (GRL_IS_SOURCE (source));
-  g_return_if_fail (container == NULL || GRL_IS_MEDIA  (container));
-
-  /* Skip public source */
-  if (g_strcmp0 (grl_source_get_name (source), "Flickr") == 0) {
-    g_debug ("Skipping public source");
-    return;
-  }
-
-  GrlOperationOptions *ops;
-  const GList *keys;
-  gint op_id;
-
-  /* get possiblly all */
-  ops = grl_operation_options_new (NULL);
-  keys = grl_source_supported_keys (source);
-
-  op_id = grl_source_browse (source, container,
-                             keys, ops, browse_container_cb, job);
-
-  /* TODO use op_id to make it cancellable */
-
-  g_object_unref (ops);
-}
-
-static void
-browse_container_cb (GrlSource *source,
-                     guint operation_id,
-                     GrlMedia *media,
-                     guint remaining,
-                     gpointer user_data,
-                     const GError *error)
-{
-  if (error != NULL)
-  {
-    g_warning ("%s", error->message);
-    return;
-  }
-
-  GError *err = NULL;
-
-  if (media != NULL)
-  {
-    if (GRL_IS_MEDIA_BOX (media) && source != NULL)
-    {
-      account_miner_job_browse_container ((GdAccountMinerJob *) user_data,
-                                          source, media);
-      g_object_unref (media);
-    }
-    else if (GRL_IS_MEDIA_IMAGE (media))
-    {
-      /* TODO now is process entry undefined, but if it will be
-       * some kind of async, we need to handle errors somehow */
-      account_miner_job_process_entry ((GdAccountMinerJob *) user_data,
-                                       media, &err);
-
-      if (err != NULL)
-      {
-        g_warning ("%s", err->message);
-        g_error_free (err);
-      }
-    }
-    else
-    {
-      /* some future extension? */
-      return;
-    }
-  }
-}
-
-static void
-source_added_cb (GrlRegistry *registry,
-                 GrlSource   *source,
-                 gpointer     user_data)
-{
-  g_debug ("New source: %s", grl_source_get_name (source));
-
-  account_miner_job_browse_container ((GdAccountMinerJob *) user_data, source, NULL);
-}


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]