Hive理论基础
组件:用户接口,元数据存储mysql / derby,解释、编译、优化、执行器。
与数据库的区别
- 数据存储位置不同:Hive存储在HDFS中,数据库存储在块设备或本地文件
- 数据更新:数仓一般不改写数据,数据库增删改查
- 执行延迟:Hive延迟高, mysql延迟低, 只有大规模数据时Hive并行计算的优点才会体现
- 数据规模:Hive大规模计算,数据库规模较小
- 内嵌模式:使用的是内嵌的Derby数据库来存储元数据,也不需要额外起Metastore服务。
- 本地模式:
- 远程模式:
- db(库):在 hdfs 中表现为 hive.metastore.warehouse.dir 目录下一个文件夹
- table(内部表):在 hdfs 中表现所属 db 目录下一个文件夹,当我们删除一个内部表时,Hive也会删除这个表中数据。内部表不适合和其他工具共享数据。
- external table(外部表):数据存放位置可以在 HDFS 任意指定路径 ,删除该表并不会删除掉原始数据,删除的是表的元数据
- partition(分区):在 hdfs 中表现为 table 目录下的子目录
1 create table t_user_part(id int,name string,country string) 2 partitioned by (guojia string) 3 row format delimited fields terminated by ',' ; 4 --注意顺序问题 5 --分区的字段不能是表当中的字段 6 7 load data local inpath './root/4.txt' 8 into table t_user_part partition (guojia='usa'); 9 10 load data local inpath '/root/5.txt' 11 into table t_user_part partition (guojia='china'); 12 --将数据加载到哪个文件夹中 13 14 --多级分区 15 create table t_order(id int,pid int,price double) 16 partitioned by (year string,month string,day string) 17 row format delimited fields terminated by ',' ; 18 19 load data local inpath '/root/5.txt' 20 into table t_order partition (year='2019',month='09',day='18'); 21 22 load data local inpath '/root/4.txt' 23 into table t_order partition (year='2019',month='09',day='18'); 24 25 ALTER TABLE t_user_part ADD PARTITION (guojia='riben') 26 location '/user/hive/warehouse/hadoop32.db/t_user_part/guojia=riben'; 27 --一次添加一个分区 28 29 ALTER TABLE order ADD 30 PARTITION (year='2018', month='09',day="20") 31 location'/user/hive/warehouse/hadoop32.db/t_order' 32 PARTITION (year='2019', month='09',day="20") 33 location'/user/hive/warehouse/hadoop32.db/t_order'; 34 --一次添加多个分区 35 36 --删除分区 37 ALTER TABLE t_user_part DROP IF EXISTS PARTITION (guojia=riben); 38 39 --查看分区 40 show partitions table_name; 41 42 show formatted table_name;
- bucket(分桶):在 hdfs 中表现为同一个表目录下根据 hash 散列之后的多个文件 ,采用对列值哈希,然后除以桶的个数求余的方式决定该条记录存放在哪个桶当中
1 create table stu_buck(Sno string,Sname string, 2 Sbrithday string, Sex string) 3 clustered by(Sno) 4 into 4 buckets 5 row format delimited fields terminated by '\t'; 6 --clustered by 根据哪个字段去分桶,这个字段在表中一定存在 7 --into N buckets 分多少个文件 8 --如果该分桶字段是string,会根据字符串的hashcode % bucketsNum 9 --如果该分桶字段是数值类型,数值 % bucketsNum 10 11 create table student(Sno string,Sname string, 12 Sbrithday string, Sex string) 13 row format delimited fields terminated by '\t'; 14 --insert+select 15 insert overwrite table stu_buck select * from student 16 cluster by(Sno); 17 --默认不让直接使用分桶表
1 --load加载 推荐方式,最常见 (分桶表是不支持load) 2 load data local inpath '/root/hivedata/students.txt' 3 overwrite into table student; 4 --加载本地数据到表对应的路径下 5 --local表明是本地还是hdfs 6 --overwrite表示覆盖操作(慎用) 7 8 load data inpath '/stu' into table student_ext; 9 --加载hdfs上的文件到表对应的路径下(追加) 10 11 --insert + select导入 12 --insert 主要是结合 select 查询语句使用,将查询结果插入到表中 13 insert overwrite table tablename1 14 [partition (partcol1=val1,partclo2=val2)] 15 select_statement1 from source_table 16 17 --多重插入 18 from source_table 19 insert overwrite table tablename1 20 [partition (partcol1=val1,partclo2=val2)] 21 select_statement1 22 insert overwrite table tablename2 23 [partition (partcol1=val1,partclo2=val2)] 24 select_statement2.. 25 26 --动态插入 substr(day,1,7) as month,day分区的虚拟字段 顺序需要对应 27 insert overwrite table d_p_t partition (month,day) 28 select ip,substr(day,1,7) as month,day 29 from dynamic_partition_table; 30 31 --指定分隔符(复杂类型的数据表) 32 --表1(包含array字段类型) 33 --数据: zhangsan beijing,shanghai,tianjin,hangzhou 34 -- wangwu shanghai,chengdu,wuhan,haerbin 35 create table complex_array(name string, 36 work_locations array<string>) 37 row format delimited fields terminated by '\t' 38 collection items terminated by ','; 39 --collection items array集合分隔符 40 41 --表2(包含map字段类型) 42 create table t_map(id int,name string,hobby map<string,string>) 43 row format delimited 44 fields terminated by ',' 45 collection items terminated by '-' 46 map keys terminated by ':' ; 47 --map keys map中k-v分隔符 48 --数据:1,zhangsan,唱歌:非常喜欢-跳舞:喜欢-游泳:一般般 49 -- 2,lisi,打游戏:非常喜欢-篮球:不喜欢
显示自带的函数的用法:
UDF(User-Defined-Function)普通函数 一进一出
继承UDF 重载evaluate方法 打成jar包(胖包)上传到服务器 将jar包添加到 hive 的 classpath hive>add jar /home/hadoop/udf.jar; 创建临时函数与开发好的java class关联 create temporary function tolowercase as 'cn.itcast.hive.UDF_Demo'; (不加temporary就是创建永久函数,需要使用drop手动删除) 在hql中使用自定义的函数tolowercase ip Select tolowercase(name),age from t_test;
UDAF(User-Defined Aggregation Function)聚合函数 多进一出
UDAF是输入多个数据行,产生一个数据行
用户自定义的UDAF必须是继承了UDAF,且内部包含多个实现了exec的静态类
UDTF(User-Defined Table-Generating Functions)表生成函数 一进多出
继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF, 实现initialize, process, close三个方法。 UDTF首先会调用initialize方法, 此方法返回UDTF的返回行的信息(返回个数,类型)。 初始化完成后,会调用process方法,真正的处理过程在process函数中, 在process中,每一次forward()调用产生一行; 如果产生多列可以将多个列的值放在一个数组中, 然后将该数组传入到forward()函数。 最后close()方法调用,对需要清理的方法进行清理 把程序打成jar包 添加jar包:add jar /run/jar/udf_test.jar; 创建临时函数: CREATE TEMPORARY FUNCTION explode_map AS 'cn.itcast.hive.udtf.ExplodeMap'; 销毁临时函数:hive> DROP TEMPORARY FUNCTION add_example; UDTF有两种使用方法, 一种直接放到select后面(不可以添加其他字段使用,不可以嵌套调用, 不可以和group by/cluster by/distribute by/sort by一起使用) 一种和lateral view一起使用
1 --select 字段1, 字段2, ... 2 --from tabelA lateral view UDTF(xxx) 视图别名(虚拟表名) as a,b,c 3 --例如 4 select name,subview.* from test_message 5 lateral view explode(location) subview as lc;
1 select col1, 2 max(case col2 when 'c' then col3 else 0 end) as c, 3 max(case col2 when 'd' then col3 else 0 end) as d, 4 max(case col2 when 'e' then col3 else 0 end) as e 5 from row2col 6 group by col1;
1 select collect_set(col3) from row2col_1; 2 --将col3的所有数据放到一个集合中(去重) 3 4 select collect_set(col3) from row2col_1 group by col1,col2; 5 --根据col1,col2进行分组,只有第一列和第二列都相同,认为是同一组 6 7 select col1,col2, collect_set(col3) from row2col_1 8 group by col1,col2; 9 --三列显示,行转列 10 11 select col1, col2, 12 concat_ws(',', collect_set(cast(col3 as string))) as col3 13 from row2col_1 14 group by col1, col2; 15 --cast(col3 as string)将第三列变成string类型 16 --因为concat_ws是对于字符串拼接
1 select col1, 'c' as col2, c as col3 from col2row 2 UNION 3 select col1, 'd' as col2, d as col3 from col2row 4 UNION 5 select col1, 'e' as col2, e as col3 from col2row 6 order by col1, col2;
1 select col1, col2, lv.col3 as col3 2 from col2row_2 3 lateral view explode(split(col3, ',')) lv as col3;
--例1 --使用 java.lang.Math 当中的 Max 求两列当中的最大值 select reflect("java.lang.Math","max",col1,col2) from test_udf; --例2 --准备数据 test_udf2.txt java.lang.Math,min,1,2 java.lang.Math,max,2,3 --执行查询 select reflect(class_name,method_name,col1,col2) from test_udf2;
- get_json_object(string json_string,string path):第一个参数填写json对象变量,第二个参数使用$表示json变量表示,每次只能返回一个数据项
1 select get_json_object(t.json,'$.id'), 2 get_json_object(t.json,'$.total_number') 3 from tmp_json_test t;
- json_tuple(string json_string,’属性1′,’属性2′)
1 select json_tuple(json,'id','ids','total_number') 2 from tmp_json_test;
1 --从http:www.congiu.net/hive-json-serde/下载jar包 2 add jar 3 /root/hivedata/json-serde-1.3.7-jar-with-dependencies.jar; 4 5 create table tmp_json_array(id string, 6 ids array<string>,total_number int) 7 row format SERDE 'org.openx.data.jsonserde.JsonSerDe' 8 stored as textfile; 9 load data local inpath '/root/hivedata/json_test.txt' 10 overwrite into table tmp_json_array;
1 select cookieid,createtime,pv, 2 sum(pv) over(partition by cookieid order by createtime) as pv1 3 from itcast_t1; 4 --pv1: 分组内从起点到当前行的 pv 累积, 5 --如,11 号的 pv1=10 号的 pv+11 号的 pv, 12 号=10 号+11 号+12 6 7 select cookieid,createtime,pv, 8 sum(pv) over(partition by cookieid) as pv3 9 from itcast_t1; 10 --pv3: 分组内(cookie1)所有的 pv 累加 11 12 select cookieid,createtime,pv, 13 sum(pv) over(partition by cookieid 14 order by createtime 15 rows between 3 preceding and 1 following) as pv5 16 from itcast_t1; 17 --pv5: 分组内当前行+往前 3 行+往后 1 行, 18 --如,14 号=11 号+12 号+13 号+14 号+15 号=5+7+3+2+4=21 19 20 select cookieid,createtime,pv, 21 sum(pv) over(partition by cookieid 22 order by createtime rows between current row and 23 unbounded following) as pv6 24 from itcast_t1; 25 --pv6: 分组内当前行+往后所有行, 26 --如,13 号=13 号+14 号+15 号+16 号=3+2+4+4=13, 27 --14 号=14 号+15 号+16 号=2+4+4=10
- ROW_NUMBER() 从 1 开始,按照顺序,生成分组内记录的序列。 1 2 3 4
- RANK() 生成数据项在分组中的排名,排名相等会在名次中留下空位 。1 2 2 4
- DENSE_RANK()生成数据项在分组中的排名,排名相等在名次中不会留下空位。1 2 2 3
1 SELECT 2 cookieid, 3 createtime, 4 pv, 5 RANK() OVER(PARTITION BY cookieid ORDER BY pv desc) AS rn1, 6 DENSE_RANK() OVER(PARTITION BY cookieid ORDER BY pv desc) AS rn2, 7 ROW_NUMBER() OVER(PARTITION BY cookieid ORDER BY pv DESC) AS rn3 8 FROM itcast_t2 WHERE cookieid = 'cookie1';
1 SELECT * FROM 2 (SELECT 3 cookieid, 4 createtime, 5 pv, 6 NTILE(2) OVER(PARTITION BY cookieid ORDER BY createtime) AS rn1, 7 NTILE(3) OVER(PARTITION BY cookieid ORDER BY createtime) AS rn2, 8 NTILE(4) OVER(ORDER BY createtime) AS rn3 9 FROM itcast_t2 ORDER BY cookieid,createtime) temp 10 WHERE cookieid = 'cookie2' AND rn2 = 2;
Lag(col, n)往前n行
Lead(col, n)往后n行
- 行式存储
- 列式存储
- indexData:某些列的索引数据。一个轻量级的 index,默认是每隔 1W 行做一个索引。这里做的索引只是记录某行的各字段在 Row Data 中的 offset
- rowData :真正的数据存储。,先取部分行,然后对这些行按列进行存储。对每个列进行了编码,分成多个 Stream 来存储。
- StripFooter:存放各个stripe 的元数据信息。每个文件有一个 File Footer,这里面存的是每个 Stripe 的行数,每个 Column的数据类型信息等;每个文件的尾部是一个 PostScript,这里面记录了整个文件的压缩类型以及 FileFooter 的长度信息等。在读取文件时,会 seek 到文件尾部读PostScript,从里面解析到 File Footer 长度,再读 FileFooter,从里面解析到各个Stripe 信息,再读各个 Stripe,即从后往前读。
TEXTFILE,行式存储,但使用这种方式,hive 不会对数据进行切分,从而无法对数据进行并行操作
- indexData:某些列的索引数据。一个轻量级的 index,默认是每隔 1W 行做一个索引。这里做的索引只是记录某行的各字段在 Row Data 中的 offset
- rowData :真正的数据存储。,先取部分行,然后对这些行按列进行存储。对每个列进行了编码,分成多个 Stream 来存储。
- StripFooter:存放各个stripe 的元数据信息。每个文件有一个 File Footer,这里面存的是每个 Stripe 的行数,每个 Column的数据类型信息等;每个文件的尾部是一个 PostScript,这里面记录了整个文件的压缩类型以及 FileFooter 的长度信息等。在读取文件时,会 seek 到文件尾部读PostScript,从里面解析到 File Footer 长度,再读 FileFooter,从里面解析到各个Stripe 信息,再读各个 Stripe,即从后往前读。
PARQUET,列式存储,是面向分析型业务的列式存储格式。Parquet 文件是以二进制方式存储的,所以是不可以直接读取的,文件中包括该文件的数据和元数据,因此 Parquet 格式文件是自解析的。 通常情况下,在存储Parquet数据的时候会按照Block大小设置行组的大小,由于一般情况下每一个 Mapper 任务处理数据的最小单位是一个 Block,这样可以把每一个行组由一个 Mapper 任务处理,增大任务执行并行度。