本文记录了spark平台工作的几类操作及部署方式,以及简介与spark联系比较多的hadoop平台。在这个基础上,探究了hadoop平台与spark的异同。

spark平台测试

1 spark的操作方式

spark的代码操作方式分为两种——命令台运行的shell与打包的程序包,分别通过spark-shell与spark-submit实现。spark-submit 是在spark安装目录中bin目录下的一个shell脚本文件,用于在集群中启动应用程序(如*.py脚本);对于spark支持的集群模式,spark-submit提交应用的时候有统一的接口,不用太多的设置。使用spark-submit时,应用程序的jar包以及通过—jars选项包含的任意jar文件都会被自动传到集群中。

2 spark的运行方式

spark的方式主要分为以下类型:

  1. 本机化local

    ​ 这种方式下spark没有集群化,相当于启动一个本地进程,然后在一个进程内,模拟spark集群中作业的运行。一个spark作业,就对应了进程中的一个或多个executor线程,就开始执行,包括作业的调度,task分配

  2. 集群式

    1. spark自带的standalone方式
      • client方式
      • cluster方式
    2. 使用yarn进行资源调度的集群方式
      • client方式
      • cluster方式
client方式

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-56t8amF1-1586439263684)(H:/本地图片/markdow图片/1578489805642.png)]

流程

  1. client模式提交任务后,会在客户端启动Driver进程。
  2. Driver会向Master申请启动Application启动的资源。
  3. 资源申请成功,Driver端将task发送到worker端执行。
  4. worker将task执行结果返回到Driver端。
cluster方式

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-890tPlA5-1586439263689)(H:/本地图片/markdow图片/1578489684037.png)]

执行流程

  1. 客户端使用命令spark-submit –deploy-mode cluster 后会启动spark-submit进程
  2. 此进程为Driver向Master申请资源,Driver进程默认需要1G内存和1Core
  3. Master会随机在一台Wworker节点来启动Driver进程
  4. Driver启动成功后,spark-submit关闭,然后Driver向Master申请资源
  5. Master接收到请求后,会在资源充足的worker节点上启动Executor进程
  6. Driver分发Task到Executor中执行

无论是spark的standalone方式还是使用yarn进行资源调度的集群方式,作业运行原理是类似的。

对于client与cluster方式,区别在于Driver类是否位于提交Job作业的客户端节点上;client模式下Job运行时的标准输出将回显到客户端控制台上,方便调试,但是会导致本机流量激增,不常用于实际工作。

cluster模式下客户端与Spark集群(Slave集群)是否位于同一网段是没有要求的,因为分发jar包和启动Job的Driver直接存在于Slave集群中的某个节点上,但是这种模式下Driver类本身需要消耗Slave节点上的CPU和内存资源,同时Job运行时的标准输出将不能回显到客户端控制台上。用于实际工作。

3 spark的实际流程
3.1 绑定应用程序依赖

如果代码依赖于其它项目,为了将代码分发到Spark集群,就需要将这些依赖一起打包到应用程序中去。sbt和Maven都有装配插件,只要在创建集成的jar时列出Spark和Hadoop需要的依赖,而不需要将这些依赖和应用打包,因为在程序运行的时候集群的master知道如何调用和提供这些依赖;但是一旦有集成好的jar包,在执行bin/spark-submit脚本时就坐传递这些jar包了。

对于Python语言来讲,可以使用spark-submit的–py-files参数添加.py,.zip,.egg文件和应用程序一起进行分发,如果应用程序依赖于多个Python文件,建议将它们打包成.zip或.egg文件。

3.2 启动应用程序

如果打包了应用程序,就可以使用bin/spark-submit脚本启动应用程序了,这个脚本可以设置Spark类路径(classpath)和应用程序依赖包,并且可以设置不同的Spark所支持的集群管理和部署模式。提交任务后,无论是Standalone模式还是Spark on Yarn模式
spark-submit提交应用的大致格式如下:

./bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]

--class:应用程序的入口点(例如,org.apache.spark.examples.SparkPi)
--master:集群的master URL(例如,spark://localhost:7077)
--deploy-mode:将driver部署到worker节点(cluster模式)或者作为外部客户端部署到本地(client模式),默认情况下是client模式
--conf:用key=value格式强制指定Spark配置属性,用引号括起来
--application-jar:包含应用程序和所有依赖的jar包的路径,路径必须是在集群中是全局可见的,例如,hdfs://路径或者file://路径
--application-arguments:传递给主类中main函数的参数
3.3 Master URLs

传递给Spark的master url可以是以下任意格式之一,对应spark的集群方式:

master URL 意义
local 使用1个worker线程本地运行Spark(即完全没有并行化)
local[K] 使用K个worker线程本地运行Spark(最好将K设置为机器的CPU核数)
local[*] 根据机器的CPU逻辑核数,尽可能多地使用Worker线程
spark://HOST:PORT 连接到给定的Spark Standalone集群的Master,此端口必须是Master配置的端口,默认为7077
mesos://HOST:PORT 连接到给定的Mesos集群的Master,此端口必须是Master配置的端口,默认为5050。若Mesos集群使用ZooKeeper,则master URL使用mesos://zk://……
yarn-client 以client模式连接到YARN集群,集群位置将通过HADOOP_CONF_DIR环境变量获得
yarn-cluster 以cluster模式连接到YARN集群,集群位置将通过HADOOP_CONF_DIR环境变量获得
4.spark的测试记录
创建hdfs文件平台
bin/hdfs dfs -mkdir -p /data/input  #新建文件夹
bin/hdfs dfs -put README.txt /data/input #新建文件到文件夹
bin/hdfs dfs -ls /data/input #ls
bin/hdfs dfs -chmod -R 777 /data/input #避免权限问题,修改权限
spark-shell-local
var textFile=sc.textFile("hdfs://master:9000/data/input/README.txt")#读入文件

var wordCount=textFile.flatMap(line => line.split(" ")).map(word=>(word,1)).reduceByKey((a,b)=>a+b) #进行map与reduce的定义

textFile.saveAsTextFile("hdfs://master:9000/data/output")  #保存至本地
spark-submit-local
bin/spark-submit --class org.apache.spark.examples.SparkPi --master local examples/jars/spark-examples_2.11-2.2.0.jar#提交spark自带的example
spark-standalone
bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master local[8] \
  examples/jars/spark-examples_2.11-2.2.0.jar \
  100
#使用local【8】方式
bin/spark-submit   --class org.apache.spark.examples.SparkPi   --master spark://master:7077 --deploy-mode client --num-executors 2 examples/jars/spark-examples_2.11-2.2.0.jar 
#使用client方式
bin/spark-submit   --class org.apache.spark.examples.SparkPi   --master spark://master:6066   --deploy-mode cluster   --supervise   --driver-cores 4   --executor-memory 512M   --driver-memory 512M   --total-executor-cores 4   examples/jars/spark-examples_2.11-2.2.0.jar  
#使用cluster方式
spark-yarn
bin/spark-submit   --class org.apache.spark.examples.SparkPi   --master yarn --deploy-mode client examples/jars/spark-examples_2.11-2.2.0.jar 

bin/spark-submit   --class org.apache.spark.examples.SparkPi   --master yarn --deploy-mode cluster examples/jars/spark-examples_2.11-2.2.0.jar 

Python读取hdfs

1 配置pyhdfs

python调用hdfs主要有两种方式:

  1. 通过python调用shell命令,实现间接操作,这种操作比较简单,但是扩展性差
  2. 安装python的依赖库,里面封装了一系列的函数接口,可实现对hdfs平台文件的查看,增加合并与删除

手动安装配置pyhdfs十分麻烦,推荐直接安装pip,后通过pip直接安装pyhdfs

sudo apt-get install python-pip#安装pip

pip install --upgrade pip#可以选择升级pip

pip install pyHdfs#安装依赖库

使用pyhdfs可以实现对hdfs平台的访问,在文件开头引入pyhdfs即可

2 python调用hdfs
client = pyhdfs.HdfsClient(hosts="192.168.40.30,9000",user_name="janspiry")#初始化

client = pyhdfs.HdfsClient(hosts="192.168.40.30,9000")

client.listdir("/data/input")#显示目录

response = client.open("/data/input/README.txt")#读取平台文件
response.read()

client.copy_from_local("/home/janspiry/spark/tmp.txt","/data/input/tmp.txt")#上传到平台

copy_to_local(src, localdest, **kwargs)#拷贝到本地

Hadoop平台

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-51ltU6DM-1586439263691)(H:/本地图片/markdow图片/1578490043161.png)]

HDFS和MapReduce是Hadoop的两大核心,分别用于分布式储存与并行处理。在Hadoop2.0后,在HDFS的基础上增加了YARN——资源管理框架,在YARN上既可以放MapReduce,也可以放置其他的计算资源,主要是管理资源的,如CPU,硬盘,内存,网络等。

架构了解
HDFS

分有NameNode和DataNode,NameNode是整个文件系统目录,基于内存存储,存储的是一些文件的详细信息,比如文件名、文件大小、创建时间、文件位置等。Datanode是文件的数据信息,也就是文件本身,不过是分割后的小文件。

Hadoop分布式文件系统(HDFS)被设计成适合运行在通用硬件上的分布式文件系统。HDFS是一个高度容错性的系统,能提供高吞吐量的数据访问,非常适合大规模数据集上的应用。

数据块(block):大文件会被分割成多个block进行存储,block大小默认为64MB。每一个block会在多个datanode上存储多份副本,默认是3份。
NameNode:namenode负责管理文件目录、文件和block的对应关系以及block和datanode的对应关系。
DataNode:datanode就负责存储了,当然大部分容错机制都是在datanode上实现的。

MapReduce

MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算

Job Tracker职责:接受任务,计算资源,分配资源,监听数据节点

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ON6rAy4K-1586439263697)(H:/本地图片/markdow图片/1578141310755.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Igk4GNFn-1586439263698)(H:/本地图片/markdow图片/20170924165441654.png)]

MapReduce运行在大规模集群的并行计算过程中,并抽象为两个函数:map和reduce。一个MapReduce作业,会把输入的数据切分为若干块,先由map任务(task)一并行的方式处理它们,框架会先对map的输出进行排序,把所有具有相同的key值的value结合在一起,然后输入给reduce任务。接下来由reduce任务进行相应的并行计算。最后把计算的结果输出到HDFS存储起来。需要我们记忆的几个点,mapreduce作业执行涉及4个独立的实体:
(1)客户端(client):编写mapreduce程序,配置作业,提交作业,这就是程序员完成的工作;
(2)JobTracker:初始化作业,分配作业,与TaskTracker通信,协调整个作业的执行;
(3)TaskTracker:保持与JobTracker的通信,在分配的数据片段上执行Map或Reduce任务,TaskTracker和JobTracker的不同有个很重要的方面,就是在执行任务时候TaskTracker可以有n多个,JobTracker则只会有一个(JobTracker只能有一个就和hdfs里namenode一样存在单点故障,我会在后面的mapreduce的相关问题里讲到这个问题的)
(4)HDFS:保存作业的数据、配置信息等等,最后的结果也是保存在hdfs上面

Yarn

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-NOlCJ88z-1586439263704)(H:/本地图片/markdow图片/1578141339505.png)]

(1)Resource Manager
接受用户提交的分布式计算程序,并为其划分资源。
管理、监控各个Node Manager上的资源情况,以便于均衡负载。
(2)Node Manager
管理它所在机器的运算资源(cpu + 内存)。
负责接受Resource Manager分配的任务,创建容器、回收资源。

Yarn存在的原因
  1. JobTracker和NameNode的单点问题,JobTracker节点工作任务量大同时容易出现问题。

  2. MapReduce采用了基于slot的资源分配模型,slot是一种粗粒度的资源划分单位,通常一个任务不会用完槽位对应的资源,且其他任务也无法使用这些空闲资源,同时map的槽位和reduce的槽位是不可以通用的。

Yarn重构后,JobTracker的功能,被分散到了各个进程中。JobTracker的功能被分散到各个进程中包括ResourceManager和NodeManager:计算模型相关的事情可以放在 NodeManager 的一个扩展服务中,如 MAP-REDUCE 的 shuffle。监控功能分给了NodeManager和Application Master。同时由于这些进程可以被单独部署所以这样就大大减轻了单点故障及压力。

同时Yarn使用了Container,而hadoop1.x中使用了slot。具体如下:

JobTracker三部分:

  1. ResourceManager,负责 Scheduler 及 ApplicationsManager(应用管理);

    对调度器Scheduler 来说,YARN 提供了多种直接可用的调度器, Fair Scheduler 和 Capacity Scheduler 等。调度器仅根据各个应用程序的资源需求进行资源分配,分配的基本单位是Container,而容器里面是将内存,CPU,网络,磁盘封装到一起。同时用户也可设计自己的调度器。

  2. ApplicationMaster,负责 job 的生命周期管理;

  3. JobHistroyServer,负责日志的展示。

TaskTracker 替换成了 NodeManager。

NodeManager 管理各种各样的 container。Container 才是真正干活的。它封装了某个节点上的多维度资源,如内存、 CPU、磁盘、网络等,当 AM 向 RM 申请资源时,RM 为 AM 返回的资源便是用 Container 表示的。YARN 会为每个任务分配一个 Container,且该任务只能使用该 Container 中描述的 资源。容器是一个动态划分资源。

Spark与Hadoop 的比较
  1. Spark基于可以基于内存处理数据,Job中间输出结果可以保存在内存中,从而不再需要读写HDFS。

    MapReduce的设设计:中间结果保存在文件中,提高了可靠性,减少了内存占用。但是牺牲了性能。
    Spark的设计:数据在内存中进行交换,要快一些,但是内存这个东西,可靠性不如磁盘。所以性能方面比MapReduce要好。

  2. Spark中有DAG有向无环图。

    Spark 计算比 MapReduce 快的根本原因在于 DAG 计算模型。一般而言,DAG 相比MapReduce 在大多数情况下可以减少 shuffle 次数。Spark 的 DAGScheduler 相当于一个改进版的 MapReduce,如果计算不涉及与其他节点进行数据交换,Spark 可以在内存中一次性完成这些操作,也就是中间结果无须落盘,减少了磁盘 IO 的操作。但是,如果计算过程中涉及数据交换,Spark 也是会把 shuffle 的数据写磁盘的!有一个误区,Spark 是基于内存的计算,所以快,这不是主要原因,要对数据做计算,必然得加载到内存,Hadoop 也是如此,只不过 Spark 支持将需要反复用到的数据给 Cache 到内存中,减少数据加载耗时,所以 Spark 跑机器学习算法比较在行(需要对数据进行反复迭代)。Spark 基于磁盘的计算也是比 Hadoop 快。

  3. Spark存储数据可以指定副本个数,MR默认3个。

    spark容错性高,它通过弹性分布式数据集RDD来实现高效容错,RDD是一组分布式的存储在节点内存中的只读性质的数据集,这些集合是弹性的,某一部分丢失或者出错,可以通过整个数据集的计算流程的血缘关系来实现重建;mapreduce的话容错可能只能重新计算了,成本较高。

  4. JVM的优化

    Hadoop每次MapReduce操作,启动一个Task便会启动一次JVM,基于进程的操作。而Spark每次MapReduce操作是基于线程的,只在启动Executor是启动一次JVM,内存的Task操作是在线程复用的。每次启动JVM的时间可能就需要几秒甚至十几秒,那么当Task多了,Hadoop比Spark慢。

  5. Spark中提供了各种场景 的算子,MR中只有map ,reduce 相当于Spark中的map和reduceByKey两个算子

  6. Spark 是粗粒度资源申请,Application执行快。

    spark是粗粒度资源申请,也就是当提交spark application的时候,application会将所有的资源申请完毕,如果申请不到资源就等待,如果申请到资源才执行application,task在执行的时候就不需要自己去申请资源,task执行快,当最后一个task执行完之后task才会被释放。优点是执行速度快,缺点是不能使集群得到充分的利用。

    MapReduce是细粒度资源申请,当提交application的时候,task执行时,自己申请资源,自己释放资源,task执行完毕之后,资源立即会被释放,task执行的慢,application执行的相对比较慢。

    优点是集群资源得到充分利用,缺点是application执行的相对比较慢。

  7. Spark 中shuffle map端自动聚合功能,MR手动设置。

  8. Spark 中shuffle ByPass机制有自己灵活的实现。

总的来说,Spark解决了Hadoop的以下问题

  • 抽象层次低,需要手工编写代码来完成,使用上难以上手=>基于RDD的抽象,实数据处理逻辑的代码非常简短。

  • 只提供两个操作,Map和Reduce,表达力欠缺=>提供很多转换和动作,很多基本操作如Join,GroupBy已经在RDD转换和动作中实现。

  • 一个Job只有Map和Reduce两个阶段(Phase),复杂的计算需要大量的Job完成,Job之间的依赖关系是由开发者自己管理的=>一个Job可以包含RDD的多个转换操作,在调度时可以生成多个阶段(Stage),而且如果多个map操作的RDD的分区不变,是可以放在同一个Task中进行。

  • 处理逻辑隐藏在代码细节中,没有整体逻辑=>在Scala中,通过匿名函数和高阶函数,RDD的转换支持流式API,可以提供处理逻辑的整体视图。代码不包含具体操作的实现细节,逻辑更清晰。

  • 中间结果也放在HDFS文件系统中=>中间结果放在内存中,内存放不下了会写入本地磁盘,而不是HDFS。

  • ReduceTask需要等待所有MapTask都完成后才可以开始=> 分区相同的转换构成流水线放在一个Task中运行,分区不同的转换需要Shuffle,被划分到不同的Stage中,需要等待前面的Stage完成后才可以开始。

  • 时延高,只适用Batch数据处理,对于交互式数据处理,实时数据处理的支持不够=>通过将流拆成小的batch提供Discretized Stream处理流数据。

  • 对于迭代式数据处理性能比较差=>通过在内存中缓存数据,提高迭代式计算的性能。

小结

hadoop与yarn的关系
1.Tracker 替换成了 Manager
2.Yarn使用了Container,hadoop1.x中使用slot

hadoop与spark的比较
1.Spark基于内存处理数据,Job中间输出结果可以保存在内存中,从而不再需要读写HDFS。
2.Spark中有DAG有向无环图。
3.Spark存储数据可以指定副本个数,MR默认3个。
4.JVM的优化。MR启动一个Task便会启动一次JVM,基于进程的操作。Spark每次MapReduce操作是基于线程的,只在启动Executor是启动一次JVM,内存的Task操作是在线程复用的。
5.Spark中提供了各种场景的算子,MR中只有map ,reduce 相当于Spark中的map和reduceByKey两个算子。
6.Spark粗粒度资源申请Application执行。
7.Spark shuffle机制有自己灵活的实现。

lot

hadoop与spark的比较
1.Spark基于内存处理数据,Job中间输出结果可以保存在内存中,从而不再需要读写HDFS。
2.Spark中有DAG有向无环图。
3.Spark存储数据可以指定副本个数,MR默认3个。
4.JVM的优化。MR启动一个Task便会启动一次JVM,基于进程的操作。Spark每次MapReduce操作是基于线程的,只在启动Executor是启动一次JVM,内存的Task操作是在线程复用的。
5.Spark中提供了各种场景的算子,MR中只有map ,reduce 相当于Spark中的map和reduceByKey两个算子。
6.Spark粗粒度资源申请Application执行。
7.Spark shuffle机制有自己灵活的实现。


版权声明:本文为Janspiry原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/Janspiry/p/16046701.html