java模拟切分文件

package com.shujia;

import java.io.*;
import java.util.ArrayList;

public class SplitFileBlock {
    public static void main(String[] args) throws Exception {
        //将数据读取进来
        //字符缓冲输入流
        BufferedReader br = new BufferedReader(new FileReader("data/students.txt"));
        int index = 0;
        //字符缓冲输出流
        BufferedWriter bw = new BufferedWriter(new FileWriter("data/blocks2/block---" + index));

        //现在是假设一行数据是1m,没128m数据,就生成一个block块,不到128m也会生成一个block块
        //每次读到128*1.1约等于140行的数据,就写入128行,剩下的12行计入下一次block块中去存储
        //定义一个集合,用于存储,读取的内容
        ArrayList<String> row = new ArrayList<>();

        String line = null;

        //定义一个变量,记录读取的行数
        int offset = 0;
        //定义一个变量,记录读取的是哪一个block块
        int rowNum = 0;


        while ((line = br.readLine()) != null) {
            row.add(line);
            offset++;
            //当我们的偏移量,128*1.1约等于140行的数据,就写入128行,剩下的12行计入下一次block块中去存储
            if (offset == 140) {
                rowNum = 128 * index;
                //循环128次,将集合存储的数据,写入到block块中
                for (int i = rowNum; i <= rowNum + 127; i++) {
                    String s = row.get(i);
                    bw.write(s);
                    bw.newLine();
                    bw.flush();
                }
                index++;
                //将offset设置为12
                offset = 12;
                bw = new BufferedWriter(new FileWriter("data/blocks2/block---" + index));
            }
        }

        //把剩余的数据写到一个block块中
        for(int i = row.size()-offset;i<row.size();i++){
            String s = row.get(i);
            bw.write(s);
            bw.newLine();
            bw.flush();
        }

        //释放资源
        bw.close();
        br.close();




    }
}

 hadoop用java实现

map任务

package com.shujia;

import java.io.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class MapTask implements Runnable {
    private File file;
    public int offset;


    public MapTask(File file,int offset) {
        this.file = file;
        this.offset = offset;
    }

    @Override
    public void run() {
        //字符缓冲输入流
        try {
            BufferedReader br = new BufferedReader(new FileReader(file));
            //创建一个Map集合,使用HashMap
            HashMap<String, Integer> map = new HashMap<>();
            String line = null;
            while ((line = br.readLine()) != null) {
                //用逗号进行切分
                String clazz = line.split(",")[4];

                //如果在map中没有该班级作为key,那我们就把这个班级作为key存放在集合,value设置为1
                if (!map.containsKey(clazz)) {
                    map.put(clazz, 1);
                } else {
                    //否则value值加1
                    map.put(clazz, map.get(clazz) + 1);
                }
            }
            //结束读取数据流程
            br.close();
            //将局部的map任务结果写入到文件中
            //创建字符缓冲输出流
            BufferedWriter bw = new BufferedWriter(new FileWriter("data/parts2/part---" + offset));
            //遍历HashMap
            Set<Map.Entry<String, Integer>> entries = map.entrySet();
            for (Map.Entry<String, Integer> keyValue : entries){
                String clazz = keyValue.getKey();
                Integer sumNumber = keyValue.getValue();
                //写入文件
                bw.write(clazz+":"+sumNumber);
                //换行
                bw.newLine();
                bw.flush();
            }
            //关闭写通道
            bw.close();
        } catch (Exception e) {
            e.printStackTrace();
        }

    }
}

map任务执行

package com.shujia;

import java.io.File;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/*
        Map(通过线程池的方式,简单来说,模拟hadoop中一个block块生成一个map任务,一个map任务相当于一个线程)
        (将切分出来的bolck块中,统计每个班级的人数)
        4423毫秒
 */
public class Map {
    public static void main(String[] args) {

        long start = System.currentTimeMillis();
        //创建一个线程池
        ExecutorService executorService = Executors.newFixedThreadPool(10000);
        File file = new File("data/blocks2");
        //定义一个文件编号,从0开始
        int offset = 0;
        File[] files = file.listFiles();
        for (File f : files) {
            MapTask mapTask = new MapTask(f, offset);
            executorService.submit(mapTask);
            offset++;
        }

        //关闭线程池
        executorService.shutdown();
        long end = System.currentTimeMillis();
        System.out.println("总耗时:"+(end - start)+"毫秒");


    }
}

reduce任务

package com.shujia;

import java.io.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

//306毫秒 537毫秒 == 843毫秒
//4423毫秒reduce总耗时:117551毫秒
/*
    模拟hadoop的切分map-reduce处理的方式,总耗时14毫秒
        将每个map任务的结果,再做一次总的聚合,统计出最终的班级人数

    当数据量过大的时候,TB以上的级别的时候
        1、一台服务器不够存
        2、可能一台够存,但是纯java程序是由JVM虚拟机调起的,内存有限,可能会导致,OOM内存溢出

    这时候,就必须使用分布式存储,将大文件进行切分,先局部做运算,这时候局部的数据少很多,然后再总的聚合,数据量少且块!
 */
public class Reduce {
    public static void main(String[] args) throws Exception {
        long start = System.currentTimeMillis();
        //将past目录封装成File对象
        File file = new File("data/parts2");
        //获取下面的所有文件对象数组
        File[] files = file.listFiles();
        //创建一个map集合,接收总的结果数据
        HashMap<String, Integer> map = new HashMap<>();
        //遍历每个part文件
        for (File f : files) {
            //读取文件,进行分割
            //创建缓冲字符输入流对象
            BufferedReader br = new BufferedReader(new FileReader(f));
            String line = null;
            while ((line = br.readLine()) != null) {
                //以冒号进行分割得到班级和人数
                String[] strings = line.split(":");
                String clazz = strings[0];
                //包装类
                Integer sum = Integer.valueOf(strings[1]);
                //判断map集合中是否存在对应的key
                if (!map.containsKey(clazz)) {
                    map.put(clazz, sum);
                } else {
                    //如果存在,value值相加
                    map.put(clazz, map.get(clazz) + sum);
                }
            }
            //关闭读取数据的通道
            br.close();
        }

        //将结果写入到最终一个文件
        BufferedWriter bw = new BufferedWriter(new FileWriter("data/result-big/part-r-000000"));
        //遍历集合
        Set<Map.Entry<String, Integer>> entries = map.entrySet();
        for (Map.Entry<String, Integer> keyValue:entries){
            String clazz = keyValue.getKey();
            Integer number = keyValue.getValue();
            bw.write(clazz+":"+number);
            bw.newLine();
            bw.flush();
        }

        //释放资源
        bw.close();
        long end = System.currentTimeMillis();
        System.out.println("reduce总耗时:"+(end-start)+"毫秒");





    }
}

 

 

使用java程序模拟hadoop切分文件,统计students文件中,姓名包含’白’汉字的人数。

import java.io.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

public class Demo {
    public static void main(String[] args) {
        //将文件切分,写入blocks
        try {
            int index = 0;
            BufferedReader br = new BufferedReader(new FileReader("D:\\soft\\projects\\untitled\\data\\students.txt"));
            BufferedWriter bw = new BufferedWriter(new FileWriter("D:\\soft\\projects\\untitled\\blocks\\block--" + index));
            ArrayList<String> list = new ArrayList<>();
            int offset = 0;
            int num = 0;
            String line = null;
            while ((line = br.readLine()) != null) {
                list.add(line);
                offset++;
                num = 128 * index;
                if (offset == 140) {
                    for (int i = num; i <= num + 127; i++) {
                        String s = list.get(i);
                        bw.write(s);
                        bw.newLine();
                        bw.flush();
                    }
                    offset = 12;
                    index++;
                    bw = new BufferedWriter(new FileWriter("D:\\soft\\projects\\untitled\\blocks\\block--" + index));
                }
            }
            for (int i = list.size() - offset; i < list.size(); i++) {
                String s = list.get(i);
                bw.write(s);
                bw.newLine();
                bw.flush();
            }
            bw.close();
            br.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;

public class DemoTest {
    public static void main(String[] args) {
        //获取名字中有“白”的学生人数
        File file = new File("D:\\soft\\projects\\untitled\\blocks");
        File[] files = file.listFiles();
        int index = 0;
        for (File s : files) {
            try {
                BufferedReader br = new BufferedReader(new FileReader(s));
                String line = null;
                while ((line = br.readLine()) != null) {
                    String[] split = line.split(",");
                    if (split[1].contains("白")) {
                        index++;
                    }
                }
                br.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        System.out.println("名字中含有“白”的有:"+index);
    }
}

 

版权声明:本文为wqy1027原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/wqy1027/p/16622013.html