balsa r7971 - in trunk: . src



Author: pawels
Date: Sun Sep 14 18:14:24 2008
New Revision: 7971
URL: http://svn.gnome.org/viewvc/balsa?rev=7971&view=rev

Log:
* src/balsa-index.c: process messages with external program - asynchronously.


Modified:
   trunk/ChangeLog
   trunk/src/balsa-index.c

Modified: trunk/src/balsa-index.c
==============================================================================
--- trunk/src/balsa-index.c	(original)
+++ trunk/src/balsa-index.c	Sun Sep 14 18:14:24 2008
@@ -2241,32 +2241,282 @@
                            BNDX_SEARCH_DIRECTION_PREV);
 }
 
-/* Pipe commands. */
+/* Functions and data structures for asynchronous message piping via
+   external commands. */
+/** PipeData stores the context of a message currently sent via a pipe
+    to an external command. */
+struct PipeData {
+    GDestroyNotify destroy_notify; /* called by pipe_data_destroy() if set */
+    gpointer notify_arg;           /* argument passed to destroy_notify */
+    gchar *message;                /* message bytes; g_free()d on destroy */
+    ssize_t message_length;        /* number of bytes in message */
+    ssize_t chars_written;         /* bytes already written to in_channel */
+    GIOChannel *in_channel;        /* child's stdin; NULL once all is written */
+    GIOChannel *out_channel;       /* child's stdout */
+    GIOChannel *err_channel;       /* child's stderr */
+    guint in_source;               /* watch id of in_channel; 0 once removed */
+    guint out_source;              /* watch id of out_channel */
+    guint err_source;              /* watch id of err_channel */
+    guint out_closed:1; /* stdout hung up; unsigned: 1-bit int can't hold 1 */
+    guint err_closed:1; /* stderr hung up; unsigned for the same reason */
+};
 
 static void
-bndx_pipe(LibBalsaMailbox * mailbox, guint msgno, const gchar * pipe_cmd)
+pipe_data_destroy(struct PipeData* pipe) /* frees pipe and all it holds */
 {
-    FILE *fprog;
+    if(pipe->destroy_notify) /* notify first, while pipe is still intact */
+	pipe->destroy_notify(pipe->notify_arg);
+    g_free(pipe->message);
+    if(pipe->in_channel) /* NULL once pipe_in_watch has finished writing */
+	g_io_channel_unref(pipe->in_channel);
+    g_io_channel_unref(pipe->out_channel);
+    g_io_channel_unref(pipe->err_channel);
+    if(pipe->in_source) /* 0 once pipe_in_watch has removed its own watch */
+	g_source_remove(pipe->in_source);
+    g_source_remove(pipe->out_source);
+    g_source_remove(pipe->err_source);
+    g_free(pipe);
+}
 
-    if ((fprog = popen(pipe_cmd, "w")) == 0) {
-        fprintf(stderr, "popen failed for message %d\n", msgno);
-    } else {
-        GMimeStream *stream =
-            libbalsa_mailbox_get_message_stream(mailbox, msgno, TRUE);
-        GMimeStream *pipe;
-
-        g_return_if_fail(stream);
-
-        pipe = g_mime_stream_file_new(fprog);
-        g_mime_stream_file_set_owner(GMIME_STREAM_FILE(pipe), FALSE);
-        libbalsa_mailbox_lock_store(mailbox);
-        g_mime_stream_write_to_stream(stream, pipe);
-        libbalsa_mailbox_unlock_store(mailbox);
-        g_object_unref(pipe);
-        g_object_unref(stream);
-        if (pclose(fprog) == -1)
-            fprintf(stderr, "pclose failed for message %d\n", msgno);
+/** GIOFunc feeding the stored message to the child's standard input.
+    Returns TRUE while part of the message is still unwritten; returns
+    FALSE (dropping this watch) when done or after destroying the pipe. */
+static gboolean
+pipe_in_watch(GIOChannel *channel, GIOCondition condition, gpointer data)
+{
+    struct PipeData *pipe = (struct PipeData*)data;
+    GIOStatus status;
+    GError *error = NULL;
+    gsize chars_written;
+    gboolean rc;
+    
+    if( (condition & G_IO_OUT) == G_IO_OUT) {
+	status = g_io_channel_write_chars(channel,
+				     pipe->message + pipe->chars_written,
+				     pipe->message_length-pipe->chars_written,
+				     &chars_written, &error);
+	switch(status) {
+	case G_IO_STATUS_ERROR:
+	    libbalsa_information(LIBBALSA_INFORMATION_ERROR,
+				 _("Cannot process the message: %s"),
+				 error->message);
+	    g_clear_error(&error);
+	    pipe_data_destroy(pipe);
+	    return FALSE;
+	case G_IO_STATUS_NORMAL:
+	    pipe->chars_written += chars_written;
+	    break;
+	case G_IO_STATUS_EOF:
+	    printf("pipe_in::write_chars receieved premature EOF %s\n",
+		   error ? error->message : "unknown");
+	    pipe_data_destroy(pipe);
+	    return FALSE;
+	case G_IO_STATUS_AGAIN:
+	    printf("pipe_in::write_chars again?\n");
+	    break;
+	}
+    }
+    if( (condition & G_IO_HUP) == G_IO_HUP) {
+	pipe_data_destroy(pipe); /* frees pipe and removes this watch */
+	return FALSE; /* pipe is gone: the code below must not touch it */
+    }
+    if( (condition & G_IO_ERR) == G_IO_ERR) {
+	fprintf(stderr,
+		"pipe_in_watch encountered error. Aborts writing.\n");
+	return FALSE;
+    }
+    rc = pipe->message_length > pipe->chars_written;
+    if(!rc) { /* everything written: release the channel and this watch */
+	g_io_channel_unref(pipe->in_channel); pipe->in_channel = NULL;
+	g_source_remove(pipe->in_source);     pipe->in_source = 0;
+    }
+    return rc;
+}
+
+/** GIOFunc shared by the watches on the child's stdout and stderr:
+    reports what was read and destroys the pipe when the child is done.
+    Returns FALSE after destroying on read error, EOF or G_IO_ERR. */
+static gboolean
+pipe_out_watch(GIOChannel *channel, GIOCondition condition, gpointer data)
+{
+    struct PipeData *ctx = (struct PipeData*)data;
+    GError *err = NULL;
+
+    if (condition & G_IO_IN) {
+	char chunk[2048];
+	gsize n_read;
+	gchar *shown;
+	GIOStatus st = g_io_channel_read_chars(channel, chunk, sizeof(chunk),
+					       &n_read, &err);
+
+	if (st == G_IO_STATUS_ERROR) {
+	    pipe_data_destroy(ctx);
+	    fprintf(stderr, "Reading characters from pipe failed: %s\n",
+		    err ? err->message : "unknown");
+	    g_clear_error(&err);
+	    return FALSE;
+	} else if (st == G_IO_STATUS_EOF) {
+	    printf("pipe_out got EOF\n");
+	    pipe_data_destroy(ctx);
+	    g_clear_error(&err);
+	    return FALSE;
+	} else if (st == G_IO_STATUS_NORMAL) {
+	    /* only the first 128 bytes of each chunk are reported */
+	    shown = g_strndup(chunk, n_read > 128 ? 128 : n_read);
+	    libbalsa_information(LIBBALSA_INFORMATION_MESSAGE, "%s", shown);
+	    g_free(shown);
+	}
+    }
+
+    if (condition == G_IO_HUP) {
+	/* note which channel hung up; destroy once both have done so */
+	if (channel == ctx->out_channel)
+	    ctx->out_closed = 1;
+	else
+	    ctx->err_closed = 1;
+	if (ctx->out_closed && ctx->err_closed)
+	    pipe_data_destroy(ctx);
+    }
+
+    if (condition & G_IO_ERR) {
+	fprintf(stderr,
+		"pipe_out_watch encountered error...\n");
+	pipe_data_destroy(ctx);
+	return FALSE;
+    }
+    return TRUE;
+}
+
+/** BndxPipeQueue represents the context of message pipe
+    processing. Several messages from specified mailbox can be sent
+    via the pipe. For each message, a separate context as stored in
+    PipeData structure is created. */
+
+struct BndxPipeQueue {
+    LibBalsaMailbox *mailbox; /* source mailbox; closed when the queue drains */
+    GArray *msgnos;           /* guint message numbers left; taken from the end */
+    gchar *pipe_cmd;          /* command run via "/bin/sh -c" for each message */
+};
+
+/** Pipes the next message, taken from the end of queue->msgnos, to
+    "/bin/sh -c pipe_cmd". With no message left, closes the mailbox and
+    frees the queue. pipe_data_destroy() re-enters this function for the
+    following message once the current pipe is torn down. */
+static void
+bndx_pipe_queue_last(struct BndxPipeQueue *queue)
+{
+    gchar **argv;
+    struct PipeData *pipe;
+    GMimeStream *stream = NULL;
+    gboolean spawned;
+    gssize chars_read;
+    GError *error = NULL;
+    int std_input, std_output, std_error;
+    guint msgno;
+
+    while(queue->msgnos->len>0){
+	msgno = g_array_index(queue->msgnos, guint, queue->msgnos->len-1);
+	stream =
+	    libbalsa_mailbox_get_message_stream(queue->mailbox, msgno, TRUE);
+	g_array_remove_index(queue->msgnos, queue->msgnos->len-1);
+	if(stream)
+	    break;
+	libbalsa_information(LIBBALSA_INFORMATION_ERROR,
+			     _("Cannot access message %u to pass to %s"),
+			     msgno, queue->pipe_cmd);
     }
+
+    if(queue->msgnos->len == 0 && !stream) {
+	printf("Piping finished. Destroying the context.\n");
+	libbalsa_mailbox_unregister_msgnos(queue->mailbox, queue->msgnos);
+	libbalsa_mailbox_close(queue->mailbox, FALSE);
+	g_array_free(queue->msgnos, TRUE);
+	g_free(queue->pipe_cmd);
+	g_free(queue);
+	return;
+    }
+
+    pipe = g_new0(struct PipeData, 1);
+    pipe->destroy_notify = (GDestroyNotify)bndx_pipe_queue_last;
+    pipe->notify_arg = queue;
+    pipe->message_length = g_mime_stream_length(stream);
+    pipe->message = g_malloc(pipe->message_length);
+    libbalsa_mailbox_lock_store(queue->mailbox);
+    chars_read = g_mime_stream_read(stream, pipe->message,
+				    pipe->message_length);
+    libbalsa_mailbox_unlock_store(queue->mailbox);
+    g_object_unref(stream); /* message is buffered; drop our reference */
+    if(chars_read != pipe->message_length) {
+	libbalsa_information(LIBBALSA_INFORMATION_ERROR,
+			     _("Cannot read message %u to pass to %s"),
+			     msgno, queue->pipe_cmd);
+	g_free(pipe->message);
+	g_free(pipe);
+	bndx_pipe_queue_last(queue);
+	return;
+    }
+    argv = g_new(gchar *, 4);
+    argv[0] = g_strdup("/bin/sh");
+    argv[1] = g_strdup("-c");
+    argv[2] = g_strdup(queue->pipe_cmd);
+    argv[3] = NULL;
+    spawned =
+	g_spawn_async_with_pipes(NULL, argv, NULL, 0, NULL, NULL, NULL,
+				 &std_input, &std_output, &std_error,
+				 &error);
+    g_strfreev(argv);
+    if(spawned) {
+	pipe->in_channel = g_io_channel_unix_new(std_input);
+	g_io_channel_set_flags(pipe->in_channel, G_IO_FLAG_NONBLOCK, NULL);
+	pipe->in_source = g_io_add_watch(pipe->in_channel, G_IO_OUT | G_IO_HUP,
+					 pipe_in_watch, pipe);
+	pipe->out_channel = g_io_channel_unix_new(std_output);
+	g_io_channel_set_flags(pipe->out_channel, G_IO_FLAG_NONBLOCK, NULL);
+	pipe->out_source = g_io_add_watch(pipe->out_channel, G_IO_IN|G_IO_HUP,
+					  pipe_out_watch, pipe);
+	pipe->err_channel = g_io_channel_unix_new(std_error);
+	g_io_channel_set_flags(pipe->err_channel, G_IO_FLAG_NONBLOCK, NULL);
+	pipe->err_source = g_io_add_watch(pipe->err_channel, G_IO_IN | G_IO_HUP,
+					  pipe_out_watch, pipe);
+
+	g_io_channel_set_encoding(pipe->in_channel, NULL, NULL);
+	g_io_channel_set_encoding(pipe->out_channel, NULL, NULL);
+	g_io_channel_set_encoding(pipe->err_channel, NULL, NULL);
+	g_io_channel_set_close_on_unref(pipe->in_channel, TRUE);
+	g_io_channel_set_close_on_unref(pipe->out_channel, TRUE);
+	g_io_channel_set_close_on_unref(pipe->err_channel, TRUE);
+    } else {
+	printf("Could not spawn pipe %s : %s\n", queue->pipe_cmd,
+	       error ? error->message : "unknown");
+	g_clear_error(&error);
+	g_free(pipe->message); /* no watch owns pipe yet: free it here */
+	g_free(pipe);
+	bndx_pipe_queue_last(queue); /* keep draining so the queue is freed */
+    }
+}
+
+/** Starts asynchronous piping of the given messages of mailbox through
+    pipe_cmd. Returns FALSE if the mailbox cannot be opened, else TRUE. */
+static gboolean
+bndx_start_pipe_messages_array(LibBalsaMailbox *mailbox,
+			       GArray *msgnos,
+			       const char *pipe_cmd)
+{
+    guint i;
+    struct BndxPipeQueue *queue;
+    if(!libbalsa_mailbox_open(mailbox, NULL))
+	return FALSE; /* nothing allocated yet, so nothing leaks */
+    queue = g_new(struct BndxPipeQueue, 1);
+    queue->mailbox = mailbox;
+    queue->msgnos = g_array_sized_new(FALSE, FALSE, sizeof(guint), msgnos->len);
+    queue->pipe_cmd = g_strdup(pipe_cmd);
+    /* store reversed: bndx_pipe_queue_last() consumes from the end */
+    for(i=0; i<msgnos->len; i++)
+	g_array_append_val(queue->msgnos,
+			   g_array_index(msgnos, guint, msgnos->len-i-1));
+    libbalsa_mailbox_register_msgnos(mailbox, queue->msgnos);
+    bndx_pipe_queue_last(queue);
+    return TRUE;
 }
 
 struct bndx_mailbox_info {
@@ -2300,7 +2550,6 @@
     if (response == GTK_RESPONSE_OK) {
         gchar *pipe_cmd;
         GList *active_cmd;
-        guint i;
 
 #if GTK_CHECK_VERSION(2, 6, 0)
         pipe_cmd =
@@ -2323,9 +2572,7 @@
                 g_list_concat(active_cmd, balsa_app.pipe_cmds);
         }
 
-        for (i = 0; i < info->msgnos->len && mailbox; i++)
-            bndx_pipe(mailbox, g_array_index(info->msgnos, guint, i),
-                      pipe_cmd);
+	bndx_start_pipe_messages_array(mailbox, info->msgnos, pipe_cmd);
         g_free(pipe_cmd);
     }
 
@@ -2360,12 +2607,13 @@
         return;
     }
 
+    if(!libbalsa_mailbox_open(index->mailbox_node->mailbox, NULL))
+	return;
     info = g_new(struct bndx_mailbox_info, 1);
     info->bindex = index;
     info->mailbox = index->mailbox_node->mailbox;
     g_object_set_data_full(G_OBJECT(info->mailbox), BALSA_INDEX_PIPE_INFO,
                            info, bndx_mailbox_notify);
-    libbalsa_mailbox_open(info->mailbox, NULL);
 
     info->msgnos = balsa_index_selected_msgnos_new(index);
 



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]