Hadoop !!

logo

Hadoop is a spin-off from Apache's Nutch project (an open-source web crawler), inspired by Google's GFS and MapReduce papers.

Apache Hadoop is an open-source framework that allows for distributed processing of large data sets across computing clusters.

Apache Hadoop has these major projects:
Hadoop Distributed File System (HDFS): A distributed File System for high-throughput access to large sets of data

Hadoop Common: The common utilities that support the other Hadoop modules.

Hadoop YARN: A framework for job scheduling and cluster resource management. YARN is a generic platform that can run any distributed application, and MR2 is a distributed application that runs on top of YARN.

MapReduce 2: A YARN-based system for parallel processing of large sets of data

Hadoop

Thanks to http://www.youtube.com/playlist?list=PL9ooVrP1hQOHrhnO86Z9m9tDi91W2d1b6

HADOOP stack  :-

YARN

Hadoop is best suited for:
Processing unstructured data
Complex parallel information processing
Large Data Sets/Files
Machine Learning
Critical fault tolerant data processing
Reports not needed in real time
Queries that cannot be expressed by SQL
Data processing jobs that need to be faster

How Hadoop processes data (MapReduce; a rough analogy is a Java servlet)
1) Hadoop provides the MapReduce framework for processing the stored big data. The important innovation of MapReduce
is the ability to take a query over a dataset, divide it, and run it in parallel over multiple nodes.

2) You can run your indexing job by sending your code to each of the dozens of servers in your cluster,
and each server operates on its own little piece of the data. The results are then delivered back to you as a unified whole.
(MapReduce) You map the operation out to all of those servers and then reduce the results back into a single result set; a small sketch follows.
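
To make that map/reduce flow concrete, here is a minimal word-count sketch using Hadoop Streaming, which lets the map and reduce steps be plain scripts that read stdin and write tab-separated key/value pairs. This is my own illustration, not from the original post; the streaming jar path and the input/output directories are placeholders.

# mapper.py - emit "word<TAB>1" for every word seen on stdin
import sys

for line in sys.stdin:
    for word in line.split():
        print "%s\t%d" % (word, 1)

# reducer.py - sum the counts per word (Hadoop sorts the mapper output by key first)
import sys

current_word, current_count = None, 0
for line in sys.stdin:
    word, count = line.rstrip("\n").rsplit("\t", 1)
    if word != current_word:
        if current_word is not None:
            print "%s\t%d" % (current_word, current_count)
        current_word, current_count = word, 0
    current_count += int(count)
if current_word is not None:
    print "%s\t%d" % (current_word, current_count)

# Submit the job; the streaming jar location varies with the Hadoop version
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -input /data/logs -output /data/wordcount \
    -mapper mapper.py -reducer reducer.py \
    -file mapper.py -file reducer.py

Each mapper works on its own HDFS blocks, and the reducers merge the per-word counts back into a single result set, which is exactly the map-out/reduce-back pattern described above.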

How Hadoop stores files (HDFS)
1) Hadoop lets you store files bigger than what can be stored on one particular node or server.

2) When you want to load all of your organization's data into Hadoop, the software busts
that data into pieces that it then spreads across your different servers. There's no one place where
you go to talk to all of your data; Hadoop keeps track of where the data resides. And because
multiple copies are stored, data on a server that goes offline or dies can be automatically replicated
from a known good copy.

3) Hadoop is designed to run on a large number of machines that don’t share any memory or disks.

4) Each server must have access to the data. This is the role of HDFS, the Hadoop Distributed File System.
HDFS ensures data is replicated with redundancy across the cluster.
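
As a small illustration of points 2) and 4) above (my own sketch, not from the original post), the WebHDFS REST interface exposes how a file is split into blocks and replicated; the namenode address and file path below are placeholders.

# Query file metadata over WebHDFS; 50070 is the usual namenode HTTP port in Hadoop 2.x
import json
import urllib2

NAMENODE = "http://namenode.example.com:50070"   # placeholder
PATH = "/user/hadoop/big-dataset.csv"            # placeholder

resp = urllib2.urlopen(NAMENODE + "/webhdfs/v1" + PATH + "?op=GETFILESTATUS")
status = json.loads(resp.read())["FileStatus"]

# blockSize shows how the file is chopped into pieces, replication how many copies are kept
print "length=%(length)d bytes, blockSize=%(blockSize)d, replication=%(replication)d" % status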

Hadoop programming
Programming Hadoop at the MapReduce level means working with the Java APIs and manually loading data files into HDFS.
Working directly with the Java APIs can be tedious and error prone, and it restricts the use of Hadoop to Java programmers. Hadoop offers two solutions for making Hadoop programming easier: Pig and Hive.

1) Pig is a programming language that simplifies the common tasks of working with Hadoop: loading data, expressing transformations on the data, and storing the final results.
2) Hive enables Hadoop to operate as a data warehouse. It superimposes structure on data in HDFS and then permits queries over the data using a familiar SQL-like syntax.
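
For a flavour of the Hive side (a hedged sketch of my own, with a made-up web_logs table), a SQL-like query can be fired at data sitting in HDFS straight from a script; hive -e simply executes the query string:

# Run a HiveQL query from Python by shelling out to the hive CLI (table/columns are hypothetical)
import subprocess

query = """
SELECT page, COUNT(*) AS hits
FROM web_logs
GROUP BY page
ORDER BY hits DESC
LIMIT 10
"""
print subprocess.check_output(["hive", "-e", query])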

Build

http://hadoop.apache.org/docs/r2.3.0/hadoop-project-dist/hadoop-common/SingleCluster.html
to fix build issues:
https://issues.apache.org/jira/secure/attachment/12614482/HADOOP-10110.patch

==================================================

Openstack – Sahara with Hadoop   
1) Sahara = managing Hadoop + provisioning the infrastructure + tools
●  Create and manage clusters
●  Define and run analysis jobs
●  All through a programmatic interface or a web console
2) Sahara REST API: http://docs.openstack.org/developer/sahara/restapi/rest_api_v1.0.html (a small sketch follows)
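
As a quick taste of the programmatic interface (my own sketch, not from the slides; the host, port 8386, tenant id and token are assumptions based on the v1.0 REST docs linked above), listing clusters could look like this:

# List Sahara clusters over the v1.0 REST API; endpoint details are assumptions
import json
import urllib2

SAHARA = "http://controller.example.com:8386/v1.0/TENANT_ID"   # placeholders
TOKEN = "KEYSTONE_TOKEN"                                       # obtain from Keystone first

req = urllib2.Request(SAHARA + "/clusters", headers={"X-Auth-Token": TOKEN})
for cluster in json.loads(urllib2.urlopen(req).read()).get("clusters", []):
    print cluster.get("name"), cluster.get("status")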

Sahara

Thanks to http://www.slideshare.net/spinningmatt/sahara-dev-nation-2014

============================================

Hadoop Ecosystem:- 

Hadoop Ecosystem

Thanks to  http://techblog.baghel.com/index.php?itemid=132

What's Next …

Try out  http://hortonworks.com/products/hortonworks-sandbox/

 

Posted in OpenStack

Python-Debugging

logo

 

** Steps to debug a python script

* apt-get install gdb python2.7-dbg

* We need a gdb compiled with --with-python
wget http://ftp.gnu.org/gnu/gdb/gdb-7.6.2.tar.gz
./configure --with-python=/usr/bin
make
make install
It will install in /usr/local/bin/

* On Ubuntu we have to install python-dbg. Further, for the debug session we have to use python-dbg and not python.

apt-get install python-dbg

* We also need a Python debug build so that symbols can be loaded in the gdb session.

So clone Python and build it; we need the path below for the debug session:

/usr/local/src/Python-2.7.3/Tools/gdb

* To check if gdb supports Python:
gdb --batch --eval-command="python print gdb"
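
The sample session below debugs a small test script. Reconstructed from the backtrace shown further down, a rough sketch of that example.py (my reconstruction, not the exact original) looks like this:

# example.py - deliberately segfaults CPython so we can practise gdb + libpython commands
from ctypes import string_at

class Foo(object):
    def __init__(self):
        self.someattr = 42
        self.someotherattr = {'one': 1, 'two': 2L, 'three': [(), (None,), (None, None)]}

    def bar(self):
        string_at(0xDEADBEEF)   # reading an unmapped address crashes the interpreter

def main():
    f = Foo()
    f.bar()

main()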

** A Sample session

root@proxy1:~/sumit# /usr/local/bin/gdb --args python-dbg ~/sumit/example.py
GNU gdb (GDB) 7.6.2
Copyright (C) 2013 Free Software Foundation, Inc.
License GPLv3+: GNU GPL version 3 or later <http://gnu.org/licenses/gpl.html>
This is free software: you are free to change and redistribute it.
There is NO WARRANTY, to the extent permitted by law. Type "show copying"
and "show warranty" for details.
This GDB was configured as "x86_64-unknown-linux-gnu".
For bug reporting instructions, please see:
<http://www.gnu.org/software/gdb/bugs/>...
Reading symbols from /usr/bin/python2.7...done.
(gdb) run
Starting program: /usr/local/bin/python /root/sumit/example.py
[Thread debugging using libthread_db enabled]
Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1".
Program received signal SIGSEGV, Segmentation fault.
0x000000000046f06f in PyString_FromString (str=0xdeadbeef <Address 0xdeadbeef out of bounds>) at Objects/stringobject.c:121
121 size = strlen(str);
(gdb) py-bt
Undefined command: “py-bt”. Try “help”.
(gdb) python import sys;
(gdb) python sys.path.insert(0, "/usr/local/src/Python-2.7.3/Tools/gdb");
(gdb) python import libpython;
(gdb) py-bt
#10 Frame 0xa932e0, for file /usr/lib/python2.7/ctypes/__init__.py, line 509, in string_at (ptr=3735928559, size=-1)
return _string_at(ptr, size)
#14 Frame 0xa8e920, for file /root/sumit/example.py, line 5, in bar (self=<Foo(someattr=42, someotherattr={'three': [(), (None,), (None, None)], 'two': 2L, 'one': 1}) at remote 0xa32768>, string_at=<function at remote 0xaa73a8>)
string_at(0xDEADBEEF) # this code will cause Python to segfault
#17 Frame 0xa8e710, for file /root/sumit/example.py, line 12, in main (f=<Foo(someattr=42, someotherattr={'three': [(), (None,), (None, None)], 'two': 2L, 'one': 1}) at remote 0xa32768>)
f.bar()
#20 Frame 0xa8e010, for file /root/sumit/example.py, line 16, in <module> ()
main()

(gdb) py-list

(gdb) thread apply all py-list

Other useful commands: py-locals, py-up, py-down.

** Further reading 

https://docs.python.org/devguide/gdb.html

** Steps to debug a python daemon

Example Session

* Start daemon with python-dbg

python-dbg daemon.py

root@object1:~/sumit# /usr/local/bin/gdb --args python-dbg
GNU gdb (GDB) 7.6.2
::::::::::::::::::::::::::::::::
Reading symbols from /usr/bin/python2.7-dbg…done.
(gdb) python import sys;
(gdb) python sys.path.insert(0, "/root/sumit/Python-2.7.3/Tools/gdb");
(gdb) python import libpython;
(gdb) attach 14268
Attaching to program: /usr/bin/python-dbg, process 14268
Reading symbols from /lib/x86_64-linux-gnu/libpthread.so.0…(no debugging symbols found)…done.
[Thread debugging using libthread_db enabled]
Using host libthread_db library “/lib/x86_64-linux-gnu/libthread_db.so.1”.
Loaded symbols for /lib/x86_64-linux-gnu/libpthread.so.0
Reading symbols from /lib/x86_64-linux-gnu/libdl.so.2…(no debugging symbols found)…done.
Loaded symbols for /lib/x86_64-linux-gnu/libdl.so.2
Reading symbols from /lib/x86_64-linux-gnu/libutil.so.1…(no debugging symbols found)…done.
Loaded symbols for /lib/x86_64-linux-gnu/libutil.so.1
Reading symbols from /lib/x86_64-linux-gnu/libssl.so.1.0.0…(no debugging symbols found)…done.
Loaded symbols for /lib/x86_64-linux-gnu/libssl.so.1.0.0
Reading symbols from /lib/x86_64-linux-gnu/libcrypto.so.1.0.0…(no debugging symbols found)…done.
Loaded symbols for /lib/x86_64-linux-gnu/libcrypto.so.1.0.0
Reading symbols from /lib/x86_64-linux-gnu/libz.so.1…(no debugging symbols found)…done.
Loaded symbols for /lib/x86_64-linux-gnu/libz.so.1
Reading symbols from /lib/x86_64-linux-gnu/libm.so.6…(no debugging symbols found)…done.
Loaded symbols for /lib/x86_64-linux-gnu/libm.so.6
Reading symbols from /lib/x86_64-linux-gnu/libc.so.6…(no debugging symbols found)…done.
Loaded symbols for /lib/x86_64-linux-gnu/libc.so.6
Reading symbols from /lib64/ld-linux-x86-64.so.2…(no debugging symbols found)…done.
Loaded symbols for /lib64/ld-linux-x86-64.so.2
0x00007fdcf1213743 in select () from /lib/x86_64-linux-gnu/libc.so.6
(gdb) py-bt
#5 Frame 0xd88480, for file /root/sumit/temp.py, line 4, in next (i=1)
time.sleep(10)
#8 Frame 0xd85d70, for file /root/sumit/temp.py, line 9, in <module> ()
next(i)

(gdb) py-list
   1    import time
   2
   3    def next(i):
  >4        time.sleep(10)
   5        i = 1 - i
   6
   7    i = 1
   8    while True:
   9        next(i)

(gdb) finish
Run till exit from #0 0x00007f9849a6a743 in select () from /lib/x86_64-linux-gnu/libc.so.6
0x000000000058d1f3 in floatsleep (secs=10) at ../Modules/timemodule.c:943
943 ../Modules/timemodule.c: No such file or directory.
(gdb) next
954 in ../Modules/timemodule.c
(gdb) cd Python-2.7.3/Python/
Working directory /root/sumit/Python-2.7.3/Python.
(gdb) next
1051 return 0;
(gdb) list
1046 Py_BEGIN_ALLOW_THREADS
1047 sleep((int)secs);
1048 Py_END_ALLOW_THREADS
1049 #endif
1050
1051 return 0;
1052 }
1053

*** Attach gdb to an openstack swift process

swift-init stop all

apt-get install python-dbg

apt-get install python-greenlet-dbg

apt-get install python-netifaces-dbg

ln -s /usr/bin/python2.7-dbg /usr/bin/python

swift-init start main

and then attach gdb as above to the swift process

Posted in Programming

Apache Jclouds Integration test with Openstack SWIFT

  • A java program to test jclouds with SWIFT
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import java.io.BufferedWriter;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;

import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.domain.BlobMetadata;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.Iterables.contains;
import static org.jclouds.Constants.*;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;

import org.jclouds.ContextBuilder;
import org.jclouds.apis.ApiMetadata;
import org.jclouds.apis.Apis;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.openstack.swift.SwiftKeystoneClient;
import org.jclouds.providers.ProviderMetadata;
import org.jclouds.providers.Providers;
import org.jclouds.s3.S3Client;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;

class RejectedExecutionHandlerImpl1 implements RejectedExecutionHandler { 
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    	//only for debugging
     }  
 } 

class MyMonitorThread1 implements Runnable  
{
	  private ThreadPoolExecutor executor;
	 
	  MyMonitorThread1(ThreadPoolExecutor executor) {
		this.executor = executor;
	  }
	 
	  @Override
	  public void run() {
		  long first_timestamp = 0;
		  long last_timestamp = 0;
		  long total_req_served = 0;
		  long last_req_served = 0;
		  while(true) {    
			  try {
                    Thread.sleep(60000);  // Take performance check every 60sec
                    total_req_served = this.executor.getCompletedTaskCount();
                    first_timestamp = System.currentTimeMillis();
                    if (total_req_served-last_req_served > 0) {
                    	System.out.println("TPS " +((total_req_served-last_req_served)*1000)/(first_timestamp-last_timestamp) + "  Avg Timetaken per request" + (first_timestamp-last_timestamp)/(total_req_served-last_req_served) + " ms" + "CurrentPoolSize: " + this.executor.getPoolSize() +  "Task Active: " +  this.executor.getActiveCount());
                    }
                    last_req_served = this.executor.getCompletedTaskCount();
                	last_timestamp = System.currentTimeMillis();
			  } catch (Exception e) {
              }  
		  }
	  }
}

class MyRunnable1 implements Runnable {
	  private final long count;
	  private long count1;
	  private long count2;
	  
	  MyRunnable1(long count) {
	    this.count = count;
	    this.count1 = count+1;
	    this.count2 = count+2;
	  }

	  @Override
	  public void run() {
	    String key = "objkey" + UUID.randomUUID();
	    
        try {
		@SuppressWarnings("deprecation")
   		Blob blob = Example8.blobStore.blobBuilder(key).payload(Example8.file).build();
   		Example8.blobStore.putBlob(Example8.containerName+count, blob);
        	Blob recv_object = Example8.blobStore.getBlob(Example8.containerName+count, key);
        	Example8.blobStore.removeBlob(Example8.containerName+count, key);
        } catch (Exception ace) {
        	System.out.println("Request failed for objkey " + key + "    " + ace);
		ace.printStackTrace(System.out);
        }
	} 
}

public class Example8 {
  public static String containerName = "container-view";
  public static String accessKey = "ed2c3cc529";
  public static String secretKey = "5183";
  public static BlobStore blobStore;
  
  //Default params 
  public static int fileSize = 5*1024; //  5KB data
  public static int duration = 300;// sec
  public static int maxThreads =10; // max allowed parallel sessions(threads)
  public static int minThreads = 10;  // starting pool of requests, we can keep it same as max for fixed pool
  public static File file = null;
	
 public static final Map<String, ApiMetadata> allApis = Maps.uniqueIndex(Apis.viewableAs(BlobStoreContext.class),
      Apis.idFunction());
 public static final Map<String, ProviderMetadata> appProviders = Maps.uniqueIndex(Providers.viewableAs(BlobStoreContext.class),
      Providers.idFunction());   
 public static final Set<String> allKeys = ImmutableSet.copyOf(Iterables.concat(appProviders.keySet(), allApis.keySet()));
 
 /*
  * Amazon S3 client view
  */
 static BlobStoreContext getS3ClientView() {
		  // Check if a provider is present ahead of time
		  checkArgument(contains(allKeys, "s3"), "provider %s not in supported list: %s", "s3", allKeys);
		  
		  // Proxy check
		  Properties overrides = new Properties(); 
		  overrides.setProperty(PROPERTY_PROXY_HOST, "xx.xx.xx.xx"); 
		  overrides.setProperty(PROPERTY_PROXY_PORT, "xxxx");
		  overrides.setProperty(PROPERTY_TRUST_ALL_CERTS, "true"); 
		  overrides.setProperty(PROPERTY_RELAX_HOSTNAME, "true"); 
		  
	      
	       return ContextBuilder.newBuilder("s3")
                .credentials("AKIAIRZDK5T2A", "LaNmauVOJUj9UrVHcKZp")
                .endpoint("http://s3.amazonaws.com")
                .overrides(overrides)
                //.buildView(S3BlobStoreContext.class);
                .buildView(BlobStoreContext.class);
 }
 
 /*
  * SWIFT client view
  */
 
 static BlobStoreContext getSwiftClientView() {
		  // Check if a provider is present ahead of time
		  checkArgument(contains(allKeys, "swift-keystone"), "provider %s not in supported list: %s", "swift-keystone", allKeys);
		  
	       return ContextBuilder.newBuilder("swift-keystone")
	                  .credentials("testtenant:testeruser", "testpass")
	                  .endpoint("http://xx.xx.xx.xx:5000/v2.0/")
	                  .buildView(BlobStoreContext.class);
	   }
 /*
  * Amazon S3 client api
  */
 static S3Client getS3Client() {
		  // Check if a provider is present ahead of time
		  checkArgument(contains(allKeys, "s3"), "provider %s not in supported list: %s", "s3", allKeys);
		  
		  // Proxy check
		  Properties overrides = new Properties(); 
		  overrides.setProperty(PROPERTY_PROXY_HOST, "xx.xx.xx.xx"); 
		  overrides.setProperty(PROPERTY_PROXY_PORT, "xxxx");
		  overrides.setProperty(PROPERTY_TRUST_ALL_CERTS, "true"); 
		  overrides.setProperty(PROPERTY_RELAX_HOSTNAME, "true");
		  
		  
	      return ContextBuilder.newBuilder("s3")
	                           .endpoint("http://s3.amazonaws.com")
	    		  			   .overrides(overrides)
	                           .buildApi(S3Client.class);
	   }
 /*
  * SWIFT client api
  */
 
 static SwiftKeystoneClient getSwiftClient() {
		  // Check if a provider is present ahead of time
		  checkArgument(contains(allKeys, "swift-keystone"), "provider %s not in supported list: %s", "swift-keystone", allKeys);
		  
	      return ContextBuilder.newBuilder("swift-keystone")
	                           .credentials("admin:admin", "admin")
	                           .endpoint("http://xx.xx.xx.xx:5000/v2.0/")
	                           .buildApi(SwiftKeystoneClient.class);
	   }

  
  /*
   * Main function 
   */
 public static void main(String[] args) {
	 if(args.length > 0) {
		 if (args.length < 5) {
			System.out.println("Need 5 args in sequence swiftBucketName fileSize maxThreads minThreads (<=maxThreds) duration(in sec)");
		 	System.exit(0);
		 }
		 containerName = args[0];
		 fileSize = Integer.parseInt(args[1]);
		 maxThreads = Integer.parseInt(args[2]);
		 minThreads = Integer.parseInt(args[3]);
		 duration = Integer.parseInt(args[4]);
	 }   
	    
	 // Sample file
	 try {
		 file = File.createTempFile("jcloud-swift-test", ".txt");
		 BufferedWriter writer = null;
		 try {
			 writer = new BufferedWriter(new FileWriter(file));
			 for (int i = 0; i < fileSize; i++) {
				 writer.write("0");
			 }
		 } finally {
			 if (writer != null) try { writer.close(); } catch (IOException ignore) {}
		 }
	 } catch (Exception e) {
	 }

	 /*
	  * Common operations
	  */
	 BlobStoreContext context = getSwiftClientView();
	 //BlobStoreContext context = getS3ClientView();

	 /*
	  * Create Basic Containers
	  */
	 try {
		 //Thread.sleep(30000); 
		 //Get BlobStore
		 blobStore = context.getBlobStore();

		 //PUT Container
		 for (int i = 0 ; i <= 102 ; i++) {
			 blobStore.createContainerInLocation(null, containerName+i);
			 System.out.println("PUT Container for S3/Swift Service -> " + containerName+i);
		 }

	 } catch (Exception e ) {
		e.printStackTrace();
	 } finally {
		 context.close();
	 }
	      
	System.out.println("Setup completed. Test started with "+ maxThreads +" threads pool and payload " + fileSize +" for "+ duration +"secs.....");
	
        RejectedExecutionHandlerImpl1 rejectionHandler = new RejectedExecutionHandlerImpl1(); 
	ThreadFactory threadFactory = Executors.defaultThreadFactory(); 
	ThreadPoolExecutor executorPool = new ThreadPoolExecutor(minThreads, maxThreads, 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory, rejectionHandler);
	MyMonitorThread1 monitor = new MyMonitorThread1(executorPool);
	Thread monitorThread = new Thread(monitor); 
	monitorThread.start();
	
	/*
	 * Starting Test execution
	 */
	int i = 0;
    long first_timestamp = System.currentTimeMillis();
    for (long stop=System.nanoTime()+TimeUnit.SECONDS.toNanos(duration); stop>System.nanoTime();) {
    	if (i > 99) {
    		i = 0;
    	}
    	try {
			Runnable worker = new MyRunnable1(i);
			executorPool.execute(worker);
			i++;
    	} catch (Exception e){
        	System.out.println("Executor failed " + e );
	}
    }
    
    System.out.println("Test iteration finished. Waiting for threads to finish.....");
    executorPool.shutdown(); 
    
    // Wait until all threads are finish
    try {
    	executorPool.awaitTermination(5,TimeUnit.MINUTES);
	} catch (Exception e) {
	}
	long last_timestamp = System.currentTimeMillis();
	
	try {
		System.out.println(":: Test Output ::");
		System.out.println("Total requests serverd " + executorPool.getCompletedTaskCount() + " in " +  (last_timestamp-first_timestamp) + " ms. Average TPS = " + executorPool.getCompletedTaskCount()/((last_timestamp-first_timestamp)/1000) + " Average Time taken per req " +  (last_timestamp-first_timestamp)/(executorPool.getCompletedTaskCount()) + " ms ");
		Thread.sleep(5000); //for last few results from monitor
		System.exit(0);
	} catch (Exception e) {
	}
		
  }
}



  • A java servlet to test jclouds with SWIFT

 

import java.io.BufferedWriter;

import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.Iterables.contains;
import static org.jclouds.Constants.*;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;

import org.jclouds.ContextBuilder;
import org.jclouds.apis.ApiMetadata;
import org.jclouds.apis.Apis;
import org.jclouds.blobstore.BlobStore;
import org.jclouds.blobstore.BlobStoreContext;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.openstack.swift.SwiftKeystoneClient;
import org.jclouds.providers.ProviderMetadata;
import org.jclouds.providers.Providers;
import org.jclouds.s3.S3Client;
//import static org.jclouds.blobstore.options.PutOptions.Builder.multipart;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Maps;


import java.util.ArrayList;
import java.util.Date;
import java.util.List;



public class Worker implements Runnable 
{
  public boolean running = false;
  public static String containerName = "container-view";
  public static BlobStore blobStore;

  //Default params
  public static int fileSize = 5*1024; //  5KB data
  public static int duration = 600;// sec
  public static int maxThreads =1; // max allowed parallel sessions(threads)
  public static int minThreads = 1;  // starting pool of requests, we can keep it same as max for fixed pool
  public static File file = null;

  private long counter = 0;  
  private long BucketCounter = 0;  
  private long Del_BucketCounter=0;
  private long del_counter=0;

  public Worker (long b_count, long count, long d_b_count, long d_count)
  {
    this.counter = count;
    this.BucketCounter = b_count;
    this.del_counter = d_count;
    this.Del_BucketCounter = d_b_count;
    Thread thread = new Thread(this);
    thread.start();
  }
 static BlobStoreContext getSwiftClientView() {
                  // Check if a provider is present ahead of time

               return ContextBuilder.newBuilder("swift-keystone")
                          .credentials("service:swiftte", "cloudlabte")
                          .endpoint("http://xx.xx.xx.xx:5000/v2.0/")
                          .buildView(BlobStoreContext.class);
           }

  
  public static void main (String[] args) throws InterruptedException
  {
    List<Worker> Workers = new ArrayList<Worker>();
    
         if(args.length > 0) {
                 if (args.length < 5) {
                        System.out.println("Need 5 args in sequence swiftBucketName fileSize maxThreads minThreads (<=maxThreds) duration(in sec)");
                        System.exit(0);
                 }
                 containerName = args[0];
                 fileSize = Integer.parseInt(args[1]);
                 maxThreads = Integer.parseInt(args[2]);
                 minThreads = Integer.parseInt(args[3]);
                 duration = Integer.parseInt(args[4]);
         }

         // Sample file
         try {
                 file = File.createTempFile("jcloud-s3-test", ".txt");
                 BufferedWriter writer = null;
                 try {
                         writer = new BufferedWriter(new FileWriter(file));
                         for (int i = 0; i < fileSize; i++) {
                                 writer.write("0");
                         }
                 } finally {
                         if (writer != null) try { writer.close(); } catch (IOException ignore) {}
                 }
         } catch (Exception e) {
                e.printStackTrace();
         }

         BlobStoreContext context = getSwiftClientView();
         try {
                 //Thread.sleep(30000);
                 //Get BlobStore
                 blobStore = context.getBlobStore();

                 //PUT Container
                 for (int i = 0 ; i < 500 ; i++) {
                         blobStore.createContainerInLocation(null, containerName+i);
                         System.out.println("PUT Container for Swift Service -> " + containerName+i);
                 }

         } catch (Exception e ) {
                e.printStackTrace();
         } finally {
                 context.close();
         }

    //Date start = new Date();
    long first_timestamp = System.currentTimeMillis();
    long bucket_num = 0;
    long object_num = 0;
    long d_bucket_num = 0;
    long d_object_num = 0;
    long old_object_num = 0;
    long loop_count = 0;
    long loop_first_timestamp = 0;
    long loop_last_timestamp = 0;
    long loop_difference = 0;
    for (long stop=System.nanoTime()+TimeUnit.SECONDS.toNanos(duration); stop>System.nanoTime();) {

    for (int i=0; i<maxThreads; i++)
    {
       if (bucket_num >= 500 )
	bucket_num = 0;

	if ( object_num >= 5000) {
       		if (d_bucket_num >= 500 )
			d_bucket_num = 0;
	}

       Workers.add(new Worker(bucket_num,object_num,d_bucket_num,d_object_num)); 
       object_num++;
       bucket_num++;
	if (object_num >= 5000) {
		d_bucket_num++;
		d_object_num++;
	}
    }
    
    // We must force the main thread to wait for all the Workers
    //  to finish their work before we check to see how long it
    //  took to complete
    for (Worker Worker : Workers)
    {
      while (Worker.running)
      {
        Thread.sleep(20);
      }
    }

    if (loop_count%50 == 0) {
	loop_last_timestamp = System.currentTimeMillis();
	loop_difference = loop_last_timestamp - loop_first_timestamp;
	System.out.println ("Approax TPS is " + (object_num-old_object_num)*1000/loop_difference + "Time taken by 50 loops" + loop_difference);
	loop_first_timestamp = loop_last_timestamp;
	old_object_num = object_num;
    }
    loop_count++;

    }
    long last_timestamp = System.currentTimeMillis();
    //Date end = new Date();
    
    long difference = last_timestamp - first_timestamp;
    
    System.out.println ("This whole process took: " + difference/1000 + " seconds and TPS is " + object_num*1000/difference + " Total Objects PUT " + object_num + "Total Objects  GET and DEL" + d_object_num);
    System.exit(0);
  }
  
  @Override
  public void run() 
  {
    this.running = true;
    String key = "objkey" + counter;
    String del_key = "objkey" + del_counter;
    try 
    {
		@SuppressWarnings("deprecation")
                Blob blob = Worker.blobStore.blobBuilder(key).payload(Worker.file).build();
                Worker.blobStore.putBlob(Worker.containerName+BucketCounter, blob);
		if (counter > 5000) {
		    //System.out.println (" Del_BucketCounter" + Del_BucketCounter + "del_key " + del_key);
                    Worker.blobStore.getBlob(Worker.containerName+Del_BucketCounter, del_key);
                    Worker.blobStore.removeBlob(Worker.containerName+Del_BucketCounter, del_key);
		}

    }
    catch (Exception e) 
    {
	System.out.println ("Error for key" + del_key);
      e.printStackTrace();

    }
    this.running = false;
  }

}
Posted in ObjectStorage

Continuous Integration

“Continuous Integration” is a development practice that requires developers to integrate code into a shared repository from time to time. Each check-in is then verified by an automated build, allowing teams to detect problems early. Apart from automating the builds, we can also automate the deployment and tests.

So the complete activity can be divided into two pieces:

1) Sync the latest code, build, and run unit tests using Jenkins.

http://jenkins-ci.org/

2) Automate the deployment to the test environment and run functional and integration tests using Chef.

http://www.getchef.com/chef/

Posted in OpenStack

OpenStack Cinder with SolidFire (NextGen Scale-Out Storage)

 

logo

 

Why I like SolidFire

1) 100% flash

2) Fine-grain Quality of Service (min, max, burst)

3) Adjust resources on the fly, Linear resource scalability

4) Automatic load balancing

5)  Global deduplication and compression. Thin Provisioning.

6) 100% REST-based API driven

7) Extremely low power consumption

8) Performance virtualization (Read/Write from cache)

Openstack Cinder with SolidFire storage 

solidfire-cinder

Role of cinder volume driver

1) A Cinder volume can be used as the boot disk for a Cloud instance; in that scenario, an ephemeral disk is not required.
2) Cinder allows block devices to be exposed and connected to compute instances for expanded storage, better performance, and integration with different storage platforms like SolidFire.

CinderIntegration

3) The Cinder volume driver manages the creation, attaching, detaching, snapshotting and monitoring of block devices for servers.
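
For reference, pointing Cinder at a SolidFire cluster is mostly a cinder.conf exercise. A hedged sketch follows (option names as I recall them from the SolidFire driver documentation; all values are placeholders, so double-check against the driver docs for your release):

[DEFAULT]
# SolidFire cluster management VIP and cluster-admin credentials (placeholders)
volume_driver = cinder.volume.drivers.solidfire.SolidFireDriver
san_ip = 10.0.0.10
san_login = cluster-admin
san_password = secret

Restart cinder-volume after the change; the QoS settings (min/max/burst IOPS) can then be attached to volumes through volume types and QoS specs.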

 

Posted in BlockStorage

Ceph (A Distributed Object Store)

Image

1. Basic Ceph cluster installation

1.1 Prerequisites for Ceph cluster

  • The basic installation includes 5 nodes: 2 for OSDs, 1 for the monitor, 1 for the Ceph admin node, and the last one for the Ceph client.
  • If the nodes are behind an HTTP proxy, we need to add the entries below to /etc/apt/apt.conf on all nodes.

Acquire::http::proxy "http://<proxy>:<port>/";

Acquire::https::proxy "https://<proxy>:<port>/";

  • All hosts must be reachable by its hostname (e.g., you can modify /etc/hosts if necessary).
  • Add the Ceph packages and release key to our repository.

sudo -i ; set http proxies (if required)

wget -q --no-check-certificate -O- 'https://ceph.com/git/?p=ceph.git;a=blob_plain;f=keys/release.asc' | apt-key add -

echo deb http://ceph.com/debian-emperor/ $(lsb_release -sc) main | sudo tee /etc/apt/sources.list.d/ceph.list

1.2 Installation 

  • Install ceph-deploy on “admin node”

sudo apt-get update && sudo apt-get install ceph-deploy

  • Use ceph-deploy to create the SSH key and copy it to the initial monitor nodes automatically when you create the new cluster

ceph-deploy new node1 (monitor node)

  • For other Ceph Nodes perform the following steps:
  1. Create a user on each Ceph Node.

ssh user@ceph-server

sudo useradd -d /home/ceph -m ceph

sudo passwd ceph

  2. Add root privileges for the user on each Ceph Node.

echo "ceph ALL = (root) NOPASSWD:ALL" | sudo tee /etc/sudoers.d/ceph

sudo chmod 0440 /etc/sudoers.d/ceph

  3. Install an SSH server (if necessary) on each Ceph Node:

sudo apt-get install openssh-server

sudo yum install openssh-server

  4. Configure your ceph-deploy admin node with password-less SSH access to each Ceph Node. When configuring SSH access, do not use sudo or the root user. Leave the passphrase empty:

ssh-keygen

  5. Copy the key to each Ceph Node.

ssh-copy-id ceph@node1

ssh-copy-id ceph@node2

ssh-copy-id ceph@node3

  6. Modify the ~/.ssh/config file of your ceph-deploy admin node so that it logs in to Ceph Nodes as the user you created (e.g., ceph).
  • Create a directory on your admin node for maintaining the configuration that ceph-deploy generates for your cluster. Run all admin commands from this directory. Do not use sudo for any ceph-deploy command.

mkdir my-cluster

cd my-cluster

  • Create Base Ceph cluster from Admin node. On your admin node from the directory you created for holding your configuration file, perform the following steps using ceph-deploy.
  1. Create the cluster.

ceph-deploy new {initial-monitor-node(s)} i.e. ceph-deploy new node1

** Check the output of ceph-deploy with ls and cat in the current directory. You should see a Ceph configuration file, a monitor secret keyring, and a log file for the new cluster. See ceph-deploy new -h for additional details.

** If you have more than one network interface, add the public network setting under the [global] section of your Ceph configuration file. See the Network Configuration Reference for details.

public network = {ip-address}/{netmask}

N.B. To get the CIDR format of the network, run "ip route list".

  2. Install Ceph.

ceph-deploy install --no-adjust-repos node1 node2 node3

  3. Add the initial monitor(s) and gather the keys

ceph-deploy mon create-initial

** Once you complete the process, your local directory should have the following keyrings:

{cluster-name}.client.admin.keyring

{cluster-name}.bootstrap-osd.keyring

{cluster-name}.bootstrap-mds.keyring

  4. Add two OSDs. For fast setup, this quick start uses a directory rather than an entire disk per Ceph OSD Daemon.

ssh node2

sudo mkdir /var/local/osd0

exit

ssh node3

sudo mkdir /var/local/osd1

exit

  5. From admin node, use ceph-deploy to prepare the OSDs.

ceph-deploy osd prepare node2:/var/local/osd0 node3:/var/local/osd1

  6. Activate the OSDs.

ceph-deploy osd activate node2:/var/local/osd0 node3:/var/local/osd1

  7. Use ceph-deploy to copy the configuration file and admin key to your admin node and your Ceph Nodes so that you can use the ceph CLI without having to specify the monitor address and ceph.client.admin.keyring each time you execute a command.

ceph-deploy admin node1 node2 node3 admin-node

  8. Ensure that you have the correct permissions for the ceph.client.admin.keyring.

sudo chmod +r /etc/ceph/ceph.client.admin.keyring

  9. Check your cluster's health.

ceph health

**Your cluster should return an active + clean state when it has finished peering.

2. Create Ceph Block device using QEMU
  • QEMU/KVM can interact with Ceph Block Devices via librbd.
  • Install virtualization stack for Ceph Block storage on client node

sudo apt-get install qemu

sudo apt-get update && sudo apt-get install libvirt-bin

  • To configure Ceph for use with libvirt, perform the following steps:
  1. Create a pool (or use the default). The following example uses the pool name libvirt-pool with 128 placement groups.

ceph osd pool create libvirt-pool 128 128

  2. Verify the pool exists.

ceph osd lspools

  3. Verify that client.admin exists in the "ceph auth list" output.

** libvirt will access Ceph using the ID libvirt, not the Ceph name client.libvirt. See Cephx Commandline for detailed explanation of the difference between ID and name.

  4. Use QEMU to create a block device image in your RBD pool.

qemu-img create -f rbd rbd:libvirt-pool/new-libvirt-image 2G

  5. Verify the image exists.

rbd -p libvirt-pool ls

** You can also use rbd create to create an image, but we recommend ensuring that QEMU is working properly.

** This Block device image could directly be used in VMs (http://ceph.com/docs/next/rbd/libvirt/)

 

  • To configure a Block device image
  1. On the client node, load the rbd client module.

sudo modprobe rbd

  2. On the client node, map the image to a block device.

sudo rbd map new-libvirt-image --pool libvirt-pool --name client.admin [-m {monitor-IP}] [-k /path/to/ceph.client.admin.keyring]

  3. Use the block device by creating a file system on the client node.

sudo mkfs.ext4 -m0 /dev/rbd/libvirt-pool/new-libvirt-image

** This may take a few moments.

  4. Mount the file system on the client node.

sudo mkdir /mnt/ceph-block-device

sudo mount /dev/rbd/libvirt-pool/new-libvirt-image /mnt/ceph-block-device

cd /mnt/ceph-block-device

3. Use Ceph Block device from Cinder

https://ceph.com/docs/master/rbd/rbd-openstack/#configuring-cinder

http://docs.openstack.org/havana/install-guide/install/apt/content/cinder-controller.html

http://docs.openstack.org/developer/cinder/api/cinder.volume.api.html

https://wiki.openstack.org/wiki/Cinder-multi-backend#Volume_Type

  • When Glance and Cinder are both using Ceph block devices, the image is a copy-on-write clone, so volume creation is very fast.

4. Extend the cluster

http://ceph.com/docs/master/rados/operations/add-or-rm-osds/

http://ceph.com/docs/master/rados/deployment/ceph-deploy-mon/

5. Snapshot

http://ceph.com/docs/master/rbd/rbd-snapshot/

6. CRUSH (Controlled Replication Under Scalable Hashing)

http://ceph.com/docs/master/rados/operations/crush-map/

http://ceph.com/docs/master/architecture/

  • By using Crush algorithm to store and retrieve data, we can avoid a single point of failure and scale easily.
  • Data Placement strategy in ceph has two parts placement groups and the Crush map.
  • Each object must belong to some placement group.
  • Ceph Clients (RADOS / librados) and Ceph OSDs both use the CRUSH algorithm to efficiently compute where data lives on demand, instead of having to depend on a central broker.
  • Ceph OSD Daemons create object replicas on other Ceph Nodes to ensure data safety and high availability. This replication is synchronous, such that a new or updated object guarantees its availability before an application is notified that the write has completed.
  • Ceph OSD Daemons have knowledge of the cluster topology via the Cluster Map. Cluster Map = Crush map + Monitor map + OSD map + PG map + MDS map.

7. Architecture:

7.1 Logical placement

  • Pools are logical partitions for storing objects. Ceph clusters have the concept of pools, where each pool has a certain number of placement groups. Placement groups are just collections of mappings to OSDs. Each PG has a primary OSD and a number of secondary ones, based on the replication level you set when you make the pool. When an object gets written to the cluster, CRUSH will determine which PG the data should be sent to. The data will first hit the primary OSD and then be replicated out to the other OSDs in the same placement group. Ceph Clients retrieve the latest Cluster Map from a Ceph Monitor and write objects to pools.
  • Currently reads always come from the primary OSD in the placement group rather than a secondary even if the secondary is closer to the client. In many cases spreading reads out over all of the OSDs in the cluster is better than trying to optimize reads to only hit local OSDs.

NB

  1. This could be a potential bottleneck if a lot of clients want to read the same object: all requests will land on the same OSD even though the other replica OSDs are lying idle.
  • The only input required by the client is the object ID and the pool.

7.2 Physical placement

  • CRUSH algorithm maps each object to a placement group and then maps each placement group to one or more Ceph OSD Daemons.
  • Ceph client uses the CRUSH algorithm to compute where to store an object, maps the object to a pool and placement group, then looks at the CRUSH map to identify the primary OSD for the placement group.

o    Total PGs = (number of OSDs * 100) / number of replicas, rounded up to the nearest power of 2 (see the sketch after this list)

o    The client inputs the pool ID and the object ID. (e.g., pool = “liverpool” and object-id = “john”)

o    CRUSH takes the object ID and hashes it.

o    CRUSH calculates the hash modulo the number of PGs (e.g., 0x58) to get a PG ID.

o    CRUSH gets the pool ID given the pool name (e.g., “liverpool” = 4)

o    CRUSH prepends the pool ID to the PG ID (e.g., 4.0x58).

  • With a copy of the cluster map and the CRUSH algorithm, the client can compute exactly which OSD to use when reading or writing a particular object.
  • Replication is always executed at the PG level: All objects of a placement group are replicated between different OSDs in the RADOS cluster.
  • An object ID is unique across the entire cluster, not just the local file system.
  • The client writes the object to the identified placement group in the primary OSD. Then, the primary OSD with its own copy of the CRUSH map identifies the secondary and tertiary OSDs for replication purposes, and replicates the object to the appropriate placement groups in the secondary and tertiary OSDs (as many OSDs as additional replicas), and responds to the client once it has confirmed the object was stored successfully.
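
To make the placement arithmetic above concrete, here is a toy sketch (my own illustration, not real Ceph code; the real cluster uses the rjenkins hash plus the CRUSH map of buckets and rules): pick a PG count with the rule of thumb, hash the object name modulo the PG count, and prepend the pool ID.

# Toy illustration of the PG math described above -- not the real CRUSH implementation
import hashlib

def suggested_pg_count(num_osds, replicas):
    # (OSDs * 100) / replicas, rounded up to the nearest power of two
    target = num_osds * 100 // replicas
    pgs = 1
    while pgs < target:
        pgs *= 2
    return pgs

def placement_group(pool_id, object_name, pg_num):
    # Ceph uses its own stable hash; md5 is enough to show the idea
    h = int(hashlib.md5(object_name).hexdigest(), 16)
    return "%d.%x" % (pool_id, h % pg_num)

pg_num = suggested_pg_count(num_osds=9, replicas=3)                    # -> 512
print placement_group(pool_id=4, object_name="john", pg_num=pg_num)    # e.g. a "4.58"-style PG ID

The cluster then consults the CRUSH map to turn that PG ID into a primary OSD and its replica OSDs.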

8. Common cluster commands/library/Tips

# ceph -s

# ceph osd dump

# ceph mon dump

# ceph osd pool get {pool-name} {field}  (Get properties of a pool)

# ceph pg dump -o pg.txt  (Get all Placement Groups details)

 

  • Get crushmap file for cluster

# ceph osd getcrushmap -o {compiled-crushmap-filename} 

  • Decompile the crushmap file in plain format

# crushtool -d {compiled-crushmap-filename} -o {decompiled-crushmap-filename} 

  • To see mapped block devices

# sudo rbd showmapped

# rbd list

  • To see block device image information

qemu-img info -f rbd rbd:{pool-name}/{image-name}

  • To see all the ceph running services

# sudo initctl list | grep ceph

  • To start all the ceph services

# sudo start ceph-all

  • To stop all the ceph services

# sudo stop ceph-all

  • Pools:- A pool differs from CRUSH’s location-based buckets in that a pool doesn’t have a single physical location, and a pool provides you with some additional functionality, including replicas, Placement groups, Crush rules, snapshots and setting ownership.
  •  Object copies are always spread across different OSDs.

9. Troubleshooting Logs/Configurations/Tips

http://ceph.com/docs/master/rados/operations/crush-map/

  • Understand data placement

http://ceph.com/docs/master/rados/operations/data-placement/

http://www.admin-magazine.com/HPC/Articles/RADOS-and-Ceph-Part-2

  • To get the CIDR form of the network

ip route list

  • Common Logs

/var/log/ceph/*

  • Common Configuration files

/etc/ceph/*

  • Can we disable journal?

Ans: With btrfs, yes; otherwise no. The journal is needed for consistency of the fs; Ceph relies on

write-ahead journaling. It can't be turned off, though we can use an SSD/ramdisk for the journal. Loss of the journal will kill any OSDs using that journal.

http://ceph.com/docs/master/rados/configuration/osd-config-ref/

http://ceph.com/docs/master/rados/configuration/journal-ref/

  • So should I have 1 SSD per storage node for journaling?
    Ans: Not necessarily. It depends on a number of factors. In some cases that may be sufficient, and in others the SSD can become a bottleneck and rapidly wear out. Different applications will have a different ideal ratio of SSD journals to spinning disks, taking into account the rate of write IO and bandwidth requirements of the node.
  • What happens in the case of a big file (for example, 100MB) with multiple chunks? Is Ceph smart enough to read multiple chunks from multiple servers simultaneously, or will the whole file be served by just one OSD?

Ans: RADOS is the underlying storage cluster, but the access methods (block, object, and file) stripe their data across many RADOS objects, which CRUSH very effectively distributes across all of the servers.  A 100MB read or write turns into dozens of parallel operations to servers all over the cluster.

The problem with reading from random/multiple replicas by default is cache efficiency. If every reader picks a random replica, then there are effectively N locations that may have an object cached in RAM (instead of on disk), and the caches for each OSD will be about 1/Nth as effective. The only time it makes sense to read from replicas is when you are CPU or network limited; the rest of the time it is better to read from the primary's cache than a replica's disk.

 

  • How write get performed?

OSDs use a write-ahead mode for local operations: a write hits the journal first, and from there it is then copied into the backing filestore.

This, in turn, leads to a common design principle for Ceph clusters that are both fast and cost-effective:

  1. Put your filestores on slow, cheap drives (such as SATA spinners),
  2. put your journals on fast drives (SSDs, DDR drives, Fusion-IO cards, whatever you can afford).

Another common design principle is that you create one OSD per spinning disk that you have in the system. Many contemporary systems come with only two SSD slots, and then as many spinners as you want. That is not a problem for journal capacity — a single OSD’s journal is usually no larger than about 6 GB, so even for a 16-spinner system (approx. 96GB journal space) appropriate SSDs are available at reasonable expense.

 

  • What happens if an OSD fails – TBD
  • What happens if an MON fails – TBD
  • What happens if an Journal SSD/Disk fails – TBD
Posted in BlockStorage

Compile gcc for Solaris 10 on x86 platform

Buzz sentence for me these days: "How to compile gcc on Solaris 10 for the x86 platform". There are a lot of problems and missing links in the latest source code available for gcc. Use the following steps to get gcc compiled successfully on S10 for x86.
1) Download the gcc source code using any tool available to you; for example, in my case I used svn to download the source tree of gcc.
svn checkout http://gcc.gnu.org/svn/gcc/trunk gcc
This will create a gcc directory with all the source files in it.
2) Download latest binutils from http://ftp.gnu.org/gnu/binutils/ to compile loader and other tools.
3) An optional step is to create a separate directory by linking all the source files for running gmake.
4) Now first compile the binutils directory using the following command:
./configure --prefix= --with-gmp= --with-mpfr=

** Here the --with-gmp and --with-mpfr paths need to be set before running the configure command. So if you do not have the gmp and mpfr libraries, first download and compile them.
The GNU MP Bignum Library
The MPFR Library

then
./gmake
./gmake install
5) Now go to the gcc source dir
./configure --prefix= --with-gmp-include= --with-gmp-lib= --with-mpfr-include= --with-mpfr-lib= --disable-nls
--disable-multilib --with-gnu-ld --with-gnu-as
./gmake -j 8
./gmake install

Now you are done.

BUT it is not that easy... these are some troubles I faced and fixed. Have a look:

1) Following changes are required for lib/gen-classlist.sh

abs_top_builddir=`cd "${top_builddir}"; pwd`
> abs_top_srcdir=`cd "/net/dv1.india/vol/wsvol/ws1/sg160228/OPENAIS/gcc/libjava/classpath"; pwd`
> if test "$abs_top_builddir" != "$abs_top_srcdir"; then

if test -f ${top_builddir}/lib/classes.2; then

2) Following changes are required for classpath/lib/Makefile

< if ! [ -e gnu ]; then mkdir gnu; fi
< if ! [ -e gnu/java ]; then mkdir gnu/java; fi
< if ! [ -e gnu/java/locale ]; then mkdir gnu/java/locale; fi
if test ! -d gnu; then mkdir gnu; fi
> if test ! -d gnu/java; then mkdir gnu/java; fi
> if test ! -d gnu/java/locale; then mkdir gnu/java/locale; fi

> if test ! -d gnu/javax/swing/plaf/gtk/icons; then mkdir -p gnu/javax/swing/plaf/gtk/icons; fi

if test ! -d $$p; then mkdir $$p; fi; \

3) Following changes are required for libjava/scripts/jar

if test -f "$2"/"$1"; then

elif test -f "$1"; then

mkdir_p_fun () {

mkdir_p='mkdir_p_fun'
+
4) Following changes are required for /gcc/gcc/Makefile

< STRICT2_WARN = -pedantic -Wno-long-long -Wno-variadic-macros \
STRICT2_WARN = -pedantic -Wno-long-long \
> -Wold-style-definition -Wmissing-format-attribute \

< echo 'SYSTEM_HEADER_DIR="'"$(SYSTEM_HEADER_DIR)"'"'
$(DESTDIR)$(itoolsdatadir)/mkheaders.conf

> echo 'SYSTEM_HEADER_DIR="$(SYSTEM_HEADER_DIR)"' > $(DESTDIR)$(itoolsdatadir)/mkheaders.conf

5) There could be cases where the gcc loader cannot load some libraries; then we need to copy that particular .so file into $BuildDir/$BuildDir/lib/. I think there is some bug with the binutils loader, which is why it cannot load the same library from /usr/bin or /lib/, so we need to go for the above workaround.

6) Set the following environment variables if required:
LD_LIBRARY_PATH, LD_RUN_PATH, CC, RPATH.

Posted in Solaris/Linux