mainAsyncGenerator.cpp File Reference

#include <ndb_global.h>
#include <NdbHost.h>
#include <NdbSleep.h>
#include <NdbThread.h>
#include <NdbMain.h>
#include <NdbOut.hpp>
#include <NdbEnv.h>
#include <NdbTest.hpp>
#include "userInterface.h"
#include "dbGenerator.h"
#include <stdio.h>
#include <stdlib.h>
#include <sys/types.h>
#include <time.h>
#include "ndb_schema.hpp"
#include "ndb_error.hpp"
#include <NdbMutex.h>
#include <NdbTick.h>
#include <NdbApi.hpp>
#include <unistd.h>

Go to the source code of this file.

Functions

Ndb * asyncDbConnect (int parallellism)
void asyncDbDisconnect (Ndb *pNDB)
 NDB_COMMAND (DbAsyncGenerator,"DbAsyncGenerator","DbAsyncGenerator","DbAsyncGenerator", 65535)
static int parse_args (int argc, const char **argv)
void print_stats (const char *title, unsigned int length, unsigned int transactionFlag, GeneratorStatistics *gen, int numProc, int parallellism)
static void print_transaction (const char *header, unsigned long totalCount, TransactionDefinition *trans, unsigned int printBranch, unsigned int printRollback)
void showTime ()
static void * threadRoutine (void *arg)
static void usage (const char *prog)
double userGetTime (void)

Variables

static ThreadData * data
static int forceSendPoll
static Ndb_cluster_connection * g_cluster_connection = 0
static int millisSendPoll
static int minEventSendPoll
static int numProcesses
static int numSeconds
static int numWarmSeconds
static int parallellism


Function Documentation

Ndb* asyncDbConnect ( int parallellism )
 

Definition at line 442 of file mainAsyncGenerator.cpp.

References Ndb::init(), and Ndb::waitUntilReady().

Referenced by threadRoutine().

00442                                 {
00443   Ndb * pNDB = new Ndb(g_cluster_connection, "TEST_DB");
00444   
00445   pNDB->init(parallellism + 1);
00446   
00447   while(pNDB->waitUntilReady() != 0){
00448   }
00449   
00450   return pNDB;
00451 }

void asyncDbDisconnect ( Ndb * pNDB )
 

Definition at line 454 of file mainAsyncGenerator.cpp.

Referenced by threadRoutine().

00455 {
00456   delete pNDB;
00457 }

NDB_COMMAND ( DbAsyncGenerator,
"DbAsyncGenerator",
"DbAsyncGenerator",
"DbAsyncGenerator",
65535 )
 

Definition at line 282 of file mainAsyncGenerator.cpp.

References TransactionDefinition::branchExecuted, ThreadData::changedTime, con, Ndb_cluster_connection::connect(), ThreadData::coolDownSeconds, TransactionDefinition::count, free, ThreadData::generator, TransactionDefinition::latency, malloc, ndb_init(), NDB_THREAD_PRIO_LOW, ndbout(), ndbout_c(), NDBT_FAILED, NDBT_OK, NDBT_ProgramExit(), NDBT_WRONGARGS, NdbThread_Create(), NdbThread_Destroy(), NdbThread_WaitFor(), NdbTick_CurrentMillisecond(), NULL, NUM_TRANSACTION_TYPES, numProcesses, numSeconds, numWarmSeconds, GeneratorStatistics::outerTps, p, parallellism, parse_args(), perror(), print_stats(), ThreadData::randomSeed, NDBT_Stats::reset(), TransactionDefinition::rollbackExecuted, Runnable, ThreadData::runState, showTime(), stats, ThreadData::testSeconds, threadRoutine(), GeneratorStatistics::totalTransactions, GeneratorStatistics::transactions, usage, Ndb_cluster_connection::wait_until_ready(), and ThreadData::warmUpSeconds.

00284 {
00285   ndb_init();
00286   int i;
00287   int j;
00288   int k;
00289   struct NdbThread* pThread = NULL;
00290   GeneratorStatistics  stats;
00291   GeneratorStatistics *p;
00292   char threadName[32];
00293   int rc = NDBT_OK;
00294   void* tmp = NULL;
00295   if(parse_args(argc,argv) != 0){
00296     usage(argv[0]);
00297     return NDBT_ProgramExit(NDBT_WRONGARGS);
00298   }
00299     
00300 
00301   ndbout_c("\nStarting Test with %d process(es) for %d %s parallellism %d",
00302            numProcesses,
00303            numSeconds,
00304            "sec",
00305            parallellism);
00306 
00307   ndbout_c("   WarmUp/coolDown = %d sec", numWarmSeconds);
00308 
00309   Ndb_cluster_connection con;
00310   if(con.connect(12, 5, 1) != 0)
00311   {
00312     ndbout << "Unable to connect to management server." << endl;
00313     return 0;
00314   }
00315   if (con.wait_until_ready(30,0) < 0)
00316   {
00317     ndbout << "Cluster nodes not ready in 30 seconds." << endl;
00318     return 0;
00319   }
00320   
00321   g_cluster_connection= &con;
00322   data = (ThreadData*)malloc((numProcesses*parallellism)*sizeof(ThreadData));
00323  
00324   for(i = 0; i < numProcesses; i++) {
00325     for(j = 0; j<parallellism; j++){
00326       data[i*parallellism+j].warmUpSeconds   = numWarmSeconds;
00327       data[i*parallellism+j].testSeconds     = numSeconds;
00328       data[i*parallellism+j].coolDownSeconds = numWarmSeconds;
00329       data[i*parallellism+j].randomSeed      = 
00330         NdbTick_CurrentMillisecond()+i+j;
00331       data[i*parallellism+j].changedTime     = 0;
00332       data[i*parallellism+j].runState        = Runnable;
00333     }
00334     sprintf(threadName, "AsyncThread[%d]", i);
00335     pThread = NdbThread_Create(threadRoutine, 
00336                               (void**)&data[i*parallellism], 
00337                               65535, 
00338                               threadName,
00339                               NDB_THREAD_PRIO_LOW);
00340     if(pThread != 0 && pThread != NULL){
00341       (&data[i*parallellism])->pThread = pThread;
00342     } else {      
00343       perror("Failed to create thread");
00344       rc = NDBT_FAILED;
00345     }
00346   }
00347 
00348   showTime();
00349 
00350   /*--------------------------------*/
00351   /* Wait for all processes to exit */
00352   /*--------------------------------*/
00353   for(i = 0; i < numProcesses; i++) {
00354     NdbThread_WaitFor(data[i*parallellism].pThread, &tmp);
00355     NdbThread_Destroy(&data[i*parallellism].pThread);
00356   }
00357    
00358   ndbout_c("All threads have finished");
00359   
00360   /*-------------------------------------------*/
00361   /* Clear all structures for total statistics */
00362   /*-------------------------------------------*/
00363   stats.totalTransactions = 0;
00364   stats.outerTps          = 0.0;
00365   
00366   for(i = 0; i < NUM_TRANSACTION_TYPES; i++ ) {
00367     stats.transactions[i].count            = 0;
00368     stats.transactions[i].branchExecuted   = 0;
00369     stats.transactions[i].rollbackExecuted = 0;
00370     stats.transactions[i].latency.reset();
00371   }
00372   
00373   /*--------------------------------*/
00374   /* Add the values for all Threads */
00375   /*--------------------------------*/
00376   for(i = 0; i < numProcesses; i++) {
00377     for(k = 0; k<parallellism; k++){
00378       p = &data[i*parallellism+k].generator;
00379       
00380       stats.totalTransactions += p->totalTransactions;
00381       stats.outerTps          += p->outerTps;
00382       
00383       for(j = 0; j < NUM_TRANSACTION_TYPES; j++ ) {
00384         stats.transactions[j].count += 
00385           p->transactions[j].count;
00386         stats.transactions[j].branchExecuted += 
00387           p->transactions[j].branchExecuted;
00388         stats.transactions[j].rollbackExecuted += 
00389           p->transactions[j].rollbackExecuted;
00390         stats.transactions[j].latency += 
00391           p->transactions[j].latency;
00392       }
00393     }
00394   }
00395 
00396   print_stats("Test Results", 
00397               numSeconds,
00398               0,
00399               &stats,
00400               numProcesses,
00401               parallellism);
00402 
00403   free(data);
00404   
00405   NDBT_ProgramExit(rc);
00406 }

static int parse_args ( int argc,
const char ** argv
) [static]
 

Definition at line 78 of file mainAsyncGenerator.cpp.

References forceSendPoll, millisSendPoll, minEventSendPoll, ndbout_c(), numProcesses, numSeconds, numWarmSeconds, parallellism, and usage.

00079 {
00080    int i;
00081 
00082    numProcesses     = 1;
00083    numSeconds       = 10;
00084    numWarmSeconds   = 10;
00085    parallellism     = 1;
00086    millisSendPoll   = 10000;
00087    minEventSendPoll = 1;
00088    forceSendPoll    = 0;
00089    
00090 
00091    i = 1;
00092    while (i < argc){
00093      if (strcmp("-proc",argv[i]) == 0) {
00094        if (i + 1 >= argc) {
00095          return 1;
00096        }
00097        if (sscanf(argv[i+1], "%d", &numProcesses) == -1 ||
00098            numProcesses <= 0 || numProcesses > 127) {
00099          ndbout_c("-proc flag requires a positive integer argument [1..127]");
00100          return 1;
00101        }
00102        i += 2;
00103      } else if (strcmp("-p", argv[i]) == 0){
00104        if(i + 1 >= argc){
00105          usage(argv[0]);
00106          return 1;
00107        }
00108        if (sscanf(argv[i+1], "%d", &parallellism) == -1 ||
00109            parallellism <= 0){
00110          ndbout_c("-p flag requires a positive integer argument");
00111          return 1;
00112        }
00113        i += 2;
00114      }
00115      else if (strcmp("-time",argv[i]) == 0) {
00116        if (i + 1 >= argc) {
00117          return 1;
00118        }
00119        if (sscanf(argv[i+1], "%d", &numSeconds) == -1 ||
00120            numSeconds < 0) {
00121          ndbout_c("-time flag requires a positive integer argument");
00122          return 1;
00123        }
00124        i += 2;
00125      }
00126      else if (strcmp("-warm",argv[i]) == 0) {
00127        if (i + 1 >= argc) {
00128          return 1;
00129        }
00130        if (sscanf(argv[i+1], "%d", &numWarmSeconds) == -1 ||
00131            numWarmSeconds < 0) {
00132          ndbout_c("-warm flag requires a positive integer argument");
00133          return 1;
00134        }
00135        i += 2;
00136      }
00137      else if (strcmp("-e",argv[i]) == 0) {
00138        if (i + 1 >= argc) {
00139          return 1;
00140        }
00141        if (sscanf(argv[i+1], "%d", &minEventSendPoll) == -1 ||
00142            minEventSendPoll < 0) {
00143          ndbout_c("-e flag requires a positive integer argument");
00144          return 1;
00145        }
00146        i += 2;
00147      }
00148      else if (strcmp("-f",argv[i]) == 0) {
00149        if (i + 1 >= argc) {
00150          usage(argv[0]);
00151          return 1;
00152        }
00153        if (sscanf(argv[i+1], "%d", &forceSendPoll) == -1 ||
00154            forceSendPoll < 0) {
00155          ndbout_c("-f flag requires a positive integer argument");
00156          return 1;
00157        }
00158        i += 2;
00159      }
00160      else {
00161        return 1;
00162      }
00163    }
00164 
00165    if(minEventSendPoll > parallellism){
00166      ndbout_c("minEventSendPoll(%d) > parallellism(%d)",
00167              minEventSendPoll, parallellism);
00168      ndbout_c("not very good...");
00169      ndbout_c("very bad...");
00170      ndbout_c("exiting...");
00171      return 1;
00172    }
00173    return 0;
00174 }

void print_stats ( const char * title,
unsigned int length,
unsigned int transactionFlag,
GeneratorStatistics * gen,
int numProc,
int parallellism
)
 

Definition at line 216 of file mainAsyncGenerator.cpp.

References buf, MAXHOSTNAMELEN, name, NdbHost_GetHostName(), ndbout_c(), GeneratorStatistics::outerTps, print_transaction(), GeneratorStatistics::totalTransactions, and GeneratorStatistics::transactions.

Referenced by NDB_COMMAND().

00221 {
00222   int    i;
00223   char buf[10];
00224   char name[MAXHOSTNAMELEN];
00225   
00226   name[0] = 0;
00227   NdbHost_GetHostName(name);
00228   
00229   ndbout_c("\n------ %s ------",title);
00230   ndbout_c("Length        : %d %s",
00231          length,
00232          transactionFlag ? "Transactions" : "sec");
00233   ndbout_c("Processor     : %s", name);
00234   ndbout_c("Number of Proc: %d",numProc);
00235   ndbout_c("Parallellism  : %d", parallellism);
00236   ndbout_c("\n");
00237 
00238   if( gen->totalTransactions == 0 ) {
00239     ndbout_c("   No Transactions for this test");
00240   }
00241   else {
00242     for(i = 0; i < 5; i++) {
00243       sprintf(buf, "T%d",i+1);
00244       print_transaction(buf,
00245                         gen->totalTransactions,
00246                         &gen->transactions[i],
00247                         i >= 2,
00248                         i >= 3 );
00249     }
00250     
00251     ndbout_c("\n");
00252     ndbout_c("  Overall Statistics:");
00253     ndbout_c("     Transactions: %d", gen->totalTransactions);
00254     ndbout_c("     Outer       : %.0f TPS",gen->outerTps);
00255     ndbout_c("\n");
00256   }
00257 }

static void print_transaction ( const char * header,
unsigned long totalCount,
TransactionDefinition * trans,
unsigned int printBranch,
unsigned int printRollback
) [static]
 

Definition at line 178 of file mainAsyncGenerator.cpp.

References TransactionDefinition::branchExecuted, TransactionDefinition::count, f(), NDBT_Stats::getCount(), NDBT_Stats::getMax(), NDBT_Stats::getMean(), NDBT_Stats::getMin(), NDBT_Stats::getStddev(), TransactionDefinition::latency, ndbout_c(), and TransactionDefinition::rollbackExecuted.

Referenced by print_stats().

00183 {
00184   double f;
00185   
00186   ndbout_c("  %s: %d (%.2f%%) "
00187            "Latency(ms) avg: %d min: %d max: %d std: %d n: %d",
00188            header,
00189            trans->count,
00190            (double)trans->count / (double)totalCount * 100.0,
00191            (int)trans->latency.getMean(),
00192            (int)trans->latency.getMin(),
00193            (int)trans->latency.getMax(),
00194            (int)trans->latency.getStddev(),
00195            (int)trans->latency.getCount()
00196            );
00197   
00198   if( printBranch ){
00199     if( trans->count == 0 )
00200       f = 0.0;
00201     else
00202       f = (double)trans->branchExecuted / (double)trans->count * 100.0;
00203     ndbout_c("      Branches Executed: %d (%.2f%%)", trans->branchExecuted, f);
00204   }
00205   
00206   if( printRollback ){
00207     if( trans->count == 0 )
00208       f = 0.0;
00209     else
00210       f = (double)trans->rollbackExecuted / (double)trans->count * 100.0;
00211     ndbout_c("      Rollback Executed: %d (%.2f%%)",trans->rollbackExecuted,f);
00212   }
00213 }

void showTime ( )
 

Definition at line 484 of file mainAsyncGenerator.cpp.

References buf, ndbout_c(), NULL, and snprintf.

Referenced by CHECK_ALLOWED_ERROR(), and NDB_COMMAND().

00485 {
00486   char buf[128];
00487   struct tm* tm_now;
00488   time_t now;
00489   now = ::time((time_t*)NULL);
00490   tm_now = ::gmtime(&now);
00491 
00492   ::snprintf(buf, 128,
00493              "%d-%.2d-%.2d %.2d:%.2d:%.2d", 
00494              tm_now->tm_year + 1900, 
00495              tm_now->tm_mon, 
00496              tm_now->tm_mday,
00497              tm_now->tm_hour,
00498              tm_now->tm_min,
00499              tm_now->tm_sec);
00500 
00501   ndbout_c("Time: %s", buf);
00502 }

static void* threadRoutine ( void * arg ) [static]
 

Definition at line 261 of file mainAsyncGenerator.cpp.

References asyncDbConnect(), asyncDbDisconnect(), asyncGenerator(), forceSendPoll, millisSendPoll, minEventSendPoll, NULL, parallellism, and ThreadData::pNDB.

Referenced by NDB_COMMAND().

00262 {
00263   int i;
00264   ThreadData *data = (ThreadData *)arg;
00265   Ndb * pNDB;
00266 
00267   pNDB = asyncDbConnect(parallellism);                
00268   /* NdbSleep_MilliSleep(rand() % 10); */
00269 
00270   for(i = 0; i<parallellism; i++){
00271     data[i].pNDB = pNDB;
00272   }
00273   millisSendPoll = 30000;
00274   asyncGenerator(data, parallellism,
00275                  millisSendPoll, minEventSendPoll, forceSendPoll);
00276 
00277   asyncDbDisconnect(pNDB);
00278 
00279   return NULL;
00280 }

static void usage ( const char * prog ) [static]
 

Definition at line 42 of file mainAsyncGenerator.cpp.

References ndbout_c(), progname, and strrchr().

00043 {
00044   const char  *progname;
00045 
00046    /*--------------------------------------------*/
00047    /* Get the name of the program (without path) */
00048    /*--------------------------------------------*/
00049    progname = strrchr(prog, '/');
00050 
00051    if (progname == 0)
00052      progname = prog;
00053    else
00054      ++progname;
00055 
00056    ndbout_c(
00057            "Usage: %s [-proc <num>] [-warm <num>] [-time <num>] [ -p <num>] " 
00058            "[-t <num> ] [ -e <num> ] [ -f <num>] \n"
00059            "  -proc <num>    Specifies that <num> is the number of\n"
00060            "                 threads. The default is 1.\n"
00061            "  -time <num>    Specifies that the test will run for <num> sec.\n"
00062            "                 The default is 10 sec\n"
00063            "  -warm <num>    Specifies the warm-up/cooldown period of <num> "
00064            "sec.\n"
00065            "                 The default is 10 sec\n"
00066            "  -p <num>       The no of parallell transactions started by "
00067            "one thread\n"
00068            "  -e <num>       Minimum no of events before wake up in call to "
00069            "sendPoll\n"
00070            "                 Default is 1\n"
00071            "  -f <num>       force parameter to sendPoll\n"
00072            "                 Default is 0\n",
00073            progname);
00074 }

double userGetTime ( void )
 

Definition at line 460 of file mainAsyncGenerator.cpp.

References initialized, and NdbTick_CurrentMicrosecond().

Referenced by asyncGenerator().

00461 {
00462   static bool initialized = false;
00463   static NDB_TICKS initSecs = 0;
00464   static Uint32 initMicros = 0;
00465   double timeValue = 0;
00466 
00467   if ( !initialized ) {
00468     initialized = true;
00469     NdbTick_CurrentMicrosecond(&initSecs, &initMicros); 
00470     timeValue = 0.0;
00471   } else {
00472     NDB_TICKS secs = 0;
00473     Uint32 micros = 0;
00474 
00475     NdbTick_CurrentMicrosecond(&secs, &micros);
00476     double s  = (double)secs  - (double)initSecs;
00477     double us = (double)micros - (double)initMicros;
00478     
00479     timeValue = s + (us / 1000000.0);
00480   }
00481   return timeValue;
00482 }


Variable Documentation

ThreadData* data [static]
 

Definition at line 38 of file mainAsyncGenerator.cpp.

int forceSendPoll [static]
 

Definition at line 36 of file mainAsyncGenerator.cpp.

Referenced by parse_args(), and threadRoutine().

Ndb_cluster_connection* g_cluster_connection = 0 [static]
 

Definition at line 39 of file mainAsyncGenerator.cpp.

int millisSendPoll [static]
 

Definition at line 34 of file mainAsyncGenerator.cpp.

Referenced by parse_args(), and threadRoutine().

int minEventSendPoll [static]
 

Definition at line 35 of file mainAsyncGenerator.cpp.

Referenced by parse_args(), and threadRoutine().

int numProcesses [static]
 

Definition at line 30 of file mainAsyncGenerator.cpp.

Referenced by NDB_COMMAND(), and parse_args().

int numSeconds [static]
 

Definition at line 31 of file mainAsyncGenerator.cpp.

Referenced by NDB_COMMAND(), and parse_args().

int numWarmSeconds [static]
 

Definition at line 32 of file mainAsyncGenerator.cpp.

Referenced by NDB_COMMAND(), and parse_args().

int parallellism [static]
 

Definition at line 33 of file mainAsyncGenerator.cpp.

Referenced by NDB_COMMAND(), parse_args(), and threadRoutine().


Generated on Wed Jul 20 21:09:48 2005 for MySQL 5.0.9 Beta by  doxygen 1.4.3