kafka streams
Change Data Capture (CDC) involves observing the changes happening in a database and making them available in a form that can be exploited by other systems.
One of the most interesting use-cases is to make them available as a stream of events. This means you can, for example, catch the events and update a search index as the data are written to the database.
Interesting, right? Let's see how to implement a CDC system that can observe the changes made to a NoSQL database (MongoDB), stream them through a message broker (Kafka), process the messages of the stream (Kafka Streams), and update a search index (Elasticsearch)!🚀
The full code of the project is available on GitHub in this repository. If you want to skip all my jibber jabber and just run the example, go straight to the How to run the project section near the end of the article!😁
We run a web application that stores photos uploaded by users. People can share their shots, let others download them, create albums, and so on. Users can also provide a description of their photos, as well as Exif metadata and other useful information.
We want to store such information and use it to improve our search engine. We will focus on this part of our system that is depicted in the following diagram.
The information is provided in JSON format. Since I like to post my shots on Unsplash, and the website provides free access to its API, I used their model for the photo JSON document.
Once the JSON is sent through a POST request to our server, we store the document inside a MongoDB database. We will also store it in Elasticsearch for indexing and quick search.
We also love long exposure shots, and we would like to store a subset of information about this kind of photo in a separate index: the exposure time, as well as the location (latitude and longitude) where the photo was taken. In this way, we can create a map of the locations where photographers usually take long exposure photos.
Here comes the interesting part: instead of explicitly calling Elasticsearch in our code once the photo info is stored in MongoDB, we can implement CDC with Kafka and Kafka Streams.
We listen to changes in the MongoDB oplog using the Change Streams interface provided by MongoDB itself. When a photo is stored, we send it to the photo Kafka topic. Using Kafka Connect, an Elasticsearch sink is configured to save everything sent to that topic to a specific index. In this way, we can index all photos stored in MongoDB automatically.
We need to take care of the long exposure photos too. It requires some processing of the information to extract what we need. For this reason, we use Kafka Streams to create a processing topology to:
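- Read from the photo topic
- Extract the location and exposure time information
- Keep only the long exposure photos (exposure time longer than 1 second)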
- Write to a long-exposure topic.
Then another Elasticsearch sink will read data from the long-exposure topic and write it to a specific index in Elasticsearch.
It is quite simple, but it's enough to have fun with CDC and Kafka Streams! 😁
Let's have a look at what we need to implement: our server exposing the REST APIs!
First things first, we need a model of our data and a Data Access Object (DAO) to talk to our MongoDB database.
As I said, the model for the photo JSON information is the one used by Unsplash. Check out the free API for an example of the JSON we will use.
I created the mapping for the serialization/deserialization of the photo JSON using spray-json. I'll skip the details here; if you are curious, just look at the code in the repository!
Let's focus on the model for the long exposure photo.
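    import java.util.Date
    import spray.json._

    // Subset of the photo information kept for long exposure shots.
    // Location and the implicit JsonFormats for Date and Location are
    // defined elsewhere in the project and must be in scope.
    case class LongExposurePhoto(id: String, exposureTime: Float, createdAt: Date, location: Location)

    object LongExposurePhotoJsonProtocol extends DefaultJsonProtocol {
      // Map the camelCase fields to the snake_case keys used in the JSON
      implicit val longExposurePhotoFormat: RootJsonFormat[LongExposurePhoto] =
        jsonFormat(LongExposurePhoto, "id", "exposure_time", "created_at", "location")
    }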
This is quite simple: from the photo JSON we keep the id, the exposure time (exposureTime), when the photo was created (createdAt), and the location where it was taken. The location includes the city, the country, and the position, composed of latitude and longitude.
The DAO consists of just the PhotoDao.scala class.
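    import org.mongodb.scala.{Document, MongoCollection, MongoDatabase}
    import spray.json._
    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global

    // The implicit JsonFormat[Photo] defined in the project must be in scope for photo.toJson
    class PhotoDao(database: MongoDatabase, photoCollection: String) {
      val collection: MongoCollection[Document] = database.getCollection(photoCollection)

      def createPhoto(photo: Photo): Future[String] = {
        // Build the document from the photo JSON and reuse the photo id as the MongoDB _id
        val doc = Document(photo.toJson.toString()) + ("_id" -> photo.id)
        // insertOne is asynchronous: complete the Future with the id once the insert succeeds
        collection.insertOne(doc).toFuture().map(_ => photo.id)
      }
    }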
Since I want to keep this example minimal and focused on the CDC implementation, the DAO has just one method to create a new photo document in MongoDB.
It is straightforward: we create a document from the photo JSON and insert it into MongoDB, using the photo's own id as the document _id. Then we return the id of the photo just inserted in a Future (the MongoDB API is asynchronous).
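To make the contract of createPhoto concrete (store the document under the photo's own id, then complete a Future with that id), here is a minimal in-memory stand-in that could replace the real DAO in unit tests. SimplePhoto and InMemoryPhotoDao are hypothetical names, not part of the project, and the duplicate-id failure only mimics MongoDB rejecting a second document with the same _id.

```scala
import scala.collection.concurrent.TrieMap
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical, simplified stand-in for the project's Photo model.
final case class SimplePhoto(id: String, description: Option[String])

// Hypothetical in-memory DAO with the same shape as PhotoDao.createPhoto.
final class InMemoryPhotoDao(implicit ec: ExecutionContext) {
  private val store = TrieMap.empty[String, SimplePhoto]

  def createPhoto(photo: SimplePhoto): Future[String] = Future {
    // putIfAbsent returns None when the id was free, like a successful insert
    store.putIfAbsent(photo.id, photo) match {
      case None    => photo.id
      case Some(_) => throw new IllegalStateException(s"duplicate _id ${photo.id}")
    }
  }

  def find(id: String): Option[SimplePhoto] = store.get(id)
}
```

Keeping the same Future[String] signature means code written against the DAO (such as the REST route) does not need to change when the stand-in is swapped in.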
Once the photo is stored in MongoDB, we have to send it to the photo Kafka topic. This means we need a producer to write the message to that topic. The PhotoProducer.scala class looks like this.
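    import java.util.Properties
    import java.util.concurrent.Future
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord, RecordMetadata}
    import spray.json._

    case class PhotoProducer(props: Properties, topic: String) {
      // Make sure the target topic exists before producing (createKafkaTopic lives in utils)
      createKafkaTopic(props, topic)

      val photoProducer = new KafkaProducer[String, String](props)

      // Key = photo id, value = photo serialized as compact JSON
      def sendPhoto(photo: Photo): Future[RecordMetadata] = {
        val record = new ProducerRecord[String, String](topic, photo.id, photo.toJson.compactPrint)
        photoProducer.send(record)
      }

      def closePhotoProducer(): Unit = photoProducer.close()
    }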
I would say that this is pretty self-explanatory. The most interesting part is probably the createKafkaTopic method, which is implemented in the utils package.
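    import java.util.{Collections, Properties}
    import org.apache.kafka.clients.admin.{AdminClient, NewTopic}

    def createKafkaTopic(props: Properties, topic: String): Unit = {
      val adminClient = AdminClient.create(props)
      // 1 partition, replication factor 1: enough for a local example
      val newTopic = new NewTopic(topic, 1, 1.toShort)
      // Asynchronous request: if the topic already exists, the returned future
      // fails with TopicExistsException, which is ignored here
      adminClient.createTopics(Collections.singletonList(newTopic))
      // close() lets pending requests complete before releasing the client's resources
      adminClient.close()
    }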
This method creates the topic in Kafka with 1 partition and a replication factor of 1 (enough for this example). Creating the topic is not strictly required, but doing it in advance lets Kafka assign partitions, elect leaders, and so on before the first message arrives. This helps get our stream topology ready to process as soon as we start our server.
We have the DAO that writes to MongoDB and the producer that sends the message to Kafka. We need to glue them together so that, when the document is stored in MongoDB, the message is sent to the photo topic. This is the purpose of the PhotoListener.scala class.
    import org.mongodb.scala.{ChangeStreamObservable, Document, MongoCollection, Observer}
    import org.mongodb.scala.model.changestream.{ChangeStreamDocument, OperationType}
    import spray.json._

    case class PhotoListener(collection: MongoCollection[Document], producer: PhotoProducer) {
      // Open a change stream on the photo collection
      val cursor: ChangeStreamObservable[Document] = collection.watch()

      cursor.subscribe(new Observer[ChangeStreamDocument[Document]] {
        override def onNext(result: ChangeStreamDocument[Document]): Unit = {
          result.getOperationType match {
            case OperationType.INSERT =>
              // Convert the inserted document back to a Photo and forward it to Kafka
              val photo = result.getFullDocument.toJson().parseJson.convertTo[Photo]
              producer.sendPhoto(photo).get() // block until the send completes
              println(s"Sent photo with Id ${photo.id}")
            case _ =>
              println(s"Operation ${result.getOperationType} not supported")
          }
        }

        override def onError(e: Throwable): Unit = println(s"onError: $e")

        override def onComplete(): Unit = println("onComplete")
      })
    }
We exploit the Change Streams interface provided by the MongoDB Scala driver.
Here is how it works: we watch() the collection where photos are stored. When there is a new event (onNext), we run our logic.
For this example we are interested only in the creation of new documents, so we explicitly check that the operation is of type OperationType.INSERT. If it is, we get the document and convert it to a Photo object to be sent by our producer.
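The decision taken in onNext can be kept separate from the driver, so it can be tested without a replica set. The sketch below models change events with hypothetical types (Operation, ChangeEvent, ListenerLogic); the real driver delivers ChangeStreamDocument[Document] with an OperationType enum, so this only illustrates the filtering logic, not the driver API.

```scala
// Hypothetical, simplified model of a change stream event.
sealed trait Operation
case object Insert extends Operation
case object Update extends Operation
case object Delete extends Operation

final case class ChangeEvent(operation: Operation, fullDocument: Option[String])

object ListenerLogic {
  // Returns the JSON payload to forward to Kafka, or None when the event is ignored.
  def payloadToForward(event: ChangeEvent): Option[String] = event.operation match {
    case Insert => event.fullDocument // only inserts carry a new photo to forward
    case other =>
      println(s"Operation $other not supported")
      None
  }
}
```

With this split, onNext would only translate the driver event and call the pure function, then hand any returned payload to the producer.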
That's it! With a few lines of code we connected the creation of documents in MongoDB to a stream of events in Kafka.🎉
As a side note, be aware that to use the Change Streams interface we have to set up a MongoDB replica set. This means we need to run 3 instances of MongoDB (here a primary, a secondary, and an arbiter) and configure them to act as a replica set using the following command in the mongo shell:
    rs.initiate({
      _id: "r0",
      members: [
        { _id: 0, host: "mongo1:27017", priority: 1 },
        { _id: 1, host: "mongo2:27017", priority: 0 },
        { _id: 2, host: "mongo3:27017", priority: 0, arbiterOnly: true }
      ]
    })
Here our instances are the containers we will run in the docker-compose file, that is mongo1, mongo2, and mongo3.
Time to build our processing topology! It will be in charge of producing the content of the long-exposure index in Elasticsearch. The topology is described by the following diagram:
and it is implemented in the LongExposureTopology.scala object. Let's analyse every step of our processing topology.
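    import org.apache.kafka.common.serialization.Serdes.StringSerde
    import org.apache.kafka.streams.StreamsBuilder
    import org.apache.kafka.streams.kstream.{Consumed, KStream}

    val stringSerde = new StringSerde
    val streamsBuilder = new StreamsBuilder()
    // Read both key and value of each record in the source topic as Strings
    val photoSource: KStream[String, String] =
      streamsBuilder.stream(sourceTopic, Consumed.`with`(stringSerde, stringSerde))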
The first step is to read from a source topic. We start a stream from the sourceTopic (that is, the photo topic) using the StreamsBuilder() object. The stringSerde object is used to serialise and deserialise the content of the topic as a String.
Please notice that at each step of the processing we create a new stream of data with a KStream object. When creating the stream, we specify the types of the key and of the value produced by the stream. In our topology the key will always be a String. In this step the value produced is still a String.
    // Parse each JSON value into a Photo object (the key is left untouched)
    val covertToPhotoObject: KStream[String, Photo] =
      photoSource.mapValues((_, jsonString) => {
        val photo = jsonString.parseJson.convertTo[Photo]
        println(s"Processing photo ${photo.id}")
        photo
      })
The next step is to convert the value extracted from the photo topic into a proper Photo object.
So we start from the photoSource stream and work on the values using the mapValues function. We simply parse the value as JSON and create the Photo object that will be sent to the covertToPhotoObject stream.
    // Drop the photos without a known position
    val filterWithLocation: KStream[String, Photo] =
      covertToPhotoObject.filter((_, photo) => photo.location.exists(_.position.isDefined))
There is no guarantee that the photo we are processing has information about its location, but we need it in our long exposure object. This step of the topology filters out of the covertToPhotoObject stream the photos that have no location info, and creates the filterWithLocation stream.
    // Drop the photos without an exposure time in their Exif data
    val filterWithExposureTime: KStream[String, Photo] =
      filterWithLocation.filter((_, photo) => photo.exif.exists(_.exposureTime.isDefined))
The exposure time of the photo is another piece of information our processing depends on. For this reason, we filter out of the filterWithLocation stream the photos without exposure time info, creating the filterWithExposureTime stream.
    // Both Options are safe to unwrap here: the previous filters guarantee they are defined
    val dataExtractor: KStream[String, LongExposurePhoto] =
      filterWithExposureTime.mapValues((_, photo) =>
        LongExposurePhoto(
          photo.id,
          parseExposureTime(photo.exif.get.exposureTime.get),
          photo.createdAt,
          photo.location.get))
We now have all we need to create a LongExposurePhoto object! That is the job of the dataExtractor: it takes the Photo coming from the filterWithExposureTime stream and produces a new stream containing LongExposurePhoto objects.
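The article does not show parseExposureTime. Here is a plausible sketch, assuming Exif exposure times arrive as strings such as "30", "0.5" or "1/250" (seconds, either decimal or as a fraction). The object name ExposureTime is hypothetical. Unlike the Float-returning helper used above, it returns an Option, so malformed values could be filtered out instead of failing the stream.

```scala
import scala.util.Try

object ExposureTime {
  // Hypothetical parser: assumes "30", "0.5" or "1/250" (seconds).
  // Returns None for anything it cannot interpret, including a zero denominator.
  def parse(raw: String): Option[Float] = raw.trim.split('/') match {
    case Array(value) => Try(value.trim.toFloat).toOption
    case Array(num, den) =>
      for {
        n <- Try(num.trim.toFloat).toOption
        d <- Try(den.trim.toFloat).toOption
        if d != 0f
      } yield n / d
    case _ => None
  }
}
```

With an Option result, the dataExtractor step could use flatMapValues (or a filter on parse(...).isDefined) to drop photos whose exposure time cannot be read.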
    // Keep only photos exposed for more than 1 second, then serialise them to JSON
    val longExposureFilter: KStream[String, String] =
      dataExtractor
        .filter((_, item) => item.exposureTime > 1.0)
        .mapValues((_, longExposurePhoto) => {
          val jsonString = longExposurePhoto.toJson.compactPrint
          println(s"completed processing: $jsonString")
          jsonString
        })
We are almost there. We now have to keep only the photos with a long exposure time (which we decided means more than 1 second). So we create a new longExposureFilter stream that drops the photos that are not long exposure.
This time we also serialise each LongExposurePhoto into the corresponding JSON string, which the next step writes to the sink topic and, from there, the connector writes to Elasticsearch.
    // Write the JSON strings to the sink topic and build the topology
    longExposureFilter.to(sinkTopic, Produced.`with`(stringSerde, stringSerde))
    streamsBuilder.build()
This is the last step of our topology. Using the string serialiser/deserialiser, we write what is inside the longExposureFilter stream to our sinkTopic (that is, the long-exposure topic). The last command simply builds the topology we just created.
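To see the whole topology at a glance, here is the same chain of steps applied to an in-memory List instead of a KStream. It is a sketch with trimmed-down, hypothetical versions of the models (no createdAt, exposure time as a raw string) and a stand-in for parseExposureTime. It also skips the JSON parsing and serialisation steps. The order of the filters and the 1-second threshold follow the topology above.

```scala
// Hypothetical, trimmed-down models (the real Photo has many more fields).
final case class Position(latitude: Double, longitude: Double)
final case class Location(city: Option[String], country: Option[String], position: Option[Position])
final case class Exif(exposureTime: Option[String])
final case class Photo(id: String, exif: Option[Exif], location: Option[Location])
final case class LongExposurePhoto(id: String, exposureTime: Float, location: Location)

object LongExposurePipeline {
  // Stand-in for parseExposureTime, assuming values like "30" or "1/250".
  // Like a plain toFloat, it throws on malformed input.
  private def parseExposureTime(raw: String): Float = raw.split('/') match {
    case Array(num, den) => num.toFloat / den.toFloat
    case _               => raw.toFloat
  }

  // Same order of steps as the KStream topology, applied to a List.
  def run(photos: List[Photo], thresholdSeconds: Float = 1.0f): List[LongExposurePhoto] =
    photos
      .filter(_.location.exists(_.position.isDefined)) // filterWithLocation
      .filter(_.exif.exists(_.exposureTime.isDefined)) // filterWithExposureTime
      .map { p =>                                       // dataExtractor
        LongExposurePhoto(p.id, parseExposureTime(p.exif.get.exposureTime.get), p.location.get)
      }
      .filter(_.exposureTime > thresholdSeconds)        // longExposureFilter
}
```

To test the real Topology without a running broker, the kafka-streams-test-utils artifact provides TopologyTestDriver.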
Now that we have our topology, we can use it in our server. The PhotoStreamProcessor.scala class is what manages the processing.
    import java.time.Duration
    import java.util.Properties
    import org.apache.kafka.streams.{KafkaStreams, Topology}

    class PhotoStreamProcessor(kafkaProps: Properties, streamProps: Properties,
                               sourceTopic: String, sinkTopic: String) {
      createKafkaTopic(kafkaProps, sinkTopic)

      val topology: Topology = LongExposureTopology.build(sourceTopic, sinkTopic)
      val streams: KafkaStreams = new KafkaStreams(topology, streamProps)

      // Close the streams cleanly (waiting up to 10 seconds) when the JVM shuts down
      sys.ShutdownHookThread {
        streams.close(Duration.ofSeconds(10))
      }

      def start(): Unit = new Thread {
        override def run(): Unit = {
          streams.cleanUp() // delete the local state of this application id before starting
          streams.start()
          println("Started long exposure processor")
        }
      }.start()
    }
First we create the sinkTopic, using the same utility method we saw before. Then we build the stream topology and initialize a KafkaStreams object with that topology.
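To start the stream processing, we create a dedicated Thread that calls start() on the KafkaStreams object. Note that start() itself does not block: it launches the library's own stream threads, which keep processing while the server is alive. According to the official documentation, cleanUp() deletes the application's local state directory and may only be called before start() or after close(); calling it right before starting gives us a fresh local state every time, which is harmless here since our topology is stateless.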
Our PhotoStreamProcessor is ready to go!🎉
The server exposes REST APIs to receive the photo information to store. We use Akka HTTP for the API implementation.
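    import akka.actor.ActorSystem
    import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
    import akka.http.scaladsl.model.StatusCodes
    import akka.http.scaladsl.server.Directives._
    import akka.http.scaladsl.server.Route
    import akka.util.Timeout
    import scala.concurrent.Future
    import scala.concurrent.duration._

    // The implicit JsonFormat[Photo] must be in scope to unmarshal the request body
    trait AppRoutes extends SprayJsonSupport {
      implicit def system: ActorSystem
      implicit def photoDao: PhotoDao
      implicit lazy val timeout: Timeout = Timeout(5.seconds)

      // GET /health -> 200 OK
      lazy val healthRoute: Route =
        pathPrefix("health") {
          concat(
            pathEnd {
              concat(
                get {
                  complete(StatusCodes.OK)
                }
              )
            }
          )
        }

      // POST /photo -> store the photo and reply 201 Created with its id
      lazy val crudRoute: Route =
        pathPrefix("photo") {
          concat(
            pathEnd {
              concat(
                post {
                  entity(as[Photo]) { photo =>
                    val photoCreated: Future[String] = photoDao.createPhoto(photo)
                    onSuccess(photoCreated) { id =>
                      complete((StatusCodes.Created, id))
                    }
                  }
                }
              )
            }
          )
        }
    }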
To keep the example minimal, we have only two routes:
- GET /health: check if the server is up and running
- POST /photo: send to the system the JSON of the photo information we want to store. This endpoint uses the DAO to store the document in MongoDB and, if the operation succeeds, returns a 201 with the id of the stored photo.
This is by no means a complete set of APIs, but it is enough to run our example.😉
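For a quick manual test of POST /photo, a request can be built with the JDK's java.net.http client (Java 11 or later). This sketch assumes the server runs with the defaults from the configuration file (127.0.0.1:8000), and PhotoClient is a hypothetical helper. The Content-Type header matters because Akka HTTP's spray-json support only unmarshals application/json entities.

```scala
import java.net.URI
import java.net.http.HttpRequest

object PhotoClient {
  // Hypothetical helper: builds, but does not send, a POST /photo request.
  // The default base URL assumes the http.ip / http.port defaults of the server config.
  def createPhotoRequest(photoJson: String, baseUrl: String = "http://127.0.0.1:8000"): HttpRequest =
    HttpRequest.newBuilder(URI.create(s"$baseUrl/photo"))
      .header("Content-Type", "application/json") // required by the JSON unmarshaller
      .POST(HttpRequest.BodyPublishers.ofString(photoJson))
      .build()
}
```

Sending it takes one more call, e.g. HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()). On success the route above answers 201 Created with the photo id as the body.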
OK, we implemented all the components of our server, so it's time to wrap everything up. This is our Server.scala object.
    // Akka implicits required to run Akka HTTP
    implicit val system: ActorSystem = ActorSystem("kafka-stream-playground")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
First, a couple of Akka utility values. Since we use Akka HTTP to run our server and REST API, these implicit values are required.
    val config: Config = ConfigFactory.load()
    val address = config.getString("http.ip")
    val port = config.getInt("http.port")
    val mongoUri = config.getString("mongo.uri")
    val mongoDb = config.getString("mongo.db")
    val mongoUser = config.getString("mongo.user")
    val mongoPwd = config.getString("mongo.pwd")
    val photoCollection = config.getString("mongo.photo_collection")
    val kafkaHosts = config.getString("kafka.hosts").split(',').toList
    val photoTopic = config.getString("kafka.photo_topic")
    val longExposureTopic = config.getString("kafka.long_exposure_topic")
Then we read all the configuration properties. We will come back to the configuration file in a moment.
    // Kafka producer (and admin client) settings
    val kafkaProps = new Properties()
    kafkaProps.put("bootstrap.servers", kafkaHosts.mkString(","))
    kafkaProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    kafkaProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    // Kafka Streams settings
    val streamProps = new Properties()
    streamProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "long-exp-proc-app")
    streamProps.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHosts.mkString(","))

    val photoProducer = PhotoProducer(kafkaProps, photoTopic)
    // Use the sink topic name read from the configuration instead of a hard-coded string
    val photoStreamProcessor = new PhotoStreamProcessor(kafkaProps, streamProps, photoTopic, longExposureTopic)
    photoStreamProcessor.start()
We have to configure both our Kafka producer and the stream processor. We also start the stream processor, so the server will be ready to process the documents sent to it.
    val client = MongoClient(s"mongodb://$mongoUri/$mongoUser")
    val db = client.getDatabase(mongoDb)
    val photoDao: PhotoDao = new PhotoDao(db, photoCollection)
    // Start listening to the change stream of the photo collection
    val photoListener = PhotoListener(photoDao.collection, photoProducer)
MongoDB needs to be configured too. We set up the connection and initialize the DAO as well as the listener.
    lazy val routes: Route = healthRoute ~ crudRoute
    Http().bindAndHandle(routes, address, port)
    // Keep the server running until the actor system terminates
    Await.result(system.whenTerminated, Duration.Inf)
Everything has been initialized. We create the REST routes for the communication to the server, bind them to the handlers, and finally start the server!🚀
This is the configuration file used to setup the server:
    http {
      ip = "127.0.0.1"
      ip = ${?SERVER_IP}
      port = 8000
      port = ${?SERVER_PORT}
    }

    mongo {
      uri = "127.0.0.1:27017"
      uri = ${?MONGO_URI}
      db = "kafka-stream-playground"
      user = "admin"
      pwd = "admin"
      photo_collection = "photo"
    }

    kafka {
      hosts = "127.0.0.1:9092"
      hosts = ${?KAFKA_HOSTS}
      photo_topic = "photo"
      long_exposure_topic = "long-exposure"
    }
I think that this one does not require much explanation, right?😉
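The ${?VAR} lines are optional substitutions. In Typesafe Config, if the variable is defined it overrides the default assigned on the line before; otherwise the key keeps its default. The sketch below reproduces that precedence in plain Scala for the environment-variable case, which can help when reasoning about what the containers will actually use. ServerSettings is a hypothetical helper: the project itself reads these values through ConfigFactory.load().

```scala
object ServerSettings {
  final case class Settings(ip: String, port: Int, mongoUri: String, kafkaHosts: List[String])

  // Mirrors the HOCON pattern `key = default` followed by `key = ${?ENV_VAR}`:
  // the environment variable wins when it is set, otherwise the default is kept.
  private def setting(env: Map[String, String], envVar: String, default: String): String =
    env.getOrElse(envVar, default)

  // Pass sys.env in production, or a hand-made Map in tests.
  def fromEnv(env: Map[String, String]): Settings = Settings(
    ip = setting(env, "SERVER_IP", "127.0.0.1"),
    port = setting(env, "SERVER_PORT", "8000").toInt,
    mongoUri = setting(env, "MONGO_URI", "127.0.0.1:27017"),
    // Same comma-separated format that Server.scala splits on
    kafkaHosts = setting(env, "KAFKA_HOSTS", "127.0.0.1:9092").split(',').toList
  )
}
```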
The server we implemented writes to two Kafka topics: photo and long-exposure. But how do the messages end up as documents in Elasticsearch? Using Kafka Connect!
We can set up two connectors, one per topic, and tell each connector to write every message going through its topic to Elasticsearch.
First we need Kafka Connect. We can use the container provided by Confluent in the docker-compose file:
    connect:
      image: confluentinc/cp-kafka-connect
      ports:
        - 8083:8083
      networks:
        - kakfa_stream_playground
      depends_on:
        - zookeeper
        - kafka
      volumes:
        - $PWD/connect-plugins:/connect-plugins
      environment:
        CONNECT_BOOTSTRAP_SERVERS: kafka:9092
        CONNECT_REST_ADVERTISED_HOST_NAME: connect
        CONNECT_REST_PORT: 8083
        CONNECT_GROUP_ID: compose-connect-group
        CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
        CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
        CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
        CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
        CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
        CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
        CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
        CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
        CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
        CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "false"
        CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
        CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
        CONNECT_ZOOKEEPER_CONNECT: zookeeper:2181
        CONNECT_PLUGIN_PATH: /connect-plugins
        CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
I want to focus on some of the configuration values.
First of all, we need to expose port 8083: that will be our endpoint to configure the connectors (CONNECT_REST_PORT).
We also need to map a volume to the /connect-plugins path, where we will place the Elasticsearch sink connector plugin. This is reflected in the CONNECT_PLUGIN_PATH as well.
The connect container should know how to find the Kafka servers, so we set CONNECT_BOOTSTRAP_SERVERS to kafka:9092.
Once Kafka Connect is ready, we can send the configurations of our connectors to the http://localhost:8083/connectors endpoint. We need 2 connectors, one for the photo topic and one for the long-exposure topic. We can send each configuration as JSON with a POST request.
    {
      "name": "photo-connector",
      "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "tasks.max": "1",
        "topics": "photo",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "schema.ignore": "true",
        "connection.url": "http://elastic:9200",
        "type.name": "kafka-connect",
        "behavior.on.malformed.documents": "warn",
        "name": "photo-connector"
      }
    }
We explicitly say we are going to use the ElasticsearchSinkConnector as the connector.class, as well as the topics that we want to sink, in this case photo.
We don't want to use a schema for the value.converter, so we disable it (value.converter.schemas.enable) and tell the connector to ignore the schema (schema.ignore).
The connector for the long-exposure topic is exactly like this one. The only differences are the name and, of course, the topics.
We have all we need to test the CDC! How can we do it? It's quite easy: simply run the setup.sh script in the root folder of the repo!
What will the script do?
- Run the docker-compose file with all the services.
- Configure the MongoDB replica set. This is required to enable the Change Streams interface to capture data changes. More information about this requirement is available in the MongoDB Change Streams documentation.
The docker-compose will run the following services:
There are a lot of containers to run, so make sure you have enough resources to run everything properly. If you want, remove Mongoku and Kibana from the compose file, since they are used just for a quick look inside the databases.
Once everything is up and running, you just have to send data to the server.
In the photos.txt file I collected some JSON documents of photos from Unsplash that you can use to test the system.
There are a total of 10 documents, 5 of which contain info about long exposure photos. Send them to the server by running the send-photos.sh script in the root of the repo. Check that everything is stored in MongoDB by connecting to Mongoku at http://localhost:3100. Then connect to Kibana at http://localhost:5601 and you will find two indexes in Elasticsearch: photo, containing the JSON of all the photos stored in MongoDB, and long-exposure, containing just the info of the long exposure photos.
Amazing, right? 😄
We made it guys!😄
Starting from the design of the use-case, we built our system that connected a MongoDB database to Elasticsearch using CDC.
Kafka and Kafka Streams are the enablers: Change Streams and Kafka turn database events into a stream, and Kafka Streams lets us process it.
Do you need to see the whole project? Just check out the repository on GitHub!😉
That's it, enjoy! 🚀
Translated from:
kafka streams
Repost address: http://sduzd.baihongyu.com/