[pan2: 2/8] intermediate



commit 3eb66093d8ba118ae2bd2b59f22e213ceff36bc9
Author: Heinrich MÃller <henmull src gnome org>
Date:   Wed Oct 3 11:57:49 2012 +0200

    intermediate

 configure.in              |    5 +
 pan/data-impl/data-impl.h |    8 ++-
 pan/data-impl/server.cc   |   23 +++++
 pan/data/data.h           |    7 ++-
 pan/data/server-info.h    |    8 ++
 pan/general/Makefile.am   |    1 +
 pan/gui/Makefile.am       |    4 +-
 pan/gui/gtk-compat.h      |   12 +--
 pan/gui/server-ui.cc      |   75 +++++++++++++++-
 pan/tasks/Makefile.am     |    6 +-
 pan/tasks/decoder.h       |    2 +
 pan/tasks/nntp.cc         |   33 +++++++
 pan/tasks/nntp.h          |   16 +++-
 pan/tasks/queue.cc        |   31 ++++++-
 pan/tasks/queue.h         |    5 +
 pan/tasks/socket.cc       |    3 +-
 pan/tasks/socket.h        |    7 ++
 pan/tasks/task-xover.cc   |  217 ++++++++++++++++++++++++++++++++++++++++-----
 pan/tasks/task-xover.h    |   36 +++++++-
 pan/tasks/task.h          |   16 ++--
 20 files changed, 468 insertions(+), 47 deletions(-)
---
diff --git a/configure.in b/configure.in
index 2895646..99c6c60 100644
--- a/configure.in
+++ b/configure.in
@@ -88,6 +88,11 @@ AC_DEFINE_UNQUOTED([GETTEXT_PACKAGE],["$GETTEXT_PACKAGE"],[gettext package name]
 AM_GLIB_GNU_GETTEXT
 panlocaledir='${prefix}/${DATADIRNAME}/locale'
 
+dnl libz for xzver
+PKG_CHECK_MODULES([ZLIB], [zlib >= 1.2.0],AC_DEFINE(HAVE_ZLIB,[1],[Zlib for xzver support]),[])
+AC_SUBST(ZLIB_CFLAGS)
+AC_SUBST(ZLIB_LIBS)
+
 dnl D-Bus support
 AC_ARG_WITH(dbus, AS_HELP_STRING([--with-dbus],[enable D-Bus support (normally: no)]), [want_dbus=$withval], [want_dbus=no])
 if test "x$want_dbus" = "xyes" ; then
diff --git a/pan/data-impl/data-impl.h b/pan/data-impl/data-impl.h
index c742269..234d24d 100644
--- a/pan/data-impl/data-impl.h
+++ b/pan/data-impl/data-impl.h
@@ -39,7 +39,6 @@
 #include <pan/data/article-cache.h>
 #include <pan/data/encode-cache.h>
 #include <pan/data/data.h>
-
 #include <pan/data-impl/data-io.h>
 #include <pan/data-impl/article-filter.h>
 #include <pan/data-impl/rules-filter.h>
@@ -105,7 +104,6 @@ namespace pan
       ArticleCache _cache;
       EncodeCache _encode_cache;
       CertStore _certstore;
-      Prefs _prefs;
       Queue* _queue;
 
     public:
@@ -118,6 +116,7 @@ namespace pan
       void rebuild_backend ();
       const bool _unit_test;
       DataIO * _data_io;
+      Prefs& _prefs;
 
     /**
     *** SERVERS
@@ -169,6 +168,9 @@ namespace pan
       virtual void set_server_article_expiration_age  (const Quark  & server,
                                                        int            days);
 
+      virtual void set_server_compression_type  (const Quark   & server,
+                                                 const int       setme);
+
       virtual void save_server_info (const Quark& server);
 
     public: // accessors
@@ -187,6 +189,8 @@ namespace pan
 
       virtual bool get_server_trust (const Quark  & servername, int&) const;
 
+      virtual bool get_server_compression_type (const Quark  & servername, CompressionType&) const;
+
       virtual bool get_server_addr (const Quark   & server,
                                     std::string   & setme_host,
                                     int           & setme_port) const;
diff --git a/pan/data-impl/server.cc b/pan/data-impl/server.cc
index 40f27c0..6553b1b 100644
--- a/pan/data-impl/server.cc
+++ b/pan/data-impl/server.cc
@@ -157,6 +157,15 @@ DataImpl :: set_server_trust  (const Quark   & server,
 }
 
 void
+DataImpl :: set_server_compression_type  (const Quark   & server,
+                                          const int       setme)
+{
+  Server * s (find_server (server));
+  assert (s);
+  s->compression_type = setme;
+}
+
+void
 DataImpl :: set_server_addr (const Quark       & server,
                              const StringView  & host,
                              int                 port)
@@ -286,6 +295,18 @@ DataImpl :: get_server_trust (const Quark   & server, int& setme) const
 }
 
 bool
+DataImpl :: get_server_compression_type (const Quark   & server, CompressionType& setme) const
+{
+  const Server * s (find_server (server));
+  const bool found (s);
+  if (found) {
+    setme = get_compression_type(s->compression_type);
+  }
+
+  return found;
+}
+
+bool
 DataImpl :: get_server_addr (const Quark   & server,
                              std::string   & setme_host,
                              int           & setme_port) const
@@ -481,6 +502,7 @@ DataImpl :: load_server_properties (const DataIO& source)
     s.cert = kv["cert"];
     int trust(to_int(kv["trust"], 0));
     s.trust = trust;
+    s.compression_type = to_int(kv["compression-type"], 0); // NONE
     s.newsrc_filename = kv["newsrc"];
     if (s.newsrc_filename.empty()) { // set a default filename
       std::ostringstream o;
@@ -545,6 +567,7 @@ else
          << indent(depth) << "<rank>" << s->rank << "</rank>\n"
          << indent(depth) << "<use-ssl>" << s->ssl_support << "</use-ssl>\n"
          << indent(depth) << "<trust>" << s->trust << "</trust>\n"
+         << indent(depth) << "<compression-type>" << s->compression_type << "</compression-type>\n"
          << indent(depth) << "<cert>"    << s->cert << "</cert>\n";
 
     *out << indent(--depth) << "</server>\n";
diff --git a/pan/data/data.h b/pan/data/data.h
index 684961d..648f0e3 100644
--- a/pan/data/data.h
+++ b/pan/data/data.h
@@ -27,6 +27,7 @@
 
 #include <pan/general/macros.h>
 #include <pan/general/quark.h>
+#include <pan/general/compression.h>
 #include <pan/general/string-view.h>
 #include <pan/usenet-utils/scorefile.h>
 #include <pan/data/article.h>
@@ -34,6 +35,7 @@
 #include <pan/data/encode-cache.h>
 #include <pan/data/cert-store.h>
 #include <pan/data/server-info.h>
+
 #include <pan/gui/prefs.h>
 #include <pan/gui/progress-view.h>
 
@@ -242,12 +244,13 @@ namespace pan
          int rank;
          int ssl_support;
          int trust;
+         int compression_type;
          typedef sorted_vector<Quark,true,AlphabeticalQuarkOrdering> groups_t;
          groups_t groups;
          gchar* gkr_pw;
 
          Server(): port(STD_NNTP_PORT), article_expiration_age(31), max_connections(2),
-                    rank(1), ssl_support(0), trust(0), gkr_pw(NULL) {}
+                    rank(1), ssl_support(0), trust(0), gkr_pw(NULL), compression_type(0) /* NONE */ {}
       };
 
     protected:
@@ -314,6 +317,8 @@ namespace pan
 
       virtual bool get_server_trust (const Quark  & servername, int&) const = 0;
 
+      virtual bool get_server_compression_type (const Quark  & servername, CompressionType&) const = 0;
+
       virtual std::string get_server_cert (const Quark & server) const = 0;
 
       virtual int get_server_limits (const Quark & server) const = 0;
diff --git a/pan/data/server-info.h b/pan/data/server-info.h
index 7f4f6a9..d0b407d 100644
--- a/pan/data/server-info.h
+++ b/pan/data/server-info.h
@@ -22,10 +22,12 @@
 
 #include <string>
 #include <pan/general/quark.h>
+#include <pan/general/compression.h>
 #include <pan/general/string-view.h>
 
 namespace pan
 {
+
   struct ServerRank
   {
     virtual ~ServerRank () {}
@@ -38,6 +40,7 @@ namespace pan
    */
   class ServerInfo: public ServerRank
   {
+
     public:
 
       virtual ~ServerInfo () {}
@@ -52,6 +55,9 @@ namespace pan
       virtual void set_server_trust (const Quark       & servername,
                                      const int           setme) = 0;
 
+      virtual void set_server_compression_type (const Quark       & servername,
+                                                  const int           setme) = 0;
+
       virtual void set_server_addr (const Quark       & servername,
                                     const StringView  & address,
                                     const int           port) = 0;
@@ -81,6 +87,8 @@ namespace pan
 
       virtual bool get_server_trust (const Quark  & servername, int&) const = 0;
 
+      virtual bool get_server_compression_type (const Quark  & servername, CompressionType&) const = 0;
+
       virtual bool get_server_addr (const Quark   & servername,
                                     std::string   & setme_address,
                                     int           & setme_port) const = 0;
diff --git a/pan/general/Makefile.am b/pan/general/Makefile.am
index 1bd5a1d..b23d480 100644
--- a/pan/general/Makefile.am
+++ b/pan/general/Makefile.am
@@ -17,6 +17,7 @@ libgeneralutils_a_SOURCES = \
  worker-pool.cc
 
 noinst_HEADERS = \
+ compression.h \
  debug.h \
  defgroup.h \
  e-util.h \
diff --git a/pan/gui/Makefile.am b/pan/gui/Makefile.am
index ca82ce3..135a075 100644
--- a/pan/gui/Makefile.am
+++ b/pan/gui/Makefile.am
@@ -1,5 +1,5 @@
 AM_CPPFLAGS = -I top_srcdir@ @GTKSPELL_CFLAGS@ @ENCHANT_CFLAGS@ @GTK_CFLAGS@ @GMIME_CFLAGS@ @GLIB_CFLAGS@ \
-							@GNUTLS_CFLAGS@ @LIBNOTIFY_CFLAGS@ @LIBGNOME_KEYRING_1_CFLAGS@ @WEBKITGTK_CFLAGS@ -DPANLOCALEDIR=\""$(panlocaledir)"\"
+	@GNUTLS_CFLAGS@ @LIBNOTIFY_CFLAGS@ @LIBGNOME_KEYRING_1_CFLAGS@ @WEBKITGTK_CFLAGS -DPANLOCALEDIR=\""$(panlocaledir)"\" @ZLIB_CFLAGS@
 
 noinst_LIBRARIES = libpangui.a
 
@@ -106,7 +106,7 @@ endif
 
 pan_SOURCES = gui.cc pan.cc $(WINRC)
 pan_LDADD = ./libpangui.a $(WINRCOBJ) ../data-impl/libpandata.a ../tasks/libtasks.a ../data/libdata.a ../usenet-utils/libusenetutils.a ../general/libgeneralutils.a ../../uulib/libuu.a \
-		@GTKSPELL_LIBS@ @ENCHANT_LIBS@ @GTK_LIBS@ @GMIME_LIBS@ @GLIB_LIBS@ @GNUTLS_LIBS@ @LIBNOTIFY_LIBS@ @LIBGNOME_KEYRING_1_LIBS@ @WEBKITGTK_LIBS@
+		@GTKSPELL_LIBS@ @ENCHANT_LIBS@ @GTK_LIBS@ @GMIME_LIBS@ @GLIB_LIBS@ @GNUTLS_LIBS@ @LIBNOTIFY_LIBS@ @LIBGNOME_KEYRING_1_LIBS@ @WEBKITGTK_LIBS@ @ZLIB_LIBS@
 if HAVE_WIN32
 pan_LDFLAGS = -mwindows
 endif
diff --git a/pan/gui/gtk-compat.h b/pan/gui/gtk-compat.h
index 074ba47..030ade6 100644
--- a/pan/gui/gtk-compat.h
+++ b/pan/gui/gtk-compat.h
@@ -78,7 +78,10 @@ static inline GdkWindow * gdk_window_get_device_position (GdkWindow *window,
     gdk_window_get_device_position (event->window, event->device, x, y, t);
 #endif
   }
-
+    ret = gtk_separator_new(GTK_ORIENTATION_VERTICAL);
+#endif
+    return ret;
+  }
 
 #if !GTK_CHECK_VERSION(2,18,0)
   static inline void gtk_widget_get_allocation( GtkWidget *w, GtkAllocation *a)
@@ -186,22 +189,15 @@ static inline GdkWindow * gdk_window_get_device_position (GdkWindow *window,
     gtk_widget_modify_font(w,f);
   }
 #endif
-
 #if GTK_CHECK_VERSION(3,0,0)
-// include this for conversion of old key names to new
   #include <gdk/gdkkeysyms-compat.h>
-
   #define GTK_OBJECT(w) w
-  typedef GtkWidget GtkObject;
 #endif
 
   static inline void cursor_unref(GdkCursor *p)
-  {
 #if GTK_CHECK_VERSION(3,0,0)
-    g_object_unref(p);
 #else
     gdk_cursor_unref(p);
-#endif
   }
 
 #ifdef __cplusplus
diff --git a/pan/gui/server-ui.cc b/pan/gui/server-ui.cc
index eaa00d9..1a7096f 100644
--- a/pan/gui/server-ui.cc
+++ b/pan/gui/server-ui.cc
@@ -69,7 +69,10 @@ namespace
     GtkWidget * rank_combo;
     GtkWidget * ssl_combo;
     GtkWidget * always_trust_checkbox;
+    GtkWidget * compression_checkbox;
     ServerEditDialog (Data& d, Queue& q, Prefs& p): data(d), queue(q), prefs(p) {}
+    CompressionType compressiontype;
+
   };
 
   void pan_entry_set_text (GtkWidget * w, const StringView& v)
@@ -103,10 +106,58 @@ namespace
     GtkTreeIter iter;
     if (gtk_combo_box_get_active_iter (w, &iter))
       gtk_tree_model_get (gtk_combo_box_get_model(w), &iter, 1, &ssl, -1);
+    if (ssl==0)
+    {
+      gtk_widget_hide(d->always_trust_checkbox);
+    }
+    else
+    {
+      gtk_widget_show(d->always_trust_checkbox);
+    }
     pan_spin_button_set (d->port_spin, ssl == 0 ? STD_NNTP_PORT : STD_SSL_PORT);
 #endif
   }
 
+  void address_entry_changed_cb   (GtkEditable *editable,
+                                    gpointer     gp)
+  {
+    ServerEditDialog* d (static_cast<ServerEditDialog*>(gp));
+    if (!d) return;
+
+    gchar* text = gtk_editable_get_chars (editable, 0, -1);
+    StringView t(text);
+
+    CompressionType type(HEADER_COMPRESS_NONE);
+
+    // 0 == NONE
+
+    if (t.strstr("astraweb")) // 1
+    {
+      type = HEADER_COMPRESS_XZVER;
+    }
+    if (t.strstr("giganews"))  // 2
+    {
+      type = HEADER_COMPRESS_XFEATURE;
+    }
+    char* others[] = {"newshosting", "easynews","usenetserver" };
+    for (int i= 0; i < G_N_ELEMENTS(others); i++)
+    {
+        if (t.strstr(others[i]))
+        {
+          type = HEADER_COMPRESS_DIABLO; // 3
+          break;
+        }
+    }
+    d->compressiontype = type;
+
+    if (type != HEADER_COMPRESS_NONE)
+      gtk_widget_show(d->compression_checkbox);
+    else
+      gtk_widget_hide(d->compression_checkbox);
+
+    gtk_toggle_button_set_active (GTK_TOGGLE_BUTTON(d->compression_checkbox), type != HEADER_COMPRESS_NONE);
+  }
+
   void
   edit_dialog_populate (Data&, Prefs& prefs, const Quark& server, ServerEditDialog * d)
   {
@@ -117,6 +168,7 @@ namespace
     d->server = server;
 
     int port(STD_NNTP_PORT), max_conn(4), age(31*3), rank(1), ssl(0), trust(0);
+    CompressionType compression(HEADER_COMPRESS_NONE);
     std::string addr, user, cert;
     gchar* pass(NULL);
     if (!server.empty()) {
@@ -128,13 +180,20 @@ namespace
       ssl = d->data.get_server_ssl_support(server);
       cert = d->data.get_server_cert(server);
       d->data.get_server_trust (server, trust);
+      d->data.get_server_compression_type (server, compression);
     }
 
     pan_entry_set_text (d->address_entry, addr);
+    pan_spin_button_set (d->port_spin, port);
     pan_spin_button_set (d->connection_limit_spin, max_conn);
     pan_entry_set_text (d->auth_username_entry, user);
     pan_entry_set_text (d->auth_password_entry, pass);
     d->cert = cert;
+    d->compressiontype = compression;
+    std::cerr<<compression<<"\n";
+
+    gtk_toggle_button_set_active (GTK_TOGGLE_BUTTON(d->compression_checkbox), d->compressiontype != HEADER_COMPRESS_NONE);
+
 
     // set the age combobox
     GtkComboBox * combo (GTK_COMBO_BOX (d->expiration_age_combo));
@@ -222,6 +281,8 @@ namespace
       trust = gtk_toggle_button_get_active(GTK_TOGGLE_BUTTON(d->always_trust_checkbox)) ? 1 : 0;
 #endif
 
+      int header_comp = gtk_toggle_button_get_active(GTK_TOGGLE_BUTTON(d->compression_checkbox)) ? 1 : 0;
+
       const char * err_msg (0);
       if (addr.empty())
         err_msg = _("Please specify the server's address.");
@@ -246,7 +307,7 @@ namespace
         d->data.set_server_ssl_support(d->server, ssl);
         d->data.set_server_cert(d->server,cert);
         d->data.set_server_trust(d->server,trust);
-
+        d->data.set_server_compression_type(d->server, header_comp);
         d->data.save_server_info(d->server);
 
         d->queue.upkeep ();
@@ -322,6 +383,7 @@ pan :: server_edit_dialog_new (Data& data, Queue& queue, Prefs& prefs, GtkWindow
     GtkWidget * w = d->address_entry = gtk_entry_new ();
     gtk_widget_set_tooltip_text( w, _("The news server's actual address, e.g. \"news.mynewsserver.com\""));
     HIG::workarea_add_row (t, &row, _("_Address:"), w, NULL);
+    g_signal_connect (w, "changed", G_CALLBACK(address_entry_changed_cb), d);
 
     GtkAdjustment * a = GTK_ADJUSTMENT (gtk_adjustment_new (1.0, 1.0, ULONG_MAX, 1.0, 1.0, 0.0));
     w = d->port_spin = gtk_spin_button_new (GTK_ADJUSTMENT(a), 1.0, 0u);
@@ -398,6 +460,10 @@ pan :: server_edit_dialog_new (Data& data, Queue& queue, Prefs& prefs, GtkWindow
     gtk_widget_set_tooltip_text( e, _("Fallback servers are used for articles that can't be found on the primaries.  One common approach is to use free servers as primaries and subscription servers as fallbacks."));
     HIG::workarea_add_row (t, &row, e, w);
 
+    // header compression options
+    d->compression_checkbox = w = gtk_check_button_new_with_label (_("Enable header compression for speedup"));
+    HIG::workarea_add_row (t, &row, e, w);
+
     // ssl 3.0 option
 #ifdef HAVE_GNUTLS
     // select ssl/plaintext
@@ -442,6 +508,13 @@ pan :: server_edit_dialog_new (Data& data, Queue& queue, Prefs& prefs, GtkWindow
   d->server = server;
   edit_dialog_populate (data, prefs, server, d);
   gtk_widget_show_all (d->dialog);
+
+  // perhaps hide compression checkbox
+  if (d->compressiontype==HEADER_COMPRESS_NONE)
+  {
+    gtk_widget_hide(d->compression_checkbox);
+  }
+
   return d->dialog;
 }
 
diff --git a/pan/tasks/Makefile.am b/pan/tasks/Makefile.am
index f120e77..edf1e1b 100644
--- a/pan/tasks/Makefile.am
+++ b/pan/tasks/Makefile.am
@@ -1,11 +1,12 @@
-AM_CPPFLAGS = -I top_srcdir@ @GMIME_CFLAGS@ @GLIB_CFLAGS@ @GNUTLS_CFLAGS@ @GTK_CFLAGS@
+AM_CPPFLAGS = -I top_srcdir@ @GMIME_CFLAGS@ @GLIB_CFLAGS@ @GNUTLS_CFLAGS@ @GTK_CFLAGS@ @ZLIB_CFLAGS@
 
-AM_LDFLAGS = ../../uulib/libuu.a @GNUTLS_LIBS@ @GTK_LIBS@
+AM_LDFLAGS = ../../uulib/libuu.a @GNUTLS_LIBS@ @ZLIB_LIBS@
 
 noinst_LIBRARIES = libtasks.a
 
 libtasks_a_SOURCES = \
   decoder.cc \
+  xzver-decoder.cc \
   encoder.cc \
   task.cc \
   task-article.cc \
@@ -29,6 +30,7 @@ noinst_HEADERS = \
   adaptable-set.cc \
   adaptable-set.h \
   decoder.h \
+  xzver-decoder.h \
   encoder.h \
   defgroup.h \
   health.h \
diff --git a/pan/tasks/decoder.h b/pan/tasks/decoder.h
index 0814694..46c8a53 100644
--- a/pan/tasks/decoder.h
+++ b/pan/tasks/decoder.h
@@ -95,6 +95,8 @@ namespace pan
       /** tell our task about the decode's progress */
       static gboolean progress_update_timer_func(gpointer decoder);
 
+    protected:
+
       WorkerPool& _worker_pool;
       int _gsourceid;
       void disable_progress_update();
diff --git a/pan/tasks/nntp.cc b/pan/tasks/nntp.cc
index cf426c4..3ed9e16 100644
--- a/pan/tasks/nntp.cc
+++ b/pan/tasks/nntp.cc
@@ -331,6 +331,39 @@ NNTP :: xover (const Quark   & group,
 }
 
 void
+NNTP :: xzver (const Quark   & group,
+               uint64_t        low,
+               uint64_t        high,
+               Listener      * l)
+{
+   _listener = l;
+
+   if (group != _group)
+      _commands.push_back (build_command ("GROUP %s\r\n", group.c_str()));
+
+   _commands.push_back (build_command ("XZVER %"G_GUINT64_FORMAT"-%"G_GUINT64_FORMAT"\r\n", low, high));
+
+   write_next_command ();
+}
+
+void
+NNTP :: xfeat (const Quark   & group,
+               uint64_t        low,
+               uint64_t        high,
+               Listener      * l)
+{
+   _listener = l;
+
+   if (group != _group)
+      _commands.push_back (build_command ("GROUP %s\r\n", group.c_str()));
+
+   _commands.push_back ("XFEATURE COMPRESS GZIP");
+   _commands.push_back (build_command ("XOVER %"G_GUINT64_FORMAT"-%"G_GUINT64_FORMAT"\r\n", low, high));
+
+   write_next_command ();
+}
+
+void
 NNTP :: xover_count_only (const Quark   & group,
                           Listener      * l)
 {
diff --git a/pan/tasks/nntp.h b/pan/tasks/nntp.h
index 95b3ea1..c27cd66 100644
--- a/pan/tasks/nntp.h
+++ b/pan/tasks/nntp.h
@@ -197,6 +197,17 @@ namespace pan
                              uint64_t             low,
                              uint64_t             high,
                              Listener           * l);
+
+      void xzver            (const Quark        & group,
+                             uint64_t             low,
+                             uint64_t             high,
+                             Listener           * l);
+
+      void xfeat            (const Quark        & group,
+                             uint64_t             low,
+                             uint64_t             high,
+                             Listener           * l);
+
       /**
        * Executes an XOVER command: "XOVER" to count
        * the xover numbers internally
@@ -307,10 +318,13 @@ namespace pan
 
       const Quark _server;
       Quark _group;
-      Quark _request_group;
       Socket * _socket;
       DownloadMeter& _meter;
       bool _socket_error;
+      const std::string& get_username()
+      {
+        return _username;
+      }
 
     protected:
 
diff --git a/pan/tasks/queue.cc b/pan/tasks/queue.cc
index a9a59c8..3f02c24 100644
--- a/pan/tasks/queue.cc
+++ b/pan/tasks/queue.cc
@@ -49,6 +49,7 @@ Queue :: Queue (ServerInfo         & server_info,
   _socket_creator (socket_creator),
   _worker_pool (pool),
   _decoder (pool),
+  _xzver_decoder (pool),
   _encoder (pool),
   _decoder_task (0),
   _encoder_task (0),
@@ -236,6 +237,16 @@ Queue :: give_task_a_decoder (Task * task)
 }
 
 void
+Queue :: give_task_a_xzver_decoder (Task * task)
+{
+  const bool was_active (task_is_active (task));
+  _decoder_task = task;
+  if (!was_active)
+    fire_task_active_changed (task, true);
+  task->give_decoder (this, static_cast<Decoder*>(&_xzver_decoder)); // it's active now...
+}
+
+void
 Queue :: give_task_a_encoder (Task * task)
 {
   const bool was_active (task_is_active (task));
@@ -333,10 +344,14 @@ Queue :: process_task (Task * task)
   }
   else if (state._work == Task::NEED_DECODER)
   {
-
     if (!_decoder_task)
       give_task_a_decoder (task);
   }
+  else if (state._work == Task::NEED_XZVER_DECODER)
+  {
+    if (!_decoder_task)
+      give_task_a_xzver_decoder (task);
+  }
   else if (state._work == Task::NEED_ENCODER)
   {
     if (!_encoder_task)
@@ -391,6 +406,20 @@ Queue :: find_first_task_needing_decoder ()
 }
 
 Task*
+Queue :: find_first_task_needing_xzver_decoder ()
+{
+  foreach (TaskSet, _tasks, it) {
+    const Task::State& state ((*it)->get_state ());
+    if  ((state._work == Task::NEED_XZVER_DECODER)
+      && (!_stopped.count (*it))
+      && (!_removing.count (*it)))
+      return *it;
+  }
+
+  return 0;
+}
+
+Task*
 Queue :: find_first_task_needing_encoder ()
 {
   foreach (TaskSet, _tasks, it) {
diff --git a/pan/tasks/queue.h b/pan/tasks/queue.h
index de3be6b..9e23824 100644
--- a/pan/tasks/queue.h
+++ b/pan/tasks/queue.h
@@ -26,6 +26,7 @@
 #include <pan/general/macros.h> // for UNUSED
 #include <pan/general/map-vector.h>
 #include <pan/tasks/decoder.h>
+#include <pan/tasks/xzver-decoder.h>
 #include <pan/tasks/encoder.h>
 #include <pan/general/quark.h>
 #include <pan/tasks/nntp-pool.h>
@@ -49,6 +50,7 @@ namespace pan
   class WorkerPool;
   struct Encoder;
   struct Decoder;
+  struct XZVERDecoder;
 
   /**
    * A Queue helper that saves tasks to disk and restores them from disk.
@@ -190,12 +192,14 @@ namespace pan
     protected:
       void process_task (Task *);
       void give_task_a_decoder (Task*);
+      void give_task_a_xzver_decoder (Task*);
       void give_task_a_encoder (Task*);
       void give_task_a_connection (Task*, NNTP*);
       ServerInfo& _server_info;
       bool _is_online;
       Task* find_first_task_needing_server (const Quark& server);
       Task* find_first_task_needing_decoder ();
+      Task* find_first_task_needing_xzver_decoder ();
       Task* find_first_task_needing_encoder ();
 
       void give_task_an_upload_slot (TaskUpload* task);
@@ -214,6 +218,7 @@ namespace pan
       SocketCreator * _socket_creator;
       WorkerPool & _worker_pool;
       Decoder _decoder;
+      XZVERDecoder _xzver_decoder;
       Encoder _encoder;
       Task * _decoder_task;
       Task * _encoder_task;
diff --git a/pan/tasks/socket.cc b/pan/tasks/socket.cc
index 2cd12db..3b011ac 100644
--- a/pan/tasks/socket.cc
+++ b/pan/tasks/socket.cc
@@ -34,7 +34,8 @@ Socket :: Socket ():
   _bytes_since_last_check (0),
   _time_of_last_check (time(0)),
   _speed_KiBps (0.0),
-  _abort_flag (false)
+  _abort_flag (false),
+  _stream (new std::stringstream())
 {
 }
 
diff --git a/pan/tasks/socket.h b/pan/tasks/socket.h
index af26a62..91c8160 100644
--- a/pan/tasks/socket.h
+++ b/pan/tasks/socket.h
@@ -31,6 +31,8 @@ extern "C" {
   #include <gnutls/gnutls.h>
 #endif
 
+#include <sstream>
+
 namespace pan
 {
   class StringView;
@@ -61,6 +63,10 @@ namespace pan
         virtual void on_socket_bytes_transferred (uint64_t bytes, Socket*) = 0;
       };
 
+      void write(const std::string& str) { *_stream << str; }
+      void clear() { (*_stream).clear(); }
+      std::stringstream*& get_stream() { return _stream; }
+
     public:
       virtual bool open (const StringView& address, int port, std::string& setme_err) = 0;
       virtual void write_command (const StringView& chars, Listener *) = 0;
@@ -79,6 +85,7 @@ namespace pan
       mutable time_t _time_of_last_check;
       mutable double _speed_KiBps;
       bool _abort_flag;
+      std::stringstream* _stream;
 
     public:
 
diff --git a/pan/tasks/task-xover.cc b/pan/tasks/task-xover.cc
index 483bf38..93d88cb 100644
--- a/pan/tasks/task-xover.cc
+++ b/pan/tasks/task-xover.cc
@@ -32,6 +32,7 @@ extern "C" {
 
 #include <fstream>
 #include <iostream>
+#include <pan/general/log.h>
 #include <pan/general/debug.h>
 #include <pan/general/file-util.h>
 #include <pan/general/macros.h>
@@ -40,12 +41,13 @@ extern "C" {
 #include <pan/data/data.h>
 #include "nntp.h"
 #include "task-xover.h"
-
+#include "xzver-decoder.h"
 
 using namespace pan;
 
 namespace
 {
+
    std::string
    get_short_name (const StringView& in)
    {
@@ -87,6 +89,16 @@ namespace
   }
 }
 
+namespace
+{
+  char* build_cachename (char* buf, size_t len, const char* name)
+  {
+    const char * home(file::get_pan_home().c_str());
+    g_snprintf(buf,len,"%s%c%s%c%s",home, G_DIR_SEPARATOR, "encode-cache", G_DIR_SEPARATOR, name);
+    return buf;
+  }
+}
+
 TaskXOver :: TaskXOver (Data         & data,
                         const Quark  & group,
                         Mode           mode,
@@ -102,9 +114,14 @@ TaskXOver :: TaskXOver (Data         & data,
   _bytes_so_far (0),
   _parts_so_far (0ul),
   _articles_so_far (0ul),
-  _total_minitasks (0)
+  _total_minitasks (0),
+  _running_minitasks (0),
+  _decoder(0),
+  _decoder_has_run (false)
 {
 
+
+
   debug ("ctor for " << group);
 
   // add a ``GROUP'' MiniTask for each server that has this group
@@ -126,6 +143,12 @@ TaskXOver :: TaskXOver (Data         & data,
   update_work ();
 }
 
+void
+TaskXOver :: setHigh(const Quark& server, uint64_t& h)
+{
+  _high[server] = h;
+}
+
 TaskXOver :: ~TaskXOver ()
 {
   if (_group_xover_is_reffed) {
@@ -133,6 +156,9 @@ TaskXOver :: ~TaskXOver ()
       _data.set_xover_high (_group, it->first, it->second);
     _data.xover_unref (_group);
   }
+  if (_decoder)
+    _decoder->cancel_silently();
+
   _data.fire_group_entered(_group, 1, 0);
 }
 
@@ -141,6 +167,10 @@ TaskXOver :: use_nntp (NNTP* nntp)
 {
   const Quark& server (nntp->_server);
   debug ("got an nntp from " << nntp->_server);
+  CompressionType type;
+  _data.get_server_compression_type(nntp->_server, type);
+
+  _compression_enabled = type != HEADER_COMPRESS_NONE;
 
   // if this is the first nntp we've gotten, ref the xover data
   if (!_group_xover_is_reffed) {
@@ -151,7 +181,7 @@ TaskXOver :: use_nntp (NNTP* nntp)
   MiniTasks_t& minitasks (_server_to_minitasks[server]);
   if (minitasks.empty())
   {
-    debug ("That's interesting, I got a socket for " << server << " but have no use for it!");
+    debug ("That's interesting, I got a socket for " << server << " but _have no use for it!");
     _state._servers.erase (server);
     check_in (nntp, OK);
   }
@@ -168,7 +198,18 @@ TaskXOver :: use_nntp (NNTP* nntp)
       case MiniTask::XOVER:
         debug ("XOVER " << mt._low << '-' << mt._high << " to " << server);
         _last_xover_number[nntp] = mt._low;
-        nntp->xover (_group, mt._low, mt._high, this);
+        if (_compression_enabled)
+        {
+// TODO support XFEATURE!
+//          if (type == XZVER)
+            nntp->xzver (_group, mt._low, mt._high, this);
+//          if (type == XFEATURE)
+//            nntp->xfeat (_group, mt._low, mt._high, this);
+          // TODO diablo
+        }
+        else
+          nntp->xover (_group, mt._low, mt._high, this);
+        --_running_minitasks;
         break;
       default:
         assert (0);
@@ -181,7 +222,6 @@ TaskXOver :: use_nntp (NNTP* nntp)
 ****
 ***/
 
-///TODO show low and high in UI (is this already there?)
 void
 TaskXOver :: on_nntp_group (NNTP          * nntp,
                             const Quark   & group,
@@ -221,15 +261,32 @@ TaskXOver :: on_nntp_group (NNTP          * nntp,
 
   if (l <= high)
   {
-    //std::cerr << LINE_ID << " okay, I'll try to get articles in [" << l << "..." << h << ']' << std::endl;
+//    std::cerr << LINE_ID << " okay, I'll try to get articles in [" << l << "..." << h << ']' << std::endl;
+
     add_steps (h-l);
-    const int INCREMENT(1000);
+
     MiniTasks_t& minitasks (_server_to_minitasks[servername]);
-    for (uint64_t m=l; m<=h; m+=INCREMENT) {
-      MiniTask mt (MiniTask::XOVER, m, m+INCREMENT);
-      debug ("adding MiniTask for " << servername << ": xover [" << mt._low << '-' << mt._high << "]");
-      minitasks.push_front (mt);
-      ++_total_minitasks;
+    if (_compression_enabled)
+    {
+      const int INCREMENT(100000);
+      MiniTasks_t& minitasks (_server_to_minitasks[servername]);
+      for (uint64_t m=l; m<=h; m+=INCREMENT) {
+        MiniTask mt (MiniTask::XOVER, m, m+INCREMENT);
+        debug ("adding MiniTask for " << servername << ": xover [" << mt._low << '-' << mt._high << "]");
+        minitasks.push_front (mt);
+        ++_total_minitasks;
+      }
+    }
+    else
+    {
+      const int INCREMENT(1000);
+      MiniTasks_t& minitasks (_server_to_minitasks[servername]);
+      for (uint64_t m=l; m<=h; m+=INCREMENT) {
+        MiniTask mt (MiniTask::XOVER, m, m+INCREMENT);
+        debug ("adding MiniTask for " << servername << ": xover [" << mt._low << '-' << mt._high << "]");
+        minitasks.push_front (mt);
+        ++_total_minitasks;
+      }
     }
   }
   else
@@ -237,6 +294,7 @@ TaskXOver :: on_nntp_group (NNTP          * nntp,
     //std::cerr << LINE_ID << " nothing new here..." << std::endl;
     _high[nntp->_server] = high;
   }
+  _running_minitasks = _total_minitasks;
 }
 
 namespace
@@ -298,6 +356,31 @@ void
 TaskXOver :: on_nntp_line         (NNTP               * nntp,
                                    const StringView   & line)
 {
+    if (_compression_enabled) {
+
+      std::string l(line.str);
+
+      if (line.strstr("=ybegin line=128"))
+      {
+        l += " name=xzver_decoded\n";
+        nntp->_socket->write(l);
+      }
+      else
+      {
+        l += "\n";
+        nntp->_socket->write(l);
+      }
+    }
+    else
+    {
+      on_nntp_line_process (nntp, line);
+    }
+}
+
+void
+TaskXOver :: on_nntp_line_process (NNTP               * nntp,
+                                   const StringView   & line)
+{
 
   pan_return_if_fail (nntp != 0);
   pan_return_if_fail (!nntp->_server.empty());
@@ -319,6 +402,7 @@ TaskXOver :: on_nntp_line         (NNTP               * nntp,
 
   //handle multiple "References:"-message-ids correctly. (hack for some faulty servers)
   ok = ok && l.pop_token (tmp, '\t');
+  ref += tmp;
   do
   {
     // usenetbucket uses a (null) (sic!) value for an empty reference list. hence the following hack
@@ -359,9 +443,6 @@ TaskXOver :: on_nntp_line         (NNTP               * nntp,
                                   nntp->_group.c_str(),
                                   number);
 
-  uint64_t& h (_high[nntp->_server]);
-  h = std::max (h, number);
-
   const char * fallback_charset = NULL; // FIXME
 
   // are we done?
@@ -382,8 +463,11 @@ TaskXOver :: on_nntp_line         (NNTP               * nntp,
   if (article)
     ++_articles_so_far;
 
-  // emit a status update
+//   emit a status update
   uint64_t& prev = _last_xover_number[nntp];
+  uint64_t& h (_high[nntp->_server]);
+  h = std::max (h, number);
+
   increment_step (number - prev);
   prev = number;
   if (!(_parts_so_far % 500))
@@ -398,6 +482,15 @@ TaskXOver :: on_nntp_done (NNTP              * nntp,
                            Health              health,
                            const StringView  & response UNUSED)
 {
+  if (_compression_enabled)
+  {
+    DataStream stream;
+    stream.stream = nntp->_socket->get_stream();
+    stream.group = nntp->_group;
+    stream.server = nntp->_server;
+    _data_streams.push_back(stream);
+  }
+
   update_work (true);
   check_in (nntp, health);
 }
@@ -405,6 +498,7 @@ TaskXOver :: on_nntp_done (NNTP              * nntp,
 void
 TaskXOver :: update_work (bool subtract_one_from_nntp_count)
 {
+
   int nntp_count (get_nntp_count ());
   if (subtract_one_from_nntp_count)
     --nntp_count;
@@ -415,16 +509,50 @@ TaskXOver :: update_work (bool subtract_one_from_nntp_count)
     if (!it->second.empty())
       servers.insert (it->first);
 
-  //std::cerr << LINE_ID << " servers: " << servers.size() << " nntp: " << nntp_count << std::endl;
-
   if (!servers.empty())
+  {
     _state.set_need_nntp (servers);
+  }
   else if (nntp_count)
+  {
     _state.set_working ();
-  else {
-    _state.set_completed();
-    set_finished(OK);
   }
+  else if (_data_streams.size() != 0 || (!_decoder && !_decoder_has_run)) {
+    _state.set_need_xzverdecoder ();
+  } else if (_decoder_has_run) {
+    _state.set_completed();
+    set_finished (OK);
+  } else if (!_compression_enabled)
+  {
+    _state.set_completed();
+    set_finished (OK);
+  } else assert(0 && "hm, missed a state.");
+}
+
+void
+TaskXOver:: use_decoder (Decoder* decoder)
+{
+  if (_state._work != NEED_XZVER_DECODER)
+    check_in (decoder);
+
+  _decoder = static_cast<XZVERDecoder*>(decoder);
+  _state.set_working();
+
+  DataStream* stream = new DataStream();
+  DataStream& ref(_data_streams.back());
+  stream->stream = ref.stream;
+  stream->group = ref.group;
+  stream->server = ref.server;
+  _data_streams.pop_back();
+  _decoder->enqueue (this, stream, &_data);
+  debug ("decoder thread was free, enqueued work");
+}
+
+void
+TaskXOver :: stop ()
+{
+  if (_decoder)
+      _decoder->cancel();
 }
 
 unsigned long
@@ -440,3 +568,50 @@ TaskXOver :: get_bytes_remaining () const
   const unsigned long total_bytes = (unsigned long)(_bytes_so_far / percent_done);
   return total_bytes - _bytes_so_far;
 }
+
+
+void
+TaskXOver :: on_worker_done (bool cancelled)
+{
+  assert(_decoder);
+  if (!_decoder) return;
+
+  if (!cancelled)
+  {
+    // the decoder is done... catch up on all housekeeping
+    // now that we're back in the main thread.
+
+    foreach_const(Decoder::log_t, _decoder->log_severe, it)
+    {
+      Log :: add_err(it->c_str());
+      verbose (it->c_str());
+    }
+    foreach_const(Decoder::log_t, _decoder->log_errors, it)
+    {
+      Log :: add_err(it->c_str());
+      verbose (it->c_str());
+    }
+    foreach_const(Decoder::log_t, _decoder->log_infos, it)
+    {
+      Log :: add_info(it->c_str());
+      verbose (it->c_str());
+    }
+
+    if (!_decoder->log_errors.empty())
+      set_error (_decoder->log_errors.front());
+
+    _state.set_health(_decoder->health);
+
+    if (!_decoder->log_severe.empty())
+      _state.set_health (ERR_LOCAL);
+    else {
+      _state.set_completed();
+      _decoder_has_run = true;
+    }
+  }
+
+  Decoder * d (_decoder);
+  _decoder = 0;
+  update_work ();
+  check_in (d);
+}
diff --git a/pan/tasks/task-xover.h b/pan/tasks/task-xover.h
index d1784de..13d4b97 100644
--- a/pan/tasks/task-xover.h
+++ b/pan/tasks/task-xover.h
@@ -23,20 +23,28 @@
 #include <map>
 #include <vector>
 #include <sstream>
+#include <zlib.h>
 
 #include <pan/data/data.h>
 #include <pan/tasks/task.h>
 #include <pan/tasks/nntp.h>
+#include <pan/general/worker-pool.h>
 #include <fstream>
 #include <iostream>
 
 namespace pan
 {
+
+  struct XZVERDecoder;
+
   /**
    * Task for downloading a some or all of a newsgroups' headers
    * @ingroup tasks
    */
-  class TaskXOver: public Task, private NNTP::Listener
+  class TaskXOver: public Task,
+                    private WorkerPool::Worker::Listener,
+                    private NNTP::Listener
+
   {
     public: // life cycle
       enum Mode { ALL, NEW, SAMPLE, DAYS };
@@ -45,15 +53,21 @@ namespace pan
 
     public: // task subclass
       virtual unsigned long get_bytes_remaining () const;
+      virtual void use_decoder (Decoder*);
+      void stop ();
 
     protected: // task subclass
       virtual void use_nntp (NNTP * nntp);
 
     private: // NNTP::Listener
+      void on_nntp_line_process (NNTP*, const StringView&);
       virtual void on_nntp_line (NNTP*, const StringView&);
       virtual void on_nntp_done (NNTP*, Health, const StringView&);
       virtual void on_nntp_group (NNTP*, const Quark&, unsigned long, uint64_t, uint64_t);
 
+    private: // WorkerPool::Listener interface
+      void on_worker_done (bool cancelled);
+
     private: // implementation - minitasks
       struct MiniTask {
         enum Type { GROUP, XOVER };
@@ -67,6 +81,7 @@ namespace pan
       server_to_minitasks_t _server_to_minitasks;
 
     private: // implementation
+      void process_headers (NNTP*);
       Data& _data;
       const Quark _group;
       std::string _short_group_name;
@@ -82,8 +97,27 @@ namespace pan
       unsigned long _bytes_so_far;
       unsigned long _parts_so_far;
       unsigned long _articles_so_far;
+      unsigned long _lines_so_far;
       unsigned long _total_minitasks;
+      unsigned long _running_minitasks;
+      bool _compression_enabled;
+      CompressionType _compressiontype;
+
+      struct DataStream
+      {
+        std::stringstream* stream;
+        Quark group;
+        Quark server;
+      };
+
+      std::vector<DataStream> _data_streams;
+
+      friend class XZVERDecoder;
+      XZVERDecoder * _decoder;
+      bool _decoder_has_run;
 
+    public:
+      void setHigh(const Quark& server, uint64_t& h);
   };
 }
 
diff --git a/pan/tasks/task.h b/pan/tasks/task.h
index 9fa2968..0618247 100644
--- a/pan/tasks/task.h
+++ b/pan/tasks/task.h
@@ -50,16 +50,17 @@ namespace pan
          enum Work
          {
             /** Task finished successfully */
-            COMPLETED,
+            COMPLETED = 0,
             /** Task is waiting on an nntp connection */
-            NEED_NNTP,
+            NEED_NNTP = 1,
             /** Task waiting for a decoder/encoder */
-            NEED_DECODER,
-            NEED_ENCODER,
+            NEED_DECODER = 2,
+            NEED_ENCODER = 3,
+            NEED_XZVER_DECODER = 4,
             /** Task is running */
-            WORKING,
+            WORKING = 5,
             /** Task is paused, woken up if 'current_connections < max_connections' */
-            PAUSED
+            PAUSED = 6
          };
 
          /**
@@ -99,6 +100,9 @@ namespace pan
                void set_need_decoder () {
                    _work = NEED_DECODER; _servers.clear(); }
 
+               void set_need_xzverdecoder () {
+                   _work = NEED_XZVER_DECODER; _servers.clear(); }
+
                void set_need_encoder () {
                    _work = NEED_ENCODER; _servers.clear(); }
 



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]