大数据篇:Hive

大数据篇:Hive

hive.apache.org

Hive是什么?

Hive是Facebook开源的用于解决海量结构化日志的数据统计,是基于Hadoop的一个数据仓库工具,可以将结构化的数据文件映射为一张表,并且提供类SQL查询功能,本质是将HQL转化成MapReduce程序。

数据存储在HDFS,分析数据底层实现默认是MapReduce,执行程序运行在Yarn上。

如果没有Hive

想象一下数据统计的时候写大量的MapReduce程序,那会是多么痛苦。如果是写SQL就开心多了,尤其是离线数据仓库方面广泛应用。

1 Hive和数据库的区别

由于 Hive 采用了类似SQL 的查询语言 HQL(hive query language),因此很容易将 Hive 理解为数据库。其实从结构上来看,Hive 和数据库除了拥有类似的查询语言,再无类似之处。

  • 1 查询语言
    • 由于SQL被广泛的应用在数据仓库中,因此,专门针对Hive的特性设计了类SQL的查询语言HQL。熟悉SQL开发的开发者可以很方便的使用Hive进行开发。
  • 2 数据存储位置
    • Hive 是建立在 Hadoop 之上的,所有 Hive 的数据都是存储在 HDFS 中的。而数据库则可以将数据保存在块设备或者本地文件系统中。
  • 3 数据更新
    • 由于Hive是针对数据仓库应用设计的,而数据仓库的内容是读多写少的。因此,Hive中不支持对数据的改写和添加,所有的数据都是在加载的时候中确定好的。而数据库中的数据通常是需要经常进行修改的,因此可以使用 INSERT INTO … VALUES 添加数据,使用 UPDATE … SET修改数据。
  • 4 索引
    • Hive在加载数据的过程中不会对数据进行任何处理,甚至不会对数据进行扫描,因此也没有对数据中的某些Key建立索引。Hive要访问数据中满足条件的特定值时,需要暴力扫描整个数据,因此访问延迟较高。由于 MapReduce 的引入, Hive 可以并行访问数据,因此即使没有索引,对于大数据量的访问,Hive 仍然可以体现出优势。数据库中,通常会针对一个或者几个列建立索引,因此对于少量的特定条件的数据的访问,数据库可以有很高的效率,较低的延迟。由于数据的访问延迟较高,决定了 Hive 不适合在线数据查询。
  • 5 执行
    • Hive中大多数查询的执行是通过 Hadoop 提供的 MapReduce 来实现的。而数据库通常有自己的执行引擎。
  • 6 执行延迟
    • Hive 在查询数据的时候,由于没有索引,需要扫描整个表,因此延迟较高。另外一个导致 Hive 执行延迟高的因素是 MapReduce框架。由于MapReduce 本身具有较高的延迟,因此在利用MapReduce 执行Hive查询时,也会有较高的延迟。相对的,数据库的执行延迟较低。当然,这个低是有条件的,即数据规模较小,当数据规模大到超过数据库的处理能力的时候,Hive的并行计算显然能体现出优势。
  • 7 可扩展性
    • 由于Hive是建立在Hadoop之上的,因此Hive的可扩展性是和Hadoop的可扩展性是一致的(世界上最大的Hadoop 集群在 Yahoo!,2009年的规模在4000 台节点左右)。而数据库由于 ACID 语义的严格限制,扩展行非常有限。目前最先进的并行数据库 Oracle 在理论上的扩展能力也只有100台左右。
  • 8 数据规模
    • 由于Hive建立在集群上并可以利用MapReduce进行并行计算,因此可以支持很大规模的数据;对应的,数据库可以支持的数据规模较小。

2 Hive架构

Hive通过给用户提供的一系列交互接口,接收到用户的指令(SQL),使用自己的Driver,结合元数据(MetaStore),将这些指令翻译成MapReduce,提交到Hadoop中执行,最后,将执行返回的结果输出到用户交互接口。

  • 1)用户接口:Client
    • CLI(hive shell)、JDBC/ODBC(java访问hive)、WEBUI(浏览器访问hive,如:Hue)
  • 2)元数据:Metastore
    • 元数据包括:表名、表所属的数据库(默认是default)、表的拥有者、列/分区字段、表的类型(是否是外部表)、表的数据所在目录等;
    • 默认存储在自带的derby数据库中,推荐使用MySQL存储Metastore
  • 3)Hadoop
    • 使用HDFS进行存储,使用MapReduce进行计算。
  • 4)驱动器:Driver
    • (1)解析器(SQL Parser):将SQL字符串转换成抽象语法树AST,这一步一般都用第三方工具库完成,比如antlr;对AST进行语法分析,比如表是否存在、字段是否存在、SQL语义是否有误。
    • (2)编译器(Physical Plan):将AST编译生成逻辑执行计划。
    • (3)优化器(Query Optimizer):对逻辑执行计划进行优化。
    • (4)执行器(Execution):把逻辑执行计划转换成可以运行的物理计划。对于Hive来说,就是MR/TEZ/Spark。

3 hive常用数据类型

mysql字段类型 hive:ods字段类型 hive:dwd字段类型
tinyint tinyint tinyint
int int int
bigint bigint bigint
varchar string string
datetime bigint string
bit boolean int
double double double

4 Hive常用命令

  • 链接hive的三种常用方式
  1. 使用hive脚本可以进入客户端。我这里使用Hue进行演示。

  1. 使用hue直接操作hive

  1. hiveserver2 beeline链接方式
  • 启动hiveserver2服务
hiveserver2
  • 启动beeline
beeline
  • beeline链接hiveserver2,输入用户名密码
!connect jdbc:hive2://cdh01.cm:10000

4.1 数据库操作

  • 创建数据库
CREATE DATABASE 数据库名字

  • 查询数据库
show databases

  • 删除数据库
drop database 数据库名字 
drop database 数据库名字 cascade
  • 显示数据库信息
desc database 数据库名字
desc database extended 数据库名字
  • 切换数据库
use 数据库名字
  • 查询数据库拥有表
show tables
  • 查询表信息
desc formatted 表名

4.2 创建删除表

  • 建表语法

    • CREATE [EXTERNAL] TABLE [IF NOT EXISTS] table_name 
          [(col_name data_type [COMMENT col_comment], ...)] 
          [COMMENT table_comment] 
          [PARTITIONED BY (col_name data_type [COMMENT col_comment], ...)] 
          [CLUSTERED BY (col_name, col_name, ...) 
          [SORTED BY (col_name [ASC|DESC], ...)] INTO num_buckets BUCKETS] 
          [ROW FORMAT row_format] 
          [STORED AS file_format] 
          [LOCATION hdfs_path]
      
  • 建表常用参数

    • EXTERNAL:外部表关键字,Hive分内部表(元数据和数据会被一起删除,一般不太重要的表如:中间临时表)和外部表(只删除元数据,不删除数据,一般而言都采用外部表,因为数据安全些),一般而言采用外部表。

    • PARTITIONED BY:分区指定字段为’dt’。

      PARTITIONED BY (
        `dt` String COMMENT 'partition'
      )
      
    • row format delimited fields terminated by ‘\t’ :数据使用什么做切分,这里使用制表符。

    • stored as parquet:文件存储格式,推荐parquet(spark天然支持parquet)。

    • location ‘/warehouse/层名/库名/表名’ :文件存储位置。

    • tblproperties (“parquet.compression”=”snappy”) :文件压缩策略,推荐snappy。

    • CLUSTERED BY创建分桶表(抽样场景使用)

    • SORTED BY排序(一般情况查询语句都有排序,故不常用)

  • 建表例子

#删除表
drop table if exists dwd.student

#创建表(库名.表名)
CREATE EXTERNAL TABLE `dwd.student`(
  `ID` bigint COMMENT '',
  `CreatedBy` string COMMENT '创建人',
  `CreatedTime` string COMMENT '创建时间',
  `UpdatedBy`  string COMMENT '更新人',
  `UpdatedTime` string COMMENT '更新时间',
  `Version` int COMMENT '版本号',
  `name` string COMMENT '姓名'
  ) COMMENT '学生表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
location '/warehouse/dwd/test/student/'
stored as parquet
tblproperties ("parquet.compression"="snappy") 

创建内部表就是去掉EXTERNAL关键字,使用插入数据案例测试一下删除表后重新建立的效果吧。

4.3 插入数据

INSERT INTO TABLE dwd.student partition(dt='2020-04-05') VALUES(1,"heaton","2020-04-05","","","1","zhangsan") 
INSERT INTO TABLE dwd.student partition(dt='2020-04-06') VALUES(2,"heaton","2020-04-06","","","1","lisi") 

4.4 查询数据

SELECT * FROM dwd.student
SELECT * FROM dwd.student WHERE dt="2020-04-05"
SELECT * FROM dwd.student WHERE dt="2020-04-06"

4.4.1 分区解释

  • 如上图,2个不同的分区对应的都是HDFS上的独立文件夹,该文件夹下是该分区所有的数据文件。Hive中的分区就是分目录,把一个大的数据集根据业务需要分割成小的数据集。在查询时通过WHERE字句中的表达式查询指定的分区(如上面的WHERE dt=”2020-04-05″),这样的查询效率会提高很多。
  • 多级分区,就是继续分文件夹,在PARTITIONED BY 中用“,”号隔开分区字段即可,不建议超过2级分区。

4.5 本地load到hive

  • 根据上面建表语句建立student1表
  • 将student表2020-04-05分区中的数据加载到student1表2020-04-05分区中
LOAD DATA INPATH '/warehouse/dwd/test/student/dt=2020-04-05/000000_0' INTO TABLE dwd.student1 partition(dt='2020-04-05')

4.6 清除表数据

  • 注意:Truncate只能删除内部表,不能删除外部表数据
  • 注意:hive不支持delete,update(需要通过配置文件修改支持)
  • Truncate不经过事务,delete经过事务
TRUNCATE TABLE dwd.student1

4.7 将查询结果插入到表中

  • INSERT INTO追加到表中
insert into table dwd.student1 partition(dt='2020-04-05') select id,createdby,createdtime,updatedby,updatedtime,version,name from dwd.student where dt="2020-04-06"

由于student1表2020-04-05分区中已经有一条张三数据,这时插入李四数据,查询结果为2条数据

  • INSERT OVERWRITE复写表数据
insert overwrite table dwd.student1 partition(dt='2020-04-05') select id,createdby,createdtime,updatedby,updatedtime,version,name from dwd.student where dt="2020-04-06"

由上面的例子使student1表2020-04-05分区中有2条数据,这时候在插入一条,发现只有一条,因为数据被复写。

4.8 通过临时表插入数据

  • with 临时表名 as 查询语句,如下例子为查询两张表数据,字段缺失可以补”0″或者””,插入结果表中
with

s1 as
(
 select id,createdby,createdtime,version,name from dwd.student where dt="2020-04-05"
),
s2 as
(
 select id,updatedby,updatedtime,version,name from dwd.student where dt="2020-04-06"
)

insert overwrite table dwd.student1 partition(dt='2020-04-06')
select 
	s1.id as id,
	s1.createdby as createdby,
	s1.createdtime as createdtime,
	"" as updatedby,
	"0" as updatedtime,
	s1.version as version,
	s1.name as name
from 
s1

union all

select 
	s2.id as id,
	"" as createdby,
	"0" as createdtime,
	s2.updatedby as updatedby,
	s2.updatedtime as updatedtime,
	s2.version as version,
	s2.name as name
from 
s2

4.9 Hive常用交互命令

  • hive -e “select * from xx”:hive命令行SQL语句

  • hive -f xx.hql:hive执行sql文件

4.10 分桶及抽样查询

  1. 分区针对的是数据的存储路径,分桶针对的是数据文件,分桶是将数据集分解成更容易管理的若干部分的一个技术。
  2. 创建表时使用 CLUSTERED BY(字段名) into 分桶数 buckets 指定分桶条件。
  3. load命令导入分桶表,数据不会分桶。
  4. 使用分桶功能需要设置hive属性:set hive.enforce.bucketing=true; set mapreduce.job.reduces=-1;
  5. 对于非常大的数据集,有时用户需要使用的是一个具有代表性的查询结果,而不是全部结果,这时可以使用分桶来进行抽样。抽样语句:TABLESAMPLE(BUCKET x OUT OF y ON 分桶字段)

y必须是table总bucket数的倍数或者因子,根据Y值决定抽样的比例,如:table共4个桶,当y=2时,抽取4/2=2个桶数据,等y=8时,抽取4/8=1/2的bucket数据。

x是从哪个桶开始抽,然后依次抽取x+y的桶数据,如:table共4个桶,当y=2,x=1时,抽取4/2=2个桶数据,桶数为1号,1+2=3号。

  • 案例
#建表sql
CREATE EXTERNAL TABLE `dwd.student_buck`(
  `ID` bigint COMMENT '',
  `name` string COMMENT '姓名'
  ) COMMENT '学生表'
CLUSTERED BY(ID) into 4 buckets
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/test/student_buck/'
tblproperties ("parquet.compression"="snappy") 

#插入数据
INSERT INTO TABLE dwd.student_buck VALUES(1,"zhangsan"),(2,"lisi") ,(3,"wangwu") ,(4,"zhaoliu") ,(5,"haqi") ,(6,"xiba") ,(7,"heijiu") ,(8,"washi") 

#抽样查询
SELECT * from dwd.student_buck TABLESAMPLE(BUCKET 1 OUT OF 4 ON id)

5 Hive 查询操作特殊解释

https://cwiki.apache.org/confluence/display/Hive/LanguageManual+Select

5.1 join解释

  1. Join语句只支持等值链接,不支持非等值链接。
  2. 除左、右、内链接外还支持全连接full join,但是效率较差,一般使用union all

5.2 排序分组操作

Order By 全局排序(全局一个Reducer)

Group By 分组(会根据跟的字段,发到不同的Reducer中)

Distribute By 类似MR中的partition进行分区,需结合Sort By使用,而且要写在前面,如:根据日期分区,在根据年龄排序

#多reduce才有效果
select * from student distribute by createtime sort by age desc

Sort By每个Reducer内部排序,对全局结果来说不是排序,结合Distribute By使用。(多Reducer区内排序)

Cluster By 当分区字段和区内排序字段相同,也就是distribute by和sort by需要字段相同可以使用其代替,但是只能升序排列。

5.3 空字段赋值函数

NVL(string,替换值),如果string为NULL,则返回替换值,否则返回string值

5.4 时间转换

# 1 格式化时间->  2020-04-05 00:00:00
SELECT date_format("2020-04-05","yyyy-MM-dd HH:mm:ss")
# 2 时间天数相加->  2020-04-05  ->2020-04-01
SELECT date_add("2020-04-05",4)
SELECT date_add("2020-04-05",-4)
# 3 时间天数相减->2020-04-01(一般直接用date_add了,这个不怎么用)
SELECT date_sub("2020-04-05",4)
# 4 两个时间相减-> 4
SELECT datediff("2020-04-09","2020-04-05")
# 5 时间转时间戳->  1586016000
SELECT unix_timestamp("2020-04-05","yyyy-MM-dd")
# 6 时间戳转时间->  2020-04-05
SELECT from_unixtime(cast("1586016000" as BIGINT),"yyyy-MM-dd")
# 7 将日期转为星期->  1 (星期一)
SELECT pmod(datediff("2020-04-06","1920-01-01")-3,7)

5.5 CASE WHEN 和 IF

财务部
财务部
财务部
科技部
人事部
人事部

如上图假设表 emp 中有6个人,求每个部门下面的男女个数。

#使用CASE WHEN
select 
	department,
	sum(case sex when '男' then 1 else 0 end) man_count,
	sum(case sex when '女' then 1 else 0 end) woman_count,
from 
	emp
group by
	department
	
#还可以使用IF
select 
	department,
	sum(if(sex='男',1,0)) man_count,
	sum(if(sex='女',1,0)) woman_count,
from 
	emp
group by
	department

5.6 聚合拼接

# 1 concat拼接->  2020-04-05
SELECT concat("2020","-","04","-","05")
# 2 concat_ws拼接->  2020-04-05 注意 concat_ws可以传collect_set,需要字段为string
SELECT concat_ws("-","2020","04","05")
# 3 汇总成Array类型字段-> [8,4,5,1,6,2,7,3]
SELECT collect_set(id) from dwd.student_buck

5.7 炸裂

EXPLODE(字段名):将一列复杂的array或者map结构拆分成多行

炸裂函数必须配合侧写视图函数:LATERAL VIEW udtf(expression) 侧写视图别名 AS 侧写结果列别名

  • 例子
# 建表
CREATE EXTERNAL TABLE `dwd.hobby`(
  `id` bigint COMMENT '',
  `name` string COMMENT '姓名',
  `hobby_name` array<string> COMMENT '爱好名字'
  ) COMMENT '爱好表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
collection items terminated by ','
stored as parquet
location '/warehouse/dwd/test/hobby/'
tblproperties ("parquet.compression"="snappy") 

# 插入3条数据
INSERT INTO TABLE dwd.hobby partition(dt='2020-04-05') 
SELECT 1,"zhangsan",array("篮球","足球","羽毛球","恶作剧")

INSERT INTO TABLE dwd.hobby partition(dt='2020-04-05') 
SELECT 2,"lisi",array("足球","恶作剧")

INSERT INTO TABLE dwd.hobby partition(dt='2020-04-05') 
SELECT 3,"wangwu",array("羽毛球","恶作剧")

# 使用侧写视图将炸裂结果作为临时视图,聚合需要的结果
select 
    name,hobby_name_col 
from 
    dwd.hobby
lateral view explode(hobby_name) hobby_tmp as hobby_name_col

5.8 窗口函数

主要解决1行和n行数据无法聚合在一起展示的问题。

  • OVER():指定分析函数工作的数据窗口大小,这个数据窗口大小会随着行的变化而变化
    • CURRENT ROW:当前行
    • n PRECEDING:往前n行数据
    • n FOLLOWING:往后n行数据
    • UNBOUNDED:起点,UNBOUNDED PRECEDING表示从前面的起点,UNBOUNDED FOLLOWING表示到后面的终点。
    • Partition By:分区
    • Order By:排序
    • Distribute By:区内分组
    • Sort By:区内排序
    • Group By:不可使用
  • LAG(col,n,默认值):往前n行数据
  • LEAD(col,n,默认值):往后n行数据
  • NTILE(n):类似于分桶,把数据有序分到n个桶里,序号从1开始。
  • RANK():排序相同时会重复,总数不会变
  • DENSE_RANK():排序相同时会重复,总数会减小
  • ROW_NUMBER():会根据顺序计算

5.8.0 案例数据准备

#建表
CREATE EXTERNAL TABLE `dwd.order`(
  `name` string COMMENT '姓名',
  `order_date` string COMMENT '购买日期',
  `price` bigint  COMMENT '消费金额'
  ) COMMENT '订单表'
PARTITIONED BY (
  `dt` String COMMENT 'partition'
)
row format delimited fields terminated by '\t'
stored as parquet
location '/warehouse/dwd/test/order/'
tblproperties ("parquet.compression"="snappy") 


#插入数据
INSERT INTO TABLE `dwd.order` partition(dt='2020-04-05') VALUES("zhangsan","2020-01-01",15),("lisi","2020-01-02",22),("zhangsan","2020-04-01",34),("lisi","2020-04-01",15),("zhangsan","2020-04-04",42),("zhangsan","2020-03-01",24),("lisi","2020-02-01",65),("wangwu","2020-04-01",33),("zhangsan","2020-05-01",43),("zhangsan","2020-07-01",12),("wangwu","2020-02-05",32),("zhangsan","2020-03-06",22),("lisi","2020-04-07",14)

5.8.1 OVER()无参数

  • 查询在2020年4月购买过的顾客名字及总人数
SELECT name,count(*) OVER() FROM dwd.`order` WHERE order_date BETWEEN "2020-04-01" AND "2020-04-30" GROUP BY name

5.8.2 OVER()无参数

  • 查询顾客的购买明细及购买总额
SELECT *,sum(price) OVER() FROM dwd.`order` 

5.8.3 OVER()排序参数

  • 查询顾客的购买明细及购买总额并按月累加展示
SELECT *,sum(price) OVER(ORDER BY order_date) FROM dwd.`order` 
SELECT *,sum(price) OVER(ORDER BY order_date DESC) FROM dwd.`order` 

over函数传参,还是给所有结果集进行开窗,但是根据参数限定窗口大小,上面sql的意思为:

1号zhangsan数据窗口内只含有 1号张三

2号lisi数据窗口内含有 1号张三,2号李四

3号lisi数据窗口内含有 1号张三,2号李四,3号李四

依次类推

5.8.4 OVER()分组参数

  • 查询顾客的购买明细及每个顾客的购买总额
SELECT *,sum(price) OVER(distribute by name) FROM dwd.`order` 

5.8.5 OVER()分组排序参数

  • 查询顾客的购买明细及每个顾客的购买总额并按月累加展示
SELECT *,sum(price) OVER(distribute by name sort by order_date) FROM dwd.`order` 

5.8.6 CURRENT ROW->UNBOUNDED->PRECEDING->FOLLOWING窗口大小指定

  • 查询顾客的购买明细及每个顾客的购买总额并直接累加展示
SELECT *,sum(price) OVER(distribute by name rows between UNBOUNDED PRECEDING and CURRENT ROW) FROM dwd.`order` 

5.8.7 LAG->LEAD 窗口值指定

  • 查询顾客的购买明细并且追加客户上次购买日期
SELECT *,lag(order_date,1,"1970-01-01") OVER(distribute by name sort by order_date) FROM dwd.`order` 

5.8.8 NTILE 分桶

SELECT *,ntile(5) OVER() FROM dwd.`order` 

  • 查询当前20%的订单信息
SELECT * FROM (SELECT *,ntile(5) OVER(ORDER BY order_date) ntile_5 FROM dwd.`order`) t1 WHERE t1.ntile_5=1 

5.8.9 ROW_NUMBER 排名

SELECT *,row_number() OVER(order by order_date) FROM dwd.`order` 

5.8.10 RANK 排名

SELECT *,rank() OVER(order by order_date) FROM dwd.`order` 

5.8.11 DENSE_RANK 排名

SELECT *,dense_rank() OVER(order by order_date) FROM dwd.`order` 

6 自定义函数

  • 用户自定义函数类别可以分为三种:
    • UDF(一行进一行出),如:concat,split
    • UDAF(多行进一行出),如:count,sum,max
    • UDTF(一行进多行出),如:explode

https://cwiki.apache.org/confluence/display/Hive/HivePlugins

  • 添加jar包及函数的方式请参考6.1案例

6.1 自建UDF

必须有返回值,可以返回null,但是类型不能是void。

需要继承UDF类,并且方法名为evaluate

  • maven-pom.xml
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>2.1.1</version>
    </dependency>
  • java代码
import org.apache.hadoop.hive.ql.exec.UDF;

//模拟concat
public class MyUDF extends UDF {
    public String evaluate(String... args) {
        StringBuilder s = new StringBuilder();
        for (String arg : args) {
            s.append(arg);
        }
        return s.toString();
    }
}
  • 打包上传服务器,进入hive命令行
# 1 添加jar包,注意服务器本地路径(hive客户端关闭则消失,需要重新添加,如果不想重新添加,可以直接使用4将jar包放入hive/lib下)
add jar /root/test/function-1.0-SNAPSHOT.jar
# 2 添加函数,注意hive中的函数别名,还有jar包全类名,如下为永久创建和临时创建temporary(hive客户端关闭则消失)
create function myconcat as "com.hive.function.udf.MyUDF"
create temporary function myconcat as "com.hive.function.udf.MyUDF"
# 3 加载jar包的第二种方式,上传jar包至hdfs集群
# create temporary function myconcat as "com.hive.function.udf.MyUDF" using jar 'hdfs:///hive_jar/function-1.0-SNAPSHOT.jar';
# 4  加载jar包的第三种方式,直接放在hive的lib目录下,启动hive,使用2添加函数即可。下面是cdh环境的jars包路径,配置软连接到hive/lib下
# ln -s /opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/jars/function-1.0-SNAPSHOT.jar /opt/cloudera/parcels/CDH-6.2.0-1.cdh6.2.0.p0.967373/lib/hive/lib
# 5 删除函数
# drop function myconcat
  • 可以在hive中使用自建函数
SELECT myconcat("a","-","b","-","c")

6.2 自建UDAF

用户自定义UDAF必须继承UDAF,必须提供一个实现了UDAFEvaluator接口的内部类

  • java代码
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

//模拟avg
public class MyUDAF extends UDAF {
    public static class AvgState {
        private long mCount;
        private double mSum;
    }

    public static class AvgEvaluator implements UDAFEvaluator {
        AvgState state;

        public AvgEvaluator() {
            super();
            state = new AvgState();
            init();
        }

        /**
         * init函数类似于构造函数,用于UDAF的初始化
         */
        public void init() {
            state.mSum = 0;
            state.mCount = 0;
        }

        /**
         * iterate接收传入的参数,并进行内部的轮转。其返回类型为boolean * * @param o * @return
         */

        public boolean iterate(Double o) {
            if (o != null) {
                state.mSum += o;
                state.mCount++;
            }
            return true;
        }

        /**
         * terminatePartial无参数,其为iterate函数遍历结束后,返回轮转数据, * terminatePartial类似于hadoop的Combiner * * @return
         */

        public AvgState terminatePartial() {
            // combiner
            return state.mCount == 0 ? null : state;
        }

        /**
         * merge接收terminatePartial的返回结果,进行数据merge操作,其返回类型为boolean * * @param o * @return
         */

        public boolean merge(AvgState avgState) {
            if (avgState != null) {
                state.mCount += avgState.mCount;
                state.mSum += avgState.mSum;
            }
            return true;
        }

        /**
         * terminate返回最终的聚集函数结果 * * @return
         */
        public Double terminate() {
            return state.mCount == 0 ? null : Double.valueOf(state.mSum / state.mCount);
        }
    }
}
  • 根据6.1创建函数进行函数创建,并使用

6.3 自建UDTF

用户自定义UDAF必须继承GenericUDTF,重写initialize(),process(),close()方法。

  • java代码
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

import java.util.ArrayList;
import java.util.List;

//模拟EXPLODE和split
public class MyUDTF extends GenericUDTF {
    private List<String> dataList = new ArrayList<>();

    //初始化方法,返回对象结构校验器
    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        //列名,会被用户传递的覆盖
        List<String> fieldNames = new ArrayList<>();
        fieldNames.add("word1");

        //返回列以什么格式输出,这里是string,添加几个就是几个列,和上面的名字个数对应个数。
        List<ObjectInspector> fieldOIs = new ArrayList<>();
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
    }

    @Override
    public void process(Object[] objects) throws HiveException {
        //获取数据
        String data = objects[0].toString();
        //获取分隔符
        String splitKey = objects[1].toString();
        //切分数据
        String[] words = data.split(splitKey);
        //遍历写出
        for (String word : words) {
            //将数据放入集合
            dataList.clear();
            dataList.add(word);
            //写出数据到缓冲区
            forward(dataList);
        }

    }

    @Override
    public void close() throws HiveException {
    //没有流操作
    }
}
  • 根据6.1创建函数进行函数创建,并使用

7 优化

7.1 调整hive执行引擎

hive.execution.engine配置为spark

7.2 Fetch抓取

Fetch抓取修改为more,可以使全局查找,字段查找,limit查找等都不走计算引擎,而是直接读取表对应储存目录下的文件,大大普通查询速度。

hive.fetch.task.conversion配置为more

7.3 开启本地模式

hive可以通过本地模式在单台机器上处理所有的任务,对于小的数据集,执行时间可以明显被缩短。

hive-site.xml调整下面3个参数,开启本地模式,文件不超过50M,个数不超过10个。

hive.exec.mode.local.auto=true

hive.exec.mode.local.auto.inputbytes.max=50000000 (50M左右,默认128M->134217728,机器资源足建议使用默认值)

hive.exec.mode.local.auto.input.files.max=10 (模式4个)

<property><name>hive.exec.mode.local.auto</name><value>true</value></property>
<property><name>hive.exec.mode.local.auto.inputbytes.max</name><value>50000000</value></property>
<property><name>hive.exec.mode.local.auto.input.files.max</name><value>10</value></property>

7.4 join优化

在join问题上,让小表放在左边 去左链接(left join)大表,这样可以有效的减少内存溢出错误发生的几率。

hive.auto.convert.join开启。(默认开启)

hive.mapjoin.smalltable.filesize(默认25000000->接近24M,如果机器内存足可以适当调大,需在hive-site.xml中设置,如7.3)

大表和大表join,空key会打到同一个reduce上,造成数据倾斜,任务缓慢,内存泄漏。(reduce任务某个非常慢,其他很快,及发生数据倾斜)

  • 空key过滤:使用子查询过滤掉空key,可以有效的提升查询速率。
  • 空key转换:附随机值,使其可以随机均匀的分布在不同的reduce上,使用case when then或if判断null值,赋予rand()函数。

7.5 Group By优化

默认情况下map阶段同一个key发送给一个reduce,当一个key数据过大时就发生数据倾斜。

那么把某些聚合操作提到Map端进行部分聚合,最后在reduce端得出最终结果,也可以有效的提升执行效率。

hive.map.aggr开启。(默认开启)

hive.groupby.mapaggr.checkinterval=100000(默认100000条,在map端进行聚合操作的条目数目,需在hive-site.xml中设置,如7.3)

hive.groupby.skewindata=true(默认false,有数据倾斜时进行负载均衡,需在hive-site.xml中设置,如7.3)

hive.groupby.skewindata当选项设置为true时,生成的查询计划会有两个MR Job,第一个MR Job会将key加随机数均匀的分布到Reduce中,做部分聚合操作(预处理),第二个MR Job在根据预处理结果还原原始key,按照Group By Key分布到Reduce中进行聚合运算,完成最终操作。

7.6 Count(Distinct)去重统计

数据量小的时候没关系,大数据量下,由于Count Distinct操作需要用一个Reduce任务来完成,这一个Reduce需要处理的数据量太大,会导致Job缓慢,可以使用子查询Group By再Count的方式替换。

7.7 行列过滤

列处理:不使用Select *,使用什么字段就写什么字段,就算是所有字段,也要一一列出,养成好习惯。

行处理:在join操作中,不要直接关联表后使用where条件,这样会使全表关联后在过滤,使用子查询过滤后在join来替换,使查询效率提高。如下

#错误写法
select id from student s left join class c on s.cid=c.id where c.id<=10
#正确写法
select id from student s left join (select id from class where id<=10 ) c on s.cid=c.id

7.8 动态分区

当数据会根据情况变化的时候,先有数据,再想分区的情况。

设计表时尽量避免动态分区,速度比静态分区(也就是直接指定分区慢很多)。

  • 开启动态分区参数

  • 必要参数

    • hive.exec.dynamic.partition=true; 默认:true。
    • hive.exec.dynamic.partition.mode=nostrict; 默认:strict至少有一个分区列是静态分区,nostrict所有分区都可以是动态分区。
  • 相关参数

    • hive.exec.max.dynamic.partitions.pernode=100; 默认100,每一个执行mr节点上,允许创建的动态分区的最大数量
    • hive.exec.max.dynamic.partitions=1000; 默认1000,所有执行mr节点上,允许创建的所有动态分区的最大数量
    • hive.exec.max.created.files=100000; 默认100000,所有的mr job允许创建的文件的最大数量
    • hive.error.on.empty.partition=false; 默认false,当有空分区生成时,是否抛出异常。
  • 建表方式同静态分区,插入方式要注意,查询结果最后一个字段即为分区字段(不管名字是否一样,会将最后一个字段的值,直接拿来分区)

#错误写法:会将name值直接插入dt分区字段。
insert into table dwd.student1 partition(dt) 
select id,createdby,createdtime,updatedby,updatedtime,version,name from dwd.student 
#正确写法
insert into table dwd.student1 partition(dt) 
select id,createdby,createdtime,updatedby,updatedtime,version,name,dt from dwd.student 

7.9 开启并行计算

hive.exec.parallel=true;默认false。需在hive-site.xml中设置,如7.3

hive.exec.parallel.thread.number=16;默认8,同一个sql允许的最大并行度,针对集群资源适当增加。需在hive-site.xml中设置,如7.3

7.10 多用Explain

在执行的查询sql前加上Explain指令,查询分析sql执行过程。

版权声明:本文为ttzzyy原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/ttzzyy/p/12650543.html