Flink -- Input DataSets (Data Sources)
Common sources in Flink batch processing
The sources commonly used in Flink batch processing fall into two main categories:
1. Collection-based sources
2. File-based sources
The three most common ways to create a DataSet in Flink are:
1. env.fromElements(), which also supports composite types such as Tuples and custom objects.
2. env.fromCollection(), which supports many concrete Collection types.
3. env.generateSequence(), which creates a Sequence-based DataSet.
Collection-based sources
import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment, _}
import scala.collection.immutable.{Queue, Stack}
import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}

object DataSource001 {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // 0. Create a DataSet from individual elements (fromElements)
    val ds0: DataSet[String] = env.fromElements("spark", "flink")
    ds0.print()
    // 1. Create a DataSet from Tuples (fromElements)
    val ds1: DataSet[(Int, String)] = env.fromElements((1, "spark"), (2, "flink"))
    ds1.print()
    // 2. Create a DataSet from an Array
    val ds2: DataSet[String] = env.fromCollection(Array("spark", "flink"))
    ds2.print()
    // 3. Create a DataSet from an ArrayBuffer
    val ds3: DataSet[String] = env.fromCollection(ArrayBuffer("spark", "flink"))
    ds3.print()
    // 4. Create a DataSet from a List
    val ds4: DataSet[String] = env.fromCollection(List("spark", "flink"))
    ds4.print()
    // 5. Create a DataSet from a ListBuffer
    val ds5: DataSet[String] = env.fromCollection(ListBuffer("spark", "flink"))
    ds5.print()
    // 6. Create a DataSet from a Vector
    val ds6: DataSet[String] = env.fromCollection(Vector("spark", "flink"))
    ds6.print()
    // 7. Create a DataSet from a Queue
    val ds7: DataSet[String] = env.fromCollection(Queue("spark", "flink"))
    ds7.print()
    // 8. Create a DataSet from a Stack
    val ds8: DataSet[String] = env.fromCollection(Stack("spark", "flink"))
    ds8.print()
    // 9. Create a DataSet from a Stream (a Stream is a lazy List, avoiding unnecessary intermediate collections)
    val ds9: DataSet[String] = env.fromCollection(Stream("spark", "flink"))
    ds9.print()
    // 10. Create a DataSet from a Seq
    val ds10: DataSet[String] = env.fromCollection(Seq("spark", "flink"))
    ds10.print()
    // 11. Create a DataSet from a Set
    val ds11: DataSet[String] = env.fromCollection(Set("spark", "flink"))
    ds11.print()
    // 12. Create a DataSet from an Iterable
    val ds12: DataSet[String] = env.fromCollection(Iterable("spark", "flink"))
    ds12.print()
    // 13. Create a DataSet from an ArraySeq
    val ds13: DataSet[String] = env.fromCollection(mutable.ArraySeq("spark", "flink"))
    ds13.print()
    // 14. Create a DataSet from an ArrayStack
    val ds14: DataSet[String] = env.fromCollection(mutable.ArrayStack("spark", "flink"))
    ds14.print()
    // 15. Create a DataSet from a Map
    val ds15: DataSet[(Int, String)] = env.fromCollection(Map(1 -> "spark", 2 -> "flink"))
    ds15.print()
    // 16. Create a DataSet from a Range
    val ds16: DataSet[Int] = env.fromCollection(Range(1, 9))
    ds16.print()
    // 17. Create a DataSet with generateSequence
    val ds17: DataSet[Long] = env.generateSequence(1, 9)
    ds17.print()
  }
}
(1) Reading a local file
//TODO read a local file with readTextFile
//TODO initialize the environment
val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
//TODO load the data
val datas: DataSet[String] = environment.readTextFile("data.txt")
//TODO transform the data: split into words, count occurrences per word
val flatmap_data: DataSet[String] = datas.flatMap(line => line.split("\\W+"))
val tuple_data: DataSet[(String, Int)] = flatmap_data.map(line => (line, 1))
val groupData: GroupedDataSet[(String, Int)] = tuple_data.groupBy(line => line._1)
val result: DataSet[(String, Int)] = groupData.reduce((x, y) => (x._1, x._2 + y._2))
result.print()
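The manual reduce above can also be written with Flink's built-in field aggregation on tuple positions. A minimal sketch, reusing the same assumed input file data.txt:

import org.apache.flink.api.scala._

object WordCountWithSum {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val counts: DataSet[(String, Int)] = env.readTextFile("data.txt")
      .flatMap(_.split("\\W+"))
      .filter(_.nonEmpty)        // drop empty tokens produced by the split
      .map(word => (word, 1))
      .groupBy(0)                // group by the word field (tuple position 0)
      .sum(1)                    // aggregate the count field (tuple position 1)
    counts.print()
  }
}

groupBy(0).sum(1) produces the same per-word counts as the explicit reduce.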
(2) Reading data from HDFS
//TODO read HDFS data with readTextFile
//todo initialize the environment
val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
//TODO load the data
val file: DataSet[String] = environment.readTextFile("hdfs://hadoop01:9000/README.txt")
val flatData: DataSet[String] = file.flatMap(line => line.split("\\W+"))
val map_data: DataSet[(String, Int)] = flatData.map(line => (line, 1))
val groupdata: GroupedDataSet[(String, Int)] = map_data.groupBy(line => line._1)
val result_data: DataSet[(String, Int)] = groupdata.reduce((x, y) => (x._1, x._2 + y._2))
result_data.print()
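Instead of printing, the word counts can be written back to HDFS. A minimal sketch; the output path hdfs://hadoop01:9000/output/wordcount and the job name are assumptions for illustration. Note that print() triggers execution on its own, whereas a file sink such as writeAsText is lazy and needs an explicit env.execute():

import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

object HdfsWordCountToFile {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val result: DataSet[(String, Int)] = env.readTextFile("hdfs://hadoop01:9000/README.txt")
      .flatMap(_.split("\\W+"))
      .map(word => (word, 1))
      .groupBy(0)
      .sum(1)
    // Overwrite any previous output; the output path is an assumed example
    result.writeAsText("hdfs://hadoop01:9000/output/wordcount", WriteMode.OVERWRITE)
    // writeAsText is lazy, so the job must be triggered explicitly
    env.execute("hdfs wordcount")
  }
}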
(3) Reading CSV data
//TODO read CSV data
val environment: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
val path = "data2.csv"
val ds3 = environment.readCsvFile[(String, String, String, String, String, Int, Int, Int)](
  filePath = path,
  lineDelimiter = "\n",
  fieldDelimiter = ",",
  lenient = false,
  ignoreFirstLine = true,
  includedFields = Array(0, 1, 2, 3, 4, 5, 6, 7))
val first = ds3.groupBy(0, 1).first(50)
first.print()
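readCsvFile can also map each row directly onto a case class instead of a tuple. A minimal sketch, assuming a hypothetical CSV file whose first three columns hold a name, a city and an integer age (the Person case class and that column layout are made up for illustration):

import org.apache.flink.api.scala._

// Hypothetical record type for the first three CSV columns
case class Person(name: String, city: String, age: Int)

object CsvCaseClassSource {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Fields are matched to the case class constructor in column order
    val persons: DataSet[Person] = env.readCsvFile[Person](
      filePath = "data2.csv",
      fieldDelimiter = ",",
      ignoreFirstLine = true,
      includedFields = Array(0, 1, 2)) // keep only the columns the case class needs
    persons.print()
  }
}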
Flink can traverse all files in a directory, including files in nested subdirectories. However, when a directory is read, nested files are not picked up by default: only the files directly inside the given directory are processed and everything deeper is ignored. To read nested directories as well, enable recursive traversal with the recursive.file.enumeration parameter.
import org.apache.flink.configuration.Configuration

val env = ExecutionEnvironment.getExecutionEnvironment
val parameters = new Configuration
// recursive.file.enumeration enables recursive enumeration of nested directories
parameters.setBoolean("recursive.file.enumeration", true)
// read the "test" directory, including its subdirectories
val ds1 = env.readTextFile("test").withParameters(parameters)
ds1.print()
Reading compressed files
For supported compression formats such as the GZIP (.gz) file used below, no extra InputFormat needs to be specified; Flink detects the compression from the file extension and decompresses automatically. Note, however, that compressed files are generally not read in parallel but sequentially, which can limit the scalability of the job.
//TODO read a compressed file
val env = ExecutionEnvironment.getExecutionEnvironment
val file = env.readTextFile("test/data1/zookeeper.out.gz").print()

A compressed archive can be created with, for example: tar -czvf ***.tar.gz