Hadoop优化
天气案例
随机生成温度代码;并写入到文件中
需求:求每年2月份的最高温度
package utils; import java.io.BufferedWriter; import java.io.FileWriter; import java.io.IOException; import java.text.DateFormat; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Date; public class RandomWeather { public static void main(String[] args) throws ParseException, IOException { //创建日期格式 DateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); long start = sdf.parse("2000-01-01 00:00:00").getTime(); long end = sdf.parse("2022-12-31 00:00:00").getTime(); long difference = end - start; BufferedWriter bw=new BufferedWriter(new FileWriter("D:\\soft\\projects\\bigdata19-project\\
bigdata19-mapreduce\\data\\weather.txt")); for (int i = 0; i < 10000; i++) { //随机生成时间2000-2023 Date date = new Date(start + (long) (Math.random() * difference)); //随机生成一个温度 int temperature = -20 + (int) (Math.random() * 60); //打印生成的结果 // System.out.println(sdf.format(date) + "\t" + temperature); bw.write(sdf.format(date)+"\t"+temperature); bw.newLine(); bw.flush(); } } }
mapreduce代码
package com.shujia.weather; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; /* 2014-07-08 14:04:18 -5 2000-10-03 14:28:54 32 2002-05-11 14:40:37 -18 2009-07-08 11:17:50 0 */ class WeatherMapper extends Mapper<LongWritable,Text,Text,LongWritable>{ @Override protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
throws IOException, InterruptedException { String line = value.toString(); String[] split = line.split("\t"); String temp=split[1]; String[] ym = split[0].split("-"); if("02".equals(ym[1])) { String yam=ym[0]+"-"+ym[1]; context.write(new Text(yam), new LongWritable(Long.parseLong(temp))); } } } class WeatherReducer extends Reducer<Text,LongWritable,Text,LongWritable>{ @Override protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context)
throws IOException, InterruptedException { long max=0; for (LongWritable value : values) { long temp = value.get(); if(temp>max){ max=temp; } } context.write(key,new LongWritable(max)); } } public class WeatherMax { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(WeatherMax.class); job.setJobName("求每一年2月份的最高温度"); job.setNumReduceTasks(1); job.setMapperClass(WeatherMapper.class); job.setReducerClass(WeatherReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(LongWritable.class); FileInputFormat.setInputPaths(job,new Path(args[0])); FileOutputFormat.setOutputPath(job,new Path(args[1])); job.waitForCompletion(true); } }
优化一:Combiner
使用combiner之前
使用combiner之后
减少的了reduce 从map拉取数据的过程,提高计算效率。
注意:将reduce端的聚合操作,放到map 进行执行。适合求和,计数,等一些等幂操作。不适合求平均值,次幂等类似操作
优化二:Join(数据倾斜)
发生数据倾斜解决方法:1.可以设置多个reduce;2.把发生数据倾斜的key打上随机值分配到不同的reduce中;3.再写一个mapreduce,把随机值去掉再做一次聚合;
MapReduce中的join
其实就是类似于关系型数据库中的连接查询一样。需要计算的数据可能存储在不同的文件中或不同表中,两个文件又有一些相同的字段可以相互关联,这时候我们就可以通过这些关联字段将两个文件中的数据组合到一起进行计算了。
join的三种方式:Map join、SemiJoin、reduce join
Reduce Join:
分为两个阶段
(1)map函数主要是对不同文件中的数据打标签。
就是在map端过滤掉不参加join操作的数据,则可以大大减少数据量,提高网络传输速度。
这三种join方式适用于不同的场景:
Map join和SemiJoin则要考虑数据量过大时的内存问题。 如果只考虑网络传输,忽略内存问题则。
优化三:根据实际情况调整切片大小
1 切片大小默认一致,是为了数据本地化,减少数据拉取消耗网络io
优化四:可以设置yarn资源和队列
调整计算资源:参考博客:https://blog.csdn.net/qq_36753550/article/details/83065546
设置队列:参考博客:https://blog.51cto.com/u_13525470/4723358
mr运行日志信息:百分比是按照完成的m或r的任务的个数/m或r的总个数。
对于经典的MRv1它由三部分组成 :
编程模型、 数据处理引擎和运行时环境。
编程模型由新旧 API 两部分组成,新旧api只是代码封装上略有变化,性能没变化。
(1)接收用户请求
(2)管理调度资源
(3)启动管理am
当RM收到submitApplciation()的请求时, 就将该请求发给调度器, 调度器分配第一个容器, 然后RM在该容器内启动ApplicationMaster进程。该进程上运行着一个MRAppMaster的Java应用。其通过创造一些bookkeeping对象来监控作业的进度。 然后通过hdfs得到由JobClient已经处理好的作业信息。为每个Inputsplit创建一个map任务, 并创建相应的reduce任务。然后ApplicationMaster会对整个作业量进行判断,如果作业量很小, ApplicationMaster会选择在其自己的JVM中运行任务, 这种作业称作是uber task的方式。在任务运行之前, 作业的setup
如果不是小作业, 那么ApplicationMaster向RM请求更多的容器来运行所有的map和reduce任务,每个容器只能对应一个任务
每个NM会向applicationmaster汇报自己的工作状态,JobClient会每秒轮询检测applicationmaster,这样就能随时收到更新信息。
-
作业完成
推测执行是在分布式环境下,因为某种原因造成同一个job的多个task运行速度不一致,有的task运行速度明显慢于其他task,则这些task拖慢了整个job的执行进度,为了避免这种情况发生,Hadoop会为该task启动备份任务,让该speculative task与原始task同时处理一份数据,哪个先运行完,则将谁的结果作为最终结果。推测执行优化机制采用了典型的以空间换时间的优化策略
yarn默认是计算能力调度 FifoScheduler:根据先进先出排队,最简单的调度器。 FIFO
CapacityScheduler(计算能力调度)、FairScheduler(公平调度):
相同点:
(1)都是多队列。
(2)都有资源最大最小上线限制。
(3)都是资源共享,每个队列剩余的资源可以给其他队列使用。
不同点:
(1)队列排序算法不同:计算能力调度资源使用量小的优先。公平调度根据公平排序算法排序。
(2)应该用选择算法不同:计算能力调度是先进先出。公平调度先进先出或者公平排序算法。