
Apache Cassandra is an open-source distributed database management system designed to handle large amounts of data across many commodity servers, providing high availability with no single point of failure. Cassandra offers robust support for clusters spanning multiple datacenters, with asynchronous masterless replication allowing low-latency operations for all clients.

Apache Spark is an open-source cluster computing framework originally developed in the AMPLab at UC Berkeley. In contrast to Hadoop’s two-stage disk-based MapReduce paradigm, Spark’s in-memory primitives provide performance up to 100 times faster for certain applications. By allowing user programs to load data into a cluster’s memory and query it repeatedly, Spark is well-suited to machine learning algorithms.

Suppose our Cassandra database has a table named “table_user” with the columns “user_first_name”, “user_last_name”, and “user_email”. We then create a POJO class for it as follows:

import java.io.Serializable;

public class UserBean implements Serializable {
    private static final long serialVersionUID = 3775871090088504659L;

    private String userFirstName;
    private String userLastName;
    private String userEmail;

    // getters and setters for the three fields above
}
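Spark's default serializer relies on Java serialization to move objects between the driver and the executors, which is why the bean implements Serializable. As a minimal sketch, the code below writes out the accessors that the snippet above leaves as a comment and checks that a bean survives a serialization round trip. The UserBeanDemo class, its roundTrip helper, and the sample values are assumptions made for this illustration and are not part of the original code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Same bean as above, with the getters and setters written out.
class UserBean implements Serializable {
    private static final long serialVersionUID = 3775871090088504659L;

    private String userFirstName;
    private String userLastName;
    private String userEmail;

    public String getUserFirstName() { return userFirstName; }
    public void setUserFirstName(String userFirstName) { this.userFirstName = userFirstName; }

    public String getUserLastName() { return userLastName; }
    public void setUserLastName(String userLastName) { this.userLastName = userLastName; }

    public String getUserEmail() { return userEmail; }
    public void setUserEmail(String userEmail) { this.userEmail = userEmail; }
}

// Hypothetical helper class, used only for this illustration.
class UserBeanDemo {

    // Serializes the bean to a byte array and reads it back as a new object.
    static UserBean roundTrip(UserBean bean) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(bean);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (UserBean) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        // Sample values, made up for the example.
        UserBean bean = new UserBean();
        bean.setUserFirstName("Ada");
        bean.setUserLastName("Lovelace");
        bean.setUserEmail("ada@example.com");

        UserBean copy = roundTrip(bean);
        System.out.println(copy.getUserFirstName() + " " + copy.getUserLastName()
                + " <" + copy.getUserEmail() + ">");
    }
}
```

If a field type that is not serializable is added to the bean later, this kind of check fails locally rather than at job run time.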

To read data from a Cassandra table using Apache Spark:

1. Add the Apache Spark and Cassandra connector dependencies to pom.xml.
2. Configure a SparkConf object with the Cassandra connection details.
3. Use the SparkConf object to create a JavaSparkContext object.
4. Use the JavaSparkContext object to read data from the Cassandra table.

Step 1: To integrate Apache Spark with Cassandra, we use the following dependencies:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.10</artifactId>
    <version>1.2.1</version>
</dependency>

<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.10</artifactId>
    <version>1.2.1</version>
</dependency>
<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector-java_2.10</artifactId>
    <version>1.2.1</version>
</dependency>

Step 2: Configure the SparkConf object with the Cassandra connection details:

SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("Spark-Cassandra Integration");
// Run Spark locally with 4 worker threads
sparkConf.setMaster("local[4]");
// Cassandra contact point and ports (9042 = native CQL protocol, 9160 = Thrift RPC)
sparkConf.set("spark.cassandra.connection.host", "127.0.0.1");
sparkConf.set("spark.cassandra.connection.native.port", "9042");
sparkConf.set("spark.cassandra.connection.rpc.port", "9160");
// Connection and read timeouts, in milliseconds
sparkConf.set("spark.cassandra.connection.timeout_ms", "5000");
sparkConf.set("spark.cassandra.read.timeout_ms", "200000");
// Credentials for clusters with password authentication enabled
sparkConf.set("spark.cassandra.auth.username", "test_user");
sparkConf.set("spark.cassandra.auth.password", "test_password");

Step 3: Create a JavaSparkContext object from the SparkConf object:

JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

Step 4: Once we have the JavaSparkContext object, we can read data from a Cassandra table by providing the keyspace and table name, as in the following code snippet:

String keySpaceName = "test_key_space";
String tableName = "table_user";

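// Expose the table as an RDD of generic CassandraRow objects
CassandraJavaRDD<CassandraRow> cassandraRDD = CassandraJavaUtil.javaFunctions(javaSparkContext)
        .cassandraTable(keySpaceName, tableName);

// Convert each row into a UserBean by reading its columns by name
JavaRDD<UserBean> userRDD = cassandraRDD.map(new Function<CassandraRow, UserBean>() {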

    private static final long serialVersionUID = -165799649937652815L;

    @Override
    public UserBean call(CassandraRow row) throws Exception {
        UserBean userBean = new UserBean();
        userBean.setUserFirstName(row.getString("user_first_name"));
        userBean.setUserLastName(row.getString("user_last_name"));
        userBean.setUserEmail(row.getString("user_email"));
        return userBean;
    }
});

Now userRDD holds all the records from the table as a Spark RDD, and we can apply any aggregation or other Spark operation to it.
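As one illustration of such an aggregation, the sketch below counts users per email domain. To keep it runnable without a cluster, it applies the per-record logic to a plain java.util.List with Java streams. On userRDD, the same key function could be passed to mapToPair and the counts combined with reduceByKey. The EmailDomainCounts class, its emailDomain and countByDomain helpers, and the sample addresses are assumptions made for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Hypothetical helper class for this illustration; not part of Spark or the connector.
class EmailDomainCounts {

    // Key function: returns the lower-cased domain of an email address,
    // or "unknown" when the value is null or has nothing after the last '@'.
    static String emailDomain(String email) {
        if (email == null) {
            return "unknown";
        }
        int at = email.lastIndexOf('@');
        if (at < 0 || at == email.length() - 1) {
            return "unknown";
        }
        return email.substring(at + 1).toLowerCase(Locale.ROOT);
    }

    // Local stand-in for mapToPair + reduceByKey: groups emails by domain and counts them.
    static Map<String, Long> countByDomain(List<String> emails) {
        return emails.stream()
                .collect(Collectors.groupingBy(
                        EmailDomainCounts::emailDomain, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        // Sample values standing in for the results of UserBean.getUserEmail().
        List<String> emails = Arrays.asList(
                "ada@example.com", "alan@Example.com", "grace@example.org");
        System.out.println(countByDomain(emails));
    }
}
```

Keeping the key function as a small static method means it can be unit-tested on its own before it is wired into a Spark job.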

References:
https://en.wikipedia.org/wiki/Apache_Cassandra
https://en.wikipedia.org/wiki/Apache_Spark
