[geary/mjog/imap-connection-fixes: 26/42] Update Geary.Imap.Deserialiser implementation



commit 816ea03d3c949ebb4582341c33a21f1f5916577a
Author: Michael Gratton <mike vee net>
Date:   Sun Dec 29 13:14:53 2019 +1030

    Update Geary.Imap.Deserialiser implementation
    
    Rename private fields to be more descriptive. Rather than keeping both
    a SocketConnection and an IOStream around and always trying to close
    both, keep a single IOStream. In the case of a TLS connection, just
    check if the stream is a TLS class and close the inner stream if
    needed. Tidy up serialiser and deserialiser management.

 .../imap/transport/imap-client-connection.vala     | 156 ++++++++++-----------
 src/engine/imap/transport/imap-serializer.vala     |  12 +-
 2 files changed, 83 insertions(+), 85 deletions(-)
---
diff --git a/src/engine/imap/transport/imap-client-connection.vala 
b/src/engine/imap/transport/imap-client-connection.vala
index aa3c6dc1..14a9457c 100644
--- a/src/engine/imap/transport/imap-client-connection.vala
+++ b/src/engine/imap/transport/imap-client-connection.vala
@@ -40,12 +40,6 @@ public class Geary.Imap.ClientConnection : BaseObject, Logging.Source {
     private static int next_cx_id = 0;
 
 
-    /**
-     * This identifier is used only for debugging, to differentiate connections from one another
-     * in logs and debug output.
-     */
-    public int cx_id { get; private set; }
-
     /**
      * Determines if the connection will use IMAP IDLE when idle.
      *
@@ -69,11 +63,11 @@ public class Geary.Imap.ClientConnection : BaseObject, Logging.Source {
     private weak Logging.Source? _logging_parent = null;
 
     private Geary.Endpoint endpoint;
-    private SocketConnection? cx = null;
-    private IOStream? ios = null;
-    private Serializer? ser = null;
-    private BufferedOutputStream? ser_buffer = null;
-    private Deserializer? des = null;
+    private int cx_id;
+    private IOStream? cx = null;
+    private Deserializer? deserializer = null;
+    private Serializer? serializer = null;
+    private GLib.BufferedOutputStream? serializer_buffer = null;
 
     private int tag_counter = 0;
     private char tag_prefix = 'a';
@@ -138,8 +132,9 @@ public class Geary.Imap.ClientConnection : BaseObject, Logging.Source {
     /** Returns the remote address of this connection, if any. */
     public GLib.SocketAddress? get_remote_address() throws GLib.Error {
         GLib.SocketAddress? addr = null;
-        if (cx != null) {
-            addr = cx.get_remote_address();
+        var tcp_cx = getTcpConnection();
+        if (tcp_cx != null) {
+            addr = tcp_cx.get_remote_address();
         }
         return addr;
     }
@@ -147,8 +142,9 @@ public class Geary.Imap.ClientConnection : BaseObject, Logging.Source {
     /** Returns the local address of this connection, if any. */
     public SocketAddress? get_local_address() throws GLib.Error {
         GLib.SocketAddress? addr = null;
-        if (cx != null) {
-            addr = cx.get_local_address();
+        var tcp_cx = getTcpConnection();
+        if (tcp_cx != null) {
+            addr = tcp_cx.get_local_address();
         }
         return addr;
     }
@@ -189,28 +185,22 @@ public class Geary.Imap.ClientConnection : BaseObject, Logging.Source {
         if (this.cx != null) {
             throw new ImapError.ALREADY_CONNECTED("Client already connected");
         }
-
-        this.cx = yield endpoint.connect_async(cancellable);
-        this.ios = cx;
+        this.cx = yield this.endpoint.connect_async(cancellable);
 
         this.pending_queue.clear();
         this.sent_queue.clear();
 
         try {
             yield open_channels_async();
-        } catch (Error err) {
-            // if this fails, need to close connection because the caller will not call
-            // disconnect_async()
+        } catch (GLib.Error err) {
+            // if this fails, need to close connection because the
+            // caller will not call disconnect_async()
             try {
                 yield cx.close_async();
-            } catch (Error close_err) {
+            } catch (GLib.Error close_err) {
                 // ignored
             }
-
             this.cx = null;
-            this.ios = null;
-
-            receive_failure(err);
 
             throw err;
         }
@@ -221,17 +211,14 @@ public class Geary.Imap.ClientConnection : BaseObject, Logging.Source {
     }
 
     public async void disconnect_async(Cancellable? cancellable = null) throws Error {
-        if (cx == null)
+        if (this.cx == null)
             return;
 
         this.idle_timer.reset();
 
         // To guard against reentrancy
-        SocketConnection close_cx = cx;
-        cx = null;
-
-        // close the Serializer and Deserializer
-        yield close_channels_async(cancellable);
+        GLib.IOStream old_cx = this.cx;
+        this.cx = null;
 
         // Cancel any pending commands
         foreach (Command pending in this.pending_queue.get_all()) {
@@ -241,19 +228,14 @@ public class Geary.Imap.ClientConnection : BaseObject, Logging.Source {
         this.pending_queue.clear();
 
         // close the actual streams and the connection itself
-        Error? close_err = null;
-        try {
-            yield ios.close_async(Priority.DEFAULT, cancellable);
-            yield close_cx.close_async(Priority.DEFAULT, cancellable);
-        } catch (Error err) {
-            close_err = err;
-        } finally {
-            ios = null;
-
-            if (close_err != null) {
-                close_error(close_err);
-            }
+        yield close_channels_async(cancellable);
+        yield old_cx.close_async(Priority.DEFAULT, cancellable);
 
+        var tls_cx = old_cx as GLib.TlsConnection;
+        if (tls_cx != null && !tls_cx.base_io_stream.is_closed()) {
+            yield tls_cx.base_io_stream.close_async(
+                Priority.DEFAULT, cancellable
+            );
         }
     }
 
@@ -277,9 +259,7 @@ public class Geary.Imap.ClientConnection : BaseObject, Logging.Source {
         yield close_channels_async(cancellable);
 
         // wrap connection with TLS connection
-        TlsClientConnection tls_cx = yield endpoint.starttls_handshake_async(cx, cancellable);
-
-        ios = tls_cx;
+        this.cx = yield endpoint.starttls_handshake_async(this.cx, cancellable);
 
         // re-open Serializer/Deserializer with the new streams
         yield open_channels_async();
@@ -329,33 +309,39 @@ public class Geary.Imap.ClientConnection : BaseObject, Logging.Source {
         this._logging_parent = parent;
     }
 
-    private async void open_channels_async() throws Error {
-        assert(ios != null);
-        assert(ser == null);
-        assert(des == null);
+    private GLib.TcpConnection? getTcpConnection() {
+        var cx = this.cx;
+        var tls_cx = cx as GLib.TlsConnection;
+        if (tls_cx != null) {
+            cx = tls_cx.base_io_stream;
+        }
+        return cx as TcpConnection;
+    }
 
+    private async void open_channels_async() throws Error {
         this.open_cancellable = new GLib.Cancellable();
 
-        ser_buffer = new BufferedOutputStream(ios.output_stream);
-        ser_buffer.set_close_base_stream(false);
-
         string id = "%04d".printf(cx_id);
-        ser = new Serializer(id, ser_buffer);
+
+        this.serializer_buffer = new GLib.BufferedOutputStream(
+            this.cx.output_stream
+        );
+        this.serializer_buffer.set_close_base_stream(false);
+        this.serializer = new Serializer(id, this.serializer_buffer);
 
         // Not buffering the Deserializer because it uses a
         // DataInputStream, which is already buffered
-        des = new Deserializer(id, this.cx.input_stream);
-        des.bytes_received.connect(on_bytes_received);
-        des.deserialize_failure.connect(on_deserialize_failure);
-        des.end_of_stream.connect(on_eos);
-        des.parameters_ready.connect(on_parameters_ready);
-        des.receive_failure.connect(on_receive_failure);
+        this.deserializer = new Deserializer(id, this.cx.input_stream);
+        this.deserializer.bytes_received.connect(on_bytes_received);
+        this.deserializer.deserialize_failure.connect(on_deserialize_failure);
+        this.deserializer.end_of_stream.connect(on_eos);
+        this.deserializer.parameters_ready.connect(on_parameters_ready);
+        this.deserializer.receive_failure.connect(on_receive_failure);
+        yield this.deserializer.start_async();
 
         // Start this running in the "background", it will stop when
         // open_cancellable is cancelled
         this.send_loop.begin();
-
-        yield des.start_async();
     }
 
     /** Disconnect and deallocates the Serializer and Deserializer. */
@@ -369,26 +355,28 @@ public class Geary.Imap.ClientConnection : BaseObject, Logging.Source {
         }
         this.sent_queue.clear();
 
-        // disconnect from Deserializer before yielding to stop it
-        if (des != null) {
-            des.bytes_received.disconnect(on_bytes_received);
-            des.deserialize_failure.disconnect(on_deserialize_failure);
-            des.end_of_stream.disconnect(on_eos);
-            des.parameters_ready.disconnect(on_parameters_ready);
-            des.receive_failure.disconnect(on_receive_failure);
+        if (this.serializer != null) {
+            yield this.serializer.close_stream(cancellable);
+            this.serializer = null;
+        }
 
-            yield des.stop_async();
+        if (this.serializer_buffer != null) {
+            yield this.serializer_buffer.close_async(
+                GLib.Priority.DEFAULT, cancellable
+            );
+            this.serializer_buffer = null;
         }
-        des = null;
-        ser = null;
-        // Close the Serializer's buffered stream after it as been
-        // deallocated so it can't possibly write to the stream again,
-        // and so the stream's async thread doesn't attempt to flush
-        // its buffers from its finaliser at some later unspecified
-        // point, possibly writing to an invalid underlying stream.
-        if (ser_buffer != null) {
-            yield ser_buffer.close_async(GLib.Priority.DEFAULT, cancellable);
-            ser_buffer = null;
+
+        var deserializer = this.deserializer;
+        if (deserializer != null) {
+            deserializer.bytes_received.disconnect(on_bytes_received);
+            deserializer.deserialize_failure.disconnect(on_deserialize_failure);
+            deserializer.end_of_stream.disconnect(on_eos);
+            deserializer.parameters_ready.disconnect(on_parameters_ready);
+            deserializer.receive_failure.disconnect(on_receive_failure);
+
+            yield deserializer.stop_async();
+            this.deserializer = null;
         }
     }
 
@@ -431,7 +419,7 @@ public class Geary.Imap.ClientConnection : BaseObject, Logging.Source {
                 // Check the queue is still empty after sending the
                 // command, since that might have changed.
                 if (this.pending_queue.is_empty) {
-                    yield this.ser.flush_stream(cancellable);
+                    yield this.serializer.flush_stream(cancellable);
                 }
             } catch (GLib.Error err) {
                 if (!(err is GLib.IOError.CANCELLED)) {
@@ -462,9 +450,9 @@ public class Geary.Imap.ClientConnection : BaseObject, Logging.Source {
 
             this.current_command = command;
             this.sent_queue.add(command);
-            yield command.send(this.ser, cancellable);
+            yield command.send(this.serializer, cancellable);
             sent_command(command);
-            yield command.send_wait(this.ser, cancellable);
+            yield command.send_wait(this.serializer, cancellable);
         } catch (GLib.Error err) {
             ser_error = err;
         }
diff --git a/src/engine/imap/transport/imap-serializer.vala b/src/engine/imap/transport/imap-serializer.vala
index 549391fc..f1a6a4fc 100644
--- a/src/engine/imap/transport/imap-serializer.vala
+++ b/src/engine/imap/transport/imap-serializer.vala
@@ -20,9 +20,11 @@
  */
 public class Geary.Imap.Serializer : BaseObject {
 
+
     private string identifier;
     private GLib.DataOutputStream output;
 
+
     public Serializer(string identifier, GLib.OutputStream output) {
         this.identifier = identifier;
         this.output = new GLib.DataOutputStream(output);
@@ -121,7 +123,15 @@ public class Geary.Imap.Serializer : BaseObject {
      */
     public async void flush_stream(GLib.Cancellable? cancellable = null)
         throws GLib.Error {
-        yield this.output.flush_async(Priority.DEFAULT, cancellable);
+        yield this.output.flush_async(GLib.Priority.DEFAULT, cancellable);
+    }
+
+    /**
+     * Closes the stream, ensuring a command has been sent.
+     */
+    public async void close_stream(GLib.Cancellable? cancellable)
+        throws GLib.IOError {
+        yield this.output.close_async(GLib.Priority.DEFAULT, cancellable);
     }
 
     /**


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]