实时流式计算
实时流式计算 – Kafka Stream
2.1 概述
Kafka Stream是Apache Kafka从0.10版本引入的一个新Feature。它是提供了对存储于Kafka内的数据进行流式处理和分析的功能。
Kafka Stream的特点如下:
- Kafka Stream提供了一个非常简单而轻量的Library,它可以非常方便地嵌入任意Java应用中,也可以任意方式打包和部署
- 除了Kafka外,无任何外部依赖
- 充分利用Kafka分区机制实现水平扩展和顺序性保证
- 通过可容错的state store实现高效的状态操作(如windowed join和aggregation)
- 支持正好一次处理语义
- 提供记录级的处理能力,从而实现毫秒级的低延迟
- 支持基于事件时间的窗口操作,并且可处理晚到的数据(late arrival of records)
- 同时提供底层的处理原语Processor(类似于Storm的spout和bolt),以及高层抽象的DSL(类似于Spark的map/group/reduce)
2.2 Kafka Streams的关键概念
(1)Stream处理拓扑
- 流是Kafka Stream提出的最重要的抽象概念:它表示一个无限的,不断更新的数据集。流是一个有序的,可重放(反复的使用),不可变的容错序列,数据记录的格式是键值对(key-value)。
- 通过Kafka Streams编写一个或多个的计算逻辑的处理器拓扑。其中处理器拓扑是一个由流(边缘)连接的流处理(节点)的图。
-
流处理器是
处理器拓扑
中的一个节点;它表示一个处理的步骤,用来转换流中的数据(从拓扑中的上游处理器一次接受一个输入消息,并且随后产生一个或多个输出消息到其下游处理器中)。
(2)在拓扑中有两个特别的处理器:
- 源处理器(Source Processor):源处理器是一个没有任何上游处理器的特殊类型的流处理器。它从一个或多个kafka主题生成输入流。通过消费这些主题的消息并将它们转发到下游处理器。
- Sink处理器:sink处理器是一个没有下游流处理器的特殊类型的流处理器。它接收上游流处理器的消息发送到一个指定的Kafka主题。
2.3 KStream&KTable
(1)数据结构类似于map,如下图,key-value键值对
(2)KStream
KStream数据流(data stream),即是一段顺序的,可以无限长,不断更新的数据集。
数据流中比较常记录的是事件,这些事件可以是一次鼠标点击(click),一次交易,或是传感器记录的位置数据。
KStream负责抽象的,就是数据流。与Kafka自身topic中的数据一样,类似日志,每一次操作都是向其中插入(insert)新数据。
为了说明这一点,让我们想象一下以下两个数据记录正在发送到流中:
(“ alice”,1)->(“” alice“,3)
如果您的流处理应用是要总结每个用户的价值,它将返回4
了alice
。为什么?因为第二条数据记录将不被视为先前记录的更新。(insert)新数据
(3)KTable
KTable传统数据库,包含了各种存储了大量状态(state)的表格。KTable负责抽象的,就是表状数据。每一次操作,都是更新插入(update)
为了说明这一点,让我们想象一下以下两个数据记录正在发送到流中:
(“ alice”,1)->(“” alice“,3)
如果您的流处理应用是要总结每个用户的价值,它将返回3
了alice
。为什么?因为第二条数据记录将被视为先前记录的更新。
KStream – 每个新数据都包含了部分信息。
KTable – 每次更新都合并到原记录上。
2.4 Kafka Stream入门案例编写
(1)引入依赖
在之前的kafka-demo工程的pom文件中引入
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>${kafka.client.version}</version>
<exclusions>
<exclusion>
<artifactId>connect-json</artifactId>
<groupId>org.apache.kafka</groupId>
</exclusion>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
(2)创建类
package com.itheima.kafka.simple;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;
import org.springframework.util.StringUtils;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;
/**
* 统计消息中hello出现的次数
*/
public class KafkaStreamSample {
private static final String INPUT_TOPIC="article_behavior_input";
private static final String OUT_TOPIC="article_behavior_out";
public static void main(String[] args) {
Properties props = new Properties();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.200.130:9092");
props.put(StreamsConfig.APPLICATION_ID_CONFIG,"article_behavior_count");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
final StreamsBuilder builder = new StreamsBuilder();
// 构建过滤聚合条件
group(builder);
final KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
// 如果定义流计算过程
static void group(final StreamsBuilder builder){
/**
* 第一个参数:消费的消息名称
* 第二个参数:LATEST 最近的 EARLIEST 更早的
*/
KStream<String,String> stream = builder.stream(INPUT_TOPIC, Consumed.with(Topology.AutoOffsetReset.LATEST));
//聚合的时间窗口,10秒中聚合一次
KStream<String, String> map = stream.groupByKey()
.windowedBy(TimeWindows.of(10000))
.aggregate(new Initializer<String>() {
//初始聚合
@Override
public String apply() {
return "啥都没有";
}
}, new Aggregator<String, String, String>() {
/**
* 多次聚合
* @param aggKey 消息的key值,发送消息的key
* @param value 消息的value值 发送消息的value值
* @param aggValue 上面apply的方法的返回值
* * @return
*/
@Override
public String apply(String aggKey, String value, String aggValue) {
if(StringUtils.isEmpty(value)){
return aggValue;
}
int count = 0;
List<String> strings = Arrays.asList(value.toString().toLowerCase()split(","));
for (String string : strings) {
if(string.equals("hello")){
count++;
}
}
return String.format("hello:%d", count);
}
}, Materialized.as("count-article-num-miukoo-1"))
/**
* key : 发消息的key值
* value : 上面聚合以后的返回值
*/
.toStream().map((key, value) -> {
return new KeyValue<>(key.key().toString(), value);
});
/**
* 把聚合之后的消息发送到另外一个topic中
*/
map.to(OUT_TOPIC,Produced.with(Serdes.String(),Serdes.String()));
}
}
(3)测试
准备
- 使用生产者在topic为:
input_topic
中发送多条消息 - 使用消费者接收topic为:
out_topic
结果:
- 通过流式计算,会把生产者的多条消息汇总成一条发送到消费者中输出
2.5 SpringBoot集成Kafka Stream
从资料文件夹中把提供好的4个类拷贝到项目的config目录下
当前kafka-demo项目需要添加lombok的依赖包
<properties>
<lombok.version>1.18.8</lombok.version>
</properties>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
(1)自定配置参数
/**
* 通过重新注册KafkaStreamsConfiguration对象,设置自定配置参数
*/
@Setter
@Getter
@Configuration
@EnableKafkaStreams
@ConfigurationProperties(prefix="kafka")
public class KafkaStreamConfig {
private static final int MAX_MESSAGE_SIZE = 16* 1024 * 1024;
private String hosts;
private String group;
/**
* 重新定义默认的KafkaStreams配置属性,包括:
* 1、服务器地址
* 2、应用ID
* 3、流消息的副本数等配置
* @return
*/
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration defaultKafkaStreamsConfig() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, hosts);
props.put(StreamsConfig.APPLICATION_ID_CONFIG, this.getGroup()+"_stream_aid");
props.put(StreamsConfig.CLIENT_ID_CONFIG, this.getGroup()+"_stream_cid");
props.put(StreamsConfig.RETRIES_CONFIG, 10);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// 消息副本数量
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 1);
props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 5_000);
props.put(StreamsConfig.SEND_BUFFER_CONFIG, 3*MAX_MESSAGE_SIZE);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, Topology.AutoOffsetReset.EARLIEST.name().toLowerCase());
return new KafkaStreamsConfiguration(props);
}
}
修改application.yml文件,在最下方添加自定义配置
kafka:
hosts: 192.168.200.130:9092
group: ${spring.application.name}
(2)定义监听接口
/**
* 流数据的监听消费者实现的接口类,系统自动会通过
* KafkaStreamListenerFactory类扫描项目中实现该接口的类,
* 并注册为流数据的消费端。
*
* 其中泛型可是KStream或KTable
* @param <T>
*/
public interface KafkaStreamListener<T> {
// 监听的类型
String listenerTopic();
// 处理结果发送的类
String sendTopic();
// 对象处理逻辑
T getService(T stream);
}
(3)KafkaStream自动处理包装类
/**
* KafkaStream自动处理包装类
*/
public class KafkaStreamProcessor {
// 流构建器
StreamsBuilder streamsBuilder;
private String type;
KafkaStreamListener listener;
public KafkaStreamProcessor(StreamsBuilder streamsBuilder,KafkaStreamListener kafkaStreamListener){
this.streamsBuilder = streamsBuilder;
this.listener = kafkaStreamListener;
this.parseType();
Assert.notNull(this.type,"Kafka Stream 监听器只支持kstream、ktable,当前类型是"+this.type);
}
/**
* 通过泛型类型自动注册对应类型的流处理器对象
* 支持KStream、KTable
* @return
*/
public Object doAction(){
if("kstream".equals(this.type)) {
KStream<?, ?> stream = streamsBuilder.stream(listener.listenerTopic(), Consumed.with(Topology.AutoOffsetReset.LATEST));
stream=(KStream)listener.getService(stream);
stream.to(listener.sendTopic());
return stream;
}else{
KTable<?, ?> table = streamsBuilder.table(listener.listenerTopic(), Consumed.with(Topology.AutoOffsetReset.LATEST));
table = (KTable)listener.getService(table);
table.toStream().to(listener.sendTopic());
return table;
}
}
/**
* 解析传入listener类的泛型类
*/
private void parseType(){
Type[] types = listener.getClass().getGenericInterfaces();
if(types!=null){
for (int i = 0; i < types.length; i++) {
if( types[i] instanceof ParameterizedType){
ParameterizedType t = (ParameterizedType)types[i];
String name = t.getActualTypeArguments()[0].getTypeName().toLowerCase();
if(name.contains("org.apache.kafka.streams.kstream.kstream")||name.contains("org.apache.kafka.streams.kstream.ktable")){
this.type = name.substring(0,name.indexOf(\'<\')).replace("org.apache.kafka.streams.kstream.","").trim();
break;
}
}
}
}
}
}
(4)KafkaStreamListener扫描和实例化成KafkaStreamProcessor.doAction的返回类,完成监听器实际注册的过程
@Component
public class KafkaStreamListenerFactory implements InitializingBean {
Logger logger = LoggerFactory.getLogger(KafkaStreamListenerFactory.class);
@Autowired
DefaultListableBeanFactory defaultListableBeanFactory;
/**
* 初始化完成后自动调用
*/
@Override
public void afterPropertiesSet() {
Map<String, KafkaStreamListener> map = defaultListableBeanFactory.getBeansOfType(KafkaStreamListener.class);
for (String key : map.keySet()) {
KafkaStreamListener k = map.get(key);
KafkaStreamProcessor processor = new KafkaStreamProcessor(defaultListableBeanFactory.getBean(StreamsBuilder.class),k);
String beanName = k.getClass().getSimpleName()+"AutoProcessor" ;
//注册baen,并且执行doAction方法
defaultListableBeanFactory.registerSingleton(beanName,processor.doAction());
logger.info("add kafka stream auto listener [{}]",beanName);
}
}
}
(5)手动创建监听器
1,该类需要实现KafkaStreamListener接口
2,listenerTopic方法返回需要监听的topic
3,sendTopic方法返回需要处理完后发送的topic
4,getService方法,主要处理流数据
package com.itheima.kafka.stream;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.*;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import java.util.Arrays;
import java.util.List;
@Component
public class StreamListener implements com.itheima.kafka.config.KafkaStreamListener<KStream<?, String>> {
@Override
public String listenerTopic() {
return "input_topic";
}
@Override
public String sendTopic() {
return "out_topic";
}
@Override
public KStream<?, String> getService(KStream<?, String> stream) {
return stream.groupByKey().windowedBy(TimeWindows.of(10000)).aggregate(
new Initializer<String>() {
@Override
public String apply() {
return "啥也没有";
}
}, new Aggregator<Object, String, String>() {
@Override
public String apply(Object key, String value, String aggValue) {
System.out.println("spring整合以后的第二次聚合-------");
if(StringUtils.isEmpty(value)){
return aggValue;
}
List<String> strings = Arrays.asList(value.toLowerCase().split(","));
int count = 0;
for (String string : strings) {
if(string.equals("hello")){
count++;
}
}
return String.format("hello:%d",count);
}
},Materialized.as("kafka-stream-sample-count")
).toStream().map((key, value) -> {
return new KeyValue<>(key.key().toString(), value);
});
}
}
测试:
启动微服务,正常发送消息,可以正常接收到消息