Problems with GNet and GIOChannels...



 Folks,

   I am working on a multithreaded process for monitoring realtime
incoming market data.  The process connects to a dedicated hardware
receiver via a TCP socket and a UDP socket.  It also connects to a
second process via another TCP socket to forward the data after
"massaging" it into Intel native format.  I am developing to glib
2.4.4 and gnet 2.0.4 (2.0.5 does not install properly for me).

   My problem is that the TCP socket between the multithreaded
monitoring process and the single-threaded client process keeps
choking.  I have code in place which is supposed to recognize
this and close out the socket, then reconnect, but it does not
work.  I'm trying to figure out what I'm doing wrong.

   The process starts up the following threads:

1) The main thread
  - Does nothing except poll the other threads to verify that they
   are still running.

2) The communications thread
  - Maintains callbacks that monitors server sockets waiting for any
   connection requests.
  - Central distribution point for inter-thread communications.
  - Central distribution point for inter-process communications.

3) The "Engine" thread
  - Processes incoming data into Intel format

4) The FeedReceive thread
  - Monitors a UDP socket created using the GNet GUdpSocket API for
   incoming data from the hardware data receiver.  Any data received
   is forwarded to the Engine thread.
  - No problems encountered with this thread so far.  It receives its
   data packets reliably and so far has never choked.

5) The FeedCommand thread
  - Maintains a TCP socket created using the GNet GTcpSocket API that
   connects to the hardware data receiver.  Commands/requests are sent
   via this socket, and explicit reply data is read from it.

   Each thread maintains an asynchronous message queue for receiving
messages from other threads.  The main loop for each thread consists
of a loop like this (note that I have omitted a lot of debugging/log
code so that this post isn't even longer):

  -----
while( 1 ) {
  maxiter = 10;
  while( maxiter && (Msg = g_async_queue_try_pop( MyQueue )) ) {
    /* Handle the message if there is one */
    maxiter--;
  }
  
  maxiter = 10;
  if( g_main_context_pending( MyContext ) && maxiter ) {
    /* Deal with callbacks, etc. */
    g_main_context_iteration( MyContext, FALSE );
    maxiter--;
  }
  
  g_usleep( 1000 );  /* Pause one millisecond, then loop */
}
  -----

   The server socket is initialized in the communications thread so:

  -----
   /* Create socket to forward data */
  LocalAddr = gnet_inetaddr_new( "localhost", PORT );
  SendServerSocket = gnet_tcp_socket_server_new_full( LocalAddr, PORT );
  g_assert( SendServerSocket );
  
  /* Set up server watch */
  SendServerSocketIOC = gnet_tcp_socket_get_io_channel( SendServerSocket
);
  g_assert( SendServerSocketIOC );
  
  source = g_io_create_watch( SendServerSocketIOC, G_IO_IN );
  g_source_set_callback( source, (GSourceFunc)(AcceptClientFunc),
                        (gpointer)SendServerSocket, NULL );
  g_source_attach( source, MyContext );
  g_source_unref( source );
  -----
  
   Here is the code from AcceptClientFunc that accepts the socket
connections and sets the I/O watch:

  -----
  SendSocket = gnet_tcp_socket_server_accept( SendServerSocket );
  g_assert( SendSocket );
  
  SendSocketIOC = gnet_tcp_socket_get_io_channel( SendSocket );
  g_assert( SendSocketIOC );
  
  /* Force encoding to NULL to permit binary data */
  g_io_channel_set_encoding( SendSocketIOC, NULL, NULL );
  
  source = g_io_create_watch( SendSocketIOC, G_IO_OUT );
  g_source_set_callback( source, (GSourceFunc)Foward_Data_Packets,
    (gpointer)OutputQueue, NULL );
  g_source_attach( source, MyContext );
  g_source_unref( source );
  -----

   Now then, the OutputQueue is a glib double-ended data queue which
is used to line up "translated" data packets before writing them out
through the SendSocket in the function Forward_Data_Packets.  Any
data packets read in through the UDP or TCP sockets in the FeedReceive
and FeedCommand threads will be sent through the AMQ to the Engine
thread, which will clean it and push it to the OutputQueue tail.  When
the SendSocket signals that it is writable, the Forward_Data_Packets
callback gets called.  It pops cleaned packets from the head of the
OutputQueue structure and writes them to SendSocketIOC using the
g_io_channel_write_chars function until it either empties the queue
or the socket returns a G_IO_STATUS_AGAIN when the data is written.
If so, it marks where in the output it was stopped and exits, to pick
up again where it left off the next time G_IO_OUT condition appears.

   My problem is that the socket connection SendSocket keeps shutting
down unexpectedly.  The code will work fine for minutes or even hours,
but eventually the Forward_Data_Packets stops being able to write to
the socket and OutputQueue starts filling up with data.  The client
on the other end of the socket eventually closes its end of the socket
and tries to reconnect, but gets a "connection refused" error message
when it does so.  I have branches of logic in the server that are
supposed to get called when the socket conditions G_IO_ERR, G_IO_HUP
or G_IO_NVAL come up, which is supposed to close the SendSocket and
re-register the callback that accepts socket connections, but it
seems like it never gets called.  The problem sometimes occurs during
periods of low traffic, but it also has happened during high traffic
periods.  I've got debugging/logging output all over the place, but
not much of it seems to be getting fired off.  I don't have a lot of
info to go on.

   So, the questions:

1) What, SPECIFICALLY, will happen when the client end of the socket
closes its connection?  HOW is the server supposed to know when this
has happened?  I know what happens with plain sockets, but how is it
"manifested" in the giochannels?

2) Are there any problems with having multiple socket connections to
the same server IP/Port simultaneously?

3) Any suggestions on how I might set up my error checking code to
give me more specific status output?  Is there any sort of debugging
level I can set up to get more information out of glib/gnet?

   Most example code I see for handling sockets simply catches errors
and closes the sockets, then returns FALSE from the callbacks to kill
them.  This is unacceptable for my purposes.  The socket MUST stay
open 24x365.  If the socket gets disconnected, it must immediately do
what has to be done to reconnect.  As long as both processes are up,
they must be connected, PERIOD.  The only thing that should ever put
this connection down is actual hardware failure.  I figure if I can
get both the server and the client to quickly and accurately identify
when there has been a break in the connection, then I can get them to
reconnect within a few seconds at most and not have any major break
in service.  Is this *possible* using nothing but gnet and glib, or
must I write custom sockets code myself to accomplish it?

   If can post more detailed code samples if necessary, but this note
is already twice as long as I intended.  Happy to respond to any follow
up questions!

thanks,
Jim




[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]