[tracker/tracker-store-ipc: 4/4] Added commit support to the socket-ipc IPC & library. Added error reporting



commit 7d442e10b9b95a6ec6d0c5a058cd33b9ea8b0985
Author: Philip Van Hoof <philip codeminded be>
Date:   Tue Jun 2 01:12:39 2009 +0200

    Added commit support to the socket-ipc IPC & library. Added error reporting
---
 src/libtracker-socket-ipc/Makefile.am              |    3 +-
 .../tracker-socket-ipc-test.c                      |    2 +
 src/libtracker-socket-ipc/tracker-socket-ipc.c     |  200 ++++++++++++++++++--
 src/libtracker-socket-ipc/tracker-socket-ipc.h     |    3 +
 src/tracker-store/tracker-socket-listener.c        |   23 ++-
 5 files changed, 207 insertions(+), 24 deletions(-)

diff --git a/src/libtracker-socket-ipc/Makefile.am b/src/libtracker-socket-ipc/Makefile.am
index 883a0ba..d6bbacb 100644
--- a/src/libtracker-socket-ipc/Makefile.am
+++ b/src/libtracker-socket-ipc/Makefile.am
@@ -10,6 +10,7 @@ INCLUDES =						\
 	$(GOBJECT_CFLAGS)				\
 	$(GCOV_CFLAGS)					\
 	$(LIBTRACKERSOCKETIPC_CFLAGS)			\
+	$(GIO_CFLAGS)					\
 	$(UNAC_CFLAGS)
 
 noinst_PROGRAMS = tracker-socket-ipc-test
@@ -36,6 +37,6 @@ libtracker_socket_ipc_la_LIBADD = 				\
 tracker_socket_ipc_test_SOURCES = tracker-socket-ipc-test.c
 
 tracker_socket_ipc_test_LDADD = $(LIBTRACKERSOCKETIPC_LIBS) 	\
-	$(GLIB2_LIBS) $(GOBJECT_LIBS) \
+	$(GLIB2_LIBS) $(GOBJECT_LIBS) $(GIO_LIBS) \
 	$(top_builddir)/src/libtracker-socket-ipc/libtracker-socket-ipc.la
 
diff --git a/src/libtracker-socket-ipc/tracker-socket-ipc-test.c b/src/libtracker-socket-ipc/tracker-socket-ipc-test.c
index ff0abd7..c9c2bbd 100644
--- a/src/libtracker-socket-ipc/tracker-socket-ipc-test.c
+++ b/src/libtracker-socket-ipc/tracker-socket-ipc-test.c
@@ -43,6 +43,8 @@ run_program (gpointer user_data)
 		                                        on_received, (gpointer) i, NULL);
 	}
 
+	tracker_socket_ipc_queue_commit (NULL, NULL, NULL);
+
 	return FALSE;
 }
 
diff --git a/src/libtracker-socket-ipc/tracker-socket-ipc.c b/src/libtracker-socket-ipc/tracker-socket-ipc.c
index 825306a..2ec9278 100644
--- a/src/libtracker-socket-ipc/tracker-socket-ipc.c
+++ b/src/libtracker-socket-ipc/tracker-socket-ipc.c
@@ -29,27 +29,53 @@
 #include <stdint.h>
 #include <fcntl.h>
 
+#include <gio/gio.h>
+
 #include "tracker-socket-ipc.h"
 
 #define IPC_ERROR_DOMAIN      "TrackerSocketIpcDomain"
 #define IPC_DOMAIN             g_quark_from_static_string (IPC_ERROR_DOMAIN)
 
-static int sockfd = -1;
+static int sockfd = 0;
 static GHashTable *queued = NULL;
 static gchar standard_buffer[4028];
+static GFileMonitor *monitor = NULL;
 
 typedef struct {
 	TrackerSocketIpcSparqlUpdateCallback callback;
 	GDestroyNotify destroy;
 	gpointer user_data;
+	gboolean handled;
 } QueuedTask;
 
+typedef struct {
+	TrackerSocketIpcSparqlUpdateCallback callback;
+	gpointer user_data;
+	GDestroyNotify destroy;
+} ImmediateErrorInfo;
+
+static void tracker_socket_ipc_reset (void);
+
 static void
 queued_task_free (QueuedTask *queued_task)
 {
+	if (!queued_task->handled && queued_task->callback) {
+		GError *new_error;
+
+		g_set_error (&new_error,
+		             IPC_DOMAIN,
+		             0, "Unknown error, not ready");
+
+		queued_task->callback (new_error, 
+		                       queued_task->user_data);
+
+		g_error_free (new_error);
+	}
+
 	if (queued_task->destroy) {
 		queued_task->destroy (queued_task->user_data);
 	}
+
 	g_slice_free (QueuedTask, queued_task);
 }
 
@@ -112,6 +138,12 @@ data_to_handle_received (GIOChannel *source,
 					                       queued_task->user_data);
 				}
 
+				if (new_error) {
+					g_error_free (new_error);
+				}
+
+				queued_task->handled = TRUE;
+
 				g_hash_table_remove (queued, key);
 
 				g_free (free_data);
@@ -126,6 +158,8 @@ data_to_handle_received (GIOChannel *source,
 	}
 
 	if (cond & (G_IO_HUP | G_IO_ERR)) {
+		close (sockfd);
+		sockfd = 0;
 		g_io_channel_close (source);
 		goto failed;
 	}
@@ -137,14 +171,32 @@ failed:
 	return FALSE;
 
 }
+static void 
+on_socket_file_changed (GFileMonitor *monitor_, GFile *file, GFile *other_file, GFileMonitorEvent event_type, gpointer user_data)
+{
+	if (event_type == G_FILE_MONITOR_EVENT_CHANGES_DONE_HINT || event_type == G_FILE_MONITOR_EVENT_CREATED) {
+		tracker_socket_ipc_reset ();
+	}
+}
 
-void
-tracker_socket_ipc_init (void)
+static void
+tracker_socket_ipc_reset (void)
 {
 	GIOChannel *io;
 	gchar *path, *tmp;
 	struct sockaddr_un addr;
 	int len;
+	GFile *file;
+
+	if (monitor) {
+		g_object_unref (monitor);
+		monitor = NULL;
+	}
+
+	if (sockfd) {
+		close (sockfd);
+		sockfd = 0;
+	}
 
 	tmp = g_strdup_printf ("tracker-%s", g_get_user_name ());
 	path = g_build_filename (g_get_tmp_dir (), tmp, "socket", NULL);
@@ -152,44 +204,115 @@ tracker_socket_ipc_init (void)
 	addr.sun_family = AF_UNIX;
 	strcpy (addr.sun_path, path);
 
-	g_free (tmp);
-	g_free (path);
-
 	sockfd = socket(PF_LOCAL, SOCK_STREAM, 0);
+
 	if (!sockfd) {
 		perror ("socket");
+		goto failed;
 	}
 
 	len = strlen(addr.sun_path) + sizeof(addr.sun_family);
 
-	if (connect(sockfd, (struct sockaddr *)&addr, len) == -1) {
+	if (connect (sockfd, (struct sockaddr *)&addr, len) == -1) {
 		perror("connect");
 		close (sockfd);
-	} else {
+		sockfd = 0;
+		goto failed;
+	}
 
-		//listen (sockfd, 1);
+	io = g_io_channel_unix_new (sockfd);
 
-		io = g_io_channel_unix_new (sockfd);
-		signal(SIGPIPE, SIG_IGN);
+	if (queued) {
+		g_hash_table_unref (queued);
+	}
 
-		queued = g_hash_table_new_full (g_str_hash, g_str_equal,
-		                                (GDestroyNotify) g_free,
-		                                (GDestroyNotify) queued_task_free);
+	queued = g_hash_table_new_full (g_str_hash, g_str_equal,
+	                                (GDestroyNotify) g_free,
+	                                (GDestroyNotify) queued_task_free);
 
-		g_io_add_watch (io, G_IO_IN | G_IO_HUP | G_IO_ERR | G_IO_NVAL,
-		                data_to_handle_received, (gpointer) sockfd);
+	g_io_add_watch (io, G_IO_IN | G_IO_HUP | G_IO_ERR | G_IO_NVAL,
+	                data_to_handle_received, (gpointer) sockfd);
 	
-		g_io_channel_unref (io);
-	}
+	g_io_channel_unref (io);
+
+	g_free (tmp);
+	g_free (path);
+
+	return;
+
+failed:
+
+	file = g_file_new_for_path (path);
+	monitor =  g_file_monitor_file (file, G_FILE_MONITOR_NONE, NULL, NULL);
+
+	g_signal_connect (G_OBJECT (monitor), "changed", 
+			  G_CALLBACK (on_socket_file_changed), NULL);
+
+	g_object_unref (file);
+
+	g_free (tmp);
+	g_free (path);
+}
+
+void
+tracker_socket_ipc_init (void)
+{
+	tracker_socket_ipc_reset ();
 }
 
 void
 tracker_socket_ipc_shutdown (void)
 {
-	close (sockfd);
+	if (sockfd) {
+		close (sockfd);
+		sockfd = 0;
+	}
+
+	if (monitor) {
+		g_object_unref (monitor);
+	}
+
 	g_hash_table_unref (queued);
 }
 
+static gboolean 
+emit_immediate_error_idle (gpointer user_data)
+{
+	ImmediateErrorInfo *info = user_data;
+
+	if (info->callback) {
+		GError *new_error;
+
+		g_set_error (&new_error,
+		             IPC_DOMAIN,
+		             0, "Tracker service not available");
+
+		info->callback (new_error, info->user_data);
+
+		g_error_free (new_error);
+	}
+
+	if (info->destroy) {
+		info->destroy (info->user_data);
+	}
+
+	return FALSE;
+}
+
+static void
+emit_immediate_error (TrackerSocketIpcSparqlUpdateCallback callback,
+                      gpointer user_data, GDestroyNotify destroy)
+{
+	ImmediateErrorInfo *info = g_new (ImmediateErrorInfo, 1);
+
+	info->callback = callback;
+	info->user_data = user_data;
+	info->destroy = destroy;
+
+	g_idle_add_full (G_PRIORITY_DEFAULT, emit_immediate_error_idle, 
+	                 info, (GDestroyNotify) g_free);
+}
+
 void
 tracker_socket_ipc_queue_sparql_update   (const gchar   *sparql,
                                           TrackerSocketIpcSparqlUpdateCallback callback,
@@ -202,6 +325,11 @@ tracker_socket_ipc_queue_sparql_update   (const gchar   *sparql,
 
 	g_return_if_fail (queued != NULL);
 
+	if (!sockfd) {
+		emit_immediate_error (callback, user_data, destroy);
+		return;
+	}
+
 	queued_task = g_slice_new (QueuedTask);
 	queued_task->callback = callback;
 	queued_task->destroy = destroy;
@@ -223,3 +351,37 @@ tracker_socket_ipc_queue_sparql_update   (const gchar   *sparql,
 	g_free (query);
 }
 
+void
+tracker_socket_ipc_queue_commit   (TrackerSocketIpcSparqlUpdateCallback callback,
+                                   gpointer       user_data,
+                                   GDestroyNotify destroy)
+{
+	static guint key_counter = 0;
+	QueuedTask *queued_task;
+	gchar *query, *key;
+
+	g_return_if_fail (queued != NULL);
+
+	if (!sockfd) {
+		emit_immediate_error (callback, user_data, destroy);
+		return;
+	}
+
+	queued_task = g_slice_new (QueuedTask);
+	queued_task->callback = callback;
+	queued_task->destroy = destroy;
+	queued_task->user_data = user_data;
+
+	key = g_strdup_printf ("%010u", key_counter++);
+
+	query = g_strdup_printf ("COMMIT {%s} {%010u}\nCOMMIT",
+	                         key, 6);
+
+	if (send (sockfd, query, strlen (query), 0) == -1) {
+		perror("send");
+	}
+
+	g_hash_table_insert (queued, key, queued_task);
+
+	g_free (query);
+}
diff --git a/src/libtracker-socket-ipc/tracker-socket-ipc.h b/src/libtracker-socket-ipc/tracker-socket-ipc.h
index 5a14bb7..29dc16b 100644
--- a/src/libtracker-socket-ipc/tracker-socket-ipc.h
+++ b/src/libtracker-socket-ipc/tracker-socket-ipc.h
@@ -37,6 +37,9 @@ void          tracker_socket_ipc_queue_sparql_update   (const gchar   *sparql,
                                                         TrackerSocketIpcSparqlUpdateCallback callback,
                                                         gpointer       user_data,
                                                         GDestroyNotify destroy);
+void          tracker_socket_ipc_queue_commit   (TrackerSocketIpcSparqlUpdateCallback callback,
+                                                 gpointer       user_data,
+                                                 GDestroyNotify destroy);
 
 G_END_DECLS
 
diff --git a/src/tracker-store/tracker-socket-listener.c b/src/tracker-store/tracker-socket-listener.c
index 4c704b0..e7db491 100644
--- a/src/tracker-store/tracker-socket-listener.c
+++ b/src/tracker-store/tracker-socket-listener.c
@@ -90,6 +90,12 @@ on_update_fin (GError   *error,
 	g_free (message);
 }
 
+static void
+on_update_commit_fin (gpointer  user_data)
+{
+	on_update_fin (NULL, user_data);
+}
+
 static gboolean
 data_to_handle_received (GIOChannel *source,
                          GIOCondition cond,
@@ -133,13 +139,22 @@ data_to_handle_received (GIOChannel *source,
 					info->key = g_strdup (key);
 					info->clientfd = clientfd;
 
-					/* g_debug ("QUEUED: %s\n", query_data); */
-
 					tracker_store_queue_sparql_update (query_data, 
 					                                   on_update_fin, 
 					                                   info,
 					                                   (GDestroyNotify) update_info_free);
 
+				} else if (strstr (command, "COMMIT")) {
+
+					UpdateFinInfo *info = g_slice_new (UpdateFinInfo);
+
+					info->key = g_strdup (key);
+					info->clientfd = clientfd;
+
+					tracker_store_queue_commit (on_update_commit_fin, 
+					                            info,
+					                            (GDestroyNotify) update_info_free);
+
 				} else {
 					goto failed;
 				}
@@ -174,7 +189,8 @@ failed:
 
 }
 
-static gboolean server_cb(GIOChannel *chan, GIOCondition cond, gpointer data)
+static gboolean 
+server_cb (GIOChannel *chan, GIOCondition cond, gpointer data)
 {
 	struct sockaddr_un addr;
 	socklen_t addrlen;
@@ -236,7 +252,6 @@ tracker_socket_listener_init (void)
 		listen (sockfd, 1);
 
 		io = g_io_channel_unix_new (sockfd);
-		signal(SIGPIPE, SIG_IGN);
 
 		g_io_add_watch (io, G_IO_IN | G_IO_HUP | G_IO_ERR | G_IO_NVAL,
 		                server_cb, NULL);



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]