DBPedias

Your Database Knowledge Community

Lars George

  1. Hadoop on EC2 - A Primer

    As more and more companies discover the power of Hadoop and how it solves complex analytical problems, there is a growing interest in quickly prototyping new solutions - possibly on short-lived or "throw away" cluster setups. Amazon's EC2 provides an ideal platform for such prototyping and there are a lot of great resources on how this can be done. I would like to mention "Tracking Trends with Hadoop and Hive on EC2" on the Cloudera Blog by Pete Skomoroch and "Running Hadoop MapReduce on Amazon EC2 and Amazon S3" by Tom White. They give you full examples of how to process data stored on S3 using EC2 servers. Overall there seems to be a common need to quickly get insight into what a Hadoop and Hive based cluster can add in terms of business value. In this post I would like to take a step back from those full-featured examples and show how you can use Amazon's services to set up a Hadoop cluster, focusing on the "nitty gritty" details that are more difficult to find answers for.

    Starting a Cluster

    Let's jump into it head first and solve the problem of actually launching a cluster. You have heard that Hadoop is shipped with EC2 support, but how do you actually start up a Hadoop cluster on EC2? You do have a couple of choices and as Tom's article above explains you could start all instances in the cluster by hand. But why would you want to do that if there are scripts available that do all the work for you? And to complicate matters, how do you select the AMI (the Amazon Machine Image) that has the Hadoop version you need or want? Does it have Hive installed for your subsequent analysis of the collected data? Just running a check to count the available public Hadoop images returns 41!

    $ ec2-describe-images -a | grep hadoop | wc -l
    41

    That gets daunting very quickly. Sure, you can roll your own - but that implies even more manual labor that is probably better spent on productive work. But there is help available...

    By far one of the most popular ways to install Hadoop today is using Cloudera's Distribution for Hadoop - also known as CDH. It packs all the tools you usually need into easy to install packages and pre-configures everything for typical workloads. Sweet! And since it also offers each "HStack" tool as a separate installable package, you can decide what you need and install additional applications as required. We will make use of that feature below and also of other advanced configuration options.

    There are not one but at least three script packages available to start a Hadoop cluster. The following table lists the most prominent ones:

    Name                Vendor         Language  Fixed Packages  Notes
    Hadoop EC2 Scripts  Apache Hadoop  Bash      Yes             Requires special Hadoop AMIs.
    CDH Cloud Scripts   Cloudera       Python    No              Fixed to use CDH packages.
    Whirr               Apache Whirr   Python    No              Not yet on same level feature wise compared to CDH Cloud Scripts. Can run plain Apache Hadoop images as well as CDH. Supports multiple cloud vendors.

    They are ordered by their availability date, so the first available was the Bash based "Hadoop EC2 Scripts" contribution package. It is part of the Apache Hadoop tarball and can start selected AMIs with Hadoop preinstalled on them. While you may be able to customize the init script to install additional packages, you are bound to whatever version of Hadoop the AMI provides. This limitation is overcome by the CDH Cloud Scripts and by Apache Whirr, which is the successor to the CDH scripts. All three EC2 script packages were created by Cloudera's own Tom White, so you may notice similarities between them. In general you could say that each extends the former while applying what has been learned from their use in the real world. Also, Python has the advantage of running on Windows, Unix or Linux, whereas the Bash scripts are not a good fit for "some of these" (*cough*), but that seems obvious.

    For the remainder of this post we will focus on the CDH Cloud Scripts as they are the current status quo when it comes to starting Hadoop clusters on EC2. But please keep an eye on Whirr as it will supersede the CDH Cloud Scripts sooner or later and subsequently be added to the CDH releases (it is in CDH3B3 now!).

    I have mentioned the various AMIs above and that (at the time of this post) there are at least 41 of them available providing support for Hadoop in one way or another. But why would you have to create your own images or switch to other ones as newer versions of Hadoop are released in the future? Wouldn't it make more sense to have a base AMI that somehow magically bootstraps the Hadoop version you need onto the cluster as you materialize it? You may have guessed it by now: that is exactly what the Cloudera AMIs are doing! All of these scripts use a mechanism called Instance Data which allows them to "hand in" configuration details to the AMI instances as they start. While the Hadoop EC2 Scripts only use this for limited configuration (with the rest being up to you - we will see an example of how that is done below), the CDH and Whirr scripts employ this feature to bootstrap everything, including Hadoop. The instance data is a script called hadoop-ec2-init-remote.sh which is compressed and provided to the server as it starts. The trick is that the Cloudera AMIs have a mechanism to execute this script before starting the Hadoop daemons:

    root@ip-10-194-222-3:~# ls -la /etc/init.d/{hadoop,ec2}*
    -rwxr-xr-x 1 root root 1395 2009-04-18 21:36 /etc/init.d/ec2-get-credentials
    -rwxr-xr-x 1 root root  286 2009-04-18 21:36 /etc/init.d/ec2-killall-nash-hotplug
    -rwxr-xr-x 1 root root  125 2009-04-18 21:36 /etc/init.d/ec2-mkdir-tmp
    -rwxr--r-- 1 root root 1945 2009-06-23 14:37 /etc/init.d/ec2-run-user-data
    -rw-r--r-- 1 root root  709 2009-04-18 21:36 /etc/init.d/ec2-ssh-host-key-gen
    -rwxr-xr-x 1 root root 4280 2010-03-22 06:19 /etc/init.d/hadoop-0.20-datanode
    -rwxr-xr-x 1 root root 4296 2010-03-22 06:19 /etc/init.d/hadoop-0.20-jobtracker
    -rwxr-xr-x 1 root root 4437 2010-03-22 06:19 /etc/init.d/hadoop-0.20-namenode
    -rwxr-xr-x 1 root root 4352 2010-03-22 06:19 /etc/init.d/hadoop-0.20-secondarynamenode
    -rwxr-xr-x 1 root root 4304 2010-03-22 06:19 /etc/init.d/hadoop-0.20-tasktracker

    and

    root@ip-10-194-222-3:~# ls -la /etc/rc2.d/*{hadoop,ec2}*
    lrwxrwxrwx 1 root root 32 2010-09-13 12:32 /etc/rc2.d/S20hadoop-0.20-jobtracker -> ../init.d/hadoop-0.20-jobtracker
    lrwxrwxrwx 1 root root 30 2010-09-13 12:32 /etc/rc2.d/S20hadoop-0.20-namenode -> ../init.d/hadoop-0.20-namenode
    lrwxrwxrwx 1 root root 39 2010-09-13 12:32 /etc/rc2.d/S20hadoop-0.20-secondarynamenode -> ../init.d/hadoop-0.20-secondarynamenode
    lrwxrwxrwx 1 root root 29 2009-06-23 14:58 /etc/rc2.d/S70ec2-get-credentials -> ../init.d/ec2-get-credentials
    lrwxrwxrwx 1 root root 27 2009-06-23 14:58 /etc/rc2.d/S71ec2-run-user-data -> ../init.d/ec2-run-user-data

    These init scripts work their magic to get the user data (which is one part of the "Instance Data") and optionally decompress it before executing the script handed in. The only other requirement is that the AMI must have Java installed as well. As we look into further pieces of the puzzle we will get back to this init script. For now, suffice it to say that it does the bootstrapping of our instances and installs whatever we need dynamically during the start of the cluster.
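    To make that step a little less magical, here is a minimal sketch of what a "run user data" step might do. It is not the actual /etc/init.d/ec2-run-user-data from the AMI. The function name run_user_data is made up for illustration, and the sketch assumes the payload has already been fetched to a local file, for example from the instance metadata service at http://169.254.169.254/latest/user-data.

```shell
#!/usr/bin/env bash
# Illustrative sketch only; the real init script on the AMI may differ.

# Run a user-data payload, transparently handling gzip-compressed input.
# $1: path to the payload file (assumed to be downloaded beforehand).
run_user_data() {
  local payload="$1" script status
  script="$(mktemp)"
  # "gzip -t" succeeds only if the input is a valid gzip stream.
  if gzip -t < "$payload" 2>/dev/null; then
    gzip -dc < "$payload" > "$script"
  else
    cp "$payload" "$script"
  fi
  bash "$script"
  status=$?
  rm -f "$script"
  return "$status"
}
```

    Testing the stream instead of trusting a file extension means the same code path handles both compressed and plain scripts.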

    Note: I am using the Ubuntu AMIs for all examples and code snippets in this post.

    All about the options

    First you need to install the CDH Cloud Scripts, which is rather straightforward. Start by installing the Cloudera CDH tarball:

    $ wget http://archive.cloudera.com/cdh/2/hadoop-0.20.1+169.89.tar.gz
    $ tar -zxvf hadoop-0.20.1+169.89.tar.gz
    $ export PATH=$PATH:~/hadoop-0.20.1+169.89/src/contrib/cloud/src/py

    Then install the required Python libs (this example assumes you already have Python installed):

    $ sudo apt-get install python-setuptools
    $ sudo easy_install "simplejson==2.0.9"
    $ sudo easy_install "boto==1.8d"

    Now you are able to run the CDH Cloud Scripts - but to be really useful you need to configure them first. Cloudera has a document that explains the details. Naturally, a few more ideas come up while using those scripts, and I have added them over time. Have a look at this example .hadoop-cloud directory:

    $ ls -lA .hadoop-cloud/
    total 40
    -rw-r--r--  1 lars lars   489 2010-09-13 09:52 clusters-c1.medium.cfg
    -rw-r--r--  1 lars lars   358 2010-09-10 06:13 clusters-c1.xlarge.cfg
    lrwxrwxrwx  1 lars lars    22 2010-08-22 06:04 clusters.cfg -> clusters-c1.medium.cfg
    -rw-r--r--  1 lars lars 17601 2010-09-13 10:19 hadoop-ec2-init-remote-cdh2.sh
    drwxr-xr-x  2 lars lars  4096 2010-08-15 14:14 lars-test-cluster/

    You can see that it has multiple clusters.cfg configuration files that differ only in their settings for the image_id (the AMI to be used) and the instance_type. Here is one of those files:

    $ cat .hadoop-cloud/clusters-c1.medium.cfg
    [lars-test-cluster]
    image_id=ami-ed59bf84
    instance_type=c1.medium
    key_name=root
    availability_zone=us-east-1a
    private_key=~/.ec2/root.pem
    ssh_options=-i %(private_key)s -o StrictHostKeyChecking=no
    user_data_file=http://archive.cloudera.com/cloud/ec2/cdh2/hadoop-ec2-init-remote.sh
    user_packages=lynx s3cmd
    env=AWS_ACCESS_KEY_ID=<your-access-key>
        AWS_SECRET_ACCESS_KEY=<your-secret-key>

    All you have to do now is switch the symbolic link to run either cluster setup. Another option would be to use the command line options which hadoop-ec2 offers. Execute $ hadoop-ec2 launch-cluster --help to see what is available. You can override the values from the current clusters.cfg or even select a completely different configuration directory. Personally I like the symlink approach as this allows me to keep the settings for each cluster instance together in a separate configuration file - but as usual, the choice is yours. You could also save each hadoop-ec2 call in a small Bash script along with all command line options in it.
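    Switching the symlink by hand is easy to get wrong, so here is a small helper as a sketch. The function name use_cluster_config is made up, and it assumes the clusters-<instance type>.cfg naming used in the directory listing above.

```shell
#!/usr/bin/env bash
# Hypothetical helper; assumes files named clusters-<instance_type>.cfg.

# Point clusters.cfg at the configuration for the given instance type.
# $1: instance type, e.g. c1.medium
# $2: configuration directory (defaults to ~/.hadoop-cloud)
use_cluster_config() {
  local type="$1" dir="${2:-$HOME/.hadoop-cloud}"
  local target="clusters-${type}.cfg"
  if [ ! -f "$dir/$target" ]; then
    echo "no such configuration: $dir/$target" >&2
    return 1
  fi
  # -n replaces an existing symlink instead of following it.
  ln -sfn "$target" "$dir/clusters.cfg"
}
```

    Calling use_cluster_config c1.xlarge before launching would then select the larger setup, while an unknown type leaves the current link untouched.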

    Back to the .hadoop-cloud directory above. There is another file hadoop-ec2-init-remote-cdh2.sh (see below) and a directory called lars-test-cluster, which is created and maintained by the CDH Cloud Scripts. It contains a local hadoop-site.xml with your current AWS credentials (assuming you have them set in your .profile as per the documentation) that you can use to access S3 from your local Hadoop scripts.

    For the sake of completeness, here is the other cluster configuration file:

    $ cat .hadoop-cloud/clusters-c1.xlarge.cfg
    [lars-test-cluster]
    image_id=ami-8759bfee
    instance_type=c1.xlarge
    key_name=root
    availability_zone=us-east-1a
    private_key=~/.ec2/root.pem
    ssh_options=-i %(private_key)s -o StrictHostKeyChecking=no
    user_data_file=http://archive.cloudera.com/cloud/ec2/cdh2/hadoop-ec2-init-remote.sh
    user_packages=lynx s3cmd
    
    env=AWS_ACCESS_KEY_ID=<your-access-key>
        AWS_SECRET_ACCESS_KEY=<your-secret-key>

    The user_data_file is where the version of Hadoop, and in this case even the version of Cloudera's Distribution for Hadoop, is chosen. You can replace the link with

    user_data_file=http://archive.cloudera.com/cloud/ec2/cdh3/hadoop-ec2-init-remote.sh

    to use the newer CDH3, currently in beta.

    Also note that the AMIs are currently only available in the us-east-x zones and not in any of the others.

    To conclude the setup, here is a list of possible configuration options:

    Option             CLI                       Description
    cloud_provider     --cloud-provider          The cloud provider, e.g. 'ec2' for Amazon EC2.
    auto_shutdown      --auto-shutdown           The time in minutes after launch when an instance will be automatically shut down.
    image_id           --image-id                The ID of the image to launch.
    instance_type      -t | --instance-type      The type of instance to be launched. One of m1.small, m1.large, m1.xlarge, c1.medium, or c1.xlarge.
    key_name           -k | --key-name           The key pair to use when launching instances. (Amazon EC2 only.)
    availability_zone  -z | --availability-zone  The availability zone to run the instances in.
    private_key                                  Used with update-slaves-file command. The file is copied to all EC2 servers.
    ssh_options        --ssh-options             SSH options to use.
    user_data_file     -f | --user-data-file     The URL of the file containing user data to be made available to instances.
    user_packages      -p | --user-packages      A space-separated list of packages to install on instances on start up.
    env                -e | --env                An environment variable to pass to instances. (May be specified multiple times.)
                       --client-cidr             The CIDR of the client, which is used to allow access through the firewall to the master node. (May be specified multiple times.)
                       --security-group          Additional security groups within which the instances should be run. (Amazon EC2 only.) (May be specified multiple times.)
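    As a sketch of the "small Bash script" approach, the following helper assembles a launch-cluster call that overrides two of the options listed above. The function name build_launch_command is made up, and only flags from the table are used. Everything after the first two arguments (the cluster name and whatever else your version of hadoop-ec2 expects) is passed through untouched, so check hadoop-ec2 launch-cluster --help for the exact positional arguments.

```shell
#!/usr/bin/env bash
# Hypothetical wrapper; prints the command instead of running it.

# $1: instance type, $2: image ID, remaining arguments: passed through as-is.
build_launch_command() {
  local instance_type="$1" image_id="$2"
  shift 2
  local cmd=(hadoop-ec2 launch-cluster
             --instance-type "$instance_type"
             --image-id "$image_id"
             "$@")
  # Join the words with single spaces for display.
  printf '%s\n' "${cmd[*]}"
}

# Example: show the call for the c1.xlarge setup from the config file above.
build_launch_command c1.xlarge ami-8759bfee lars-test-cluster
```

    Printing the command first makes it easy to review before anything is started on EC2.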

    Custom initialization

    Now you can configure and start a cluster up on EC2. Sooner or later though you will face more challenging issues. One that hits home early on is compression. You are encouraged to use compression in Hadoop as it saves not only storage but also bandwidth, since less data needs to be transferred over the wire. See this and this post for "subtle" hints. Cool, so let's switch on compression - must be easy, right? Well, not exactly. For starters, choosing the appropriate codec is not trivial. A very popular one is LZO, as described in the posts above, because it has many advantages in combination with Hadoop's MapReduce. The problem is that it is GPL licensed and therefore not shipped with Hadoop. You actually have to compile it yourself before you can install it. How this is done is described here. You need to follow those steps and compile an installable package on every AMI you want to use later. For example, log into the master of your EC2 Hadoop cluster and execute the following commands:

    $ hadoop-ec2 login <your-cluster-name>
    # cd ~
    # apt-get install subversion devscripts ant git-core liblzo2-dev
    # git clone http://github.com/toddlipcon/hadoop-lzo-packager.git
    # cd hadoop-lzo-packager/
    # SKIP_RPM=1 ./run.sh
    # cd build/deb/
    # dpkg -i toddlipcon-hadoop-lzo_20100913142659.20100913142512.6ddda26-1_i386.deb

    Note: Since I am running Ubuntu AMIs I used the SKIP_RPM=1 flag to skip RedHat package generation.

    Copy the final .deb file to a safe location, naming it hadoop-lzo_i386.deb or hadoop-lzo_amd64.deb, using scp for example. Do the same for the yum packages if you prefer the Fedora AMIs.

    The next step is to figure out how to install the packages we just built during the bootstrap process described above. This is where the user_data_file comes back into play. Instead of copying the .deb packages to each server, we save them on S3 using a tool like s3cmd. For example:

    $ s3cmd put hadoop-lzo_i386.deb s3://dpkg/

    Now we can switch from the default init script to our own. Use wget to download the default file

    $ wget http://archive.cloudera.com/cloud/ec2/cdh2/hadoop-ec2-init-remote.sh
    $ mv hadoop-ec2-init-remote.sh .hadoop-cloud/hadoop-ec2-init-remote-cdh2.sh

    In the clusters.cfg we need to replace the link with our local file like so

    user_data_file=file:///home/lars/.hadoop-cloud/hadoop-ec2-init-remote-cdh2.sh

    Note that we cannot use ~/.hadoop-cloud/... as the filename because the Python code does not expand the shell's ~ shorthand for the home directory.
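    If you would rather not type the absolute path by hand, the shell can expand it for you before the value goes into clusters.cfg. A minimal sketch, with the made-up function name to_file_url:

```shell
#!/usr/bin/env bash
# Hypothetical helper: turn a local path into a file:// URL with an
# absolute path, expanding a leading "~/" the way the shell would.
to_file_url() {
  local path="$1"
  case "$path" in
    "~/"*) path="$HOME/${path:2}" ;;  # literal "~/" prefix: use $HOME
    /*)    ;;                         # already absolute
    *)     path="$PWD/$path" ;;       # relative to the current directory
  esac
  printf 'file://%s\n' "$path"
}
```

    For example, echo "user_data_file=$(to_file_url '~/.hadoop-cloud/hadoop-ec2-init-remote-cdh2.sh')" prints a line that can be pasted into the configuration file.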

    The local init script can now be adjusted as needed. Here we are adding the functions to set up s3cmd and then install the LZO packages on server startup:

    ...
      fi
      service $HADOOP-$daemon start
    }
    
    function install_s3cmd() {
      install_packages s3cmd # needed for LZO package on S3
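      cat > /tmp/.s3cfg << EOF
    ...
    EOF
    }
    
    function install_hadoop_lzo() {
      INSTANCE_TYPE=`wget -q -O - http://169.254.169.254/latest/meta-data/instance-type`
      case $INSTANCE_TYPE in
      ...
      esac
      if which dpkg &> /dev/null; then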
        HADOOP_LZO_FN=${HADOOP_LZO}.deb
        s3cmd -c /tmp/.s3cfg get --force s3://dpkg/$HADOOP_LZO_FN /tmp/$HADOOP_LZO_FN
        dpkg -i /tmp/$HADOOP_LZO_FN
      elif which rpm &> /dev/null; then
        # todo
        echo "do it yum style..."
      fi
    }
    
    register_auto_shutdown
    update_repo
    install_user_packages
    install_s3cmd
    install_hadoop
    install_hadoop_lzo
    configure_hadoop
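    Which package gets installed depends on the architecture of the instance. As a rough sketch (not the original function body), the selection could look like the following. It assumes that, of the instance types listed for instance_type, only m1.small and c1.medium are 32-bit, and it falls back to the 32-bit package for anything it does not recognize. The function name lzo_package_for is made up.

```shell
#!/usr/bin/env bash
# Hypothetical helper: map an EC2 instance type to the LZO package base
# name used in this post (hadoop-lzo_i386 or hadoop-lzo_amd64).
lzo_package_for() {
  case "$1" in
    m1.large|m1.xlarge|c1.xlarge)
      echo "hadoop-lzo_amd64"   # assumed 64-bit instance types
      ;;
    *)
      echo "hadoop-lzo_i386"    # m1.small, c1.medium and unknown types
      ;;
  esac
}
```

    The result plus a .deb suffix gives the file name to fetch from the S3 bucket.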

    By the way, once a cluster is up you can verify what the user data script did (or even "is doing" if you log in promptly) by checking the /var/log/messages file on for example the Hadoop master node:

    $ hadoop-ec2 login lars-test-cluster
    # cat /var/log/messages 
    ...
    Sep 14 12:10:55 ip-10-242-18-80 user-data: + install_hadoop_lzo
    Sep 14 12:10:55 ip-10-242-18-80 user-data: ++ wget -q -O - http://169.254.169.254/latest/meta-data/instance-type
    Sep 14 12:10:55 ip-10-242-18-80 user-data: + INSTANCE_TYPE=c1.medium
    Sep 14 12:10:55 ip-10-242-18-80 user-data: + case $INSTANCE_TYPE in
    Sep 14 12:10:55 ip-10-242-18-80 user-data: + HADOOP_LZO=hadoop-lzo_i386
    Sep 14 12:10:55 ip-10-242-18-80 user-data: + which dpkg
    Sep 14 12:10:55 ip-10-242-18-80 user-data: + HADOOP_LZO_FN=hadoop-lzo_i386.deb
    Sep 14 12:10:55 ip-10-242-18-80 user-data: + s3cmd -c /tmp/.s3cfg get --force s3://dpkg/hadoop-lzo_i386.deb /tmp/hadoop-lzo_i386.deb
    Sep 14 12:10:55 ip-10-242-18-80 user-data: Object s3://dpkg/hadoop-lzo_i386.deb saved as '/tmp/hadoop-lzo_i386.deb' (65810 bytes in 0.0 seconds, 5.06 MB/s)
    Sep 14 12:10:56 ip-10-242-18-80 user-data: + dpkg -i /tmp/hadoop-lzo_i386.deb
    Sep 14 12:10:56 ip-10-242-18-80 user-data: Selecting previously deselected package toddlipcon-hadoop-lzo.
    Sep 14 12:10:56 ip-10-242-18-80 user-data: (Reading database ... 24935 files and directories currently installed.)
    Sep 14 12:10:56 ip-10-242-18-80 user-data: Unpacking toddlipcon-hadoop-lzo (from /tmp/hadoop-lzo_i386.deb) ...
    Sep 14 12:10:56 ip-10-242-18-80 user-data: Setting up toddlipcon-hadoop-lzo (20100913142659.20100913142512.6ddda26-1) ...
    ...

    Note: A quick tip in case you edit the init script yourself and are going to add configuration data that is output to a file using cat (see cat > /tmp/.s3cfg << EOF above): make sure that the final "EOF" has NO trailing whitespace or the script fails miserably. I had "EOF " (note the trailing space) as opposed to "EOF" and it took me a while to find that! The script would fail to run with an "unexpected end of file" error.
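    A quick way to catch this before launching a cluster is to search the script for terminator lines followed by stray whitespace. A minimal sketch, with the made-up function name check_heredoc_terminator and the marker defaulting to EOF:

```shell
#!/usr/bin/env bash
# Hypothetical check: report here-document terminators that are followed
# by trailing spaces or tabs. Prints "line:content" for each offender and
# returns 1 if any are found, 0 otherwise.
# $1: script to check, $2: terminator word (defaults to EOF)
check_heredoc_terminator() {
  local file="$1" marker="${2:-EOF}"
  if grep -nE "^${marker}[[:space:]]+\$" "$file"; then
    return 1
  fi
}
```

    Running check_heredoc_terminator .hadoop-cloud/hadoop-ec2-init-remote-cdh2.sh after each edit would have saved me that debugging session.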

    A comment on EMR (or Elastic MapReduce), Amazon's latest offering in regards to Hadoop support. It is a wrapper around launching a cluster on your behalf and executing MapReduce jobs or Hive queries etc. While this will help many to be up and running with "cloud based" MapReduce work it has also a few drawbacks: for starters you have to work with what you are given in regards to Hadoop versioning. You have to rely on Amazon to keep it current and any "special" version you would like to try may not work at all. Furthermore you have no option to install LZO as described above, i.e. the whole bootstrap process is automated and not accessible to you for modifications. And finally, you pay for it on top of the standard EC2 rates, so it comes at a premium.

    Provision data

    We already touched on S3 above, but let me get back to it for a moment. Small files like the installation packages are obviously no issue at all. What is a problem, though, is dealing with huge files larger than the 5GB maximum file size S3 allows. You have two choices here: either split the files into smaller ones or use an IO layer that does that same task for you. That feature is built right into Hadoop itself. This is of course documented, but let me add a few notes that may help you understand the implications a little bit better. First, here is a table comparing the different tools you can use:

    Tool Name  Supported Schemes  Description
    s3cmd      s3                 Supports access to S3 as provided by the AWS APIs and also the S3 Management Console over the web.
    hadoop     s3, s3n            Supports raw or direct S3 access as well as a specialized Hadoop filesystem on S3.

    The thing that is not obvious initially is that "s3cmd get s3://..." is not the same as "hadoop fs -get s3://...". When you use a standard tool that implements the S3 API, like s3cmd, then you use s3://<bucket-name>/... as the object/file URI. In Hadoop terms that is referred to as "raw" or "native" S3. And if you want to use Hadoop to access a file on S3 in that mode then the URI is s3n://<bucket-name>/... - note the "s3n" URI scheme. In contrast, if you use the "s3" scheme with Hadoop it employs a special file system mode that stores large files as smaller binary files on S3, completely transparently to the user. For example:

    $ hadoop fs -put verylargefile.log s3://my-bucket/logs/20100916/
    $ s3cmd ls s3://my-bucket/
                           DIR   s3://my-bucket//
    2010-09-16 07:44  33554432   s3://my-bucket/block_-1289596344515350280
    2010-09-16 07:45  33554432   s3://my-bucket/block_-15869508987376965
    2010-09-16 07:46  33554432   s3://my-bucket/block_-172539355612092125
    2010-09-16 07:45  33554432   s3://my-bucket/block_-1894993863630732603
    2010-09-16 07:43  33554432   s3://my-bucket/block_-2049322783060796466
    2010-09-16 07:51  33554432   s3://my-bucket/block_-2070316024499434597
    2010-09-16 07:43  33554432   s3://my-bucket/block_-2107321687364706212
    2010-09-16 07:46  33554432   s3://my-bucket/block_-2117877727016155804
    ...

    The following table provides a comparison between the various access modes and their file size limitations:

    Type    Mode           Limit      Example
    S3 API  native         5GB        s3cmd get s3://my-bucket/my-s3-dir/my-file.name
    Hadoop  native         5GB        hadoop fs -get s3n://my-bucket/my-s3-dir/my-file.name
    Hadoop  binary blocks  unlimited  hadoop fs -get s3://my-bucket/my-hadoop-path/my-file.name

    You may now ask yourself which one to use. If you will never deal with very large files it may not matter. But if you do, then you need to decide if you use Hadoop's binary filesystem or chop files yourself to fit into 5GB. Also keep in mind that once you upload files using Hadoop's binary filesystem then you can NOT go back to the native tools as the files stored in your S3 bucket are named (seemingly) randomly and content is spread across many of those smaller files as can be seen in the example above. There is no direct way to parse these files yourself outside of Hadoop.
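    If you want to make that decision per file, a small helper can pick the scheme from the file size. This is only a sketch of the rule of thumb described here: the function name s3_scheme_for_size is made up, and the 5GB limit is the one from the table above.

```shell
#!/usr/bin/env bash
# Hypothetical helper: choose the Hadoop URI scheme for an upload.
# Files within the 5GB limit stay readable by native S3 tools (s3n),
# larger ones need Hadoop's block-based filesystem (s3).

S3_FILE_LIMIT=$((5 * 1024 * 1024 * 1024))  # 5GB in bytes

# $1: file size in bytes
s3_scheme_for_size() {
  if [ "$1" -gt "$S3_FILE_LIMIT" ]; then
    echo "s3"
  else
    echo "s3n"
  fi
}
```

    The size could come from wc -c < verylargefile.log, and the result then goes in front of ://my-bucket/... in the hadoop fs -put call.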

    One final note on S3 and provisioning data: it seems it makes more sense to copy data from S3 into HDFS before running a job not just because of the improved IO performance (keyword here: data locality!) but also in regards to stability. I have seen jobs fail that read directly from S3 but succeeded happily when reading from HDFS. And copying data from S3 to EC2 is free, so you may want to try your luck with either option and see what is best for your use-case.

    ETL and Processing

    The last part in a usual workflow is to process the data we now have nicely compressed and splittable up on S3 or HDFS. This would ideally be done with Hive queries if the data is already in a "Hive ready" format. Often though the data comes from legacy resources and needs to be processed before it can be queried. This process is usually referred to as Extract, Transform, Load, or "ETL" for short. It can consist of various steps executing dedicated applications or scripts that prune and transform raw input files. I will leave this for another post though, as this points to the same problem we addressed above: you have many tools you could use and have to decide which suits you best. There is Kettle or Spring Batch and also the new kid on the block, Oozie. Some combine both steps while Oozie, for example, concentrates on the workflow aspect.

    This is particularly interesting as we can use Oozie to spin up our EC2 clusters as well as run the ETL job (which could be a Kettle job for example), followed by the Hive queries. Add Sqoop and you have a tool to read from and write to legacy databases in the process. But as I said, I will leave this whole topic for a follow-up post. I do believe it is important to understand and document the full process of running Hadoop in the "cloud". Only then do you have the framework to run the full business process on Amazon Web Services (or any other cloud computing provider).

    Conclusion

    With Whirr being released it seems like the above may become somewhat obsolete soon. I will look into Whirr more and update the post to show you how the same is achieved. My preliminary investigation shows though that you have the same issues - or say "advanced challenges" to be fair as Whirr is not at fault here. Maybe one day we have an Apache licensed alternative to LZO available and installing a suitable compression codec will be much easier. For now this is not the case.

    Another topic we have not touched upon is local storage in EC2. Usually you have an attached volume that is destroyed once the instance is shut down. To get around this restriction you can create Snapshots and mount them as Elastic Block Storage (or EBS) volumes, which are persisted across server restarts. They are also supposedly faster than the default volume. This is yet another interesting topic I am planning to post about, as especially write performance in EC2 is really, really bad - and that may affect the above ETL process in unexpected ways. On the other hand you get persistence and are able to start and stop a cluster while retaining the data it had stored. The CDH Cloud Scripts have full support for EBS while Whirr is said to not have that working yet (although WHIRR-3 seems to say it is implemented).

    Let me know if you are interested in a particular topic regarding this post that I may not have touched upon. I am curious to hear what you are doing with Hadoop on EC2, so please drop me a note.


  2. HBase vs. BigTable Comparison

    HBase is an open-source implementation of the Google BigTable architecture. That part is fairly easy to understand and grasp. What I personally feel is a bit more difficult is to understand how much HBase covers and where there are differences (still) compared to the BigTable specification. This post is an attempt to compare the two systems.

    Before we embark onto the dark technology side of things I would like to point out one thing upfront: HBase is very close to what the BigTable paper describes. Putting aside minor differences, as of HBase 0.20, which uses ZooKeeper as its distributed coordination service, it has all the means to be nearly an exact implementation of BigTable's functionality. What I will be looking into below are mainly subtle variations or differences. Where possible I will try to point out how the HBase team is working on improving the situation where there is a need to do so.

    Scope

    The comparison in this post is based on the OSDI'06 paper that describes the system Google implemented in about seven person-years and which has been in operation since 2005. The paper was published in 2006 while the HBase sub-project of Hadoop was established only around the end of that same year to early 2007. Back then the current version of Hadoop was 0.15.0. Given we are now about 2 years in, with Hadoop 0.20.1 and HBase 0.20.2 available, you can hopefully understand that indeed much has happened since. Please also note that I am comparing a 14 page high level technical paper with an open-source project that can be examined freely from top to bottom. This usually means that there is more to tell about how HBase does things because the information is available.

    Towards the end I will also address a few newer features that BigTable has nowadays and how HBase is comparing to those. We start though with naming conventions.

    Terminology

    There are a few different terms used in either system describing the same thing. The most prominent is what HBase calls "regions" while Google refers to them as "tablets". These are the partitions of consecutive rows spread across many "region servers" - or "tablet servers" respectively. Apart from that most differences are minor or caused by usage of related technologies since Google's code is obviously closed-source and therefore only mirrored by open-source projects. The open-source projects are free to use other terms and most importantly names for the projects themselves.

    Features

    The following table lists various "features" of BigTable and compares them with what HBase has to offer. Some are actual implementation details, some are configurable options and so on. This may be confusing but it would be difficult to sort them into categories without ending up with only one entry in each of them.

    Feature
     Google BigTable 
     Apache HBase 
    Notes
    Atomic Read/Write/Modify
    Yes, per row
    Yes, per row
    Since BigTable does not strive to be a relational database it does not have transactions. The closest to such a mechanism is the atomic access to each row in the table. HBase also implements a row lock API which allows the user to lock more than one row at a time.
    Lexicographic Row Order
    Yes
    Yes
    All rows are sorted lexicographically in one order and that one order only. Again, this is no SQL database where you can have different sorting orders.
    Block Support
    Yes
    Yes
    Within each storage file data is written as smaller blocks of data. This enables faster loading of data from large storage files. The size is configurable in either system. The typical size is 64K.
    Block Compression
    Yes, per column family
    Yes, per column family
    Google uses BMDiff and Zippy in a two step process. BMDiff works really well because neighboring key-value pairs in the store files are often very similar. This can be achieved by using versioning so that all modifications to a value are stored next to each other but still have a lot in common. Or by designing the row keys in such a way that for example web pages from the same site are all bundled. Zippy then is a modified LZW algorithm. HBase on the other hand uses the standard Java supplied GZip or with a little effort the GPL licensed LZO format. There are indications though that Hadoop also may want to have BMDiff (HADOOP-5793) and possibly Zippy as well.
    Number of Column Families
    Hundreds at Most
    Less than 100
    While the number of rows and columns is theoretically unbounded the number of column families is not. This is a design trade-off but does not impose too many restrictions if the tables and keys are designed accordingly.
    Column Family Name Format
    Printable
    Printable
    The main reason for HBase here is that column family names are used as directories in the file system.
    Qualifier Format
    Arbitrary
    Arbitrary
    Any arbitrary byte[] array can be used.
    Key/Value Format
    Arbitrary
    Arbitrary
    Like above, any arbitrary byte[] array can be used.
    Access Control
    Yes
    No
    BigTable enforces access control on a column family level. HBase does not yet have that feature (see HBASE-1697).
    Cell Versions
    Yes
    Yes
    Versioning is done using timestamps. See next feature below too. The number of versions that should be kept is freely configurable on a column family level.
    Custom Timestamps
    Yes (micro)
    Yes (milli)
    With both systems you can either set the timestamp of a value that is stored yourself or leave the default "now". There are "known" restrictions in HBase that the outcome is indeterminate when adding older timestamps after already having stored newer ones beforehand.
    Data Time-To-Live
    Yes
    Yes
    Besides having versions of data cells the user can also set a time-to-live on the stored data so that it is discarded after a specific amount of time.
    Batch Writes
    Yes
    Yes
    Both systems allow table operations to be batched.
    Value based Counters
    Yes
    Yes
    BigTable and HBase can use a specific column as an atomic counter. HBase does this by acquiring a row lock before the value is incremented.
    Row Filters
    Yes
    Yes
    Again, both systems allow filters to be applied when scanning rows.
    Client Script Execution
    Yes
    No
    BigTable uses Sawzall to enable users to process the stored data.
    MapReduce Support
    Yes
    Yes
    Both systems have convenience classes that allow scanning a table in MapReduce jobs.
    Storage Systems
    GFS
    HDFS, S3, S3N, EBS
    While BigTable works on Google's GFS, HBase has the option to use any file system as long as there is a proxy or driver class for it.
    File Format
    SSTable
    HFile
     
    Block Index
    At end of file
    At end of file
    Both storage file formats have a similar block oriented structure with the block index stored at the end of the file.
    Memory Mapping
    Yes
    No
    BigTable can memory map storage files directly into memory.
    Lock Service
    Chubby
    ZooKeeper
    HBase uses ZooKeeper to coordinate tasks rather than to provide locking services. Overall though ZooKeeper does for HBase pretty much what Chubby does for BigTable, with slightly different semantics.
    Single Master
    Yes
    No
    HBase recently added support for multiple masters. These are on "hot" standby and monitor the master's ZooKeeper node.
    Tablet/Region Count
    10-1000
    10-1000
    Both systems recommend about the same number of regions per region server. Of course this depends on many things, but given a similar setup as far as "commodity" machines are concerned it seems to result in the same amount of load on each server.
    Tablet/Region Size
    100-200MB
    256MB
    The maximum region size can be configured for HBase and BigTable. HBase uses 256MB as the default value.
    Root Location
    1st META / Chubby
    -ROOT- / ZooKeeper
    HBase handles the Root table slightly differently from BigTable, where it is the first region in the Meta table. HBase uses its own table with a single region to store the Root table. Once either system starts, the address of the server hosting the Root region is stored in ZooKeeper or Chubby so that the clients can resolve its location without hitting the master.
    Client Region Cache
    Yes
    Yes
    The clients in either system cache the location of regions and have appropriate mechanisms to detect stale information and update the local cache accordingly.
    Meta Prefetch
    Yes
    No (?)
    A design feature of BigTable is to fetch the information for more than one Meta region at a time. This proactively fills the client cache for future lookups.
    Historian
    Yes
    Yes
    The history of region related events (such as splits, assignment, reassignment) is recorded in the Meta table.
    Locality Groups
    Yes
    No
    It is not entirely clear but it seems everything in BigTable is defined by Locality Groups. They group multiple column families into one so that they get stored together and also share the same configuration parameters. A single column family is probably a Locality Group with one member. HBase does not have this option and handles each column family separately.
    In-Memory Column Families
    Yes
    Yes
    These are for relatively small tables that need very fast access times.
    KeyValue (Cell) Cache
    Yes
    No
    This is a cache that serves hot cells.
    Block Cache
    Yes
    Yes
    Blocks read from the storage files are cached internally in configurable caches.
    Bloom Filters
    Yes
    Yes
    These filters allow - at the cost of memory on the region server - a quick check of whether a specific cell definitely does not exist or may exist.
    Write-Ahead Log (WAL)
    Yes
    Yes
    Each region server in either system stores one modification log for all regions it hosts.
    Secondary Log
    Yes
    No
    In addition to the Write-Ahead log mentioned above BigTable has a second log that it can use when the first is going slow. This is a performance optimization.
    Skip Write-Ahead Log
    ?
    Yes
    For bulk imports the client in HBase can opt to skip writing into the WAL.
    Fast Table/Region Split
    Yes
    Yes
    Splitting a region or tablet is fast as the daughter regions first read the original storage file until a compaction finally rewrites the data into the region's local store.
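
    To make the Bloom Filters row in the table above a bit more tangible, here is a minimal sketch of the idea. It is not the HBase or BigTable implementation: the class name TinyBloomFilter, the bit count and the double-hashing scheme are all assumptions chosen for illustration only.

```java
import java.util.BitSet;

/** Hypothetical, minimal Bloom filter; not the HBase or BigTable implementation. */
final class TinyBloomFilter {
  private final BitSet bits;
  private final int size;
  private final int hashCount;

  TinyBloomFilter(int size, int hashCount) {
    this.bits = new BitSet(size);
    this.size = size;
    this.hashCount = hashCount;
  }

  /** Derives the i-th bit position from two base hashes (double hashing). */
  private int position(String key, int i) {
    int h1 = key.hashCode();
    int h2 = (h1 >>> 16) | 1; // odd step so that successive probes differ
    return Math.floorMod(h1 + i * h2, size);
  }

  /** Sets all bit positions that belong to the key. */
  void add(String key) {
    for (int i = 0; i < hashCount; i++) {
      bits.set(position(key, i));
    }
  }

  /** false: the key was definitely never added; true: the key may have been added. */
  boolean mightContain(String key) {
    for (int i = 0; i < hashCount; i++) {
      if (!bits.get(position(key, i))) {
        return false;
      }
    }
    return true;
  }
}
```

    A "false" answer lets a server skip a store file entirely, while a "true" answer still requires the actual read. That asymmetry is why the filter costs memory but can save disk seeks.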

    New Features

    As mentioned above, a few years have passed since the original OSDI'06 BigTable paper. Jeff Dean - a fellow at Google - has mentioned a few new BigTable features during speeches and presentations he gave recently. We will have a look at some of them here.

    Feature
     Google BigTable 
     Apache HBase 
    Notes
    Client Isolation
    Yes
    No
    BigTable is internally used to serve many separate clients and can therefore keep the data of those clients isolated from each other.
    Coprocessors
    Yes
    No
    BigTable can host code that resides with the regions and splits with them as well. See HBASE-2000 for progress on this feature within HBase.
    Corruption Safety
    Yes
    No
    This is an interesting topic. BigTable uses CRC checksums to verify if data has been written safely. While HBase does not have this, the question is whether that is built into Hadoop's HDFS.
    Replication
    Yes
    No
    HBase is working on the same topic in HBASE-1295.

    Note: the color codes indicate what features have a direct match or where it is missing (yet). Weaker features are colored yellow, as I am not sure if they are immediately necessary or even applicable given HBase's implementation.

    Variations and Differences

    Some of the above features need a bit more looking into as they are difficult to narrow down to simple "Yay or Nay" questions. I am addressing them separately below.

    Lock Service

    This is from the BigTable paper:
    Bigtable uses Chubby for a variety of tasks: to ensure that there is at most one active master at any time; to store the bootstrap location of Bigtable data (see Section 5.1); to discover tablet servers and finalize tablet server deaths (see Section 5.2); to store Bigtable schema information (the column family information for each table); and to store access control lists. If Chubby becomes unavailable for an extended period of time, Bigtable becomes unavailable.

    There is a lot of overlap with how HBase uses ZooKeeper. What is different though is that schema information is not stored in ZooKeeper (yet; see http://wiki.apache.org/hadoop/ZooKeeper/HBaseUseCases for details). What is important here though is the same reliance on the lock service being available. From my own experience and from reading the threads on the HBase mailing list, it is often underestimated what can happen when ZooKeeper does not get the resources it needs to react in a timely manner. It is better to have a small ZooKeeper cluster on older machines not doing anything else than to have ZooKeeper nodes running next to the already heavy Hadoop or HBase processes. Once you starve ZooKeeper you will see a domino effect of HBase nodes going down with it - including the master(s).

    Update: After talking to a few guys from the ZooKeeper team I would like to point out that this is indeed not a ZooKeeper issue. If an already heavily loaded node also has to respond to ZooKeeper in time, you may face a timeout situation where the HBase RegionServers and even the Master think that their coordination service is gone and shut themselves down. Patrick Hunt has responded to this by mail and by post. Please read both to see that ZooKeeper is able to handle the load. I personally recommend setting up ZooKeeper for HBase on a separate cluster, maybe a set of spare machines you have from a recent upgrade of the cluster and which are slightly outdated (no 2xquad core CPU with 16GB of memory) but are otherwise perfectly fine. This also allows you to monitor the machines separately, instead of seeing a combined CPU load of 100% on the servers and not really knowing where it comes from and what effect it may have.

    Another important difference is that ZooKeeper is not a lock service like Chubby - and I do think it does not have to be as far as HBase is concerned. ZooKeeper is a distributed coordination service enabling HBase to do Master node elections etc. It also allows using semaphores to indicate state or actions required. So where Chubby creates a lock file to indicate a tablet server is up and running, HBase in turn uses ephemeral nodes that exist as long as the session between the RegionServer which created that node and ZooKeeper is active. This also causes the differences in semantics: in BigTable the master can delete a tablet server's lock file to indicate that it has lost its lease on tablets. In HBase this has to be handled differently because of the slightly less restrictive architecture of ZooKeeper. These are only semantics as mentioned and do not mean one is better than the other - just different.
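
    The ephemeral node idea described above can be modelled in a few lines. This is only an in-memory sketch and deliberately does not use the ZooKeeper API: the class ToyCoordinator, its method names and the node paths in the usage note are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical in-memory model of session-bound (ephemeral) nodes; not the ZooKeeper API. */
final class ToyCoordinator {
  // node path -> id of the session that created the node
  private final Map<String, String> ownerSessionByPath = new HashMap<>();

  /** Registers a node that lives only as long as the owning session. */
  void createEphemeral(String path, String sessionId) {
    ownerSessionByPath.put(path, sessionId);
  }

  /** Session expiry removes every node the session created. */
  void expireSession(String sessionId) {
    ownerSessionByPath.values().removeIf(sessionId::equals);
  }

  /** Returns the paths of all nodes whose sessions are still alive. */
  Set<String> liveNodes() {
    return new TreeSet<>(ownerSessionByPath.keySet());
  }
}
```

    If a region server registers a node such as "/hbase/rs/rs1" under its session and that session expires, the node disappears without anyone deleting it explicitly. An observer then only has to look at liveNodes() to see which servers are still considered up.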

    The first level is a file stored in Chubby that contains the location of the root tablet. The root tablet contains the location of all tablets in a special METADATA table. Each METADATA tablet contains the location of a set of user tablets. The root tablet is just the first tablet in the METADATA table, but is treated specially - it is never split - to ensure that the tablet location hierarchy has no more than three levels.

    As mentioned above, in HBase the root region is its own table with a single region. I strongly doubt that this makes a difference compared to having it as the first (non-splittable) region of the meta table. It is just the same feature but implemented differently.

    The METADATA table stores the location of a tablet under a row key that is an encoding of the tablet's table identifier and its end row.

    HBase does have a different layout here. It stores the start and end row with each region where the end row is exclusive and denotes the first (or start) row of the next region. Again, these are minor differences and I am not sure if there is a better or worse solution. It is just done differently.
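
    As a rough illustration of the start row / exclusive end row layout, here is a sketch that finds the region for a given row. It assumes a catalog keyed by start row held in a sorted map; RegionCatalog and its methods are made-up names, not the actual .META. handling in HBase.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model of a region catalog keyed by start row; not HBase's .META. code. */
final class RegionCatalog {
  // start row (inclusive) -> region name; "" is the start row of the first region
  private final TreeMap<String, String> regionsByStartRow = new TreeMap<>();

  void addRegion(String startRow, String regionName) {
    regionsByStartRow.put(startRow, regionName);
  }

  /** Returns the region whose [startRow, nextStartRow) range contains the row. */
  String locate(String row) {
    // The greatest start row that is <= row identifies the hosting region.
    Map.Entry<String, String> entry = regionsByStartRow.floorEntry(row);
    if (entry == null) {
      throw new IllegalStateException("no region covers row " + row);
    }
    return entry.getValue();
  }
}
```

    Because the end row is exclusive, a row equal to the boundary key belongs to the following region. A catalog keyed by end row, as the BigTable quote describes, would do the mirror-image lookup for the smallest end row at or after the requested row.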

    Master Operation

    To detect when a tablet server is no longer serving its tablets, the master periodically asks each tablet server for the status of its lock. If a tablet server reports that it has lost its lock, or if the master was unable to reach a server during its last several attempts, the master attempts to acquire an exclusive lock on the server's file. If the master is able to acquire the lock, then Chubby is live and the tablet server is either dead or having trouble reaching Chubby, so the master ensures that the tablet server can never serve again by deleting its server file. Once a server's file has been deleted, the master can move all the tablets that were previously assigned to that server into the set of unassigned tablets. To ensure that a Bigtable cluster is not vulnerable to networking issues between the master and Chubby, the master kills itself if its Chubby session expires.

    This is quite different even up to the current HBase 0.20.2. Here the region servers use a heartbeat protocol to report for duty to the master and, subsequently, to signal that they are still alive. I am not sure if this topic is covered by the master rewrite umbrella issue HBASE-1816 - and if it needs to be addressed at all. It could well be that what we have in HBase now is sufficient and does its job just fine. It was created when there was no lock service yet and therefore could be considered legacy code too.
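
    A heartbeat scheme of this kind boils down to remembering when each server was last heard from. The following is a simplified sketch under my own assumptions - the class HeartbeatTracker, the explicit clock parameter and the timeout handling are hypothetical and not taken from the HBase 0.20 master.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical master-side heartbeat bookkeeping; not the HBase 0.20 code. */
final class HeartbeatTracker {
  private final long timeoutMillis;
  private final Map<String, Long> lastSeen = new HashMap<>();

  HeartbeatTracker(long timeoutMillis) {
    this.timeoutMillis = timeoutMillis;
  }

  /** Called when a region server reports for duty or sends a later heartbeat. */
  void heartbeat(String server, long nowMillis) {
    lastSeen.put(server, nowMillis);
  }

  /** Removes and returns the servers that missed the timeout; their regions need reassignment. */
  List<String> expire(long nowMillis) {
    List<String> dead = new ArrayList<>();
    lastSeen.entrySet().removeIf(entry -> {
      boolean expired = nowMillis - entry.getValue() > timeoutMillis;
      if (expired) {
        dead.add(entry.getKey());
      }
      return expired;
    });
    return dead;
  }
}
```

    Passing the current time in as a parameter keeps the sketch deterministic. A real master would also have to decide what happens to the regions of an expired server, which is left out here.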

    Master Startup

    The master executes the following steps at startup. (1) The master grabs a unique master lock in Chubby, which prevents concurrent master instantiations. (2) The master scans the servers directory in Chubby to find the live servers. (3) The master communicates with every live tablet server to discover what tablets are already assigned to each server. (4) The master scans the METADATA table to learn the set of tablets. Whenever this scan encounters a tablet that is not already assigned, the master adds the tablet to the set of unassigned tablets, which makes the tablet eligible for tablet assignment.

    Along the lines of what I mentioned above, this part of the code was created before ZooKeeper was available. So HBase actually waits for the region servers to report for duty. It also scans the .META. table to learn what is there and which server is assigned to it. So far ZooKeeper is only used to publish the server hosting the -ROOT- region.

    Tablet/Region Splits

    In case the split notification is lost (either because the tablet server or the master died), the master detects the new tablet when it asks a tablet server to load the tablet that has now split. The tablet server will notify the master of the split, because the tablet entry it finds in the METADATA table will specify only a portion of the tablet that the master asked it to load.

    The master node in HBase uses the .META. solely to detect when a region was split but the message was lost. For that reason it scans the .META. on a regular basis to see when a region appears that is not yet assigned. It will then assign that region as per its default strategy.
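
    Conceptually the periodic scan compares what the catalog lists with what the servers actually host. Here is a small sketch of that comparison, with the hypothetical names MetaScanner and findUnassigned and plain strings standing in for regions and servers:

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical model of the periodic catalog scan; not the HBase master code. */
final class MetaScanner {
  /** Returns the regions listed in the catalog that no live server currently hosts. */
  static Set<String> findUnassigned(Set<String> catalogRegions,
                                    Map<String, Set<String>> regionsByServer) {
    Set<String> unassigned = new TreeSet<>(catalogRegions);
    for (Set<String> hosted : regionsByServer.values()) {
      unassigned.removeAll(hosted);
    }
    return unassigned;
  }
}
```

    A daughter region whose split message was lost would show up in the returned set on the next scan and could then be handed to the regular assignment logic.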

    Compactions

    The following are more terminology differences than anything else.

    As write operations execute, the size of the memtable increases. When the memtable size reaches a threshold, the memtable is frozen, a new memtable is created, and the frozen memtable is converted to an SSTable and written to GFS. This minor compaction process has two goals: it shrinks the memory usage of the tablet server, and it reduces the amount of data that has to be read from the commit log during recovery if this server dies. Incoming read and write operations can continue while compactions occur.

    HBase has a similar operation but it is referred to as a "flush". In contrast, "minor compactions" in HBase rewrite the last N used store files, i.e. those with the most recent mutations, as they are probably much smaller than previously created files that have more data in them.

    ... we bound the number of such files by periodically executing a merging compaction in the background. A merging compaction reads the contents of a few SSTables and the memtable, and writes out a new SSTable. The input SSTables and memtable can be discarded as soon as the compaction has finished.

    This again refers to what is called "minor compaction" in HBase.

    A merging compaction that rewrites all SSTables into exactly one SSTable is called a major compaction.

    Here we have an exact match though, a "major compaction" in HBase also rewrites all files into one.
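
    To keep the terminology apart, here is a toy model of the three operations. It is only a sketch under simplifying assumptions (string keys, one value per key, no deletes or versions); ToyStore and its methods are hypothetical names and not HBase classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical single-store model of flushes and compactions; not HBase code. */
final class ToyStore {
  private TreeMap<String, String> memstore = new TreeMap<>();
  // Oldest file first; each "file" is treated as immutable once it is added.
  private final List<TreeMap<String, String>> storeFiles = new ArrayList<>();

  void put(String key, String value) {
    memstore.put(key, value);
  }

  /** HBase "flush" (BigTable "minor compaction"): the memstore becomes a new file. */
  void flush() {
    if (memstore.isEmpty()) {
      return;
    }
    storeFiles.add(memstore);
    memstore = new TreeMap<>();
  }

  /** HBase "minor compaction" (BigTable "merging compaction"): merge the newest n files. */
  void minorCompact(int n) {
    mergeNewest(Math.min(n, storeFiles.size()));
  }

  /** "Major compaction" in both systems: rewrite all files into exactly one. */
  void majorCompact() {
    mergeNewest(storeFiles.size());
  }

  private void mergeNewest(int n) {
    if (n < 2) {
      return;
    }
    int from = storeFiles.size() - n;
    List<TreeMap<String, String>> inputs = storeFiles.subList(from, storeFiles.size());
    TreeMap<String, String> merged = new TreeMap<>();
    for (TreeMap<String, String> file : inputs) {
      merged.putAll(file); // iterating oldest to newest, so newer values win
    }
    inputs.clear(); // the input files can be discarded once the merge is done
    storeFiles.add(merged);
  }

  int fileCount() {
    return storeFiles.size();
  }

  /** Reads check the memstore first, then the files from newest to oldest. */
  String get(String key) {
    if (memstore.containsKey(key)) {
      return memstore.get(key);
    }
    for (int i = storeFiles.size() - 1; i >= 0; i--) {
      String value = storeFiles.get(i).get(key);
      if (value != null) {
        return value;
      }
    }
    return null;
  }
}
```

    Three flushes followed by minorCompact(2) leave two files, and a subsequent majorCompact() leaves exactly one, while get() keeps returning the most recent value throughout.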

    Immutable Files

    Knowing that files are fixed once written BigTable makes the following assumption:

    The only mutable data structure that is accessed by both reads and writes is the memtable. To reduce contention during reads of the memtable, we make each memtable row copy-on-write and allow reads and writes to proceed in parallel.

    I do believe this is done similarly in HBase but am not sure. It certainly has the same architecture, as HDFS files for example are also immutable once written.

    I can only recommend that you read the BigTable paper too and make up your own mind. This post was inspired by the idea to learn what BigTable really has to offer and how much HBase has already covered. The difficult part is of course that there is not too much information available on BigTable. But the numbers even the 2006 paper lists are more than impressive. If HBase as an open-source project with just a handful of committers, most of whom have full-time day jobs, can achieve something even remotely comparable I think this is a huge success. And looking at the 0.21 and 0.22 road map, the already small gap is going to shrink even further!


  3. HBase File Locality in HDFS

    One of the more ambiguous things in Hadoop is block replication: it happens automatically and you should not have to worry about it. HBase relies on it 100% to provide data safety as it stores its files in the distributed file system. While that works completely transparently, one of the more advanced questions is how this affects performance. This usually arises when the user starts writing MapReduce jobs against either HBase or Hadoop directly. Especially with larger data being stored in HBase, how does the system take care of placing the data close to where it is needed? This is referred to as data locality, and in the case of HBase using the Hadoop file system (HDFS) there may be doubts about how that works.

    First let's see how Hadoop handles this. The MapReduce documentation advertises the fact that tasks run close to the data they process. This is achieved by breaking up large files in HDFS into smaller chunks, the so-called blocks. That is also the reason why the block size in Hadoop is much larger than what you may know from operating systems and their file systems. The default setting is 64MB, but usually 128MB is chosen, if not even larger when you are sure all your files are larger than a single block in size. Each block maps to a task run to process the contained data. That also means larger block sizes equal fewer map tasks to run, as the number of mappers is driven by the number of blocks that need processing. Hadoop knows where blocks are located and runs the map tasks directly on the node that hosts them (actually one of them, as replication means it has a few hosts to choose from). This is how it achieves data locality during MapReduce.
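
    The relation between block size and the number of map tasks is simple arithmetic. Here is a quick sketch, assuming exactly one map task per block, which ignores any split size settings a job may use; SplitMath is a made-up helper name.

```java
/** Hypothetical helper for the block arithmetic above; not part of Hadoop. */
final class SplitMath {
  /** Number of HDFS blocks a file of the given size occupies. */
  static long blockCount(long fileBytes, long blockBytes) {
    if (fileBytes <= 0) {
      return 0;
    }
    return (fileBytes + blockBytes - 1) / blockBytes; // ceiling division
  }
}
```

    Under that assumption a 1GB file needs 16 map tasks with 64MB blocks but only 8 with 128MB blocks, and a 65MB file already spills into a second, nearly empty block at the 64MB default.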

    Back to HBase. When you have arrived at that point with Hadoop and you now understand that it can process data locally you start to question how this may work with HBase. If you have read my post on HBase's storage architecture you saw that HBase simply stores files in HDFS. It does so for the actual data files (HFile) as well as its log (WAL). And if you look into the code it simply uses FileSystem.create(Path path) to create these. When you then consider two access patterns, a) direct random access and b) MapReduce scanning of tables, you wonder if care was taken that the HDFS blocks are close to where they are read by HBase.

    One thing upfront: if you do not share one cluster between Hadoop and HBase but instead employ a separate Hadoop as well as a stand-alone HBase cluster then there is no data locality - and there can't be. That is equivalent to running a separate MapReduce cluster, which would not be able to execute tasks directly on the datanode. It is imperative for data locality to have them all running on the same cluster: Hadoop (as in HDFS), MapReduce and HBase. End of story.

    OK, you have them all co-located on a single (hopefully larger) cluster? Then read on. How does Hadoop figure out where data is located as HBase accesses it? Remember the access patterns above: both go through a single piece of software called a RegionServer. Case a) uses random access patterns while b) scans all contiguous rows of a table but does so through the same API. As explained in my referenced post and mentioned above, HBase simply stores files and those get distributed as replicated blocks across all data nodes of the HDFS. Now imagine you stop HBase after saving a lot of data and restart it subsequently. The region servers are restarted and are assigned a seemingly random set of regions. At this very point there is no data locality guaranteed - how could it be?

    The most important factor is that HBase is not restarted frequently and that it performs house keeping on a regular basis. These so called compactions rewrite files as new data is added over time. All files in HDFS once written are immutable (for all sorts of reasons). Because of that, data is written into new files and as their number grows HBase compacts them into another set of new, consolidated files. And here is the kicker: HDFS is smart enough to put the data where it is needed! How does that work you ask? We need to take a deep dive into Hadoop's source code and see how the above FileSystem.create(Path path) that HBase uses works. We are running on HDFS here, so we are actually using DistributedFileSystem.create(Path path) which looks like this:

    public FSDataOutputStream create(Path f) throws IOException {
      return create(f, true);
    }
    It returns a FSDataOutputStream and that is created like so:
    public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, short replication, long blockSize, Progressable progress) throws IOException {
      return new FSDataOutputStream(dfs.create(getPathName(f), permission, overwrite, replication, blockSize, progress, bufferSize), statistics);
    }
    It uses a DFSClient instance that is the "umbilical" cord connecting the client with the NameNode:
    this.dfs = new DFSClient(namenode, conf, statistics);
    What is returned though is a DFSClient.DFSOutputStream instance. As you write data into the stream the DFSClient aggregates it into "packets" which are then written as blocks to the data nodes. This happens in DFSClient.DFSOutputStream.DataStreamer (please hang in there, we are close!) which runs as a daemon thread in the background. The magic unfolds now in a few hops on the stack, first in the daemon's run() where it gets the list of nodes to store the data on:
    nodes = nextBlockOutputStream(src);
    This in turn calls:
    long startTime = System.currentTimeMillis();
    lb = locateFollowingBlock(startTime);
    block = lb.getBlock();
    nodes = lb.getLocations();
    We follow further down and see that locateFollowingBlock() calls:
    return namenode.addBlock(src, clientName);
    Here is where it all comes together. The name node is called to add a new block and the src parameter indicates for what file, while clientName is the name of the DFSClient instance. I skip one more small method in between and show you the next bigger step involved:
    public LocatedBlock getAdditionalBlock(String src, String clientName) throws IOException {
      ...
      INodeFileUnderConstruction pendingFile  = checkLease(src, clientName);
      ...
      fileLength = pendingFile.computeContentSummary().getLength();
      blockSize = pendingFile.getPreferredBlockSize();
      clientNode = pendingFile.getClientNode();
      replication = (int)pendingFile.getReplication();
    
      // choose targets for the new block to be allocated.
      DatanodeDescriptor targets[] = replicator.chooseTarget(replication, clientNode, null, blockSize);
      ...
    }
    We are finally getting to the core of this code in the replicator.chooseTarget() call:
    private DatanodeDescriptor chooseTarget(int numOfReplicas, DatanodeDescriptor writer, List<Node> excludedNodes, long blocksize, int maxNodesPerRack, List<DatanodeDescriptor> results) {
      
      if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
        return writer;
      }
      
      int numOfResults = results.size();
      boolean newBlock = (numOfResults==0);
      if (writer == null && !newBlock) {
        writer = (DatanodeDescriptor)results.get(0); 
      }
      
      try {
        switch(numOfResults) {
        case 0:
          writer = chooseLocalNode(writer, excludedNodes, blocksize, maxNodesPerRack, results);
          if (--numOfReplicas == 0) {
            break;
          }
        case 1:
          chooseRemoteRack(1, results.get(0), excludedNodes, blocksize, maxNodesPerRack, results);
          if (--numOfReplicas == 0) {
            break;
          }
        case 2:
          if (clusterMap.isOnSameRack(results.get(0), results.get(1))) {
            chooseRemoteRack(1, results.get(0), excludedNodes, blocksize, maxNodesPerRack, results);
          } else if (newBlock) {
            chooseLocalRack(results.get(1), excludedNodes, blocksize, maxNodesPerRack, results);
          } else {
            chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack, results);
          }
          if (--numOfReplicas == 0) {
            break;
          }
        default:
          chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize, maxNodesPerRack, results);
        }
      } catch (NotEnoughReplicasException e) {
        FSNamesystem.LOG.warn("Not able to place enough replicas, still in need of " + numOfReplicas);
      }
      return writer;
    }
    Recall that we started with the DFSClient and created a file which was subsequently filled with data. As the blocks need writing out, the above code first checks if that can be done on the same host that the client is on, i.e. the "writer". That is "case 0". In "case 1" the code tries to find a remote rack to have a distant replica of the block. Lastly it fills the list of required replicas with machines from the local rack or from another rack.
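
    The placement order can be replayed with a small, deterministic model. This is a deliberate simplification and not the Hadoop code: it always picks the first eligible node instead of a random one, ignores free space and load, and the names ToyPlacement and chooseTargets are my own.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, deterministic simplification of the placement order; not Hadoop code. */
final class ToyPlacement {
  private final Map<String, String> rackOfNode = new LinkedHashMap<>(); // node -> rack

  void addNode(String node, String rack) {
    rackOfNode.put(node, rack);
  }

  List<String> chooseTargets(String writer, int replicas) {
    List<String> chosen = new ArrayList<>();
    if (replicas <= 0 || rackOfNode.isEmpty()) {
      return chosen;
    }
    // Replica 1: the writer's own node if it runs a data node, else any node.
    if (rackOfNode.containsKey(writer)) {
      chosen.add(writer);
    } else {
      pickAny(chosen);
    }
    // Replica 2: a node on a different rack than replica 1.
    if (chosen.size() < replicas
        && !pick(chosen, rackOfNode.get(chosen.get(0)), false)) {
      pickAny(chosen);
    }
    // Replica 3: another node on the same rack as replica 2.
    if (chosen.size() < replicas && chosen.size() == 2
        && !pick(chosen, rackOfNode.get(chosen.get(1)), true)) {
      pickAny(chosen);
    }
    // Remaining replicas: any node not used yet.
    while (chosen.size() < replicas && pickAny(chosen)) {
      // pickAny adds one more node per iteration until none are left
    }
    return chosen;
  }

  /** Adds the first unused node that is on (or off) the given rack. */
  private boolean pick(List<String> chosen, String rack, boolean sameRack) {
    for (Map.Entry<String, String> e : rackOfNode.entrySet()) {
      if (!chosen.contains(e.getKey()) && e.getValue().equals(rack) == sameRack) {
        chosen.add(e.getKey());
        return true;
      }
    }
    return false;
  }

  /** Adds the first unused node regardless of rack. */
  private boolean pickAny(List<String> chosen) {
    for (String node : rackOfNode.keySet()) {
      if (!chosen.contains(node)) {
        chosen.add(node);
        return true;
      }
    }
    return false;
  }
}
```

    The point for HBase is the first step: when the writer is itself a data node, which is the case for a region server co-located with a data node, the first replica of every block it writes lands on its own host.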

    So for HBase this means that, as long as a region server stays up long enough (which is the normal case), a major compaction on all tables - which can be invoked manually or is triggered by a configuration setting - leaves the store files local on the same host. The data node that shares the same physical host then has a copy of all the data the region server requires. Whether you run a scan, a get or any other operation, the reads can be served from the local data node, which is the best case for performance.
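
    One way to reason about this is a simple locality ratio: of all the blocks a region server needs, how many have a replica on its own host? The sketch below assumes you already have the list of replica hosts for each block; LocalityIndex is a hypothetical helper and not something HBase or Hadoop ships.

```java
import java.util.List;

/** Hypothetical helper to express locality as a ratio; not part of HBase or Hadoop. */
final class LocalityIndex {
  /** Fraction of blocks (0.0 to 1.0) that have a replica on the given host. */
  static double of(String host, List<List<String>> replicaHostsPerBlock) {
    if (replicaHostsPerBlock.isEmpty()) {
      return 0.0; // no blocks at all, so nothing is local
    }
    long local = replicaHostsPerBlock.stream()
        .filter(hosts -> hosts.contains(host))
        .count();
    return (double) local / replicaHostsPerBlock.size();
  }
}
```

    Right after a restart with reshuffled regions this ratio can be anywhere, while after a major compaction written by the region server itself it should move towards 1.0.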

    Finally a good overview over the HDFS design and data replication can be found here. Also note that the HBase team is working on redesigning how the Master is assigning the regions to servers. The plan is to improve it so that regions are deployed on the server where most blocks are. This will particularly be useful after a restart because it would guarantee a better data locality right off the bat. Stay tuned!


  4. Minimal Katta Lucene Client

    A quick post explaining how a minimal Katta Lucene Client is set up. I found this was sort of missing from the Katta site and documentation and since I ran into an issue along the way I thought I would post my notes here for others who may attempt the same.

    First was the question, which of the libs needed to be supplied for a client to use a remote Katta cluster. Please note that I am referring here to a "canonical" setup with a distributed Lucene index (which I created on Hadoop from data in HBase using a MapReduce job). I found these libs needed to be added, the rest is for the server:

    katta-core-0.6.rc1.jar
    lucene-core-3.0.0.jar
    zookeeper-3.2.2.jar
    zkclient-0.1-dev.jar
    hadoop-core-0.20.1.jar
    log4j-1.2.15.jar
    commons-logging-1.0.4.jar

    Here is the code for the client, please note that this is a simple test app that expects to get the name of the index, the default Lucene search field and query on the command line. I did not add usage info as this is just a proof of concept.

    package com.worldlingo.test;
    
    import net.sf.katta.lib.lucene.Hit;
    import net.sf.katta.lib.lucene.Hits;
    import net.sf.katta.lib.lucene.LuceneClient;
    import net.sf.katta.util.ZkConfiguration;
    import org.apache.hadoop.io.MapWritable;
    import org.apache.hadoop.io.Writable;
    import org.apache.lucene.analysis.Analyzer;
    import org.apache.lucene.analysis.standard.StandardAnalyzer;
    import org.apache.lucene.queryParser.QueryParser;
    import org.apache.lucene.search.Query;
    import org.apache.lucene.util.Version;
    
    import java.util.Arrays;
    import java.util.Map;
    
    public class KattaLuceneClient {
    
      public static void main(String[] args) {
        try {
          Analyzer analyzer = new StandardAnalyzer(Version.LUCENE_CURRENT);
          Query query = new QueryParser(Version.LUCENE_CURRENT, args[1], analyzer).parse(args[2]);
    
          // assumes "/katta.zk.properties" available on classpath!
          ZkConfiguration conf = new ZkConfiguration();
          LuceneClient luceneClient = new LuceneClient(conf);
          Hits hits = luceneClient.search(query, Arrays.asList(args[0]).toArray(new String[1]), 99);
    
          int num = 0;
          for (Hit hit : hits.getHits()) {
            MapWritable mw = luceneClient.getDetails(hit);
            for (Map.Entry<Writable, Writable> entry : mw.entrySet()) {
              System.out.println("[" + (num++) + "] key -> " + entry.getKey() + ", value -> " + entry.getValue());
            }
          }
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    
    }

    The first part is standard Lucene code where we parse the query string with an analyzer. The second part is Katta related as it creates a configuration object, which assumes we have a ZooKeeper configuration in the class path. That config only needs to have these lines set:

    zookeeper.embedded=false
    zookeeper.servers=server-1:2181,server-2:2181

    The first line is really only used on the server, so it can be left out on the client. I simply copied the server katta.zk.properties to match my setup. The important line is the second one, which tells the client where the ZooKeeper responsible for managing the Katta cluster is running. With this info the client is able to distribute the search calls to the correct Katta slaves.
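
    To show what the second line carries, here is a small sketch that reads the same two properties and splits the server list into host:port entries. It uses plain java.util.Properties and says nothing about how Katta's ZkConfiguration parses the file internally; ZkServersParser is a made-up name.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical parser for the properties shown above; not part of Katta. */
final class ZkServersParser {
  /** Splits the zookeeper.servers value into host:port entries. */
  static List<String> servers(String propertiesText) {
    Properties props = new Properties();
    try {
      props.load(new StringReader(propertiesText));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    List<String> result = new ArrayList<>();
    String value = props.getProperty("zookeeper.servers", "");
    for (String entry : value.split(",")) {
      String trimmed = entry.trim();
      if (!trimmed.isEmpty()) {
        result.add(trimmed);
      }
    }
    return result;
  }
}
```

    Fed with the two lines from above, this yields the entries server-1:2181 and server-2:2181, which are the ZooKeeper endpoints the client needs to reach.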

    Further along we create a LuceneClient instance and start the actual search. Here I simply used no sorting and set the maximum number of hits returned to 99. These two values could be optionally added to the command line parameters but are trivial and not required here - this is a minimal test client after all ;)

    The last part of the app simply prints out the fields and their values for each document found. Please note that Katta is using the low-level Writable class as part of its response. This is not "too" intuitive for the uninitiated. These are actually Text instances so they can safely be converted to text using ".toString()".

    Finally, I also checked the test project into my GitHub account for your perusal. Have fun!


  5. 3rd Munich OpenHUG Meeting

    I am pleased to invite you to our third Munich Open Hadoop User Group Meeting!

    As always, we are looking forward to seeing everyone again and welcome new attendees to join our group. We are enthusiastic about all things related to scalable, distributed storage systems. We are not limiting ourselves to a particular system but appreciate anyone who would like to share their experiences.

    When: Thursday May 6th, 2010 at 6pm (open end)
    Where: eCircle AG, Nymphenburger Straße 86, 80636 München ["Bruckmann" Building, "U1 Mailinger Str", map (in German) and look for the signs]

    Thanks again to Bob Schulze from eCircle for providing the infrastructure.

    We have a talk scheduled by Stefan Seelmann who is a member of the project committee for the Apache Directory project. This is followed by an open discussion.

    Please RSVP at Xing and Yahoo's Upcoming.

    Looking forward to seeing you there!

    Cheers,
    Lars


  6. FOSDEM 2010 NoSQL Talk

    Let me take a minute to wrap up my FOSDEM 2010 experience. I was part of the NoSQL DevRoom organized by @stevenn from Outerthought, who I had the pleasure to visit before a few months back as an HBase Ambassador.

    First things first, the NoSQL DevRoom was just an absolute success and I had a blast attending it. I also made sure not to walk around and see other talks outside the NoSQL track, although there were plenty of good ones. I did so deliberately to see the other projects and what they have to offer. I thought it was great, a good vibe was felt throughout the whole day as the audience got a whirlwind tour through the NoSQL landscape. The room was full to the brim for most presentations and some folks had to miss out as we could not let any more people in. This did prove the great interest in this fairly new kind of technology. Exciting!

    The focus of my talk was the history I have with HBase, starting with it in late 2007. At this point I would like to take the opportunity to thank Michael Stack, the lead of HBase, as he helped me many times back then to sort out the problems I ran into. I would also like to say that if you start with HBase today you will not have these problems as HBase has matured tremendously since then. It is an overall stable product and can solve scalability issues you may face with regular RDBMSs today - and that with great ease.

    So the talk I gave did not really sell all the features nor did it explain everything fully. I felt this could be left to the reader to look up on the project's website (or here on my blog) and hence I focused on my use case only. First up, here are the slides.


    My Life with HBase - FOSDEM 2010 NoSQL -

    After my talk and throughout the rest of the day I also had great conversations with the attendees who had many and great questions.

    Having listened to the other talks though I felt I probably could have done a better job selling HBase to the audience. I could have reported about use-cases in well known companies, given better performance numbers and so on. I have learned a lesson and will make sure I do a better job next time around. I guess this is also another facet of what this is about, i.e. learning to achieve a higher level of professionalism.

    But as I said above, my intent was to report about my life with HBase. I am grateful though that it was accepted as that and please let me cite Todd Hoff (see Hot Scalability Links for February 12, 2010) who put it in such nice words:

    "The hardscabble tale of HBase's growth from infancy to maturity. A very good introduction and overview of HBase."
    Thank you!

    Finally here is the video of the talk:



    I am looking forward to more NoSQL events in Europe in the near future and will attempt to represent HBase once more (including those adjustments I mentioned above). My hope is that we as Europeans are able to adopt these new technologies and stay abreast with the rest of the world. We sure have smart people to do so.


  7. IvyDE and HBase 0.21 (trunk)

    If you are staying on top of HBase development and frequently update to the HBase trunk (0.21.0-dev at the time of this post) you may have noticed that we now have support for Apache Ivy (see HBASE-1433). This is good because it allows better control over the dependencies on the required jar files. It does have a few drawbacks though. One issue is that you must be online to get your initial set of jars. You can also set up a local mirror or reuse the one you need for Hadoop anyway to share some of them.

    Another issue is that it pulls in many more libs as part of the dependency resolution process. This reminds me a bit of aptitude when you try to install Java, for example, on Debian. It often wants to pull in a litany of "required" packages, but upon closer look many are only recommended and need not be installed.

    Finally you need to get the jar files somehow into your development environment. I am using Eclipse 3.5 on my Windows 7 PC as well as on my MacOS machines. If you have not run ant from the command line yet you have no jars downloaded and opening the project in Eclipse yields an endless number of errors. You have two choices. You can run ant to get all jars and then add them to the project in Eclipse. But that is rather static and does not work well with future changes. It also is not the "ivy" way to resolve the libraries automatically.

    The other option you have is adding a plugin to Eclipse that can handle Ivy for you, right within the IDE. Luckily for Eclipse there is IvyDE. You install it according to its documentation and then add a "Classpath Container" as described here. That part works quite well and after a restart IvyDE is ready to go.

    A few more steps have to be done to get HBase working now - as in compiling without errors. The crucial one is editing the Ivy library and setting the HBase specific Ivy files, in particular the "Ivy settings path" and the properties file. The latter especially specifies all the various version numbers that the ivy.xml is using. Without it the Ivy resolve process will fail with many errors all over the place. Please note that the screen shot I added shows what it looks like on my Windows PC. The paths will be slightly different for your setup and probably even use another format if you are on a Mac or Linux machine. As long as you specify both you should be fine.

    The other important issue is that you have to repeat that same step of adding the Classpath Container two more times: each of the two larger contrib packages, "contrib/stargate" and "contrib/transactional", has its own ivy.xml! For both you have to go into the respective directory, right click on the ivy.xml and follow the steps described in the Ivy documentation. Enter the same information as mentioned above to make the resolve work and leave everything else the way it is. You may notice that the contrib packages have a few more targets unticked. That is OK and can be used as-is.

    As a temporary step you have to add two more static libraries that are in the $HBASE_HOME/lib directory: libthrift-0.2.0.jar and zookeeper-3.2.2.jar. Those will eventually be published on the Ivy repositories and then this step is obsolete (see INFRA-2461).

    Eventually you end up with three containers as shown in the second and third screen shot. The Eclipse toolbar now also has an Ivy "Resolve All Dependencies" button which you can use to trigger the download process. Personally I had to do this a few times as the mirrors with the jars seem to be flaky at times. I ended up with for example "hadoop-mapred.jar" missing. Another resolve run fixed the problem.

    The last screen shot shows the three Ivy related containers once more in the tree view of the Package Explorer in the Java perspective. What you also see is the Ivy console, which is installed with the plugin as well. You have to open it as usual using the "Window - Show View - Console" menu (if you do not have the Console View open already) and then use the drop down menu next to the "Open Console" button in that view to open the Ivy console. It gives you access to all the details when resolving the dependencies and can hint when you have done something wrong. Please note though that it also lists a lot of connection errors, one for every mirror or repository that does not respond or does not yet have the required package available. One of them should respond though, or as mentioned above you will have to try again later.

    Eclipse automatically compiles the project and if everything worked out it does so now without a hitch. Good luck!

    Update: Added info about the still static thrift and zookeeper jars. See Kay Kay's comment below.


  8. HBase Architecture 101 - Write-ahead-Log

    What is the Write-ahead-Log you ask? In my previous post we had a look at the general storage architecture of HBase. One thing that was mentioned is the Write-ahead-Log, or WAL. This post explains how the log works in detail, but bear in mind that it describes the current version, which is 0.20.3. I will address the various plans to improve the log for 0.21 at the end of this article. For the term itself please read here.

    Big Picture

    The WAL is the lifeline that is needed when disaster strikes. Similar to a BIN log in MySQL it records all changes to the data. This is important in case something happens to the primary storage. So if the server crashes it can effectively replay that log to get everything up to where the server should have been just before the crash. It also means that if writing the record to the WAL fails the whole operation must be considered a failure.

    Let's look at the high level view of how this is done in HBase. First the client initiates an action that modifies data. This is currently a call to put(Put), delete(Delete) or incrementColumnValue() (abbreviated as "incr" here at times). Each of these modifications is wrapped into a KeyValue object instance and sent over the wire using RPC calls. The calls are (ideally) batched and sent to the HRegionServer that serves the affected regions. Once it arrives, the payload, the said KeyValue, is routed to the HRegion that is responsible for the affected row. The data is written to the WAL and then put into the MemStore of the actual Store that holds the record. And that also pretty much describes the write-path of HBase.

    Eventually, when the MemStore gets to a certain size or after a specific time, the data is asynchronously persisted to the file system. Until then the data is held only in volatile memory. And if the HRegionServer hosting that memory crashes the data is lost... but for the existence of what is the topic of this post, the WAL!

    We have a look now at the various classes or "wheels" working the magic of the WAL. First up is one of the main classes of this contraption.

    HLog

    The class which implements the WAL is called HLog. What you may have read in my previous post and is also illustrated above is that there is only one instance of the HLog class, which is one per HRegionServer. When a HRegion is instantiated the single HLog is passed on as a parameter to the constructor of HRegion.

    Central to HLog's functionality is the append() method, which internally eventually calls doWrite(). It is what is called when the above mentioned modification methods are invoked... or is it? One thing to note here is that for performance reasons there is an option for put(), delete(), and incrementColumnValue() to skip the log: setWriteToWAL(boolean). If you invoke this method with false while setting up, for example, a Put instance, then the write to the WAL is skipped! That is also why the downward arrow in the big picture above is done with a dashed line to indicate the optional step. By default you certainly want the WAL, no doubt about that. But say you run a large bulk import MapReduce job that you can rerun at any time. You gain extra performance but need to take extra care that no data was lost during the import. The choice is yours.
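
    To make the trade-off concrete, here is a minimal sketch of the write path with the optional log step. It is a heavily simplified, hypothetical model (the class WritePathSketch and its methods are made up for illustration and are not HBase classes): the "log" survives a simulated crash, the in-memory store does not.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical, heavily simplified model of a region server's write path.
// None of these names are HBase classes.
class WritePathSketch {
    // Stand-in for the durable log: survives a crash.
    private final List<String[]> wal = new ArrayList<>();
    // Stand-in for the MemStore: volatile, lost on a crash.
    private TreeMap<String, String> memStore = new TreeMap<>();

    // Applies one edit: log first (unless skipped), then memory.
    void put(String row, String value, boolean writeToWal) {
        if (writeToWal) {
            wal.add(new String[] {row, value});
        }
        memStore.put(row, value);
    }

    // Simulates a crash followed by a restart that replays the log.
    void crashAndRecover() {
        memStore = new TreeMap<>();      // the in-memory state is gone
        for (String[] edit : wal) {      // replay edits in their original order
            memStore.put(edit[0], edit[1]);
        }
    }

    String get(String row) {
        return memStore.get(row);
    }
}
```

    An edit written with writeToWal set to false is readable right away but is gone after crashAndRecover(), while a logged edit comes back. That is exactly the risk you accept for a bulk import that you can rerun.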

    Another important feature of the HLog is keeping track of the changes. This is done by using a "sequence number". It uses an AtomicLong internally to be thread-safe and is either starting out at zero - or at that last known number persisted to the file system. So as the region is opening its storage file, it reads the highest sequence number which is stored as a meta field in each HFile and sets the HLog sequence number to that value if it is higher than what has been recorded before. So at the end of opening all storage files the HLog is initialized to reflect where persisting has ended and where to continue. You will see in a minute where this is used.
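
    The initialization described above could be sketched roughly like this. The class and method names are hypothetical and the real HLog code differs; the only thing taken from the text is the idea of an AtomicLong that is raised to the highest sequence number found in any store file and then incremented per edit.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the sequence number handling; not the actual HLog code.
class SequenceNumberSketch {
    // Starts out at zero, as described in the text.
    private final AtomicLong logSeqNum = new AtomicLong(0);

    // Called for each store file as a region opens it, passing the highest
    // sequence number recorded in that file's meta data.
    void storeFileOpened(long maxSeqNumInFile) {
        // Only ever raise the counter, never lower it.
        logSeqNum.accumulateAndGet(maxSeqNumInFile, Math::max);
    }

    // Called for every new edit appended to the log.
    long nextSequenceNumber() {
        return logSeqNum.incrementAndGet();
    }

    long current() {
        return logSeqNum.get();
    }
}
```

    After all store files have been opened, the counter sits at the highest persisted number, so the next edit continues right after it.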

    The image to the right shows three different regions, each of them covering a different row key range. As mentioned above, all of these regions share the same single instance of HLog. What that means in this context is that data arriving at the regions is written to the WAL in an unpredictable order. We will address this further below.

    Finally the HLog has the facilities to recover and split a log left by a crashed HRegionServer. These are invoked by the HMaster before regions are deployed again.

    HLogKey

    Currently the WAL is using a Hadoop SequenceFile, which stores records as sets of key/values. For the WAL the value is simply the KeyValue sent from the client. The key is represented by an HLogKey instance. As you may recall from my first post in this series, the KeyValue only represents the row, column family, qualifier, timestamp, and value as well as the "Key Type". Last time I did not address that field since there was no context. Now we have one because the Key Type is what identifies what the KeyValue represents, a "put" or a "delete" (where there are a few more variations of the latter to express what is to be deleted: value, column family or a specific column).

    What we are missing though is where the KeyValue belongs, i.e. the region and the table name. That is stored in the HLogKey. What is also stored is the above sequence number. With each record that number is incremented to be able to keep a sequential order of edits. Finally it records the "Write Time", a time stamp to record when the edit was written to the log.

    LogFlusher

    As mentioned above, as data arrives at a HRegionServer in the form of KeyValue instances it is written (optionally) to the WAL. And as mentioned as well, it is then written to a SequenceFile. While this seems trivial, it is not. One of the base classes in Java IO is the Stream. Streams writing to a file system in particular are often buffered to improve performance, as the OS is much faster writing data in batches, or blocks. If you wrote every record separately, IO throughput would be really bad. But in the context of the WAL this causes a gap where data is supposedly written to disk but in reality is in limbo. To mitigate the issue the underlying stream needs to be flushed on a regular basis. This functionality is provided by the LogFlusher class and thread. It simply calls HLog.optionalSync(), which checks if the hbase.regionserver.optionallogflushinterval, set to 10 seconds by default, has been exceeded and if that is the case invokes HLog.sync(). The other place invoking the sync method is HLog.doWrite(). Once it has written the current edit to the stream it checks if the hbase.regionserver.flushlogentries parameter, set to 100 by default, has been exceeded and calls sync as well.

    Sync itself invokes HLog.Writer.sync() and is implemented in SequenceFileLogWriter. For now we assume it flushes the stream to disk and all is well. That this is all a bit more complicated in reality is discussed below.
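
    The two triggers, a count of unflushed entries and a time interval, can be modelled in a few lines. This is a hypothetical sketch, not the HLog implementation: the class name is invented, the clock is passed in explicitly to keep it testable, and the exact comparisons (>= versus >) are assumptions.

```java
// Hypothetical model of the two sync triggers; not HBase's actual code.
class FlushPolicySketch {
    private final int flushLogEntries;       // cf. hbase.regionserver.flushlogentries
    private final long optionalFlushMillis;  // cf. hbase.regionserver.optionallogflushinterval
    private int unflushedEntries = 0;
    private long lastSyncMillis;
    private int syncCount = 0;

    FlushPolicySketch(int flushLogEntries, long optionalFlushMillis, long nowMillis) {
        this.flushLogEntries = flushLogEntries;
        this.optionalFlushMillis = optionalFlushMillis;
        this.lastSyncMillis = nowMillis;
    }

    // Write path: called after an edit has been written to the stream.
    void afterWrite(long nowMillis) {
        unflushedEntries++;
        if (unflushedEntries >= flushLogEntries) {
            sync(nowMillis);
        }
    }

    // Background thread: called periodically.
    void optionalSync(long nowMillis) {
        boolean intervalElapsed = nowMillis - lastSyncMillis >= optionalFlushMillis;
        if (unflushedEntries > 0 && intervalElapsed) {
            sync(nowMillis);
        }
    }

    private void sync(long nowMillis) {
        // A real implementation would flush the underlying stream here.
        unflushedEntries = 0;
        lastSyncMillis = nowMillis;
        syncCount++;
    }

    int syncCount() {
        return syncCount;
    }
}
```

    With the defaults from the text (100 entries, 10 seconds), a busy server syncs through the write path, while a mostly idle one relies on the background thread to bound how long an edit can sit in the buffer.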

    LogRoller

    Obviously it makes sense to have some size restrictions related to the logs written. Also we want to make sure a log is persisted on a regular basis. This is done by the LogRoller class and thread. It is controlled by the hbase.regionserver.logroll.period parameter in the $HBASE_HOME/conf/hbase-site.xml file. By default this is set to 1 hour. So every 60 minutes the log is closed and a new one started. Over time we are gathering that way a bunch of log files that need to be maintained as well. The HLog.rollWriter() method, which is called by the LogRoller to do the above rolling of the current log file, is taking care of that as well by calling HLog.cleanOldLogs() subsequently. It checks what the highest sequence number written to a storage file is, because up to that number all edits are persisted. It then checks if there is a log left that has edits all less than that number. If that is the case it deletes said logs and leaves just those that are still needed.
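
    The clean-up rule can be illustrated with a small model. Everything here is hypothetical (class name, method names, file names), and it follows the wording above literally: a rolled log is deleted only if all of its edits are below the highest persisted sequence number.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical sketch of the clean-up rule; not the actual HLog.cleanOldLogs().
class LogCleanerSketch {
    // Highest sequence number contained in each rolled log -> file name.
    private final TreeMap<Long, String> rolledLogs = new TreeMap<>();

    void logRolled(long highestSeqNumInLog, String fileName) {
        rolledLogs.put(highestSeqNumInLog, fileName);
    }

    // Removes and returns all logs whose edits are all below the given number.
    List<String> cleanOldLogs(long highestPersistedSeqNum) {
        // headMap() is a live view of all keys strictly less than the argument.
        SortedMap<Long, String> obsolete = rolledLogs.headMap(highestPersistedSeqNum);
        List<String> deleted = new ArrayList<>(obsolete.values());
        obsolete.clear(); // also removes the entries from rolledLogs
        return deleted;
    }

    int remaining() {
        return rolledLogs.size();
    }
}
```

    With three rolled logs ending at sequence numbers 100, 250 and 400 and everything persisted up to 300, the first two can go while the third must stay.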

    This is a good place to talk about the following obscure message you may see in your logs:

    2009-12-15 01:45:48,427 INFO org.apache.hadoop.hbase.regionserver.HLog: Too
    many hlogs: logs=130, maxlogs=96; forcing flush of region with oldest edits:
    foobar,1b2dc5f3b5d4,1260083783909


    It is printed because the number of log files that must be kept, since they still contain outstanding edits that have not yet been persisted, exceeds the configured maximum number of log files. The main reason I have seen for this is stressing the file system so much that it cannot keep up persisting the data at the rate new data is added. Otherwise log flushes should take care of this. Note though that when this message is printed the server goes into a special mode trying to force flushing out edits to reduce the number of logs required to be kept.

    The other parameters controlling the log rolling are hbase.regionserver.hlog.blocksize and hbase.regionserver.logroll.multiplier, which are set by default to rotate logs when they are at 95% of the blocksize of the SequenceFile, typically 64M. So the logs are switched out either when they are considered full or when a certain amount of time has passed, whichever comes first.
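
    Put together, the roll decision is a simple "size or time" check. The sketch below hard-codes the defaults mentioned in the text (64M block size, 0.95 multiplier, one hour period); the class itself and the exact comparisons are assumptions for illustration only.

```java
// Hypothetical sketch of the roll decision using the defaults from the text.
class RollDecisionSketch {
    static final long BLOCK_SIZE_BYTES = 64L * 1024 * 1024; // typical block size
    static final double MULTIPLIER = 0.95;                  // roll at 95% of the block size
    static final long ROLL_PERIOD_MILLIS = 60L * 60 * 1000; // one hour

    // Size at which a log is considered full.
    static long sizeThreshold() {
        return (long) (BLOCK_SIZE_BYTES * MULTIPLIER);
    }

    // Roll when the log is full or the period has passed, whichever comes first.
    static boolean shouldRoll(long logSizeBytes, long millisSinceLastRoll) {
        return logSizeBytes > sizeThreshold()
            || millisSinceLastRoll >= ROLL_PERIOD_MILLIS;
    }
}
```

    With these numbers a log is considered full at a little under 61 MB, so a 60 MB log that is only a few minutes old stays open while a 61 MB one is rolled.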

    Replay

    Once a HRegionServer starts and is opening the regions it hosts it checks if there are some left over log files and applies those all the way down in Store.doReconstructionLog(). Replaying a log is simply done by reading the log and adding the contained edits to the current MemStore. At the end an explicit flush of the MemStore (note, this is not the flush of the log!) helps writing those changes out to disk.

    The old logs usually come from a previous region server crash. When the HMaster is started or detects that a region server has crashed, it splits the log files belonging to that server into separate files and stores those in the region directories on the file system they belong to. After that the above mechanism takes care of replaying the logs. One thing to note is that regions from a crashed server can only be redeployed if the logs have been split and copied. Splitting itself is done in HLog.splitLog(). The old log is read into memory in the main thread (that is, single threaded) and then written to all region directories using a pool of threads, one thread for each region.
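
    Conceptually, splitting boils down to grouping the intermingled edits of the shared log by region while keeping their order. The following is a single-threaded, in-memory sketch with hypothetical names; the real HLog.splitLog() works on files and uses a thread pool for writing, as described above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory sketch of log splitting; not the actual HLog.splitLog().
class LogSplitSketch {
    // One log record: which region it belongs to and its sequence number.
    static final class Edit {
        final String region;
        final long seqNum;

        Edit(String region, long seqNum) {
            this.region = region;
            this.seqNum = seqNum;
        }
    }

    // Groups the edits of one shared log by region, preserving log order.
    static Map<String, List<Edit>> split(List<Edit> sharedLog) {
        Map<String, List<Edit>> perRegion = new LinkedHashMap<>();
        for (Edit edit : sharedLog) {
            perRegion.computeIfAbsent(edit.region, r -> new ArrayList<>()).add(edit);
        }
        return perRegion;
    }
}
```

    Each resulting list corresponds to the file that ends up in a region directory and is later replayed into the MemStore when the region is opened.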

    Issues

    As mentioned above all edits are written to one HLog per HRegionServer. You may ask why that is the case. Why not write all edits for a specific region into its own log file? Let's quote the BigTable paper once more:

    If we kept the commit log for each tablet in a separate log file, a very large number of files would be written concurrently in GFS. Depending on the underlying file system implementation on each GFS server, these writes could cause a large number of disk seeks to write to the different physical log files.

    HBase followed that principle for pretty much the same reasons. As explained above you end up with many files since logs are rolled and kept until they are safe to be deleted. If you do this for every region separately this would not scale well - or at least be an itch that sooner or later is causing pain.

    So far that seems to be no issue. But again, it causes problems when things go wrong. As long as you have applied all edits in time and persisted the data safely, all is well. But if you have to split the log because of a server crash then you need to divide it into suitable pieces, as described above in the "replay" paragraph. But as you have seen above as well, all edits are intermingled in the log and there is no index of what is stored at all. For that reason the HMaster cannot redeploy any region from a crashed server until it has split the logs for that very server. And that can be quite a number if the server was behind applying the edits.

    Another problem is data safety. You want to be able to rely on the system to save all your data, no matter what newfangled algorithms are employed behind the scenes. As far as HBase and the log are concerned you can turn down the log flush times to as low as you want - you are still dependent on the underlying file system as mentioned above; the stream used to store the data is flushed, but is it written to disk yet? We are talking about fsync style issues. Now for HBase we are most likely talking about Hadoop's HDFS as the file system that is persisted to.

    Up to this point it should be abundantly clear that the log is what keeps data safe. For that reason a log could be kept open for up to an hour (or more if configured so). As data arrives a new key/value pair is written to the SequenceFile and occasionally flushed to disk. But that is not how Hadoop was set out to work. It was meant to provide an API that allows you to open a file, write data into it (preferably a lot) and close it right away, leaving an immutable file for everyone else to read many times. Only after a file is closed is it visible and readable to others. If a process dies while writing the data the file is pretty much considered lost. What is required is a feature that allows reading the log up to the point where the crashed server has written it (or as close as possible).

    Interlude: HDFS append, hflush, hsync, sync... wth?

    It all started with HADOOP-1700 reported by HBase lead Michael Stack. It was committed in Hadoop 0.19.0 and meant to solve the problem. But that was not the case. So the issue was tackled again in HADOOP-4379 aka HDFS-200, which implemented syncFs(), meant to make syncing changes to a file more reliable. For a while we had custom code (see HBASE-1470) that detected a patched Hadoop that exposed that API. But again this did not solve the issue entirely.

    Then came HDFS-265, which revisits the append idea in general. It also introduces a Syncable interface that exposes hsync() and hflush().

    Lastly SequenceFile.Writer.sync() is not the same as the above, it simply writes a synchronization marker into the file that helps reading it later - or recover data if broken.

    While append for HDFS in general is useful, it is not used in HBase, but hflush() is. What it does is write out everything to disk as the log is written. In case of a server crash we can safely read that "dirty" file up to the last edits. The append in Hadoop 0.19.0 was so badly suited that a hadoop fsck / would report the DFS being corrupt because of the open log files HBase kept.

    Bottom line is, without Hadoop 0.21.0 you can very well face data loss. With Hadoop 0.21.0 you have a state-of-the-art system.

    Planned Improvements

    For HBase 0.21.0 there are quite a few things lined up that affect the WAL architecture. Here are some of the noteworthy ones.

    SequenceFile Replacement

    One of the central building blocks around the WAL is the actual storage file format. The used SequenceFile has quite a few shortcomings that need to be addressed. One for example is the suboptimal performance as all writing in SequenceFile is synchronized, as documented in HBASE-2105.

    As with HFile replacing MapFile in HBase 0.20.0 it makes sense to think about a complete replacement. A first step was done to make the HBase classes independent of the underlying file format. HBASE-2059 made the class implementing the log configurable.

    Another idea is to change to a different serialization altogether. HBASE-2055 proposes such a format using Hadoop's Avro as the low level system. Avro is also slated to be the new RPC format for Hadoop, which does help as more people are familiar with it.

    Append/Sync

    Even with hflush() we have a problem that calling it too often may cause the system to slow down. Previous tests using the older syncFs() call did show that calling it for every record slows down the system considerably. One step to help is to implement a "Group Commit", done in HBASE-1939. It flushes out records in batches. In addition HBASE-1944 adds the notion of a "deferred log flush" as a parameter of a Column Family. If set to true it leaves the syncing of changes to the log to the newly added LogSyncer class and thread. Finally HBASE-2041 sets the flushlogentries to 1 and optionallogflushinterval to 1000 msecs. The .META. is always synced for every change, user tables can be configured as needed.

    Distributed Log Splitting

    As remarked splitting the log is an issue when regions need to be redeployed. One idea is to keep a list of regions with edits in Zookeeper. That way at least all "clean" regions can be deployed instantly. Only those with edits need to wait then until the logs are split.

    What is left is to improve how the logs are split to make the process faster. Here is how the BigTable paper addresses the issue:

    One approach would be for each new tablet server to read this full commit log file and apply just the entries needed for the tablets it needs to recover. However, under such a scheme, if 100 machines were each assigned a single tablet from a failed tablet server, then the log file would be read 100 times (once by each server).

    and further

    We avoid duplicating log reads by first sorting the commit log entries in order of the keys (table, row name, log sequence number). In the sorted output, all mutations for a particular tablet are contiguous and can therefore be read efficiently with one disk seek followed by a sequential read. To parallelize the sorting, we partition the log file into 64 MB segments, and sort each segment in parallel on different tablet servers. This sorting process is coordinated by the master and is initiated when a tablet server indicates that it needs to recover mutations from some commit log file.

    This is where it's at. As part of the HMaster rewrite (see HBASE-1816) the log splitting will be addressed as well. HBASE-1364 wraps the splitting of logs into one issue. But I am sure that will evolve into more sub tasks as the details get discussed.
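
    The sorting idea from the BigTable quote can be sketched as follows. This is only an illustration of the ordering by (table, row name, log sequence number) applied to one log segment; the class and field names are hypothetical and nothing here reflects how HBase will eventually implement it.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch of sorting one log segment as described in the BigTable paper.
class SortedSplitSketch {
    static final class Entry {
        final String table;
        final String row;
        final long seqNum;

        Entry(String table, String row, long seqNum) {
            this.table = table;
            this.row = row;
            this.seqNum = seqNum;
        }
    }

    // Order by table, then row name, then log sequence number.
    static final Comparator<Entry> ORDER =
        Comparator.comparing((Entry e) -> e.table)
                  .thenComparing(e -> e.row)
                  .thenComparingLong(e -> e.seqNum);

    // Returns a sorted copy of one segment; the input is left untouched.
    static List<Entry> sortSegment(List<Entry> segment) {
        List<Entry> sorted = new ArrayList<>(segment);
        sorted.sort(ORDER);
        return sorted;
    }
}
```

    Since a tablet (or region) covers a contiguous row range of one table, all of its mutations end up next to each other in the sorted output, in sequence number order per row, which is what allows the single seek followed by a sequential read.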


  9. 2nd Munich OpenHUG Meeting

    At the end of last year we had a first meeting in Munich to allow everyone interested in all things Hadoop and related technologies to gather, get to know each other and exchange their knowledge, experiences and information. We would like to invite you to the second Munich OpenHUG Meeting and hope to see you all again and meet even more new enthusiasts there. We would also be thrilled if those attending would be willing to share their story so that we can learn about other projects and how people are using the exciting NoSQL related technologies. No pressure though, come along and simply listen if you prefer, we welcome anyone and everybody!

    When: Thursday February 25, 2010 at 5:30pm open end
    Where: eCircle AG, Nymphenburger Straße 86, 80636 München ["Bruckmann" Building, "U1 Mailinger Str", map (in German) and look for the signs]

    Thanks again to Bob Schulze from eCircle for providing the location and projector. So far we have a talk scheduled by Christoph Rupp about HamsterDB. We are still looking for volunteers who would like to present on any related topic (please contact me)! Otherwise we will have an open discussion about whatever is brought up by the attendees.

    Last but not least there will be something to drink and we will get pizzas in. Since we do not know how many of you will come we will simply stay at the event location and continue our chats over food.

    Looking forward to seeing you there!

    Please RSVP at Upcoming or Xing.


  10. First Munich OpenHUG Meeting - Summary

    On December 17th we had the first Munich OpenHUG meeting. The location was kindly provided by eCircle's Bob Schulze. This was the first in a series of meetings in Munich based on everything Hadoop and related technologies. Let's use the term NoSQL with care as this is not about blame or finger pointing (I feel that it was used like that in the past and therefore making this express point). We are trying to get together the brightest local and remote talent to report and present on new age and evolved existing technologies.

    The first talk of the night was given by Bob himself presenting his findings of evaluating HBase and Hadoop for their internal use. He went into detail explaining how HBase is structuring its data and how it can be used for their needs. One thing that I noted in particular was his HBase Explorer, which he subsequently published on SourceForge as an Open Source project. The talk was concluded by an open discussion about HBase.

    The second part of the meeting was my own presentation about how we at WorldLingo use HBase and Hadoop (as well as Lucene etc.)

    We continued our discussion on HBase with the developers of eCircle present. This was very interesting and fruitful and we had the chance to exchange experiences made along our similar paths.

    I would have wished for the overall attendance to be a little higher, but it was a great start. Talking to other hosts of similar events it seems that this is normal and therefore my hopes are up for the next meetings throughout this year. We have planned the next meeting for February 25th, 2010 at the same location. If you have interest in presenting a talk on any related topic, please contact me!

    I am looking forward to meeting you all there!

    Lars

