使用Kafka和MongoDB进行Go异步处理

在我前面的博客文章 “我的第一个 Go 微服务:使用 MongoDB 和 Docker 多阶段构建” 中,我创建了一个 Go 微服务示例,它发布一个 REST 式的 http 端点,并将从 HTTP POST 中接收到的数据保存到 MongoDB 数据库。 2018-08-19 09:15:25 MongoDBGo 微服务 记录一次壮烈牺牲的阿里巴巴面试 今天本是一个阳光明媚,鸟语花香的日子。于是我决定在逛街中感受春日的阳光~结果晚上七点的时候,蚂蚁金服后端大佬来了电话,要进行一轮的技术面试。我一脸黑人问号???现在的面试都流行突袭吗?于是我的第一次面试之旅,就此壮烈的展开。 2018-08-15 10:33:03 阿里巴巴面试JAVA Redis详解以及Redis的应用场景 Redis 是完全开源免费的,遵守BSD协议,是一个高性能的key-value数据库。 Redis 是一个强大的内存型存储,具有丰富的数据结构,使其可以应用于很多方面,包括作为数据库、缓存、消息队列等等。

在我前面的博客文章 “我的第一个 Go 微服务:使用 MongoDB 和 Docker 多阶段构建” 中,我创建了一个 Go 微服务示例,它发布一个 REST 式的 http 端点,并将从 HTTP POST 中接收到的数据保存到 MongoDB 数据库。

[[240575]]

在我前面的博客文章“我的***个 Go 微服务:使用 MongoDB 和 Docker 多阶段构建” 中,我创建了一个 Go 微服务示例,它发布一个 REST 式的 http 端点,并将从 HTTP POST 中接收到的数据保存到 MongoDB 数据库。

在这个示例中,我将数据的保存和 MongoDB 分离,并创建另一个微服务去处理它。我还添加了 Kafka 为消息层服务,这样微服务就可以异步处理它自己关心的东西了。

如果你有时间去看,我将这个博客文章的整个过程录制到这个视频中了:)

下面是这个使用了两个微服务的简单的异步处理示例的上层架构图。

rest-kafka-mongo-microservice-draw-io

rest-kafka-mongo-microservice-draw-io

微服务 1—— 是一个 REST 式微服务,它从一个 /POST http 调用中接收数据。接收到请求之后,它从 http 请求中检索数据,并将它保存到 Kafka。保存之后,它通过 /POST 发送相同的数据去响应调用者。

微服务 2—— 是一个订阅了 Kafka 中的一个主题的微服务,微服务 1 的数据保存在该主题。一旦消息被微服务消费之后,它接着保存数据到 MongoDB 中。

在你继续之前,我们需要能够去运行这些微服务的几件东西:

  1. 下载 Kafka—— 我使用的版本是 kafka_2.11-1.1.0
  2. 安装librdkafka—— 不幸的是,这个库应该在目标系统中
  3. 安装Kafka Go 客户端
  4. 运行 MongoDB。你可以去看我的以前的文章中关于这一块的内容,那篇文章中我使用了一个 MongoDB docker 镜像。

我们开始吧!

首先,启动 Kafka,在你运行 Kafka 服务器之前,你需要运行 Zookeeper。下面是示例:

  1. $ cd /<download path>/kafka_2.11-1.1.0
  2. $ bin/zookeeper-server-start.sh config/zookeeper.properties

接着运行 Kafka —— 我使用 9092 端口连接到 Kafka。如果你需要改变端口,只需要在 config/server.properties 中配置即可。如果你像我一样是个新手,我建议你现在还是使用默认端口。

  1. $ bin/kafka-server-start.sh config/server.properties

Kafka 跑起来之后,我们需要 MongoDB。它很简单,只需要使用这个 docker-compose.yml 即可。

  1. version: '3'
  2. services:
  3. mongodb:
  4. image: mongo
  5. ports:
  6. - "27017:27017"
  7. volumes:
  8. - "mongodata:/data/db"
  9. networks:
  10. - network1
  11. volumes:
  12. mongodata:
  13. networks:
  14. network1:

使用 Docker Compose 去运行 MongoDB docker 容器。

  1. docker-compose up

这里是微服务 1 的相关代码。我只是修改了我前面的示例去保存到 Kafka 而不是 MongoDB:

rest-to-kafka/rest-kafka-sample.go

  1. func jobsPostHandler(w http.ResponseWriter, r *http.Request) {
  2. //Retrieve body from http request
  3. b, err := ioutil.ReadAll(r.Body)
  4. defer r.Body.Close()
  5. if err != nil {
  6. panic(err)
  7. }
  8. //Save data into Job struct
  9. var _job Job
  10. err = json.Unmarshal(b, &_job)
  11. if err != nil {
  12. http.Error(w, err.Error(), 500)
  13. return
  14. }
  15. saveJobToKafka(_job)
  16. //Convert job struct into json
  17. jsonString, err := json.Marshal(_job)
  18. if err != nil {
  19. http.Error(w, err.Error(), 500)
  20. return
  21. }
  22. //Set content-type http header
  23. w.Header().Set("content-type", "application/json")
  24. //Send back data as response
  25. w.Write(jsonString)
  26. }
  27. func saveJobToKafka(job Job) {
  28. fmt.Println("save to kafka")
  29. jsonString, err := json.Marshal(job)
  30. jobString := string(jsonString)
  31. fmt.Print(jobString)
  32. p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
  33. if err != nil {
  34. panic(err)
  35. }
  36. // Produce messages to topic (asynchronously)
  37. topic := "jobs-topic1"
  38. for _, word := range []string{string(jobString)} {
  39. p.Produce(&kafka.Message{
  40. TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
  41. Value: []byte(word),
  42. }, nil)
  43. }
  44. }

这里是微服务 2 的代码。在这个代码中最重要的东西是从 Kafka 中消费数据,保存部分我已经在前面的博客文章中讨论过了。这里代码的重点部分是从 Kafka 中消费数据:

kafka-to-mongo/kafka-mongo-sample.go

  1. func main() {
  2. //Create MongoDB session
  3. session := initialiseMongo()
  4. mongoStore.session = session
  5. receiveFromKafka()
  6. }
  7. func receiveFromKafka() {
  8. fmt.Println("Start receiving from Kafka")
  9. c, err := kafka.NewConsumer(&kafka.ConfigMap{
  10. "bootstrap.servers": "localhost:9092",
  11. "group.id": "group-id-1",
  12. "auto.offset.reset": "earliest",
  13. })
  14. if err != nil {
  15. panic(err)
  16. }
  17. c.SubscribeTopics([]string{"jobs-topic1"}, nil)
  18. for {
  19. msg, err := c.ReadMessage(-1)
  20. if err == nil {
  21. fmt.Printf("Received from Kafka %s: %s\n", msg.TopicPartition, string(msg.Value))
  22. job := string(msg.Value)
  23. saveJobToMongo(job)
  24. } else {
  25. fmt.Printf("Consumer error: %v (%v)\n", err, msg)
  26. break
  27. }
  28. }
  29. c.Close()
  30. }
  31. func saveJobToMongo(jobString string) {
  32. fmt.Println("Save to MongoDB")
  33. col := mongoStore.session.DB(database).C(collection)
  34. //Save data into Job struct
  35. var _job Job
  36. b := []byte(jobString)
  37. err := json.Unmarshal(b, &_job)
  38. if err != nil {
  39. panic(err)
  40. }
  41. //Insert job into MongoDB
  42. errMongo := col.Insert(_job)
  43. if errMongo != nil {
  44. panic(errMongo)
  45. }
  46. fmt.Printf("Saved to MongoDB : %s", jobString)
  47. }

我们来演示一下,运行微服务 1。确保 Kafka 已经运行了。

  1. $ go run rest-kafka-sample.go

我使用 Postman 向微服务 1 发送数据。

Screenshot-2018-04-29-22.20.33

Screenshot-2018-04-29-22.20.33

这里是日志,你可以在微服务 1 中看到。当你看到这些的时候,说明已经接收到了来自 Postman 发送的数据,并且已经保存到了 Kafka。

Screenshot-2018-04-29-22.22.00

Screenshot-2018-04-29-22.22.00

因为我们尚未运行微服务 2,数据被微服务 1 只保存在了 Kafka。我们来消费它并通过运行的微服务 2 来将它保存到 MongoDB。

  1. $ go run kafka-mongo-sample.go

现在,你将在微服务 2 上看到消费的数据,并将它保存到了 MongoDB。

Screenshot-2018-04-29-22.24.15

Screenshot-2018-04-29-22.24.15

检查一下数据是否保存到了 MongoDB。如果有数据,我们成功了!

Screenshot-2018-04-29-22.26.39

Screenshot-2018-04-29-22.26.39

完整的源代码可以在这里找到:

https://github.com/donvito/learngo/tree/master/rest-kafka-mongo-microservice

©本文为清一色官方代发,观点仅代表作者本人,与清一色无关。清一色对文中陈述、观点判断保持中立,不对所包含内容的准确性、可靠性或完整性提供任何明示或暗示的保证。本文不作为投资理财建议,请读者仅作参考,并请自行承担全部责任。文中部分文字/图片/视频/音频等来源于网络,如侵犯到著作权人的权利,请与我们联系(微信/QQ:1074760229)。转载请注明出处:清一色财经

(0)
打赏 微信扫码打赏 微信扫码打赏 支付宝扫码打赏 支付宝扫码打赏
清一色的头像清一色管理团队
上一篇 2023年5月6日 10:50
下一篇 2023年5月6日 10:50

相关推荐

发表评论

登录后才能评论

联系我们

在线咨询:1643011589-QQbutton

手机:13798586780

QQ/微信:1074760229

QQ群:551893940

工作时间:工作日9:00-18:00,节假日休息

关注微信