Table of Contents
 1. Problem
 2. Solution
 3. Sample Code

1. Problem Top

My first task at Network Solutions was to replace the Whois system. It had grown to where the memory footprint was 2.5GB for each query connection and was taking over 36 hours to rebuild the database each time. Several Sun Enterprise E4500 were required simply due to the memory footprint

The desire was to increase the update frequency, expand the capacity, and reduce hardware costs.

After it was built, how to test it under various loads became important.

We're not going to talk about the Whois service here but one of the tools developed to test and validate the system.

2. Solution Top

The Whois service for Network Solutions was replaced with a Linux Cluster running Java coded application using a MySQL database that is populated on a cluster controller from another RDBMS every hour or so and then replicated out to the real-servers in the cluster.

The clusterof 8 PC/Linux based real-servers can handle over 500 million queries per day. We didn't need to handle but a fraction of that - about 20 million, but, the clustering allowed us to defer server hardware failures until more convenient times.

The real-servers in the cluster were self-initializing as new PC's were placed in the cluster.

I wrote this program to allow us to produce load on the new Whois service.

That's why it's called WhoisBeater.

We used to run it from Solaris, Linux and Windows NT/2000 computers at various places to validate capacities and the various TCP stacks

3. Sample Code Top

WhoisBeater.properties

whoisServer=10.128.37.3
whoisPort=43
socketTimeout=2000
minDelay=500
maxDelay=17000
nThreads=75
workDir=/tmp
queryFile=/home/steve/whoisbeater/domains.txt

WhoisBeater.java


import java.io.*;
import java.net.*;
import java.util.*;

public class WhoisBeater
{

  // slow down initial startup util queued is
  // greater than max threads
  public static int startCount = 0;

  // how long we should wait for a connection
  public static int socketTimeout = 5000;  // default is a minimum of 5 sec

  // how long we should delay between page retrievals
  public static int minDelay = 3000;  // default is a minimum of 3000 ms
  public static int maxDelay = 9000;  // default is a maximum of 9000 ms

  // where we should write out our work files
  public static String workDir = "/tmp";

  // where we should find our queries
  public static String queryFile = "whois_queries";

  // where we should send our queries
  public static String whoisServer = "whois.networksolutions.com";

  // what port we should send our queries too
  public static int whoisPort = 43;

  // how many milliseconds to wait before checking for work to do
  public static int workCheckDelay = 60000;  // default is sixty seconds

  // how many threads we want 'querying'
  public static int nThreads = 3;  // default is three threads

  // where we store the list of queries to be processed
  public static QueryQueue queryQueue = null;

  public WhoisBeater()
  {
    new Throwable().printStackTrace();
  }

  public String getWhoisServer()
  {
    return whoisServer;
  }

  public static void main( String[] argv )
  {

    WhoisBeater whoisQuery = new WhoisBeater();

    // ResourceBundle bundle = ResourceBundle.getBundle( "WhoisBeater" );

    Properties bundle = new Properties();
    try
    {
      bundle.load( new FileInputStream( "WhoisBeater.properties" ) );
    }
    catch ( FileNotFoundException ex )
    {
    }
    catch ( IOException ex )
    {
    }

    // process through the resources

    //for ( Enumeration props = bundle.getKeys(); props.hasMoreElements(); )
    for ( Enumeration props = bundle.propertyNames(); props.hasMoreElements(); )
    {
      String key = (String) props.nextElement();
      if ( key.equalsIgnoreCase( "workDir" ) )
      {
        //workDir = (String) bundle.getObject( key );
        workDir = bundle.getProperty( key );
      }
      else if ( key.equalsIgnoreCase( "queryFile" ) )
      {
        //queryFile = (String) bundle.getObject( key );
        queryFile = bundle.getProperty( key );
      }
      else if ( key.equalsIgnoreCase( "whoisServer" ) )
      {
        //whoisServer = (String) bundle.getObject( key );
        whoisServer = bundle.getProperty( key );
      }
      else if ( key.equalsIgnoreCase( "whoisPort" ) )
      {
        //whoisPort = Integer.parseInt( (String) bundle.getObject( key ) );
        whoisPort = Integer.parseInt( bundle.getProperty( key ) );
      }
      else if ( key.equalsIgnoreCase( "nThreads" ) )
      {
        //nThreads = Integer.parseInt( (String) bundle.getObject( key ) );
        nThreads = Integer.parseInt( bundle.getProperty( key ) );
      }
      else if ( key.equalsIgnoreCase( "socketTimeout" ) )
      {
        //socketTimeout = Integer.parseInt( (String) bundle.getObject( key ) );
        socketTimeout = Integer.parseInt( bundle.getProperty( key ) );
      }
      else if ( key.equalsIgnoreCase( "minDelay" ) )
      {
        //minDelay = Integer.parseInt( (String) bundle.getObject( key ) );
        minDelay = Integer.parseInt( bundle.getProperty( key ) );
      }
      else if ( key.equalsIgnoreCase( "maxDelay" ) )
      {
        //maxDelay = Integer.parseInt( (String) bundle.getObject( key ) );
        maxDelay = Integer.parseInt( bundle.getProperty( key ) );
      }
      else if ( key.equalsIgnoreCase( "workCheckDelay" ) )
      {
        //workCheckDelay = Integer.parseInt( (String) bundle.getObject( key ) );
        workCheckDelay = Integer.parseInt( bundle.getProperty( key ) );
      }
    }

    // here's where we'll put any queries to be processed
    queryQueue = new QueryQueue();

    // now we'll start up a batch of threads to do the actual work

    ThreadGroup tg = new ThreadGroup( "WhoisBeater" );

    for ( int i = 0; i < nThreads; i++ )
    {
      Runnable r = new Runnable()
      {
        public void run()
        {
          Date now = null;

          Random randDelay = new Random();

          // direct information

          String whoisQueryString = null;

          // we'll loop forever here waiting for a new query
          while ( true )
          {

            int strLen = 0;
            long startMillis = 0;
            long finishMillis = 0;
            StringBuffer threadName = null;

            try
            {

              whoisQueryString = queryQueue.getRow();

              now = new Date();

              startMillis = System.currentTimeMillis();

              threadName = new StringBuffer( "[" );
              threadName.append( whoisQueryString );
              threadName.append( "] [" );
              threadName.append( now.toString() );
              threadName.append( "] [" );
              threadName.append( startMillis );
              threadName.append( "]" );

              Thread.currentThread().setName( threadName.toString() );

              InetAddress addr = InetAddress.getByName( whoisServer );
              int port = whoisPort;

              Socket s = new Socket( addr, port );

              BufferedWriter wr = new BufferedWriter( new OutputStreamWriter( s.getOutputStream() ) );
              wr.write( whoisQueryString );
              wr.write( "\n" );
              wr.flush();

              BufferedReader rd = new BufferedReader( new InputStreamReader( s.getInputStream() ) );
              String str;
              StringBuffer output = new StringBuffer();
              while ( (str = rd.readLine()) != null )
              {
                output.append( str );
                output.append( "\n" );
                strLen+=str.length();
              }
              rd.close();
              wr.close();

              s.close();

              finishMillis = System.currentTimeMillis();

              StringBuffer finishedMsg = new StringBuffer( "Finished query: " );
              finishedMsg.append( threadName.toString() );
              finishedMsg.append( " [" );
              finishedMsg.append( strLen );
              finishedMsg.append( "] [" );
              finishedMsg.append( finishMillis - startMillis );
              finishedMsg.append( "]" );
              System.out.println( finishedMsg.toString() );
              Thread.sleep (minDelay );

              if ( output.toString().indexOf( "Record expires on" ) < 0 ) {
                System.err.println( finishedMsg.toString() );
                System.err.println( output.toString() );
              }
            }
            catch ( IOException ex )
            {
              StringBuffer exceptionMsg = new StringBuffer( "connectus interruptus: " );
              exceptionMsg.append( threadName.toString() );
              System.out.println( exceptionMsg.toString() );
              System.err.println( exceptionMsg.toString() );
            }
            catch ( InterruptedException ex )
            {
              StringBuffer exceptionMsg = new StringBuffer( "sleepus interruptus: " );
              exceptionMsg.append( threadName.toString() );
              System.out.println( exceptionMsg.toString() );
              System.err.println( exceptionMsg.toString() );
            }
            catch ( Exception ex )
            {
              StringBuffer exceptionMsg = new StringBuffer( "whois beaterus interruptus: " );
              exceptionMsg.append( threadName.toString() );
              System.out.println( exceptionMsg.toString() );
              System.err.println( exceptionMsg.toString() );
            }
            finally
            {
              // mark the thread as idle
              now = new Date();
              threadName = new StringBuffer( "idle: " );
              threadName.append( now.toString() );
              Thread.currentThread().setName( threadName.toString() );
            }
          }
        }
      };
      Thread t = new Thread( tg, r );
      t.setName( "initialized" );
      t.start();
    }

    if ( ! loadQueryQueue() )
    {
      System.err.println( "Query file missing: " + queryFile );
      System.exit(1);
    }

    boolean finished = false;

    while ( ! finished )
    {
      int rowCount = 0;

      // do something here....
      try
      {
        System.err.println( "sleeping, todo: " + rowCount );
        Thread.sleep(30000);

        if ( (rowCount = queryQueue.getRowCount()) <= 0 )
        {
          finished = true;
        }
      }
      catch ( InterruptedException ex )
      {
        System.err.println( "sleepus interruptus" );
      }
      catch ( Exception ex )
      {
        System.err.println( "getrowcountus interruptus" );
      }
    }

    // all done, we can leave now
    System.exit(0);
  }

  // 
  private static boolean loadQueryQueue()
  {
    int count = 0;

    try
    {
      BufferedReader queries = new BufferedReader( new FileReader( queryFile ) );

      String whoisQueryString;

      // figure out how many query names we should have
      String buff = queries.readLine();
      int rowCount;

      if ( buff != null )
      {
        while ( (whoisQueryString = queries.readLine() ) != null )
        {
          try
          {
            while ( (rowCount = queryQueue.getRowCount()) > 5000 )
            {
              Thread.sleep(1000);
              System.err.println( "rowCount: " + rowCount );
            }
          }
          catch ( InterruptedException ex )
          {
            System.err.println( "sleepus interruptus" );
          }
          catch ( Exception ex )
          {
            System.err.println( "getrowcountus interruptus" );
          }
          try
          {
            queryQueue.addRow( whoisQueryString );
            count++;
          }
          catch ( Exception ex )
          {
            System.err.println( "queryQueue.addRow exception: 1: " + ex.getMessage() );
          }
        }
        queries.close();
      }
    }
    catch ( IOException ex )
    {
      return false;
    }
    return true;
  }
}

For more information contact:
Steve Monroe
Phone: 540-822-3946
Internet: steve@pcthree.com