Change Data Capture (CDC) involves observing the changes happening in a database and making them available in a form that can be exploited by other systems.

更改数据捕获 (CDC)涉及观察数据库中发生的更改,并将其以可被其他系统利用的形式提供。

One of the most interesting use-cases is to make them available as a stream of events. This means you can, for example, catch the events and update a search index as the data are written to the database.

最有趣的用例之一是使它们作为事件流可用。 这意味着,例如,您可以捕获事件并在将数据写入数据库时​​更新搜索索引。

Interesting right? Let's see how to implement a CDC system that can observe the changes made to a NoSQL database (MongoDB), stream them through a message broker (Kafka), process the messages of the stream (Kafka Streams), and update a search index (Elasticsearch)!🚀

有趣吧? 让我们看一下如何实现一个CDC系统,该系统可以观察对NoSQL数据库( MongoDB )所做的更改,通过消息代理( Kafka )进行流传输,处理流中的消息( Kafka Streams ),以及更新搜索索引( Elasticsearch)。 )!🚀


The full code of the project is available on GitHub in this . If you want to skip all my jibber jabber and just run the example, go straight to the How to run the project section near the end of the article!😁

该项目的完整代码可在此 GitHub上找到。 如果您想跳过我所有的短剑而只运行示例,请直接转到文章末尾的“ 如何运行项目”部分!😁

用例和基础架构 (Use case & infrastructure)

We run a web application that stores photos uploaded by users. People can share their shots, let others download them, create albums, and so on. Users can also provide a description of their photos, as well as Exif metadata and other useful information.

我们运行一个Web应用程序来存储用户上传的照片。 人们可以共享他们的镜头,让其他人下载它们,创建相册等等。 用户还可以提供其照片的描述,以及Exif元数据和其他有用信息。

We want to store such information and use it to improve our search engine. We will focus on this part of our system that is depicted in the following diagram.

我们想存储这些信息,并用它来改善我们的搜索引擎。 我们将专注于下图所示的系统的这一部分。

The information is provided in JSON format. Since I like to post my shots on , and the website provides free access to its API, I used their model for the photo JSON document.

该信息以JSON格式提供。 由于我喜欢将照片发布在 ,并且该网站可以免费访问其API,因此我将其模型用于照片JSON文档。

Once the JSON is sent through a POST request to our server, we store the document inside a MongoDB database. We will also store it in Elasticsearch for indexing and quick search.

通过POST请求将JSON发送到我们的服务器后,我们将文档存储在MongoDB数据库中。 我们还将其存储在Elasticsearch中以进行索引和快速搜索。

However, we love long exposure shots, and we would like to store in a separate index a subset of information regarding this kind of photo. It can be the exposure time, as well as the location (latitude and longitude) where the photo has been taken. In this way, we can create a map of locations where photographers usually take long exposure photos.

但是,我们喜欢长时间曝光的照片 ,因此我们希望将有关此类照片的信息子集存储在单独的索引中。 它可以是曝光时间,也可以是拍摄照片的位置(经度和纬度)。 通过这种方式,我们可以创建摄影师通常拍摄长时间曝光照片的位置地图。

Here comes the interesting part: instead of explicitly calling Elasticsearch in our code once the photo info is stored in MongoDB, we can implement a CDC exploiting Kafka and Kafka Streams.

这里是有趣的部分:一旦照片信息存储在MongoDB中,我们无需在代码中显式调用Elasticsearch,而是可以利用Kafka和Kafka Streams来实现CDC

We listen to modifications to MongoDB oplog using the interface provided by MongoDB itself. When the photo is stored we send it to a photo Kafka topic. Using Kafka Connect, an Elasticsearch sink is configured to save everything sent to that topic to a specific index. In this way, we can index all photos stored in MongoDB automatically.

我们使用MongoDB本身提供的接口收听对MongoDB oplog的修改。 存储照片后,我们将其发送给photo Kafka主题。 使用Kafka Connect ,将Elasticsearch接收器配置为将发送到该主题的所有内容保存到特定索引。 这样,我们可以自动索引存储在MongoDB中的所有照片。

We need to take care of the long exposure photos too. It requires some processing of the information to extract what we need. For this reason, we use Kafka Streams to create a processing topology to:

我们也需要注意长时间曝光的照片。 它需要对信息进行一些处理以提取我们需要的东西。 因此,我们使用Kafka Streams创建处理拓扑以:

  1. Read from the photo topic


  2. Extract Exif and location information

  3. Filter long exposure photos (exposure time > 1 sec.)

    过滤长时间曝光的照片(曝光时间> 1秒)。
  4. Write to a long-exposure topic.


Then another Elasticsearch sink will read data from the long-exposure topic and write it to a specific index in Elasticsearch.


It is quite simple, but it's enough to have fun with CDC and Kafka Streams! 😁

这很简单,但足以让CDC和Kafka Streams玩得开心! 😁

服务器实施 (Server implementation)

Let's have a look at what we need to implement: our server exposing the REST APIs!

让我们看看我们需要实现什么:我们的服务器公开了REST API

模型和DAO (Models and DAO)

First things first, we need a model of our data and a Data Access Object (DAO) to talk to our MongoDB database.

首先,我们需要一个数据模型和一个数据访问对象 (DAO)以与我们的MongoDB数据库对话。

As I said, the model for the photo JSON information is the one used by Unsplash. Check out the free API for an example of the JSON we will use.

如我所说,照片JSON信息的模型是Unsplash使用的模型。 查看免费的API 获取我们将使用的JSON示例。

I created the mapping for the serializaion/deserialization of the photo JSON using . I'll skip the details about this, if you are curious just look at the !

我使用为照片JSON的序列化/反序列化创建了映射。 如果您感到好奇,我将跳过有关详细信息,请查看 !

Let's focus on the model for the long exposusure photo.


case class LongExposurePhoto(id: String, exposureTime: Float, createdAt: Date, location: Location)object LongExposurePhotoJsonProtocol extends DefaultJsonProtocol {  implicit val longExposurePhotoFormat:RootJsonFormat[LongExposurePhoto] = jsonFormat(LongExposurePhoto, "id", "exposure_time", "created_at", "location")}

This is quite simple: we keep from the photo JSON the information about the id, the exposure time (exposureTime), when the photo has been created (createdAt), and the location where it has been taken. The location comprehends the city, the country, and the position composed of latitude and longitude.

这是很简单:我们不断从照片JSON有关的信息id ,曝光时间( exposureTime ),当照片已创建( createdAt ),以及location的地方已采取。 该location领会的city ,在countryposition组成的latitudelongitude

The DAO consists of just the PhotoDao.scala class.


class PhotoDao(database: MongoDatabase, photoCollection: String) {  val collection: MongoCollection[Document] = database.getCollection(photoCollection)  def createPhoto(photo: Photo): Future[String] = {    val doc = Document(photo.toJson.toString())    doc.put("_id", photo.id)    collection.insertOne(doc).toFuture()      .map(_ => photo.id)  }}

Since I want to keep this example minimal and focused on the CDC implementation, the DAO has just one method to create a new photo document in MongoDB.


It is straightforward: create a document from the photo JSON, and insert it in mongo using id as the one of the photo itself. Then, we can return the id of the photo just inserted in a Future (the MongoDB API is async).

这很简单:根据照片JSON创建文档,然后使用id作为照片本身之一将其插入mongo中。 然后,我们可以返回刚插入Future中的照片的id (MongoDB API是异步的)。

卡夫卡制片人 (Kafka Producer)

Once the photo is stored inside MongoDB, we have to send it to the photo Kafka topic. This means we need a producer to write the message in its topic. The PhotoProducer.scala class looks like this.

将照片存储在MongoDB中后,我们必须将其发送到photo Kafka主题。 这意味着我们需要生产者在其主题中编写消息。 PhotoProducer.scala类看起来像这样。

case class PhotoProducer(props: Properties, topic: String) {  createKafkaTopic(props, topic)  val photoProducer = new KafkaProducer[String, String](props)  def sendPhoto(photo: Photo): Future[RecordMetadata] = {    val record = new ProducerRecord[String, String](topic, photo.id, photo.toJson.compactPrint)    photoProducer.send(record)  }  def closePhotoProducer(): Unit = photoProducer.close()}

I would say that this is pretty self-explanatory. The most interesting part is probably the createKafkaTopic method that is implemented in the utils package.

我会说这很不言自明。 最有趣的部分可能是在utils包中实现的createKafkaTopic方法。

def createKafkaTopic(props: Properties, topic: String): Unit = {    val adminClient = AdminClient.create(props)    val photoTopic = new NewTopic(topic, 1, 1)    adminClient.createTopics(List(photoTopic).asJava)  }

This method creates the topic in Kafka setting 1 as a partition and replication factor (it is enough for this example). It is not required, but creating the topic in advance lets Kafka balance partitions, select leaders, and so on. This will be useful to get our stream topology ready to process as we start our server.

此方法在Kafka设置1中创建主题作为分区和复制因子(对于此示例来说就足够了)。 它不是必需的,但是预先创建主题可使Kafka平衡分区,选择领导者等。 这将有助于在启动服务器时准备好处理流拓扑。

事件监听器 (Event Listener)

We have the DAO that writes in MongoDB and the producer that sends the message in Kafka. We need to glue them together in some way so that when the document is stored in MongoDB the message is sent to the photo topic. This is the purpose of the PhotoListener.scala class.

我们有在MongoDB中编写的DAO和在Kafka中发送消息的生产者。 我们需要以某种方式将它们粘合在一起,以便将文档存储在MongoDB中时,消息会发送到photo主题。 这是PhotoListener.scala类的目的。

case class PhotoListener(collection: MongoCollection[Document], producer: PhotoProducer) {  val cursor: ChangeStreamObservable[Document] = collection.watch()  cursor.subscribe(new Observer[ChangeStreamDocument[Document]] {    override def onNext(result: ChangeStreamDocument[Document]): Unit = {      result.getOperationType match {        case OperationType.INSERT => {          val photo = result.getFullDocument.toJson().parseJson.convertTo[Photo]          producer.sendPhoto(photo).get()          println(s"Sent photo with Id ${photo.id}")        }        case _ => println(s"Operation ${result.getOperationType} not supported")      }    }    override def onError(e: Throwable): Unit = println(s"onError: $e")    override def onComplete(): Unit = println("onComplete")})}

We exploit the provided by the MongoDB scala library.

我们利用MongoDB scala库提供的 。

Here is how it works: we watch() the collection where photos are stored. When there is a new event (onNext) we run our logic.

它是这样工作的:我们watch()存储照片的集合。 当有一个新事件( onNext )时,我们运行我们的逻辑。

For this example we are interested only in the creation of new documents, so we explicitly check that the operation is of type OperationType.INSERT. If the operation is the one we are interested in, we get the document and convert it to a Photo object to be sent by our producer.

对于此示例,我们仅对创建新文档感兴趣,因此,我们明确检查该操作的类型是否为OperationType.INSERT 。 如果该操作是我们感兴趣的操作,我们将获取文档并将其转换为由生产者发送的Photo对象。

That's it! With few lines of code we connected the creation of documents in MongoDB to a stream of events in Kafka.🎉

而已! 只需几行代码,我们就可以将MongoDB中的文档创建与Kafka中的事件流连接起来。

As a side note, be aware that to use the Change Streams interface we have to setup a MongoDB replica set. This means we need to run 3 instances of MongoDB and configure them to act as a replica set using the following command in mongo client:

另外,请注意,要使用Change Streams接口, 我们必须设置MongoDB副本集 。 这意味着我们需要运行3个MongoDB实例,并在mongo client中使用以下命令将它们配置为副本集:

rs.initiate({_id : "r0", members: [{ _id : 0, host : "mongo1:27017", priority : 1 },{ _id : 1, host :"mongo2:27017", priority : 0 },{ _id : 2, host : "mongo3:27017", priority : 0, arbiterOnly: true }]})

Here our instances are the containers we will run in the docker-compose file, that is mongo1, mongo2, and mongo3.

这里的实例是我们将在mongo1 -compose文件中运行的容器,即mongo1mongo2mongo3

处理拓扑 (Processing Topology)

Time to build our processing topology! It will be in charge of the creation of the long-exposure index in Elasticsearch. The topology is described by the following diagram:

是时候建立我们的处理拓扑了! 它将负责在Elasticsearch中创建long-exposure索引。 下图描述了拓扑:

and it is implemented in the LongExposureTopology.scala object class. Let's analyse every step of our processing topology.

它在LongExposureTopology.scala对象类中实现。 让我们分析处理拓扑的每个步骤。

val stringSerde = new StringSerdeval streamsBuilder = new StreamsBuilder()val photoSource: KStream[String, String] = streamsBuilder.stream(sourceTopic, Consumed.`with`(stringSerde, stringSerde))

The first step is to read from a source topic. We start a stream from the sourceTopic (that is photo topic) using the StreamsBuilder() object. The stringSerde object is used to serialise and deserialise the content of the topic as a String.

第一步是阅读源主题。 我们使用StreamsBuilder()对象从sourceTopic (即photo主题)开始一个流。 stringSerde对象用于将主题的内容序列化和反序列化为String

Please notice that at each step of the processing we create a new stream of data with a KStream object. When creating the stream, we specify the key and the value produced by the stream. In our topology the key will always be a String. In this step the value produced is still a String.

请注意,在处理的每个步骤中,我们都会使用KStream对象创建一个新的数据流。 创建流时,我们指定流所生成的键和值。 在我们的拓扑中,键将始终是String 。 在此步骤中,生成的值仍然是String

val covertToPhotoObject: KStream[String, Photo] =      photoSource.mapValues((_, jsonString) => {        val photo = jsonString.parseJson.convertTo[Photo]        println(s"Processing photo ${photo.id}")        photo      })

The next step is to convert the value extracted from the photo topic into a proper Photo object.


So we start from the photoSource stream and work on the values using the mapValues function. We simply parse the value as a JSON and create the Photo object that will be sent in the convertToPhotoObject stream.

因此,我们从photoSource流开始,然后使用mapValues函数处理这些值。 我们只需将值解析为JSON并创建将在convertToPhotoObject流中发送的Photo对象。

val filterWithLocation: KStream[String, Photo] = covertToPhotoObject.filter((_, photo) => photo.location.exists(_.position.isDefined))

There is no guarantee that the photo we are processing will have the info about the location, but we want it in our long exposure object. This step of the topology filters out from the covertToPhotoObject stream the photos that have no info about the location, and creates the filterWithLocation stream.

不能保证我们正在处理的照片会包含有关位置的信息,但是我们希望在我们的长时间曝光对象中使用它。 拓扑的此步骤从covertToPhotoObject流中筛选出没有位置信息的照片,并创建filterWithLocation流。

val filterWithExposureTime: KStream[String, Photo] = filterWithLocation.filter((_, photo) => photo.exif.exists(_.exposureTime.isDefined))

Another important fact for our processing is the exposure time of the photo. For this reason, we filter out from the filterWithLocation stream the photos without exposure time info, creating the filterWithExposureTime.

我们处理的另一个重要事实是照片的曝光时间。 因此,我们从filterWithLocation流中筛选出没有曝光时间信息的照片,从而创建了filterWithExposureTime

val dataExtractor: KStream[String, LongExposurePhoto] =      filterWithExposureTime.mapValues((_, photo) => LongExposurePhoto(photo.id, parseExposureTime(photo.exif.get.exposureTime.get), photo.createdAt, photo.location.get))

We now have all we need to create a LongExposurePhoto object! That is the result of the dataExtractor: it takes the Photo coming from the filterWithExposureTime stream and produces a new stream containing LongExposurePhoto.

现在,我们拥有创建LongExposurePhoto对象所需的LongExposurePhoto ! 那是dataExtractor的结果:它获取来自filterWithExposureTime流的Photo并产生一个包含LongExposurePhoto的新流。

val longExposureFilter: KStream[String, String] =      dataExtractor.filter((_, item) => item.exposureTime > 1.0).mapValues((_, longExposurePhoto) => {        val jsonString = longExposurePhoto.toJson.compactPrint        println(s"completed processing: $jsonString")        jsonString      })

We are almost there. We now have to keep the photos with a long exposure time (that we decided is more then 1 sec.). So we create a new longExposureFilter stream without the photos that are not long exposure.

我们就快到了。 现在,我们必须将照片保持较长的曝光时间(我们决定将其设置为1秒以上)。 因此,我们创建了一个新的longExposureFilter流,其中没有长时间曝光的照片。

This time we also serialise the LongExposurePhotos into the corresponding JSON string, which will be written to Elasticsearch in the next step.


longExposureFilter.to(sinkTopic, Produced.`with`(stringSerde, stringSerde))streamsBuilder.build()

This is the last step of our topology. We write to our sinkTopic (that is long-exposure topic) using the string serialiser/deserialiser what is inside the longExposureFilter stream. The last command simply builds the topology we just created.

这是我们拓扑的最后一步。 我们使用字符串serialiser / longExposureFilter流中的内容to sinkTopic (即long-exposure主题)。 最后一个命令只是build我们刚刚创建的拓扑。

Now that we have our topology, we can use it in our server. The PhotoStreamProcessor.scala class is what manages the processing.

现在我们有了拓扑,我们可以在服务器中使用它了。 PhotoStreamProcessor.scala类是用于管理处理的类。

class PhotoStreamProcessor(kafkaProps: Properties, streamProps: Properties, sourceTopic: String, sinkTopic: String) {  createKafkaTopic(kafkaProps, sinkTopic)  val topology: Topology = LongExposureTopology.build(sourceTopic, sinkTopic)  val streams: KafkaStreams = new KafkaStreams(topology, streamProps)  sys.ShutdownHookThread {    streams.close(java.time.Duration.ofSeconds(10))  }  def start(): Unit = new Thread {    override def run(): Unit = {      streams.cleanUp()      streams.start()      println("Started long exposure processor")    }  }.start()}

First we create the sinkTopic, using the same utility method we saw before. Then we build the stream topology and initialize a KafkaStreams object with that topology.

首先,我们使用之前看到的相同的实用程序方法来创建sinkTopic 。 然后,我们构建流拓扑并使用该拓扑初始化KafkaStreams对象。

To start the stream processing, we need to create a dedicated Thread that will run the streaming while the server is alive. According to the official documentation, it is always a good idea to cleanUp() the stream before starting it.

要开始流处理,我们需要创建一个专用Thread ,该Thread将在服务器处于活动状态时运行流。 根据官方文档,在启动流之前先cleanUp()它总是一个好主意。

Our PhotoStreamProcessor is ready to go!🎉



The server exposes REST APIs to send it the photo information to store. We make use of for the API implementation.

服务器公开REST API来向其发送照片信息以进行存储。 我们将用于API实现。

trait AppRoutes extends SprayJsonSupport {  implicit def system: ActorSystem  implicit def photoDao: PhotoDao  implicit lazy val timeout = Timeout(5.seconds)  lazy val healthRoute: Route = pathPrefix("health") {    concat(      pathEnd {        concat(          get {            complete(StatusCodes.OK)          }        )      }    )  }  lazy val crudRoute: Route = pathPrefix("photo") {    concat(      pathEnd {        concat(          post {            entity(as[Photo]) { photo =>              val photoCreated: Future[String] =                photoDao.createPhoto(photo)              onSuccess(photoCreated) { id =>              complete((StatusCodes.Created, id))              }            }          }        )      }    )  }}

To keep the example minimal, we have only two routes:


  • GET /health - to check if the server is up & running

    GET /health检查服务器是否已启动并正在运行

  • POST /photo - to send to the system the JSON of the photo information we want to store. This endpoint uses the DAO to store the document in MongoDB and returns a 201 with the id of the stored photo if the operation succeeded.

    POST /photo JSON将要存储的照片信息的JSON发送到系统。 该端点使用DAO将文档存储在MongoDB中,如果操作成功,则返回201和所存储照片的ID。

This is by no means a complete set of APIs, but it is enough to run our example.😉


服务器主类 (Server main class)

OK, we implemented all the components of our server, so it's time to wrap everything up. This is our Server.scala object class.

好的,我们实现了服务器的所有组件,因此该打包所有内容了。 这是我们的Server.scala对象类。

implicit val system: ActorSystem = ActorSystem("kafka-stream-playground")implicit val materializer: ActorMaterializer = ActorMaterializer()

First a couple of Akka utility values. Since we use to run our server and REST API, these implicit values are required.

首先是几个Akka实用价值。 由于我们使用来运行服务器和REST API,因此需要这些隐式值。

val config: Config = ConfigFactory.load()val address = config.getString("http.ip")val port = config.getInt("http.port")val mongoUri = config.getString("mongo.uri")val mongoDb = config.getString("mongo.db")val mongoUser = config.getString("mongo.user")val mongoPwd = config.getString("mongo.pwd")val photoCollection = config.getString("mongo.photo_collection")val kafkaHosts = config.getString("kafka.hosts").split(',').toListval photoTopic = config.getString("kafka.photo_topic")val longExposureTopic = config.getString("kafka.long_exposure_topic")

Then we read all the configuration properties. We will come back to the configuration file in a moment.

然后,我们阅读所有配置属性。 稍后我们将回到配置文件。

val kafkaProps = new Properties()kafkaProps.put("bootstrap.servers", kafkaHosts.mkString(","))kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")val streamProps = new Properties()streamProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "long-exp-proc-app")streamProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHosts.mkString(","))val photoProducer = PhotoProducer(kafkaProps, photoTopic)val photoStreamProcessor = new PhotoStreamProcessor(kafkaProps, streamProps, photoTopic, "long-exposure")photoStreamProcessor.start()

We have to configure both our Kafka producer and the stream processor. We also start the stream processor, so the server will be ready to process the documents sent to it.

我们必须同时配置我们的Kafka生产者和流处理器。 我们还启动了流处理器,因此服务器将准备好处理发送给它的文档。

val client = MongoClient(s"mongodb://$mongoUri/$mongoUser")val db = client.getDatabase(mongoDb)val photoDao: PhotoDao = new PhotoDao(db, photoCollection)val photoListener = PhotoListener(photoDao.collection, photoProducer)

Also MongoDB needs to be configured. We setup the connection and initialize the DAO as well as the listener.

另外,还需要配置MongoDB。 我们建立连接,并初始化DAO和侦听器。

lazy val routes: Route = healthRoute ~ crudRouteHttp().bindAndHandle(routes, address, port)Await.result(system.whenTerminated, Duration.Inf)

Everything has been initialized. We create the REST routes for the communication to the server, bind them to the handlers, and finally start the server!🚀

一切都已初始化。 我们创建用于与服务器通信的REST路由,将它们绑定到处理程序,最后启动服务器!🚀

服务器配置 (Server configuration)

This is the configuration file used to setup the server:


http {  ip = ""  ip = ${?SERVER_IP}  port = 8000  port = ${?SERVER_PORT}}mongo {  uri = ""  uri = ${?MONGO_URI}  db = "kafka-stream-playground"  user = "admin"  pwd = "admin"  photo_collection = "photo"}kafka {  hosts = ""  hosts = ${?KAFKA_HOSTS}  photo_topic = "photo"  long_exposure_topic = "long-exposure"}

I think that this one does not require much explanation, right?😉


连接器配置 (Connectors configuration)

The server we implemented writes in two Kafka topics: photo and long-exposure. But how are messages written in Elasticsearch as documents? Using Kafka Connect!

我们实现的服务器写了两个Kafka主题: photolong-exposure 。 但是,如何在Elasticsearch中将消息作为文档编写? 使用Kafka Connect

We can setup two connectors, one per topic, and tell the connectors to write every message going through that topic in Elasticsearch.


First we need . We can use the container provided by Confluence in the docker-compose file:

首先,我们需要 。 我们可以在docker-compose文件中使用Confluence提供的容器:

connect:    image: confluentinc/cp-kafka-connect    ports:      - 8083:8083    networks:      - kakfa_stream_playground    depends_on:      - zookeeper      - kafka    volumes:      - $PWD/connect-plugins:/connect-plugins    environment:      CONNECT_BOOTSTRAP_SERVERS: kafka:9092      CONNECT_REST_ADVERTISED_HOST_NAME: connect      CONNECT_REST_PORT: 8083      CONNECT_GROUP_ID: compose-connect-group      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"      CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"      CONNECT_ZOOKEEPER_CONNECT: zookeeper:2181      CONNECT_PLUGIN_PATH: /connect-plugins      CONNECT_LOG4J_ROOT_LOGLEVEL: INFO

I want to focus on some of the configuration values.


First of all, we need to expose the port 8083 - that will be our endpoint to configure the connectors (CONNECT_REST_PORT).

首先,我们需要公开端口8083这将是配置连接器( CONNECT_REST_PORT )的端点。

We also need to map a volume to the /connect-plugins path, where we will place the to write to Elasticsearch. This is reflected also in the CONNECT_PLUGIN_PATH.

我们还需要将卷映射到/connect-plugins路径,在该路径中,我们将放置以写入Elasticsearch。 这也反映在CONNECT_PLUGIN_PATH

The connect container should know how to find the Kafka servers, so we set CONNECT_BOOTSTRAP_SERVERS as kafka:9092.


Once Kafka Connect is ready, we can send the configurations of our connectors to the http://localhost:8083/connectors endpoint. We need 2 connectors, one for the photo topic and one for the long-exposure topic. We can send the configuration as a JSON with a POST request.

Kafka Connect准备就绪后,我们可以将连接器的配置发送到http://localhost:8083/connectors端点。 我们需要2个连接器,一个用于photo主题,一个用于long-exposure主题。 我们可以通过POST请求将配置作为JSON发送。

{  "name": "photo-connector",  "config": {    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",    "tasks.max": "1",    "topics": "photo",    "key.converter": "org.apache.kafka.connect.storage.StringConverter",    "value.converter": "org.apache.kafka.connect.json.JsonConverter",    "value.converter.schemas.enable": "false",    "schema.ignore": "true",    "connection.url": "http://elastic:9200",    "type.name": "kafka-connect",    "behavior.on.malformed.documents": "warn",    "name": "photo-connector"  }}

We explicitly say we are gonna use the ElasticsearchSinkConnector as the connector.class , as well as the topics that we want to sink - in this case photo.

我们明确表示要使用ElasticsearchSinkConnector作为connector.class以及我们要介绍的topics (在本例中为photo

We don't want to use a schema for the value.converter, so we can disable it (value.converter.schemas.enable) and tell the connector to ignore the schema (schema.ignore).

我们不想将架构用于value.converter ,因此我们可以禁用它( value.converter.schemas.enable )并告诉连接器忽略该架构( schema.ignore )。

The connector for the long-exposure topic is exactly like this one. The only difference is the name and of course the topics.

long-exposure主题的连接器与此类似。 唯一的区别是nametopics

如何运行项目 (How to run the project)

We have all we need to test the CDC! How can we do it? It's quite easy: simply run the setup.sh script in the root folder of the repo!

我们拥有测试CDC所需的一切! 我们该怎么做? 这很容易:只需在setup.sh的根文件夹中运行setup.sh脚本!

What will the script do?


  1. Run the docker-compose file with all the services.


  2. Configure MongoDB replica set. This is required to enable the Change Stream interface to capture data changes. More info about this .

    配置MongoDB副本集。 这是启用Change Stream界面捕获数据更改所必需的。 关于这方面更多的信息 。

  3. Configure the Kafka connectors.

  4. Connect to the logs of the server.


The docker-compose will run the following services:


  • Our Server

  • 3 instances of MongoDB (required for the replica set)

  • Mongoku, a MongoDB client

  • Kafka (single node)

  • Kafka connect

  • Zookeeper (required by Kafka)

  • Elasticsearch

  • Kibana


There are a lot of containers to run, so make sure you have enough resources to run everything properly. If you want, remove Mongoku and Kibana from the compose-file, since they are used just for a quick look inside the DBs.

有很多容器要运行,因此请确保您有足够的资源来正常运行所有容器。 如果需要,请从撰写文件中删除Mongoku和Kibana,因为它们仅用于快速查看数据库内部。

Once everything is up and running, you just have to send data to the server.


I collected some JSON documents of photos from Unplash that you can use to test the system in the photos.txt file.


There are a total of 10 documents, with 5 of them containing info about long exposure photos. Send them to the server running the send-photos.sh script in the root of the repo. Check that everything is stored in MongoDB connecting to Mongoku at http://localhost:3100. Then connect to Kibana at http://localhost:5601 and you will find two indexes in Elasticsearch: photo, containing the JSON of all the photos stored in MongoDB, and long-exposure, containing just the info of the long exposure photos.

共有10个文档,其中5个包含有关长时间曝光照片的信息。 将它们发送到在存储send-photos.sh根目录中运行send-photos.sh脚本的服务器。 检查所有内容是否都存储在通过http://localhost:3100连接到Mongoku的MongoDB中。 然后在http://localhost:5601连接到Kibana,您将在Elasticsearch中找到两个索引: photo (包含MongoDB中存储的所有照片的JSON)和long-exposure (仅包含长时间曝光的照片的信息)。

Amazing, right? 😄

太好了吧? 😄

结论 (Conclusion)

We made it guys!😄


Starting from the design of the use-case, we built our system that connected a MongoDB database to Elasticsearch using CDC.


Kafka Streams is the enabler, allowing us to convert database events to a stream that we can process.

Kafka Streams是启动器,允许我们将数据库事件转换为可以处理的流。

Do you need to see the whole project? Just checkout the on GitHub!😉

您需要查看整个项目吗? 只需在GitHub上签出 !😉

That's it, enjoy! 🚀

就是这样,享受! 🚀


kafka streams


