(三)批处理单词个数统计实例

需求:从本地txt文件中读取单词,然后统计每个单词出现的次数,写入csv文件,行之间换行符分割,每行字段用空格分割。

代码实例:

Java版本

package xuwei.tech.batch;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * 
 */
public class BatchWordCountJava {

    public static void main(String[] args) throws Exception{
        String inputPath = "D:\\data\\file";
        String outPath = "D:\\data\\result";

        //获取运行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //获取文件中的内容
        DataSource<String> text = env.readTextFile(inputPath);

        DataSet<Tuple2<String, Integer>> counts = text.flatMap(new Tokenizer()).groupBy(0).sum(1);
        counts.writeAsCsv(outPath,"\n"," ").setParallelism(1);
        env.execute("batch word count");

    }


    public static class Tokenizer implements FlatMapFunction<String,Tuple2<String,Integer>>{
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] tokens = value.toLowerCase().split("\\W+");
            for (String token: tokens) {
                if(token.length()>0){
                    out.collect(new Tuple2<String, Integer>(token,1));
                }
            }
        }
    }
}

Scala版本

后续补充……

 

Flink Streaming和Batch的区别:

流处理Streaming: StreamExecutionEnvironment + DataStreaming

批处理Batch:ExecutionEnvironment + DataSet

转载请注明地址: https://www.cnblogs.com/wynjauu/articles/10542950.html

 

版权声明:本文为wynjauu原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/wynjauu/articles/10542950.html