Kafka Streams是一个客户端类库,用于处理和分析存储在Kafka中的数据。它建立在流式处理的一些重要的概念之上:如何区分事件时间和处理时间、Windowing的支持、简单高效的管理和实时查询应用程序状态。
环境:springboot2.3.12.RELEASE + kafka_2.13-2.7.0 + zookeeper-3.6.2
Kafka Stream介绍
Kafka在0.10版本推出了Stream API,提供了对存储在Kafka内的数据进行流式处理和分析的能力。
流式计算一般被用来和批量计算做比较。批量计算往往有一个固定的数据集作为输入并计算结果。而流式计算的输入往往是“无界”的(Unbounded Data),持续输入的,即永远拿不到全量数据去做计算;同时,计算结果也是持续输出的,只能拿到某一个时刻的结果,而不是最终的结果。
Kafka Streams是一个客户端类库,用于处理和分析存储在Kafka中的数据。它建立在流式处理的一些重要的概念之上:如何区分事件时间和处理时间、Windowing的支持、简单高效的管理和实时查询应用程序状态。
Kafka Streams的门槛非常低:和编写一个普通的Kafka消息处理程序没有太大的差异,可以通过多进程部署来完成扩容、负载均衡、高可用(Kafka Consumer的并行模型)。
Kafka Streams的一些特点:
- 被设计成一个简单的、轻量级的客户端类库,能够被集成到任何Java应用中
- 除了Kafka之外没有任何额外的依赖,利用Kafka的分区模型支持水平扩容和保证顺序性
- 通过可容错的状态存储实现高效的状态操作(windowed joins and aggregations)
- 支持exactly-once语义
- 支持纪录级的处理,实现毫秒级的延迟
- 提供High-Level的Stream DSL和Low-Level的Processor API
Stream Processing Topology流处理拓扑
- 流是Kafka Streams提供的最重要的抽象:它表示一个无限的、不断更新的数据集。流是不可变数据记录的有序、可重放和容错序列,其中数据记录定义为键值对。
- Stream Processing Application是使用了Kafka Streams库的应用程序。它通过processor topologies定义计算逻辑,其中每个processor topology都是多个stream processor(节点)通过stream组成的图。
- A stream processor 是处理器拓扑中的节点;它表示一个处理步骤,通过每次从拓扑中的上游处理器接收一个输入记录,将其操作应用于该记录,来转换流中的数据,并且随后可以向其下游处理器生成一个或多个输出记录。
有两种特殊的processor:
Source Processor 源处理器是一种特殊类型的流处理器,它没有任何上游处理器。它通过使用来自一个或多个kafka topic的记录并将其转发到其下游处理器,从而从一个或多个kafka topic生成其拓扑的输入流。
Sink Processor 接收器处理器是一种特殊类型的流处理器,没有下游处理器。它将从其上游处理器接收到的任何记录发送到指定的kafka topic。
相关的核心概念查看如下链接
下面演示Kafka Stream 在Springboot中的应用
依赖
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
- <dependency>
- <groupId>org.springframework.kafka</groupId>
- <artifactId>spring-kafka</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka-streams</artifactId>
- </dependency>
配置
- server:
- port:9090
- spring:
- application:
- name:kafka-demo
- kafka:
- streams:
- application-id:${spring.application.name}
- properties:
- spring.json.trusted.packages:'*'
- bootstrap-servers:
- -localhost:9092
- -localhost:9093
- -localhost:9094
- producer:
- acks:1
- retries:10
- key-serializer:org.apache.kafka.common.serialization.StringSerializer
- value-serializer:org.springframework.kafka.support.serializer.JsonSerializer#org.apache.kafka.common.serialization.StringSerializer
- properties:
- spring.json.trusted.packages:'*'
- consumer:
- key-deserializer:org.apache.kafka.common.serialization.StringDeserializer
- value-deserializer:org.springframework.kafka.support.serializer.JsonDeserializer#org.apache.kafka.common.serialization.StringDeserializer
- enable-auto-commit:false
- group-id:ConsumerTest
- auto-offset-reset:latest
- properties:
- session.timeout.ms:12000
- heartbeat.interval.ms:3000
- max.poll.records:100
- spring.json.trusted.packages:'*'
- listener:
- ack-mode:manual-immediate
- type:batch
- concurrency:8
- properties:
- max.poll.interval.ms:300000
消息发送
- @Service
- publicclassMessageSend{
- @Resource
- privateKafkaTemplate<String,Message>kafkaTemplate;
- publicvoidsendMessage2(Messagemessage){
- kafkaTemplate.send(newProducerRecord<String,Message>("test",message)).addCallback(result->{
- System.out.println("执行成功..."+Thread.currentThread().getName());
- },ex->{
- System.out.println("执行失败");
- ex.printStackTrace();
- });
- }
- }
消息监听
- @KafkaListener(topics={"test"})
- publicvoidlistener2(List<ConsumerRecord<String,Message>>records,Acknowledgmentack){
- for(ConsumerRecord<String,Message>record:records){
- System.out.println(this.getClass().hashCode()+",Thread"+Thread.currentThread().getName()+",key:"+record.key()+",接收到消息:"+record.value()+",patition:"+record.partition()+",offset:"+record.offset());
- }
- try{
- TimeUnit.SECONDS.sleep(0);
- }catch(InterruptedExceptione){
- e.printStackTrace();
- }
- ack.acknowledge();
- }
- @KafkaListener(topics={"demo"})
- publicvoidlistenerDemo(List<ConsumerRecord<String,Message>>records,Acknowledgmentack){
- for(ConsumerRecord<String,Message>record:records){
- System.out.println("DemoTopic:"+this.getClass().hashCode()+",Thread"+Thread.currentThread().getName()+",key:"+record.key()+",接收到消息:"+record.value()+",patition:"+record.partition()+",offset:"+record.offset());
- }
- ack.acknowledge();
- }
Kafka Stream处理
消息转换并转发其它Topic
- @Bean
- publicKStream<Object,Object>kStream(StreamsBuilderstreamsBuilder){
- KStream<Object,Object>stream=streamsBuilder.stream("test");
- stream.map((key,value)->{
- System.out.println("原始消息内容:"+newString((byte[])value,Charset.forName("UTF-8")));
- returnnewKeyValue<>(key,"{\"title\":\"123123\",\"message\":\"重新定义内容\"}".getBytes(Charset.forName("UTF-8")));
- }).to("demo");
- returnstream;
- }
执行结果:
Stream对象处理
- @Bean
- publicKStream<String,Message>kStream4(StreamsBuilderstreamsBuilder){
- JsonSerde<Message>jsonSerde=newJsonSerde<>();
- JsonDeserializer<Message>descri=(JsonDeserializer<Message>)jsonSerde.deserializer();
- descri.addTrustedPackages("*");
- KStream<String,Message>stream=streamsBuilder.stream("test",Consumed.with(Serdes.String(),jsonSerde));
- stream.map((key,value)->{
- value.setTitle("XXXXXXX");
- returnnewKeyValue<>(key,value);
- }).to("demo",Produced.with(Serdes.String(),jsonSerde));
- returnstream;
- }
执行结果:
分组处理
- @Bean
- publicKStream<String,Message>kStream5(StreamsBuilderstreamsBuilder){
- JsonSerde<Message>jsonSerde=newJsonSerde<>();
- JsonDeserializer<Message>descri=(JsonDeserializer<Message>)jsonSerde.deserializer();
- descri.addTrustedPackages("*");
- KStream<String,Message>stream=streamsBuilder.stream("test",Consumed.with(Serdes.String(),jsonSerde));
- stream.selectKey(newKeyValueMapper<String,Message,String>(){
- @Override
- publicStringapply(Stringkey,Messagevalue){
- returnvalue.getOrgCode();
- }
- })
- .groupByKey(Grouped.with(Serdes.String(),jsonSerde))
- .count()
- .toStream().print(Printed.toSysOut());
- returnstream;
- }
执行结果:
聚合
- @Bean
- publicKStream<String,Message>kStream6(StreamsBuilderstreamsBuilder){
- JsonSerde<Message>jsonSerde=newJsonSerde<>();
- JsonDeserializer<Message>descri=(JsonDeserializer<Message>)jsonSerde.deserializer();
- descri.addTrustedPackages("*");
- KStream<String,Message>stream=streamsBuilder.stream("test",Consumed.with(Serdes.String(),jsonSerde));
- stream.selectKey(newKeyValueMapper<String,Message,String>(){
- @Override
- publicStringapply(Stringkey,Messagevalue){
- returnvalue.getOrgCode();
- }
- })
- .groupByKey(Grouped.with(Serdes.String(),jsonSerde))
- .aggregate(()->0L,(key,value,aggValue)->{
- System.out.println("key="+key+",value="+value+",agg="+aggValue);
- returnaggValue+1;
- },Materialized.<String,Long,KeyValueStore<Bytes,byte[]>>as("kvs").withValueSerde(Serdes.Long()))
- .toStream().print(Printed.toSysOut());
- returnstream;
- }
执行结果:
Filter过滤数据
- @Bean
- publicKStream<String,Message>kStream7(StreamsBuilderstreamsBuilder){
- JsonSerde<Message>jsonSerde=newJsonSerde<>();
- JsonDeserializer<Message>descri=(JsonDeserializer<Message>)jsonSerde.deserializer();
- descri.addTrustedPackages("*");
- KStream<String,Message>stream=streamsBuilder.stream("test",Consumed.with(Serdes.String(),jsonSerde));
- stream.selectKey(newKeyValueMapper<String,Message,String>(){
- @Override
- publicStringapply(Stringkey,Messagevalue){
- returnvalue.getOrgCode();
- }
- })
- .groupByKey(Grouped.with(Serdes.String(),jsonSerde))
- .aggregate(()->0L,(key,value,aggValue)->{
- System.out.println("key="+key+",value="+value+",agg="+aggValue);
- returnaggValue+1;
- },Materialized.<String,Long,KeyValueStore<Bytes,byte[]>>as("kvs").withValueSerde(Serdes.Long()))
- .toStream()
- .filter((key,value)->!"2".equals(key))
- .print(Printed.toSysOut());
- returnstream;
- }
执行结果:
过滤Key不等于"2"
分支多流处理
- @Bean
- publicKStream<String,Message>kStream8(StreamsBuilderstreamsBuilder){
- JsonSerde<Message>jsonSerde=newJsonSerde<>();
- JsonDeserializer<Message>descri=(JsonDeserializer<Message>)jsonSerde.deserializer();
- descri.addTrustedPackages("*");
- KStream<String,Message>stream=streamsBuilder.stream("test",Consumed.with(Serdes.String(),jsonSerde));
- //分支,多流处理
- KStream<String,Message>[]arrStream=stream.branch(
- (key,value)->"男".equals(value.getSex()),
- (key,value)->"女".equals(value.getSex()));
- Stream.of(arrStream).forEach(as->{
- as.foreach((key,message)->{
- System.out.println(Thread.currentThread().getName()+",key="+key+",message="+message);
- });
- });
- returnstream;
- }
执行结果:
多字段分组
不能使用多个selectKey,后面的会覆盖前面的
- @Bean
- publicKStream<String,Message>kStreamM2(StreamsBuilderstreamsBuilder){
- JsonSerde<Message>jsonSerde=newJsonSerde<>();
- JsonDeserializer<Message>descri=(JsonDeserializer<Message>)jsonSerde.deserializer();
- descri.addTrustedPackages("*");
- KStream<String,Message>stream=streamsBuilder.stream("test",Consumed.with(Serdes.String(),jsonSerde));
- stream
- .selectKey(newKeyValueMapper<String,Message,String>(){
- @Override
- publicStringapply(Stringkey,Messagevalue){
- System.out.println(Thread.currentThread().getName());
- returnvalue.getTime()+"|"+value.getOrgCode();
- }
- })
- .groupByKey(Grouped.with(Serdes.String(),jsonSerde))
- .count()
- .toStream().print(Printed.toSysOut());
- returnstream;
- }
执行结果:
©本文为清一色官方代发,观点仅代表作者本人,与清一色无关。清一色对文中陈述、观点判断保持中立,不对所包含内容的准确性、可靠性或完整性提供任何明示或暗示的保证。本文不作为投资理财建议,请读者仅作参考,并请自行承担全部责任。文中部分文字/图片/视频/音频等来源于网络,如侵犯到著作权人的权利,请与我们联系(微信/QQ:1074760229)。转载请注明出处:清一色财经