Running Cassandra as an embedded service

While developing an application at outbrain, using Cassandra I was looking for a good way to test my app. The application consists of a Cassandra Client package, some Data Access Objects (DAOs) and some bean object that represent the data entities in cassandra. I wanted to test them all.

As unit test tradition goes, my requirement was zero-configuration, zero preparation, no external dependencies, full isolation, fully reproducible results and fast. Database testing has always been a challenge in this perspective, for example when testing SQL clients in java often HSQLDB is used to to mock the database. Cassandra, however, did not have something ready just yet so I had to build it.

One way to go was to setup a cassandra instance just for unit testing. There are many downsides to this approach, such as it’s not zero-configuration, tests need to cleanup before they execute, if two tests are run at the same time by two developers they can collide and change the results in unexpected way, it’s slow… out of the question, not good.

Enter the embedded cassandra server.

With the help of the community I’ve built an embedded cassandra service ideal for unit testing and perhaps other uses. I’ve also built a cleanup utility that helps wipe out all data before the service starts running so the combination of both provides isolation etc. Now each test process runs an in-process, embedded instance of cassandra.

Below is the source code, already committed to cassandra SCM on trunk. If you want to use it for the current stable release(0.5.0) only a small package rename is required (in trunk some classes moved a bit), and it’s presented at the end of the post.

The embedded service:

package org.apache.cassandra.service;
 
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
 
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.thrift.CassandraDaemon;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 
/**
 * An embedded, in-memory cassandra storage service that listens
 * on the thrift interface as configured in storage-conf.xml
 * This kind of service is useful when running unit tests of
 * services using cassandra for example.
 *
 
 * This is the implementation of https://issues.apache.org/jira/browse/CASSANDRA-740
 *
 
 * How to use:
 * In the client code create a new thread and spawn it with its {@link Thread#start()} method.
 * Example:
 *
 *      // Tell cassandra where the configuration files are.
        System.setProperty("storage-config", "conf");
 
        cassandra = new EmbeddedCassandraService();
        cassandra.init();
 
        // spawn cassandra in a new thread
        Thread t = new Thread(cassandra);
        t.setDaemon(true);
        t.start();
 
 *
 * @author Ran Tavory (rantav@gmail.com)
 *
 */
public class EmbeddedCassandraService implements Runnable
{
 
    CassandraDaemon cassandraDaemon;
 
    public void init() throws TTransportException, IOException
    {
        cassandraDaemon = new CassandraDaemon();
        cassandraDaemon.init(null);
    }
 
    public void run()
    {
        cassandraDaemon.start();
    }
}

The data cleaner:

package org.apache.cassandra.contrib.utils.service;
 
import java.io.File;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
 
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.io.util.FileUtils;
 
/**
 * A cleanup utility that wipes the cassandra data directories.
 *
 * @author Ran Tavory (rantav@gmail.com)
 *
 */
public class CassandraServiceDataCleaner {
 
    /**
     * Creates all data dir if they don't exist and cleans them
     * @throws IOException
     */
    public void prepare() throws IOException {
        makeDirsIfNotExist();
        cleanupDataDirectories();
    }
 
    /**
     * Deletes all data from cassandra data directories, including the commit log.
     * @throws IOException in case of permissions error etc.
     */
    public void cleanupDataDirectories() throws IOException {
        for (String s: getDataDirs()) {
            cleanDir(s);
        }
    }
    /**
     * Creates the data diurectories, if they didn't exist.
     * @throws IOException if directories cannot be created (permissions etc).
     */
    public void makeDirsIfNotExist() throws IOException {
        for (String s: getDataDirs()) {
            mkdir(s);
        }
    }
 
    /**
     * Collects all data dirs and returns a set of String paths on the file system.
     *
     * @return
     */
    private Set getDataDirs() {
        Set dirs = new HashSet();
        for (String s : DatabaseDescriptor.getAllDataFileLocations()) {
            dirs.add(s);
        }
        dirs.add(DatabaseDescriptor.getLogFileLocation());
        return dirs;
    }
    /**
     * Creates a directory
     *
     * @param dir
     * @throws IOException
     */
    private void mkdir(String dir) throws IOException {
        FileUtils.createDirectory(dir);
    }
 
    /**
     * Removes all directory content from file the system
     *
     * @param dir
     * @throws IOException
     */
    private void cleanDir(String dir) throws IOException {
        File dirFile = new File(dir);
        if (dirFile.exists() && dirFile.isDirectory()) {
            FileUtils.delete(dirFile.listFiles());
        }
    }
}

And an example test that uses both:

package org.apache.cassandra.contrib.utils.service;
 
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
 
import java.io.IOException;
import java.io.UnsupportedEncodingException;
 
import org.apache.cassandra.service.EmbeddedCassandraService;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.InvalidRequestException;
import org.apache.cassandra.thrift.NotFoundException;
import org.apache.cassandra.thrift.TimedOutException;
import org.apache.cassandra.thrift.UnavailableException;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.junit.BeforeClass;
import org.junit.Test;
 
/**
 * Example how to use an embedded and a data cleaner.
 *
 * @author Ran Tavory (rantav@gmail.com)
 *
 */
public class CassandraServiceTest {
 
    private static EmbeddedCassandraService cassandra;
 
    /**
     * Set embedded cassandra up and spawn it in a new thread.
     *
     * @throws TTransportException
     * @throws IOException
     * @throws InterruptedException
     */
    @BeforeClass
    public static void setup() throws TTransportException, IOException,
            InterruptedException {
        // Tell cassandra where the configuration files are.
        // Use the test configuration file.
        System.setProperty("storage-config", "../../test/conf");
 
        CassandraServiceDataCleaner cleaner = new CassandraServiceDataCleaner();
        cleaner.prepare();
        cassandra = new EmbeddedCassandraService();
        cassandra.init();
        Thread t = new Thread(cassandra);
        t.setDaemon(true);
        t.start();
    }   
 
    @Test
    public void testInProcessCassandraServer()
            throws UnsupportedEncodingException, InvalidRequestException,
            UnavailableException, TimedOutException, TException,
            NotFoundException {
        Cassandra.Client client = getClient();
 
        String key_user_id = "1";
 
        long timestamp = System.currentTimeMillis();
        ColumnPath cp = new ColumnPath("Standard1");
        cp.setColumn("name".getBytes("utf-8"));
 
        // insert
        client.insert("Keyspace1", key_user_id, cp, "Ran".getBytes("UTF-8"),
                timestamp, ConsistencyLevel.ONE);
 
        // read
        ColumnOrSuperColumn got = client.get("Keyspace1", key_user_id, cp,
                ConsistencyLevel.ONE);
 
        // assert
        assertNotNull("Got a null ColumnOrSuperColumn", got);
        assertEquals("Ran", new String(got.getColumn().getValue(), "utf-8"));
    }
 
    /**
     * Gets a connection to the localhost client
     *
     * @return
     * @throws TTransportException
     */
    private Cassandra.Client getClient() throws TTransportException {
        TTransport tr = new TSocket("localhost", 9170);
        TProtocol proto = new TBinaryProtocol(tr);
        Cassandra.Client client = new Cassandra.Client(proto);
        tr.open();
        return client;
    }
}

To use this source code in v0.5.0 a small package rename is required:
org.apache.cassandra.io.util.FileUtils => org.apache.cassandra.utils.FileUtils
org.apache.thrift.transport.TTransportException => org.apache.transport.TTransportException
org.apache.cassandra.thrift.CassandraDaemon => org.apache.cassandra.CassandraDaemon

One nifty detail: When running multiple tests serially, make sure to spawn each test in a separate JVM (fork mode) since cassandra doesn’t shut down all threads immediately. Running each in separate jvm ensures the previous test dies before the next one begins.

20 Responses to “Running Cassandra as an embedded service”

  1. Sweet! Just what the doctor ordered.

    By drtune on Mar 3, 2010

  2. good stuff!

    By arnon on Mar 16, 2010

  3. Hi, could you suggest how to configure it to start the daemon per test?
    I keep on seeing NullPointerException at
    org.apache.cassandra.db.ColumnFamilyStore.onStart(line:191).

    Good stuff.
    thanks!

    By Patricio on Mar 19, 2010

  4. I actually use it in another project, see here http://github.com/rantav/hector/blob/master/src/test/java/me/prettyprint/cassandra/service/KeyspaceTest.java#L81

    By Ran Tavory on Mar 19, 2010

  5. Hi Ran!

    I was trying to achieve something similar but I went thru a completely different path… I tried to reproduce what CassandraDaemon does but with a in memory TTransport that I implemented. Unfortunately it did not work as I expected (at least up to now). Have you seen anything on the internet about using a different (something like a in memory) TTransport? I was thinking about having Cassandra daemon embedded in the application so in case my application was running in a J2EE cluster, each clustered instance would have its own Cassandra Service builtin.

    regards,
    Rafael Ribeiro

    By Rafael Ribeiro on Apr 1, 2010

  6. There used to be something called cassandra “fat client” which may be what you’re looking for, I’m not sure (google it…)
    But the thing is that it hasn’t been maintained lately and I think I read a few days ago in the mailing list that it’s been taken off the wiki for not being up to date.
    Anyhow, google it and read about it, then your best bet is to email user@cassandra… and ask.
    My motivation for creating the embedded service was unit testing, so I wanted an environment as close as possible to a real instance running. I used it in the client I wrote http://github.com/rantav/hector

    By Ran Tavory on Apr 1, 2010

  7. Hi Ran!

    Errh… I should have googled it before I tried to implement my own in memory TTransport… in fact there is already documentation over Cassandra Wiki explaining how to do this:
    http://wiki.apache.org/cassandra/StorageProxy
    http://wiki.apache.org/cassandra/Embedding

    anyways… tks a lot!

    By Rafael Ribeiro on Apr 1, 2010

  8. Hi Ran,

    This is really a good stuff, and I am looking for such things for unit test. But I am wondering how do you kill the embedded-cassandra server. Seems your code does not do such things.

    By Jeff Zhang on Apr 13, 2010

  9. There’s a limitation in cassandra that it cannot be killed…
    Actually it can, but only by stopping the jvm. I know, it sucks, but that’s what cassandra can provide now…
    For tests, using fork mode bypasses this problem since each individual test runs in its own jvm

    By Ran Tavory on Apr 13, 2010

  10. Is there a solution for the shutdown problem? One that does not require me to kill the JVM?

    By Michael Zehrer on Aug 2, 2010

  11. no, that’s a design limitation in cassandra

    By Ran Tavory on Aug 2, 2010

  12. Thanks for putting up this embedded server, very helpful
    Any ideas why this is a cassandra design limitation ?

    By Kannan on Sep 9, 2010

  13. you mean why it cannot be killed? it’s been discussed at user@cassandra a few times. Basically it’s a design decision and was never considered important enough to rethink it

    By Ran Tavory on Sep 9, 2010

  14. Thanks for the reply. I notice that if I kill the jvm and the memtable is not full yet, its not flushing the in-memory data to disk. Any ideas how to get around this ? Currently I am using something like this:

    for(String t: DatabaseDescriptor.getTables()) {
    Table table = Table.open(t);
    for(ColumnFamilyStore cfs: table.getColumnFamilyStores()) {
    cfs.forceBlockingFlush();
    }
    }

    Do you see any problems with this? Is there a better way to do it ?

    Thanks again

    By Kannan on Sep 10, 2010

  15. @Kannan if memtables aren’t flushed then cassandra will read the commit log when it starts up next time so data isn’t lost.
    Per your code, I’m now too far away from the latest on trunk so it’s best to advice with dev@cassandra

    By Ran Tavory on Sep 10, 2010

  16. 0.7.0 has some differences i’ve found.

    Instead of
    System.setProperty(“storage-config”, “../../test/conf”);
    you’ll need (because of the change to yaml)
    System.setProperty(“cassandra.config”, “../../test/conf/cassandra.yaml”);

    Also if the property value is an absolute filepath it needs to be in URL format, see DatabaseDescriptor.getStorageConfigURL()

    By mck on Nov 12, 2010

4 Trackback(s)

  1. Feb 23, 2010: uberVU - social comments
  2. Apr 27, 2010: links for 2010-04-26 « Daniel Harrison's Personal Blog
  3. May 2, 2010: Understanding Cassandra Code Base | PrettyPrint.me
  4. Jul 30, 2010: Confluence: Team Message Archiver

Sorry, comments for this entry are closed at this time.