和其他所有的计算框架一样,flink也有一些基础的开发步骤以及基础,核心的API,从开发步骤的角度来讲,主要分为四大部分

Flink从入门到入土

Flink从入门到入土

Flink Job在提交执行计算时,需要首先建立和Flink框架之间的联系,也就指的是当前的flink运行环境,只有获取了环境信息,才能将task调度到不同的taskManager执行。而这个环境对象的获取方式相对比较简单

  1. // 批处理环境
  2. val env = ExecutionEnvironment.getExecutionEnvironment
  3. // 流式数据处理环境
  4. val env = StreamExecutionEnvironment.getExecutionEnvironment

 

Flink从入门到入土

Flink框架可以从不同的来源获取数据,将数据提交给框架进行处理, 我们将获取数据的来源称之为数据源.

一般情况下,可以将数据临时存储到内存中,形成特殊的数据结构后,作为数据源使用。这里的数据结构采用集合类型是比较普遍的

  1. import org.apache.flink.streaming.api.scala._
  2. /**
  3. * description: SourceList
  4. * date: 2020/8/28 19:02
  5. * version: 1.0
  6. *
  7. * @author 阳斌
  8. * 邮箱:1692207904@qq.com
  9. * 类的说明:从集合读取数据
  10. */
  11. object SourceList {
  12. def main(args: Array[String]): Unit = {
  13. //1.创建执行的环境
  14. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  15. //2.从集合中读取数据
  16. val sensorDS: DataStream[WaterSensor] = env.fromCollection(
  17. // List(1,2,3,4,5)
  18. List(
  19. WaterSensor("ws_001", 1577844001, 45.0),
  20. WaterSensor("ws_002", 1577844015, 43.0),
  21. WaterSensor("ws_003", 1577844020, 42.0)
  22. )
  23. )
  24. //3.打印
  25. sensorDS.print()
  26. //4.执行
  27. env.execute("sensor")
  28. }
  29. /**
  30. * 定义样例类:水位传感器:用于接收空高数据
  31. *
  32. * @param id 传感器编号
  33. * @param ts 时间戳
  34. * @param vc 空高
  35. */
  36. case class WaterSensor(id: String, ts: Long, vc: Double)
  37. }

 

Flink从入门到入土

通常情况下,我们会从存储介质中获取数据,比较常见的就是将日志文件作为数据源

  1. import org.apache.flink.streaming.api.scala._
  2. /**
  3. * description: SourceList
  4. * date: 2020/8/28 19:02
  5. * version: 1.0
  6. *
  7. * @author 阳斌
  8. * 邮箱:1692207904@qq.com
  9. * 类的说明:从文件读取数据
  10. */
  11. object SourceFile {
  12. def main(args: Array[String]): Unit = {
  13. //1.创建执行的环境
  14. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  15. //2.从指定路径获取数据
  16. val fileDS: DataStream[String] = env.readTextFile("input/data.log")
  17. //3.打印
  18. fileDS.print()
  19. //4.执行
  20. env.execute("sensor")
  21. }
  22. }
  23. /**
  24. * 在读取文件时,文件路径可以是目录也可以是单一文件。如果采用相对文件路径,会从当前系统参数user.dir中获取路径
  25. * System.getProperty("user.dir")
  26. */
  27. /**
  28. * 如果在IDEA中执行代码,那么系统参数user.dir自动指向项目根目录,
  29. * 如果是standalone集群环境, 默认为集群节点根目录,当然除了相对路径以外,
  30. * 也可以将路径设置为分布式文件系统路径,如HDFS
  31. val fileDS: DataStream[String] =
  32. env.readTextFile( "hdfs://hadoop02:9000/test/1.txt")
  33. */

 

Flink从入门到入土

如果是standalone集群环境, 默认为集群节点根目录,当然除了相对路径以外,也可以将路径设置为分布式文件系统路径,如HDFS

  1. val fileDS: DataStream[String] =
  2. env.readTextFile( "hdfs://hadoop02:9000/test/1.txt")

 

默认读取时,flink的依赖关系中是不包含Hadoop依赖关系的,所以执行上面代码时,会出现错误。

Flink从入门到入土

解决方法就是增加相关依赖jar包就可以了

Flink从入门到入土

Kafka作为消息传输队列,是一个分布式的,高吞吐量,易于扩展地基于主题发布/订阅的消息系统。在现今企业级开发中,Kafka 和 Flink成为构建一个实时的数据处理系统的首选

  1. <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.11 -->
  2. <dependency>
  3. <groupId>org.apache.flink</groupId>
  4. <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
  5. <version>1.10.0</version>
  6. </dependency>

 

  1. import java.util.Properties
  2. import org.apache.flink.streaming.api.scala._
  3. import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
  4. import org.apache.flink.streaming.util.serialization.SimpleStringSchema
  5. /**
  6. * description: SourceList
  7. * date: 2020/8/28 19:02
  8. * version: 1.0
  9. *
  10. * @author 阳斌
  11. * 邮箱:1692207904@qq.com
  12. * 类的说明:从kafka读取数据
  13. */
  14. object SourceKafka {
  15. def main(args: Array[String]): Unit = {
  16. val env: StreamExecutionEnvironment =
  17. StreamExecutionEnvironment.getExecutionEnvironment
  18. val properties = new Properties()
  19. properties.setProperty("bootstrap.servers", "hadoop02:9092")
  20. properties.setProperty("group.id", "consumer-group")
  21. properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  22. properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  23. properties.setProperty("auto.offset.reset", "latest")
  24. val kafkaDS: DataStream[String] = env.addSource(
  25. new FlinkKafkaConsumer011[String](
  26. "sensor",
  27. new SimpleStringSchema(),
  28. properties)
  29. )
  30. kafkaDS.print()
  31. env.execute("sensor")
  32. }
  33. }

 

大多数情况下,前面的数据源已经能够满足需要,但是难免会存在特殊情况的场合,所以flink也提供了能自定义数据源的方式

  1. import com.atyang.day01.Source.SourceList.WaterSensor
  2. import org.apache.flink.streaming.api.functions.source.SourceFunction
  3. import scala.util.Random
  4. /**
  5. * description: ss
  6. * date: 2020/8/28 20:36
  7. * version: 1.0
  8. *
  9. * @author 阳斌
  10. * 邮箱:1692207904@qq.com
  11. * 类的说明:自定义数据源
  12. */
  13. class MySensorSource extends SourceFunction[WaterSensor] {
  14. var flg = true
  15. override def run(ctx: SourceFunction.SourceContext[WaterSensor]): Unit = {
  16. while ( flg ) {
  17. // 采集数据
  18. ctx.collect(
  19. WaterSensor(
  20. "sensor_" +new Random().nextInt(3),
  21. 1577844001,
  22. new Random().nextInt(5)+40
  23. )
  24. )
  25. Thread.sleep(100)
  26. }
  27. }
  28. override def cancel(): Unit = {
  29. flg = false;
  30. }
  31. }

 

Flink从入门到入土

Flink从入门到入土

在Spark中,算子分为转换算子和行动算子,转换算子的作用可以通过算子方法的调用将一个RDD转换另外一个RDD,Flink中也存在同样的操作,可以将一个数据流转换为其他的数据流。

转换过程中,数据流的类型也会发生变化,那么到底Flink支持什么样的数据类型呢,其实我们常用的数据类型,Flink都是支持的。比如:Long, String, Integer, Int, 元组,样例类,List, Map等。

  • 映射:将数据流中的数据进行转换, 形成新的数据流,消费一个元素并产出一个元素
  • 参数:Scala匿名函数或MapFunction
  • 返回:DataStream
  1. import org.apache.flink.streaming.api.scala._
  2. /**
  3. * description: SourceList
  4. * date: 2020/8/28 19:02
  5. * version: 1.0
  6. *
  7. * @author 阳斌
  8. * 邮箱:1692207904@qq.com
  9. * 类的说明:从集合读取数据
  10. */
  11. object Transfrom_map {
  12. def main(args: Array[String]): Unit = {
  13. //1.创建执行的环境
  14. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  15. //2.从集合中读取数据
  16. val sensorDS: DataStream[WaterSensor] = env.fromCollection(
  17. // List(1,2,3,4,5)
  18. List(
  19. WaterSensor("ws_001", 1577844001, 45.0),
  20. WaterSensor("ws_002", 1577844015, 43.0),
  21. WaterSensor("ws_003", 1577844020, 42.0)
  22. )
  23. )
  24. val sensorDSMap = sensorDS.map(x => (x.id+"_1",x.ts+"_1",x.vc + 1))
  25. //3.打印
  26. sensorDSMap.print()
  27. //4.执行
  28. env.execute("sensor")
  29. }
  30. /**
  31. * 定义样例类:水位传感器:用于接收空高数据
  32. *
  33. * @param id 传感器编号
  34. * @param ts 时间戳
  35. * @param vc 空高
  36. */
  37. case class WaterSensor(id: String, ts: Long, vc: Double)
  38. }

 

Flink从入门到入土

Flink为每一个算子的参数都至少提供了Scala匿名函数和函数类两种的方式,其中如果使用函数类作为参数的话,需要让自定义函数继承指定的父类或实现特定的接口。例如:MapFunction

sensor-data.log 文件数据

  1. sensor_1,1549044122,10
  2. sensor_1,1549044123,20
  3. sensor_1,1549044124,30
  4. sensor_2,1549044125,40
  5. sensor_1,1549044126,50
  6. sensor_2,1549044127,60
  7. sensor_1,1549044128,70
  8. sensor_3,1549044129,80
  9. sensor_3,1549044130,90
  10. sensor_3,1549044130,100
  11. import org.apache.flink.streaming.api.scala._
  12. /**
  13. * description: SourceList
  14. * date: 2020/8/28 19:02
  15. * version: 1.0
  16. *
  17. * @author 阳斌
  18. * 邮箱:1692207904@qq.com
  19. * 类的说明:从文件读取数据
  20. */
  21. object SourceFileMap {
  22. def main(args: Array[String]): Unit = {
  23. //1.创建执行的环境
  24. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  25. //2.从指定路径获取数据
  26. val fileDS: DataStream[String] = env.readTextFile("input/sensor-data.log")
  27. val MapDS = fileDS.map(
  28. lines => {
  29. //更加逗号切割 获取每个元素
  30. val datas: Array[String] = lines.split(",")
  31. WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
  32. }
  33. )
  34. //3.打印
  35. MapDS.print()
  36. //4.执行
  37. env.execute("map")
  38. }
  39. /**
  40. * 定义样例类:水位传感器:用于接收空高数据
  41. *
  42. * @param id 传感器编号
  43. * @param ts 时间戳
  44. * @param vc 空高
  45. */
  46. case class WaterSensor(id: String, ts: Long, vc: Double)
  47. }

 

Flink从入门到入土

  1. import org.apache.flink.api.common.functions.MapFunction
  2. import org.apache.flink.streaming.api.scala._
  3. /**
  4. * description: SourceList
  5. * date: 2020/8/28 19:02
  6. * version: 1.0
  7. *
  8. * @author 阳斌
  9. * 邮箱:1692207904@qq.com
  10. * 类的说明:从文件读取数据
  11. */
  12. object Transform_MapFunction {
  13. def main(args: Array[String]): Unit = {
  14. //1.创建执行的环境
  15. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  16. //2.从指定路径获取数据
  17. val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")
  18. sensorDS.map()
  19. //3.打印
  20. // MapDS.print()
  21. //4.执行
  22. env.execute("map")
  23. }
  24. /**
  25. * 自定义继承 MapFunction
  26. * MapFunction[T,O]
  27. * 自定义输入和输出
  28. *
  29. */
  30. class MyMapFunction extends MapFunction[String,WaterSensor]{
  31. override def map(t: String): WaterSensor = {
  32. val datas: Array[String] = t.split(",")
  33. WaterSensor(datas(0),datas(1).toLong,datas(2).toInt)
  34. }
  35. }
  36. /**
  37. * 定义样例类:水位传感器:用于接收空高数据
  38. *
  39. * @param id 传感器编号
  40. * @param ts 时间戳
  41. * @param vc 空高
  42. */
  43. case class WaterSensor(id: String, ts: Long, vc: Double)
  44. }

 

Flink从入门到入土

所有Flink函数类都有其Rich版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。也有意味着提供了更多的,更丰富的功能。例如:RichMapFunction

sensor-data.log 文件数据 同上一致

  1. import org.apache.flink.api.common.functions.{MapFunction, RichMapFunction}
  2. import org.apache.flink.configuration.Configuration
  3. import org.apache.flink.streaming.api.scala._
  4. /**
  5. * description: SourceList
  6. * date: 2020/8/28 19:02
  7. * version: 1.0
  8. *
  9. * @author 阳斌
  10. * 邮箱:1692207904@qq.com
  11. * 类的说明:从文件读取数据
  12. */
  13. object Transform_RichMapFunction {
  14. def main(args: Array[String]): Unit = {
  15. //1.创建执行的环境
  16. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  17. //2.从指定路径获取数据
  18. val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")
  19. val myMapDS: DataStream[WaterSensor] = sensorDS.map(new MyRichMapFunction)
  20. //3.打印
  21. myMapDS.print()
  22. //4.执行
  23. env.execute("map")
  24. }
  25. /**
  26. * 自定义继承 MapFunction
  27. * MapFunction[T,O]
  28. * 自定义输入和输出
  29. *
  30. */
  31. class MyRichMapFunction extends RichMapFunction[String,WaterSensor]{
  32. override def map(value: String): WaterSensor = {
  33. val datas: Array[String] = value.split(",")
  34. // WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
  35. WaterSensor(getRuntimeContext.getTaskName, datas(1).toLong, datas(2).toInt)
  36. }
  37. // 富函数提供了生命周期方法
  38. override def open(parameters: Configuration): Unit = {}
  39. override def close(): Unit = {}
  40. }
  41. /**
  42. * 定义样例类:水位传感器:用于接收空高数据
  43. *
  44. * @param id 传感器编号
  45. * @param ts 时间戳
  46. * @param vc 空高
  47. */
  48. case class WaterSensor(id: String, ts: Long, vc: Double)
  49. }

 

Rich Function有一个生命周期的概念。典型的生命周期方法有:

  • open()方法是rich function的初始化方法,当一个算子例如map或者filter被调 用之前open()会被调用
  • close()方法是生命周期中的最后一个调用的方法,做一些清理工作
  • getRuntimeContext()方法提供了函数的RuntimeContext的一些信息,例如函数执行         的并行度,任务的名字,以及state状态
  • 扁平映射:将数据流中的整体拆分成一个一个的个体使用,消费一个元素并产生零到多个元素
  • 参数:Scala匿名函数或FlatMapFunction
  • 返回:DataStream

Flink从入门到入土

  1. import org.apache.flink.streaming.api.scala._
  2. /**
  3. * description: SourceList
  4. * date: 2020/8/28 19:02
  5. * version: 1.0
  6. *
  7. * @author 阳斌
  8. * 邮箱:1692207904@qq.com
  9. * 类的说明:FlatMap
  10. */
  11. object Transform_FlatMap {
  12. def main(args: Array[String]): Unit = {
  13. // 1.创建执行环境
  14. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  15. env.setParallelism(1)
  16. // 2.读取数据
  17. val listDS: DataStream[List[Int]] = env.fromCollection(
  18. List(
  19. List(1, 2, 3, 4),
  20. List(5, 6, 7,1,1,1)
  21. )
  22. )
  23. val resultDS: DataStream[Int] = listDS.flatMap(list => list)
  24. resultDS.print()
  25. // 4. 执行
  26. env.execute()
  27. }
  28. }

 

Flink从入门到入土

  • 过滤:根据指定的规则将满足条件(true)的数据保留,不满足条件(false)的数据丢弃
  • 参数:Scala匿名函数或FilterFunction
  • 返回:DataStream
  1. import org.apache.flink.streaming.api.scala._
  2. /**
  3. * description: SourceList
  4. * date: 2020/8/28 19:02
  5. * version: 1.0
  6. *
  7. * @author 阳斌
  8. * 邮箱:1692207904@qq.com
  9. * 类的说明:Filter
  10. */
  11. object Transform_Filter {
  12. def main(args: Array[String]): Unit = {
  13. // 1.创建执行环境
  14. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  15. env.setParallelism(1)
  16. // 2.读取数据
  17. val listDS: DataStream[List[Int]] = env.fromCollection(
  18. List(
  19. List(1, 2, 3, 4,1, 2, 3, 4),
  20. List(5, 6, 7,1,1,1,1, 2, 3, 4,1, 2, 3, 4),
  21. List(1, 2, 3, 4),
  22. List(5, 6, 7,1,1,1),
  23. List(1, 2, 3, 4),
  24. List(5, 6, 7,1,1,1)
  25. )
  26. )
  27. // true就留下,false就抛弃
  28. listDS.filter(num => {
  29. num.size>5
  30. })
  31. .print("filter")
  32. // 4. 执行
  33. env.execute()
  34. }
  35. }

 

Flink从入门到入土

在Spark中有一个GroupBy的算子,用于根据指定的规则将数据进行分组,在flink中也有类似的功能,那就是keyBy,根据指定的key对数据进行分流

  • 分流:根据指定的Key将元素发送到不同的分区,相同的Key会被分到一个分区(这里分区指的就是下游算子多个并行节点的其中一个)。keyBy()是通过哈希来分区的

Flink从入门到入土

 

  • 参数:Scala匿名函数或POJO属性或元组索引,不能使用数组

  • 返回:KeyedStream

Flink从入门到入土

 

  1. import org.apache.flink.streaming.api.scala._
  2. /**
  3. * description: SourceList
  4. * date: 2020/8/28 19:02
  5. * version: 1.0
  6. *
  7. * @author 阳斌
  8. * 邮箱:1692207904@qq.com
  9. * 类的说明:FlatMap
  10. */
  11. object Transform_KeyBy {
  12. def main(args: Array[String]): Unit = {
  13. // 1.创建执行环境
  14. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  15. env.setParallelism(1)
  16. // 2.读取数据
  17. val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")
  18. //3.转换为样例类
  19. val mapDS = sensorDS.map(
  20. lines => {
  21. val datas = lines.split(",")
  22. WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
  23. }
  24. )
  25. // 4. 使用keyby进行分组
  26. // TODO 关于返回的key的类型:
  27. // 1. 如果是位置索引 或 字段名称 ,程序无法推断出key的类型,所以给一个java的Tuple类型
  28. // 2. 如果是匿名函数 或 函数类 的方式,可以推断出key的类型,比较推荐使用
  29. // *** 分组的概念:分组只是逻辑上进行分组,打上了记号(标签),跟并行度没有绝对的关系
  30. // 同一个分组的数据在一起(不离不弃)
  31. // 同一个分区里可以有多个不同的组
  32. // val sensorKS: KeyedStream[WaterSensor, Tuple] = mapDS.keyBy(0)
  33. // val sensorKS: KeyedStream[WaterSensor, Tuple] = mapDS.keyBy("id")
  34. val sensorKS: KeyedStream[WaterSensor, String] = mapDS.keyBy(_.id)
  35. // val sensorKS: KeyedStream[WaterSensor, String] = mapDS.keyBy(
  36. // new KeySelector[WaterSensor, String] {
  37. // override def getKey(value: WaterSensor): String = {
  38. // value.id
  39. // }
  40. // }
  41. // )
  42. sensorKS.print().setParallelism(5)
  43. // 4. 执行
  44. env.execute()
  45. }
  46. /**
  47. * 定义样例类:水位传感器:用于接收空高数据
  48. *
  49. * @param id 传感器编号
  50. * @param ts 时间戳
  51. * @param vc 空高
  52. */
  53. case class WaterSensor(id: String, ts: Long, vc: Double)
  54. }

 

Flink从入门到入土

  • 打乱重组(洗牌):将数据按照均匀分布打散到下游
  • 参数:无
  • 返回:DataStream

Flink从入门到入土

  1. import org.apache.flink.streaming.api.scala._
  2. /**
  3. * description: SourceList
  4. * date: 2020/8/28 19:02
  5. * version: 1.0
  6. *
  7. * @author 阳斌
  8. * 邮箱:1692207904@qq.com
  9. * 类的说明:FlatMap
  10. */
  11. object Transform_Shuffle {
  12. def main(args: Array[String]): Unit = {
  13. // 1.创建执行环境
  14. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  15. env.setParallelism(1)
  16. // 2.读取数据
  17. val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")
  18. val shuffleDS = sensorDS.shuffle
  19. sensorDS.print("data")
  20. shuffleDS.print("shuffle")
  21. // 4. 执行
  22. env.execute()
  23. }
  24. }

 

Flink从入门到入土

在某些情况下,我们需要将数据流根据某些特征拆分成两个或者多个数据流,给不同数据流增加标记以便于从流中取出。

Flink从入门到入土

需求:将水位传感器数据按照空高高低(以40cm,30cm为界),拆分成三个流

  1. import org.apache.flink.streaming.api.scala._
  2. /**
  3. * description: SourceList
  4. * date: 2020/8/28 19:02
  5. * version: 1.0
  6. *
  7. * @author 阳斌
  8. * 邮箱:1692207904@qq.com
  9. * 类的说明:FlatMap
  10. */
  11. object Transform_Split {
  12. def main(args: Array[String]): Unit = {
  13. // 1.创建执行环境
  14. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  15. env.setParallelism(1)
  16. // 2.读取数据
  17. val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")
  18. // 3.转换成样例类
  19. val mapDS: DataStream[WaterSensor] = sensorDS.map(
  20. lines => {
  21. val datas: Array[String] = lines.split(",")
  22. WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
  23. }
  24. )
  25. val splitSS: SplitStream[WaterSensor] = mapDS.split(
  26. sensor => {
  27. if (sensor.vc < 40) {
  28. Seq("normal")
  29. } else if (sensor.vc < 80) {
  30. Seq("Warn")
  31. } else {
  32. Seq("alarm")
  33. }
  34. }
  35. )
  36. // 4. 执行
  37. env.execute()
  38. }
  39. /**
  40. * 定义样例类:水位传感器:用于接收空高数据
  41. *
  42. * @param id 传感器编号
  43. * @param ts 时间戳
  44. * @param vc 空高
  45. */
  46. case class WaterSensor(id: String, ts: Long, vc: Double)
  47. }

 

将数据流进行切分后,如何从流中将不同的标记取出呢,这时就需要使用select算子了。

Flink从入门到入土

  1. import org.apache.flink.streaming.api.scala._
  2. /**
  3. * description: SourceList
  4. * date: 2020/8/28 19:02
  5. * version: 1.0
  6. *
  7. * @author 阳斌
  8. * 邮箱:1692207904@qq.com
  9. * 类的说明:FlatMap
  10. */
  11. object Transform_Split {
  12. def main(args: Array[String]): Unit = {
  13. // 1.创建执行环境
  14. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  15. env.setParallelism(1)
  16. // 2.读取数据
  17. val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")
  18. // 3.转换成样例类
  19. val mapDS: DataStream[WaterSensor] = sensorDS.map(
  20. lines => {
  21. val datas: Array[String] = lines.split(",")
  22. WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
  23. }
  24. )
  25. val splitDS: SplitStream[WaterSensor] = mapDS.split(
  26. sensor => {
  27. if (sensor.vc < 40) {
  28. Seq("info")
  29. } else if (sensor.vc < 80) {
  30. Seq("warn")
  31. } else {
  32. Seq("error")
  33. }
  34. }
  35. )
  36. val errorDS: DataStream[WaterSensor] = splitDS.select("error")
  37. val warnDS: DataStream[WaterSensor] = splitDS.select("warn")
  38. val infoDS: DataStream[WaterSensor] = splitDS.select("info")
  39. infoDS.print("info")
  40. warnDS.print("warn")
  41. errorDS.print("error")
  42. // 4. 执行
  43. env.execute()
  44. }
  45. /**
  46. * 定义样例类:水位传感器:用于接收空高数据
  47. *
  48. * @param id 传感器编号
  49. * @param ts 时间戳
  50. * @param vc 空高
  51. */
  52. case class WaterSensor(id: String, ts: Long, vc: Double)
  53. }

 

Flink从入门到入土

在某些情况下,我们需要将两个不同来源的数据流进行连接,实现数据匹配,比如订单支付和第三方交易信息,这两个信息的数据就来自于不同数据源,连接后,将订单支付和第三方交易信息进行对账,此时,才能算真正的支付完成。

Flink中的connect算子可以连接两个保持他们类型的数据流,两个数据流被Connect之后,只是被放在了一个同一个流中,内部依然保持各自的数据和形式不发生任何变化,两个流相互独立。

Flink从入门到入土

  1. import org.apache.flink.streaming.api.scala._
  2. /**
  3. * description: SourceList
  4. * date: 2020/8/28 19:02
  5. * version: 1.0
  6. *
  7. * @author 阳斌
  8. * 邮箱:1692207904@qq.com
  9. * 类的说明:FlatMap
  10. */
  11. object Transform_Connect {
  12. def main(args: Array[String]): Unit = {
  13. // 1.创建执行环境
  14. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  15. env.setParallelism(1)
  16. // 2.读取数据
  17. val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")
  18. // 3.转换成样例类
  19. val mapDS: DataStream[WaterSensor] = sensorDS.map(
  20. lines => {
  21. val datas: Array[String] = lines.split(",")
  22. WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
  23. }
  24. )
  25. // 4. 从集合中再读取一条流
  26. val numDS: DataStream[Int] = env.fromCollection(List(1, 2, 3, 4, 5, 6))
  27. val resultCS: ConnectedStreams[WaterSensor, Int] = mapDS.connect(numDS)
  28. // coMap表示连接流调用的map,各自都需要一个 function
  29. resultCS.map(
  30. sensor=>sensor.id,
  31. num=>num+1
  32. ).print()
  33. // 4. 执行
  34. env.execute()
  35. }
  36. /**
  37. * 定义样例类:水位传感器:用于接收空高数据
  38. *
  39. * @param id 传感器编号
  40. * @param ts 时间戳
  41. * @param vc 空高
  42. */
  43. case class WaterSensor(id: String, ts: Long, vc: Double)
  44. }

 

Flink从入门到入土

对两个或者两个以上的DataStream进行union操作,产生一个包含所有DataStream元素的新DataStream

Flink从入门到入土

connect与 union 区别:

  1. union之前两个流的类型必须是一样,connect可以不一样
  2. connect只能操作两个流,union可以操作多个。
  1. import org.apache.flink.streaming.api.scala._
  2. /**
  3. * description: SourceList
  4. * date: 2020/8/28 19:02
  5. * version: 1.0
  6. *
  7. * @author 阳斌
  8. * 邮箱:1692207904@qq.com
  9. * 类的说明:FlatMap
  10. */
  11. object Transform_Union {
  12. def main(args: Array[String]): Unit = {
  13. // 1.创建执行环境
  14. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  15. env.setParallelism(1)
  16. // 2. 从集合中读取流
  17. val num1DS: DataStream[Int] = env.fromCollection(List(1, 2, 3, 4))
  18. val num2DS: DataStream[Int] = env.fromCollection(List(7, 8, 9, 10))
  19. val num3DS: DataStream[Int] = env.fromCollection(List(17, 18, 19, 110))
  20. // TODO union 真正将多条流合并成一条流
  21. // 合并的流,类型必须一致
  22. // 可以合并多条流,只要类型一致
  23. num1DS.union(num2DS).union(num3DS)
  24. .print()
  25. // 4. 执行
  26. env.execute()
  27. }
  28. /**
  29. * 定义样例类:水位传感器:用于接收空高数据
  30. *
  31. * @param id 传感器编号
  32. * @param ts 时间戳
  33. * @param vc 空高
  34. */
  35. case class WaterSensor(id: String, ts: Long, vc: Double)
  36. }

 

Flink从入门到入土

Flink作为计算框架,主要应用于数据计算处理上, 所以在keyBy对数据进行分流后,可以对数据进行相应的统计分析

这些算子可以针对KeyedStream的每一个支流做聚合。执行完成后,会将聚合的结果合成一个流返回,所以结果都是DataStream

sum()

Flink从入门到入土

min()

Flink从入门到入土

max()

Flink从入门到入土

一个分组数据流的聚合操作,合并当前的元素和上次聚合的结果,产生一个新的值,返回的流中包含每一次聚合的结果,而不是只返回最后一次聚合的最终结果。

Flink从入门到入土

  1. import org.apache.flink.streaming.api.scala._
  2. /**
  3. * description: SourceList
  4. * date: 2020/8/28 19:02
  5. * version: 1.0
  6. *
  7. * @author 阳斌
  8. * 邮箱:1692207904@qq.com
  9. * 类的说明:Reduce
  10. */
  11. object Transform_Reduce {
  12. def main(args: Array[String]): Unit = {
  13. // 1.创建执行环境
  14. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  15. env.setParallelism(1)
  16. // 2.读取数据
  17. val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")
  18. // 3.转换成样例类
  19. val mapDS: DataStream[WaterSensor] = sensorDS.map(
  20. lines => {
  21. val datas: Array[String] = lines.split(",")
  22. WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
  23. }
  24. )
  25. val sensorKS: KeyedStream[WaterSensor, String] = mapDS.keyBy(_.id)
  26. // 输入的类型一样,输出类型和输出类型也要一样
  27. // 组内的第一条数据,不进入reduce计算
  28. val reduceDS: DataStream[WaterSensor] = sensorKS.reduce(
  29. (ws1, ws2) => {
  30. println(ws1 + "<===>" + ws2)
  31. WaterSensor(ws1.id, System.currentTimeMillis(), ws1.vc + ws2.vc)
  32. }
  33. )
  34. reduceDS.print("reduce")
  35. // 4. 执行
  36. env.execute()
  37. }
  38. /**
  39. * 定义样例类:水位传感器:用于接收空高数据
  40. *
  41. * @param id 传感器编号
  42. * @param ts 时间戳
  43. * @param vc 空高
  44. */
  45. case class WaterSensor(id: String, ts: Long, vc: Double)
  46. }

 

Flink从入门到入土

Flink在数据流通过keyBy进行分流处理后,如果想要处理过程中获取环境相关信息,可以采用process算子自定义实现 1)继承KeyedProcessFunction抽象类,并定义泛型:[KEY, IN, OUT]

  1. class MyKeyedProcessFunction extends KeyedProcessFunction[String, WaterSensor, String]{}
  2. 重写方法
  3. // 自定义KeyedProcessFunction,是一个特殊的富函数
  4. // 1.实现KeyedProcessFunction,指定泛型:K - key的类型, I - 上游数据的类型, O - 输出的数据类型
  5. // 2.重写 processElement方法,定义 每条数据来的时候 的 处理逻辑
  6.  
  7. /**
  8. * 处理逻辑:来一条处理一条
  9. *
  10. * @param value 一条数据
  11. * @param ctx 上下文对象
  12. * @param out 采集器:收集数据,并输出
  13. */
  14. override def processElement(value: WaterSensor, ctx: KeyedProcessFunction[String, WaterSensor, String]#Context, out: Collector[String]): Unit = {
  15. out.collect("我来到process啦,分组的key是="+ctx.getCurrentKey+",数据=" + value)
  16. // 如果key是tuple,即keyby的时候,使用的是 位置索引 或 字段名称,那么key获取到是一个tuple
  17. // ctx.getCurrentKey.asInstanceOf[Tuple1].f0 //Tuple1需要手动引入Java的Tuple
  18. }

 

完整代码:

  1. import org.apache.flink.streaming.api.functions.KeyedProcessFunction
  2. import org.apache.flink.streaming.api.scala._
  3. import org.apache.flink.util.Collector
  4. /**
  5. * description: SourceList
  6. * date: 2020/8/28 19:02
  7. * version: 1.0
  8. *
  9. * @author 阳斌
  10. * 邮箱:1692207904@qq.com
  11. * 类的说明:Reduce
  12. */
  13. object Transform_Process {
  14. def main(args: Array[String]): Unit = {
  15. // 1.创建执行环境
  16. val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  17. env.setParallelism(1)
  18. // 2.读取数据
  19. val sensorDS: DataStream[String] = env.readTextFile("input/sensor-data.log")
  20. // 3.转换成样例类
  21. val mapDS: DataStream[WaterSensor] = sensorDS.map(
  22. lines => {
  23. val datas: Array[String] = lines.split(",")
  24. WaterSensor(datas(0), datas(1).toLong, datas(2).toInt)
  25. }
  26. )
  27. //按照ID 进行分组
  28. val sensorKS: KeyedStream[WaterSensor, String] = mapDS.keyBy(_.id)
  29. sensorKS.process(new MyKeyedProcessFunction)
  30. // 4. 执行
  31. env.execute()
  32. }
  33. // 自定义KeyedProcessFunction,是一个特殊的富函数
  34. // 1.实现KeyedProcessFunction,指定泛型:K - key的类型, I - 上游数据的类型, O - 输出的数据类型
  35. // 2.重写 processElement方法,定义 每条数据来的时候 的 处理逻辑
  36. class MyKeyedProcessFunction extends KeyedProcessFunction[String, WaterSensor, String] {
  37. /**
  38. * 处理逻辑:来一条处理一条
  39. *
  40. * @param value 一条数据
  41. * @param ctx 上下文对象
  42. * @param out 采集器:收集数据,并输出
  43. */
  44. override def processElement(value: WaterSensor, ctx: KeyedProcessFunction[String, WaterSensor, String]#Context, out: Collector[String]): Unit = {
  45. out.collect("我来到process啦,分组的key是="+ctx.getCurrentKey+",数据=" + value)
  46. // 如果key是tuple,即keyby的时候,使用的是 位置索引 或 字段名称,那么key获取到是一个tuple
  47. // ctx.getCurrentKey.asInstanceOf[Tuple1].f0 //Tuple1需要手动引入Java的Tuple
  48. }
  49. }
  50. /**
  51. * 定义样例类:水位传感器:用于接收空高数据
  52. *
  53. * @param id 传感器编号
  54. * @param ts 时间戳
  55. * @param vc 空高
  56. */
  57. case class WaterSensor(id: String, ts: Long, vc: Double)
  58. }

 

Flink从入门到入土

Sink有下沉的意思,在Flink中所谓的Sink其实可以表示为将数据存储起来的意思,也可以将范围扩大,表示将处理完的数据发送到指定的存储系统的输出操作

之前我们一直在使用的print方法其实就是一种Sink。

  1. @PublicEvolving
  2. public DataStreamSink<T> print(String sinkIdentifier) {
  3. PrintSinkFunction<T> printFunction = new PrintSinkFunction(sinkIdentifier, false);
  4. return this.addSink(printFunction).name("Print to Std. Out");
  5. }

 

官方提供了一部分的框架的sink。除此以外,需要用户自定义实现sink

Flink从入门到入土

Flink从入门到入土

本文作者:Java知音@阳斌

版权声明:本文为javazhiyin原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/javazhiyin/p/13597319.html