[pan2] fix gzip giganews feature



commit a66286bff1f1c20f5da5c08b0d91db591fe21b1b
Author: Heinrich Müller <henmull src gnome org>
Date:   Wed Jan 9 19:31:57 2013 +0100

    fix gzip giganews feature

 pan/general/compression.cc       |   80 +++++++++++++++++++++-----------------
 pan/general/locking.h            |    6 +-
 pan/tasks/nntp-pool.cc           |    4 +-
 pan/tasks/nntp.cc                |   35 ++++++----------
 pan/tasks/nntp.h                 |   14 +++---
 pan/tasks/socket-impl-gio.cc     |    6 ++-
 pan/tasks/socket-impl-openssl.cc |    4 +-
 pan/tasks/task-groups.cc         |    7 ++-
 pan/tasks/task-xover.cc          |   52 ++++++++----------------
 9 files changed, 97 insertions(+), 111 deletions(-)
---
diff --git a/pan/general/compression.cc b/pan/general/compression.cc
index 821c7d3..90a2287 100644
--- a/pan/general/compression.cc
+++ b/pan/general/compression.cc
@@ -185,7 +185,7 @@ compression::ydecode(std::stringstream* in, std::stringstream* out)
   char buf1[512], buf2[512], c, *p, *p2 = buf2;
   //unsigned int crc1 = 0, crc = _crc32(NULL, 0, 0);
 
-  while (!in->getline(buf1, sizeof(buf1)).eof())
+  while (!in->getline(buf1, sizeof(buf1)).bad())
     {
       if (gotbeg == 0 && strncmp(buf1, "=ybegin ", 8) == 0)
         {
@@ -247,7 +247,7 @@ int
 compression::inflate_zlib(std::stringstream *source, std::stringstream *dest,
     const CompressionType& compression)
 {
-  int ret;
+  int ret = Z_DATA_ERROR;
   size_t have;
   z_stream strm;
   char in[MEMCHUNK];
@@ -264,7 +264,7 @@ compression::inflate_zlib(std::stringstream *source, std::stringstream *dest,
     ret = inflateInit2(&strm, -MAX_WBITS); /* use -MAX_WBITS to indicate gzip style */
 
   if (compression == HEADER_COMPRESS_XFEATURE
-      || compression == HEADER_COMPRESS_DIABLO)
+     || compression == HEADER_COMPRESS_DIABLO)
     ret = inflateInit(&strm);
 
   if (ret != Z_OK)
@@ -273,36 +273,37 @@ compression::inflate_zlib(std::stringstream *source, std::stringstream *dest,
   /* decompress until deflate stream ends or end of file */
   do
     {
-      strm.avail_in = source->read(in, MEMCHUNK).gcount();
-      if (source->bad())
-        {
-          (void) inflateEnd(&strm);
-          return Z_ERRNO;
-        }
+      strm.avail_in = source->readsome(in, MEMCHUNK);
+      if (strm.avail_in < 0) strm.avail_in = 0;
+      if (source->fail())
+      {
+        (void) inflateEnd(&strm);
+        return Z_ERRNO;
+      }
       if (strm.avail_in == 0)
         break;
       strm.next_in = (unsigned char*) in;
 
       /* run inflate() on input until output buffer not full */
       do
+      {
+        strm.avail_out = MEMCHUNK;
+        strm.next_out = (unsigned char*) out;
+        ret = inflate(&strm, Z_NO_FLUSH);
+        assert(ret != Z_STREAM_ERROR);
+        /* state not clobbered */
+        switch (ret)
         {
-          strm.avail_out = MEMCHUNK;
-          strm.next_out = (unsigned char*) out;
-          ret = inflate(&strm, Z_NO_FLUSH);
-          assert(ret != Z_STREAM_ERROR);
-          /* state not clobbered */
-          switch (ret)
-            {
           case Z_NEED_DICT:
             ret = Z_DATA_ERROR; /* and fall through */
           case Z_DATA_ERROR:
           case Z_MEM_ERROR:
             (void) inflateEnd(&strm);
             return ret;
-            }
-          have = MEMCHUNK - strm.avail_out;
-          dest->write(out, have);
         }
+        have = MEMCHUNK - strm.avail_out;
+        dest->write(out, have);
+      }
       while (strm.avail_out == 0);
 
       /* done when inflate() says it's done */
@@ -321,28 +322,35 @@ void compression::inflate_gzip (std::stringstream* stream, std::vector<std::stri
   char* buf;
   char buf2[4096];
 
-  std::stringstream dest, dest2;
-  //line_init(&g_line, stream);
+  line_init(&g_line, stream);
 
-  /*
+  std::stringstream dest, dest2;
   while (!g_line.eof) {
-    while ((len = line_read(&g_line, &buf)) > 0) {
-        buf[len-1] = '\n';
-        dest.write(buf, len);
-        if (len >= 3 && strncmp(buf + len - 3, ".\r\n", 3) == 0) {
-            g_line.eof = 1;
-            break;
-        }
-    }
-  }*/
+      while ((len = line_read(&g_line, &buf)) > 0) {
+          buf[len-1] = '\n';
+          dest.write(buf, len);
+          if (len >= 3 && strncmp(buf + len - 1, ".", 1) == 0) {
+              g_line.eof = 1;
+              break;
+          }
+      }
+  }
 
-  std::ofstream out ("/home/imhotep/compression/out");
-  out << stream->str();
-  out.close();
+  std::cerr<<"inflate : "<<inflate_zlib(&dest, &dest2, HEADER_COMPRESS_XFEATURE)<<"\n\n";
 
-  std::cerr<<"inflate : "<<inflate_zlib(stream, &dest2, HEADER_COMPRESS_XFEATURE)<<"\n\n";
+  std::ofstream out ("/home/imhotep/compression/out");
+    out << dest2.str();
+    out.close();
 
+  int cnt=0;
   while (!dest2.getline(buf2,4096).eof())
-    fillme.push_back(std::string(buf2));
+    {if (buf2) fillme.push_back(std::string(buf2));
+    ++cnt;}
+
+  stream->clear();
+
+  std::cerr<<cnt<<"\n";
+
+
 
 }
diff --git a/pan/general/locking.h b/pan/general/locking.h
index c41f9df..2ee0e8c 100644
--- a/pan/general/locking.h
+++ b/pan/general/locking.h
@@ -36,7 +36,7 @@ namespace pan
   class Mutex
   {
     private:
-      GMutex mutex;
+      static GMutex mutex;
       GMutex * m;
 
     public:
@@ -44,7 +44,7 @@ namespace pan
       /** Create a new mutex */
       Mutex()
       {
-#if !GLIB_CHECK_VERSION(2,34,1)
+#if !GLIB_CHECK_VERSION(2,34,2)
         m = g_mutex_new();
 #else
         g_mutex_init(&mutex);
@@ -55,7 +55,7 @@ namespace pan
       /** Destroy the mutex */
       virtual ~Mutex()
       {
-#if !GLIB_CHECK_VERSION(2,34,1)
+#if !GLIB_CHECK_VERSION(2,34,2)
         g_mutex_free(m);
 #endif
       }
diff --git a/pan/tasks/nntp-pool.cc b/pan/tasks/nntp-pool.cc
index a59bfd1..e8b60d2 100644
--- a/pan/tasks/nntp-pool.cc
+++ b/pan/tasks/nntp-pool.cc
@@ -193,11 +193,11 @@ NNTP_Pool :: on_socket_created (const StringView  & host,
     {
       std::string pw (pass ? pass : "");
       if (pass) g_free(pass);
-      nntp = new NNTP (_server, user, pw, _meter, socket);
+      nntp = new NNTP (_server, user, pw, _meter, _server_info, socket);
     }
     else
     {
-      nntp = new NNTP ( _server, user, pass, _meter, socket);
+      nntp = new NNTP ( _server, user, pass, _meter, _server_info, socket);
     }
     nntp->handshake (this);
   }
diff --git a/pan/tasks/nntp.cc b/pan/tasks/nntp.cc
index 604cfbe..fce8106 100644
--- a/pan/tasks/nntp.cc
+++ b/pan/tasks/nntp.cc
@@ -98,20 +98,20 @@ NNTP :: on_socket_response (Socket * sock UNUSED, const StringView& line_in)
 
          assert (_listener != 0);
          if (_listener)
-            _listener->on_nntp_line (this, line);
+            _listener->on_nntp_line (this, line_in);
       }
 
       if (_compression)
       {
-        state = CMD_DONE;
+        state = CMD_MORE;
         assert (_listener != 0);
         if (_listener)
           _listener->on_nntp_line (this, line_in);
-        if (line_in.len >= 3 && line.str[line.len-1] == '.')
+        if (line_in.len >= 3 && strncmp(line_in.str + line_in.len - 3, ".\r\n", 3) == 0)
         {
           _compression = false;
           _nntp_response_text = false;
-          line = "COMPRESS_DONE";
+          line = EOL;
           state = CMD_DONE;
         }
       }
@@ -163,9 +163,15 @@ NNTP :: on_socket_response (Socket * sock UNUSED, const StringView& line_in)
         }
 
         case AUTH_ACCEPTED:
-           // try to enable compression xfeature
-           _socket->write_command (ENABLE_COMPRESS_GZIP, this);
-           state = CMD_NEXT;
+          CompressionType ctype;
+           _server_info.get_server_compression_type(_server, ctype);
+           if (ctype == HEADER_COMPRESS_XFEATURE)
+           {
+             // try to enable compression xfeature
+             _socket->write_command (ENABLE_COMPRESS_GZIP, this);
+             state = CMD_NEXT;
+           } else
+             state = CMD_DONE;
            break;
 
         case FEATURE_ENABLED:
@@ -368,21 +374,6 @@ NNTP :: xzver (const Quark   & group,
    write_next_command ();
 }
 
-void
-NNTP :: xfeat (const Quark   & group,
-               uint64_t        low,
-               uint64_t        high,
-               Listener      * l)
-{
-   _listener = l;
-
-   write_next_command();
-   _commands.push_back ("XFEATURE COMPRESS GZIP");
-   write_next_command();
-   xover (group, low, high, l);
-
-}
-
 //TODO
 void
 NNTP :: xover_count_only (const Quark   & group,
diff --git a/pan/tasks/nntp.h b/pan/tasks/nntp.h
index e37e7d7..eccab45 100644
--- a/pan/tasks/nntp.h
+++ b/pan/tasks/nntp.h
@@ -76,6 +76,8 @@ namespace
       NO_PERMISSION              = 502,
       FEATURE_NOT_SUPPORTED      = 503
    };
+
+   const char* EOL = ".";
 }
 
 namespace pan
@@ -151,9 +153,11 @@ namespace pan
         NNTP (const Quark        & server,
               const std::string  & username,
               const std::string  & password,
-              DownloadMeter       & meter,
-              Socket              * socket):
+              DownloadMeter      & meter,
+              ServerInfo         & info,
+              Socket             * socket):
           _server(server),
+          _server_info(info),
           _meter(meter),
           _socket(socket),
           _socket_error(false),
@@ -207,11 +211,6 @@ namespace pan
                              uint64_t             high,
                              Listener           * l);
 
-      void xfeat            (const Quark        & group,
-                             uint64_t             low,
-                             uint64_t             high,
-                             Listener           * l);
-
       /**
        * Executes an XOVER command: "XOVER" to count
        * the xover numbers internally
@@ -321,6 +320,7 @@ namespace pan
     public:
 
       const Quark _server;
+      ServerInfo& _server_info;
       Quark _group;
       Socket * _socket;
       DownloadMeter& _meter;
diff --git a/pan/tasks/socket-impl-gio.cc b/pan/tasks/socket-impl-gio.cc
index b81c0cb..b11e926 100644
--- a/pan/tasks/socket-impl-gio.cc
+++ b/pan/tasks/socket-impl-gio.cc
@@ -334,12 +334,14 @@ GIOChannelSocket :: do_read ()
     if (status == G_IO_STATUS_NORMAL)
     {
       g_string_prepend_len (g, _partial_read.c_str(), _partial_read.size());
+      //TODO validate!
+      _partial_read.clear ();
 
       debug_v ("read [" << g->str << "]"); // verbose debug, if --debug --debug was on the command-line
       increment_xfer_byte_count (g->len);
 
-      if (g_str_has_suffix (g->str, "\r\n"))
-        g_string_truncate (g, g->len-2);
+      //if (g_str_has_suffix (g->str, "\r\n"))
+      //  g_string_truncate (g, g->len-2);
 
       more = _listener->on_socket_response (this, StringView (g->str, g->len));
       _listener->on_socket_bytes_transferred(g->len, this);
diff --git a/pan/tasks/socket-impl-openssl.cc b/pan/tasks/socket-impl-openssl.cc
index 229788e..2b33479 100644
--- a/pan/tasks/socket-impl-openssl.cc
+++ b/pan/tasks/socket-impl-openssl.cc
@@ -623,8 +623,8 @@ GIOChannelSocketGnuTLS :: do_read ()
 
       debug_v ("read [" << g->str << "]");
       increment_xfer_byte_count (g->len);
-      if (g_str_has_suffix (g->str, "\r\n"))
-        g_string_truncate (g, g->len-2);
+      //if (g_str_has_suffix (g->str, "\r\n"))
+      //  g_string_truncate (g, g->len-2);
       more = _listener->on_socket_response (this, StringView (g->str, g->len));
       //_listener->on_socket_bytes_transferred(g->len, this);
     }
diff --git a/pan/tasks/task-groups.cc b/pan/tasks/task-groups.cc
index 0bc8be7..b8765b0 100644
--- a/pan/tasks/task-groups.cc
+++ b/pan/tasks/task-groups.cc
@@ -80,7 +80,7 @@ TaskGroups :: on_nntp_line (NNTP               * nntp,
                             const StringView   & line)
 {
   if (nntp->_compression)
-    stream << line;
+    stream<<line;
   else on_nntp_line_process (nntp, line);
 }
 void
@@ -148,15 +148,16 @@ TaskGroups :: on_nntp_done (NNTP              * nntp,
   }
   else // health is OK or FAIL
   {
-    if (response == "COMPRESS_DONE")
+
+    if (response == EOL)
     {
       std::vector<std::string> lines;
       compression::inflate_gzip (&stream, lines);
       foreach (std::vector<std::string>, lines, it)
         on_nntp_line_process (nntp, *it);
-      std::cerr<<"len "<<stream.str().length()<<"\n";
     }
 
+
     if (_step == LIST_NEWSGROUPS)
     {
       int i (0);
diff --git a/pan/tasks/task-xover.cc b/pan/tasks/task-xover.cc
index 912fa3b..8bd7491 100644
--- a/pan/tasks/task-xover.cc
+++ b/pan/tasks/task-xover.cc
@@ -289,24 +289,6 @@ namespace
   }
 }
 
-/*
- http://tools.ietf.org/html/rfc2980#section-2.8
-
- Each line of output will be formatted with the article number,
- followed by each of the headers in the overview database or the
- article itself (when the data is not available in the overview
- database) for that article separated by a tab character.  The
- sequence of fields must be in this order: subject, author, date,
- message-id, references, byte count, and line count.  Other optional
- fields may follow line count.  Other optional fields may follow line
- count.  These fields are specified by examining the response to the
- LIST OVERVIEW.FMT command.  Where no data exists, a null field must
- be provided (i.e. the output will have two tab characters adjacent to
- each other).  Servers should not output fields for articles that have
- been removed since the XOVER database was created.
-
- */
-
 void
 TaskXOver::on_nntp_line(NNTP * nntp, const StringView & line)
 {
@@ -320,7 +302,7 @@ TaskXOver::on_nntp_line(NNTP * nntp, const StringView & line)
     int sock_id = nntp->_socket->get_id();
     if (_streams.count(sock_id) == 0)
       _streams[sock_id] = new std::stringstream();
-    *_streams[sock_id] << line << "\r\n";
+    *_streams[sock_id] << line;
   }
   else
     on_nntp_line_process(nntp, line);
@@ -453,26 +435,28 @@ TaskXOver::on_nntp_done(NNTP * nntp, Health health, const StringView & response)
   const Quark& servername(nntp->_server);
   CompressionType comp;
   _data.get_server_compression_type(servername, comp);
-  const bool compression_enabled(comp != HEADER_COMPRESS_NONE);
+  const bool compression_enabled(comp == HEADER_COMPRESS_XZVER || comp == HEADER_COMPRESS_DIABLO || comp == HEADER_COMPRESS_XFEATURE);
 
-  if (response == "." && compression_enabled)
+  if (response == EOL && compression_enabled)
+  {
+    std::stringstream* buffer = _streams[nntp->_socket->get_id()];
+    std::stringstream out, out2;
+    if (comp == HEADER_COMPRESS_XZVER || comp == HEADER_COMPRESS_DIABLO )
     {
-      std::stringstream* buffer = _streams[nntp->_socket->get_id()];
-      std::stringstream out, out2;
-      if (comp == HEADER_COMPRESS_XZVER || comp == HEADER_COMPRESS_DIABLO)
-      {
       compression::ydecode(buffer, &out);
       compression::inflate_zlib(&out, &out2, comp);
-      }
-      else
-        compression::inflate_zlib(buffer, &out2, comp);
-
-      char buf1[4096];
-      while (!out2.getline(buf1, sizeof(buf1)).eof())
-        {
-          on_nntp_line_process(nntp, buf1);
-        }
     }
+    else
+    {
+      compression::inflate_zlib(buffer, &out2, comp);
+    }
+
+    buffer->clear();
+
+    char buf1[4096];
+    while (!out2.getline(buf1, sizeof(buf1)).eof())
+      on_nntp_line_process(nntp, buf1);
+  }
   update_work(true);
   check_in(nntp, health);
 }



[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]