Re: Variability in one-way latency and jitter



Hi Frank,

> Interesting results :? wondering.
> Never heard about this before :/
> 
> Can you supply  profile data and test-applications?
> What was your setup?
> 
> Regards,
> Frank

Here is a look at some data (in an OpenOffice spreadsheet; let me know
if you cannot read that format), as well as code for a client and server.

Take a look at the local server numbers: the asynchronous (oneway) calls
show much higher latency and jitter than the synchronous ones.

Tests were run on a 3 GHz Xeon, 1 GB RAM, Fedora Core 1 box.
orbit2 2.8.2
orbitcpp 1.3.9

Please let me know if you have any theories.

-- 
Marc Siegel
MIT Lincoln Laboratory
Group 63, Room C-439
mlsiegel ll mit edu 
(781)981-5434
--

Attachment: ucb_tables.sxc
Description: OpenOffice Calc spreadsheet

/*******************************************************************
 * ucb_clt.C                                                       *
 *                                                                 *
 * Implementation of CORBA client class                            *
 *                                                                 *
 * Marc Siegel, 2004                                               *
 * MIT Lincoln Laboratory Group 63                                 *
 *******************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>

#include <fstream>
#include <iomanip>
#include <iostream>
#include <sstream>
#include <string>
#include <typeinfo>

#if defined( __UCB_MIDDLEWARE_TAO__ )
#  include "ucbC.H"
#elif defined( __UCB_MIDDLEWARE_ORBITCPP__ )
#  include "ucb-cpp.H"
#else
#  include "ucb.H"
#endif

#ifndef UCB_NUM_TESTS
#  define UCB_NUM_TESTS 10000
#endif

using namespace std;
using namespace Ucb;

/*
 * Printable label for each benchmark, indexed by BenchmarkID
 * (doBenchmark prints Benchmark_Names[id] in front of every result line).
 *
 * NOTE(review): the entries presumably follow the enumerator order of
 * BenchmarkID, which is declared outside this file -- confirm the two
 * stay in step whenever a benchmark is added or removed.
 */
const char * Benchmark_Names[BENCHMARK_LAST] = {
    /* Synchronous calls */
    "PING",
    "SEND_ARRAY__64",
    "SEND_ARRAY_128",
    "SEND_ARRAY_256",
    "SEND_ARRAY_512",
    "SEND_ARRAY1024",
    "SEND_ARRAY2048",
    "SEND_SEQUENCE__64",
    "SEND_SEQUENCE_128",
    "SEND_SEQUENCE_256",
    "SEND_SEQUENCE_512",
    "SEND_SEQUENCE1024",
    "SEND_SEQUENCE2048",
    "SEND_STRUCT",

    /* Asynchronous counterparts of the calls above */
    "ASYNCH_PING",
    "ASYNCH_SEND_ARRAY__64",
    "ASYNCH_SEND_ARRAY_128",
    "ASYNCH_SEND_ARRAY_256",
    "ASYNCH_SEND_ARRAY_512",
    "ASYNCH_SEND_ARRAY1024",
    "ASYNCH_SEND_ARRAY2048",
    "ASYNCH_SEND_SEQUENCE__64",
    "ASYNCH_SEND_SEQUENCE_128",
    "ASYNCH_SEND_SEQUENCE_256",
    "ASYNCH_SEND_SEQUENCE_512",
    "ASYNCH_SEND_SEQUENCE1024",
    "ASYNCH_SEND_SEQUENCE2048",
    "ASYNCH_SEND_STRUCT",

    /* Footprint queries */
    "FOOTPRINT_DSK",
    "FOOTPRINT_MEM",
};



/*
 * Runs a benchmark by id
 *
 *   ucb_svr - reference to the server under test
 *   id      - which benchmark to run
 *
 * Synchronous benchmarks (PING .. SEND_STRUCT) are timed here, around
 * each call.  For the asynchronous ones (ASYNCH_PING ..
 * ASYNCH_SEND_STRUCT) the send time travels with the call and the
 * timing records are fetched back from the server afterwards.  The two
 * FOOTPRINT benchmarks make a single call and print the sizes reported.
 *
 * Only the nanosecond field of the clock is recorded, so a latency of
 * one second or more cannot be represented in the output.
 *
 * A call that raises a CORBA exception is reported on stderr and
 * retried until it succeeds.  An unknown id, or a failure to fetch the
 * asynchronous timing data, ends the process with exit(1).
 */
void doBenchmark( Server_var ucb_svr, BenchmarkID id ) 
{
    Timestamp before, after;
    TimeDataSeq_var time_seq;
    FootprintDskSeq_var disk_seq;
    SEQVARLEN_var data_seq;
    CORBA::Octet * data_array = NULL;
    STREAM_PARAMS_var data_struct;
    struct timespec tp;
    unsigned long int num_tests;
    unsigned long int footprint = 0;

    const bool is_sync  = (id >= PING) && (id <= SEND_STRUCT);
    const bool is_async = (id >= ASYNCH_PING) && (id <= ASYNCH_SEND_STRUCT);

    /* Set up for Benchmark */
    time_seq = new TimeDataSeq;
    if ( id < FOOTPRINT_DSK ) {
        data_seq = new SEQVARLEN;
        num_tests = UCB_NUM_TESTS;
    }
    else {
        num_tests = 1;
    }

    /* Every synchronous iteration stores exactly one record, so size the
     * result sequence once instead of growing it by one per call. */
    if (is_sync) {
        time_seq->length( num_tests );
    }
    
    if ((id == SEND_STRUCT) || (id == ASYNCH_SEND_STRUCT)) {
        data_struct = new STREAM_PARAMS;
        data_struct->hop_list.length(32);
    }

    /* arsz / sqsz first hold the id of the smallest benchmark in the
     * matching family, then the payload size: 64 doubled once for each
     * step from that smallest benchmark. */
    int arsz = 0, sqsz = 0;
    if ((id >= SEND_ARRAY__64) && (id <= SEND_ARRAY2048))
        arsz = SEND_ARRAY__64;
    if ((id >= ASYNCH_SEND_ARRAY__64) && (id <= ASYNCH_SEND_ARRAY2048))
        arsz = ASYNCH_SEND_ARRAY__64;
    if ((id >= SEND_SEQUENCE__64) && (id <= SEND_SEQUENCE2048))
        sqsz = SEND_SEQUENCE__64;
    if ((id >= ASYNCH_SEND_SEQUENCE__64) && (id <= ASYNCH_SEND_SEQUENCE2048))
        sqsz = ASYNCH_SEND_SEQUENCE__64;
    if (arsz) {
        arsz = 64 << (id - arsz);
        /* Zero-filled so that no uninitialized memory goes on the wire */
        data_array = new CORBA::Octet[arsz]();
    }
    if (sqsz) {
        sqsz = 64 << (id - sqsz);
        data_seq->length(sqsz);
    }
    

    /* Run the Benchmark */
    for (unsigned long int i = 0; i < num_tests; i++) {
        bool try_again;
        do {
            try_again = false;
            try {
                clock_gettime( CLOCK_REALTIME, &tp );
                before = tp.tv_nsec;
                switch (id) {
                    case FOOTPRINT_DSK:
                        footprint = ucb_svr->FootprintDsk( disk_seq );
                        i = num_tests;
                        break;
                    case FOOTPRINT_MEM:
                        footprint = ucb_svr->FootprintMem();
                        i = num_tests;
                        break;
                    case PING:
                        ucb_svr->Ping();
                        break;
                    case SEND_ARRAY__64:
                        ucb_svr->SendArray__64(data_array);
                        break;
                    case SEND_ARRAY_128:
                        ucb_svr->SendArray_128(data_array);
                        break;
                    case SEND_ARRAY_256:
                        ucb_svr->SendArray_256(data_array);
                        break;
                    case SEND_ARRAY_512:
                        ucb_svr->SendArray_512(data_array);
                        break;
                    case SEND_ARRAY1024:
                        ucb_svr->SendArray1024(data_array);
                        break;
                    case SEND_ARRAY2048:
                        ucb_svr->SendArray2048(data_array);
                        break;
                    case SEND_SEQUENCE__64:
                    case SEND_SEQUENCE_128:
                    case SEND_SEQUENCE_256:
                    case SEND_SEQUENCE_512:
                    case SEND_SEQUENCE1024:
                    case SEND_SEQUENCE2048:
                        ucb_svr->SendSequence(data_seq);
                        break;
                    case SEND_STRUCT:
                        ucb_svr->SendStruct(data_struct);
                        break;
                    case ASYNCH_PING:
                        ucb_svr->AsynchPing(before);
                        break;
                    case ASYNCH_SEND_ARRAY__64:
                        ucb_svr->AsynchSendArray__64(before, data_array);
                        break;
                    case ASYNCH_SEND_ARRAY_128:
                        ucb_svr->AsynchSendArray_128(before, data_array);
                        break;
                    case ASYNCH_SEND_ARRAY_256:
                        ucb_svr->AsynchSendArray_256(before, data_array);
                        break;
                    case ASYNCH_SEND_ARRAY_512:
                        ucb_svr->AsynchSendArray_512(before, data_array);
                        break;
                    case ASYNCH_SEND_ARRAY1024:
                        ucb_svr->AsynchSendArray1024(before, data_array);
                        break;
                    case ASYNCH_SEND_ARRAY2048:
                        ucb_svr->AsynchSendArray2048(before, data_array);
                        break;
                    case ASYNCH_SEND_SEQUENCE__64:
                    case ASYNCH_SEND_SEQUENCE_128:
                    case ASYNCH_SEND_SEQUENCE_256:
                    case ASYNCH_SEND_SEQUENCE_512:
                    case ASYNCH_SEND_SEQUENCE1024:
                    case ASYNCH_SEND_SEQUENCE2048:
                        ucb_svr->AsynchSendSequence(before, data_seq);
                        break;
                    case ASYNCH_SEND_STRUCT:
                        ucb_svr->AsynchSendStruct(before, data_struct);
                        break;
                    default:
                        cerr << "Unknown BenchmarkID " << id << endl;
                        exit(1);
                }
                clock_gettime( CLOCK_REALTIME, &tp );
                after = tp.tv_nsec;
            }
            catch (CORBA::Exception& e) {
#ifdef __UCB_MIDDLEWARE_ORBITCPP__
                cerr << typeid(e).name() << endl;
#else
                cerr << e << endl;
#endif
                /* Deliberate best effort: repeat the same call */
                try_again = true;
            }
        } while (try_again);
        
        /* For regular (synchronous) calls, store timing data */
        if (is_sync) {
            time_seq[i].id = id;
            time_seq[i].sent_ns = before;
            time_seq[i].recv_ns = after;
        }
    }
    
    /* Cleanups (delete[] on a null pointer is a no-op) */
    delete[] data_array;

    /* Retrieve timing data for asynchronous calls from server */
    if (is_async) {
        try {
            ucb_svr->getAsynchData( time_seq );
            ucb_svr->clearAsynchData();
        }
        catch (CORBA::Exception& e) {
#ifdef __UCB_MIDDLEWARE_ORBITCPP__
            cerr << typeid(e).name() << endl;
#else
            cerr << e << endl;
#endif
            cerr << "Could not retrieve AsynchData, aborting" << endl;
            exit(1);
        }
    }

    /* Output results */
    if ( id == FOOTPRINT_DSK ) {
        /* Append the grand total as one extra row after the files */
        unsigned long len = disk_seq->length();
        disk_seq->length( len + 1 );
        disk_seq[len].file_nm = CORBA::string_dup("--TOTAL--");
        disk_seq[len].file_sz = footprint;
        for (unsigned long int j=0; j < disk_seq->length(); j++) {
            cout << Benchmark_Names[id] << "\t"
                 << setw(36) << left
                 << disk_seq[j].file_nm << "\t"
                 << setw(10) << right
                 << disk_seq[j].file_sz << endl;
        }
    }
    else if ( id == FOOTPRINT_MEM ) {
        cout << Benchmark_Names[id] << "\t"
             << footprint << endl;
    }
    else {        
        // all other benchmarks
        for (unsigned long int i = 0; i < time_seq->length(); i++) {
            before = time_seq[i].sent_ns;
            after  = time_seq[i].recv_ns;
            /* Nanoseconds wrap at a billion */
            if (after < before) {
                after += 1000000000;
            }
            cout << Benchmark_Names[id] << "\t";
            cout << setw(10) << before << "\t";
            cout << setw(10) << after  << "\t"
                 << "latency (us): "   << (after-before)/1000 << endl;
        }
    }
}

int main(int argc, char **argv)
{
    /*
     * Start our orb
     */
    CORBA::ORB_var orb = CORBA::ORB_init (argc, argv);

    /*
     * IOR is in ucb_svr.ref in the local directory
     */
    char pwd[256], uri[300];
#ifdef __UCB_MIDDLEWARE_ORBITCPP__
    string s;
    sprintf(uri, "%s/ucb_svr.ref", getcwd(pwd, 256));
    ifstream is(uri);
    is >> s;
#else
    sprintf (uri, "file://%s/ucb_svr.ref", getcwd(pwd, 256));
#endif

    /*
     * Bind to ucb_svr
     */
#ifdef __UCB_MIDDLEWARE_ORBITCPP__
    CORBA::Object_var obj = orb->string_to_object(s.c_str());
#else
    CORBA::Object_var obj = orb->string_to_object(uri);
#endif
    Server_var ucb_svr = Server::_narrow(obj);

    if ( CORBA::is_nil( ucb_svr ) ) {
        cout << "oops: could not locate ucb server" << endl;
        exit (1);
    }
    
    /*
     * Run Tests
     */
    for (int i = 0; i < BENCHMARK_LAST; i++) {
        doBenchmark( ucb_svr, static_cast<BenchmarkID>( i ) );
    }
    
    /*
     * Exit
     */
//     ucb_svr->Terminate();
    return 0;
}
/*******************************************************************
 * ucb_svr.C                                                       *
 *                                                                 *
 * Implementation of CORBA interface class                         *
 *                                                                 *
 * Marc Siegel, 2004                                               *
 * MIT Lincoln Laboratory Group 63                                 *
 *******************************************************************/
#include <iostream>
#include <fstream>
#include <sstream>
#include <string>
#include <ext/stdio_filebuf.h>
#include <time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdio.h>
#include "ucb_svr.H"

using namespace std;
using namespace Ucb;
using __gnu_cxx::stdio_filebuf;

/*-------------------------------------------------------------------
 * Globals
 *-------------------------------------------------------------------
 */
char **global_argv;
CORBA::ORB_var orb;
CORBA::Object_var poaobj;
PortableServer::POA_var poa;
PortableServer::POAManager_var mgr;


/*-------------------------------------------------------------------
 * Methods
 *-------------------------------------------------------------------
 */
/*
 * Synchronous ping: takes no arguments and performs no work.
 */
void
Server_impl::Ping()
    throw (CORBA::SystemException)
{
    /* Intentionally empty */
    return;
}

/*
 * Synchronous transfer of an ARRAY__64 payload, which is received
 * and discarded.
 */
void
Server_impl::SendArray__64( const ARRAY__64 data )
    throw (CORBA::SystemException)
{
    static_cast<void>( data );   /* payload is deliberately ignored */
}

/*
 * Synchronous transfer of an ARRAY_128 payload, which is received
 * and discarded.
 */
void
Server_impl::SendArray_128( const ARRAY_128 data )
    throw (CORBA::SystemException)
{
    static_cast<void>( data );   /* payload is deliberately ignored */
}

/*
 * Synchronous transfer of an ARRAY_256 payload, which is received
 * and discarded.
 */
void
Server_impl::SendArray_256( const ARRAY_256 data )
    throw (CORBA::SystemException)
{
    static_cast<void>( data );   /* payload is deliberately ignored */
}

/*
 * Synchronous transfer of an ARRAY_512 payload, which is received
 * and discarded.
 */
void
Server_impl::SendArray_512( const ARRAY_512 data )
    throw (CORBA::SystemException)
{
    static_cast<void>( data );   /* payload is deliberately ignored */
}

/*
 * Synchronous transfer of an ARRAY1024 payload, which is received
 * and discarded.
 */
void
Server_impl::SendArray1024( const ARRAY1024 data )
    throw (CORBA::SystemException)
{
    static_cast<void>( data );   /* payload is deliberately ignored */
}

/*
 * Synchronous transfer of an ARRAY2048 payload, which is received
 * and discarded.
 */
void
Server_impl::SendArray2048( const ARRAY2048 data )
    throw (CORBA::SystemException)
{
    static_cast<void>( data );   /* payload is deliberately ignored */
}

/*
 * Synchronous transfer of a variable-length sequence, which is
 * received and discarded.
 */
void
Server_impl::SendSequence( const SEQVARLEN& data )
    throw (CORBA::SystemException)
{
    static_cast<void>( data );   /* payload is deliberately ignored */
}

/*
 * Synchronous transfer of a STREAM_PARAMS structure, which is
 * received and discarded.
 */
void 
Server_impl::SendStruct( const STREAM_PARAMS & data )   
    throw (CORBA::SystemException)
{
    static_cast<void>( data );   /* payload is deliberately ignored */
}

/*
 * Appends one timing record to m_dataseq.
 *
 *   id   - benchmark the record belongs to
 *   time - send timestamp supplied by the caller
 *
 * The receive timestamp is the nanosecond field of CLOCK_REALTIME,
 * read on entry, before the sequence is grown.
 */
inline void 
Server_impl::record_time( BenchmarkID id, Timestamp time ) 
{
    struct timespec tp;
    clock_gettime( CLOCK_REALTIME, &tp );
    const Timestamp recv_ns = tp.tv_nsec;

    /* Sequence lengths are unsigned (CORBA::ULong); keep the index in
     * the same type rather than narrowing it to a signed int. */
    const CORBA::ULong slot = m_dataseq.length();
    m_dataseq.length( slot + 1 );
    m_dataseq[slot].id = id;
    m_dataseq[slot].sent_ns = time;
    m_dataseq[slot].recv_ns = recv_ns;
}

/*
 * Asynchronous ping: logs a timing record for ASYNCH_PING using the
 * send timestamp passed in.
 */
void
Server_impl::AsynchPing(Timestamp time) 
    throw (CORBA::SystemException)
{
    const BenchmarkID which = ASYNCH_PING;
    record_time( which, time );
}

/*
 * Asynchronous ARRAY__64 transfer: the payload is ignored and a timing
 * record is logged for ASYNCH_SEND_ARRAY__64.
 */
void
Server_impl::AsynchSendArray__64( Timestamp time, const ARRAY__64 data )
    throw (CORBA::SystemException)
{
    static_cast<void>( data );
    const BenchmarkID which = ASYNCH_SEND_ARRAY__64;
    record_time( which, time );
}

/*
 * Asynchronous ARRAY_128 transfer: the payload is ignored and a timing
 * record is logged for ASYNCH_SEND_ARRAY_128.
 */
void
Server_impl::AsynchSendArray_128( Timestamp time, const ARRAY_128 data )
    throw (CORBA::SystemException)
{
    static_cast<void>( data );
    const BenchmarkID which = ASYNCH_SEND_ARRAY_128;
    record_time( which, time );
}

/*
 * Asynchronous ARRAY_256 transfer: the payload is ignored and a timing
 * record is logged for ASYNCH_SEND_ARRAY_256.
 */
void
Server_impl::AsynchSendArray_256( Timestamp time, const ARRAY_256 data )
    throw (CORBA::SystemException)
{
    static_cast<void>( data );
    const BenchmarkID which = ASYNCH_SEND_ARRAY_256;
    record_time( which, time );
}

/*
 * Asynchronous ARRAY_512 transfer: the payload is ignored and a timing
 * record is logged for ASYNCH_SEND_ARRAY_512.
 */
void
Server_impl::AsynchSendArray_512( Timestamp time, const ARRAY_512 data )
    throw (CORBA::SystemException)
{
    static_cast<void>( data );
    const BenchmarkID which = ASYNCH_SEND_ARRAY_512;
    record_time( which, time );
}

/*
 * Asynchronous ARRAY1024 transfer: the payload is ignored and a timing
 * record is logged for ASYNCH_SEND_ARRAY1024.
 */
void
Server_impl::AsynchSendArray1024( Timestamp time, const ARRAY1024 data )
    throw (CORBA::SystemException)
{
    static_cast<void>( data );
    const BenchmarkID which = ASYNCH_SEND_ARRAY1024;
    record_time( which, time );
}

/*
 * Asynchronous ARRAY2048 transfer: the payload is ignored and a timing
 * record is logged for ASYNCH_SEND_ARRAY2048.
 */
void
Server_impl::AsynchSendArray2048( Timestamp time, const ARRAY2048 data )
    throw (CORBA::SystemException)
{
    static_cast<void>( data );
    const BenchmarkID which = ASYNCH_SEND_ARRAY2048;
    record_time( which, time );
}

/*
 * Asynchronous sequence transfer.  The benchmark id is derived from the
 * length of the sequence received; a length that matches none of the
 * known sizes is recorded under BENCHMARK_LAST.
 */
void
Server_impl::AsynchSendSequence( Timestamp time, const SEQVARLEN& data )
    throw (CORBA::SystemException)
{
    struct SizeToId {
        CORBA::ULong len;
        BenchmarkID  id;
    };
    static const SizeToId by_size[] = {
        {   64, ASYNCH_SEND_SEQUENCE__64 },
        {  128, ASYNCH_SEND_SEQUENCE_128 },
        {  256, ASYNCH_SEND_SEQUENCE_256 },
        {  512, ASYNCH_SEND_SEQUENCE_512 },
        { 1024, ASYNCH_SEND_SEQUENCE1024 },
        { 2048, ASYNCH_SEND_SEQUENCE2048 },
    };
    const unsigned int entries = sizeof(by_size) / sizeof(by_size[0]);

    BenchmarkID matched = BENCHMARK_LAST;
    const CORBA::ULong received = data.length();
    for (unsigned int k = 0; k < entries; ++k) {
        if (by_size[k].len == received) {
            matched = by_size[k].id;
            break;
        }
    }
    record_time( matched, time );
}

/*
 * Asynchronous STREAM_PARAMS transfer: the payload is ignored and a
 * timing record is logged for ASYNCH_SEND_STRUCT.
 */
void 
Server_impl::AsynchSendStruct( Timestamp time, const STREAM_PARAMS & data ) 
    throw (CORBA::SystemException)
{
    static_cast<void>( data );
    const BenchmarkID which = ASYNCH_SEND_STRUCT;
    record_time( which, time );
}


/*
 * Hands the caller a freshly allocated copy of the timing records
 * collected so far.  The records kept here are left untouched.
 */
void
Server_impl::getAsynchData( TimeDataSeq_out time_data ) 
    throw (CORBA::SystemException)
{
    TimeDataSeq * snapshot = new TimeDataSeq( m_dataseq );
    time_data = snapshot;
}

/*
 * Discards every timing record collected so far.
 */
void 
Server_impl::clearAsynchData()
    throw (CORBA::SystemException)
{
    const CORBA::ULong no_records = 0;
    m_dataseq.length( no_records );
}

/*
 * Asks the server process to stop serving requests.
 *
 * This runs inside a request that the ORB is still servicing.  Calling
 * ORB::destroy() (or a blocking shutdown) from that context is
 * rejected by the CORBA specification with BAD_INV_ORDER, so request a
 * non-blocking shutdown instead: orb->run() in main() then returns
 * once the in-flight requests have completed.
 */
void 
Server_impl::Terminate()
    throw (CORBA::SystemException)
{
    cout << "Orb shutdown\n" << flush;
    orb->shutdown( false );
}

CORBA::ULong
Server_impl::FootprintDsk( FootprintDskSeq_out disk_data )
    throw (CORBA::SystemException)
{
    stringstream ss;
    FILE *fp;
    string file_nm;
    struct stat stbuf;

    /* Put our filename into the sequence */
    disk_data = new FootprintDskSeq;
    disk_data->length( 1 );
    disk_data[0L].file_nm = CORBA::string_dup( global_argv[0] );

    /* Put all linked libraries into the sequence */
    ss << "ldd " << global_argv[0];
    fp = popen(ss.str().c_str(), "r");
    stdio_filebuf<char> fbuf( fp, ios_base::in );
    istream is( &fbuf );
//    for (unsigned long i = 1; !is.eof(); i++) {
    string s;
    for (unsigned long i = 1; getline(is, s); i++) {
        ss.str(s);
        ss >> file_nm;
        ss >> file_nm;
        ss >> file_nm;
        disk_data->length( i + 1 );
        disk_data[i].file_nm = CORBA::string_dup( file_nm.c_str() );
    }
    pclose(fp);
    
    /* Put sizes of all files into the sequence */
    CORBA::ULong sum = 0;
    for (unsigned long int i = 0; i < disk_data->length(); i++) {
        stat( disk_data[i].file_nm, &stbuf);
        disk_data[i].file_sz = stbuf.st_size;
        sum += stbuf.st_size;
    }
    return sum;
}


/*
 * Reports the virtual memory size of this process.
 *
 * Runs `ps -p <pid> -o vsz=` and returns the value it prints multiplied
 * by 1024.  Returns 0 when ps cannot be started or prints no number.
 */
CORBA::ULong
Server_impl::FootprintMem()
    throw (CORBA::SystemException)
{
    stringstream ss;
    ss << "ps -p " << getpid() << " -o vsz=";

    FILE *fp = popen(ss.str().c_str(), "r");
    if ( fp == NULL )
        return 0;

    /* Unsigned, so a large value cannot overflow a signed multiply */
    unsigned long vsz = 0;
    if ( fscanf(fp, "%lu", &vsz) != 1 )
        vsz = 0;
    pclose(fp);

    return static_cast<CORBA::ULong>( vsz * 1024UL );
}



int 
main(int argc, char **argv) 
{
    /*
     * Initialize the ORB
     */
    global_argv = argv;
    orb = CORBA::ORB_init (argc, argv);

    /*
     * Obtain a reference to the RootPOA and its Manager
     */
    poaobj = orb->resolve_initial_references("RootPOA");
    poa = PortableServer::POA::_narrow(poaobj);
    mgr = poa->the_POAManager();

    /*
     * Create a server object
     */
    Server_impl * ucb_svr = new Server_impl;

    /*
     * Activate the Servant
     */
    PortableServer::ObjectId_var oid = poa->activate_object(ucb_svr);

    /*
     * Write reference to file
     */
    ofstream of ("ucb_svr.ref");
    CORBA::Object_var ref = poa->id_to_reference (oid.in());
    CORBA::String_var str = orb->object_to_string(ref.in());
    of << str.in() << endl;
    of.close ();

    /*
     * Activate the POA and start serving requests
     */
    cout << "Running." << endl;
    mgr->activate ();
    orb->run();

    /*
     * Shutdown (never reached)
     */
    poa->destroy (true, true);
    delete ucb_svr;

    return 0;
}
/*******************************************************************
 * ucb_svr.H                                                       *
 *                                                                 *
 * Declaration of CORBA interface class                            *
 *                                                                 *
 * Marc Siegel, 2004                                               *
 * MIT Lincoln Laboratory Group 63                                 *
 *******************************************************************/

#ifndef __UCB_SVR_H
#define __UCB_SVR_H

#if defined( __UCB_MIDDLEWARE_TAO__ )
#  include "ucbS.H"
#elif defined( __UCB_MIDDLEWARE_ORBITCPP__ )
#  include "ucb-cpp.H"
#else
#  include "ucb.H"
#endif


using namespace Ucb;

/*
 * Servant implementing the Ucb::Server benchmark interface.
 *
 * The synchronous calls are declared here and defined in ucb_svr.C as
 * empty bodies.  The asynchronous calls each append a timing record to
 * m_dataseq, which the client fetches with getAsynchData() and resets
 * with clearAsynchData().
 */
class Server_impl : virtual public POA_Ucb::Server
{
public:    
    /* Synchronous calls */
    void Ping()
        throw (CORBA::SystemException);
    void SendArray__64( const ARRAY__64 data )
        throw (CORBA::SystemException);
    void SendArray_128( const ARRAY_128 data )
        throw (CORBA::SystemException);
    void SendArray_256( const ARRAY_256 data )
        throw (CORBA::SystemException);
    void SendArray_512( const ARRAY_512 data )
        throw (CORBA::SystemException);
    void SendArray1024( const ARRAY1024 data )
        throw (CORBA::SystemException);
    void SendArray2048( const ARRAY2048 data )
        throw (CORBA::SystemException);
    void SendSequence( const SEQVARLEN& data )
        throw (CORBA::SystemException);
    void SendStruct( const STREAM_PARAMS & data )
        throw (CORBA::SystemException);

    /* Asynchronous calls; time is the sender's timestamp, stored as
     * the sent_ns field of the record that the call appends */
    void AsynchPing( Timestamp time )
        throw (CORBA::SystemException);
    void AsynchSendArray__64( Timestamp time, 
                              const ARRAY__64 data )
        throw (CORBA::SystemException);
    void AsynchSendArray_128( Timestamp time, 
                              const ARRAY_128 data )
        throw (CORBA::SystemException);
    void AsynchSendArray_256( Timestamp time, 
                              const ARRAY_256 data )
        throw (CORBA::SystemException);
    void AsynchSendArray_512( Timestamp time, 
                              const ARRAY_512 data )
        throw (CORBA::SystemException);
    void AsynchSendArray1024( Timestamp time, 
                              const ARRAY1024 data )
        throw (CORBA::SystemException);
    void AsynchSendArray2048( Timestamp time, 
                              const ARRAY2048 data )
        throw (CORBA::SystemException);
    void AsynchSendSequence( Timestamp time, 
                             const SEQVARLEN& data )
        throw (CORBA::SystemException);
    void AsynchSendStruct( Timestamp time, 
                           const STREAM_PARAMS & data)
        throw (CORBA::SystemException);

    /* Retrieving timing data after asynchronous calls */
    void getAsynchData( TimeDataSeq_out time_data )
        throw (CORBA::SystemException);
    void clearAsynchData()
        throw (CORBA::SystemException);
    
    /* Footprint data.
     * NOTE(review): this used to read "in K-bytes", but the definitions
     * in ucb_svr.C return a sum of st_size values (FootprintDsk) and
     * the ps "vsz" figure multiplied by 1024 (FootprintMem), i.e. what
     * look like byte counts -- confirm the intended unit. */
    CORBA::ULong FootprintDsk( FootprintDskSeq_out disk_data )
        throw (CORBA::SystemException);
    CORBA::ULong FootprintMem()
        throw (CORBA::SystemException);
    
    /* Kill the server process */
    void Terminate()
        throw (CORBA::SystemException);

protected:
    /* Appends one record (id, sender's time, local receive time) to
     * m_dataseq */
    inline void record_time( BenchmarkID id, Timestamp time );
    /* Timing records collected by the asynchronous calls */
    TimeDataSeq m_dataseq;
};

#endif


[Date Prev][Date Next]   [Thread Prev][Thread Next]   [Thread Index] [Date Index] [Author Index]