https://github.com/zq2599/blog_demos

内容:所有原创文章分类汇总及配套源码,涉及Java、Docker、Kubernetes、DevOPS等;

  1. flink做流式计算时,选用kafka消息作为数据源是常用手段,因此在学习和开发flink过程中,也会将数据集文件中的记录发送到kafka,来模拟不间断数据;
  2. 整个流程如下:
    在这里插入图片描述
  3. 您可能会觉得这样做多此一举:flink直接读取CSV不就行了吗?这样做的原因如下:
  4. 首先,这是学习和开发时的做法,数据集是CSV文件,而生产环境的实时数据却是kafka数据源;
  5. 其次,Java应用中可以加入一些特殊逻辑,例如数据处理,汇总统计(用来和flink结果对比验证);
  6. 另外,如果两条记录实际的间隔时间如果是1分钟,那么Java应用在发送消息时也可以间隔一分钟再发送,这个逻辑在flink社区的demo中有具体的实现,此demo也是将数据集发送到kafka,再由flink消费kafka,地址是:https://github.com/ververica/sql-training

前面的图可以看出,读取CSV再发送消息到kafka的操作是Java应用所为,因此今天的主要工作就是开发这个Java应用,并验证;

  1. JDK:1.8.0_181
  2. 开发工具:IntelliJ IDEA 2019.2.1 (Ultimate Edition)
  3. 开发环境:Win10
  4. Zookeeper:3.4.13
  5. Kafka:2.4.0(scala:2.12)
  1. 本次实战用到的数据集是CSV文件,里面是一百零四万条淘宝用户行为数据,该数据来源是阿里云天池公开数据集,我对此数据做了少量调整;
  2. 此CSV文件可以在CSDN下载,地址:https://download.csdn.net/download/boling_cavalry/12381698
  3. 也可以在我的Github下载,地址:https://raw.githubusercontent.com/zq2599/blog_demos/master/files/UserBehavior.7z
  4. 该CSV文件的内容,一共有六列,每列的含义如下表:
列名称 说明
用户ID 整数类型,序列化后的用户ID
商品ID 整数类型,序列化后的商品ID
商品类目ID 整数类型,序列化后的商品所属类目ID
行为类型 字符串,枚举类型,包括(‘pv’, ‘buy’, ‘cart’, ‘fav’)
时间戳 行为发生的时间戳
时间字符串 根据时间戳字段生成的时间字符串
  1. 关于该数据集的详情,请参考《准备数据集用于flink学习》

编码前,先把具体内容列出来,然后再挨个实现:

  1. 从CSV读取记录的工具类:UserBehaviorCsvFileReader
  2. 每条记录对应的Bean类:UserBehavior
  3. Java对象序列化成JSON的序列化类:JsonSerializer
  4. 向kafka发送消息的工具类:KafkaProducer
  5. 应用类,程序入口:SendMessageApplication

上述五个类即可完成Java应用的工作,接下来开始编码吧;

  1. 如果您不想写代码,您可以直接从GitHub下载这个工程的源码,地址和链接信息如下表所示:
名称 链接 备注
项目主页 https://github.com/zq2599/blog_demos 该项目在GitHub上的主页
git仓库地址(https) https://github.com/zq2599/blog_demos.git 该项目源码的仓库地址,https协议
git仓库地址(ssh) git@github.com:zq2599/blog_demos.git 该项目源码的仓库地址,ssh协议
  1. 这个git项目中有多个文件夹,本章源码在flinksql这个文件夹下,如下图红框所示:
    在这里插入图片描述
  1. 创建maven工程,pom.xml如下,比较重要的jackson和javacsv的依赖:
  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  4. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  5. <modelVersion>4.0.0</modelVersion>
  6. <groupId>com.bolingcavalry</groupId>
  7. <artifactId>flinksql</artifactId>
  8. <version>1.0-SNAPSHOT</version>
  9. <properties>
  10. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  11. <flink.version>1.10.0</flink.version>
  12. <kafka.version>2.2.0</kafka.version>
  13. <java.version>1.8</java.version>
  14. <scala.binary.version>2.11</scala.binary.version>
  15. <maven.compiler.source>${java.version}</maven.compiler.source>
  16. <maven.compiler.target>${java.version}</maven.compiler.target>
  17. </properties>
  18. <dependencies>
  19. <dependency>
  20. <groupId>org.apache.kafka</groupId>
  21. <artifactId>kafka-clients</artifactId>
  22. <version>${kafka.version}</version>
  23. </dependency>
  24. <dependency>
  25. <groupId>com.fasterxml.jackson.core</groupId>
  26. <artifactId>jackson-databind</artifactId>
  27. <version>2.9.10.1</version>
  28. </dependency>
  29. <!-- Logging dependencies -->
  30. <dependency>
  31. <groupId>org.slf4j</groupId>
  32. <artifactId>slf4j-log4j12</artifactId>
  33. <version>1.7.7</version>
  34. <scope>runtime</scope>
  35. </dependency>
  36. <dependency>
  37. <groupId>log4j</groupId>
  38. <artifactId>log4j</artifactId>
  39. <version>1.2.17</version>
  40. <scope>runtime</scope>
  41. </dependency>
  42. <dependency>
  43. <groupId>net.sourceforge.javacsv</groupId>
  44. <artifactId>javacsv</artifactId>
  45. <version>2.0</version>
  46. </dependency>
  47. </dependencies>
  48. <build>
  49. <plugins>
  50. <!-- Java Compiler -->
  51. <plugin>
  52. <groupId>org.apache.maven.plugins</groupId>
  53. <artifactId>maven-compiler-plugin</artifactId>
  54. <version>3.1</version>
  55. <configuration>
  56. <source>${java.version}</source>
  57. <target>${java.version}</target>
  58. </configuration>
  59. </plugin>
  60. <!-- Shade plugin to include all dependencies -->
  61. <plugin>
  62. <groupId>org.apache.maven.plugins</groupId>
  63. <artifactId>maven-shade-plugin</artifactId>
  64. <version>3.0.0</version>
  65. <executions>
  66. <!-- Run shade goal on package phase -->
  67. <execution>
  68. <phase>package</phase>
  69. <goals>
  70. <goal>shade</goal>
  71. </goals>
  72. <configuration>
  73. <artifactSet>
  74. <excludes>
  75. </excludes>
  76. </artifactSet>
  77. <filters>
  78. <filter>
  79. <!-- Do not copy the signatures in the META-INF folder.
  80. Otherwise, this might cause SecurityExceptions when using the JAR. -->
  81. <artifact>*:*</artifact>
  82. <excludes>
  83. <exclude>META-INF/*.SF</exclude>
  84. <exclude>META-INF/*.DSA</exclude>
  85. <exclude>META-INF/*.RSA</exclude>
  86. </excludes>
  87. </filter>
  88. </filters>
  89. </configuration>
  90. </execution>
  91. </executions>
  92. </plugin>
  93. </plugins>
  94. </build>
  95. </project>
  1. 从CSV读取记录的工具类:UserBehaviorCsvFileReader,后面在主程序中会用到java8的Steam API来处理集合,所以UserBehaviorCsvFileReader实现了Supplier接口:
  1. public class UserBehaviorCsvFileReader implements Supplier<UserBehavior> {
  2. private final String filePath;
  3. private CsvReader csvReader;
  4. public UserBehaviorCsvFileReader(String filePath) throws IOException {
  5. this.filePath = filePath;
  6. try {
  7. csvReader = new CsvReader(filePath);
  8. csvReader.readHeaders();
  9. } catch (IOException e) {
  10. throw new IOException("Error reading TaxiRecords from file: " + filePath, e);
  11. }
  12. }
  13. @Override
  14. public UserBehavior get() {
  15. UserBehavior userBehavior = null;
  16. try{
  17. if(csvReader.readRecord()) {
  18. csvReader.getRawRecord();
  19. userBehavior = new UserBehavior(
  20. Long.valueOf(csvReader.get(0)),
  21. Long.valueOf(csvReader.get(1)),
  22. Long.valueOf(csvReader.get(2)),
  23. csvReader.get(3),
  24. new Date(Long.valueOf(csvReader.get(4))*1000L));
  25. }
  26. } catch (IOException e) {
  27. throw new NoSuchElementException("IOException from " + filePath);
  28. }
  29. if (null==userBehavior) {
  30. throw new NoSuchElementException("All records read from " + filePath);
  31. }
  32. return userBehavior;
  33. }
  34. }
  1. 每条记录对应的Bean类:UserBehavior,和CSV记录格式保持一致即可,表示时间的ts字段,使用了JsonFormat注解,在序列化的时候以此来控制格式:
  1. public class UserBehavior {
  2. @JsonFormat
  3. private long user_id;
  4. @JsonFormat
  5. private long item_id;
  6. @JsonFormat
  7. private long category_id;
  8. @JsonFormat
  9. private String behavior;
  10. @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss'Z'")
  11. private Date ts;
  12. public UserBehavior() {
  13. }
  14. public UserBehavior(long user_id, long item_id, long category_id, String behavior, Date ts) {
  15. this.user_id = user_id;
  16. this.item_id = item_id;
  17. this.category_id = category_id;
  18. this.behavior = behavior;
  19. this.ts = ts;
  20. }
  21. }
  1. Java对象序列化成JSON的序列化类:JsonSerializer
  1. public class JsonSerializer<T> {
  2. private final ObjectMapper jsonMapper = new ObjectMapper();
  3. public String toJSONString(T r) {
  4. try {
  5. return jsonMapper.writeValueAsString(r);
  6. } catch (JsonProcessingException e) {
  7. throw new IllegalArgumentException("Could not serialize record: " + r, e);
  8. }
  9. }
  10. public byte[] toJSONBytes(T r) {
  11. try {
  12. return jsonMapper.writeValueAsBytes(r);
  13. } catch (JsonProcessingException e) {
  14. throw new IllegalArgumentException("Could not serialize record: " + r, e);
  15. }
  16. }
  17. }
  1. 向kafka发送消息的工具类:KafkaProducer
  1. public class KafkaProducer implements Consumer<UserBehavior> {
  2. private final String topic;
  3. private final org.apache.kafka.clients.producer.KafkaProducer<byte[], byte[]> producer;
  4. private final JsonSerializer<UserBehavior> serializer;
  5. public KafkaProducer(String kafkaTopic, String kafkaBrokers) {
  6. this.topic = kafkaTopic;
  7. this.producer = new org.apache.kafka.clients.producer.KafkaProducer<>(createKafkaProperties(kafkaBrokers));
  8. this.serializer = new JsonSerializer<>();
  9. }
  10. @Override
  11. public void accept(UserBehavior record) {
  12. // 将对象序列化成byte数组
  13. byte[] data = serializer.toJSONBytes(record);
  14. // 封装
  15. ProducerRecord<byte[], byte[]> kafkaRecord = new ProducerRecord<>(topic, data);
  16. // 发送
  17. producer.send(kafkaRecord);
  18. // 通过sleep控制消息的速度,请依据自身kafka配置以及flink服务器配置来调整
  19. try {
  20. Thread.sleep(500);
  21. }catch(InterruptedException e){
  22. e.printStackTrace();
  23. }
  24. }
  25. /**
  26. * kafka配置
  27. * @param brokers The brokers to connect to.
  28. * @return A Kafka producer configuration.
  29. */
  30. private static Properties createKafkaProperties(String brokers) {
  31. Properties kafkaProps = new Properties();
  32. kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
  33. kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
  34. kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
  35. return kafkaProps;
  36. }
  37. }
  1. 最后是应用类SendMessageApplication,CSV文件路径、kafka的topic和borker地址都在此设置,另外借助java8的Stream API,只需少量代码即可完成所有工作:
  1. public class SendMessageApplication {
  2. public static void main(String[] args) throws Exception {
  3. // 文件地址
  4. String filePath = "D:\\temp\\202005\\02\\UserBehavior.csv";
  5. // kafka topic
  6. String topic = "user_behavior";
  7. // kafka borker地址
  8. String broker = "192.168.50.43:9092";
  9. Stream.generate(new UserBehaviorCsvFileReader(filePath))
  10. .sequential()
  11. .forEachOrdered(new KafkaProducer(topic, broker));
  12. }
  13. }
  1. 请确保kafka已经就绪,并且名为user_behavior的topic已经创建;
  2. 请将CSV文件准备好;
  3. 确认SendMessageApplication.java中的文件地址、kafka topic、kafka broker三个参数准确无误;
  4. 运行SendMessageApplication.java;
  5. 开启一个 控制台消息kafka消息,参考命令如下:
  1. ./kafka-console-consumer.sh \
  2. --bootstrap-server 127.0.0.1:9092 \
  3. --topic user_behavior \
  4. --consumer-property group.id=old-consumer-test \
  5. --consumer-property consumer.id=old-consumer-cl \
  6. --from-beginning
  1. 正常情况下可以立即见到消息,如下图:
    在这里插入图片描述
    至此,通过Java应用模拟用户行为消息流的操作就完成了,接下来的flink实战就用这个作为数据源;

微信搜索「程序员欣宸」,我是欣宸,期待与您一同畅游Java世界…
https://github.com/zq2599/blog_demos

版权声明:本文为bolingcavalry原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://www.cnblogs.com/bolingcavalry/p/13983379.html