Apache jclouds integration test with OpenStack Swift

  • A Java program to test jclouds with Swift
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import java.io.BufferedWriter;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;

import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.domain.BlobMetadata;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.Iterables.contains;
import static org.jclouds.Constants.*;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;

import org.jclouds.ContextBuilder;
import org.jclouds.apis.ApiMetadata;
import org.jclouds.apis.Apis;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.openstack.swift.SwiftKeystoneClient;
import org.jclouds.providers.ProviderMetadata;
import org.jclouds.providers.Providers;
import org.jclouds.s3.S3Client;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;

/**
 * Rejection policy that silently discards any task the executor refuses.
 *
 * <p>The rejected {@code Runnable} is neither run nor reported; the handler
 * body is intentionally empty and exists only as a hook for debugging.
 */
class RejectedExecutionHandlerImpl1 implements RejectedExecutionHandler {

    /**
     * Drops the rejected task without running it and without throwing.
     *
     * @param r        the task that could not be accepted (ignored)
     * @param executor the executor that rejected the task (ignored)
     */
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        // only for debugging: deliberately a no-op
    }
}

/**
 * Periodically prints throughput figures for a {@link ThreadPoolExecutor}.
 *
 * <p>Every {@link #REPORT_INTERVAL_MS} milliseconds the monitor takes one
 * snapshot of the executor's completed-task counter and of the wall clock,
 * and prints the rate achieved since the previous snapshot. Nothing is printed
 * for an interval in which no task completed.
 *
 * <p>The loop ends when the monitoring thread is interrupted.
 */
class MyMonitorThread1 implements Runnable
{
	  /** Pause between two performance reports, in milliseconds. */
	  private static final long REPORT_INTERVAL_MS = 60000;

	  private final ThreadPoolExecutor executor;

	  /**
	   * @param executor the pool whose completed-task count, pool size and
	   *                 active count are reported
	   */
	  MyMonitorThread1(ThreadPoolExecutor executor) {
		this.executor = executor;
	  }

	  /**
	   * Runs the report loop until the current thread is interrupted.
	   */
	  @Override
	  public void run() {
		  // Start the first window now rather than at the epoch, so the first
		  // report describes the first interval instead of "since 1970".
		  long lastTimestamp = System.currentTimeMillis();
		  long lastCompleted = this.executor.getCompletedTaskCount();

		  while (!Thread.currentThread().isInterrupted()) {
			  try {
				  Thread.sleep(REPORT_INTERVAL_MS);  // Take performance check every 60sec
			  } catch (InterruptedException e) {
				  // Restore the flag for whoever owns this thread and stop monitoring.
				  Thread.currentThread().interrupt();
				  return;
			  }

			  // One snapshot per cycle: re-reading the counter after printing
			  // would lose the tasks that completed in between.
			  long completed = this.executor.getCompletedTaskCount();
			  long now = System.currentTimeMillis();
			  long served = completed - lastCompleted;
			  long elapsed = now - lastTimestamp;

			  // elapsed can only be 0 if the clock did not advance; guard the division anyway.
			  if (served > 0 && elapsed > 0) {
				  System.out.println("TPS " + (served * 1000) / elapsed + "  Avg Timetaken per request" + elapsed / served + " ms" + "CurrentPoolSize: " + this.executor.getPoolSize() +  "Task Active: " +  this.executor.getActiveCount());
			  }

			  lastCompleted = completed;
			  lastTimestamp = now;
		  }
	  }
}

/**
 * A single load-test request against the shared {@code Example8.blobStore}.
 *
 * <p>Each run uploads {@code Example8.file} under a freshly generated key,
 * fetches it back and then removes it again, all inside the container named
 * {@code Example8.containerName + count}.
 */
class MyRunnable1 implements Runnable {
	  /** Suffix appended to {@code Example8.containerName} to pick the target container. */
	  private final long count;

	  /**
	   * @param count index of the container this request operates on
	   */
	  MyRunnable1(long count) {
	    this.count = count;
	  }

	  /**
	   * Performs PUT, GET and DELETE for one random object key.
	   * Any exception is printed to standard output together with the key; it is
	   * not rethrown.
	   */
	  @Override
	  public void run() {
	    final String objectKey = "objkey" + UUID.randomUUID();

	    try {
	      @SuppressWarnings("deprecation")
	      Blob upload = Example8.blobStore.blobBuilder(objectKey).payload(Example8.file).build();

	      // PUT the object, read it back (result is not inspected), then delete it.
	      Example8.blobStore.putBlob(Example8.containerName + count, upload);
	      Example8.blobStore.getBlob(Example8.containerName + count, objectKey);
	      Example8.blobStore.removeBlob(Example8.containerName + count, objectKey);
	    } catch (Exception failure) {
	      System.out.println("Request failed for objkey " + objectKey + "    " + failure);
	      failure.printStackTrace(System.out);
	    }
	  }
}

/**
 * Command-line load test that drives a Swift (or S3) blob store through jclouds.
 *
 * <p>{@link #main(String[])} writes a sample payload file, creates a set of
 * containers, and then for {@link #duration} seconds keeps submitting
 * {@code MyRunnable1} tasks to a thread pool. A {@code MyMonitorThread1}
 * prints periodic throughput, and a summary is printed at the end.
 *
 * <p>NOTE(review): endpoints and credentials are hard-coded in this class.
 * They should come from configuration or the environment, not from source.
 */
public class Example8 {
  /** Number of containers created during setup (indices 0..102). */
  private static final int CONTAINERS_CREATED = 103;
  /** Container indices handed to workers cycle through 0..99. */
  private static final int LAST_ACTIVE_CONTAINER = 99;

  public static String containerName = "container-view";
  public static String accessKey = "ed2c3cc529";
  public static String secretKey = "5183";
  /** Blob store shared by all worker tasks; assigned during setup in {@code main}. */
  public static BlobStore blobStore;

  //Default params
  public static int fileSize = 5*1024; //  5KB data
  public static int duration = 300;// sec
  public static int maxThreads =10; // max allowed parallel sessions(threads)
  public static int minThreads = 10;  // starting pool of requests, we can keep it same as max for fixed pool
  /** Sample payload uploaded by every request; created in {@code main}. */
  public static File file = null;

  public static final Map<String, ApiMetadata> allApis = Maps.uniqueIndex(Apis.viewableAs(BlobStoreContext.class),
      Apis.idFunction());
  public static final Map<String, ProviderMetadata> appProviders = Maps.uniqueIndex(Providers.viewableAs(BlobStoreContext.class),
      Providers.idFunction());
  public static final Set<String> allKeys = ImmutableSet.copyOf(Iterables.concat(appProviders.keySet(), allApis.keySet()));

  /** Builds the proxy / certificate overrides shared by both S3 factory methods. */
  private static Properties proxyOverrides() {
    Properties overrides = new Properties();
    overrides.setProperty(PROPERTY_PROXY_HOST, "xx.xx.xx.xx");
    overrides.setProperty(PROPERTY_PROXY_PORT, "xxxx");
    overrides.setProperty(PROPERTY_TRUST_ALL_CERTS, "true");
    overrides.setProperty(PROPERTY_RELAX_HOSTNAME, "true");
    return overrides;
  }

  /**
   * Amazon S3 client view.
   *
   * @return a blob-store context for the "s3" provider, configured with the proxy overrides
   * @throws IllegalArgumentException if "s3" is not among {@link #allKeys}
   */
  static BlobStoreContext getS3ClientView() {
    // Check if a provider is present ahead of time
    checkArgument(contains(allKeys, "s3"), "provider %s not in supported list: %s", "s3", allKeys);

    return ContextBuilder.newBuilder("s3")
        .credentials("AKIAIRZDK5T2A", "LaNmauVOJUj9UrVHcKZp")
        .endpoint("http://s3.amazonaws.com")
        .overrides(proxyOverrides())
        //.buildView(S3BlobStoreContext.class);
        .buildView(BlobStoreContext.class);
  }

  /**
   * SWIFT client view.
   *
   * @return a blob-store context for the "swift-keystone" provider
   * @throws IllegalArgumentException if "swift-keystone" is not among {@link #allKeys}
   */
  static BlobStoreContext getSwiftClientView() {
    // Check if a provider is present ahead of time
    checkArgument(contains(allKeys, "swift-keystone"), "provider %s not in supported list: %s", "swift-keystone", allKeys);

    return ContextBuilder.newBuilder("swift-keystone")
        .credentials("testtenant:testeruser", "testpass")
        .endpoint("http://xx.xx.xx.xx:5000/v2.0/")
        .buildView(BlobStoreContext.class);
  }

  /**
   * Amazon S3 client api.
   *
   * @return the provider-specific S3 API, configured with the proxy overrides
   * @throws IllegalArgumentException if "s3" is not among {@link #allKeys}
   */
  static S3Client getS3Client() {
    // Check if a provider is present ahead of time
    checkArgument(contains(allKeys, "s3"), "provider %s not in supported list: %s", "s3", allKeys);

    return ContextBuilder.newBuilder("s3")
        .endpoint("http://s3.amazonaws.com")
        .overrides(proxyOverrides())
        .buildApi(S3Client.class);
  }

  /**
   * SWIFT client api.
   *
   * @return the provider-specific Swift/Keystone API
   * @throws IllegalArgumentException if "swift-keystone" is not among {@link #allKeys}
   */
  static SwiftKeystoneClient getSwiftClient() {
    // Check if a provider is present ahead of time
    checkArgument(contains(allKeys, "swift-keystone"), "provider %s not in supported list: %s", "swift-keystone", allKeys);

    return ContextBuilder.newBuilder("swift-keystone")
        .credentials("admin:admin", "admin")
        .endpoint("http://xx.xx.xx.xx:5000/v2.0/")
        .buildApi(SwiftKeystoneClient.class);
  }

  /**
   * Writes a temporary file of {@code size} '0' characters to use as the upload payload.
   * The file is registered for deletion when the JVM exits.
   *
   * @param size number of characters to write
   * @return the populated temporary file
   * @throws IOException if the file cannot be created or written
   */
  private static File createSampleFile(int size) throws IOException {
    File sample = File.createTempFile("jcloud-swift-test", ".txt");
    sample.deleteOnExit();
    BufferedWriter writer = new BufferedWriter(new FileWriter(sample));
    try {
      for (int i = 0; i < size; i++) {
        writer.write("0");
      }
    } finally {
      writer.close();
    }
    return sample;
  }

  /**
   * Submits requests for {@link #duration} seconds, waits for the pool to drain
   * and prints the summary. Requires {@link #blobStore} and {@link #file} to be set.
   */
  private static void runLoadTest() {
    System.out.println("Setup completed. Test started with "+ maxThreads +" threads pool and payload " + fileSize +" for "+ duration +"secs.....");

    RejectedExecutionHandlerImpl1 rejectionHandler = new RejectedExecutionHandlerImpl1();
    ThreadFactory threadFactory = Executors.defaultThreadFactory();
    ThreadPoolExecutor executorPool = new ThreadPoolExecutor(minThreads, maxThreads, 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory, rejectionHandler);

    // Daemon, so the endless monitor loop can never keep the JVM alive on its own.
    Thread monitorThread = new Thread(new MyMonitorThread1(executorPool));
    monitorThread.setDaemon(true);
    monitorThread.start();

    /*
     * Starting Test execution
     */
    int i = 0;
    long startMillis = System.currentTimeMillis();
    long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(duration);
    // nanoTime values must be compared by subtraction to stay correct on numeric overflow.
    while (System.nanoTime() - deadline < 0) {
      if (i > LAST_ACTIVE_CONTAINER) {
        i = 0;
      }
      try {
        // With a SynchronousQueue the task is handed to the rejection handler
        // whenever no pool thread is free to take it.
        executorPool.execute(new MyRunnable1(i));
        i++;
      } catch (Exception e) {
        System.out.println("Executor failed " + e );
      }
    }

    System.out.println("Test iteration finished. Waiting for threads to finish.....");
    executorPool.shutdown();

    // Wait until all threads are finish
    try {
      if (!executorPool.awaitTermination(5, TimeUnit.MINUTES)) {
        System.out.println("Timed out waiting for in-flight requests; reporting what completed so far.");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }

    long elapsedMillis = System.currentTimeMillis() - startMillis;
    long completed = executorPool.getCompletedTaskCount();
    // Guard both divisions: a sub-millisecond run or zero completed requests
    // previously raised ArithmeticException and skipped the final System.exit.
    long averageTps = elapsedMillis > 0 ? completed * 1000 / elapsedMillis : 0;
    long averageMillisPerRequest = completed > 0 ? elapsedMillis / completed : 0;

    System.out.println(":: Test Output ::");
    System.out.println("Total requests serverd " + completed + " in " +  elapsedMillis + " ms. Average TPS = " + averageTps + " Average Time taken per req " +  averageMillisPerRequest + " ms ");

    try {
      Thread.sleep(5000); //for last few results from monitor
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Main function.
   *
   * <p>With no arguments the defaults above are used. Otherwise five arguments
   * are required, in order: container name prefix, file size, max threads,
   * min threads, duration in seconds.
   *
   * @param args optional overrides as described above
   */
  public static void main(String[] args) {
    if (args.length > 0) {
      if (args.length < 5) {
        System.out.println("Need 5 args in sequence swiftBucketName fileSize maxThreads minThreads (<=maxThreds) duration(in sec)");
        System.exit(0);
      }
      containerName = args[0];
      fileSize = Integer.parseInt(args[1]);
      maxThreads = Integer.parseInt(args[2]);
      minThreads = Integer.parseInt(args[3]);
      duration = Integer.parseInt(args[4]);
    }

    // Sample file. Without a payload every request would fail, so stop here
    // instead of silently continuing with file == null.
    try {
      file = createSampleFile(fileSize);
    } catch (IOException e) {
      System.out.println("Unable to create sample payload file: " + e);
      e.printStackTrace(System.out);
      System.exit(1);
    }

    /*
     * Common operations
     */
    BlobStoreContext context = getSwiftClientView();
    //BlobStoreContext context = getS3ClientView();

    try {
      /*
       * Create Basic Containers
       */
      try {
        //Get BlobStore
        blobStore = context.getBlobStore();

        //PUT Container
        for (int i = 0; i < CONTAINERS_CREATED; i++) {
          blobStore.createContainerInLocation(null, containerName+i);
          System.out.println("PUT Container for S3/Swift Service -> " + containerName+i);
        }
      } catch (Exception e) {
        // Best effort: the containers may already exist, so carry on with the test.
        e.printStackTrace();
      }

      runLoadTest();
    } finally {
      // Close only after the workers are done: they all use the context's BlobStore.
      context.close();
    }

    System.exit(0);
  }
}



  • A Java thread-per-request worker program to test jclouds with Swift

 

import java.io.BufferedWriter;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.Iterables.contains;
import static org.jclouds.Constants.*;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;

import org.jclouds.ContextBuilder;
import org.jclouds.apis.ApiMetadata;
import org.jclouds.apis.Apis;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.openstack.swift.SwiftKeystoneClient;
import org.jclouds.providers.ProviderMetadata;
import org.jclouds.providers.Providers;
import org.jclouds.s3.S3Client;
//import static org.jclouds.blobstore.options.PutOptions.Builder.multipart;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;


import java.util.ArrayList;
import java.util.Date;
import java.util.List;



/**
 * Thread-per-request load test against a Swift blob store through jclouds.
 *
 * <p>Constructing a {@code Worker} immediately starts a thread that uploads one
 * object and, once more than {@link #DELETE_LAG} objects have been written,
 * also fetches and removes an older one. {@link #main(String[])} starts
 * {@link #maxThreads} workers per round, waits for the round to finish, and
 * repeats for {@link #duration} seconds.
 *
 * <p>NOTE(review): the endpoint and credentials are hard-coded in
 * {@link #getSwiftClientView()}; they should come from configuration.
 */
public class Worker implements Runnable
{
  /** Number of containers created during setup and cycled through by the workers. */
  private static final int CONTAINER_COUNT = 500;
  /** A worker only starts deleting once its object counter exceeds this value. */
  private static final long DELETE_LAG = 5000;
  /** A progress line is printed every this many rounds. */
  private static final long REPORT_EVERY_ROUNDS = 50;

  /**
   * True from construction until {@link #run()} has finished.
   * Volatile because it is written by the worker thread and read by others.
   */
  public volatile boolean running = false;
  public static String containerName = "container-view";
  /** Blob store shared by all workers; assigned during setup in {@code main}. */
  public static BlobStore blobStore;

  //Default params
  public static int fileSize = 5*1024; //  5KB data
  public static int duration = 600;// sec
  public static int maxThreads =1; // max allowed parallel sessions(threads)
  public static int minThreads = 1;  // starting pool of requests, we can keep it same as max for fixed pool
  /** Sample payload uploaded by every worker; created in {@code main}. */
  public static File file = null;

  private final long counter;
  private final long bucketCounter;
  private final long delBucketCounter;
  private final long delCounter;
  private final Thread thread;

  /**
   * Creates the worker and starts its thread.
   *
   * @param b_count   index of the container to upload into
   * @param count     sequence number of the object to upload (key {@code "objkey" + count})
   * @param d_b_count index of the container to fetch/remove from
   * @param d_count   sequence number of the object to fetch/remove
   */
  public Worker (long b_count, long count, long d_b_count, long d_count)
  {
    this.counter = count;
    this.bucketCounter = b_count;
    this.delCounter = d_count;
    this.delBucketCounter = d_b_count;
    // Mark as running before the thread starts, so an observer can never see
    // "not running" for a worker that simply has not been scheduled yet.
    this.running = true;
    this.thread = new Thread(this);
    this.thread.start();
  }

  /**
   * SWIFT client view.
   *
   * @return a blob-store context for the "swift-keystone" provider
   */
  static BlobStoreContext getSwiftClientView() {
    return ContextBuilder.newBuilder("swift-keystone")
        .credentials("service:swiftte", "cloudlabte")
        .endpoint("http://xx.xx.xx.xx:5000/v2.0/")
        .buildView(BlobStoreContext.class);
  }

  /**
   * Writes a temporary file of {@code size} '0' characters to use as the upload payload.
   * The file is registered for deletion when the JVM exits.
   *
   * @param size number of characters to write
   * @return the populated temporary file
   * @throws IOException if the file cannot be created or written
   */
  private static File createSampleFile(int size) throws IOException {
    File sample = File.createTempFile("jcloud-s3-test", ".txt");
    sample.deleteOnExit();
    BufferedWriter writer = new BufferedWriter(new FileWriter(sample));
    try {
      for (int i = 0; i < size; i++) {
        writer.write("0");
      }
    } finally {
      writer.close();
    }
    return sample;
  }

  /**
   * Runs the load test.
   *
   * <p>With no arguments the defaults above are used. Otherwise five arguments
   * are required, in order: container name prefix, file size, max threads,
   * min threads, duration in seconds.
   *
   * @param args optional overrides as described above
   * @throws InterruptedException if the main thread is interrupted while waiting for workers
   */
  public static void main (String[] args) throws InterruptedException
  {
    List<Worker> workers = new ArrayList<Worker>();

    if (args.length > 0) {
      if (args.length < 5) {
        System.out.println("Need 5 args in sequence swiftBucketName fileSize maxThreads minThreads (<=maxThreds) duration(in sec)");
        System.exit(0);
      }
      containerName = args[0];
      fileSize = Integer.parseInt(args[1]);
      maxThreads = Integer.parseInt(args[2]);
      minThreads = Integer.parseInt(args[3]);
      duration = Integer.parseInt(args[4]);
    }

    // Sample file. Without a payload every request would fail, so stop here.
    try {
      file = createSampleFile(fileSize);
    } catch (IOException e) {
      e.printStackTrace();
      System.exit(1);
    }

    BlobStoreContext context = getSwiftClientView();
    try {
      try {
        //Get BlobStore
        blobStore = context.getBlobStore();

        //PUT Container
        for (int i = 0; i < CONTAINER_COUNT; i++) {
          blobStore.createContainerInLocation(null, containerName+i);
          System.out.println("PUT Container for Swift Service -> " + containerName+i);
        }
      } catch (Exception e) {
        // Best effort: the containers may already exist, so carry on with the test.
        e.printStackTrace();
      }

      long firstTimestamp = System.currentTimeMillis();
      long bucketNum = 0;
      long objectNum = 0;
      long delBucketNum = 0;
      long delObjectNum = 0;
      long oldObjectNum = 0;
      long roundCount = 0;
      // Start the first reporting window at test start, not at the epoch.
      long windowStart = firstTimestamp;

      long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(duration);
      // nanoTime values must be compared by subtraction to stay correct on numeric overflow.
      while (System.nanoTime() - deadline < 0) {

        for (int i = 0; i < maxThreads; i++) {
          if (bucketNum >= CONTAINER_COUNT) {
            bucketNum = 0;
          }
          if (delBucketNum >= CONTAINER_COUNT) {
            delBucketNum = 0;
          }

          long current = objectNum;
          workers.add(new Worker(bucketNum, current, delBucketNum, delObjectNum));
          objectNum++;
          bucketNum++;
          // Advance the delete cursor only when the worker just started actually
          // deletes (same condition as in run()); otherwise the cursor runs ahead
          // and the first objects are never removed.
          if (current > DELETE_LAG) {
            delBucketNum++;
            delObjectNum++;
          }
        }

        // We must force the main thread to wait for all the Workers
        //  to finish their work before starting the next round.
        for (Worker worker : workers) {
          worker.thread.join();
        }
        // Finished workers are no longer needed; dropping them keeps memory and
        // the wait loop bounded by one round.
        workers.clear();

        roundCount++;
        if (roundCount % REPORT_EVERY_ROUNDS == 0) {
          long now = System.currentTimeMillis();
          long windowMillis = now - windowStart;
          long windowTps = windowMillis > 0 ? (objectNum - oldObjectNum) * 1000 / windowMillis : 0;
          System.out.println ("Approax TPS is " + windowTps + "Time taken by 50 loops" + windowMillis);
          windowStart = now;
          oldObjectNum = objectNum;
        }
      }

      long difference = System.currentTimeMillis() - firstTimestamp;
      long overallTps = difference > 0 ? objectNum * 1000 / difference : 0;

      System.out.println ("This whole process took: " + difference/1000 + " seconds and TPS is " + overallTps + " Total Objects PUT " + objectNum + "Total Objects  GET and DEL" + delObjectNum);
    } finally {
      // Close only after all workers are done: they all use the context's BlobStore.
      context.close();
    }
    System.exit(0);
  }

  /**
   * Uploads {@code "objkey" + counter}; if {@code counter} exceeds
   * {@link #DELETE_LAG}, also fetches and removes {@code "objkey" + delCounter}.
   * Exceptions are printed and not rethrown. {@link #running} is reset when done.
   */
  @Override
  public void run()
  {
    this.running = true;
    String key = "objkey" + counter;
    String delKey = "objkey" + delCounter;
    try
    {
      @SuppressWarnings("deprecation")
      Blob blob = Worker.blobStore.blobBuilder(key).payload(Worker.file).build();
      Worker.blobStore.putBlob(Worker.containerName+bucketCounter, blob);
      if (counter > DELETE_LAG) {
        Worker.blobStore.getBlob(Worker.containerName+delBucketCounter, delKey);
        Worker.blobStore.removeBlob(Worker.containerName+delBucketCounter, delKey);
      }
    }
    catch (Exception e)
    {
      // Report both keys: the failure may come from the PUT as well as from the GET/DELETE.
      System.out.println ("Error for key " + key + " (delete key " + delKey + ")");
      e.printStackTrace();
    }
    finally
    {
      this.running = false;
    }
  }

}
Advertisements
This entry was posted in ObjectStorage and tagged . Bookmark the permalink.

One Response to Apache Jclouds Integration test with Openstack SWIFT

  1. Torie says:

    That’s a nicely made answer to a challenging question

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s