(3) Batch Word Count Example
Requirement: read words from a local txt file, count the number of occurrences of each word, and write the result to a CSV file, with rows separated by newline characters and the fields within each row separated by spaces.
Code example:
Java version
package xuwei.tech.batch;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Batch word count: read words from local text files, count how many times
 * each word appears, and write the result to a CSV file.
 */
public class BatchWordCountJava {

    public static void main(String[] args) throws Exception {
        String inputPath = "D:\\data\\file";
        String outPath = "D:\\data\\result";

        // get the execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // read the contents of the input file(s)
        DataSource<String> text = env.readTextFile(inputPath);

        // tokenize into (word, 1) pairs, group by word, and sum the counts
        DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).groupBy(0).sum(1);

        // rows separated by "\n", fields within a row separated by " "
        counts.writeAsCsv(outPath, "\n", " ").setParallelism(1);

        env.execute("batch word count");
    }

    public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            // split on non-word characters and emit (word, 1) for each non-empty token
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}
Scala version
To be added later...
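Until the official Scala version is supplied, here is a minimal sketch of an equivalent program, assuming Flink's Scala DataSet API (org.apache.flink.api.scala._); the object name BatchWordCountScala is illustrative, and the paths simply mirror the Java version above.

package xuwei.tech.batch

import org.apache.flink.api.scala._

object BatchWordCountScala {
  def main(args: Array[String]): Unit = {
    val inputPath = "D:\\data\\file"
    val outPath = "D:\\data\\result"

    // get the batch execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment

    // read all lines from the input file(s)
    val text = env.readTextFile(inputPath)

    // split each line into lowercase words, emit (word, 1), group by word, sum the counts
    val counts = text
      .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))
      .map((_, 1))
      .groupBy(0)
      .sum(1)

    // rows separated by "\n", fields within a row separated by " "
    counts.writeAsCsv(outPath, "\n", " ").setParallelism(1)

    env.execute("batch word count scala")
  }
}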
The difference between Flink Streaming and Batch:
Stream processing (Streaming): StreamExecutionEnvironment + DataStream
Batch processing (Batch): ExecutionEnvironment + DataSet
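To make the contrast concrete, here is a minimal streaming word-count sketch in Scala, assuming Flink's Scala DataStream API; the socket source on localhost port 9000 is an illustrative input, not part of the original example.

import org.apache.flink.streaming.api.scala._

object StreamWordCountScala {
  def main(args: Array[String]): Unit = {
    // streaming uses StreamExecutionEnvironment instead of ExecutionEnvironment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // read an unbounded stream of text lines from a socket (illustrative source)
    val text: DataStream[String] = env.socketTextStream("localhost", 9000)

    // same word-count logic, but on a DataStream: keyBy instead of groupBy
    val counts = text
      .flatMap(_.toLowerCase.split("\\W+").filter(_.nonEmpty))
      .map((_, 1))
      .keyBy(_._1)
      .sum(1)

    counts.print()

    env.execute("streaming word count")
  }
}

Apart from the different environment, the keyBy grouping, and the print sink, the transformation chain is the same as in the batch version.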