[pan2/xzver_support: 42/47] force-merged xzver_st into xzver



commit 2040c16fa991cec9cabf54c4bc32edd513206628
Author: Heinrich Müller <henmull src gnome org>
Date:   Sun Jun 10 19:17:17 2012 +0200

    force-merged xzver_st into xzver

 pan/gui/Makefile.am        |    2 +-
 pan/tasks/Makefile.am      |    2 +
 pan/tasks/decoder.h        |    2 +
 pan/tasks/nntp.h           |   17 +++--
 pan/tasks/queue.cc         |   11 +++
 pan/tasks/queue.h          |    1 +
 pan/tasks/task-xover.cc    |  178 +++++++++++++++++++++++++++++---------------
 pan/tasks/task-xover.h     |   11 +++-
 pan/tasks/task.h           |   14 ++--
 pan/tasks/xzver-decoder.cc |   30 +++-----
 pan/tasks/xzver-decoder.h  |    4 +-
 11 files changed, 175 insertions(+), 97 deletions(-)
---
diff --git a/pan/gui/Makefile.am b/pan/gui/Makefile.am
index 2c25ffe..e5762fb 100644
--- a/pan/gui/Makefile.am
+++ b/pan/gui/Makefile.am
@@ -103,7 +103,7 @@ endif
 
 pan_SOURCES = gui.cc pan.cc $(WINRC)
 pan_LDADD = ./libpangui.a $(WINRCOBJ) ../data-impl/libpandata.a ../tasks/libtasks.a ../data/libdata.a ../usenet-utils/libusenetutils.a ../general/libgeneralutils.a ../../uulib/libuu.a \
-		@GTKSPELL_LIBS@ @ENCHANT_LIBS@ @GTK_LIBS@ @GMIME_LIBS@ @GLIB_LIBS@ @GNUTLS_LIBS@ @LIBNOTIFY_LIBS@ @LIBGNOME_KEYRING_1_LIBS@ @ZLIB_LIBS@ 
+		@GTKSPELL_LIBS@ @ENCHANT_LIBS@ @GTK_LIBS@ @GMIME_LIBS@ @GLIB_LIBS@ @GNUTLS_LIBS@ @LIBNOTIFY_LIBS@ @LIBGNOME_KEYRING_1_LIBS@ @ZLIB_LIBS@
 if HAVE_WIN32
 pan_LDFLAGS = -mwindows
 endif
diff --git a/pan/tasks/Makefile.am b/pan/tasks/Makefile.am
index f7220b3..edf1e1b 100644
--- a/pan/tasks/Makefile.am
+++ b/pan/tasks/Makefile.am
@@ -6,6 +6,7 @@ noinst_LIBRARIES = libtasks.a
 
 libtasks_a_SOURCES = \
   decoder.cc \
+  xzver-decoder.cc \
   encoder.cc \
   task.cc \
   task-article.cc \
@@ -29,6 +30,7 @@ noinst_HEADERS = \
   adaptable-set.cc \
   adaptable-set.h \
   decoder.h \
+  xzver-decoder.h \
   encoder.h \
   defgroup.h \
   health.h \
diff --git a/pan/tasks/decoder.h b/pan/tasks/decoder.h
index b62fff7..4f5649c 100644
--- a/pan/tasks/decoder.h
+++ b/pan/tasks/decoder.h
@@ -76,6 +76,8 @@ namespace pan
 
     private:
 
+      friend class XZVERDecoder;
+
       TaskArticle * task;
       std::string save_path;
       strings_t input_files;
diff --git a/pan/tasks/nntp.h b/pan/tasks/nntp.h
index fdfd0ca..178d376 100644
--- a/pan/tasks/nntp.h
+++ b/pan/tasks/nntp.h
@@ -194,6 +194,12 @@ namespace pan
                              uint64_t             low,
                              uint64_t             high,
                              Listener           * l);
+
+      void xzver            (const Quark        & group,
+                             uint64_t             low,
+                             uint64_t             high,
+                             Listener           * l);
+
       /**
        * Executes an XOVER command: "XOVER" to count
        * the xover numbers internally
@@ -207,13 +213,6 @@ namespace pan
       void xover_count_only (const Quark        & group,
                              Listener           * l);
 
-
-      /** Experimental XZVER header compression support */
-      void  xzver           (const Quark   & group,
-                             uint64_t        low,
-                             uint64_t        high,
-                             Listener      * l) ;
-
       /**
        * Executes a LIST command: "LIST"
        *
@@ -314,6 +313,10 @@ namespace pan
       Quark _request_group;
       Socket * _socket;
       bool _socket_error;
+      const std::string& get_username()
+      {
+        return _username;
+      }
 
     protected:
 
diff --git a/pan/tasks/queue.cc b/pan/tasks/queue.cc
index dede377..3667fc5 100644
--- a/pan/tasks/queue.cc
+++ b/pan/tasks/queue.cc
@@ -45,6 +45,7 @@ Queue :: Queue (ServerInfo         & server_info,
   _socket_creator (socket_creator),
   _worker_pool (pool),
   _decoder (pool),
+  _xzver_decoder (pool),
   _encoder (pool),
   _decoder_task (0),
   _encoder_task (0),
@@ -231,6 +232,16 @@ Queue :: give_task_a_decoder (Task * task)
 }
 
 void
+Queue :: give_task_a_xzver_decoder (Task * task)
+{
+  const bool was_active (task_is_active (task));
+  _decoder_task = task;
+  if (!was_active)
+    fire_task_active_changed (task, true);
+  task->give_decoder (this, static_cast<Decoder*>(&_xzver_decoder)); // it's active now...
+}
+
+void
 Queue :: give_task_a_encoder (Task * task)
 {
   const bool was_active (task_is_active (task));
diff --git a/pan/tasks/queue.h b/pan/tasks/queue.h
index 59479d6..d9d997a 100644
--- a/pan/tasks/queue.h
+++ b/pan/tasks/queue.h
@@ -215,6 +215,7 @@ namespace pan
       SocketCreator * _socket_creator;
       WorkerPool & _worker_pool;
       Decoder _decoder;
+      XZVERDecoder _xzver_decoder;
       Encoder _encoder;
       Task * _decoder_task;
       Task * _encoder_task;
diff --git a/pan/tasks/task-xover.cc b/pan/tasks/task-xover.cc
index 433846b..18403ed 100644
--- a/pan/tasks/task-xover.cc
+++ b/pan/tasks/task-xover.cc
@@ -30,6 +30,7 @@ extern "C" {
 }
 #include <fstream>
 #include <iostream>
+#include <pan/general/log.h>
 #include <pan/general/debug.h>
 #include <pan/general/file-util.h>
 #include <pan/general/macros.h>
@@ -112,13 +113,12 @@ TaskXOver :: TaskXOver (Data         & data,
   _articles_so_far (0ul),
   _total_minitasks (0),
   _running_minitasks (0),
-  _xzver (_xzver_support),
+  _xzver (false),//_xzver_support),
   _decoder(0),
-  _decoder_has_run (false)
+  _decoder_has_run (false),
+  _extract_done(false)
 {
 
-
-
   if (_xzver)
   {
     char buf[4096];
@@ -153,6 +153,8 @@ TaskXOver :: ~TaskXOver ()
       _data.set_xover_high (_group, it->first, it->second);
     _data.xover_unref (_group);
   }
+  if (_decoder)
+    _decoder->cancel_silently();
 }
 
 void
@@ -243,31 +245,36 @@ TaskXOver :: on_nntp_group (NNTP          * nntp,
 
   if (l <= high)
   {
-    //std::cerr << LINE_ID << " okay, I'll try to get articles in [" << l << "..." << h << ']' << std::endl;
-    add_steps (_xzver ? 2 : 1*(h-l));
-    int INCREMENT = 1000;
+    std::cerr << LINE_ID << " okay, I'll try to get articles in [" << l << "..." << h << ']' << std::endl;
+
+    add_steps (h-l);
+
     MiniTasks_t& minitasks (_server_to_minitasks[servername]);
     if (_xzver)
     {
       MiniTask mt (MiniTask::XOVER, l, h);
+      minitasks.push_front (mt);
+      debug ("adding xzver MiniTask for " << servername << ": xover [" << mt._low << '-' << mt._high << "]");
       ++_total_minitasks;
     }
     else
     {
-        for (uint64_t m=l; m<=h; m+=INCREMENT) {
+      const int INCREMENT(1000);
+      MiniTasks_t& minitasks (_server_to_minitasks[servername]);
+      for (uint64_t m=l; m<=h; m+=INCREMENT) {
         MiniTask mt (MiniTask::XOVER, m, m+INCREMENT);
         debug ("adding MiniTask for " << servername << ": xover [" << mt._low << '-' << mt._high << "]");
         minitasks.push_front (mt);
         ++_total_minitasks;
       }
     }
-    _running_minitasks = _total_minitasks;
   }
   else
   {
     //std::cerr << LINE_ID << " nothing new here..." << std::endl;
     _high[nntp->_server] = high;
   }
+  _running_minitasks = _total_minitasks;
 }
 
 namespace
@@ -311,16 +318,17 @@ void
 TaskXOver :: on_nntp_line         (NNTP               * nntp,
                                    const StringView   & line)
 {
-
     if (_xzver ) {
-        if (line.strstr("=ybegin line=128"))
-          _headers << line.str << " name=xzver_decoded\n";
-        else
-          _headers << line.str <<"\n";
+      if (line.strstr("=ybegin line=128"))
+        _headers << line.str << " name=xzver_decoded\n";
+      else
+        _headers << line.str <<"\n";
+      increment_step(1);
     }
     else
+    {
       on_nntp_line_process (nntp, line);
-    increment_step(1);
+    }
 }
 
 void
@@ -389,6 +397,8 @@ TaskXOver :: on_nntp_line_process (NNTP               * nntp,
   uint64_t& h (_high[nntp->_server]);
   h = std::max (h, number);
 
+//  std::cerr<<h<<" "<<number<<"\n";
+
   const char * fallback_charset = NULL; // FIXME
 
   // are we done?
@@ -411,7 +421,9 @@ TaskXOver :: on_nntp_line_process (NNTP               * nntp,
 
   // emit a status update
   uint64_t& prev = _last_xover_number[nntp];
+
   increment_step (number - prev);
+//  std::cerr<<"stepping "<<number<<" "<<prev<<" "<<get_steps()<<" "<<get_step()<<" "<<get_progress_of_100()<<"\n";
   prev = number;
   if (!(_parts_so_far % 500))
     set_status_va (_("%s (%lu parts, %lu articles)"), _short_group_name.c_str(), _parts_so_far, _articles_so_far);
@@ -425,8 +437,8 @@ TaskXOver :: on_nntp_done (NNTP              * nntp,
                            Health              health,
                            const StringView  & response UNUSED)
 {
-  if (_running_minitasks == 0) process_headers(nntp);
-
+  if (_running_minitasks == 0) //process_headers(nntp);
+    _backup_nntp = new NNTP(nntp->_server, nntp->get_username(), "", 0);
   update_work (true);
   check_in (nntp, health);
 }
@@ -444,35 +456,53 @@ TaskXOver :: update_work (bool subtract_one_from_nntp_count)
     if (!it->second.empty())
       servers.insert (it->first);
 
-  //std::cerr << LINE_ID << " servers: " << servers.size() << " nntp: " << nntp_count << std::endl;
-
-//  if (!servers.empty())
-//    _state.set_need_nntp (servers);
-//  else if (nntp_count)
-//    _state.set_working ();
-//  else {
-//    _state.set_completed();
-//    set_finished(OK);
-//  }
   if (!servers.empty())
     _state.set_need_nntp (servers);
   else if (nntp_count)
     _state.set_working ();
-  else if (!_decoder && !_decoder_has_run) {
+  else if (!_decoder && !_decoder_has_run && _xzver) {
+    _headers.close();
     _state.set_need_xzverdecoder ();
     set_step(0);
-  } else if (_decoder_has_run) {
+  } else if (_decoder_has_run && !_extract_done && _xzver) {
+    _state.set_working();
+    process_headers(_backup_nntp);
+  } else if (_decoder_has_run && _extract_done && _xzver) {
+    _state.set_completed();
+    set_finished (OK);
+  } else if (!_xzver)
+  {
     _state.set_completed();
     set_finished (OK);
   } else assert(0 && "hm, missed a state.");
 }
 
-namespace
+void
+TaskXOver:: use_decoder (Decoder* decoder)
 {
+  if (_state._work != NEED_XZVER_DECODER)
+    check_in (decoder);
 
+  _decoder = static_cast<XZVERDecoder*>(decoder);
+  init_steps(100);
+  _state.set_working();
+  _decoder->enqueue (this);
+  set_status_va (_("Decoding XZVER Headers"));
+  debug ("decoder thread was free, enqueued work");
+}
+
+void
+TaskXOver :: stop ()
+{
+  if (_decoder)
+      _decoder->cancel();
+}
+
+namespace
+{
   #define CHUNK 16384
 
-  int inf(FILE *source, FILE *dest)
+  int inflate_xzver(FILE *source, FILE *dest)
   {
     int ret;
     unsigned have;
@@ -532,43 +562,17 @@ namespace
 }
 
 void
-TaskXOver:: use_decoder (Decoder* decoder)
-{
-  if (_state._work != NEED_DECODER)
-    check_in (decoder);
-
-  _decoder = static_cast<XZVERDecoder*>(decoder);
-  init_steps(100);
-  _state.set_working();
-  _decoder->enqueue (this);
-  set_status_va (_("Decoding XZVER Headers"));
-  debug ("decoder thread was free, enqueued work");
-}
-
-void
-TaskXOver :: stop ()
-{
-  if (_decoder)
-      _decoder->cancel();
-}
-
-void
- TaskXOver :: process_headers (NNTP* nntp)
+TaskXOver :: process_headers (NNTP* nntp)
 {
   char buf[4096];
 
-  _headers.close();
-  /* yenc-decode */
-  UUInitialize ();
-  UULoadFile (build_cachename(buf,sizeof(buf), "xzver_test"), 0, 1);
-  std::cerr<<"uudecode "<<UUstrerror(UUDecodeFile (UUGetFileListItem (0), build_cachename(buf,sizeof(buf), "xzver_decoded")))<<"\n";
-  UUCleanUp ();
+  build_cachename(buf,sizeof(buf), "xzver_decoded");
 
   /* raw zlib inflate */
   FILE * in = fopen (buf, "rb");
   FILE * out = fopen (build_cachename(buf,sizeof(buf), "xzver_out"), "wb");
   int res(Z_OK);
-  if (in && out) res = inf (in,out);
+  if (in && out) res = inflate_xzver (in,out);
 
   /* feed to on_nntp_line */
   if (in) fclose(in);
@@ -579,9 +583,11 @@ void
     char buf[4096];
     while (f.getline(buf,sizeof(buf))) on_nntp_line_process(nntp,StringView(buf));
   }
-
+  _extract_done = true;
+  update_work();
 }
 
+
 unsigned long
 TaskXOver :: get_bytes_remaining () const
 {
@@ -595,3 +601,53 @@ TaskXOver :: get_bytes_remaining () const
   const unsigned long total_bytes = (unsigned long)(_bytes_so_far / percent_done);
   return total_bytes - _bytes_so_far;
 }
+
+
+void
+TaskXOver :: on_worker_done (bool cancelled)
+{
+  assert(_decoder);
+  if (!_decoder) return;
+
+  std::cerr<<"worker done\n";
+
+  if (!cancelled)
+  {
+    // the decoder is done... catch up on all housekeeping
+    // now that we're back in the main thread.
+
+    foreach_const(Decoder::log_t, _decoder->log_severe, it)
+    {
+      Log :: add_err(it->c_str());
+      verbose (it->c_str());
+    }
+    foreach_const(Decoder::log_t, _decoder->log_errors, it)
+    {
+      Log :: add_err(it->c_str());
+      verbose (it->c_str());
+    }
+    foreach_const(Decoder::log_t, _decoder->log_infos, it)
+    {
+      Log :: add_info(it->c_str());
+      verbose (it->c_str());
+    }
+
+    if (!_decoder->log_errors.empty())
+      set_error (_decoder->log_errors.front());
+
+    _state.set_health(_decoder->health);
+
+    if (!_decoder->log_severe.empty())
+      _state.set_health (ERR_LOCAL);
+    else {
+      _state.set_completed();
+      set_step (100);
+      _decoder_has_run = true;
+    }
+  }
+
+  Decoder * d (_decoder);
+  _decoder = 0;
+  update_work ();
+  check_in (d);
+}
diff --git a/pan/tasks/task-xover.h b/pan/tasks/task-xover.h
index c5ba3a8..8f8051e 100644
--- a/pan/tasks/task-xover.h
+++ b/pan/tasks/task-xover.h
@@ -27,6 +27,7 @@
 #include <pan/data/data.h>
 #include <pan/tasks/task.h>
 #include <pan/tasks/nntp.h>
+#include <pan/general/worker-pool.h>
 #include <fstream>
 #include <iostream>
 
@@ -39,7 +40,10 @@ namespace pan
    * Task for downloading a some or all of a newsgroups' headers
    * @ingroup tasks
    */
-  class TaskXOver: public Task, private NNTP::Listener
+  class TaskXOver: public Task,
+                    private WorkerPool::Worker::Listener,
+                    private NNTP::Listener
+
   {
     public: // life cycle
       enum Mode { ALL, NEW, SAMPLE, DAYS };
@@ -60,6 +64,9 @@ namespace pan
       virtual void on_nntp_done (NNTP*, Health, const StringView&);
       virtual void on_nntp_group (NNTP*, const Quark&, unsigned long, uint64_t, uint64_t);
 
+    private: // WorkerPool::Listener interface
+      void on_worker_done (bool cancelled);
+
     private: // implementation - minitasks
       struct MiniTask {
         enum Type { GROUP, XOVER };
@@ -94,10 +101,12 @@ namespace pan
       int           _running_minitasks;
       bool _xzver;
       std::ofstream _headers;
+      NNTP * _backup_nntp;
 
       friend class XZVERDecoder;
       XZVERDecoder * _decoder;
       bool _decoder_has_run;
+      bool _extract_done;
   };
 }
 
diff --git a/pan/tasks/task.h b/pan/tasks/task.h
index aaabacf..8acb77b 100644
--- a/pan/tasks/task.h
+++ b/pan/tasks/task.h
@@ -50,17 +50,17 @@ namespace pan
          enum Work
          {
             /** Task finished successfully */
-            COMPLETED,
+            COMPLETED = 0,
             /** Task is waiting on an nntp connection */
-            NEED_NNTP,
+            NEED_NNTP = 1,
             /** Task waiting for a decoder/encoder */
-            NEED_DECODER,
-            NEED_ENCODER,
-            NEED_XZVER_DECODER,
+            NEED_DECODER = 2,
+            NEED_ENCODER = 3,
+            NEED_XZVER_DECODER = 4,
             /** Task is running */
-            WORKING,
+            WORKING = 5,
             /** Task is paused, woken up if 'current_connections < max_connections' */
-            PAUSED
+            PAUSED = 6
          };
 
          /**
diff --git a/pan/tasks/xzver-decoder.cc b/pan/tasks/xzver-decoder.cc
index 21567d6..d2be871 100644
--- a/pan/tasks/xzver-decoder.cc
+++ b/pan/tasks/xzver-decoder.cc
@@ -43,6 +43,18 @@ using namespace pan;
 ****
 ***/
 
+using namespace pan;
+
+XZVERDecoder :: XZVERDecoder (WorkerPool& pool) :
+  Decoder(pool)
+{
+}
+
+XZVERDecoder :: ~XZVERDecoder()
+{
+  disable_progress_update();
+}
+
 void
 XZVERDecoder :: enqueue (TaskXOver          * task)
 {
@@ -80,8 +92,6 @@ XZVERDecoder :: do_work()
 
   enable_progress_update();
 
-  char buf[4096];
-
   int res;
   if (((res = UUInitialize())) != UURET_OK)
     log_errors.push_back(_("Error initializing uulib")); // log error
@@ -96,22 +106,6 @@ XZVERDecoder :: do_work()
     UUCleanUp ();
   }
 
-  /* raw zlib inflate */
-//  FILE * in = fopen (buf, "rb");
-//  FILE * out = fopen (build_cachename(buf,sizeof(buf), "xzver_out"), "wb");
-//  int res(Z_OK);
-//  if (in && out) res = inf (in,out);
-//
-//  /* feed to on_nntp_line */
-//  if (in) fclose(in);
-//  if (out) fclose(out);
-//  if (res==Z_OK)
-//  {
-//    std::ifstream f(buf, std::ios::in);
-//    char buf[4096];
-//    while (f.getline(buf,sizeof(buf))) on_nntp_line_process(nntp,StringView(buf));
-//  }
-
   disable_progress_update();
 }
 
diff --git a/pan/tasks/xzver-decoder.h b/pan/tasks/xzver-decoder.h
index f4b33ca..838d9cb 100644
--- a/pan/tasks/xzver-decoder.h
+++ b/pan/tasks/xzver-decoder.h
@@ -90,8 +90,8 @@ namespace pan
 //      std::string current_file; // the current file we are decoding, with path
 //      volatile int num_scanned_files;
 //
-//      static void uu_log(void *thiz, char *message, int severity);
-//      double get_percentage (const uuprogress& p) const;
+      static void uu_log(void *thiz, char *message, int severity);
+      double get_percentage (const uuprogress& p) const;
 //      static int uu_busy_poll(void * self, uuprogress *p);
 //      /** tell our task about the decode's progress */
 //      static gboolean progress_update_timer_func(gpointer decoder);



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]