1项目需求架构

1.1 项目需求分析

  一、数据采集平台搭建

  二、Kafka中间件准备

  三、下游Spark Streaming对接Kafka接收数据,实现vip个数统计、页面之间的跳转率、做题正确率与掌握度、播放时长统计及历史区间统计的实时计算功能。

1.2 项目框架

1.2.1 技术选型

  一、数据存储:KafkaMySQL

  二、数据处理:Spark

  三、其他组件:Zookeeper

1.2.2 流程设计

2章 需求

2.1环境准备

  在本机三台虚拟机上分别搭建好zookeeperkafka,创建所需topic(注:CDH6.3.2kafka的版本为2.2.1

kafka-topics --bootstrap-server hadoop102:9092 --create --replication-factor 2 --partitions 10  --topic register_topic
kafka-topics --bootstrap-server hadoop102:9092 --create --replication-factor 2 --partitions 10  --topic qz_log
kafka-topics --bootstrap-server hadoop102:9092 --create --replication-factor 2 --partitions 10  --topic page_topic
kafka-topics --bootstrap-server hadoop102:9092 --create --replication-factor 2 --partitions 10  --topic course_learn

  如果使用–bootstrap-server hadoop102:9092创建,则消费者的offset保存在kafka中,如果使用–zookeeper hadoop:2181创建,则消费者的offset保存在zk中

2.2原始数据格式及kafka对应topic

2.2.1实时统计注册人数 – register.log

  kafka对应topicregister_topic

  数据格式:

字段

字段说明

1

用户id

 

2

平台id

1:PC

2:APP

3:Other

3

创建时间

 

  数据示例:

# 数据使用/t作为分隔符
7188    2    2019-07-16 16:01:55
7189    1    2019-07-16 16:01:55
7190    1    2019-07-16 16:01:55
7191    1    2019-07-16 16:01:55
7192    1    2019-07-16 16:01:55
7193    3    2019-07-16 16:01:55
7194    1    2019-07-16 16:01:55
7195    3    2019-07-16 16:01:55

2.2.2做题正确率数与知识点掌握度数据格式 – qz.log

  kafka对应topicqz_log

  数据格式:

字段

字段说明

1

用户id

 

2

课程id

 

3

知识点id

 

4

题目id

 

5

是否正确

0错误

1正确

6

创建时间

 

  数据示例:

# 数据使用/t作为分隔符
1006    504    8    7    0    2019-07-12 11:17:45
1007    505    16    9    1    2019-07-12 11:17:45
1002    505    29    3    0    2019-07-12 11:17:45
1006    504    10    5    0    2019-07-12 11:17:45
1001    502    28    8    0    2019-07-12 11:17:45
1006    505    27    0    1    2019-07-12 11:17:45
1004    503    25    3    0    2019-07-12 11:17:45
1007    504    12    1    0    2019-07-12 11:17:45
1006    501    7    6    0    2019-07-12 11:17:45

2.2.3商品页面到订单页,订单页到支付页数据格式 page.log

  kafka对应topicpage_topic

  数据格式:

序号

字段

字段说明

1

app_id

平台id

1:PC

2:APP

3:Other

2

device_id

平台id

3

distinct_id

唯一标识

4

ip

用户ip地址

5

last_event_name

上一事件名称

6

last_page_id

上一页面id

7

next_event_name

下一事件名称

8

next_page_id

下一页面id

9

page_id

当前页面id

1:商品课程页

2:订单页面

3:支付页面

10

server_time

服务器时间

11

uid

用户id

  数据示例:

# 数据为json格式
{"app_id":"2","device_id":"100","distinct_id":"23a6d4a7-6903-46a4-bce2-a8317693da45","event_name":"-","ip":"123.235.113.225","last_event_name":"-","last_page_id":"0","next_event_name":"-","next_page_id":"2","page_id":"1","server_time":"-","uid":"0"}
# json格式化之后
{
  "app_id": "2",
  "device_id": "100",
  "distinct_id": "23a6d4a7-6903-46a4-bce2-a8317693da45",
  "event_name": "-",
  "ip": "123.235.113.225",
  "last_event_name": "-",
  "last_page_id": "0",
  "next_event_name": "-",
  "next_page_id": "2",
  "page_id": "1",
  "server_time": "-",
  "uid": "0"
}

2.2.4实时统计学员播放视频各时长 – course_learn.log

  Kafka对应topiccourse_learn

  数据格式:

序号

字段

字段说明

1

biz

唯一标识

2

chapterid

章节id

3

cwareid

课件id

4

edutypeid

辅导id

5

pe

视频播放结束区间

6

ps

视频播放开始区间

7

sourceType

播放平台

8

speed

播放倍速

9

subjectid

主题id

10

te

视频播放结束时间(时间戳)

11

ts

视频播放开始时间(时间戳)

12

uid

用户id

13

videoid

视频id

  数据示例:

# 数据为json格式
{"biz":"34e4b8fe-476c-4c7e-8e2c-274ec00bb5c9","chapterid":"2","cwareid":"2","edutypeid":"1","pe":"56","ps":"24","sourceType":"PC","speed":"2","subjectid":"1","te":"1563352144131","ts":"1563352128131","uid":"219","videoid":"6"}
# json格式化之后
{
  "biz": "34e4b8fe-476c-4c7e-8e2c-274ec00bb5c9",
  "chapterid": "2",
  "cwareid": "2",
  "edutypeid": "1",
  "pe": "56",
  "ps": "24",
  "sourceType": "PC",
  "speed": "2",
  "subjectid": "1",
  "te": "1563352144131",
  "ts": "1563352128131",
  "uid": "219",
  "videoid": "6"
}

2.3模拟数据采集

  将准备好的log文件使用kafka生产者代码发送信息到对应的topiclog文件均在资料包的.\2.资料\01日志数据\04_实时-kafka主题数据中)

数据说明

日志文件

Kafka topic

代码文件

注册日志数据

register.log

register_topic

 

做题数据

qz.log

qz_log

 

商品页面数据

page.log

page_topic

 

视频播放时长数据

course_learn.log

course_learn

 

  注:如果windows下没有安装hadoop环境,先windows配置环境变量。(代码运行时候会寻找环境变量中的HADOOP_HOME,然后找%HADOOP_HOME%/bin/winutils.exe,所以我们不需要下载全部的代码,只需要把bin包配置好,能让系统找到%HADOOP_HOME%/bin/winutils.exe即可)

  该文件为hadoop-3.0.0bin目录压缩包

2.4 ip解析工具测试

  ip解析本地库文件

  测试ip解析工具代码:

import org.lionsoul.ip2region.{DbConfig, DbSearcher}

object IpTest {
  def main(args: Array[String]): Unit = {
    val ipSearch = new DbSearcher(new DbConfig(), this.getClass.getResource("/ip2region.db").getPath)
    val region = ipSearch.binarySearch("182.250.250.42").getRegion
    println(region)
    val city = region.split("\\|")(2)
    println(city)
  }
}

  测试结果:

2.5实时统计注册人员信息

2.5.1 MySQL建表语句

CREATE TABLE `offset_manager` (
  `groupid` varchar(50) DEFAULT NULL,
  `topic` varchar(50) DEFAULT NULL,
  `partition` int(11) DEFAULT NULL,
  `untiloffset` mediumtext,
  UNIQUE KEY `offset_unique` (`groupid`,`topic`,`partition`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

2.5.2表结构说明

表名:

offset_manager

主键:

字段名

字段说明

1

groupid

Kafka consumergroupid

2

topic

Kafka consumertopic

3

partition

Kafka consumerpartition

4

untiloffset

最新的消费offset(由上面的GTP进行定位)

2.5.3业务流程说明

  用户使用网站或APP进行注册,后台实时收集数据传输KafkaSpark Streaming进行对接统计,实时统计注册人数。

2.5.4需求说明

  需求1:实时统计注册人数,批次为3秒一批,使用updateStateBykey算子计算历史数据和当前批次的数据总数,仅此需求使用updateStateBykey,后续需求不使用updateStateBykey

  需求2:每6秒统统计一次1分钟内的注册数据,不需要历史数据(提示:reduceByKeyAndWindow算子)

  需求3:观察对接数据,尝试进行调优。

2.6实时统计学员做题正确率与知识点掌握度

2.6.1 MySQL建表语句

CREATE TABLE `qz_point_detail` (
  `userid` int(11) DEFAULT NULL,
  `courseid` int(11) DEFAULT NULL,
  `pointid` int(11) DEFAULT NULL,
  `qz_sum` int(11) DEFAULT NULL,
  `qz_count` int(11) DEFAULT NULL,
  `qz_istrue` int(11) DEFAULT NULL,
  `correct_rate` double(4,2) DEFAULT NULL,
  `mastery_rate` double(4,2) DEFAULT NULL,
  `createtime` datetime DEFAULT NULL,
  `updatetime` datetime DEFAULT NULL,
  UNIQUE KEY `qz_point_detail_unique` (`userid`,`courseid`,`pointid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE `qz_point_history` (
  `userid` int(11) DEFAULT NULL,
  `courseid` int(11) DEFAULT NULL,
  `pointid` int(11) DEFAULT NULL,
  `questionids` text,
  `createtime` datetime DEFAULT NULL,
  `updatetime` datetime DEFAULT NULL,
  UNIQUE KEY `qz_point_set_unique` (`userid`,`courseid`,`pointid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

2.6.2表结构说明

表名:

qz_point_detail

主键:

`qz_point_detail_unique` (`userid`,`courseid`,`pointid`)

字段名

字段说明

1

userid

用户id

2

courseid

课程id

3

pointid

知识点id

4

qz_sum

做题总数(与历史进行累加)

5

qz_count

当前批次做题个数(去重)

6

qz_istrue

做题正确题目总数(与历史进行累加)

7

correct_rate

正确率

题目正确率=qz_istrue/qz_count

8

mastery_rate

知识点掌握程度=(题目正确率*题目完成度)

题目完成度=当前知识点去重完成题目数/当前知识点题目总数10

9

createtime

创建时间

10

updatetime

更新时间

表名:

qz_point_history

主键:

`qz_point_set_unique` (`userid`,`courseid`,`pointid`)

字段名

字段说明

1

userid

用户id

2

courseid

课程id

3

pointid

知识点id

4

questionids

题目id(使用,”作为拼接)

5

createtime

创建时间

6

updatetime

更新时间

2.6.3业务流程说明

  用户在网站或APP进行做题,做完题点击交卷按钮,程序将做题记录提交,传输到Kafka中,下游Spark Streaming对接kafka实现实时计算做题正确率和掌握度的计算,将正确率和掌握度存入MySQL中,用户点击交卷后刷新页面能立马(思考:这个更新的速度取决于什么?)看到自己做题的详情。

2.6.4需求说明

  需求1:要求Spark Streaming保证数据不丢失,每秒100条处理速度,需要手动维护偏移量

  需求2:同一个用户做在同一门课程同一知识点下做题需要去重,需要根据历史数据进行去重并且记录去重后的做题id与个数

  需求3:计算知识点正确率正确率计算公式:做题正确总个数/做题总数)保留两位小数

  需求4:计算知识点掌握度,(知识点掌握度=去重后的做题个数/当前知识点总题数(已知10题)*当前知识点的正确率

2.7实时统计商品页到订单页,订单页到支付页转换率

2.7.1 MySQL建表语句

CREATE TABLE `page_jump_rate` (
  `id` INT(11) NOT NULL AUTO_INCREMENT,
  `last_page_id` INT(11) DEFAULT NULL,
  `page_id` INT(11) DEFAULT NULL,
  `next_page_id` INT(11) DEFAULT NULL,
  `num` BIGINT(20) DEFAULT NULL,
  `jump_rate` VARCHAR(10) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `page_jum_rate_unique` (`page_id`)
) ENGINE=INNODB AUTO_INCREMENT=394 DEFAULT CHARSET=utf8;

CREATE TABLE `tmp_city_num_detail` (
  `id` INT(11) NOT NULL AUTO_INCREMENT,
  `province` VARCHAR(10) DEFAULT NULL,
  `num` BIGINT(20) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `tmp_cityp_num_index_province` (`province`)
) ENGINE=INNODB AUTO_INCREMENT=4191 DEFAULT CHARSET=utf8;

CREATE TABLE `top_city_num` (
  `id` INT(11) NOT NULL AUTO_INCREMENT,
  `province` VARCHAR(10) DEFAULT NULL,
  `num` BIGINT(20) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=INNODB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8;

2.7.2表结构说明

表名:

page_jump_rate

主键:

`id`

唯一键:

`page_jum_rate_unique` (`page_id`)

字段名

字段说明

1

id

用户id

2

last_page_id

上一页面id 1:商品课程页 2:订单页面 3:支付页面

3

page_id

当前页面id 1:商品课程页 2:订单页面 3:支付页面

4

next_page_id

下一页面id 1:商品课程页 2:订单页面 3:支付页面

5

num

 

6

jump_rate

页面跳转率

表名:

tmp_city_num_detail

主键:

`id`

唯一键:

`tmp_cityp_num_index_province` (`province`)

字段名

字段说明

1

id

自增id

2

province

省份

3

num

各省数据统计结果

表名:

top_city_num

主键:

`id`

唯一键:

`page_jum_rate_unique` (`page_id`)

字段名

字段说明

1

id

自增id

2

province

省份

3

num

各省数据统计结果(只取前3

2.7.3业务流程说明

  用户浏览课程首页点击下订单,跳转到订单页面,再点击支付跳转到支付页面进行支付,收集各页面跳转json数据,解析json数据计算各页面点击数和转换率,计算top3点击量按地区排名(ip字段,需要根据历史数据累计)

2.7.4需求说明

  需求1:计算首页(商品详情页)总浏览数、订单页总浏览数、支付页面总浏览数

  需求2:计算商品课程页面到订单页的跳转转换率、订单页面到支付页面的跳转转换率

  需求3:根据ip得出相应省份,展示出top3省份的点击数,需要根据历史数据累加

  注:此处默认首页为商品页,如果当前页为商品页则无需计算转化率,记为100%

  为了简化需求,该页面跳转逻辑默认为1号页面跳转至2号页面,2号页面才能跳转3号页面。3号不能跳转回2号和1号。即页面是按序号顺序前进。

2.8实时统计学员播放视频各时长

2.8.1 MySQL建表语句

CREATE TABLE `video_learn_detail` (
  `userid` INT(11) NOT NULL DEFAULT \'0\',
  `cwareid` INT(11) NOT NULL DEFAULT \'0\',
  `videoid` INT(11) NOT NULL DEFAULT \'0\',
  `totaltime` BIGINT(20) DEFAULT NULL,
  `effecttime` BIGINT(20) DEFAULT NULL,
  `completetime` BIGINT(20) DEFAULT NULL,
  PRIMARY KEY (`userid`,`cwareid`,`videoid`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;

CREATE TABLE `chapter_learn_detail` (
  `chapterid` INT(11) NOT NULL DEFAULT \'0\',
  `totaltime` BIGINT(20) DEFAULT NULL,
  PRIMARY KEY (`chapterid`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;

CREATE TABLE `cwareid_learn_detail` (
  `cwareid` INT(11) NOT NULL DEFAULT \'0\',
  `totaltime` BIGINT(20) DEFAULT NULL,
  PRIMARY KEY (`cwareid`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;


CREATE TABLE `edutype_learn_detail` (
  `edutypeid` INT(11) NOT NULL DEFAULT \'0\',
  `totaltime` BIGINT(20) DEFAULT NULL,
  PRIMARY KEY (`edutypeid`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;


CREATE TABLE `sourcetype_learn_detail` (
  `sourcetype` VARCHAR(10) NOT NULL DEFAULT \'\',
  `totaltime` BIGINT(20) DEFAULT NULL,
  PRIMARY KEY (`sourcetype_learn`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;

CREATE TABLE `subject_learn_detail` (
  `subjectid` INT(11) NOT NULL DEFAULT \'0\',
  `totaltime` BIGINT(20) DEFAULT NULL,
  PRIMARY KEY (`subjectid`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;

CREATE TABLE `video_interval` (
  `userid` INT(11) NOT NULL DEFAULT \'0\',
  `cwareid` INT(11) NOT NULL DEFAULT \'0\',
  `videoid` INT(11) NOT NULL DEFAULT \'0\',
  `play_interval` TEXT,
  
版权声明:本文为LzMingYueShanPao原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/LzMingYueShanPao/p/15165916.html