在线教育 (实时实现)
第1章 项目需求及架构
1.1 项目需求分析
一、数据采集平台搭建
二、Kafka中间件准备
三、下游Spark Streaming对接Kafka接收数据,实现vip个数统计、页面之间的跳转率、做题正确率与掌握度、播放时长统计及历史区间统计的实时计算功能。
1.2 项目框架
1.2.1 技术选型
一、数据存储:Kafka、MySQL
二、数据处理:Spark
三、其他组件:Zookeeper
1.2.2 流程设计
第2章 需求
2.1环境准备
在本机三台虚拟机上分别搭建好zookeeper和kafka,创建所需topic(注:CDH6.3.2中kafka的版本为2.2.1)
kafka-topics --bootstrap-server hadoop102:9092 --create --replication-factor 2 --partitions 10 --topic register_topic
kafka-topics --bootstrap-server hadoop102:9092 --create --replication-factor 2 --partitions 10 --topic qz_log
kafka-topics --bootstrap-server hadoop102:9092 --create --replication-factor 2 --partitions 10 --topic page_topic
kafka-topics --bootstrap-server hadoop102:9092 --create --replication-factor 2 --partitions 10 --topic course_learn
如果使用–bootstrap-server hadoop102:9092创建,则消费者的offset保存在kafka中,如果使用–zookeeper hadoop:2181创建,则消费者的offset保存在zk中
2.2原始数据格式及kafka对应topic
2.2.1实时统计注册人数 – register.log
kafka对应topic:register_topic
数据格式:
列 |
字段 |
字段说明 |
1 |
用户id |
|
2 |
平台id |
1:PC 2:APP 3:Other |
3 |
创建时间 |
|
数据示例:
# 数据使用/t作为分隔符
7188 2 2019-07-16 16:01:55
7189 1 2019-07-16 16:01:55
7190 1 2019-07-16 16:01:55
7191 1 2019-07-16 16:01:55
7192 1 2019-07-16 16:01:55
7193 3 2019-07-16 16:01:55
7194 1 2019-07-16 16:01:55
7195 3 2019-07-16 16:01:55
2.2.2做题正确率数与知识点掌握度数据格式 – qz.log
kafka对应topic:qz_log
数据格式:
列 |
字段 |
字段说明 |
1 |
用户id |
|
2 |
课程id |
|
3 |
知识点id |
|
4 |
题目id |
|
5 |
是否正确 |
0错误 1正确 |
6 |
创建时间 |
|
数据示例:
# 数据使用/t作为分隔符
1006 504 8 7 0 2019-07-12 11:17:45
1007 505 16 9 1 2019-07-12 11:17:45
1002 505 29 3 0 2019-07-12 11:17:45
1006 504 10 5 0 2019-07-12 11:17:45
1001 502 28 8 0 2019-07-12 11:17:45
1006 505 27 0 1 2019-07-12 11:17:45
1004 503 25 3 0 2019-07-12 11:17:45
1007 504 12 1 0 2019-07-12 11:17:45
1006 501 7 6 0 2019-07-12 11:17:45
2.2.3商品页面到订单页,订单页到支付页数据格式 – page.log
kafka对应topic:page_topic
数据格式:
序号 |
字段 |
字段说明 |
1 |
app_id |
平台id 1:PC 2:APP 3:Other |
2 |
device_id |
平台id |
3 |
distinct_id |
唯一标识 |
4 |
ip |
用户ip地址 |
5 |
last_event_name |
上一事件名称 |
6 |
last_page_id |
上一页面id |
7 |
next_event_name |
下一事件名称 |
8 |
next_page_id |
下一页面id |
9 |
page_id |
当前页面id 1:商品课程页 2:订单页面 3:支付页面 |
10 |
server_time |
服务器时间 |
11 |
uid |
用户id |
数据示例:
# 数据为json格式
{"app_id":"2","device_id":"100","distinct_id":"23a6d4a7-6903-46a4-bce2-a8317693da45","event_name":"-","ip":"123.235.113.225","last_event_name":"-","last_page_id":"0","next_event_name":"-","next_page_id":"2","page_id":"1","server_time":"-","uid":"0"}
# json格式化之后 { "app_id": "2", "device_id": "100", "distinct_id": "23a6d4a7-6903-46a4-bce2-a8317693da45", "event_name": "-", "ip": "123.235.113.225", "last_event_name": "-", "last_page_id": "0", "next_event_name": "-", "next_page_id": "2", "page_id": "1", "server_time": "-", "uid": "0" }
2.2.4实时统计学员播放视频各时长 – course_learn.log
Kafka对应topic:course_learn
数据格式:
序号 |
字段 |
字段说明 |
1 |
biz |
唯一标识 |
2 |
chapterid |
章节id |
3 |
cwareid |
课件id |
4 |
edutypeid |
辅导id |
5 |
pe |
视频播放结束区间 |
6 |
ps |
视频播放开始区间 |
7 |
sourceType |
播放平台 |
8 |
speed |
播放倍速 |
9 |
subjectid |
主题id |
10 |
te |
视频播放结束时间(时间戳) |
11 |
ts |
视频播放开始时间(时间戳) |
12 |
uid |
用户id |
13 |
videoid |
视频id |
数据示例:
# 数据为json格式
{"biz":"34e4b8fe-476c-4c7e-8e2c-274ec00bb5c9","chapterid":"2","cwareid":"2","edutypeid":"1","pe":"56","ps":"24","sourceType":"PC","speed":"2","subjectid":"1","te":"1563352144131","ts":"1563352128131","uid":"219","videoid":"6"}
# json格式化之后 { "biz": "34e4b8fe-476c-4c7e-8e2c-274ec00bb5c9", "chapterid": "2", "cwareid": "2", "edutypeid": "1", "pe": "56", "ps": "24", "sourceType": "PC", "speed": "2", "subjectid": "1", "te": "1563352144131", "ts": "1563352128131", "uid": "219", "videoid": "6" }
2.3模拟数据采集
将准备好的log文件使用kafka生产者代码发送信息到对应的topic中(log文件均在资料包的.\2.资料\01日志数据\04_实时-kafka主题数据中)
数据说明 |
日志文件 |
Kafka topic |
代码文件 |
注册日志数据 |
register.log |
register_topic |
|
做题数据 |
qz.log |
qz_log |
|
商品页面数据 |
page.log |
page_topic |
|
视频播放时长数据 |
course_learn.log |
course_learn |
注:如果windows下没有安装hadoop环境,先在windows中配置环境变量。(代码运行时候会寻找环境变量中的HADOOP_HOME,然后找%HADOOP_HOME%/bin/winutils.exe,所以我们不需要下载全部的代码,只需要把bin包配置好,能让系统找到%HADOOP_HOME%/bin/winutils.exe即可)
该文件为hadoop-3.0.0的bin目录压缩包
2.4 ip解析工具测试
ip解析本地库文件:
测试ip解析工具代码:
import org.lionsoul.ip2region.{DbConfig, DbSearcher} object IpTest { def main(args: Array[String]): Unit = { val ipSearch = new DbSearcher(new DbConfig(), this.getClass.getResource("/ip2region.db").getPath) val region = ipSearch.binarySearch("182.250.250.42").getRegion println(region) val city = region.split("\\|")(2) println(city) } }
测试结果:
2.5实时统计注册人员信息
2.5.1 MySQL建表语句
CREATE TABLE `offset_manager` ( `groupid` varchar(50) DEFAULT NULL, `topic` varchar(50) DEFAULT NULL, `partition` int(11) DEFAULT NULL, `untiloffset` mediumtext, UNIQUE KEY `offset_unique` (`groupid`,`topic`,`partition`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
2.5.2表结构说明
表名: |
offset_manager |
|
主键: |
无 |
|
列 |
字段名 |
字段说明 |
1 |
groupid |
Kafka consumer的groupid |
2 |
topic |
Kafka consumer的topic |
3 |
partition |
Kafka consumer的partition |
4 |
untiloffset |
最新的消费offset(由上面的GTP进行定位) |
2.5.3业务流程说明
用户使用网站或APP进行注册,后台实时收集数据传输到Kafka,Spark Streaming进行对接统计,实时统计注册人数。
2.5.4需求说明
需求1:实时统计注册人数,批次为3秒一批,使用updateStateBykey算子计算历史数据和当前批次的数据总数,仅此需求使用updateStateBykey,后续需求不使用updateStateBykey。
需求2:每6秒统统计一次1分钟内的注册数据,不需要历史数据(提示:reduceByKeyAndWindow算子)
需求3:观察对接数据,尝试进行调优。
2.6实时统计学员做题正确率与知识点掌握度
2.6.1 MySQL建表语句
CREATE TABLE `qz_point_detail` ( `userid` int(11) DEFAULT NULL, `courseid` int(11) DEFAULT NULL, `pointid` int(11) DEFAULT NULL, `qz_sum` int(11) DEFAULT NULL, `qz_count` int(11) DEFAULT NULL, `qz_istrue` int(11) DEFAULT NULL, `correct_rate` double(4,2) DEFAULT NULL, `mastery_rate` double(4,2) DEFAULT NULL, `createtime` datetime DEFAULT NULL, `updatetime` datetime DEFAULT NULL, UNIQUE KEY `qz_point_detail_unique` (`userid`,`courseid`,`pointid`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; CREATE TABLE `qz_point_history` ( `userid` int(11) DEFAULT NULL, `courseid` int(11) DEFAULT NULL, `pointid` int(11) DEFAULT NULL, `questionids` text, `createtime` datetime DEFAULT NULL, `updatetime` datetime DEFAULT NULL, UNIQUE KEY `qz_point_set_unique` (`userid`,`courseid`,`pointid`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
2.6.2表结构说明
表名: |
qz_point_detail |
|
主键: |
`qz_point_detail_unique` (`userid`,`courseid`,`pointid`) |
|
列 |
字段名 |
字段说明 |
1 |
userid |
用户id |
2 |
courseid |
课程id |
3 |
pointid |
知识点id |
4 |
qz_sum |
做题总数(与历史进行累加) |
5 |
qz_count |
当前批次做题个数(去重) |
6 |
qz_istrue |
做题正确题目总数(与历史进行累加) |
7 |
correct_rate |
正确率 题目正确率=qz_istrue/qz_count |
8 |
mastery_rate |
知识点掌握程度=(题目正确率*题目完成度) 题目完成度=当前知识点去重完成题目数/当前知识点题目总数10 |
9 |
createtime |
创建时间 |
10 |
updatetime |
更新时间 |
表名: |
qz_point_history |
|
主键: |
`qz_point_set_unique` (`userid`,`courseid`,`pointid`) |
|
列 |
字段名 |
字段说明 |
1 |
userid |
用户id |
2 |
courseid |
课程id |
3 |
pointid |
知识点id |
4 |
questionids |
题目id(使用”,”作为拼接) |
5 |
createtime |
创建时间 |
6 |
updatetime |
更新时间 |
2.6.3业务流程说明
用户在网站或APP进行做题,做完题点击交卷按钮,程序将做题记录提交,传输到Kafka中,下游Spark Streaming对接kafka实现实时计算做题正确率和掌握度的计算,将正确率和掌握度存入MySQL中,用户点击交卷后刷新页面能立马(思考:这个更新的速度取决于什么?)看到自己做题的详情。
2.6.4需求说明
需求1:要求Spark Streaming保证数据不丢失,每秒100条处理速度,需要手动维护偏移量
需求2:同一个用户做在同一门课程同一知识点下做题需要去重,需要根据历史数据进行去重并且记录去重后的做题id与个数
需求3:计算知识点正确率(正确率计算公式:做题正确总个数/做题总数)保留两位小数
需求4:计算知识点掌握度,(知识点掌握度=去重后的做题个数/当前知识点总题数(已知10题)*当前知识点的正确率)
2.7实时统计商品页到订单页,订单页到支付页转换率
2.7.1 MySQL建表语句
CREATE TABLE `page_jump_rate` ( `id` INT(11) NOT NULL AUTO_INCREMENT, `last_page_id` INT(11) DEFAULT NULL, `page_id` INT(11) DEFAULT NULL, `next_page_id` INT(11) DEFAULT NULL, `num` BIGINT(20) DEFAULT NULL, `jump_rate` VARCHAR(10) DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `page_jum_rate_unique` (`page_id`) ) ENGINE=INNODB AUTO_INCREMENT=394 DEFAULT CHARSET=utf8; CREATE TABLE `tmp_city_num_detail` ( `id` INT(11) NOT NULL AUTO_INCREMENT, `province` VARCHAR(10) DEFAULT NULL, `num` BIGINT(20) DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `tmp_cityp_num_index_province` (`province`) ) ENGINE=INNODB AUTO_INCREMENT=4191 DEFAULT CHARSET=utf8; CREATE TABLE `top_city_num` ( `id` INT(11) NOT NULL AUTO_INCREMENT, `province` VARCHAR(10) DEFAULT NULL, `num` BIGINT(20) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=INNODB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;
2.7.2表结构说明
表名: |
page_jump_rate |
|
主键: |
`id` |
|
唯一键: |
`page_jum_rate_unique` (`page_id`) |
|
列 |
字段名 |
字段说明 |
1 |
id |
用户id |
2 |
last_page_id |
上一页面id 1:商品课程页 2:订单页面 3:支付页面 |
3 |
page_id |
当前页面id 1:商品课程页 2:订单页面 3:支付页面 |
4 |
next_page_id |
下一页面id 1:商品课程页 2:订单页面 3:支付页面 |
5 |
num |
|
6 |
jump_rate |
页面跳转率 |
表名: |
tmp_city_num_detail |
|
主键: |
`id` |
|
唯一键: |
`tmp_cityp_num_index_province` (`province`) |
|
列 |
字段名 |
字段说明 |
1 |
id |
自增id |
2 |
province |
省份 |
3 |
num |
各省数据统计结果 |
表名: |
top_city_num |
|
主键: |
`id` |
|
唯一键: |
`page_jum_rate_unique` (`page_id`) |
|
列 |
字段名 |
字段说明 |
1 |
id |
自增id |
2 |
province |
省份 |
3 |
num |
各省数据统计结果(只取前3) |
2.7.3业务流程说明
用户浏览课程首页点击下订单,跳转到订单页面,再点击支付跳转到支付页面进行支付,收集各页面跳转json数据,解析json数据计算各页面点击数和转换率,计算top3点击量按地区排名(ip字段,需要根据历史数据累计)
2.7.4需求说明
需求1:计算首页(商品详情页)总浏览数、订单页总浏览数、支付页面总浏览数
需求2:计算商品课程页面到订单页的跳转转换率、订单页面到支付页面的跳转转换率
需求3:根据ip得出相应省份,展示出top3省份的点击数,需要根据历史数据累加
注:此处默认首页为商品页,如果当前页为商品页则无需计算转化率,记为100%
为了简化需求,该页面跳转逻辑默认为1号页面跳转至2号页面,2号页面才能跳转3号页面。3号不能跳转回2号和1号。即页面是按序号顺序前进。
2.8实时统计学员播放视频各时长
2.8.1 MySQL建表语句
CREATE TABLE `video_learn_detail` ( `userid` INT(11) NOT NULL DEFAULT \'0\', `cwareid` INT(11) NOT NULL DEFAULT \'0\', `videoid` INT(11) NOT NULL DEFAULT \'0\', `totaltime` BIGINT(20) DEFAULT NULL, `effecttime` BIGINT(20) DEFAULT NULL, `completetime` BIGINT(20) DEFAULT NULL, PRIMARY KEY (`userid`,`cwareid`,`videoid`) ) ENGINE=INNODB DEFAULT CHARSET=utf8; CREATE TABLE `chapter_learn_detail` ( `chapterid` INT(11) NOT NULL DEFAULT \'0\', `totaltime` BIGINT(20) DEFAULT NULL, PRIMARY KEY (`chapterid`) ) ENGINE=INNODB DEFAULT CHARSET=utf8; CREATE TABLE `cwareid_learn_detail` ( `cwareid` INT(11) NOT NULL DEFAULT \'0\', `totaltime` BIGINT(20) DEFAULT NULL, PRIMARY KEY (`cwareid`) ) ENGINE=INNODB DEFAULT CHARSET=utf8; CREATE TABLE `edutype_learn_detail` ( `edutypeid` INT(11) NOT NULL DEFAULT \'0\', `totaltime` BIGINT(20) DEFAULT NULL, PRIMARY KEY (`edutypeid`) ) ENGINE=INNODB DEFAULT CHARSET=utf8; CREATE TABLE `sourcetype_learn_detail` ( `sourcetype` VARCHAR(10) NOT NULL DEFAULT \'\', `totaltime` BIGINT(20) DEFAULT NULL, PRIMARY KEY (`sourcetype_learn`) ) ENGINE=INNODB DEFAULT CHARSET=utf8; CREATE TABLE `subject_learn_detail` ( `subjectid` INT(11) NOT NULL DEFAULT \'0\', `totaltime` BIGINT(20) DEFAULT NULL, PRIMARY KEY (`subjectid`) ) ENGINE=INNODB DEFAULT CHARSET=utf8; CREATE TABLE `video_interval` ( `userid` INT(11) NOT NULL DEFAULT \'0\', `cwareid` INT(11) NOT NULL DEFAULT \'0\', `videoid` INT(11) NOT NULL DEFAULT \'0\', `play_interval` TEXT,