[pan2: 7/8] Added xzver functionality



commit 00fe4a3d5f32a1ab387c378ab75629bfcf57b199
Author: Heinrich Müller <henmull src gnome org>
Date:   Wed Oct 3 12:50:25 2012 +0200

    Added xzver functionality

 pan.cbp                          |    2 +
 pan/data-impl/data-impl.cc       |    1 +
 pan/data-impl/server.cc          |   26 +-
 pan/general/locking.h            |    8 +-
 pan/gui/server-ui.cc             |    7 +-
 pan/tasks/Makefile.am            |    2 -
 pan/tasks/nntp-pool.cc           |    3 +-
 pan/tasks/nntp.cc                |    6 +-
 pan/tasks/queue.cc               |   30 -
 pan/tasks/queue.h                |    5 -
 pan/tasks/socket-impl-gio.cc     |   10 +-
 pan/tasks/socket-impl-main.cc    |   22 +-
 pan/tasks/socket-impl-main.h     |   12 +-
 pan/tasks/socket-impl-openssl.cc |    9 +-
 pan/tasks/socket-impl-openssl.h  |    3 +
 pan/tasks/socket.cc              |    5 +-
 pan/tasks/socket.h               |   18 +-
 pan/tasks/task-xover.cc          | 1106 +++++++++++++++++++-------------------
 pan/tasks/task-xover.h           |   37 +--
 pan/tasks/task.h                 |    4 -
 pan/usenet-utils/Makefile.am     |   12 +-
 21 files changed, 655 insertions(+), 673 deletions(-)
---
diff --git a/pan.cbp b/pan.cbp
index 6c7bda6..ceff734 100644
--- a/pan.cbp
+++ b/pan.cbp
@@ -313,6 +313,8 @@
 		<Unit filename="pan/usenet-utils/url-find-test.cc" />
 		<Unit filename="pan/usenet-utils/url-find.cc" />
 		<Unit filename="pan/usenet-utils/url-find.h" />
+		<Unit filename="pan/usenet-utils/xzver.cc" />
+		<Unit filename="pan/usenet-utils/xzver.h" />
 		<Unit filename="uulib/crc32.c">
 			<Option compilerVar="CC" />
 		</Unit>
diff --git a/pan/data-impl/data-impl.cc b/pan/data-impl/data-impl.cc
index c4095d4..010c7eb 100644
--- a/pan/data-impl/data-impl.cc
+++ b/pan/data-impl/data-impl.cc
@@ -71,6 +71,7 @@ DataImpl :: DataImpl (const StringView& cache_ext, Prefs& prefs, bool unit_test,
   ProfilesImpl (*io),
   DownloadMeterImpl(prefs, *this),
   _cache (get_cache_path(), cache_ext, cache_megs),
+  _prefs(prefs),
   _encode_cache (get_encode_cache_path(), cache_megs),
   _certstore(*this),
   _unit_test (unit_test),
diff --git a/pan/data-impl/server.cc b/pan/data-impl/server.cc
index 6553b1b..b5163dc 100644
--- a/pan/data-impl/server.cc
+++ b/pan/data-impl/server.cc
@@ -294,14 +294,36 @@ DataImpl :: get_server_trust (const Quark   & server, int& setme) const
   return found;
 }
 
+namespace
+{
+  CompressionType get_compression_type(int val)
+  {
+    CompressionType ret = HEADER_COMPRESS_NONE;
+    switch (val)
+    {
+      case 1:
+        ret = HEADER_COMPRESS_XZVER;
+        break;
+
+      case 2:
+        ret = HEADER_COMPRESS_XFEATURE;
+        break;
+
+      case 3:
+        ret = HEADER_COMPRESS_DIABLO;
+        break;
+    }
+    return ret;
+  }
+}
+
 bool
 DataImpl :: get_server_compression_type (const Quark   & server, CompressionType& setme) const
 {
   const Server * s (find_server (server));
   const bool found (s);
-  if (found) {
+  if (found)
     setme = get_compression_type(s->compression_type);
-  }
 
   return found;
 }
diff --git a/pan/general/locking.h b/pan/general/locking.h
index fa55854..c41f9df 100644
--- a/pan/general/locking.h
+++ b/pan/general/locking.h
@@ -36,7 +36,7 @@ namespace pan
   class Mutex
   {
     private:
-      static GMutex mutex;
+      GMutex mutex;
       GMutex * m;
 
     public:
@@ -44,7 +44,7 @@ namespace pan
       /** Create a new mutex */
       Mutex()
       {
-#if !GLIB_CHECK_VERSION(3,32,0)
+#if !GLIB_CHECK_VERSION(2,34,1)
         m = g_mutex_new();
 #else
         g_mutex_init(&mutex);
@@ -53,9 +53,9 @@ namespace pan
       }
 
       /** Destroy the mutex */
-      ~Mutex()
+      virtual ~Mutex()
       {
-#if !GLIB_CHECK_VERSION(3,32,0)
+#if !GLIB_CHECK_VERSION(2,34,1)
         g_mutex_free(m);
 #endif
       }
diff --git a/pan/gui/server-ui.cc b/pan/gui/server-ui.cc
index 1a7096f..1696c98 100644
--- a/pan/gui/server-ui.cc
+++ b/pan/gui/server-ui.cc
@@ -139,7 +139,7 @@ namespace
     {
       type = HEADER_COMPRESS_XFEATURE;
     }
-    char* others[] = {"newshosting", "easynews","usenetserver" };
+    static char* others[] = {const_cast<char*>("newshosting"), const_cast<char*>("easynews"), const_cast<char*>("usenetserver") };
     for (int i= 0; i < G_N_ELEMENTS(others); i++)
     {
         if (t.strstr(others[i]))
@@ -190,7 +190,6 @@ namespace
     pan_entry_set_text (d->auth_password_entry, pass);
     d->cert = cert;
     d->compressiontype = compression;
-    std::cerr<<compression<<"\n";
 
     gtk_toggle_button_set_active (GTK_TOGGLE_BUTTON(d->compression_checkbox), d->compressiontype != HEADER_COMPRESS_NONE);
 
@@ -485,7 +484,6 @@ pan :: server_edit_dialog_new (Data& data, Queue& queue, Prefs& prefs, GtkWindow
     }
 
     d->ssl_combo = w = gtk_combo_box_new_with_model (GTK_TREE_MODEL(store));
-    g_signal_connect(w, "changed", G_CALLBACK(ssl_changed_cb), d);
     g_object_unref (G_OBJECT(store));
     gtk_cell_layout_pack_start (GTK_CELL_LAYOUT (w), renderer, true);
     gtk_cell_layout_set_attributes (GTK_CELL_LAYOUT (w), renderer, "text", 0, NULL);
@@ -515,6 +513,9 @@ pan :: server_edit_dialog_new (Data& data, Queue& queue, Prefs& prefs, GtkWindow
     gtk_widget_hide(d->compression_checkbox);
   }
 
+  // avoid NPE on early init
+  g_signal_connect(d->ssl_combo, "changed", G_CALLBACK(ssl_changed_cb), d);
+
   return d->dialog;
 }
 
diff --git a/pan/tasks/Makefile.am b/pan/tasks/Makefile.am
index edf1e1b..f7220b3 100644
--- a/pan/tasks/Makefile.am
+++ b/pan/tasks/Makefile.am
@@ -6,7 +6,6 @@ noinst_LIBRARIES = libtasks.a
 
 libtasks_a_SOURCES = \
   decoder.cc \
-  xzver-decoder.cc \
   encoder.cc \
   task.cc \
   task-article.cc \
@@ -30,7 +29,6 @@ noinst_HEADERS = \
   adaptable-set.cc \
   adaptable-set.h \
   decoder.h \
-  xzver-decoder.h \
   encoder.h \
   defgroup.h \
   health.h \
diff --git a/pan/tasks/nntp-pool.cc b/pan/tasks/nntp-pool.cc
index 4b06e4c..a59bfd1 100644
--- a/pan/tasks/nntp-pool.cc
+++ b/pan/tasks/nntp-pool.cc
@@ -302,8 +302,7 @@ NNTP_Pool :: request_nntp (WorkerPool& threadpool)
     if (!_certstore.in_blacklist(_server))
     {
       ++_pending_connections;
-      const bool ssl(_server_info.get_server_ssl_support(_server));
-      _socket_creator->create_socket (_server_info, address, port, threadpool, this, ssl);
+      _socket_creator->create_socket (_server_info, _server, address, port, threadpool, this);
     }
   }
 }
diff --git a/pan/tasks/nntp.cc b/pan/tasks/nntp.cc
index 3ed9e16..d78ae0d 100644
--- a/pan/tasks/nntp.cc
+++ b/pan/tasks/nntp.cc
@@ -354,15 +354,15 @@ NNTP :: xfeat (const Quark   & group,
 {
    _listener = l;
 
-   if (group != _group)
-      _commands.push_back (build_command ("GROUP %s\r\n", group.c_str()));
-
    _commands.push_back ("XFEATURE COMPRESS GZIP");
+   if (group != _group)
+         _commands.push_back (build_command ("GROUP %s\r\n", group.c_str()));
    _commands.push_back (build_command ("XOVER %"G_GUINT64_FORMAT"-%"G_GUINT64_FORMAT"\r\n", low, high));
 
    write_next_command ();
 }
 
+//TODO
 void
 NNTP :: xover_count_only (const Quark   & group,
                           Listener      * l)
diff --git a/pan/tasks/queue.cc b/pan/tasks/queue.cc
index 3f02c24..40befb8 100644
--- a/pan/tasks/queue.cc
+++ b/pan/tasks/queue.cc
@@ -49,7 +49,6 @@ Queue :: Queue (ServerInfo         & server_info,
   _socket_creator (socket_creator),
   _worker_pool (pool),
   _decoder (pool),
-  _xzver_decoder (pool),
   _encoder (pool),
   _decoder_task (0),
   _encoder_task (0),
@@ -237,16 +236,6 @@ Queue :: give_task_a_decoder (Task * task)
 }
 
 void
-Queue :: give_task_a_xzver_decoder (Task * task)
-{
-  const bool was_active (task_is_active (task));
-  _decoder_task = task;
-  if (!was_active)
-    fire_task_active_changed (task, true);
-  task->give_decoder (this, static_cast<Decoder*>(&_xzver_decoder)); // it's active now...
-}
-
-void
 Queue :: give_task_a_encoder (Task * task)
 {
   const bool was_active (task_is_active (task));
@@ -347,11 +336,6 @@ Queue :: process_task (Task * task)
     if (!_decoder_task)
       give_task_a_decoder (task);
   }
-  else if (state._work == Task::NEED_XZVER_DECODER)
-  {
-    if (!_decoder_task)
-      give_task_a_xzver_decoder (task);
-  }
   else if (state._work == Task::NEED_ENCODER)
   {
     if (!_encoder_task)
@@ -406,20 +390,6 @@ Queue :: find_first_task_needing_decoder ()
 }
 
 Task*
-Queue :: find_first_task_needing_xzver_decoder ()
-{
-  foreach (TaskSet, _tasks, it) {
-    const Task::State& state ((*it)->get_state ());
-    if  ((state._work == Task::NEED_XZVER_DECODER)
-      && (!_stopped.count (*it))
-      && (!_removing.count (*it)))
-      return *it;
-  }
-
-  return 0;
-}
-
-Task*
 Queue :: find_first_task_needing_encoder ()
 {
   foreach (TaskSet, _tasks, it) {
diff --git a/pan/tasks/queue.h b/pan/tasks/queue.h
index 9e23824..de3be6b 100644
--- a/pan/tasks/queue.h
+++ b/pan/tasks/queue.h
@@ -26,7 +26,6 @@
 #include <pan/general/macros.h> // for UNUSED
 #include <pan/general/map-vector.h>
 #include <pan/tasks/decoder.h>
-#include <pan/tasks/xzver-decoder.h>
 #include <pan/tasks/encoder.h>
 #include <pan/general/quark.h>
 #include <pan/tasks/nntp-pool.h>
@@ -50,7 +49,6 @@ namespace pan
   class WorkerPool;
   struct Encoder;
   struct Decoder;
-  struct XZVERDecoder;
 
   /**
    * A Queue helper that saves tasks to disk and restores them from disk.
@@ -192,14 +190,12 @@ namespace pan
     protected:
       void process_task (Task *);
       void give_task_a_decoder (Task*);
-      void give_task_a_xzver_decoder (Task*);
       void give_task_a_encoder (Task*);
       void give_task_a_connection (Task*, NNTP*);
       ServerInfo& _server_info;
       bool _is_online;
       Task* find_first_task_needing_server (const Quark& server);
       Task* find_first_task_needing_decoder ();
-      Task* find_first_task_needing_xzver_decoder ();
       Task* find_first_task_needing_encoder ();
 
       void give_task_an_upload_slot (TaskUpload* task);
@@ -218,7 +214,6 @@ namespace pan
       SocketCreator * _socket_creator;
       WorkerPool & _worker_pool;
       Decoder _decoder;
-      XZVERDecoder _xzver_decoder;
       Encoder _encoder;
       Task * _decoder_task;
       Task * _encoder_task;
diff --git a/pan/tasks/socket-impl-gio.cc b/pan/tasks/socket-impl-gio.cc
index 69e7456..6b303c1 100644
--- a/pan/tasks/socket-impl-gio.cc
+++ b/pan/tasks/socket-impl-gio.cc
@@ -38,6 +38,7 @@ extern "C" {
 #include <pan/general/log.h>
 #include <pan/general/macros.h>
 #include <pan/general/worker-pool.h>
+#include <pan/usenet-utils/xzver.h>
 
 #ifdef G_OS_WIN32
   // this #define is necessary for mingw
@@ -91,6 +92,7 @@ extern "C" {
 #include <pan/usenet-utils/gnksa.h>
 #include "socket-impl-gio.h"
 #include "socket-impl-main.h"
+#include "xzver_filter.h"
 
 using namespace pan;
 
@@ -286,6 +288,11 @@ GIOChannelSocket :: open (const StringView& address, int port, std::string& setm
 {
   _host.assign (address.str, address.len);
   _channel = create_channel (address, port, setme_err);
+#ifdef G_OS_WIN32
+  _id = g_io_channel_win32_get_fd(_channel);
+#else
+   _id = g_io_channel_unix_get_fd(_channel);
+#endif // G_OS_WIN32
   return _channel != 0;
 }
 
@@ -329,12 +336,13 @@ GIOChannelSocket :: do_read ()
     if (status == G_IO_STATUS_NORMAL)
     {
       g_string_prepend_len (g, _partial_read.c_str(), _partial_read.size());
-      _partial_read.clear ();
 
       debug_v ("read [" << g->str << "]"); // verbose debug, if --debug --debug was on the command-line
       increment_xfer_byte_count (g->len);
+
       if (g_str_has_suffix (g->str, "\r\n"))
         g_string_truncate (g, g->len-2);
+
       more = _listener->on_socket_response (this, StringView (g->str, g->len));
       _listener->on_socket_bytes_transferred(g->len, this);
     }
diff --git a/pan/tasks/socket-impl-main.cc b/pan/tasks/socket-impl-main.cc
index d91eff2..084529d 100644
--- a/pan/tasks/socket-impl-main.cc
+++ b/pan/tasks/socket-impl-main.cc
@@ -5,7 +5,7 @@
  * Copyright (C) 2002-2006  Charles Kerr <charles rebelbase com>
  *
  * This file
- * Copyright (C) 2011 Heinrich Mü<henmull src gnome org>
+ * Copyright (C) 2011 Heinrich Mïller <henmull src gnome org>
  *
  * This program is free software; you can redistribute it and/or modify
  * it under the terms of the GNU General Public License as published by
@@ -71,7 +71,6 @@ namespace
                         public WorkerPool::Worker::Listener
   {
 
-
     ServerInfo& data;
     std::string err;
     const Quark server;
@@ -81,15 +80,11 @@ namespace
     bool ok;
     Socket * socket;
     bool use_ssl;
-#ifdef HAVE_GNUTLS
     CertStore& store;
+
     ThreadWorker (ServerInfo& d, const Quark& s, const StringView& h, int p, Socket::Creator::Listener *l,
                   bool ssl, CertStore& cs):
       data(d), server(s), host(h), port(p), listener(l), ok(false), socket(0), use_ssl(ssl), store(cs) {}
-#else
-    ThreadWorker (ServerInfo& d, const Quark& s, const StringView& h, int p, Socket::Creator::Listener *l):
-      data(d), server(s), host(h), port(p), listener(l), ok(false), socket(0), use_ssl(false) {}
-#endif
 
     void do_work ()
     {
@@ -136,21 +131,18 @@ SocketCreator :: ~SocketCreator()
 
 void
 SocketCreator :: create_socket (ServerInfo& info,
+                                const Quark& server,
                                 const StringView & host,
                                 int                port,
                                 WorkerPool       & threadpool,
-                                Socket::Creator::Listener * listener,
-                                bool               use_ssl)
+                                Socket::Creator::Listener * listener)
 {
-    Quark server;
-    data.find_server_by_hn(host.to_string(), server);
+
+    const bool use_ssl (info.get_server_ssl_support(server));
+
     ensure_module_init ();
     if (store.in_blacklist(server)) return;
-#ifdef HAVE_GNUTLS
     ThreadWorker * w = new ThreadWorker (info, server, host, port, listener, use_ssl, store);
-#else
-    ThreadWorker * w = new ThreadWorker (info, server, host, port, listener);
-#endif
     threadpool.push_work (w, w, true);
 }
 
diff --git a/pan/tasks/socket-impl-main.h b/pan/tasks/socket-impl-main.h
index a38398e..491ca14 100644
--- a/pan/tasks/socket-impl-main.h
+++ b/pan/tasks/socket-impl-main.h
@@ -141,12 +141,12 @@ namespace pan
       CertStore & store;
 
     public:
-      virtual void create_socket (ServerInfo&,
-                                  const StringView & host,
-                                  int                port,
-                                  WorkerPool       & threadpool,
-                                  Socket::Creator::Listener * listener,
-                                  bool               use_ssl);
+      virtual void create_socket  (ServerInfo&,
+                                    const Quark&,
+                                    const StringView & host,
+                                    int                port,
+                                    WorkerPool       & threadpool,
+                                    Socket::Creator::Listener * listener);
 
   };
 
diff --git a/pan/tasks/socket-impl-openssl.cc b/pan/tasks/socket-impl-openssl.cc
index 390b0f2..65659b3 100644
--- a/pan/tasks/socket-impl-openssl.cc
+++ b/pan/tasks/socket-impl-openssl.cc
@@ -4,7 +4,7 @@
  * Copyright (C) 2002-2006  Charles Kerr <charles rebelbase com>
  *
  * This file
- * Copyright (C) 2011 Heinrich Mü<henmull src gnome org>
+ * Copyright (C) 2011 Heinrich Mïller <henmull src gnome org>
  *
  * This program is free software; you can redistribute it and/or modify
  * it under the terms of the GNU General Public License as published by
@@ -339,10 +339,15 @@ GIOChannelSocketGnuTLS :: ~GIOChannelSocketGnuTLS ()
 }
 
 bool
-GIOChannelSocketGnuTLS :: open (const StringView& address, int port, std::string& setme_err)
+GIOChannelSocketGnuTLS :: open  (const StringView& address, int port, std::string& setme_err)
 {
   _host.assign (address.str, address.len);
   _channel = create_channel (address, port, setme_err);
+#ifdef G_OS_WIN32
+  _id = g_io_channel_win32_get_fd(_channel);
+#else
+   _id = g_io_channel_unix_get_fd(_channel);
+#endif // G_OS_WIN32
   return _channel != 0;
 }
 
diff --git a/pan/tasks/socket-impl-openssl.h b/pan/tasks/socket-impl-openssl.h
index b3199ba..8a31c2a 100644
--- a/pan/tasks/socket-impl-openssl.h
+++ b/pan/tasks/socket-impl-openssl.h
@@ -44,6 +44,8 @@ extern "C"
   #include <gnutls/x509.h>
 #endif
 
+#include "xzver_filter.h"
+
 
 namespace pan
 {
@@ -113,6 +115,7 @@ namespace pan
       virtual ~GIOChannelSocketGnuTLS ();
       GIOChannelSocketGnuTLS () { debug("SocketSSL stub ctor"); }
 #endif  // HAVE_GNUTLS
+
   };
 }
 
diff --git a/pan/tasks/socket.cc b/pan/tasks/socket.cc
index 3b011ac..5db31ba 100644
--- a/pan/tasks/socket.cc
+++ b/pan/tasks/socket.cc
@@ -35,10 +35,13 @@ Socket :: Socket ():
   _time_of_last_check (time(0)),
   _speed_KiBps (0.0),
   _abort_flag (false),
-  _stream (new std::stringstream())
+  _id(0)
 {
 }
 
+Socket :: ~Socket ()
+{}
+
 void
 Socket :: set_abort_flag (bool b)
 {
diff --git a/pan/tasks/socket.h b/pan/tasks/socket.h
index 91c8160..3e46874 100644
--- a/pan/tasks/socket.h
+++ b/pan/tasks/socket.h
@@ -21,7 +21,10 @@
 #define __Socket_h__
 
 #include <string>
+#include <sstream>
+#include <vector>
 #include <config.h>
+#include <glib.h>
 
 extern "C" {
   #include <stdint.h>
@@ -31,10 +34,9 @@ extern "C" {
   #include <gnutls/gnutls.h>
 #endif
 
-#include <sstream>
-
 namespace pan
 {
+
   class StringView;
   class Quark;
   class WorkerPool;
@@ -49,8 +51,8 @@ namespace pan
   class Socket
   {
     public:
-      Socket ();
-      virtual ~Socket () {}
+      Socket () ;
+      virtual ~Socket () ;
 
     public:
       /** Interface class for objects that listen to a Socket's events */
@@ -63,10 +65,6 @@ namespace pan
         virtual void on_socket_bytes_transferred (uint64_t bytes, Socket*) = 0;
       };
 
-      void write(const std::string& str) { *_stream << str; }
-      void clear() { (*_stream).clear(); }
-      std::stringstream*& get_stream() { return _stream; }
-
     public:
       virtual bool open (const StringView& address, int port, std::string& setme_err) = 0;
       virtual void write_command (const StringView& chars, Listener *) = 0;
@@ -78,6 +76,7 @@ namespace pan
       void set_abort_flag (bool b);
       bool is_abort_set () const;
       virtual void get_host (std::string& setme) const = 0;
+      int get_id () { return _id; }
 
     protected:
       void increment_xfer_byte_count (unsigned long byte_count);
@@ -85,10 +84,9 @@ namespace pan
       mutable time_t _time_of_last_check;
       mutable double _speed_KiBps;
       bool _abort_flag;
-      std::stringstream* _stream;
+      int _id;
 
     public:
-
       /**
        * Interface class for code that creates sockets.
        *
diff --git a/pan/tasks/task-xover.cc b/pan/tasks/task-xover.cc
index 93d88cb..ca1fe3c 100644
--- a/pan/tasks/task-xover.cc
+++ b/pan/tasks/task-xover.cc
@@ -22,17 +22,16 @@
 #include <cerrno>
 
 extern "C" {
-  #define PROTOTYPES
-  #include <stdio.h>
-  #include <uulib/uudeview.h>
-  #include <glib/gi18n.h>
-  #include <gmime/gmime-utils.h>
-  #include <zlib.h>
+#define PROTOTYPES
+#include <stdio.h>
+#include <uulib/uudeview.h>
+#include <glib/gi18n.h>
+#include <gmime/gmime-utils.h>
+#include <zlib.h>
 }
 
 #include <fstream>
 #include <iostream>
-#include <pan/general/log.h>
 #include <pan/general/debug.h>
 #include <pan/general/file-util.h>
 #include <pan/general/macros.h>
@@ -41,577 +40,592 @@ extern "C" {
 #include <pan/data/data.h>
 #include "nntp.h"
 #include "task-xover.h"
-#include "xzver-decoder.h"
 
 using namespace pan;
 
-namespace
-{
-
-   std::string
-   get_short_name (const StringView& in)
-   {
-    static const StringView moderated ("moderated");
-    static const StringView d ("d");
-
-    StringView myline, long_token;
-
-    // find the long token -- use the last, unless that's "moderated" or "d"
-    myline = in;
-    myline.pop_last_token (long_token, '.');
-    if (!myline.empty() && (long_token==moderated || long_token==d))
-      myline.pop_last_token (long_token, '.');
-
-    // build a new string where each token is shortened except for long_token
-    std::string out;
-    myline = in;
-    StringView tok;
-    while (myline.pop_token (tok, '.')) {
-      out.insert (out.end(), tok.begin(), (tok==long_token ? tok.end() : tok.begin()+1));
-      out += '.';
-    }
-    if (!out.empty())
-      out.erase (out.size()-1);
-
-    return out;
-  }
-
-  std::string get_description (const Quark& group, TaskXOver::Mode mode)
-  {
-    char buf[1024];
-    if (mode == TaskXOver::ALL)
-      snprintf (buf, sizeof(buf), _("Getting all headers for \"%s\""), group.c_str());
-    else if (mode == TaskXOver::NEW)
-      snprintf (buf, sizeof(buf), _("Getting new headers for \"%s\""), group.c_str());
-    else // SAMPLE
-      snprintf (buf, sizeof(buf), _("Sampling headers for \"%s\""), group.c_str());
-    return std::string (buf);
-  }
+namespace {
+std::string get_short_name(const StringView& in) {
+	static const StringView moderated("moderated");
+	static const StringView d("d");
+
+	StringView myline, long_token;
+
+	// find the long token -- use the last, unless that's "moderated" or "d"
+	myline = in;
+	myline.pop_last_token(long_token, '.');
+	if (!myline.empty() && (long_token == moderated || long_token == d))
+		myline.pop_last_token(long_token, '.');
+
+	// build a new string where each token is shortened except for long_token
+	std::string out;
+	myline = in;
+	StringView tok;
+	while (myline.pop_token(tok, '.')) {
+		out.insert(out.end(), tok.begin(),
+				(tok == long_token ? tok.end() : tok.begin() + 1));
+		out += '.';
+	}
+	if (!out.empty())
+		out.erase(out.size() - 1);
+
+	return out;
 }
 
-namespace
-{
-  char* build_cachename (char* buf, size_t len, const char* name)
-  {
-    const char * home(file::get_pan_home().c_str());
-    g_snprintf(buf,len,"%s%c%s%c%s",home, G_DIR_SEPARATOR, "encode-cache", G_DIR_SEPARATOR, name);
-    return buf;
-  }
+std::string get_description(const Quark& group, TaskXOver::Mode mode) {
+	char buf[1024];
+	if (mode == TaskXOver::ALL)
+		snprintf(buf, sizeof(buf), _("Getting all headers for \"%s\""),
+				group.c_str());
+	else if (mode == TaskXOver::NEW)
+		snprintf(buf, sizeof(buf), _("Getting new headers for \"%s\""),
+				group.c_str());
+	else
+		// SAMPLE
+		snprintf(buf, sizeof(buf), _("Sampling headers for \"%s\""),
+				group.c_str());
+	return std::string(buf);
 }
-
-TaskXOver :: TaskXOver (Data         & data,
-                        const Quark  & group,
-                        Mode           mode,
-                        unsigned long  sample_size):
-  Task("XOVER", get_description(group,mode)),
-  _data (data),
-  _group (group),
-  _short_group_name (get_short_name (StringView (group.c_str()))),
-  _mode (mode),
-  _sample_size (sample_size),
-  _days_cutoff (mode==DAYS ? (time(0)-(sample_size*24*60*60)) : 0),
-  _group_xover_is_reffed (false),
-  _bytes_so_far (0),
-  _parts_so_far (0ul),
-  _articles_so_far (0ul),
-  _total_minitasks (0),
-  _running_minitasks (0),
-  _decoder(0),
-  _decoder_has_run (false)
-{
-
-
-
-  debug ("ctor for " << group);
-
-  // add a ``GROUP'' MiniTask for each server that has this group
-  // initialize the _high lookup table to boundaries
-  const MiniTask group_minitask (MiniTask::GROUP);
-  quarks_t servers;
-  _data.group_get_servers (group, servers);
-  foreach_const (quarks_t, servers, it)
-    if (_data.get_server_limits(*it))
-    {
-      _server_to_minitasks[*it].push_front (group_minitask);
-      _high[*it] = data.get_xover_high (group, *it);
-    }
-  init_steps (0);
-
-  // tell the users what we're up to
-  set_status (group.c_str());
-
-  update_work ();
 }
 
-void
-TaskXOver :: setHigh(const Quark& server, uint64_t& h)
-{
-  _high[server] = h;
+TaskXOver::TaskXOver(Data & data, const Quark & group, Mode mode,
+		unsigned long sample_size) :
+		Task("XOVER", get_description(group, mode)), _data(data), _group(group), _short_group_name(
+				get_short_name(StringView(group.c_str()))), _mode(mode), _sample_size(
+				sample_size), _days_cutoff(
+				mode == DAYS ? (time(0) - (sample_size * 24 * 60 * 60)) : 0), _group_xover_is_reffed(
+				false), _bytes_so_far(0), _parts_so_far(0ul), _articles_so_far(
+				0ul), _total_minitasks(0) {
+
+	debug("ctor for " << group);
+
+	// add a ``GROUP'' MiniTask for each server that has this group
+	// initialize the _high lookup table to boundaries
+	quarks_t servers;
+	_data.group_get_servers(group, servers);
+	foreach_const (quarks_t, servers, it)if (_data.get_server_limits(*it))
+	{
+		Data::Server* s (_data.find_server(*it));
+		const MiniTask group_minitask (MiniTask::GROUP);
+		_server_to_minitasks[*it].push_front (group_minitask);
+		_high[*it] = data.get_xover_high (group, *it);
+	}
+	init_steps(0);
+
+	// tell the users what we're up to
+	set_status(group.c_str());
+
+	update_work();
 }
 
-TaskXOver :: ~TaskXOver ()
-{
-  if (_group_xover_is_reffed) {
-    foreach (server_to_high_t, _high, it)
-      _data.set_xover_high (_group, it->first, it->second);
-    _data.xover_unref (_group);
-  }
-  if (_decoder)
-    _decoder->cancel_silently();
-
-  _data.fire_group_entered(_group, 1, 0);
+TaskXOver::~TaskXOver() {
+	if (_group_xover_is_reffed) {
+		foreach (server_to_high_t, _high, it)_data.set_xover_high (_group, it->first, it->second);
+		_data.xover_unref (_group);
+	}
+	_data.fire_group_entered(_group, 1, 0);
 }
 
-void
-TaskXOver :: use_nntp (NNTP* nntp)
-{
-  const Quark& server (nntp->_server);
-  debug ("got an nntp from " << nntp->_server);
-  CompressionType type;
-  _data.get_server_compression_type(nntp->_server, type);
-
-  _compression_enabled = type != HEADER_COMPRESS_NONE;
-
-  // if this is the first nntp we've gotten, ref the xover data
-  if (!_group_xover_is_reffed) {
-    _group_xover_is_reffed = true;
-    _data.xover_ref (_group);
-  }
-
-  MiniTasks_t& minitasks (_server_to_minitasks[server]);
-  if (minitasks.empty())
-  {
-    debug ("That's interesting, I got a socket for " << server << " but _have no use for it!");
-    _state._servers.erase (server);
-    check_in (nntp, OK);
-  }
-  else
-  {
-    const MiniTask mt (minitasks.front());
-    minitasks.pop_front ();
-    switch (mt._type)
-    {
-      case MiniTask::GROUP:
-        debug ("GROUP " << _group << " command to " << server);
-        nntp->group (_group, this);
-        break;
-      case MiniTask::XOVER:
-        debug ("XOVER " << mt._low << '-' << mt._high << " to " << server);
-        _last_xover_number[nntp] = mt._low;
-        if (_compression_enabled)
-        {
-// TODO support XFEATURE!
-//          if (type == XZVER)
-            nntp->xzver (_group, mt._low, mt._high, this);
-//          if (type == XFEATURE)
-//            nntp->xfeat (_group, mt._low, mt._high, this);
-          // TODO diablo
-        }
-        else
-          nntp->xover (_group, mt._low, mt._high, this);
-        --_running_minitasks;
-        break;
-      default:
-        assert (0);
-    }
-    update_work ();
-  }
+void TaskXOver::use_nntp(NNTP* nntp) {
+
+	const Quark& server(nntp->_server);
+	CompressionType comp;
+	_data.get_server_compression_type(server, comp);
+
+	debug("got an nntp from " << nntp->_server);
+
+	// if this is the first nntp we've gotten, ref the xover data
+	if (!_group_xover_is_reffed) {
+		_group_xover_is_reffed = true;
+		_data.xover_ref(_group);
+	}
+
+	MiniTasks_t& minitasks(_server_to_minitasks[server]);
+	if (minitasks.empty()) {
+		debug(
+				"That's interesting, I got a socket for " << server << " but have no use for it!");
+		_state._servers.erase(server);
+		check_in(nntp, OK);
+	} else {
+		const MiniTask mt(minitasks.front());
+		minitasks.pop_front();
+		switch (mt._type) {
+		case MiniTask::GROUP:
+			debug("GROUP " << _group << " command to " << server);
+			nntp->group(_group, this);
+			break;
+		case MiniTask::XOVER:
+			debug("XOVER " << mt._low << '-' << mt._high << " to " << server);
+			_last_xover_number[nntp] = mt._low;
+			if (comp == HEADER_COMPRESS_XFEATURE)
+				nntp->xfeat(_group, mt._low, mt._high, this);
+			else if (comp == HEADER_COMPRESS_XZVER)
+				nntp->xzver(_group, mt._low, mt._high, this);
+			else
+				nntp->xover (_group, mt._low, mt._high, this);
+			break;
+		default:
+			assert(0);
+		}
+		update_work();
+	}
 }
 
 /***
-****
-***/
-
-void
-TaskXOver :: on_nntp_group (NNTP          * nntp,
-                            const Quark   & group,
-                            unsigned long   qty,
-                            uint64_t        low,
-                            uint64_t        high)
-{
-  const Quark& servername (nntp->_server);
-
-  // new connections can tickle this...
-  if (_servers_that_got_xover_minitasks.count(servername))
-    return;
-
-  _servers_that_got_xover_minitasks.insert (servername);
-
-  debug ("got GROUP result from " << nntp->_server << " (" << nntp << "): "
-         << " qty " << qty
-         << " low " << low
-         << " high " << high);
-
-  uint64_t l(low), h(high);
-  _data.set_xover_low (group, nntp->_server, low);
-  //std::cerr << LINE_ID << " This group's range is [" << low << "..." << high << ']' << std::endl;
-
-  if (_mode == ALL || _mode == DAYS)
-    l = low;
-  else if (_mode == SAMPLE) {
-    _sample_size = std::min (_sample_size, high-low);
-    //std::cerr << LINE_ID << " and I want to sample " <<  _sample_size << " messages..." << std::endl;
-    l = std::max (low, high+1-_sample_size);
-  }
-  else { // NEW
-    uint64_t xh (_data.get_xover_high (group, nntp->_server));
-    //std::cerr << LINE_ID << " current xover high is " << xh << std::endl;
-    l = std::max (xh+1, low);
-  }
-
-  if (l <= high)
-  {
-//    std::cerr << LINE_ID << " okay, I'll try to get articles in [" << l << "..." << h << ']' << std::endl;
-
-    add_steps (h-l);
-
-    MiniTasks_t& minitasks (_server_to_minitasks[servername]);
-    if (_compression_enabled)
-    {
-      const int INCREMENT(100000);
-      MiniTasks_t& minitasks (_server_to_minitasks[servername]);
-      for (uint64_t m=l; m<=h; m+=INCREMENT) {
-        MiniTask mt (MiniTask::XOVER, m, m+INCREMENT);
-        debug ("adding MiniTask for " << servername << ": xover [" << mt._low << '-' << mt._high << "]");
-        minitasks.push_front (mt);
-        ++_total_minitasks;
-      }
-    }
-    else
-    {
-      const int INCREMENT(1000);
-      MiniTasks_t& minitasks (_server_to_minitasks[servername]);
-      for (uint64_t m=l; m<=h; m+=INCREMENT) {
-        MiniTask mt (MiniTask::XOVER, m, m+INCREMENT);
-        debug ("adding MiniTask for " << servername << ": xover [" << mt._low << '-' << mt._high << "]");
-        minitasks.push_front (mt);
-        ++_total_minitasks;
-      }
-    }
-  }
-  else
-  {
-    //std::cerr << LINE_ID << " nothing new here..." << std::endl;
-    _high[nntp->_server] = high;
-  }
-  _running_minitasks = _total_minitasks;
+ ****
+ ***/
+
+///TODO show low and high in UI (is this already there?)
+void TaskXOver::on_nntp_group(NNTP * nntp, const Quark & group,
+		unsigned long qty, uint64_t low, uint64_t high) {
+	const Quark& servername(nntp->_server);
+	CompressionType comp;
+	_data.get_server_compression_type(servername, comp);
+	const bool compression_enabled (comp != HEADER_COMPRESS_NONE);
+
+	// new connections can tickle this...
+	if (_servers_that_got_xover_minitasks.count(servername))
+		return;
+
+	_servers_that_got_xover_minitasks.insert(servername);
+
+	debug(
+			"got GROUP result from " << nntp->_server << " (" << nntp << "): " << " qty " << qty << " low " << low << " high " << high);
+
+	uint64_t l(low), h(high);
+	_data.set_xover_low(group, nntp->_server, low);
+	//std::cerr << LINE_ID << " This group's range is [" << low << "..." << high << ']' << std::endl;
+
+	if (_mode == ALL || _mode == DAYS)
+		l = low;
+	else if (_mode == SAMPLE) {
+		_sample_size = std::min(_sample_size, high - low);
+		//std::cerr << LINE_ID << " and I want to sample " <<  _sample_size << " messages..." << std::endl;
+		l = std::max(low, high + 1 - _sample_size);
+	} else { // NEW
+		uint64_t xh(_data.get_xover_high(group, nntp->_server));
+		//std::cerr << LINE_ID << " current xover high is " << xh << std::endl;
+		l = std::max(xh + 1, low);
+	}
+
+	if (l <= high) {
+		//std::cerr << LINE_ID << " okay, I'll try to get articles in [" << l << "..." << h << ']' << std::endl;
+		add_steps(h - l);
+		const int INCREMENT(compression_enabled ? 10000 : 1000);
+		MiniTasks_t& minitasks(_server_to_minitasks[servername]);
+		for (uint64_t m = l; m <= h; m += INCREMENT) {
+			const MiniTask mt(MiniTask::XOVER, m, m + INCREMENT);
+			debug(
+					"adding MiniTask for " << servername << ": xover [" << mt._low << '-' << mt._high << "]");
+			minitasks.push_front(mt);
+			++_total_minitasks;
+		}
+	} else {
+		//std::cerr << LINE_ID << " nothing new here..." << std::endl;
+		_high[nntp->_server] = high;
+	}
 }
 
-namespace
-{
-  unsigned long view_to_ul (const StringView& view)
-  {
-    unsigned long ul = 0ul;
-
-    if (!view.empty()) {
-      errno = 0;
-      ul = strtoul (view.str, 0, 10);
-      if (errno)
-        ul = 0ul;
-    }
-
-    return ul;
-  }
-  uint64_t view_to_ull (const StringView& view)
-  {
-    uint64_t ul = 0ul;
-
-    if (!view.empty()) {
-      errno = 0;
-      ul = g_ascii_strtoull (view.str, 0, 10);
-      if (errno)
-        ul = 0ul;
-    }
-
-    return ul;
-  }
-
-  bool header_is_nonencoded_utf8 (const StringView& in)
-  {
-    const bool is_nonencoded (!in.strstr("=?"));
-    const bool is_utf8 (g_utf8_validate (in.str, in.len, 0));
-    return is_nonencoded && is_utf8;
-  }
+namespace {
+unsigned long view_to_ul(const StringView& view) {
+	unsigned long ul = 0ul;
+
+	if (!view.empty()) {
+		errno = 0;
+		ul = strtoul(view.str, 0, 10);
+		if (errno)
+			ul = 0ul;
+	}
+
+	return ul;
 }
+uint64_t view_to_ull(const StringView& view) {
+	uint64_t ul = 0ul;
 
-/*
-  http://tools.ietf.org/html/rfc2980#section-2.8
-
-  Each line of output will be formatted with the article number,
-  followed by each of the headers in the overview database or the
-  article itself (when the data is not available in the overview
-  database) for that article separated by a tab character.  The
-  sequence of fields must be in this order: subject, author, date,
-  message-id, references, byte count, and line count.  Other optional
-  fields may follow line count.  Other optional fields may follow line
-  count.  These fields are specified by examining the response to the
-  LIST OVERVIEW.FMT command.  Where no data exists, a null field must
-  be provided (i.e. the output will have two tab characters adjacent to
-  each other).  Servers should not output fields for articles that have
-  been removed since the XOVER database was created.
-
-*/
-
-void
-TaskXOver :: on_nntp_line         (NNTP               * nntp,
-                                   const StringView   & line)
-{
-    if (_compression_enabled) {
-
-      std::string l(line.str);
-
-      if (line.strstr("=ybegin line=128"))
-      {
-        l += " name=xzver_decoded\n";
-        nntp->_socket->write(l);
-      }
-      else
-      {
-        l += "\n";
-        nntp->_socket->write(l);
-      }
-    }
-    else
-    {
-      on_nntp_line_process (nntp, line);
-    }
+	if (!view.empty()) {
+		errno = 0;
+		ul = g_ascii_strtoull(view.str, 0, 10);
+		if (errno)
+			ul = 0ul;
+	}
+
+	return ul;
 }
 
-void
-TaskXOver :: on_nntp_line_process (NNTP               * nntp,
-                                   const StringView   & line)
-{
-
-  pan_return_if_fail (nntp != 0);
-  pan_return_if_fail (!nntp->_server.empty());
-  pan_return_if_fail (!nntp->_group.empty());
-
-  _bytes_so_far += line.len;
-
-  unsigned int lines=0u;
-  unsigned long bytes=0ul;
-  uint64_t number=0;
-  StringView subj, author, date, mid, tmp, xref, l(line);
-  std::string ref;
-  bool ok = !l.empty();
-  ok = ok && l.pop_token (tmp, '\t');    if (ok) number = view_to_ull (tmp); tmp.clear();
-  ok = ok && l.pop_token (subj, '\t');   if (ok) subj.trim ();
-  ok = ok && l.pop_token (author, '\t'); if (ok) author.trim ();
-  ok = ok && l.pop_token (date, '\t');   if (ok) date.trim ();
-  ok = ok && l.pop_token (mid, '\t');    if (ok) mid.trim ();
-
-  //handle multiple "References:"-message-ids correctly. (hack for some faulty servers)
-  ok = ok && l.pop_token (tmp, '\t');
-  ref += tmp;
-  do
-  {
-    // usenetbucket uses a (null) (sic!) value for an empty reference list. hence the following hack
-    if (tmp.empty() || tmp == "(null)" || tmp == "null") continue;
-    if (tmp.front() == '<')
-    {
-      tmp.trim();
-      ref += tmp;
-      tmp.clear();
-    } else break;
-  } while ((ok = ok && l.pop_token (tmp, '\t'))) ;
-                                         if (ok) bytes = view_to_ul (tmp); tmp.clear();
-  ok = ok && l.pop_token (tmp, '\t');    if (ok) lines = view_to_ul (tmp);
-  ok = ok && l.pop_token (xref, '\t');   if (ok) xref.trim ();
-
-  if (xref.len>6 && !strncmp(xref.str,"Xref: ", 6)) {
-    xref = xref.substr (xref.str+6, 0);
-    xref.trim ();
-  }
-
-  // is this header corrupt?
-  if (!number // missing number
-      || subj.empty() // missing subject
-      || author.empty() // missing author
-      || date.empty() // missing date
-      || mid.empty() // missing mid
-      || mid.front()!='<') // corrupt mid
-      /// Concerning bug : https://bugzilla.gnome.org/show_bug.cgi?id=650042
-      /// Even if we didn't get a proper reference here, continue.
-      //|| (!ref.empty() && ref.front()!='<'))
-    return;
-
-  // if news server doesn't provide an xref, fake one
-  char * buf (0);
-  if (xref.empty())
-    xref = buf = g_strdup_printf ("%s %s:%"G_GUINT64_FORMAT,
-                                  nntp->_server.c_str(),
-                                  nntp->_group.c_str(),
-                                  number);
-
-  const char * fallback_charset = NULL; // FIXME
-
-  // are we done?
-  const time_t time_posted = g_mime_utils_header_decode_date (date.str, NULL);
-  if( _mode==DAYS && time_posted<_days_cutoff ) {
-    _server_to_minitasks[nntp->_server].clear ();
-    return;
-  }
-
-  ++_parts_so_far;
-
-  const Article * article = _data.xover_add (
-    nntp->_server, nntp->_group,
-    (header_is_nonencoded_utf8(subj) ? subj : header_to_utf8(subj,fallback_charset).c_str()),
-    (header_is_nonencoded_utf8(author) ? author : header_to_utf8(author,fallback_charset).c_str()),
-    time_posted, mid, StringView(ref), bytes, lines, xref);
-
-  if (article)
-    ++_articles_so_far;
-
-//   emit a status update
-  uint64_t& prev = _last_xover_number[nntp];
-  uint64_t& h (_high[nntp->_server]);
-  h = std::max (h, number);
-
-  increment_step (number - prev);
-  prev = number;
-  if (!(_parts_so_far % 500))
-    set_status_va (_("%s (%lu parts, %lu articles)"), _short_group_name.c_str(), _parts_so_far, _articles_so_far);
-
-  // cleanup
-  g_free (buf);
+bool header_is_nonencoded_utf8(const StringView& in) {
+	const bool is_nonencoded(!in.strstr("=?"));
+	const bool is_utf8(g_utf8_validate(in.str, in.len, 0));
+	return is_nonencoded && is_utf8;
 }
+}
+
+/*
+ http://tools.ietf.org/html/rfc2980#section-2.8
+
+ Each line of output will be formatted with the article number,
+ followed by each of the headers in the overview database or the
+ article itself (when the data is not available in the overview
+ database) for that article separated by a tab character.  The
+ sequence of fields must be in this order: subject, author, date,
+ message-id, references, byte count, and line count.  Other optional
+ fields may follow line count.  Other optional fields may follow line
+ count.  These fields are specified by examining the response to the
+ LIST OVERVIEW.FMT command.  Where no data exists, a null field must
+ be provided (i.e. the output will have two tab characters adjacent to
+ each other).  Servers should not output fields for articles that have
+ been removed since the XOVER database was created.
+
+ */
+
+void TaskXOver::on_nntp_line(NNTP * nntp, const StringView & line) {
+
+	const Quark& server(nntp->_server);
+	CompressionType comp;
+	_data.get_server_compression_type(server, comp);
+
+	if (comp != HEADER_COMPRESS_NONE) {
+		int sock_id = nntp->_socket->get_id();
+		if (_streams.count(sock_id) == 0)
+			_streams[sock_id] = new std::stringstream();
+		*_streams[sock_id] << line << "\r\n";
+	} else {
+		on_nntp_line_process(nntp, line);
+	}
 
-void
-TaskXOver :: on_nntp_done (NNTP              * nntp,
-                           Health              health,
-                           const StringView  & response UNUSED)
-{
-  if (_compression_enabled)
-  {
-    DataStream stream;
-    stream.stream = nntp->_socket->get_stream();
-    stream.group = nntp->_group;
-    stream.server = nntp->_server;
-    _data_streams.push_back(stream);
-  }
-
-  update_work (true);
-  check_in (nntp, health);
 }
 
-void
-TaskXOver :: update_work (bool subtract_one_from_nntp_count)
-{
-
-  int nntp_count (get_nntp_count ());
-  if (subtract_one_from_nntp_count)
-    --nntp_count;
-
-  // find any servers we still need
-  quarks_t servers;
-  foreach_const (server_to_minitasks_t, _server_to_minitasks, it)
-    if (!it->second.empty())
-      servers.insert (it->first);
-
-  if (!servers.empty())
-  {
-    _state.set_need_nntp (servers);
-  }
-  else if (nntp_count)
-  {
-    _state.set_working ();
-  }
-  else if (_data_streams.size() != 0 || (!_decoder && !_decoder_has_run)) {
-    _state.set_need_xzverdecoder ();
-  } else if (_decoder_has_run) {
-    _state.set_completed();
-    set_finished (OK);
-  } else if (!_compression_enabled)
-  {
-    _state.set_completed();
-    set_finished (OK);
-  } else assert(0 && "hm, missed a state.");
+void TaskXOver::on_nntp_line_process(NNTP * nntp, const StringView & line) {
+
+	pan_return_if_fail(nntp != 0);
+	pan_return_if_fail(!nntp->_server.empty());
+	pan_return_if_fail(!nntp->_group.empty());
+
+	_bytes_so_far += line.len;
+
+	unsigned int lines = 0u;
+	unsigned long bytes = 0ul;
+	uint64_t number = 0;
+	StringView subj, author, date, mid, tmp, xref, l(line);
+	std::string ref;
+	bool ok = !l.empty();
+	ok = ok && l.pop_token(tmp, '\t');
+	if (ok)
+		number = view_to_ull(tmp);
+	tmp.clear();
+	ok = ok && l.pop_token(subj, '\t');
+	if (ok)
+		subj.trim();
+	ok = ok && l.pop_token(author, '\t');
+	if (ok)
+		author.trim();
+	ok = ok && l.pop_token(date, '\t');
+	if (ok)
+		date.trim();
+	ok = ok && l.pop_token(mid, '\t');
+	if (ok)
+		mid.trim();
+
+	//handle multiple "References:"-message-ids correctly. (hack for some faulty servers)
+	ok = ok && l.pop_token(tmp, '\t');
+	do {
+		// usenetbucket uses a (null) (sic!) value for an empty reference list. hence the following hack
+		if (tmp.empty() || tmp == "(null)" || tmp == "null")
+			continue;
+		if (tmp.front() == '<') {
+			tmp.trim();
+			ref += tmp;
+			tmp.clear();
+		} else
+			break;
+	} while ((ok = ok && l.pop_token(tmp, '\t')));
+	if (ok)
+		bytes = view_to_ul(tmp);
+	tmp.clear();
+	ok = ok && l.pop_token(tmp, '\t');
+	if (ok)
+		lines = view_to_ul(tmp);
+	ok = ok && l.pop_token(xref, '\t');
+	if (ok)
+		xref.trim();
+
+	if (xref.len > 6 && !strncmp(xref.str, "Xref: ", 6)) {
+		xref = xref.substr(xref.str + 6, 0);
+		xref.trim();
+	}
+
+	// is this header corrupt?
+	if (!number // missing number
+	|| subj.empty() // missing subject
+			|| author.empty() // missing author
+			|| date.empty() // missing date
+			|| mid.empty() // missing mid
+			|| mid.front() != '<') // corrupt mid
+	/// Concerning bug : https://bugzilla.gnome.org/show_bug.cgi?id=650042
+	/// Even if we didn't get a proper reference here, continue.
+	//|| (!ref.empty() && ref.front()!='<'))
+		return;
+
+	// if news server doesn't provide an xref, fake one
+	char * buf(0);
+	if (xref.empty())
+		xref = buf = g_strdup_printf("%s %s:%"G_GUINT64_FORMAT,
+				nntp->_server.c_str(), nntp->_group.c_str(), number);
+
+	uint64_t& h(_high[nntp->_server]);
+	h = std::max(h, number);
+
+	const char * fallback_charset = NULL; // FIXME
+
+	// are we done?
+	const time_t time_posted = g_mime_utils_header_decode_date(date.str, NULL);
+	if (_mode == DAYS && time_posted < _days_cutoff) {
+		_server_to_minitasks[nntp->_server].clear();
+		return;
+	}
+
+	++_parts_so_far;
+
+	const Article * article = _data.xover_add(nntp->_server, nntp->_group,
+			(header_is_nonencoded_utf8(subj) ?
+					subj : header_to_utf8(subj, fallback_charset).c_str()),
+			(header_is_nonencoded_utf8(author) ?
+					author : header_to_utf8(author, fallback_charset).c_str()),
+			time_posted, mid, StringView(ref), bytes, lines, xref);
+
+	if (article)
+		++_articles_so_far;
+
+	// emit a status update
+	uint64_t& prev = _last_xover_number[nntp];
+	increment_step(number - prev);
+	prev = number;
+	if (!(_parts_so_far % 500))
+		set_status_va(_("%s (%lu parts, %lu articles)"),
+				_short_group_name.c_str(), _parts_so_far, _articles_so_far);
+
+	// cleanup
+	g_free(buf);
 }
 
-void
-TaskXOver:: use_decoder (Decoder* decoder)
-{
-  if (_state._work != NEED_XZVER_DECODER)
-    check_in (decoder);
-
-  _decoder = static_cast<XZVERDecoder*>(decoder);
-  _state.set_working();
-
-  DataStream* stream = new DataStream();
-  DataStream& ref(_data_streams.back());
-  stream->stream = ref.stream;
-  stream->group = ref.group;
-  stream->server = ref.server;
-  _data_streams.pop_back();
-  _decoder->enqueue (this, stream, &_data);
-  debug ("decoder thread was free, enqueued work");
+namespace {
+unsigned int crc_table[] = { /* CRC polynomial 0xedb88320 */
+0x00000000, 0x77073096, 0xee0e612c, 0x990951ba, 0x076dc419, 0x706af48f,
+		0xe963a535, 0x9e6495a3, 0x0edb8832, 0x79dcb8a4, 0xe0d5e91e, 0x97d2d988,
+		0x09b64c2b, 0x7eb17cbd, 0xe7b82d07, 0x90bf1d91, 0x1db71064, 0x6ab020f2,
+		0xf3b97148, 0x84be41de, 0x1adad47d, 0x6ddde4eb, 0xf4d4b551, 0x83d385c7,
+		0x136c9856, 0x646ba8c0, 0xfd62f97a, 0x8a65c9ec, 0x14015c4f, 0x63066cd9,
+		0xfa0f3d63, 0x8d080df5, 0x3b6e20c8, 0x4c69105e, 0xd56041e4, 0xa2677172,
+		0x3c03e4d1, 0x4b04d447, 0xd20d85fd, 0xa50ab56b, 0x35b5a8fa, 0x42b2986c,
+		0xdbbbc9d6, 0xacbcf940, 0x32d86ce3, 0x45df5c75, 0xdcd60dcf, 0xabd13d59,
+		0x26d930ac, 0x51de003a, 0xc8d75180, 0xbfd06116, 0x21b4f4b5, 0x56b3c423,
+		0xcfba9599, 0xb8bda50f, 0x2802b89e, 0x5f058808, 0xc60cd9b2, 0xb10be924,
+		0x2f6f7c87, 0x58684c11, 0xc1611dab, 0xb6662d3d, 0x76dc4190, 0x01db7106,
+		0x98d220bc, 0xefd5102a, 0x71b18589, 0x06b6b51f, 0x9fbfe4a5, 0xe8b8d433,
+		0x7807c9a2, 0x0f00f934, 0x9609a88e, 0xe10e9818, 0x7f6a0dbb, 0x086d3d2d,
+		0x91646c97, 0xe6635c01, 0x6b6b51f4, 0x1c6c6162, 0x856530d8, 0xf262004e,
+		0x6c0695ed, 0x1b01a57b, 0x8208f4c1, 0xf50fc457, 0x65b0d9c6, 0x12b7e950,
+		0x8bbeb8ea, 0xfcb9887c, 0x62dd1ddf, 0x15da2d49, 0x8cd37cf3, 0xfbd44c65,
+		0x4db26158, 0x3ab551ce, 0xa3bc0074, 0xd4bb30e2, 0x4adfa541, 0x3dd895d7,
+		0xa4d1c46d, 0xd3d6f4fb, 0x4369e96a, 0x346ed9fc, 0xad678846, 0xda60b8d0,
+		0x44042d73, 0x33031de5, 0xaa0a4c5f, 0xdd0d7cc9, 0x5005713c, 0x270241aa,
+		0xbe0b1010, 0xc90c2086, 0x5768b525, 0x206f85b3, 0xb966d409, 0xce61e49f,
+		0x5edef90e, 0x29d9c998, 0xb0d09822, 0xc7d7a8b4, 0x59b33d17, 0x2eb40d81,
+		0xb7bd5c3b, 0xc0ba6cad, 0xedb88320, 0x9abfb3b6, 0x03b6e20c, 0x74b1d29a,
+		0xead54739, 0x9dd277af, 0x04db2615, 0x73dc1683, 0xe3630b12, 0x94643b84,
+		0x0d6d6a3e, 0x7a6a5aa8, 0xe40ecf0b, 0x9309ff9d, 0x0a00ae27, 0x7d079eb1,
+		0xf00f9344, 0x8708a3d2, 0x1e01f268, 0x6906c2fe, 0xf762575d, 0x806567cb,
+		0x196c3671, 0x6e6b06e7, 0xfed41b76, 0x89d32be0, 0x10da7a5a, 0x67dd4acc,
+		0xf9b9df6f, 0x8ebeeff9, 0x17b7be43, 0x60b08ed5, 0xd6d6a3e8, 0xa1d1937e,
+		0x38d8c2c4, 0x4fdff252, 0xd1bb67f1, 0xa6bc5767, 0x3fb506dd, 0x48b2364b,
+		0xd80d2bda, 0xaf0a1b4c, 0x36034af6, 0x41047a60, 0xdf60efc3, 0xa867df55,
+		0x316e8eef, 0x4669be79, 0xcb61b38c, 0xbc66831a, 0x256fd2a0, 0x5268e236,
+		0xcc0c7795, 0xbb0b4703, 0x220216b9, 0x5505262f, 0xc5ba3bbe, 0xb2bd0b28,
+		0x2bb45a92, 0x5cb36a04, 0xc2d7ffa7, 0xb5d0cf31, 0x2cd99e8b, 0x5bdeae1d,
+		0x9b64c2b0, 0xec63f226, 0x756aa39c, 0x026d930a, 0x9c0906a9, 0xeb0e363f,
+		0x72076785, 0x05005713, 0x95bf4a82, 0xe2b87a14, 0x7bb12bae, 0x0cb61b38,
+		0x92d28e9b, 0xe5d5be0d, 0x7cdcefb7, 0x0bdbdf21, 0x86d3d2d4, 0xf1d4e242,
+		0x68ddb3f8, 0x1fda836e, 0x81be16cd, 0xf6b9265b, 0x6fb077e1, 0x18b74777,
+		0x88085ae6, 0xff0f6a70, 0x66063bca, 0x11010b5c, 0x8f659eff, 0xf862ae69,
+		0x616bffd3, 0x166ccf45, 0xa00ae278, 0xd70dd2ee, 0x4e048354, 0x3903b3c2,
+		0xa7672661, 0xd06016f7, 0x4969474d, 0x3e6e77db, 0xaed16a4a, 0xd9d65adc,
+		0x40df0b66, 0x37d83bf0, 0xa9bcae53, 0xdebb9ec5, 0x47b2cf7f, 0x30b5ffe9,
+		0xbdbdf21c, 0xcabac28a, 0x53b39330, 0x24b4a3a6, 0xbad03605, 0xcdd70693,
+		0x54de5729, 0x23d967bf, 0xb3667a2e, 0xc4614ab8, 0x5d681b02, 0x2a6f2b94,
+		0xb40bbe37, 0xc30c8ea1, 0x5a05df1b, 0x2d02ef8d };
+
+static unsigned int _crc32(const char *buf, size_t len, unsigned int crc) {
+	crc ^= 0xffffffffU;
+
+	while (len--)
+		crc = (crc >> 8) ^ crc_table[(crc ^ *buf++) & 0xff];
+
+	return crc ^ 0xffffffffU;
 }
 
-void
-TaskXOver :: stop ()
-{
-  if (_decoder)
-      _decoder->cancel();
+void ydecode(std::stringstream* in, std::stringstream* out) {
+	int gotbeg = 0, len, outlen = 0;
+	char buf1[512], buf2[512], c, *p, *p2 = buf2;
+	//unsigned int crc1 = 0, crc = _crc32(NULL, 0, 0);
+
+	while (!in->getline(buf1, sizeof(buf1)).eof()) {
+		if (gotbeg == 0 && strncmp(buf1, "=ybegin ", 8) == 0) {
+			gotbeg = 1;
+		} else if (gotbeg == 1 && strncmp(buf1, "=yend ", 6) == 0) {
+			//p = strstr(buf1, "crc32=");
+			//if (p)
+			//	sscanf(p + 6, "%x", &crc1);
+			break;
+		} else if (gotbeg == 1) {
+			len = strlen(buf1);
+			/* strip the CR LF */
+			if (len > 2 && buf1[len - 1]) {
+				buf1[len - 1] = '\0';
+				len--;
+			}
+			p = buf1;
+			while (*p) {
+				c = *p++;
+				if (c == '=') {
+					c = *p++;
+					if (c == 0)
+						break; /* can't have escape char as last char in line */
+					c = (unsigned char) (c - 64);
+				}
+
+				c = (unsigned char) (c - 42);
+				*p2++ = c;
+				/* flush when buffer full */
+				if (++outlen >= sizeof(buf2)) {
+					//crc = _crc32(buf2, outlen, crc);
+					out->write(buf2, outlen);
+					p2 = buf2;
+					outlen = 0;
+				}
+			}
+		}
+	}
+	/* flush remaining data */
+	if (outlen) {
+		//crc = _crc32(buf2, outlen, crc);
+		out->write(buf2, outlen);
+	}
+
+	// todo log, callback
+	//assert(crc == crc1);
 }
 
-unsigned long
-TaskXOver :: get_bytes_remaining () const
-{
-  unsigned int minitasks_left (0);
-  foreach_const (server_to_minitasks_t, _server_to_minitasks, it)
-    minitasks_left += it->second.size();
-
-  const double percent_done (_total_minitasks ? (1.0 - minitasks_left/(double)_total_minitasks) : 0.0);
-  if (percent_done < 0.1) // impossible to estimate
-    return 0;
-  const unsigned long total_bytes = (unsigned long)(_bytes_so_far / percent_done);
-  return total_bytes - _bytes_so_far;
+#define MEMCHUNK 4096
+
+int inflate_zlib(std::stringstream *source, std::stringstream *dest) {
+	int ret;
+	size_t have;
+	z_stream strm;
+	char in[MEMCHUNK];
+	char out[MEMCHUNK];
+
+	/* allocate inflate state */
+	strm.zalloc = Z_NULL;
+	strm.zfree = Z_NULL;
+	strm.opaque = Z_NULL;
+	strm.avail_in = 0;
+	strm.next_in = Z_NULL;
+	/*ret = inflateInit(&strm);*/
+	ret = inflateInit2(&strm, -MAX_WBITS); /* negative windowBits: raw deflate input, no zlib/gzip header or trailer expected */
+	if (ret != Z_OK)
+		return ret;
+
+	/* decompress until deflate stream ends or end of file */
+	do {
+		strm.avail_in = source->read(in, MEMCHUNK).gcount();
+		if (source->bad()) {
+			(void) inflateEnd(&strm);
+			return Z_ERRNO;
+		}
+		if (strm.avail_in == 0)
+			break;
+		strm.next_in = (unsigned char*) in;
+
+		/* run inflate() on input until output buffer not full */
+		do {
+			strm.avail_out = MEMCHUNK;
+			strm.next_out = (unsigned char*) out;
+			ret = inflate(&strm, Z_NO_FLUSH);
+			assert(ret != Z_STREAM_ERROR);
+			/* state not clobbered */
+			switch (ret) {
+			case Z_NEED_DICT:
+				ret = Z_DATA_ERROR; /* and fall through */
+			case Z_DATA_ERROR:
+			case Z_MEM_ERROR:
+				(void) inflateEnd(&strm);
+				return ret;
+			}
+			have = MEMCHUNK - strm.avail_out;
+			dest->write(out, have);
+		} while (strm.avail_out == 0);
+
+		/* done when inflate() says it's done */
+	} while (ret != Z_STREAM_END);
+
+	/* clean up and return */
+	(void) inflateEnd(&strm);
+	return ret == Z_STREAM_END ? Z_OK : Z_DATA_ERROR;
+}
+}
+
+void TaskXOver::on_nntp_done(NNTP * nntp, Health health,
+		const StringView & response) {
+
+	const Quark& servername(nntp->_server);
+	CompressionType comp;
+	_data.get_server_compression_type(servername, comp);
+	const bool compression_enabled (comp != HEADER_COMPRESS_NONE);
+
+	if (response == "." && compression_enabled) {
+		std::stringstream* buffer = _streams[nntp->_socket->get_id()];
+		std::stringstream out, out2;
+		ydecode(buffer, &out);
+		inflate_zlib(&out, &out2);
+		char buf1[4096];
+		while (!out2.getline(buf1, sizeof(buf1)).eof()) {
+			on_nntp_line_process(nntp, buf1);
+		}
+	}
+	update_work(true);
+	check_in(nntp, health);
 }
 
+void TaskXOver::update_work(bool subtract_one_from_nntp_count) {
+	int nntp_count(get_nntp_count());
+	if (subtract_one_from_nntp_count)
+		--nntp_count;
+
+	// find any servers we still need
+	quarks_t servers;
+	foreach_const (server_to_minitasks_t, _server_to_minitasks, it)if (!it->second.empty())
+	servers.insert (it->first);
+
+	//std::cerr << LINE_ID << " servers: " << servers.size() << " nntp: " << nntp_count << std::endl;
+
+	if (!servers.empty())
+		_state.set_need_nntp(servers);
+	else if (nntp_count)
+		_state.set_working();
+	else {
+		_state.set_completed();
+		set_finished(OK);
+	}
+}
 
-void
-TaskXOver :: on_worker_done (bool cancelled)
-{
-  assert(_decoder);
-  if (!_decoder) return;
-
-  if (!cancelled)
-  {
-    // the decoder is done... catch up on all housekeeping
-    // now that we're back in the main thread.
-
-    foreach_const(Decoder::log_t, _decoder->log_severe, it)
-    {
-      Log :: add_err(it->c_str());
-      verbose (it->c_str());
-    }
-    foreach_const(Decoder::log_t, _decoder->log_errors, it)
-    {
-      Log :: add_err(it->c_str());
-      verbose (it->c_str());
-    }
-    foreach_const(Decoder::log_t, _decoder->log_infos, it)
-    {
-      Log :: add_info(it->c_str());
-      verbose (it->c_str());
-    }
-
-    if (!_decoder->log_errors.empty())
-      set_error (_decoder->log_errors.front());
-
-    _state.set_health(_decoder->health);
-
-    if (!_decoder->log_severe.empty())
-      _state.set_health (ERR_LOCAL);
-    else {
-      _state.set_completed();
-      _decoder_has_run = true;
-    }
-  }
-
-  Decoder * d (_decoder);
-  _decoder = 0;
-  update_work ();
-  check_in (d);
+unsigned long TaskXOver::get_bytes_remaining() const {
+	unsigned int minitasks_left(0);
+	foreach_const (server_to_minitasks_t, _server_to_minitasks, it)minitasks_left += it->second.size();
+
+	const double percent_done(
+			_total_minitasks ?
+					(1.0 - minitasks_left / (double) _total_minitasks) : 0.0);
+	if (percent_done < 0.1) // impossible to estimate
+		return 0;
+	const unsigned long total_bytes = (unsigned long) (_bytes_so_far
+			/ percent_done);
+	return total_bytes - _bytes_so_far;
 }
diff --git a/pan/tasks/task-xover.h b/pan/tasks/task-xover.h
index 13d4b97..3b54e65 100644
--- a/pan/tasks/task-xover.h
+++ b/pan/tasks/task-xover.h
@@ -23,28 +23,21 @@
 #include <map>
 #include <vector>
 #include <sstream>
-#include <zlib.h>
 
+#include <pan/general/compression.h>
 #include <pan/data/data.h>
 #include <pan/tasks/task.h>
 #include <pan/tasks/nntp.h>
-#include <pan/general/worker-pool.h>
 #include <fstream>
 #include <iostream>
 
 namespace pan
 {
-
-  struct XZVERDecoder;
-
   /**
    * Task for downloading a some or all of a newsgroups' headers
    * @ingroup tasks
    */
-  class TaskXOver: public Task,
-                    private WorkerPool::Worker::Listener,
-                    private NNTP::Listener
-
+  class TaskXOver: public Task, private NNTP::Listener
   {
     public: // life cycle
       enum Mode { ALL, NEW, SAMPLE, DAYS };
@@ -53,20 +46,16 @@ namespace pan
 
     public: // task subclass
       virtual unsigned long get_bytes_remaining () const;
-      virtual void use_decoder (Decoder*);
-      void stop ();
 
     protected: // task subclass
       virtual void use_nntp (NNTP * nntp);
 
     private: // NNTP::Listener
-      void on_nntp_line_process (NNTP*, const StringView&);
       virtual void on_nntp_line (NNTP*, const StringView&);
       virtual void on_nntp_done (NNTP*, Health, const StringView&);
       virtual void on_nntp_group (NNTP*, const Quark&, unsigned long, uint64_t, uint64_t);
 
-    private: // WorkerPool::Listener interface
-      void on_worker_done (bool cancelled);
+      void on_nntp_line_process (NNTP*, const StringView&);
 
     private: // implementation - minitasks
       struct MiniTask {
@@ -81,7 +70,6 @@ namespace pan
       server_to_minitasks_t _server_to_minitasks;
 
     private: // implementation
-      void process_headers (NNTP*);
       Data& _data;
       const Quark _group;
       std::string _short_group_name;
@@ -97,27 +85,10 @@ namespace pan
       unsigned long _bytes_so_far;
       unsigned long _parts_so_far;
       unsigned long _articles_so_far;
-      unsigned long _lines_so_far;
       unsigned long _total_minitasks;
-      unsigned long _running_minitasks;
-      bool _compression_enabled;
-      CompressionType _compressiontype;
-
-      struct DataStream
-      {
-        std::stringstream* stream;
-        Quark group;
-        Quark server;
-      };
-
-      std::vector<DataStream> _data_streams;
 
-      friend class XZVERDecoder;
-      XZVERDecoder * _decoder;
-      bool _decoder_has_run;
+      std::map<int, std::stringstream*> _streams;
 
-    public:
-      void setHigh(const Quark& server, uint64_t& h);
   };
 }
 
diff --git a/pan/tasks/task.h b/pan/tasks/task.h
index 0618247..a63d832 100644
--- a/pan/tasks/task.h
+++ b/pan/tasks/task.h
@@ -56,7 +56,6 @@ namespace pan
             /** Task waiting for a decoder/encoder */
             NEED_DECODER = 2,
             NEED_ENCODER = 3,
-            NEED_XZVER_DECODER = 4,
             /** Task is running */
             WORKING = 5,
             /** Task is paused, woken up if 'current_connections < max_connections' */
@@ -100,9 +99,6 @@ namespace pan
                void set_need_decoder () {
                    _work = NEED_DECODER; _servers.clear(); }
 
-               void set_need_xzverdecoder () {
-                   _work = NEED_XZVER_DECODER; _servers.clear(); }
-
                void set_need_encoder () {
                    _work = NEED_ENCODER; _servers.clear(); }
 
diff --git a/pan/usenet-utils/Makefile.am b/pan/usenet-utils/Makefile.am
index 002e509..f142d1c 100644
--- a/pan/usenet-utils/Makefile.am
+++ b/pan/usenet-utils/Makefile.am
@@ -1,4 +1,6 @@
-AM_CPPFLAGS = -I top_srcdir@ @GMIME_CFLAGS@ @GLIB_CFLAGS@
+AM_CPPFLAGS = -I top_srcdir@ @GMIME_CFLAGS@ @GLIB_CFLAGS@ @ZLIB_CFLAGS@
+
+AM_LDFLAGS = @ZLIB_LIBS@
 
 noinst_LIBRARIES = libusenetutils.a
 
@@ -13,8 +15,9 @@ libusenetutils_a_SOURCES = \
  text-massager.cc \
  url-find.cc \
  blowfish.cc \
- gpg.cc
-	
+ gpg.cc \
+ xzver.cc
+
 noinst_HEADERS = \
  defgroup.h \
  filter-info.h \
@@ -30,7 +33,8 @@ noinst_HEADERS = \
  blowfish.h \
  blowfish_cyphers.h \
  gpg.h \
- MersenneTwister.h
+ MersenneTwister.h \
+ xzver.h
 
 #noinst_PROGRAMS = \
 # gnksa-test \



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]