Springboot整合Kafka Stream实时统计数据

Kafka Streams是一个客户端类库,用于处理和分析存储在Kafka中的数据。它建立在流式处理的一些重要的概念之上:如何区分事件时间和处理时间、Windowing的支持、简单高效的管理和实时查询应用程序状态。 2021-08-17 06:48:43 SpringbootKafkaStream 我国政务云发展现状及趋势分析 预计2023年将突破千亿元。考虑到云计算与政府职能转变的目标相契合,政府正在积极发挥云服务在政务信息化建设中的价值。 2021-08-17 06:42:43 云计算政务云云应用 IDC:2025年公有云LaaS和PaaS工作负载收入将突破4000亿美元 近日,IDC公布了对全球公有云基础设施即服务(IaaS)和平台即服务(PaaS)市场的最新预测,其中包括了IDC定义的18个企业工作负载细分领域。

Kafka Streams是一个客户端类库,用于处理和分析存储在Kafka中的数据。它建立在流式处理的一些重要的概念之上:如何区分事件时间和处理时间、Windowing的支持、简单高效的管理和实时查询应用程序状态。

[[417927]]

环境:springboot2.3.12.RELEASE + kafka_2.13-2.7.0 + zookeeper-3.6.2

Kafka Stream介绍

Kafka在0.10版本推出了Stream API,提供了对存储在Kafka内的数据进行流式处理和分析的能力。

流式计算一般被用来和批量计算做比较。批量计算往往有一个固定的数据集作为输入并计算结果。而流式计算的输入往往是“无界”的(Unbounded Data),持续输入的,即永远拿不到全量数据去做计算;同时,计算结果也是持续输出的,只能拿到某一个时刻的结果,而不是最终的结果。

Kafka Streams是一个客户端类库,用于处理和分析存储在Kafka中的数据。它建立在流式处理的一些重要的概念之上:如何区分事件时间和处理时间、Windowing的支持、简单高效的管理和实时查询应用程序状态。

Kafka Streams的门槛非常低:和编写一个普通的Kafka消息处理程序没有太大的差异,可以通过多进程部署来完成扩容、负载均衡、高可用(Kafka Consumer的并行模型)。

Kafka Streams的一些特点:

  • 被设计成一个简单的、轻量级的客户端类库,能够被集成到任何Java应用中
  • 除了Kafka之外没有任何额外的依赖,利用Kafka的分区模型支持水平扩容和保证顺序性
  • 通过可容错的状态存储实现高效的状态操作(windowed joins and aggregations)
  • 支持exactly-once语义
  • 支持纪录级的处理,实现毫秒级的延迟
  • 提供High-Level的Stream DSL和Low-Level的Processor API

Stream Processing Topology流处理拓扑

  • 流是Kafka Streams提供的最重要的抽象:它表示一个无限的、不断更新的数据集。流是不可变数据记录的有序、可重放和容错序列,其中数据记录定义为键值对。
  • Stream Processing Application是使用了Kafka Streams库的应用程序。它通过processor topologies定义计算逻辑,其中每个processor topology都是多个stream processor(节点)通过stream组成的图。
  • A stream processor 是处理器拓扑中的节点;它表示一个处理步骤,通过每次从拓扑中的上游处理器接收一个输入记录,将其操作应用于该记录,来转换流中的数据,并且随后可以向其下游处理器生成一个或多个输出记录。

有两种特殊的processor:

Source Processor 源处理器是一种特殊类型的流处理器,它没有任何上游处理器。它通过使用来自一个或多个kafka topic的记录并将其转发到其下游处理器,从而从一个或多个kafka topic生成其拓扑的输入流。

Sink Processor 接收器处理器是一种特殊类型的流处理器,没有下游处理器。它将从其上游处理器接收到的任何记录发送到指定的kafka topic。

Springboot整合Kafka Stream实时统计数据

相关的核心概念查看如下链接

Springboot整合Kafka Stream实时统计数据

下面演示Kafka Stream 在Springboot中的应用

依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-web</artifactId>
  4. </dependency>
  5. <dependency>
  6. <groupId>org.springframework.kafka</groupId>
  7. <artifactId>spring-kafka</artifactId>
  8. </dependency>
  9. <dependency>
  10. <groupId>org.apache.kafka</groupId>
  11. <artifactId>kafka-streams</artifactId>
  12. </dependency>

配置

  1. server:
  2. port:9090
  3. spring:
  4. application:
  5. name:kafka-demo
  6. kafka:
  7. streams:
  8. application-id:${spring.application.name}
  9. properties:
  10. spring.json.trusted.packages:'*'
  11. bootstrap-servers:
  12. -localhost:9092
  13. -localhost:9093
  14. -localhost:9094
  15. producer:
  16. acks:1
  17. retries:10
  18. key-serializer:org.apache.kafka.common.serialization.StringSerializer
  19. value-serializer:org.springframework.kafka.support.serializer.JsonSerializer#org.apache.kafka.common.serialization.StringSerializer
  20. properties:
  21. spring.json.trusted.packages:'*'
  22. consumer:
  23. key-deserializer:org.apache.kafka.common.serialization.StringDeserializer
  24. value-deserializer:org.springframework.kafka.support.serializer.JsonDeserializer#org.apache.kafka.common.serialization.StringDeserializer
  25. enable-auto-commit:false
  26. group-id:ConsumerTest
  27. auto-offset-reset:latest
  28. properties:
  29. session.timeout.ms:12000
  30. heartbeat.interval.ms:3000
  31. max.poll.records:100
  32. spring.json.trusted.packages:'*'
  33. listener:
  34. ack-mode:manual-immediate
  35. type:batch
  36. concurrency:8
  37. properties:
  38. max.poll.interval.ms:300000

消息发送

  1. @Service
  2. publicclassMessageSend{
  3. @Resource
  4. privateKafkaTemplate<String,Message>kafkaTemplate;
  5. publicvoidsendMessage2(Messagemessage){
  6. kafkaTemplate.send(newProducerRecord<String,Message>("test",message)).addCallback(result->{
  7. System.out.println("执行成功..."+Thread.currentThread().getName());
  8. },ex->{
  9. System.out.println("执行失败");
  10. ex.printStackTrace();
  11. });
  12. }
  13. }

消息监听

  1. @KafkaListener(topics={"test"})
  2. publicvoidlistener2(List<ConsumerRecord<String,Message>>records,Acknowledgmentack){
  3. for(ConsumerRecord<String,Message>record:records){
  4. System.out.println(this.getClass().hashCode()+",Thread"+Thread.currentThread().getName()+",key:"+record.key()+",接收到消息:"+record.value()+",patition:"+record.partition()+",offset:"+record.offset());
  5. }
  6. try{
  7. TimeUnit.SECONDS.sleep(0);
  8. }catch(InterruptedExceptione){
  9. e.printStackTrace();
  10. }
  11. ack.acknowledge();
  12. }
  13. @KafkaListener(topics={"demo"})
  14. publicvoidlistenerDemo(List<ConsumerRecord<String,Message>>records,Acknowledgmentack){
  15. for(ConsumerRecord<String,Message>record:records){
  16. System.out.println("DemoTopic:"+this.getClass().hashCode()+",Thread"+Thread.currentThread().getName()+",key:"+record.key()+",接收到消息:"+record.value()+",patition:"+record.partition()+",offset:"+record.offset());
  17. }
  18. ack.acknowledge();
  19. }

Kafka Stream处理

消息转换并转发其它Topic

  1. @Bean
  2. publicKStream<Object,Object>kStream(StreamsBuilderstreamsBuilder){
  3. KStream<Object,Object>stream=streamsBuilder.stream("test");
  4. stream.map((key,value)->{
  5. System.out.println("原始消息内容:"+newString((byte[])value,Charset.forName("UTF-8")));
  6. returnnewKeyValue<>(key,"{\"title\":\"123123\",\"message\":\"重新定义内容\"}".getBytes(Charset.forName("UTF-8")));
  7. }).to("demo");
  8. returnstream;
  9. }

执行结果:

Springboot整合Kafka Stream实时统计数据

Stream对象处理

  1. @Bean
  2. publicKStream<String,Message>kStream4(StreamsBuilderstreamsBuilder){
  3. JsonSerde<Message>jsonSerde=newJsonSerde<>();
  4. JsonDeserializer<Message>descri=(JsonDeserializer<Message>)jsonSerde.deserializer();
  5. descri.addTrustedPackages("*");
  6. KStream<String,Message>stream=streamsBuilder.stream("test",Consumed.with(Serdes.String(),jsonSerde));
  7. stream.map((key,value)->{
  8. value.setTitle("XXXXXXX");
  9. returnnewKeyValue<>(key,value);
  10. }).to("demo",Produced.with(Serdes.String(),jsonSerde));
  11. returnstream;
  12. }

执行结果:

Springboot整合Kafka Stream实时统计数据

分组处理

  1. @Bean
  2. publicKStream<String,Message>kStream5(StreamsBuilderstreamsBuilder){
  3. JsonSerde<Message>jsonSerde=newJsonSerde<>();
  4. JsonDeserializer<Message>descri=(JsonDeserializer<Message>)jsonSerde.deserializer();
  5. descri.addTrustedPackages("*");
  6. KStream<String,Message>stream=streamsBuilder.stream("test",Consumed.with(Serdes.String(),jsonSerde));
  7. stream.selectKey(newKeyValueMapper<String,Message,String>(){
  8. @Override
  9. publicStringapply(Stringkey,Messagevalue){
  10. returnvalue.getOrgCode();
  11. }
  12. })
  13. .groupByKey(Grouped.with(Serdes.String(),jsonSerde))
  14. .count()
  15. .toStream().print(Printed.toSysOut());
  16. returnstream;
  17. }

执行结果:

Springboot整合Kafka Stream实时统计数据

聚合

  1. @Bean
  2. publicKStream<String,Message>kStream6(StreamsBuilderstreamsBuilder){
  3. JsonSerde<Message>jsonSerde=newJsonSerde<>();
  4. JsonDeserializer<Message>descri=(JsonDeserializer<Message>)jsonSerde.deserializer();
  5. descri.addTrustedPackages("*");
  6. KStream<String,Message>stream=streamsBuilder.stream("test",Consumed.with(Serdes.String(),jsonSerde));
  7. stream.selectKey(newKeyValueMapper<String,Message,String>(){
  8. @Override
  9. publicStringapply(Stringkey,Messagevalue){
  10. returnvalue.getOrgCode();
  11. }
  12. })
  13. .groupByKey(Grouped.with(Serdes.String(),jsonSerde))
  14. .aggregate(()->0L,(key,value,aggValue)->{
  15. System.out.println("key="+key+",value="+value+",agg="+aggValue);
  16. returnaggValue+1;
  17. },Materialized.<String,Long,KeyValueStore<Bytes,byte[]>>as("kvs").withValueSerde(Serdes.Long()))
  18. .toStream().print(Printed.toSysOut());
  19. returnstream;
  20. }

执行结果:

Springboot整合Kafka Stream实时统计数据

Filter过滤数据

  1. @Bean
  2. publicKStream<String,Message>kStream7(StreamsBuilderstreamsBuilder){
  3. JsonSerde<Message>jsonSerde=newJsonSerde<>();
  4. JsonDeserializer<Message>descri=(JsonDeserializer<Message>)jsonSerde.deserializer();
  5. descri.addTrustedPackages("*");
  6. KStream<String,Message>stream=streamsBuilder.stream("test",Consumed.with(Serdes.String(),jsonSerde));
  7. stream.selectKey(newKeyValueMapper<String,Message,String>(){
  8. @Override
  9. publicStringapply(Stringkey,Messagevalue){
  10. returnvalue.getOrgCode();
  11. }
  12. })
  13. .groupByKey(Grouped.with(Serdes.String(),jsonSerde))
  14. .aggregate(()->0L,(key,value,aggValue)->{
  15. System.out.println("key="+key+",value="+value+",agg="+aggValue);
  16. returnaggValue+1;
  17. },Materialized.<String,Long,KeyValueStore<Bytes,byte[]>>as("kvs").withValueSerde(Serdes.Long()))
  18. .toStream()
  19. .filter((key,value)->!"2".equals(key))
  20. .print(Printed.toSysOut());
  21. returnstream;
  22. }

执行结果:

Springboot整合Kafka Stream实时统计数据

过滤Key不等于"2"

分支多流处理

  1. @Bean
  2. publicKStream<String,Message>kStream8(StreamsBuilderstreamsBuilder){
  3. JsonSerde<Message>jsonSerde=newJsonSerde<>();
  4. JsonDeserializer<Message>descri=(JsonDeserializer<Message>)jsonSerde.deserializer();
  5. descri.addTrustedPackages("*");
  6. KStream<String,Message>stream=streamsBuilder.stream("test",Consumed.with(Serdes.String(),jsonSerde));
  7. //分支,多流处理
  8. KStream<String,Message>[]arrStream=stream.branch(
  9. (key,value)->"男".equals(value.getSex()),
  10. (key,value)->"女".equals(value.getSex()));
  11. Stream.of(arrStream).forEach(as->{
  12. as.foreach((key,message)->{
  13. System.out.println(Thread.currentThread().getName()+",key="+key+",message="+message);
  14. });
  15. });
  16. returnstream;
  17. }

执行结果:

Springboot整合Kafka Stream实时统计数据

多字段分组

不能使用多个selectKey,后面的会覆盖前面的

  1. @Bean
  2. publicKStream<String,Message>kStreamM2(StreamsBuilderstreamsBuilder){
  3. JsonSerde<Message>jsonSerde=newJsonSerde<>();
  4. JsonDeserializer<Message>descri=(JsonDeserializer<Message>)jsonSerde.deserializer();
  5. descri.addTrustedPackages("*");
  6. KStream<String,Message>stream=streamsBuilder.stream("test",Consumed.with(Serdes.String(),jsonSerde));
  7. stream
  8. .selectKey(newKeyValueMapper<String,Message,String>(){
  9. @Override
  10. publicStringapply(Stringkey,Messagevalue){
  11. System.out.println(Thread.currentThread().getName());
  12. returnvalue.getTime()+"|"+value.getOrgCode();
  13. }
  14. })
  15. .groupByKey(Grouped.with(Serdes.String(),jsonSerde))
  16. .count()
  17. .toStream().print(Printed.toSysOut());
  18. returnstream;
  19. }

执行结果:

Springboot整合Kafka Stream实时统计数据

©本文为清一色官方代发,观点仅代表作者本人,与清一色无关。清一色对文中陈述、观点判断保持中立,不对所包含内容的准确性、可靠性或完整性提供任何明示或暗示的保证。本文不作为投资理财建议,请读者仅作参考,并请自行承担全部责任。文中部分文字/图片/视频/音频等来源于网络,如侵犯到著作权人的权利,请与我们联系(微信/QQ:1074760229)。转载请注明出处:清一色财经

(0)
打赏 微信扫码打赏 微信扫码打赏 支付宝扫码打赏 支付宝扫码打赏
清一色的头像清一色管理团队
上一篇 2023年5月4日 03:48
下一篇 2023年5月4日 03:48

相关推荐

发表评论

登录后才能评论

联系我们

在线咨询:1643011589-QQbutton

手机:13798586780

QQ/微信:1074760229

QQ群:551893940

工作时间:工作日9:00-18:00,节假日休息

关注微信