[tracker/tracker-store-ipc: 4/4] Added commit support to the socket-ipc IPC & library. Added error reporting
- From: Philip Van Hoof <pvanhoof src gnome org>
- To: svn-commits-list gnome org
- Subject: [tracker/tracker-store-ipc: 4/4] Added commit support to the socket-ipc IPC & library. Added error reporting
- Date: Mon, 1 Jun 2009 19:13:06 -0400 (EDT)
commit 7d442e10b9b95a6ec6d0c5a058cd33b9ea8b0985
Author: Philip Van Hoof <philip codeminded be>
Date: Tue Jun 2 01:12:39 2009 +0200
Added commit support to the socket-ipc IPC & library. Added error reporting
---
src/libtracker-socket-ipc/Makefile.am | 3 +-
.../tracker-socket-ipc-test.c | 2 +
src/libtracker-socket-ipc/tracker-socket-ipc.c | 200 ++++++++++++++++++--
src/libtracker-socket-ipc/tracker-socket-ipc.h | 3 +
src/tracker-store/tracker-socket-listener.c | 23 ++-
5 files changed, 207 insertions(+), 24 deletions(-)
diff --git a/src/libtracker-socket-ipc/Makefile.am b/src/libtracker-socket-ipc/Makefile.am
index 883a0ba..d6bbacb 100644
--- a/src/libtracker-socket-ipc/Makefile.am
+++ b/src/libtracker-socket-ipc/Makefile.am
@@ -10,6 +10,7 @@ INCLUDES = \
$(GOBJECT_CFLAGS) \
$(GCOV_CFLAGS) \
$(LIBTRACKERSOCKETIPC_CFLAGS) \
+ $(GIO_CFLAGS) \
$(UNAC_CFLAGS)
noinst_PROGRAMS = tracker-socket-ipc-test
@@ -36,6 +37,6 @@ libtracker_socket_ipc_la_LIBADD = \
tracker_socket_ipc_test_SOURCES = tracker-socket-ipc-test.c
tracker_socket_ipc_test_LDADD = $(LIBTRACKERSOCKETIPC_LIBS) \
- $(GLIB2_LIBS) $(GOBJECT_LIBS) \
+ $(GLIB2_LIBS) $(GOBJECT_LIBS) $(GIO_LIBS) \
$(top_builddir)/src/libtracker-socket-ipc/libtracker-socket-ipc.la
diff --git a/src/libtracker-socket-ipc/tracker-socket-ipc-test.c b/src/libtracker-socket-ipc/tracker-socket-ipc-test.c
index ff0abd7..c9c2bbd 100644
--- a/src/libtracker-socket-ipc/tracker-socket-ipc-test.c
+++ b/src/libtracker-socket-ipc/tracker-socket-ipc-test.c
@@ -43,6 +43,8 @@ run_program (gpointer user_data)
on_received, (gpointer) i, NULL);
}
+ tracker_socket_ipc_queue_commit (NULL, NULL, NULL);
+
return FALSE;
}
diff --git a/src/libtracker-socket-ipc/tracker-socket-ipc.c b/src/libtracker-socket-ipc/tracker-socket-ipc.c
index 825306a..2ec9278 100644
--- a/src/libtracker-socket-ipc/tracker-socket-ipc.c
+++ b/src/libtracker-socket-ipc/tracker-socket-ipc.c
@@ -29,27 +29,53 @@
#include <stdint.h>
#include <fcntl.h>
+#include <gio/gio.h>
+
#include "tracker-socket-ipc.h"
#define IPC_ERROR_DOMAIN "TrackerSocketIpcDomain"
#define IPC_DOMAIN g_quark_from_static_string (IPC_ERROR_DOMAIN)
-static int sockfd = -1;
+static int sockfd = 0;
static GHashTable *queued = NULL;
static gchar standard_buffer[4028];
+static GFileMonitor *monitor = NULL;
typedef struct {
TrackerSocketIpcSparqlUpdateCallback callback;
GDestroyNotify destroy;
gpointer user_data;
+ gboolean handled;
} QueuedTask;
+typedef struct {
+ TrackerSocketIpcSparqlUpdateCallback callback;
+ gpointer user_data;
+ GDestroyNotify destroy;
+} ImmediateErrorInfo;
+
+static void tracker_socket_ipc_reset (void);
+
+/*
+ * Value destroy notifier for the `queued` hash table.
+ *
+ * When a task is dropped while its `handled` flag is still FALSE, its
+ * callback (if any) is invoked once with an "Unknown error, not ready"
+ * GError. After that the optional `destroy` notifier runs on the user
+ * data and the slice-allocated task itself is released.
+ */
static void
queued_task_free (QueuedTask *queued_task)
{
+ if (!queued_task->handled && queued_task->callback) {
+ /* g_set_error() requires *error to be NULL on entry; an
+ * uninitialized pointer would be rejected and then handed to
+ * the callback and g_error_free() as garbage. */
+ GError *new_error = NULL;
+
+ g_set_error (&new_error,
+ IPC_DOMAIN,
+ 0, "Unknown error, not ready");
+
+ queued_task->callback (new_error,
+ queued_task->user_data);
+
+ g_error_free (new_error);
+ }
+
if (queued_task->destroy) {
queued_task->destroy (queued_task->user_data);
}
+
g_slice_free (QueuedTask, queued_task);
}
@@ -112,6 +138,12 @@ data_to_handle_received (GIOChannel *source,
queued_task->user_data);
}
+ if (new_error) {
+ g_error_free (new_error);
+ }
+
+ queued_task->handled = TRUE;
+
g_hash_table_remove (queued, key);
g_free (free_data);
@@ -126,6 +158,8 @@ data_to_handle_received (GIOChannel *source,
}
if (cond & (G_IO_HUP | G_IO_ERR)) {
+ close (sockfd);
+ sockfd = 0;
g_io_channel_close (source);
goto failed;
}
@@ -137,14 +171,32 @@ failed:
return FALSE;
}
+/*
+ * "changed" signal handler for the GFileMonitor that watches the socket
+ * path after a failed connection attempt.
+ *
+ * A CREATED or CHANGES_DONE_HINT event triggers a new connection attempt
+ * through tracker_socket_ipc_reset(); every other event is ignored. The
+ * monitor, file, other_file and user_data arguments are unused.
+ */
+static void
+on_socket_file_changed (GFileMonitor      *monitor_,
+                        GFile             *file,
+                        GFile             *other_file,
+                        GFileMonitorEvent  event_type,
+                        gpointer           user_data)
+{
+	switch (event_type) {
+	case G_FILE_MONITOR_EVENT_CREATED:
+	case G_FILE_MONITOR_EVENT_CHANGES_DONE_HINT:
+		tracker_socket_ipc_reset ();
+		break;
+	default:
+		/* Nothing to do for the remaining event types. */
+		break;
+	}
+}
-void
-tracker_socket_ipc_init (void)
+static void
+tracker_socket_ipc_reset (void)
{
GIOChannel *io;
gchar *path, *tmp;
struct sockaddr_un addr;
int len;
+ GFile *file;
+
+ if (monitor) {
+ g_object_unref (monitor);
+ monitor = NULL;
+ }
+
+ if (sockfd) {
+ close (sockfd);
+ sockfd = 0;
+ }
tmp = g_strdup_printf ("tracker-%s", g_get_user_name ());
path = g_build_filename (g_get_tmp_dir (), tmp, "socket", NULL);
@@ -152,44 +204,115 @@ tracker_socket_ipc_init (void)
addr.sun_family = AF_UNIX;
strcpy (addr.sun_path, path);
- g_free (tmp);
- g_free (path);
-
sockfd = socket(PF_LOCAL, SOCK_STREAM, 0);
+
if (!sockfd) {
perror ("socket");
+ goto failed;
}
len = strlen(addr.sun_path) + sizeof(addr.sun_family);
- if (connect(sockfd, (struct sockaddr *)&addr, len) == -1) {
+ if (connect (sockfd, (struct sockaddr *)&addr, len) == -1) {
perror("connect");
close (sockfd);
- } else {
+ sockfd = 0;
+ goto failed;
+ }
- //listen (sockfd, 1);
+ io = g_io_channel_unix_new (sockfd);
- io = g_io_channel_unix_new (sockfd);
- signal(SIGPIPE, SIG_IGN);
+ if (queued) {
+ g_hash_table_unref (queued);
+ }
- queued = g_hash_table_new_full (g_str_hash, g_str_equal,
- (GDestroyNotify) g_free,
- (GDestroyNotify) queued_task_free);
+ queued = g_hash_table_new_full (g_str_hash, g_str_equal,
+ (GDestroyNotify) g_free,
+ (GDestroyNotify) queued_task_free);
- g_io_add_watch (io, G_IO_IN | G_IO_HUP | G_IO_ERR | G_IO_NVAL,
- data_to_handle_received, (gpointer) sockfd);
+ g_io_add_watch (io, G_IO_IN | G_IO_HUP | G_IO_ERR | G_IO_NVAL,
+ data_to_handle_received, (gpointer) sockfd);
- g_io_channel_unref (io);
- }
+ g_io_channel_unref (io);
+
+ g_free (tmp);
+ g_free (path);
+
+ return;
+
+failed:
+
+ file = g_file_new_for_path (path);
+ monitor = g_file_monitor_file (file, G_FILE_MONITOR_NONE, NULL, NULL);
+
+ g_signal_connect (G_OBJECT (monitor), "changed",
+ G_CALLBACK (on_socket_file_changed), NULL);
+
+ g_object_unref (file);
+
+ g_free (tmp);
+ g_free (path);
+}
+
+/*
+ * Public initialisation entry point of the socket IPC client.
+ *
+ * All of the work is delegated to tracker_socket_ipc_reset(), which
+ * tries to connect to the service socket and, on failure, installs a
+ * file monitor on the socket path instead.
+ */
+void
+tracker_socket_ipc_init (void)
+{
+	tracker_socket_ipc_reset ();
+}
+/*
+ * Release everything the socket IPC client holds: the connected socket,
+ * the file monitor on the socket path and the table of queued tasks.
+ *
+ * Each global is cleared after it is released so that a subsequent
+ * tracker_socket_ipc_init() does not close or unref a stale value.
+ */
void
tracker_socket_ipc_shutdown (void)
{
- close (sockfd);
+ if (sockfd) {
+ close (sockfd);
+ sockfd = 0;
+ }
+
+ if (monitor) {
+ g_object_unref (monitor);
+ monitor = NULL;
+ }
+
- g_hash_table_unref (queued);
+ /* `queued` is only created once a connection succeeded, so it may
+ * still be NULL here. */
+ if (queued) {
+ g_hash_table_unref (queued);
+ queued = NULL;
+ }
}
+/*
+ * Idle callback scheduled by emit_immediate_error().
+ *
+ * user_data is an ImmediateErrorInfo. If it carries a callback, that
+ * callback is invoked with a "Tracker service not available" GError;
+ * afterwards the optional destroy notifier is run on the caller's user
+ * data. The info struct itself is not freed here.
+ *
+ * Returns FALSE so the idle source runs only once.
+ */
+static gboolean
+emit_immediate_error_idle (gpointer user_data)
+{
+	ImmediateErrorInfo *info = user_data;
+
+	if (info->callback) {
+		/* g_set_error() requires *error to be NULL on entry; an
+		 * uninitialized pointer would be rejected and then passed
+		 * on to the callback and g_error_free() as garbage. */
+		GError *new_error = NULL;
+
+		g_set_error (&new_error,
+		             IPC_DOMAIN,
+		             0, "Tracker service not available");
+
+		info->callback (new_error, info->user_data);
+
+		g_error_free (new_error);
+	}
+
+	if (info->destroy) {
+		info->destroy (info->user_data);
+	}
+
+	return FALSE;
+}
+
+/*
+ * Report "service not available" to a caller asynchronously.
+ *
+ * The callback, its user data and the destroy notifier are packed into
+ * a heap-allocated ImmediateErrorInfo and handed to an idle source at
+ * default priority, so the callback fires from the main loop rather
+ * than from within the calling function. g_free() is registered as the
+ * idle source's destroy notifier and releases the info struct.
+ */
+static void
+emit_immediate_error (TrackerSocketIpcSparqlUpdateCallback callback,
+                      gpointer user_data, GDestroyNotify destroy)
+{
+	ImmediateErrorInfo *pending;
+
+	pending = g_new (ImmediateErrorInfo, 1);
+	pending->destroy = destroy;
+	pending->user_data = user_data;
+	pending->callback = callback;
+
+	g_idle_add_full (G_PRIORITY_DEFAULT,
+	                 emit_immediate_error_idle,
+	                 pending,
+	                 (GDestroyNotify) g_free);
+}
+
void
tracker_socket_ipc_queue_sparql_update (const gchar *sparql,
TrackerSocketIpcSparqlUpdateCallback callback,
@@ -202,6 +325,11 @@ tracker_socket_ipc_queue_sparql_update (const gchar *sparql,
g_return_if_fail (queued != NULL);
+ if (!sockfd) {
+ emit_immediate_error (callback, user_data, destroy);
+ return;
+ }
+
queued_task = g_slice_new (QueuedTask);
queued_task->callback = callback;
queued_task->destroy = destroy;
@@ -223,3 +351,37 @@ tracker_socket_ipc_queue_sparql_update (const gchar *sparql,
g_free (query);
}
+/*
+ * Queue a COMMIT request on the IPC socket.
+ *
+ * callback:  invoked when the request is answered or dropped; may be NULL
+ * user_data: passed to callback and to destroy
+ * destroy:   optional notifier run on user_data when the task is freed
+ *
+ * When no socket is connected the error is reported asynchronously via
+ * emit_immediate_error() and nothing is queued. Otherwise the request is
+ * written to the socket and the task is stored in `queued` under a
+ * ten-digit, zero-padded key taken from a function-local counter.
+ */
+void
+tracker_socket_ipc_queue_commit (TrackerSocketIpcSparqlUpdateCallback callback,
+                                 gpointer user_data,
+                                 GDestroyNotify destroy)
+{
+	static guint key_counter = 0;
+	QueuedTask *queued_task;
+	gchar *query, *key;
+
+	g_return_if_fail (queued != NULL);
+
+	if (!sockfd) {
+		emit_immediate_error (callback, user_data, destroy);
+		return;
+	}
+
+	queued_task = g_slice_new (QueuedTask);
+	queued_task->callback = callback;
+	queued_task->destroy = destroy;
+	queued_task->user_data = user_data;
+	/* g_slice_new() does not zero the allocation; queued_task_free()
+	 * reads this flag, so it must start out as FALSE. */
+	queued_task->handled = FALSE;
+
+	key = g_strdup_printf ("%010u", key_counter++);
+
+	/* NOTE(review): 6 is presumably the byte length of the "COMMIT"
+	 * payload that follows the newline - confirm against the listener. */
+	query = g_strdup_printf ("COMMIT {%s} {%010u}\nCOMMIT",
+	                         key, 6);
+
+	if (send (sockfd, query, strlen (query), 0) == -1) {
+		perror("send");
+	}
+
+	/* The table takes ownership of both key and task. */
+	g_hash_table_insert (queued, key, queued_task);
+
+	g_free (query);
+}
diff --git a/src/libtracker-socket-ipc/tracker-socket-ipc.h b/src/libtracker-socket-ipc/tracker-socket-ipc.h
index 5a14bb7..29dc16b 100644
--- a/src/libtracker-socket-ipc/tracker-socket-ipc.h
+++ b/src/libtracker-socket-ipc/tracker-socket-ipc.h
@@ -37,6 +37,9 @@ void tracker_socket_ipc_queue_sparql_update (const gchar *sparql,
TrackerSocketIpcSparqlUpdateCallback callback,
gpointer user_data,
GDestroyNotify destroy);
+void tracker_socket_ipc_queue_commit (TrackerSocketIpcSparqlUpdateCallback callback,
+ gpointer user_data,
+ GDestroyNotify destroy);
G_END_DECLS
diff --git a/src/tracker-store/tracker-socket-listener.c b/src/tracker-store/tracker-socket-listener.c
index 4c704b0..e7db491 100644
--- a/src/tracker-store/tracker-socket-listener.c
+++ b/src/tracker-store/tracker-socket-listener.c
@@ -90,6 +90,12 @@ on_update_fin (GError *error,
g_free (message);
}
+/*
+ * Completion callback for queued commits.
+ *
+ * It receives only user data (no GError), so it forwards to
+ * on_update_fin() with a NULL error.
+ */
+static void
+on_update_commit_fin (gpointer user_data)
+{
+	on_update_fin (NULL, user_data);
+}
+
static gboolean
data_to_handle_received (GIOChannel *source,
GIOCondition cond,
@@ -133,13 +139,22 @@ data_to_handle_received (GIOChannel *source,
info->key = g_strdup (key);
info->clientfd = clientfd;
- /* g_debug ("QUEUED: %s\n", query_data); */
-
tracker_store_queue_sparql_update (query_data,
on_update_fin,
info,
(GDestroyNotify) update_info_free);
+ } else if (strstr (command, "COMMIT")) {
+
+ UpdateFinInfo *info = g_slice_new (UpdateFinInfo);
+
+ info->key = g_strdup (key);
+ info->clientfd = clientfd;
+
+ tracker_store_queue_commit (on_update_commit_fin,
+ info,
+ (GDestroyNotify) update_info_free);
+
} else {
goto failed;
}
@@ -174,7 +189,8 @@ failed:
}
-static gboolean server_cb(GIOChannel *chan, GIOCondition cond, gpointer data)
+static gboolean
+server_cb (GIOChannel *chan, GIOCondition cond, gpointer data)
{
struct sockaddr_un addr;
socklen_t addrlen;
@@ -236,7 +252,6 @@ tracker_socket_listener_init (void)
listen (sockfd, 1);
io = g_io_channel_unix_new (sockfd);
- signal(SIGPIPE, SIG_IGN);
g_io_add_watch (io, G_IO_IN | G_IO_HUP | G_IO_ERR | G_IO_NVAL,
server_cb, NULL);
[Date Prev][Date Next] [Thread Prev][Thread Next] [Thread Index] [Date Index] [Author Index]