数据生产

  1 import java.io.*;
  2 import java.text.DecimalFormat;
  3 import java.text.ParseException;
  4 import java.text.SimpleDateFormat;
  5 import java.util.*;
  6 
  /**
   * Generates synthetic call-log records and appends them to a CSV file.
   *
   * <p>Each record is one line of the form {@code caller,callee,buildTime,duration}:
   * two different phone numbers from the sample list, a random call start time
   * ({@code yyyy-MM-dd HH:mm:ss}) and a zero-padded call duration in seconds.
   */
  public class ProductLog {
      /** Output path used by {@link #main(String[])} when no argument is supplied. */
      private static final String DEFAULT_LOG_PATH = "F:\\idea-workspace\\CT_BD\\data\\calllog.csv";

      /** Pause between two generated records, in milliseconds. */
      private static final long WRITE_INTERVAL_MS = 500L;

      /** Exclusive upper bound of a call duration, in seconds (30 minutes). */
      private static final int MAX_DURATION_SECONDS = 30 * 60;

      /** Format of {@link #startTime} and {@link #endTime}. */
      private static final String DAY_PATTERN = "yyyy-MM-dd";

      /** Format of the call start time written into each record. */
      private static final String TIMESTAMP_PATTERN = "yyyy-MM-dd HH:mm:ss";

      /** The 20 sample subscribers as {phone number, name} pairs. */
      private static final String[][] CONTACTS = {
          {"17078388295", "李雁"},
          {"13980337439", "卫艺"},
          {"14575535933", "仰莉"},
          {"19902496992", "陶欣悦"},
          {"18549641558", "施梅梅"},
          {"17005930322", "金虹霖"},
          {"18468618874", "魏明艳"},
          {"18576581848", "华贞"},
          {"15978226424", "华啟倩"},
          {"15542823911", "仲采绿"},
          {"17526304161", "卫丹"},
          {"15422018558", "戚丽红"},
          {"17269452013", "何翠柔"},
          {"17764278604", "钱溶艳"},
          {"15711910344", "钱琳"},
          {"15714728273", "缪静欣"},
          {"16061028454", "焦秋菊"},
          {"16264433631", "吕访琴"},
          {"17601615878", "沈丹"},
          {"15897468949", "褚美丽"},
      };

      // Phone numbers that may appear as caller or callee.
      private List<String> phoneList = new ArrayList<String>();
      // Phone number -> subscriber name; filled by initPhone() but not read by this class.
      private Map<String, String> phoneNameMap = new HashMap<>();
      // Bounds (yyyy-MM-dd) of the period from which call start times are drawn.
      // endTime is parsed as midnight at the start of that day and is exclusive.
      String startTime = "2020-01-01";
      String endTime = "2020-12-31";

      private final Random random = new Random();

      /**
       * Loads the sample phone numbers and their subscriber names.
       *
       * <p>Both collections are cleared first, so calling this more than once does
       * not create duplicate entries.
       */
      public void initPhone() {
          phoneList.clear();
          phoneNameMap.clear();
          for (String[] contact : CONTACTS) {
              phoneList.add(contact[0]);
              phoneNameMap.put(contact[0], contact[1]);
          }
      }

      /**
       * Produces one random call record.
       *
       * @return a line of the form {@code caller,callee,buildTime,duration}
       *         (caller, callee, call start time, call duration in seconds)
       * @throws IllegalStateException if fewer than two phone numbers are loaded,
       *         e.g. because {@link #initPhone()} has not been called
       * @throws IllegalArgumentException if {@link #startTime} / {@link #endTime}
       *         cannot be parsed or do not describe a non-empty period
       */
      public String product() {
          int size = phoneList.size();
          if (size < 2) {
              // With 0 entries the lookup below would fail with an index error,
              // with 1 entry the callee search could never terminate.
              throw new IllegalStateException(
                      "at least two phone numbers are required; call initPhone() first");
          }

          String caller = phoneList.get(random.nextInt(size));

          // Redraw until the callee differs from the caller.
          String callee;
          do {
              callee = phoneList.get(random.nextInt(size));
          } while (caller.equals(callee));

          String buildTime = randomBuildTime(startTime, endTime);

          // Duration in [0, 1800) seconds, zero-padded to four digits.
          // Locale.ROOT keeps the digits ASCII whatever the default locale is.
          String duration = String.format(Locale.ROOT, "%04d", random.nextInt(MAX_DURATION_SECONDS));

          return caller + "," + callee + "," + buildTime + "," + duration;
      }

      /**
       * Picks a random instant in {@code [startTime, endTime)}.
       *
       * @param startTime inclusive lower bound, formatted {@code yyyy-MM-dd}
       * @param endTime exclusive upper bound, formatted {@code yyyy-MM-dd}
       * @return the instant formatted as {@code yyyy-MM-dd HH:mm:ss}
       * @throws IllegalArgumentException if a bound cannot be parsed or
       *         {@code endTime} is not after {@code startTime}
       */
      private String randomBuildTime(String startTime, String endTime) {
          SimpleDateFormat dayFormat = new SimpleDateFormat(DAY_PATTERN, Locale.ROOT);
          long startMillis;
          long endMillis;
          try {
              startMillis = dayFormat.parse(startTime).getTime();
              endMillis = dayFormat.parse(endTime).getTime();
          } catch (ParseException e) {
              throw new IllegalArgumentException(
                      "dates must use the format " + DAY_PATTERN + ": " + startTime + ", " + endTime, e);
          }

          if (endMillis <= startMillis) {
              throw new IllegalArgumentException(
                      "endTime (" + endTime + ") must be after startTime (" + startTime + ")");
          }

          // start + (end - start) * random[0, 1)
          long randomMillis = startMillis + (long) ((endMillis - startMillis) * random.nextDouble());
          return new SimpleDateFormat(TIMESTAMP_PATTERN, Locale.ROOT).format(new Date(randomMillis));
      }

      /**
       * Appends one generated record to {@code filePath} every 500 ms, echoing each
       * record to standard output, until the calling thread is interrupted.
       *
       * <p>The file is opened in append mode and written as UTF-8. It is closed when
       * the loop ends, and the thread's interrupt flag is left set for the caller. A
       * failure to open the file is reported and ends the method; a failed write is
       * reported and the loop carries on with the next record.
       *
       * @param filePath path of the CSV file to append to
       */
      public void writeLog(String filePath) {
          try (Writer out = new OutputStreamWriter(
                  new FileOutputStream(filePath, true), java.nio.charset.StandardCharsets.UTF_8)) {
              while (!Thread.currentThread().isInterrupted()) {
                  try {
                      Thread.sleep(WRITE_INTERVAL_MS);
                  } catch (InterruptedException e) {
                      // Restore the flag so the caller can see the interruption, then stop.
                      Thread.currentThread().interrupt();
                      break;
                  }

                  String log = product();
                  System.out.println(log);
                  try {
                      out.write(log + "\n");
                      // Flush per record so a process tailing the file sees it immediately.
                      out.flush();
                  } catch (IOException e) {
                      e.printStackTrace();
                  }
              }
          } catch (IOException e) {
              e.printStackTrace();
          }
      }

      /**
       * Starts the generator.
       *
       * @param args optional; {@code args[0]} is the CSV file to append to. Without
       *             an argument the built-in development path is used.
       */
      public static void main(String[] args) {
          String filePath = (args != null && args.length > 0) ? args[0] : DEFAULT_LOG_PATH;
          ProductLog productLog = new ProductLog();
          productLog.initPhone();
          productLog.writeLog(filePath);
      }
  }

View Code

producer.sh

#!/bin/bash
# Run the ProductLog class from the producer jar, passing the call-log CSV path
# as its only argument.
jar=/root/temp/CT_producer-1.0-SNAPSHOT.jar
log_file=/root/temp/calllog.csv
java -cp "$jar" ProductLog "$log_file"

数据消费

  • Flume用于监控目标文件的变化,并把信息传递到Kafka

Flume配置

 # Names of the agent's source, channel and sink
 a1.sources = r1
 a1.channels = c1
 a1.sinks = k1

 # Source: stream the call-log file from its first byte and keep following it
 a1.sources.r1.type = exec
 a1.sources.r1.command = tail -F -c +0 /root/temp/calllog.csv
 a1.sources.r1.shell = /bin/bash -c
 # A source must be bound to the channel(s) it writes to
 a1.sources.r1.channels = c1

 # Channel: in-memory buffer between source and sink
 a1.channels.c1.type = memory
 a1.channels.c1.capacity = 1000
 a1.channels.c1.transactionCapacity = 100

 # Sink: publish each event to the Kafka topic created for the call log
 a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
 a1.sinks.k1.brokerList = bigdata111:9092
 a1.sinks.k1.topic = calllog
 a1.sinks.k1.batchSize = 20
 # A sink must be bound to the single channel it reads from
 a1.sinks.k1.channel = c1

View Code

  • 启动kafka服务(broker):bin/kafka-server-start.sh config/server.properties &
  • 创建主题:bin/kafka-topics.sh --create --zookeeper bigdata111:2181 --replication-factor 1 --partitions 3 --topic calllog
  • 启动kafka消费者:bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic calllog --from-beginning
  • 启动flume:bin/flume-ng agent -c conf/ -n a1 -f /root/temp/flume-kafka.conf
  • 生产数据:sh producer.sh

数据存储

  • 将产生的数据实时存储在HBase中
  • 编写调用HBaseAPI的相关方法,将从Kafka中读取出来的数据写入HBase中
  • HBase的描述器:命名空间描述器、表描述器、列族描述器
  • 协处理器:主叫插入f1后,被叫插入f2。修改程序和虚拟机中的 hbase-site.xml  

数据分析

  • 逻辑简单,代码量大
  • 按照时间范围(年月日),统计出所属时间范围内所有手机号码的通话次数总和及通话时长总和
    • 维度:某个视角,如按时间维度,统计2018年全年的通话记录,表示为2018年*月*日
    • 通过Mapper将数据按照不同维度聚合给Reducer
    • 通过Reducer拿到按各个维度聚合过来的数据,汇总输出
    • 将Reducer输出通过outputformat输出到Mysql表
  • 表结构设计
    • contacts:存放手机号,联系人姓名
    • call:存放某个时间维度下通话次数及通话时长总和
    • dimension_data:存放时间
  • 数据形式:联系人维度,时间维度
电话号码:123456 Chen
年:2020
月:12
日:31
  • HBase → MySQL
    • Sqoop
    • 自定义输出:map,reducer,outputformat,runner
  • 下载 lombok 插件,并在maven中添加依赖

  

参考

flume

https://www.freesion.com/article/4812259552/

电信项目

https://blog.csdn.net/liu16659/article/details/81133090?

版权声明:本文为cxc1357原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/cxc1357/p/13150024.html