As part of k8ssandra-operator 1.2, we released a turnkey change data capture (CDC) solution for Apache Cassandra® . For the first time, enterprises can now access the power of their K8ssandra data from the Apache Pulsar™ streaming platform, allowing integrations into downstream data stores such as Spark, Flink, ElasticSearch, Snowflake, and relational databases in real time.

What does this enable?

CDC is a powerful enterprise integration pattern in which mutations to a data store (in this case Cassandra) are fed forward in real time to downstream consumers. Data stores such as Cassandra offer powerful capabilities to persist high volume, high velocity data, but no single system can fulfil every enterprise need.

Where Cassandra’s strengths lie in the transactional fast lane, other systems specialise in interactive analytics/OLAP, search, and real time machine learning. CDC allows data to be immediately persisted in Cassandra before flowing downstream to other specialised systems, offering a robust and scalable mechanism to fan out data throughout the enterprise.

The beauty of CDC is that these fan-out capabilities require no application-side changes, massively accelerating time to value for new data infrastructure. 

Without CDC, enterprises are left trying to juggle bilateral application-destination connections. This entails managing the consistency and write acknowledgements from a heterogenous group of destination systems from inside many source applications. Put it on a plate and you could call it delicious spaghetti; in a distributed system it is just a recipe for trouble.

With CDC we can materialise data into Cassandra in the first instance, knowing that we have replication and low latency. But we can also be confident that the data will replicate into Pulsar; so we are never locked into only Cassandra’s capabilities, and can extend our data platforms at any time.

How do I turn it on?

This blog post will focus on using CDC from within cass-operator. A later post will talk about doing so in multi-cluster contexts using k8ssandra-operator.

The steps to enable CDC are simple. You need to:

  1. Have a working Kubernetes cluster (KinD, Minikube, or Kubernetes on EKS, GKE, AWS etc.)
  2. Install cass-operator.
  3. Install Pulsar.
  4. Deploy a CassandraDatacenter with CDC enabled.
  5. Configure the tables on which you’d like CDC enabled, including:
    1. Enable CDC in the Cassandra schema.
    2. Create a Pulsar source connector for the table.
  6. Write data to the table.

We have an automated test demonstrating these steps here (for the Kubernetes side) and here (for the client side) but we’ve included a step-by-step with explanations below.

First, pull our demo repo from here.

Getting a Kubernetes cluster running

For the purposes of this demonstration, we’ll skip over this step. Do note that if you are running a single KinD node, or doing local testing we recommend using only a single Cassandra node and dropping the replication factor on the keyspace to RF=1, to avoid overwhelming your hardware.

Install cass-operator

Cert manager needs to be installed as it is a dependency of cass-operator. Both can be installed by running the following commands:

until kustomize build ./cass-operator | kubectl apply --server-side --force-conflicts -f -; do sleep 5; done

Why do we need the until? Because cert-manager’s webhook sometimes takes a little time to come up, so even though we’re checking for CRD readiness using kubectl wait, we like to have a little more protection.

Install Pulsar

We have tried to take a non-opinionated approach as to how users should install Pulsar. We are not a Pulsar focused project and are therefore supportive of a BYO model. 

That said, our existing tests use the DataStax Luna Helm chart to install the Luna Pulsar distribution, and we can vouch for the compatibility of the CDC connector when used with this distribution.

To install it, run:

helm repo add datastax-pulsar https://datastax.github.io/pulsar-helm-chart
helm repo update
helm install pulsar --create-namespace --namespace pulsar -f pulsar-values.yaml --version 3.0.0 datastax-pulsar/pulsar

It is important to note the custom values file we’re providing here, which is required to enable the Pulsar functions workers which hosts the sources.

Install a CassandraDatacenter

There are a plethora of options within the CassandraDatacenter, but a very minimal configuration (including deploying the CDC agent) might look like this:

kubectl apply --server-side --force-conflicts -f cassDC.yaml

Note that you may wish to modify your pulsarServiceUrl or your topicPrefix according to how you are running Pulsar, the above configuration will work if you are using the Luna Helm Chart.

Configure the tables

There are a few options here. You can either configure the tables and source connectors programmatically (see here for an example) or you can do so on the command line. We’ll produce the command line versions here for convenience.

Cassandra tables

If you created the CassandraDatacenter as described above, you can dial up a shell by running the below command:

kubectl exec -it -n cass-operator test-cluster-dc1-default-sts-0 -- bash

You can then run the below command to enter a cqlsh shell:

cqlsh test-cluster-dc1-all-pods-service.cass-operator.svc.cluster.local

Before creating tables and keyspaces like so:

CREATE KEYSPACE db1 WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'dc1':'3'};
CREATE TABLE IF NOT EXISTS db1.table1 (key text PRIMARY KEY, c1 text) WITH cdc=true;

Note the WITH cdc=true;is required to enable CDC on the table. Also note that you should leave you cqlsh window open, we’ll come back to it in a moment.

Pulsar source connector

Whenever we want to create a CDC feed from a Cassandra table we need to create a Pulsar source connector on the Pulsar side. Again, this can be done programmatically via the Java admin client as we show here. However, these methods are not very well documented so for convenience we will use the shell.

First, dial into the Pulsar bastion host (if you deployed Pulsar using the Luna Helm Charts).

PULSAR_POD=$(kubectl get pods -n pulsar -l=app=pulsar -l=component=bastion -o=jsonpath='{.items[0].metadata.name}')

kubectl exec -it -n pulsar $PULSAR_POD -- bash

Now you can run the pulsar-admin command to create the Pulsar source connector:

./bin/pulsar-admin source create \
    --source-type cassandra-source \
    --tenant public \
    --namespace default \
    --name cassandra-source-db1-table1 \
    --destination-topic-name data-db1.table1 \
    --source-config  '{
    "events.topic": "persistent://public/default/events-db1.table1",
    "keyspace": "db1",
    "table": "table1",
    "contactPoints": "test-cluster-dc1-all-pods-service.cass-operator.svc.cluster.local",
    "port": 9042,
    "loadBalancing.localDc": "dc1",
    "auth.provider": "None"
    }';

Read data back off Pulsar

To confirm that the events have arrived in the topic as expected you can run the following from your Pulsar bastion terminal:

./bin/pulsar-client consume persistent://public/default/data-db1.table1 --schema-type auto_consume --subscription-position Earliest --subscription-name mysubs --num-messages 0

Leave this terminal open (ideally next to your cqlsh terminal) so we can watch the data arrive.

Add data to Cassandra

In your cqlsh window, run the following commands to add some data, including some schema mutations so we can confirm that this doesn’t break anything:

INSERT INTO db1.table1 (key,c1) VALUES ('0','bob1');

INSERT INTO db1.table1 (key,c1) VALUES ('0','bob2'); INSERT INTO db1.table1 (key,c1) VALUES ('1','bob2');

DELETE FROM db1.table1 WHERE key='1';

ALTER TABLE db1.table1 ADD c2 int;

CREATE TYPE db1.t1 (a text, b text);

ALTER TABLE db1.table1 ADD c3 t1;

INSERT INTO db1.table1 (key,c1, c2, c3) VALUES ('3','bob', 1, {a:'a', b:'b'});

INSERT INTO db1.table1 (key,c1, c2, c3) VALUES ('4','bob', 1, {a:'a', b:'b'});

INSERT INTO db1.table1 (key,c1, c2, c3) VALUES ('5','bob', 1, {a:'a', b:'b'});

INSERT INTO db1.table1 (key,c1, c2, c3) VALUES ('6','bob', 1, {a:'a', b:'b'});

INSERT INTO db1.table1 (key,c1, c2, c3) VALUES ('7','bob', 1, {a:'a', b:'b'});

INSERT INTO db1.table1 (key,c1, c2, c3) VALUES ('8','bob', 1, {a:'a', b:'b'});

INSERT INTO db1.table1 (key,c1, c2, c3) VALUES ('9','bob', 1, {a:'a', b:'b'});

INSERT INTO db1.table1 (key,c1, c2, c3) VALUES ('10','bob', 1, {a:'a', b:'b'});

You should see data appearing in the Pulsar window as you run these commands.

What did we do to implement this?

We had to tie up a variety of systems into a single, automated API. Let’s go through these one by one.

Apache Cassandra

Apache Cassandra needs little introduction as the premiere wide-column distributed data store. Cassandra is well known for replicating data globally for performance and resilience.

Apache Pulsar

Apache Pulsar is a distributed, real-time pub/sub messaging system which can offer unbounded integration possibilities within the enterprise. By acting as a uniform data substrate it can string together the massive volumes of data generated by microservices in IoT, Telco, banking/payments and customer facing systems, and channel it to where it creates value.

Data lakes are great, but streaming solutions keep the rivers of data in your organisation moving, preventing them turning into a data swamp.

The K8ssandra ecosystem

The K8ssandra ecosystem consists of several interrelated projects that allow Cassandra to run seamlessly on Kubernetes in a truly cloud native fashion. It offers RESTful APIs via Management API for Cassandra (MAAC), observability via Metrics Collector for Cassandra (MCAC), Kubernetes orchestration via cass-operator, and automated multi-DC operations via k8ssandra-operator.

The CDC components

DataStax recently released a CDC suite for Apache Cassandra. This consisted of two components which we integrated into cass-operator and management API – our turnkey solution for deploying Cassandra on Kubernetes.

The CDC Java agent

The CDC Java agent runs in the same JVM as Cassandra and monitors for commitlog rollovers. When the commitlog rolls over, the agent ships its mutations over the CDC connector running as a Pulsar Source (which is implemented as a Pulsar function).

The CDC Pulsar Connector

The Pulsar connector runs inside a Pulsar functions worker. A rough sketch of how to configure a Pulsar source connector can be found here. The connector is responsible for serialising the stream of mutations into a standard envelope (in this example Avro) before feeding them to a Pulsar topic. 

How did we do it?

Integrating Pulsar, Cassandra and Kubernetes isn’t a trivial task, which was one reason we wanted to ensure that users could benefit from a packaged solution. 

The K8ssandra ecosystem has been geared to enable the seamless operation of Cassandra on Kubernetes – orchestrating storage, network, backup and restore, and repair needs across multiple datacenters.

Building an integration to the DataStax CDC agent within this ecosystem made sense as a way to automate a complex rollout while retaining flexibility to tailor data flows to each enterprise’s specific needs. For example, we needed to ensure that CDC feeds for each DC remained separately routable, since each Pulsar cluster should only take a single feed (and each DC may have its own Pulsar destination!).

To make this implementation fly we had to implement the following changes:

  1. Update the Cassandra images we use (which also package the Management API for Apache Cassandra) to include the CDC agent.
  2. Update cass-operator to reconcile the CDC settings from the CassandraDatacenter custom resource (CR) and get them into Cassandra’s start-up sequence. This required modifications to:
    1. The CassandraDatacenter CRs (which are mostly self-documenting, and worth a review for users who may be considering using this functionality.
    2. The logic that cass-operator uses to manipulate the Cassandra configuration files. These are passed down the the spec.config field within the CassandraDatacenter, and changes were required to JVM start-up arguments and the start order of the three agents that we ship.
    3. The K8ssandraCluster CRs within k8ssandra-operator, to enable multi-DC automation across many Kubernetes clusters.

Closing remarks

CDC is a powerful tool in any architect’s toolkit. It offers a robust, real time integration path from the backend – without changes to the source application.

Our recent release of CDC integration for K8ssandra offers enterprises a turnkey solution to automate Cassandra CDC deployments in Kubernetes environments. It retains the resilience and performance of Cassandra and the orchestration powers of Kubernetes, but for the first time opens up new possibilities for how that data can be integrated and processed downstream via Apache Pulsar.

Follow the DataStax Tech Blog for more developer stories. Check out our YouTube channel for tutorials and DataStax Developers on Twitter for the latest news about our developer community.

Resources:

  1. K8ssandra
  2. Apache Cassandra®
  3. Apache Pulsar™
  4. Kubernetes
  5. Change Data Capture (CDC)
  6. Cass-operator
  7. k8ssandra-operator
  8. GitHub: Example configurations for k8ssandra CDC integration
  9. GitHub: K8ssandra / Cass-operator
  10. GitHub: K8ssandra / k8ssandra-testutils
  11. DataStax Luna Helm chart
  12. Java admin client