Re: [gnet] Problems with GNet and GIOChannels...



On Sat, 2004-08-21 at 11:04, Andrewlanoix aol com wrote:
> I will comment on the underlying problem:
> 
> 1) I always recommend that you use the newest release and try the version in CVS. We you may be running into a bug we already fixed.
> 2) I would recommend that you try GConn as Tim suggested.
> 3) I would recommend that you verify that your ref counting on your objects is correct. To do this, put print statements in glib (giounix.c and perhaps gmain.c) and in the gnet code. I find this is a huge problem. Have them print out the ref count and what function was called. I found this to be very usefully with debugging glib itself.
> 4)  I would recommend that you put break points where you put the print statements, take a break yourself and then come back and debugger should break to right place in glib, which you then can walk through glib, gnet and then your code. 
> 
> To answer your questions:
> 1) G_IO_IN and then a read of 0 bytes, or a G_IO_* error condition.
> 2) No.
> 3) See my suggestions above.
> 
> Andrew
> 

 Andrew,

   Thanks for your feedback on this.  I've implemented a solution based
on GServer/GConn as you and Tim both suggested.  I'm running into some
problems with it that have me more or less stumped.  There's one basic
question that I never even *thought* to ask, but perhaps I should.  All
of the example codes in the GNet distribution deal solely with ASCII
character data.  Can the sockets maintained by GNet handle *BINARY*
data or can they not?  I need to be able to pass any kind of data into
these sockets and have it come back out again at the other end, intact,
without killing my code in the process.

   That said, I have some code snippets to pass along to see if anyone
can spot something obviously wrong.  Here's some structure definitions:

typedef struct _ServerProcess {
        gchar                   Name[32];
        GInetAddr               *IpAddr;
        gint                    Port;
        GServer                 *Server;
        GServerFunc             (*ConnFunc)();
        GThread                 *Thread;
        GAsyncQueue             *Queue;
        GMainContext    *Context;
        GMainLoop               *MainLoop;
        GTree                   *ClientTree;
        GMutex                  *ClientTreeMutex;
        gint                    ref_count;
} ServerProcess;

typedef struct _ClientProcess {
        gchar                   Name[32];
        GInetAddr               *IpAddr;
        gint                    Port;
        guint                   UniqueID;
        gint                    PermOrTemp;
        gint                    IncomingOrOutgoing;
        ServerProcess   *ServerProc;
        GServer                 *Server;
        GConn                   *Connect;
        GConnFunc               (*CallbackFunc)();
        GThread                 *Thread;
        GAsyncQueue     *Queue;
        GMainContext    *Context;
        GMainLoop               *MainLoop;
        GQueue                  *IncomingData;
        GQueue                  *OutgoingData;
        gint                    waitingForMessage;
        gint                    waitingForPayload;
        gint                    waitingForHeader;
        NetworkMessage  *IncomingMessage;
        gint                    ref_count;
} ClientProcess;

#define SOHByteVal 0x2B;
#define EOHByteVal 0x5F;

typedef struct _NetworkMessageHeader {
        guchar                  SOHByte;
        guchar                  HeaderSizeByte;
        NetworkMsgType  MsgType;
        gsize                   MsgSize;
        guint                   UniqueID;
        guchar                  EOHByte1;
        guchar                  EOHByte2;
} NetworkMessageHeader;

typedef struct _NetworkMessage {
        guchar                  SOHByte;
        guchar                  HeaderSizeByte;
        NetworkMsgType  MsgType;
        gsize                   MsgSize;
        guint                   UniqueID;
        guchar                  EOHByte1;
        guchar                  EOHByte2;
        gpointer                MsgData;
} NetworkMessage;

   And some of the basic functions involved.  I've trimmed out
some debugging code and haven't bothered showing the functions
that handle the data once it's received as they have no effect
on the sockets portion of the code.  The data that goes into
the OutgoingData queues is generated in other threads and gets
to the ClientProcess object's queue through a GAsyncQueue that
is managed by a central communications thread.  Haven't had a
reason to push data into the "Incoming" queue yet.  I apologize
for the length of the code, but if it's any consolidation, if
you like it you're free to use it. ;^)

ServerProcess *create_server_process( gchar *Name )
{
  ServerProcess *newServer = NULL;
  
  newServer = g_new0( ServerProcess, 1 );
  
  strncpy( newServer->Name, Name, sizeof(newServer->Name) );
  
  newServer->Thread = CommThread;
  newServer->Queue = CommQueue;
  newServer->Context = CommContext;
  newServer->MainLoop = CommMainLoop;
  newServer->ClientTree = ClientProcessTree;
  newServer->ClientTreeMutex = ClientProcessTreeMutex;
  newServer->ref_count = 1;
  	
  get_my_address_info( newServer->Name, &(newServer->IpAddr), 
    &(newServer->Port) );
  g_assert( newServer->IpAddr != NULL );
  
  newServer->ConnFunc = accept_client_connection;
  
  newServer->Server = gnet_server_new( newServer->IpAddr, 
    newServer->Port, (gpointer)(newServer->ConnFunc), 
    newServer );
  
  return(newServer);
}

void accept_client_connection( GServer *theServer, GConn *theConn,
gpointer servproc )
{
  gint InitialReadSize;
  ServerProcess *theServerProcess;
  ClientProcess *newClientProcess;
  
  theServerProcess = (ServerProcess *)servproc;
  
  if( theConn == NULL ) {
    fprintf( stderr, "accept_client_connection: NULL GConn -- connection
failed!\n" );
    fflush( stderr );
  }
  else {
    newClientProcess = g_new0( ClientProcess, 1 );
    
    newClientProcess->Server = theServer;
    newClientProcess->Connect = theConn;
    newClientProcess->CallbackFunc = handle_client_events;
    newClientProcess->IncomingData = g_queue_new();
    newClientProcess->OutgoingData = g_queue_new();
    newClientProcess->ServerProc = theServerProcess;
    newClientProcess->PermOrTemp = 1; /* Assumed permanent */
    newClientProcess->IncomingOrOutgoing = 1; /* Incoming */
    newClientProcess->ref_count = 1;
    /* Now we read the initial connection request from the queue */
    
    gnet_conn_set_callback( theConn, newClientProcess->CallbackFunc, 
      newClientProcess );
    
    if( gnet_conn_is_connected( theConn ) ) {
      /* Start a read for the NetConnectReq message and get out. */
      InitialReadSize = sizeof(NetworkMessageHeader) + 
         sizeof(NetConnectReqPayload);
      newClientProcess->waitingForHeader = FALSE;
      newClientProcess->waitingForPayload = FALSE;
      newClientProcess->waitingForMessage = InitialReadSize;
      gnet_conn_readn( theConn, InitialReadSize );
    }
    else {
      fprintf( stderr, "accept_client_connection: GConn is not
connected!\n" );
      fflush( stderr );
    }
  }
}

ClientProcess *request_server_connect( gchar *ServerName,
  gchar *myName )
{
  ClientProcess *newClientProcess;
  
  newClientProcess = g_new0( ClientProcess, 1 );
  strncpy( newClientProcess->Name, myName, 
    sizeof(newClientProcess->Name) );
  get_server_address_info( ServerName, &(newClientProcess->IpAddr), 
    &(newClientProcess->Port) );
  newClientProcess->UniqueID = 0;
  newClientProcess->PermOrTemp = 1; /* Assumed permanent for now */
  newClientProcess->IncomingOrOutgoing = -1;  /* Outgoing! */
  newClientProcess->Server = NULL;
  newClientProcess->ServerProc = NULL;
  newClientProcess->CallbackFunc = handle_client_events;
  newClientProcess->IncomingData = g_queue_new();
  newClientProcess->OutgoingData = g_queue_new();
  newClientProcess->IncomingMessage = NULL;
  newClientProcess->waitingForHeader = FALSE;
  newClientProcess->waitingForPayload = FALSE;
  newClientProcess->waitingForMessage = FALSE;
  newClientProcess->Connect = gnet_conn_new_inetaddr( 
    newClientProcess->IpAddr, newClientProcess->CallbackFunc,
    newClientProcess );
  newClientProcess->ref_count = 1;
  
#ifdef DEBUGGING
  if( TRUE ) {
    fprintf( stdout, "request_server_connect: ServerName = %s, myName =
%s, newClientProcess->Port = %d\n",
      ServerName, myName, newClientProcess->Port );
    fprintf( stdout, "request_server_connect: newClientProcess->Connect
= %p\n",
      newClientProcess->Connect );
    fflush( stdout );
  }
#endif /* DEBUGGING */
  gnet_conn_connect( newClientProcess->Connect );
  gnet_conn_set_watch_error( newClientProcess->Connect, TRUE );
  /* Set 60 second initial timeout */
  gnet_conn_timeout( newClientProcess->Connect, 60000 );
  
  return(newClientProcess);
}


void handle_client_events( GConn *theConn, GConnEvent *theEvent,
   gpointer theData )
{
  ClientProcess *theClient;
  NetworkMessage *theMessage;
  
  theClient = (ClientProcess *)theData;
  
  switch( theEvent->type ) {
    case GNET_CONN_ERROR :
      /* Handle a GNET error */
      fprintf( stdout, "handle_client_events: Received Error event\n" );
      fflush( stdout );
    break;
    
    case GNET_CONN_CLOSE :
      /* The connection is closed; deal with that */
      fprintf( stdout, "handle_client_events: Received Close event\n" );
      fflush( stdout );
    break;
    
    case GNET_CONN_TIMEOUT :
      /* Connection timer timed out */
      fprintf( stdout, "handle_client_events: Received Timeout event\n"
);
      fflush( stdout );
    break;
    
    case GNET_CONN_CONNECT :
      /* Handle a connection complete event */
      fprintf( stdout, "handle_client_events: Received Connect event\n"
);
      fflush( stdout );
      
      /* Send a NetConnectReq message */
      gnet_conn_timeout( theConn, 0 ); /* Unset the timeout value */
      theMessage = create_network_message( NetConnectReq, 
        sizeof(theClient->Name), theClient->UniqueID, theClient->Name,
        TRUE );
      gnet_conn_write( theConn, (gchar *)theMessage, 
        sizeof(NetworkMessageHeader) );
      if( theMessage->MsgSize ) /* Non-zero message size, send message
data */
        gnet_conn_write( theConn, (gchar *)(theMessage->MsgData), 
          theMessage->MsgSize );
      /* Data was copied by gnet_conn_write; safe to delete */
      
      g_free( theMessage->MsgData );
      g_free( theMessage );
      
      /* Now start waiting for a message header */
      theClient->waitingForHeader = sizeof(NetworkMessageHeader);
      gnet_conn_readn( theConn, theClient->waitingForHeader );
    break;
    
    case GNET_CONN_READ :
      /* Data read from the connection is complete */
      fprintf( stdout, "handle_client_events: Received Read event\n" );
      fflush( stdout );
      
      if( theClient->waitingForHeader ) {
        /* Waiting for a new message header to arrive */
        /* Read header */
        if( theEvent->length < sizeof(NetworkMessageHeader) ) {
          fprintf( stderr, "handle_client_events: theEvent->length = %d,
sizeof(NetworkMessageHeader) = %d\n",
            theEvent->length, sizeof(NetworkMessageHeader) );
          fflush( stderr );
        }
        theClient->IncomingMessage =
          g_memdup( theEvent->buffer, sizeof(NetworkMessageHeader) );
        /* Unset wait for header flag */
        theClient->waitingForHeader = FALSE;
        /* Get payload size, store in waitingForPayload */
        theClient->waitingForPayload = 
          theClient->IncomingMessage->MsgSize;
        /* Start read for payload */
        if( theClient->waitingForPayload )
          gnet_conn_readn(theConn, theClient->IncomingMessage->MsgSize);
      }
      else if( theClient->waitingForPayload ) {
        /* Waiting for the payload for a previously received header to
arrive */
        if( theEvent->length < theClient->IncomingMessage->MsgSize ) {
          fprintf( stderr, "handle_client_events: theEvent->length = %d,
theClient->IncomingMessage->MsgSize = %d\n",
            theEvent->length, theClient->IncomingMessage->MsgSize );
          fflush( stderr );
        }
        theClient->IncomingMessage->MsgData =
          g_memdup( theEvent->buffer, 
            theClient->IncomingMessage->MsgSize);
        theClient->waitingForPayload = FALSE;
        /* OK, we got the payload, now we have to deal with it. */
        handle_received_client_message( theClient );
        /* And register a read for the next message header */
        if( ! (theClient->waitingForHeader ||
          theClient->waitingForMessage) ) {
          
          theClient->waitingForHeader = sizeof(NetworkMessageHeader);
          gnet_conn_readn( theConn, theClient->waitingForHeader );
        }
      }
      else if( theClient->waitingForMessage ) {
        /* Waiting for a full message -- probably the connect request */
        theClient->IncomingMessage =
          g_memdup( theEvent->buffer, sizeof(NetworkMessageHeader) );
#ifdef DEBUGGING
        if( TRUE ) {
          fprintf( stdout, "handle_client_events: Got header;
IncomingMessage->MsgSize = %d\n",
            theClient->IncomingMessage->MsgSize );
          fflush( stdout );
        }
#endif /* DEBUGGING */
        theClient->IncomingMessage->MsgData =
          g_memdup( &(theEvent->buffer[sizeof(NetworkMessageHeader)]),
            theClient->IncomingMessage->MsgSize );
        /* Handle the message just received */
        handle_received_client_message( theClient );
       if( ! (theClient->waitingForHeader + 
          theClient->waitingForPayload +
          theClient->waitingForMessage) ) {
          /* Message handler didn't reset client to read.  Start read
for message header. */
          theClient->waitingForHeader = sizeof(NetworkMessageHeader);
          gnet_conn_readn( theConn, theClient->waitingForHeader );
        }
      }
      else {
        /* Client isn't waiting for anything.  Set it waiting for a
header */
        theClient->waitingForHeader = sizeof(NetworkMessageHeader);
        gnet_conn_readn( theConn, theClient->waitingForHeader );
      }
    break;
    
    case GNET_CONN_WRITE :
      /* Data write to the connection is complete; grab next message and
send it */
      fprintf( stdout, "handle_client_events: Received Write event\n" );
      fflush( stdout );
      
      theMessage = (NetworkMessage *)g_queue_pop_head( 
        theClient->OutgoingData );
      if( theMessage != NULL ) {
        /* Send the message header */
        gnet_conn_write( theConn, (gchar *)theMessage, 
          sizeof(NetworkMessageHeader) );
        if( theMessage->MsgSize ) /* Non-zero message size, send message
data */
          gnet_conn_write( theConn, theMessage->MsgData, 
            theMessage->MsgSize );
        /* Data was copied by gnet_conn_write; safe to delete */
        g_free( theMessage->MsgData );
        g_free( theMessage );
      }
    break;
    
    default :
      fprintf( stderr, "handle_client_events: Unknown event type %d\n",
theEvent->type );
       fflush( stderr );
    break;
  }
}

  A few things that come to mind:

1) The basic functionality of the GServer/GConn and their associated
utility functions is remarkably flexible and useful, once you get a
grip on the unerlying methodology.
2) There doesn't seem to be any way to attach the callbacks to the
main context of another thread.  They have to execute in the default
(i.e. NULL) context.
3) Again, it's not clear if the sockets are capable of moving binary
data safely.
4) Did I mention you can't force the callbacks into the main context
of a separate thread?

   I'm finding that the "server" side of the connection gets along
very happily, but the "client" side, soon after receiving the Ack
response message to its NetConnectReq message, segfaults in another
thread while doing a memory allocation.  At this point, as you can
see I'm not explicitly freeing the data from the GConnEvents when
done with a READ event (is it safe to do this?).  I removed those
g_free's while trying to track down the bug.  Does anything leap
out at anyone?  I know it's a lot of code.  Took most of the day
to write it...

thanks,
Jim Wiggs




[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]