The Connect API in Kafka is part of the Confluent Platform, providing a set of connectors and a standard interface with which to ingest data into Apache Kafka, and to store or process it at the other end. Initially launched with a JDBC source and HDFS sink, the list of connectors has grown to include a dozen certified connectors, and twice as many again ‘community’ connectors. These cover technologies such as MongoDB, InfluxDB, Kudu, MySQL – and of course, as with any streaming technology, Twitter, the de facto source for any streaming how-to. Two connectors of note that were recently released are for Oracle GoldenGate as a source, and Elasticsearch as a sink. In this article I’m going to walk through how to set these up, and demonstrate how the flexibility and power of the Kafka Connect platform can enable rapid changes and evolutions to the data pipeline.

Oracle GoldenGate Kafka Connect

The above diagram shows an overview of what we’re building. Change Data Capture (CDC) on the database streams every single change made to the data over to Kafka, from where it is streamed into Elasticsearch. Once in Elasticsearch it can be viewed in tools such as Kibana, for search and analytics:

Oracle GoldenGate Elastic Search

 

Oracle GoldenGate Kibana

Oracle GoldenGate (OGG) is a realtime data replication tool, falling under the broad umbrella of Change Data Capture (CDC) software, albeit at the high end in terms of functionality. It supports multiple RDBMS platforms, including – obviously – Oracle, as well as DB2, MySQL, and SQL Server. You can find the full certification list here. It uses log-based technology to stream all changes to a database from source, to target – which may be another database of the same type, or a different one. It is commonly used for data integration, as well as replication of data for availability purposes.

In the context of Kafka, Oracle GoldenGate provides a way of streaming all changes made to a table, or set of tables, and making them available to other processes in our data pipeline. These processes could include microservices relying on an up-to-date feed of data from a particular table, as well as persisting a replica copy of the data from the source system into a common datastore for analysis alongside data from other systems.

Elasticsearch is an open-source distributed document store, used heavily for both search and analytics. It comes with some great tools including Kibana for data discovery and analysis, as well as a Graph tool. Whilst Elasticsearch is capable of being a primary data store in its own right, it is also commonly used as a secondary store in order to take advantage of its rapid search and analytics capabilities. It is the latter use-case that we’re interested in here – using Elasticsearch to store a copy of data produced in Oracle.

Confluent’s Elasticsearch Connector is a source-available connector plug-in for the Connect API in Kafka that sends data from Kafka to Elasticsearch. It is highly efficient, utilising Elasticsearch’s bulk API. It also supports all of Elasticsearch’s data types, which it infers automatically, and it evolves the Elasticsearch mappings from the schema stored in Kafka records.

Oracle GoldenGate can be used with Kafka to directly stream every single change made to your database. Everything that happens in the database gets recorded in the transaction log (whether or not OGG is in use), and OGG takes that and sends it to Kafka. In this blog we’re using Oracle as the source database, but don’t forget that Oracle GoldenGate supports many sources. To use Oracle GoldenGate with Kafka, we use the “Oracle GoldenGate for Big Data” version (which has different binaries). Oracle GoldenGate has a significant advantage over the JDBC Source Connector for the Connect API in that it is a ‘push’ rather than a periodic ‘pull’ from the source, and thus it:

  1. Has much lower latency
  2. Requires less resource on the source database, since OGG mines the transaction log instead of directly querying the database for changes made based on a timestamp or key.
  3. Scales better, since entire schemas or whole databases can be replicated with minimal configuration changes. The JDBC connector requires each table, or SQL statement, to be specified.

Note that Oracle GoldenGate for Big Data also has its own native Kafka Handler, which can produce data in various formats directly to Kafka (rather than integrating with the Kafka Connect framework).

Environment

I’m using the Oracle BigDataLite VM 4.5 as the base machine for this. It includes Oracle 12c, Oracle GoldenGate for Big Data, as well as a CDH installation which provides HDFS and Hive for us to also integrate with later on.

On the VM you also need to install:

To generate the schema and continuous workload, I used Swingbench 2.5.

For a step-by-step guide on how to set up these additional components, see this gist.

Starting Confluent Platform

There are three processes that need starting up, and each retains control of the session, so you’ll want to use screen/tmux here, or wrap the commands in nohup [.. command ..] & so that they don’t die when you close the window.
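
As a rough sketch of the nohup approach, the snippet below wraps a command so that it runs in the background, logs to a file, and records its process ID. The helper name start_bg and the LOG_DIR location are illustrative assumptions rather than part of Confluent Platform, and a harmless sleep stands in for the real services so that the sketch can be run anywhere.

```shell
# Hypothetical helper (name and paths are assumptions for illustration only)
LOG_DIR="${LOG_DIR:-/tmp/confluent-logs}"
mkdir -p "$LOG_DIR"

start_bg() {
  name="$1"; shift
  # nohup makes the command ignore the hangup signal sent when the session closes;
  # stdout and stderr go to a per-service log file
  nohup "$@" > "$LOG_DIR/$name.log" 2>&1 &
  # Record the background process ID so the service can be stopped later
  echo $! > "$LOG_DIR/$name.pid"
}

# Stand-in command so the sketch runs anywhere. On the VM this might instead be
# (assuming sudo does not prompt for a password):
#   start_bg kafka sudo /usr/bin/kafka-server-start /etc/kafka/server.properties
start_bg demo sleep 5
cat "$LOG_DIR/demo.pid"
```

A process started this way can later be stopped with kill "$(cat "$LOG_DIR/demo.pid")". With screen or tmux none of this is needed, since the session itself outlives the terminal window.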

On BigDataLite the Zookeeper service is already installed, and should have started at server boot:

[oracle@bigdatalite ~]$ sudo service zookeeper-server status
zookeeper-server is running

If it isn’t running, then start it with sudo service zookeeper-server start.

Next start up Kafka:

# On BigDataLite I had to remove this folder for Kafka to start
sudo rm -r /var/lib/kafka/.oracle_jre_usage
sudo /usr/bin/kafka-server-start /etc/kafka/server.properties

and finally the Schema Registry:

sudo /usr/bin/schema-registry-start /etc/schema-registry/schema-registry.properties

Note that on BigDataLite the Oracle TNS Listener is using port 8081 – the default for the Schema Registry – so I amended /etc/schema-registry/schema-registry.properties to change

listeners=http://0.0.0.0:8081

to

listeners=http://0.0.0.0:18081
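
If you would rather script this change than edit the file by hand, a sed substitution along the following lines should do it. This is a minimal sketch that deliberately works on a scratch copy of the file (the temporary file and its sample contents are assumptions for illustration); on the VM you would point it at /etc/schema-registry/schema-registry.properties and run it with sudo.

```shell
# Scratch copy with sample contents, so the sketch is safe to run anywhere
props=$(mktemp)
printf 'listeners=http://0.0.0.0:8081\nkafkastore.connection.url=localhost:2181\n' > "$props"

# Replace only the listeners line; -i.bak edits in place and keeps a backup copy
sed -i.bak 's|^listeners=http://0\.0\.0\.0:8081$|listeners=http://0.0.0.0:18081|' "$props"

# Show the resulting listener setting
grep '^listeners=' "$props"
```

After restarting the Schema Registry, a request such as curl http://localhost:18081/subjects is one way to confirm that it is answering on the new port.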

Configuring Oracle GoldenGate to send transactions to the Connect API in Kafka

Oracle GoldenGate (OGG) works on the concept of an Extract process which reads the source-specific transaction log and writes an OGG trail file in a generic OGG format. From this a Replicat process reads the trail file and delivers the transactions to the target.

In this example we’ll be running the Extract against an Oracle database, specifically the SOE schema that Swingbench generated for us, and which we’ll be able to generate live transactions against using Swingbench later on.

The Replicat will be sending the transactions from the trail file over to Kafka Connect.

I’m assuming here that you’ve already successfully defined and set running an extract against the Swingbench schema (SOE), with a trail file being delivered to /u01/ogg-bd/dirdat. For a step-by-step guide on how to do this all from scratch, see here.

You can find information about the OGG-Kafka Connect adapter in the README available as part of the download.

To use it, first configure the replicat and supporting files as shown.

  1. Replicat parameters: Create /u01/ogg-bd/dirprm/rconf.prm with the following contents:
    REPLICAT rconf
    TARGETDB LIBFILE libggjava.so SET property=dirprm/conf.props
    REPORTCOUNT EVERY 1 MINUTES, RATE
    GROUPTRANSOPS 1000
    MAP *.*.*, TARGET *.*.*;
    
  2. Handler configuration: Edit the existing /u01/ogg-bd/dirprm/conf.props and amend gg.classpath as shown below. The classpath shown works for BigDataLite – on your own environment you need to make the necessary jar files available per the dependencies listed in the README (available as part of the download).
    gg.handlerlist=confluent