This article gives an overview of how Flink SQL / Table API works internally, tying together the whole pipeline "from SQL statement to actual execution". It also provides as many call stacks as possible, so that when you hit a problem you know where to set breakpoints, and so that you can build a deeper understanding of the overall architecture.

The SQL pipeline passes through several important nodes, illustrated below:

  1. // NOTE: execution order is top to bottom; " -----> " marks the type of instance produced
  2. *
  3. * +-----> "left outer JOIN" (SQL statement)
  4. * |
  5. * |
  6. * SqlParser.parseQuery // SQL parsing: build the AST (abstract syntax tree), i.e. SQL --> SqlNode
  7. * |
  8. * |
  9. * +-----> SqlJoin (SqlNode)
  10. * |
  11. * |
  12. * SqlToRelConverter.convertQuery // semantic analysis: build the logical plan, i.e. SqlNode --> RelNode
  13. * |
  14. * |
  15. * +-----> LogicalProject (RelNode) // Abstract Syntax Tree, unoptimized RelNode
  16. * |
  17. * |
  18. * FlinkLogicalJoinConverter (RelOptRule) // Flink-specific optimization rules
  19. * VolcanoRuleCall.onMatch // optimize the logical plan with Flink's own rules
  20. * |
  21. * |
  22. * +-----> FlinkLogicalJoin (RelNode) // Optimized Logical Plan, the logical execution plan
  23. * |
  24. * |
  25. * StreamExecJoinRule (RelOptRule) // Rule that converts FlinkLogicalJoin without window bounds in join condition to StreamExecJoin
  26. * VolcanoRuleCall.onMatch // convert the optimized logical plan into Flink's physical plan with Flink rules
  27. * |
  28. * |
  29. * +-----> StreamExecJoin (FlinkRelNode) // Stream physical RelNode, the physical execution plan
  30. * |
  31. * |
  32. * StreamExecJoin.translateToPlanInternal // generates the StreamOperator, i.e. the Flink operator
  33. * |
  34. * |
  35. * +-----> StreamingJoinOperator (StreamOperator) // Streaming unbounded Join operator in StreamTask
  36. * |
  37. * |
  38. * StreamTwoInputProcessor.processRecord1 // the TwoInputStreamTask invokes the StreamingJoinOperator: the actual execution
  39. * |
  40. * |

The rest of this article follows this diagram as its main thread.

Flink's Table API & SQL provide a unified interface for relational queries over both streaming and static data, and they build on Apache Calcite's SQL parser and query-optimization framework.

Why does Flink offer a Table API at all? In short, a relational API has the following advantages:

  • Relational APIs are declarative
  • Queries can be effectively optimized
  • Queries can be executed efficiently
  • "Everybody" knows SQL

Calcite is the core component here. Apache Calcite is a SQL engine originally aimed at the Hadoop ecosystem; it provides standard SQL, a range of query optimizations, and the ability to connect to many different data sources.

The main Calcite concepts are:

  • Relational algebra: relational expressions, usually named after verbs, e.g. Sort, Join, Project, Filter, Scan, Sample.
  • Traits: an expression carries various traits; a Trait's satisfies() method tests whether an expression satisfies a given trait or convention.
  • Rules: transform one expression into another. A rule carries a list of RelOptRuleOperands that decides whether it can be applied to a given part of the tree.
  • Planner: the query optimizer. It transforms an expression into a semantically equivalent (but more efficient) one according to a set of rules and a cost model (e.g. the cost-based VolcanoPlanner or the heuristic HepPlanner).
  • RelNode: represents one processing operation on data; common operations are Sort, Join, Project, Filter, Scan, etc. It describes an operation on a whole relation rather than the processing logic for individual records. A RelNode records its input RelNodes, so RelNodes form a tree.
  • RexNode: a row (scalar) expression describing the processing logic applied to a single row. Every row expression has a data type, because during validation the compiler infers each expression's result type. Common row expressions are literals (RexLiteral), variables (RexVariable), and function or operator calls (RexCall). RexNodes are built with a RexBuilder (a small standalone Calcite sketch follows this list).
  • RelTrait: describes a physical property of a logical table; the three main trait types are Convention, RelCollation and RelDistribution.
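
To make RelNode and RexNode concrete, here is a minimal, self-contained Calcite sketch (plain Calcite, not Flink code; the table and field names are made up for illustration). RelBuilder assembles the RelNode tree, while builder.call / builder.field / builder.literal build the RexNode row expressions inside it:

    import org.apache.calcite.plan.RelOptUtil;
    import org.apache.calcite.rel.RelNode;
    import org.apache.calcite.sql.fun.SqlStdOperatorTable;
    import org.apache.calcite.tools.FrameworkConfig;
    import org.apache.calcite.tools.Frameworks;
    import org.apache.calcite.tools.RelBuilder;

    public class RelBuilderSketch {
        public static void main(String[] args) {
            FrameworkConfig config = Frameworks.newConfigBuilder()
                    .defaultSchema(Frameworks.createRootSchema(true))
                    .build();
            RelBuilder builder = RelBuilder.create(config);

            // RelNode tree: Values -> Filter -> Project (relational algebra operators).
            RelNode plan = builder
                    .values(new String[]{"id", "amount"}, 1, 10, 2, 20)      // LogicalValues
                    .filter(
                            // RexNode: a RexCall ">" over a field reference and a RexLiteral.
                            builder.call(SqlStdOperatorTable.GREATER_THAN,
                                    builder.field("amount"), builder.literal(15)))
                    .project(builder.field("id"))                            // LogicalProject
                    .build();

            // Prints the tree, roughly: LogicalProject <- LogicalFilter <- LogicalValues.
            System.out.println(RelOptUtil.toString(plan));
        }
    }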

SQL execution is usually described in four phases; Calcite is very similar, but splits the work into five phases:

  1. SQL parsing: build the AST (abstract syntax tree) (SQL --> SqlNode)

  2. SqlNode validation (SqlNode --> SqlNode)

  3. Semantic analysis: build the logical plan (SqlNode --> RelNode/RexNode)

  4. Optimization: optimize the plan by applying the corresponding rules (RelNode --> RelNode)

  5. Generation of the ExecutionPlan, i.e. the physical execution plan (DataStream plan)

Flink carries two ways of expressing queries, the Table API and the SQL API. It uses Apache Calcite as the SQL parser for semantic analysis, uniformly producing a Calcite logical plan (a SqlNode tree), which is then validated. Calcite's optimizer is then used to apply conversion rules to the logical plan, with different rule sets depending on the nature of the source (streaming or batch), yielding an optimized RelNode tree. The optimized plan is finally translated into an ordinary Flink DataSet or DataStream program. Any performance improvement to the DataStream or DataSet API therefore automatically benefits Table API and SQL queries as well.
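
To ground this pipeline, here is a hedged reconstruction (for the Flink 1.9/1.10-era API) of the kind of driver program the call stacks in this article were captured from: Flink's StreamSQLExample, in which two DataStreams are exposed as tables and combined with a UNION ALL query. The Order POJO and its fields are illustrative assumptions that match the field names seen in the debug output later on.

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;

    public class StreamSqlSketch {

        // Simple POJO used as the element type of both source streams (illustrative).
        public static class Order {
            public Long user;
            public String product;
            public int amount;

            public Order() {}

            public Order(Long user, String product, int amount) {
                this.user = user;
                this.product = product;
                this.amount = amount;
            }
        }

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            EnvironmentSettings settings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner().inStreamingMode().build();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

            DataStream<Order> orderA = env.fromElements(
                    new Order(1L, "beer", 3), new Order(1L, "diaper", 4));
            DataStream<Order> orderB = env.fromElements(
                    new Order(2L, "pen", 3), new Order(2L, "rubber", 1));

            // DataStream -> Table; tableA is registered on demand as "UnnamedTable$0",
            // the name that shows up in the debug output below.
            Table tableA = tEnv.fromDataStream(orderA, "user, product, amount");
            tEnv.registerDataStream("OrderB", orderB, "user, product, amount");

            // SQL -> SqlNode -> RelNode -> optimized plan -> StreamOperator pipeline.
            Table result = tEnv.sqlQuery(
                    "SELECT * FROM " + tableA + " WHERE amount > 2 "
                            + "UNION ALL SELECT * FROM OrderB WHERE amount < 2");

            // Triggers PlannerBase.translate / StreamPlanner.translateToPlan (see the stacks below).
            tEnv.toAppendStream(result, Row.class).print();
            env.execute();
        }
    }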

From submission, through Calcite parsing and optimization, to execution in the Flink engine, a streaming SQL statement generally goes through the following stages:

  1. Sql Parser: parse the SQL statement with JavaCC into an AST (syntax tree), represented in Calcite by SqlNode;
  2. Sql Validator: validate the SQL syntax against the catalog (the metadata dictionary);
  3. Generate the Logical Plan: convert the AST represented by SqlNode into a logical plan represented by RelNode;
  4. Generate the optimized Logical Plan: optimize the logical plan first with Calcite's rules and then with Flink's own optimization rules;
  5. Generate the Flink Physical Plan: again driven by Flink's rules, convert the optimized logical plan into Flink's physical execution plan;
  6. Convert the physical plan into a Flink ExecutionPlan: call the corresponding translateToPlan methods and use code generation (CodeGen) to produce Flink's operators.

If the job is submitted through the Table API instead, it also goes through the Calcite optimization stages; the overall flow is similar to running SQL directly:

  1. Table API parser: Flink also represents the computation expressed with the Table API as a tree of tree nodes; the computation at each node is represented by an Expression.
  2. Validate: bind each node's unresolved expressions against the catalog (the metadata dictionary), producing resolved expressions;
  3. Generate the Logical Plan: walk the tree node by node and call each node's construct method to convert the tree-node representation into Calcite's internal RelNode representation; this yields the logical plan, represented by RelNode;
  4. Generate the optimized Logical Plan: optimize the logical plan first with Calcite's rules and then with Flink's own optimization rules;
  5. Generate the Flink Physical Plan: again driven by Flink's rules, convert the optimized logical plan into Flink's physical execution plan;
  6. Convert the physical plan into a Flink ExecutionPlan: call the corresponding translateToPlan methods and use code generation (CodeGen) to produce Flink's operators.

As you can see, Table API and SQL share the same pipeline once a RelNode has been obtained; they differ only in how the RelNode is produced (a short sketch follows this list):

  • Table API: uses a RelBuilder to obtain the RelNode (LogicalNodes and Expressions are converted into RelNodes and RexNodes respectively);
  • SQL: uses the Planner. The parse method first turns the SQL text into a parse tree represented by SqlNode; the validate method then uses the metadata to resolve fields, determine types and check validity; finally the rel method converts the SqlNode into a RelNode;
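
A hedged illustration of the two entry points (using the string-expression flavor of the Flink 1.9/1.10 Table API; the table name OrderB matches the example queries used throughout this article). Both calls end up as the same kind of RelNode tree inside the planner; only the way the tree is built differs:

    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.StreamTableEnvironment;

    public class TwoEntryPoints {
        static void sketch(StreamTableEnvironment tEnv) {
            // SQL path: text -> SqlNode (parse) -> validated SqlNode -> RelNode (Planner.rel).
            Table viaSql = tEnv.sqlQuery("SELECT * FROM OrderB WHERE amount > 2");

            // Table API path: Expression tree -> resolved Expressions -> RelNode (RelBuilder).
            Table viaTableApi = tEnv.scan("OrderB").where("amount > 2");
        }
    }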

The TableEnvironment object is the core of the Table API and SQL integration; it supports the following:

  • Registering a Table.
  • Registering a TableSource with the TableEnvironment; a TableSource exposes an external storage system as a Table, e.g. MySQL, HBase, CSV, Kafka, RabbitMQ, and so on.
  • Registering an external catalog to access data or files in external systems.
  • Executing SQL queries.
  • Registering a user-defined function.
  • Converting a DataStream or DataSet into a Table.

A query is bound to exactly one TableEnvironment. A TableEnvironment is configured through a TableConfig, which can be used to customize query optimization and the translation process.

The TableEnvironment execution flow is as follows:

  • TableEnvironment.sqlQuery() is the entry point (sql() in older versions);

  • Flink's FlinkPlannerImpl performs the parse(sql), validate(sqlNode) and rel(sqlNode) operations;

  • a Table is produced.

A code excerpt:

  1. package org.apache.flink.table.api.internal;
  2. @Internal
  3. public class TableEnvironmentImpl implements TableEnvironment {
  4. private final CatalogManager catalogManager;
  5. private final ModuleManager moduleManager;
  6. private final OperationTreeBuilder operationTreeBuilder;
  7. private final List<ModifyOperation> bufferedModifyOperations = new ArrayList<>();
  8. protected final TableConfig tableConfig;
  9. protected final Executor execEnv;
  10. protected final FunctionCatalog functionCatalog;
  11. protected final Planner planner;
  12. protected final Parser parser;
  13. }
  14. // contents of the instance as printed in the debugger:
  15. this = {StreamTableEnvironmentImpl@4701}
  16. functionCatalog = {FunctionCatalog@4702}
  17. scalaExecutionEnvironment = {StreamExecutionEnvironment@4703}
  18. planner = {StreamPlanner@4704}
  19. config = {TableConfig@4708}
  20. executor = {StreamExecutor@4709}
  21. PlannerBase.config = {TableConfig@4708}
  22. functionCatalog = {FunctionCatalog@4702}
  23. catalogManager = {CatalogManager@1250}
  24. isStreamingMode = true
  25. plannerContext = {PlannerContext@4711}
  26. parser = {ParserImpl@4696}
  27. catalogManager = {CatalogManager@1250}
  28. moduleManager = {ModuleManager@4705}
  29. operationTreeBuilder = {OperationTreeBuilder@4706}
  30. bufferedModifyOperations = {ArrayList@4707} size = 0
  31. tableConfig = {TableConfig@4708}
  32. execEnv = {StreamExecutor@4709}
  33. TableEnvironmentImpl.functionCatalog = {FunctionCatalog@4702}
  34. TableEnvironmentImpl.planner = {StreamPlanner@4704}
  35. parser = {ParserImpl@4696}
  36. registration = {TableEnvironmentImpl$1@4710}

Catalog – defines metadata and namespaces, covering Schema (database), Table, and RelDataType (type information).

All metadata about databases and tables lives in Flink's internal catalog structure, which holds all Table-related metadata inside Flink, including table schemas, data source information, and so on (a small registration sketch follows the code below).

  1. // TableEnvironment holds a CatalogManager
  2. public final class CatalogManager {
  3. // A map between names and catalogs.
  4. private Map<String, Catalog> catalogs;
  5. }
  6. // the Catalog interface
  7. public interface Catalog {
  8. ......
  9. default Optional<TableFactory> getTableFactory() {
  10. return Optional.empty();
  11. }
  12. ......
  13. }
  14. // when the data source is defined inside the program, the catalog used is GenericInMemoryCatalog
  15. public class GenericInMemoryCatalog extends AbstractCatalog {
  16. public static final String DEFAULT_DB = "default";
  17. private final Map<String, CatalogDatabase> databases;
  18. private final Map<ObjectPath, CatalogBaseTable> tables;
  19. private final Map<ObjectPath, CatalogFunction> functions;
  20. private final Map<ObjectPath, Map<CatalogPartitionSpec, CatalogPartition>> partitions;
  21. private final Map<ObjectPath, CatalogTableStatistics> tableStats;
  22. private final Map<ObjectPath, CatalogColumnStatistics> tableColumnStats;
  23. private final Map<ObjectPath, Map<CatalogPartitionSpec, CatalogTableStatistics>> partitionStats;
  24. private final Map<ObjectPath, Map<CatalogPartitionSpec, CatalogColumnStatistics>> partitionColumnStats;
  25. }
  26. // values observed in the debugger
  27. catalogManager = {CatalogManager@4646}
  28. catalogs = {LinkedHashMap@4652} size = 1
  29. "default_catalog" -> {GenericInMemoryCatalog@4659}
  30. key = "default_catalog"
  31. value = {char[15]@4668}
  32. hash = 552406043
  33. value = {GenericInMemoryCatalog@4659}
  34. databases = {LinkedHashMap@4660} size = 1
  35. tables = {LinkedHashMap@4661} size = 0
  36. functions = {LinkedHashMap@4662} size = 0
  37. partitions = {LinkedHashMap@4663} size = 0
  38. tableStats = {LinkedHashMap@4664} size = 0
  39. tableColumnStats = {LinkedHashMap@4665} size = 0
  40. partitionStats = {LinkedHashMap@4666} size = 0
  41. partitionColumnStats = {LinkedHashMap@4667} size = 0
  42. catalogName = "default_catalog"
  43. defaultDatabase = "default_database"
  44. temporaryTables = {HashMap@4653} size = 2
  45. currentCatalogName = "default_catalog"
  46. currentDatabaseName = "default_database"
  47. builtInCatalogName = "default_catalog"
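
A hedged sketch of working with catalogs from user code (Flink 1.9/1.10 API; the catalog and database names are invented). By default everything lands in the GenericInMemoryCatalog called default_catalog seen in the dump above, but additional catalogs can be registered and switched to:

    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.catalog.Catalog;
    import org.apache.flink.table.catalog.GenericInMemoryCatalog;

    public class CatalogSketch {
        static void sketch(TableEnvironment tEnv) {
            // An extra in-memory catalog whose default database is "my_db".
            Catalog myCatalog = new GenericInMemoryCatalog("my_catalog", "my_db");
            tEnv.registerCatalog("my_catalog", myCatalog);

            // From here on, unqualified table names resolve against my_catalog.my_db.
            tEnv.useCatalog("my_catalog");
            tEnv.useDatabase("my_db");
        }
    }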

StreamPlanner is one of the planners of the new Blink planner (its streaming half).

The new Flink Table architecture makes the query processor pluggable: the community kept the original Flink planner (the old planner) fully intact and introduced the new Blink planner alongside it, and users can choose between the old planner and the Blink planner themselves.
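
A minimal sketch of that choice, assuming the Flink 1.9/1.10 EnvironmentSettings API:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.java.StreamTableEnvironment;

    public class PlannerChoice {
        static void sketch(StreamExecutionEnvironment env) {
            // New Blink planner (PlannerBase / StreamPlanner, discussed in this article).
            EnvironmentSettings blink = EnvironmentSettings.newInstance()
                    .useBlinkPlanner().inStreamingMode().build();
            StreamTableEnvironment blinkTableEnv = StreamTableEnvironment.create(env, blink);

            // Original (old) planner, which translates to the DataStream / DataSet APIs.
            EnvironmentSettings legacy = EnvironmentSettings.newInstance()
                    .useOldPlanner().inStreamingMode().build();
            StreamTableEnvironment oldTableEnv = StreamTableEnvironment.create(env, legacy);
        }
    }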

In terms of the model, the old planner does not unify streaming and batch jobs: the two have different implementations and are ultimately translated to the DataStream API and the DataSet API respectively. The Blink planner treats a batch data set as a bounded DataStream, so both streaming and batch jobs are ultimately translated to Transformations. Architecturally, the Blink planner provides a BatchPlanner and a StreamPlanner for batch and streaming respectively; the two share most of their code and much of the optimization logic. The old planner's batch and streaming code paths are two completely independent systems with essentially no shared implementation or optimization logic.

Beyond the model and the architecture, the Blink planner has accumulated many practical features, in three areas:

  • The Blink planner improves the code-generation mechanism, optimizes a number of operators and adds rich, practical features such as dimension-table joins, Top-N, MiniBatch, streaming deduplication, and data-skew mitigation for aggregations.
  • The Blink planner's optimization strategy is based on common-subgraph optimization and combines cost-based optimization (CBO) and rule-based optimization (RBO), making the optimization more comprehensive. It can also obtain data-source statistics from the catalog, which matters a great deal for CBO.
  • The Blink planner provides more built-in functions and more standard SQL support; Flink 1.9 already fully supports TPC-H, and support for the more demanding TPC-DS is planned for the next release.

In the code, the StreamPlanner shows up in translateToPlan, which dispatches to the various StreamOperator-generating nodes.

  1. class StreamPlanner(
  2. executor: Executor,
  3. config: TableConfig,
  4. functionCatalog: FunctionCatalog,
  5. catalogManager: CatalogManager)
  6. extends PlannerBase(executor, config, functionCatalog, catalogManager, isStreamingMode = true) {
  7. override protected def translateToPlan(
  8. execNodes: util.List[ExecNode[_, _]]): util.List[Transformation[_]] = {
  9. execNodes.map {
  10. case node: StreamExecNode[_] => node.translateToPlan(this)
  11. case _ =>
  12. throw new TableException("Cannot generate DataStream due to an invalid logical plan. " +
  13. "This is a bug and should not happen. Please file an issue.")
  14. }
  15. }
  16. }
  17. @Internal
  18. public final class StreamTableEnvironmentImpl extends TableEnvironmentImpl implements StreamTableEnvironment {
  19. private <T> DataStream<T> toDataStream(Table table, OutputConversionModifyOperation modifyOperation) {
  20. // when converting back to a DataStream, the planner is called to generate the plan.
  21. List<Transformation<?>> transformations = planner.translate(Collections.singletonList(modifyOperation));
  22. Transformation<T> transformation = getTransformation(table, transformations);
  23. executionEnvironment.addOperator(transformation);
  24. return new DataStream<>(executionEnvironment, transformation);
  25. }
  26. }
  27. // call stack captured in the debugger
  28. translateToPlanInternal:85, StreamExecUnion (org.apache.Flink.table.planner.plan.nodes.physical.stream)
  29. translateToPlanInternal:39, StreamExecUnion (org.apache.Flink.table.planner.plan.nodes.physical.stream)
  30. translateToPlan:58, ExecNode$class (org.apache.Flink.table.planner.plan.nodes.exec)
  31. translateToPlan:39, StreamExecUnion (org.apache.Flink.table.planner.plan.nodes.physical.stream)
  32. translateToTransformation:184, StreamExecSink (org.apache.Flink.table.planner.plan.nodes.physical.stream)
  33. translateToPlanInternal:153, StreamExecSink (org.apache.Flink.table.planner.plan.nodes.physical.stream)
  34. translateToPlanInternal:48, StreamExecSink (org.apache.Flink.table.planner.plan.nodes.physical.stream)
  35. translateToPlan:58, ExecNode$class (org.apache.Flink.table.planner.plan.nodes.exec)
  36. translateToPlan:48, StreamExecSink (org.apache.Flink.table.planner.plan.nodes.physical.stream)
  37. apply:60, StreamPlanner$$anonfun$translateToPlan$1 (org.apache.Flink.table.planner.delegation)
  38. apply:59, StreamPlanner$$anonfun$translateToPlan$1 (org.apache.Flink.table.planner.delegation)
  39. apply:234, TraversableLike$$anonfun$map$1 (scala.collection)
  40. apply:234, TraversableLike$$anonfun$map$1 (scala.collection)
  41. foreach:891, Iterator$class (scala.collection)
  42. foreach:1334, AbstractIterator (scala.collection)
  43. foreach:72, IterableLike$class (scala.collection)
  44. foreach:54, AbstractIterable (scala.collection)
  45. map:234, TraversableLike$class (scala.collection)
  46. map:104, AbstractTraversable (scala.collection)
  47. translateToPlan:59, StreamPlanner (org.apache.Flink.table.planner.delegation)
  48. translate:153, PlannerBase (org.apache.Flink.table.planner.delegation)
  49. toDataStream:210, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
  50. toAppendStream:107, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
  51. toAppendStream:101, TableConversions (org.apache.Flink.table.api.scala)
  52. main:89, StreamSQLExample$ (spendreport)
  53. main:-1, StreamSQLExample (spendreport)

Flink implements FlinkPlannerImpl as the bridge to Calcite; it performs the parse(sql), validate(sqlNode) and rel(sqlNode) operations.

  1. class FlinkPlannerImpl(
  2. config: FrameworkConfig,
  3. catalogReaderSupplier: JFunction[JBoolean, CalciteCatalogReader],
  4. typeFactory: FlinkTypeFactory,
  5. cluster: RelOptCluster) {
  6. val operatorTable: SqlOperatorTable = config.getOperatorTable
  7. val parser: CalciteParser = new CalciteParser(config.getParserConfig)
  8. val convertletTable: SqlRexConvertletTable = config.getConvertletTable
  9. val sqlToRelConverterConfig: SqlToRelConverter.Config = config.getSqlToRelConverterConfig
  10. }
  11. // FlinkPlannerImpl is used here
  12. public class ParserImpl implements Parser {
  13. private final CatalogManager catalogManager;
  14. private final Supplier<FlinkPlannerImpl> validatorSupplier;
  15. private final Supplier<CalciteParser> calciteParserSupplier;
  16. @Override
  17. public List<Operation> parse(String statement) {
  18. CalciteParser parser = calciteParserSupplier.get();
  19. // obtain the FlinkPlannerImpl
  20. FlinkPlannerImpl planner = validatorSupplier.get();
  21. // parse the sql query
  22. SqlNode parsed = parser.parse(statement);
  23. Operation operation = SqlToOperationConverter.convert(planner, catalogManager, parsed)
  24. .orElseThrow(() -> new TableException("Unsupported query: " + statement));
  25. return Collections.singletonList(operation);
  26. }
  27. }
  28. // values observed in the debugger
  29. planner = {FlinkPlannerImpl@4659}
  30. config = {Frameworks$StdFrameworkConfig@4685}
  31. catalogReaderSupplier = {PlannerContext$lambda@4686}
  32. typeFactory = {FlinkTypeFactory@4687}
  33. cluster = {FlinkRelOptCluster@4688}
  34. operatorTable = {ChainedSqlOperatorTable@4689}
  35. parser = {CalciteParser@4690}
  36. convertletTable = {StandardConvertletTable@4691}
  37. sqlToRelConverterConfig = {SqlToRelConverter$ConfigImpl@4692}
  38. validator = null
  39. // call stack (1)
  40. validate:104, FlinkPlannerImpl (org.apache.Flink.table.planner.calcite)
  41. convert:127, SqlToOperationConverter (org.apache.Flink.table.planner.operations)
  42. parse:66, ParserImpl (org.apache.Flink.table.planner.delegation)
  43. sqlQuery:464, TableEnvironmentImpl (org.apache.Flink.table.api.internal)
  44. main:82, StreamSQLExample$ (spendreport)
  45. main:-1, StreamSQLExample (spendreport)
  46. // call stack (2)
  47. rel:135, FlinkPlannerImpl (org.apache.Flink.table.planner.calcite)
  48. toQueryOperation:522, SqlToOperationConverter (org.apache.Flink.table.planner.operations)
  49. convertSqlQuery:436, SqlToOperationConverter (org.apache.Flink.table.planner.operations)
  50. convert:154, SqlToOperationConverter (org.apache.Flink.table.planner.operations)
  51. parse:66, ParserImpl (org.apache.Flink.table.planner.delegation)
  52. sqlQuery:464, TableEnvironmentImpl (org.apache.Flink.table.api.internal)
  53. main:82, StreamSQLExample$ (spendreport)
  54. main:-1, StreamSQLExample (spendreport)

Next comes the Table object itself. As the code of TableImpl shows, it is just a class that wraps up the related operations and information; it does not contain much actual logic.

  1. @Internal
  2. public class TableImpl implements Table {
  3. private static final AtomicInteger uniqueId = new AtomicInteger(0);
  4. private final TableEnvironment tableEnvironment;
  5. private final QueryOperation operationTree;
  6. private final OperationTreeBuilder operationTreeBuilder;
  7. private final LookupCallResolver lookupResolver;
  8. private TableImpl joinInternal(
  9. Table right,
  10. Optional<Expression> joinPredicate,
  11. JoinType joinType) {
  12. verifyTableCompatible(right);
  13. return createTable(operationTreeBuilder.join(
  14. this.operationTree,
  15. right.getQueryOperation(),
  16. joinType,
  17. joinPredicate,
  18. false));
  19. }
  20. }
  21. // values observed in the debugger
  22. view = {TableImpl@4583} "UnnamedTable$0"
  23. tableEnvironment = {StreamTableEnvironmentImpl@4580}
  24. functionCatalog = {FunctionCatalog@4646}
  25. scalaExecutionEnvironment = {StreamExecutionEnvironment@4579}
  26. planner = {StreamPlanner@4647}
  27. catalogManager = {CatalogManager@4644}
  28. moduleManager = {ModuleManager@4648}
  29. operationTreeBuilder = {OperationTreeBuilder@4649}
  30. bufferedModifyOperations = {ArrayList@4650} size = 0
  31. tableConfig = {TableConfig@4651}
  32. execEnv = {StreamExecutor@4652}
  33. TableEnvironmentImpl.functionCatalog = {FunctionCatalog@4646}
  34. TableEnvironmentImpl.planner = {StreamPlanner@4647}
  35. parser = {ParserImpl@4653}
  36. registration = {TableEnvironmentImpl$1@4654}
  37. operationTree = {ScalaDataStreamQueryOperation@4665}
  38. identifier = null
  39. dataStream = {DataStreamSource@4676}
  40. fieldIndices = {int[2]@4677}
  41. tableSchema = {TableSchema@4678} "root\n |-- orderId: STRING\n |-- productName: STRING\n"
  42. operationTreeBuilder = {OperationTreeBuilder@4649}
  43. config = {TableConfig@4651}
  44. functionCatalog = {FunctionCatalog@4646}
  45. tableReferenceLookup = {TableEnvironmentImpl$lambda@4668}
  46. lookupResolver = {LookupCallResolver@4669}
  47. projectionOperationFactory = {ProjectionOperationFactory@4670}
  48. sortOperationFactory = {SortOperationFactory@4671}
  49. calculatedTableFactory = {CalculatedTableFactory@4672}
  50. setOperationFactory = {SetOperationFactory@4673}
  51. aggregateOperationFactory = {AggregateOperationFactory@4674}
  52. joinOperationFactory = {JoinOperationFactory@4675}
  53. lookupResolver = {LookupCallResolver@4666}
  54. functionLookup = {FunctionCatalog@4646}
  55. tableName = "UnnamedTable$0"
  56. value = {char[14]@4667}
  57. hash = 1355882650

This corresponds to the roadmap shown earlier: what gets produced here is a SqlNode such as SqlJoin.

  1. // NOTE: execution order is top to bottom; " -----> " marks the type of instance produced
  2. *
  3. * +-----> "left outer JOIN" (SQL statement)
  4. * |
  5. * |
  6. * SqlParser.parseQuery // SQL parsing: build the AST (abstract syntax tree), i.e. SQL --> SqlNode
  7. * |
  8. * |
  9. * +-----> SqlJoin (SqlNode)
  10. * |
  11. * |

Calcite uses JavaCC for SQL parsing. From the Parser.jj grammar defined in Calcite, JavaCC generates a set of Java classes, and that generated code turns the SQL text into the AST data structure (here of type SqlNode).

That is: convert the SQL into an AST (abstract syntax tree), represented in Calcite by SqlNode.
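
A minimal standalone Calcite sketch of this step (the SQL text mirrors the left outer join from the roadmap; the default parser configuration is assumed):

    import org.apache.calcite.sql.SqlNode;
    import org.apache.calcite.sql.parser.SqlParseException;
    import org.apache.calcite.sql.parser.SqlParser;

    public class ParseSketch {
        public static void main(String[] args) throws SqlParseException {
            String sql = "SELECT o.orderId, p.payType "
                    + "FROM Orders AS o LEFT JOIN Payment AS p ON o.orderId = p.orderId";

            // SQL text -> SqlNode AST; the FROM clause becomes a SqlJoin node.
            SqlNode ast = SqlParser.create(sql).parseQuery();
            System.out.println(ast.getKind());  // SELECT
            System.out.println(ast);            // re-rendered SQL, much like the debug output below
        }
    }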

  1. package org.apache.flink.table.planner.delegation;
  2. public class ParserImpl implements Parser {
  3. @Override
  4. public List<Operation> parse(String statement) {
  5. CalciteParser parser = calciteParserSupplier.get();
  6. FlinkPlannerImpl planner = validatorSupplier.get();
  7. // parse the sql query
  8. SqlNode parsed = parser.parse(statement);
  9. Operation operation = SqlToOperationConverter.convert(planner, catalogManager, parsed)
  10. .orElseThrow(() -> new TableException("Unsupported query: " + statement));
  11. return Collections.singletonList(operation);
  12. }
  13. }
  14. // printing parsed after parsing shows the basic shape of a SqlNode.
  15. parsed = {SqlBasicCall@4690} "SELECT *\nFROM `UnnamedTable$0`\nWHERE `amount` > 2\nUNION ALL\nSELECT *\nFROM `OrderB`\nWHERE `amount` < 2"
  16. operator = {SqlSetOperator@4716} "UNION ALL"
  17. all = true
  18. name = "UNION ALL"
  19. kind = {SqlKind@4742} "UNION"
  20. leftPrec = 14
  21. rightPrec = 15
  22. returnTypeInference = {ReturnTypes$lambda@4743}
  23. operandTypeInference = null
  24. operandTypeChecker = {SetopOperandTypeChecker@4744}
  25. operands = {SqlNode[2]@4717}
  26. 0 = {SqlSelect@4746} "SELECT *\nFROM `UnnamedTable$0`\nWHERE `amount` > 2"
  27. 1 = {SqlSelect@4747} "SELECT *\nFROM `OrderB`\nWHERE `amount` < 2"
  28. functionQuantifier = null
  29. expanded = false
  30. pos = {SqlParserPos@4719} "line 2, column 1"
  31. // the debug call stack below helps to dig deeper
  32. SqlStmt:3208, FlinkSqlParserImpl (org.apache.Flink.sql.parser.impl)
  33. SqlStmtEof:3732, FlinkSqlParserImpl (org.apache.Flink.sql.parser.impl)
  34. parseSqlStmtEof:234, FlinkSqlParserImpl (org.apache.Flink.sql.parser.impl)
  35. parseQuery:160, SqlParser (org.apache.calcite.sql.parser)
  36. parseStmt:187, SqlParser (org.apache.calcite.sql.parser)
  37. parse:48, CalciteParser (org.apache.Flink.table.planner.calcite)
  38. parse:64, ParserImpl (org.apache.Flink.table.planner.delegation)
  39. sqlQuery:464, TableEnvironmentImpl (org.apache.Flink.table.api.internal)
  40. main:82, StreamSQLExample$ (spendreport)
  41. main:-1, StreamSQLExample (spendreport)
  42. // another example, seen in FlinkSqlParserImpl.FromClause
  43. e = {SqlJoin@4709} "`Orders` AS `o`\nLEFT JOIN `Payment` AS `p` ON `o`.`orderId` = `p`.`orderId`"
  44. left = {SqlBasicCall@4676} "`Orders` AS `o`"
  45. operator = {SqlAsOperator@4752} "AS"
  46. operands = {SqlNode[2]@4753}
  47. functionQuantifier = null
  48. expanded = false
  49. pos = {SqlParserPos@4755} "line 7, column 3"
  50. natural = {SqlLiteral@4677} "FALSE"
  51. typeName = {SqlTypeName@4775} "BOOLEAN"
  52. value = {Boolean@4776} false
  53. pos = {SqlParserPos@4777} "line 7, column 13"
  54. joinType = {SqlLiteral@4678} "LEFT"
  55. typeName = {SqlTypeName@4758} "SYMBOL"
  56. value = {JoinType@4759} "LEFT"
  57. pos = {SqlParserPos@4724} "line 7, column 26"
  58. right = {SqlBasicCall@4679} "`Payment` AS `p`"
  59. operator = {SqlAsOperator@4752} "AS"
  60. operands = {SqlNode[2]@4763}
  61. functionQuantifier = null
  62. expanded = false
  63. pos = {SqlParserPos@4764} "line 7, column 31"
  64. conditionType = {SqlLiteral@4680} "ON"
  65. typeName = {SqlTypeName@4758} "SYMBOL"
  66. value = {JoinConditionType@4771} "ON"
  67. pos = {SqlParserPos@4772} "line 7, column 44"
  68. condition = {SqlBasicCall@4681} "`o`.`orderId` = `p`.`orderId`"
  69. operator = {SqlBinaryOperator@4766} "="
  70. operands = {SqlNode[2]@4767}
  71. functionQuantifier = null
  72. expanded = false
  73. pos = {SqlParserPos@4768} "line 7, column 47"
  74. pos = {SqlParserPos@4724} "line 7, column 26"
  75. // the debug call stack below helps to dig deeper
  76. FromClause:10192, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
  77. SqlSelect:5918, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
  78. LeafQuery:630, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
  79. LeafQueryOrExpr:15651, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
  80. QueryOrExpr:15118, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
  81. OrderedQueryOrExpr:504, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
  82. SqlStmt:3693, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
  83. SqlStmtEof:3732, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
  84. parseSqlStmtEof:234, FlinkSqlParserImpl (org.apache.flink.sql.parser.impl)
  85. parseQuery:160, SqlParser (org.apache.calcite.sql.parser)
  86. parseStmt:187, SqlParser (org.apache.calcite.sql.parser)
  87. parse:48, CalciteParser (org.apache.flink.table.planner.calcite)
  88. parse:64, ParserImpl (org.apache.flink.table.planner.delegation)
  89. sqlQuery:464, TableEnvironmentImpl (org.apache.flink.table.api.internal)
  90. main:73, SimpleOuterJoin$ (spendreport)
  91. main:-1, SimpleOuterJoin (spendreport)

The first step above produces a SqlNode object: an unvalidated abstract syntax tree. Next comes the validation phase. Validation needs the metadata, and it covers table names, field names, function names and data types.

That is: validation, checking the statement against the metadata; afterwards the AST is still represented by SqlNode.

  1. package org.apache.flink.table.planner.operations;
  2. public class SqlToOperationConverter {
  3. public static Optional<Operation> convert(FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager, SqlNode sqlNode) {
  4. // validate is invoked here
  5. final SqlNode validated = flinkPlanner.validate(sqlNode);
  6. SqlToOperationConverter converter = new SqlToOperationConverter(flinkPlanner, catalogManager);
  7. }
  8. }
  9. // printing validated after validation:
  10. validated = {SqlBasicCall@4675} "SELECT `UnnamedTable$0`.`user`, `UnnamedTable$0`.`product`, `UnnamedTable$0`.`amount`\nFROM `default_catalog`.`default_database`.`UnnamedTable$0` AS `UnnamedTable$0`\nWHERE `UnnamedTable$0`.`amount` > 2\nUNION ALL\nSELECT `OrderB`.`user`, `OrderB`.`product`, `OrderB`.`amount`\nFROM `default_catalog`.`default_database`.`OrderB` AS `OrderB`\nWHERE `OrderB`.`amount` < 2"
  11. operator = {SqlSetOperator@5000} "UNION ALL"
  12. all = true
  13. name = "UNION ALL"
  14. kind = {SqlKind@5029} "UNION"
  15. leftPrec = 14
  16. rightPrec = 15
  17. returnTypeInference = {ReturnTypes$lambda@5030}
  18. operandTypeInference = null
  19. operandTypeChecker = {SetopOperandTypeChecker@5031}
  20. operands = {SqlNode[2]@5001}
  21. 0 = {SqlSelect@4840} "SELECT `UnnamedTable$0`.`user`, `UnnamedTable$0`.`product`, `UnnamedTable$0`.`amount`\nFROM `default_catalog`.`default_database`.`UnnamedTable$0` AS `UnnamedTable$0`\nWHERE `UnnamedTable$0`.`amount` > 2"
  22. 1 = {SqlSelect@5026} "SELECT `OrderB`.`user`, `OrderB`.`product`, `OrderB`.`amount`\nFROM `default_catalog`.`default_database`.`OrderB` AS `OrderB`\nWHERE `OrderB`.`amount` < 2"
  23. functionQuantifier = null
  24. expanded = false
  25. pos = {SqlParserPos@5003} "line 2, column 1"
  26. // the debug call stack below helps to dig deeper
  27. validate:81, AbstractNamespace (org.apache.calcite.sql.validate)
  28. validateNamespace:1008, SqlValidatorImpl (org.apache.calcite.sql.validate)
  29. validateQuery:968, SqlValidatorImpl (org.apache.calcite.sql.validate)
  30. validateCall:90, SqlSetOperator (org.apache.calcite.sql)
  31. validateCall:5304, SqlValidatorImpl (org.apache.calcite.sql.validate)
  32. validate:116, SqlCall (org.apache.calcite.sql)
  33. validateScopedExpression:943, SqlValidatorImpl (org.apache.calcite.sql.validate)
  34. validate:650, SqlValidatorImpl (org.apache.calcite.sql.validate)
  35. org$apache$Flink$table$planner$calcite$FlinkPlannerImpl$$validate:126, FlinkPlannerImpl (org.apache.Flink.table.planner.calcite)
  36. validate:105, FlinkPlannerImpl (org.apache.Flink.table.planner.calcite)
  37. convert:127, SqlToOperationConverter (org.apache.Flink.table.planner.operations)
  38. parse:66, ParserImpl (org.apache.Flink.table.planner.delegation)
  39. sqlQuery:464, TableEnvironmentImpl (org.apache.Flink.table.api.internal)
  40. main:82, StreamSQLExample$ (spendreport)
  41. main:-1, StreamSQLExample (spendreport)

On the roadmap we have now reached this point:

  1. // NOTE: execution order is top to bottom; " -----> " marks the type of instance produced
  2. *
  3. * +-----> "left outer JOIN" (SQL statement)
  4. * |
  5. * |
  6. * SqlParser.parseQuery // SQL parsing: build the AST (abstract syntax tree), i.e. SQL --> SqlNode
  7. * |
  8. * |
  9. * +-----> SqlJoin (SqlNode)
  10. * |
  11. * |
  12. * SqlToRelConverter.convertQuery // semantic analysis: build the logical plan, i.e. SqlNode --> RelNode
  13. * |
  14. * |
  15. * +-----> LogicalProject (RelNode) // Abstract Syntax Tree, unoptimized RelNode
  16. * |
  17. * |

After the second step, the SqlNode is now a validated SqlNode tree. The next step converts the SqlNode into RelNode/RexNode, i.e. it produces the corresponding logical plan.

That is: semantic analysis, building the RelNode tree from the SqlNode and the metadata; this is the first version of the logical plan.

The logical plan Flink has built is converted into a Calcite logical plan, so that Calcite's powerful optimization rules can be applied.

Flink calls each node's construct method from top to bottom, converting the Flink nodes into Calcite RelNodes. For SQL, the actual conversion is done in the convertQueryRecursive() method.

For example, the call chain that produces a LogicalProject looks roughly like this:

  1. createJoin:378, RelFactories$JoinFactoryImpl (org.apache.calcite.rel.core)
  2. createJoin:2520, SqlToRelConverter (org.apache.calcite.sql2rel)
  3. convertFrom:2111, SqlToRelConverter (org.apache.calcite.sql2rel)
  4. convertSelectImpl:646, SqlToRelConverter (org.apache.calcite.sql2rel)
  5. convertSelect:627, SqlToRelConverter (org.apache.calcite.sql2rel)
  6. convertQueryRecursive:3181, SqlToRelConverter (org.apache.calcite.sql2rel)
  7. convertQuery:563, SqlToRelConverter (org.apache.calcite.sql2rel)
  8. org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel:148, FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
  9. rel:135, FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
  10. toQueryOperation:522, SqlToOperationConverter (org.apache.flink.table.planner.operations)
  11. convertSqlQuery:436, SqlToOperationConverter (org.apache.flink.table.planner.operations)
  12. convert:154, SqlToOperationConverter (org.apache.flink.table.planner.operations)
  13. parse:66, ParserImpl (org.apache.flink.table.planner.delegation)
  14. sqlQuery:464, TableEnvironmentImpl (org.apache.flink.table.api.internal)
  15. main:73, SimpleOuterJoin$ (spendreport)
  16. main:-1, SimpleOuterJoin (spendreport)

The relevant source code in detail:

  1. // SqlToRelConverter.convertQuery() converts the SqlNode into a RelRoot
  2. public class SqlToRelConverter {
  3. public RelRoot convertQuery(SqlNode query, boolean needsValidation, boolean top) {
  4. if (needsValidation) {
  5. query = this.validator.validate(query);
  6. }
  7. RelMetadataQuery.THREAD_PROVIDERS.set(JaninoRelMetadataProvider.of(this.cluster.getMetadataProvider()));
  8. RelNode result = this.convertQueryRecursive(query, top, (RelDataType)null).rel;
  9. if (top && isStream(query)) {
  10. result = new LogicalDelta(this.cluster, ((RelNode)result).getTraitSet(), (RelNode)result);
  11. }
  12. RelCollation collation = RelCollations.EMPTY;
  13. if (!query.isA(SqlKind.DML) && isOrdered(query)) {
  14. collation = this.requiredCollation((RelNode)result);
  15. }
  16. this.checkConvertedType(query, (RelNode)result);
  17. RelDataType validatedRowType = this.validator.getValidatedNodeType(query);
  18. // the root is set here
  19. return RelRoot.of((RelNode)result, validatedRowType, query.getKind()).withCollation(collation);
  20. }
  21. }
  22. // printed at this breakpoint:
  23. toQueryOperation:523, SqlToOperationConverter (org.apache.Flink.table.planner.operations)
  24. // the output below shows the actual structure of a RelRoot
  25. relational = {RelRoot@5248} "Root {kind: UNION, rel: LogicalUnion#6, rowType: RecordType(BIGINT user, VARCHAR(2147483647) product, INTEGER amount), fields: [<0, user>, <1, product>, <2, amount>], collation: []}"
  26. rel = {LogicalUnion@5227} "LogicalUnion#6"
  27. inputs = {RegularImmutableList@5272} size = 2
  28. kind = {SqlKind@5029} "UNION"
  29. all = true
  30. desc = "LogicalUnion#6"
  31. rowType = {RelRecordType@5238} "RecordType(BIGINT user, VARCHAR(2147483647) product, INTEGER amount)"
  32. digest = "LogicalUnion#6"
  33. cluster = {FlinkRelOptCluster@4800}
  34. id = 6
  35. traitSet = {RelTraitSet@5273} size = 5
  36. validatedRowType = {RelRecordType@5238} "RecordType(BIGINT user, VARCHAR(2147483647) product, INTEGER amount)"
  37. kind = {StructKind@5268} "FULLY_QUALIFIED"
  38. nullable = false
  39. fieldList = {RegularImmutableList@5269} size = 3
  40. digest = "RecordType(BIGINT user, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" product, INTEGER amount) NOT NULL"
  41. kind = {SqlKind@5029} "UNION"
  42. lowerName = "union"
  43. sql = "UNION"
  44. name = "UNION"
  45. ordinal = 18
  46. fields = {RegularImmutableList@5254} size = 3
  47. {Integer@5261} 0 -> "user"
  48. {Integer@5263} 1 -> "product"
  49. {Integer@5265} 2 -> "amount"
  50. collation = {RelCollationImpl@5237} "[]"
  51. fieldCollations = {RegularImmutableList@5256} size = 0
  52. // call stack
  53. convertQuery:561, SqlToRelConverter (org.apache.calcite.sql2rel)
  54. org$apache$Flink$table$planner$calcite$FlinkPlannerImpl$$rel:148, FlinkPlannerImpl (org.apache.Flink.table.planner.calcite)
  55. rel:135, FlinkPlannerImpl (org.apache.Flink.table.planner.calcite)
  56. toQueryOperation:522, SqlToOperationConverter (org.apache.Flink.table.planner.operations)
  57. convertSqlQuery:436, SqlToOperationConverter (org.apache.Flink.table.planner.operations)
  58. convert:154, SqlToOperationConverter (org.apache.Flink.table.planner.operations)
  59. parse:66, ParserImpl (org.apache.Flink.table.planner.delegation)
  60. sqlQuery:464, TableEnvironmentImpl (org.apache.Flink.table.api.internal)
  61. main:82, StreamSQLExample$ (spendreport)
  62. main:-1, StreamSQLExample (spendreport)
  63. // another example: a LogicalProject has been produced
  64. bb = {SqlToRelConverter$Blackboard@4978}
  65. scope = {SelectScope@4977}
  66. nameToNodeMap = null
  67. root = {LogicalProject@5100} "LogicalProject#4"
  68. exps = {RegularImmutableList@5105} size = 3
  69. input = {LogicalJoin@5106} "LogicalJoin#3"
  70. desc = "LogicalProject#4"
  71. rowType = {RelRecordType@5107} "RecordType(VARCHAR(2147483647) orderId, VARCHAR(2147483647) productName, VARCHAR(2147483647) payType)"
  72. digest = "LogicalProject#4"
  73. cluster = {FlinkRelOptCluster@4949}
  74. id = 4
  75. traitSet = {RelTraitSet@5108} size = 5
  76. inputs = {Collections$SingletonList@5111} size = 1
  77. mapCorrelateToRex = {HashMap@5112} size = 0
  78. isPatternVarRef = false
  79. cursors = {ArrayList@5113} size = 0
  80. subQueryList = {LinkedHashSet@5114} size = 0
  81. agg = null
  82. window = null
  83. mapRootRelToFieldProjection = {HashMap@5115} size = 0
  84. columnMonotonicities = {ArrayList@5116} size = 3
  85. systemFieldList = {ArrayList@5117} size = 0
  86. top = true
  87. initializerExpressionFactory = {NullInitializerExpressionFactory@5118}
  88. this$0 = {SqlToRelConverter@4926}
  89. // example: the join feeding the LogicalProject is created here.
  90. protected void convertFrom(SqlToRelConverter.Blackboard bb, SqlNode from) {
  91. case JOIN:
  92. RelNode joinRel = this.createJoin(fromBlackboard, leftRel, rightRel, conditionExp, convertedJoinType);
  93. bb.setRoot(joinRel, false);
  94. }
  95. // related call stack
  96. createJoin:378, RelFactories$JoinFactoryImpl (org.apache.calcite.rel.core)
  97. createJoin:2520, SqlToRelConverter (org.apache.calcite.sql2rel)
  98. convertFrom:2111, SqlToRelConverter (org.apache.calcite.sql2rel)
  99. convertSelectImpl:646, SqlToRelConverter (org.apache.calcite.sql2rel)
  100. convertSelect:627, SqlToRelConverter (org.apache.calcite.sql2rel)
  101. convertQueryRecursive:3181, SqlToRelConverter (org.apache.calcite.sql2rel)
  102. convertQuery:563, SqlToRelConverter (org.apache.calcite.sql2rel)
  103. org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel:148, FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
  104. rel:135, FlinkPlannerImpl (org.apache.flink.table.planner.calcite)
  105. toQueryOperation:522, SqlToOperationConverter (org.apache.flink.table.planner.operations)
  106. convertSqlQuery:436, SqlToOperationConverter (org.apache.flink.table.planner.operations)
  107. convert:154, SqlToOperationConverter (org.apache.flink.table.planner.operations)
  108. parse:66, ParserImpl (org.apache.flink.table.planner.delegation)
  109. sqlQuery:464, TableEnvironmentImpl (org.apache.flink.table.api.internal)
  110. main:73, SimpleOuterJoin$ (spendreport)
  111. main:-1, SimpleOuterJoin (spendreport)

At this point the roadmap has advanced to here:

  1. // NOTE: execution order is top to bottom; " -----> " marks the type of instance produced
  2. *
  3. * +-----> "left outer JOIN" (SQL statement)
  4. * |
  5. * |
  6. * SqlParser.parseQuery // SQL parsing: build the AST (abstract syntax tree), i.e. SQL --> SqlNode
  7. * |
  8. * |
  9. * +-----> SqlJoin (SqlNode)
  10. * |
  11. * |
  12. * SqlToRelConverter.convertQuery // semantic analysis: build the logical plan, i.e. SqlNode --> RelNode
  13. * |
  14. * |
  15. * +-----> LogicalProject (RelNode) // Abstract Syntax Tree, unoptimized RelNode
  16. * |
  17. * |
  18. * FlinkLogicalJoinConverter (RelOptRule) // Flink-specific optimization rules
  19. * VolcanoRuleCall.onMatch // optimize the logical plan with Flink's own rules
  20. * |
  21. * |
  22. * +-----> FlinkLogicalJoin (RelNode) // Optimized Logical Plan, the logical execution plan
  23. * |
  24. * |
  25. * StreamExecJoinRule (RelOptRule) // Rule that converts FlinkLogicalJoin without window bounds in join condition to StreamExecJoin
  26. * VolcanoRuleCall.onMatch // convert the optimized logical plan into Flink's physical plan with Flink rules
  27. * |
  28. * |
  29. * +-----> StreamExecJoin (FlinkRelNode) // Stream physical RelNode, the physical execution plan
  30. * |
  31. * |

The fourth phase is where the core of Calcite lies.

That is: logical plan optimization, the heart of the optimizer; the logical plan produced earlier is optimized by applying the corresponding rules.

In Flink, this part is wrapped up in the optimize method. It involves several phases, each of which applies rules to optimize and improve the logical plan.

In the Calcite architecture, the optimizer is the most central piece. An optimization engine consists of three parts:

  • rules: the matching rules. Calcite ships hundreds of built-in rules for optimizing relational expressions, and custom rules are supported as well;
  • metadata providers: they feed the optimizer information that helps steer it toward the goal (reducing overall cost), such as row counts, which column of a table is unique, and functions for computing the cost of sub-expressions in the RelNode tree;
  • planner engines: their main job is to keep firing rules until a given goal is reached; for a cost-based optimizer (CBO), for example, the goal is to minimize cost (cost covers the number of rows processed, CPU cost, IO cost, and so on).

The optimizer's job is to turn the relational algebra expression produced by the parser into an execution plan for the execution engine; along the way it applies optimization rules to help produce a more efficient plan. A typical example is filter push-down: the filter is performed before the join, so the join no longer has to process the full inputs and the amount of data taking part in the join shrinks.

RelOptPlanner is the base class of Calcite's optimizers. Calcite provides two implementations (a short HepPlanner sketch follows this list):

  • HepPlanner: the rule-based (RBO) implementation. It is a heuristic optimizer that keeps matching rules until the match-count limit is reached or a full pass produces no more rule matches;
  • VolcanoPlanner: the cost-based (CBO) implementation. It keeps iterating over rules until it finds the plan with the lowest cost.
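
This article focuses on the VolcanoPlanner below; for contrast, here is a hedged sketch of the HepPlanner (RBO) side, applying the filter push-down rule mentioned above to an existing RelNode tree:

    import org.apache.calcite.plan.hep.HepPlanner;
    import org.apache.calcite.plan.hep.HepProgram;
    import org.apache.calcite.plan.hep.HepProgramBuilder;
    import org.apache.calcite.rel.RelNode;
    import org.apache.calcite.rel.rules.FilterJoinRule;

    public class HepSketch {
        static RelNode optimize(RelNode logicalPlan) {
            // Rule-based program: push filters into / through joins.
            HepProgram program = new HepProgramBuilder()
                    .addRuleInstance(FilterJoinRule.FILTER_ON_JOIN)
                    .build();

            HepPlanner hepPlanner = new HepPlanner(program);
            hepPlanner.setRoot(logicalPlan);   // the unoptimized RelNode tree
            return hepPlanner.findBestExp();   // apply rules until no more matches
        }
    }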

A cost-based optimizer (CBO) transforms relational expressions according to optimization rules. "Transform" here means that applying a rule to a relational expression produces another relational expression while the original is kept as well; after a series of transformations many candidate execution plans exist, and the CBO then uses statistics and a cost model to compute the cost of each plan and picks the cheapest one.

A CBO therefore has two dependencies: statistics and a cost model. The accuracy of the statistics and the soundness of the cost model both affect whether the CBO picks the best plan. By this description, CBO is superior to RBO, because an RBO is a rigid optimizer that only looks at rules and is blind to the data, while in practice the data keeps changing, so an RBO-generated plan may well not be optimal. Indeed, databases and big-data engines today generally prefer CBO. For streaming engines, however, using a CBO is still hard, because data volumes and similar information are not known in advance, which greatly limits the optimization; CBO is therefore mainly used in offline (batch) scenarios.

The VolcanoPlanner is the CBO implementation; it keeps iterating over rules until it finds the plan with the lowest cost. Some related concepts:

  • RelSet describes a set of equivalent relational expressions; all of its RelNodes are recorded in rels;
  • RelSubset describes a set of equivalent relational expressions that share the same physical properties. Each RelSubset records the RelSet it belongs to. RelSubset extends AbstractRelNode, so it is itself a RelNode, with the physical properties stored in its traitSet member. Each RelSubset also records its best plan (best) and that plan's cost (bestCost).
  • RuleMatch abstracts the relationship between a Rule and a RelSubset and records both.
  • importance determines the order in which rules are applied during optimization; it is a relative notion. The VolcanoPlanner has two kinds of importance, one for RelSubsets and one for RuleMatches.

Applying the VolcanoPlanner takes four steps overall:

  1. Initialize the VolcanoPlanner and add the corresponding rule matches (including ConverterRules) to the rule-match queue;
  2. Perform equivalence transformations on the RelNodes: apply rule matches to transform the plan graph (a rule specifies an operator sub-graph to match and the logic to generate an equivalent, better sub-graph); at this point only the physical properties (the Convention) are changed;
  3. Register the RelNodes with the VolcanoPlanner through its setRoot() method and run the corresponding initialization;
  4. Iterate with a dynamic-programming style search until the cost no longer changes or all rule matches in the queue have been applied; this yields the plan with the lowest cost. The importance of a rule match depends on the RelNode's cost and depth.

The following example walks through the VolcanoPlanner's internal logic in more detail.

  1. //1. Initialize the VolcanoPlanner and add the rules
  2. VolcanoPlanner planner = new VolcanoPlanner();
  3. planner.addRelTraitDef(ConventionTraitDef.INSTANCE);
  4. planner.addRelTraitDef(RelDistributionTraitDef.INSTANCE);
  5. // add the ordinary rules
  6. planner.addRule(FilterJoinRule.FilterIntoJoinRule.FILTER_ON_JOIN);
  7. planner.addRule(ReduceExpressionsRule.PROJECT_INSTANCE);
  8. planner.addRule(PruneEmptyRules.PROJECT_INSTANCE);
  9. // add the ConverterRules
  10. planner.addRule(EnumerableRules.ENUMERABLE_MERGE_JOIN_RULE);
  11. planner.addRule(EnumerableRules.ENUMERABLE_SORT_RULE);
  12. planner.addRule(EnumerableRules.ENUMERABLE_VALUES_RULE);
  13. planner.addRule(EnumerableRules.ENUMERABLE_PROJECT_RULE);
  14. planner.addRule(EnumerableRules.ENUMERABLE_FILTER_RULE);
  15. //2. Changes a relational expression to an equivalent one with a different set of traits.
  16. RelTraitSet desiredTraits =
  17. relNode.getCluster().traitSet().replace(EnumerableConvention.INSTANCE);
  18. relNode = planner.changeTraits(relNode, desiredTraits);
  19. //3. Register the RelNode via VolcanoPlanner#setRoot and run the corresponding initialization
  20. planner.setRoot(relNode);
  21. //4. Find the cheapest plan with the dynamic-programming search
  22. relNode = planner.findBestExp();

The corresponding code in Flink:

  1. public PlannerContext(
  2. TableConfig tableConfig,
  3. FunctionCatalog functionCatalog,
  4. CatalogManager catalogManager,
  5. CalciteSchema rootSchema,
  6. List<RelTraitDef> traitDefs) {
  7. this.tableConfig = tableConfig;
  8. this.context = new FlinkContextImpl(
  9. tableConfig,
  10. functionCatalog,
  11. catalogManager,
  12. this::createSqlExprToRexConverter);
  13. this.rootSchema = rootSchema;
  14. this.traitDefs = traitDefs;
  15. // Make a framework config to initialize the RelOptCluster instance,
  16. // caution that we can only use the attributes that can not be overwrite/configured
  17. // by user.
  18. this.frameworkConfig = createFrameworkConfig();
  19. // the VolcanoPlanner is used here
  20. RelOptPlanner planner = new VolcanoPlanner(frameworkConfig.getCostFactory(), frameworkConfig.getContext());
  21. planner.setExecutor(frameworkConfig.getExecutor());
  22. for (RelTraitDef traitDef : frameworkConfig.getTraitDefs()) {
  23. planner.addRelTraitDef(traitDef);
  24. }
  25. this.cluster = FlinkRelOptClusterFactory.create(planner, new RexBuilder(typeFactory));
  26. }
  27. // initialization call stack
  28. <init>:119, PlannerContext (org.apache.Flink.table.planner.delegation)
  29. <init>:86, PlannerBase (org.apache.Flink.table.planner.delegation)
  30. <init>:44, StreamPlanner (org.apache.Flink.table.planner.delegation)
  31. create:50, BlinkPlannerFactory (org.apache.Flink.table.planner.delegation)
  32. create:325, StreamTableEnvironmentImpl$ (org.apache.Flink.table.api.scala.internal)
  33. create:425, StreamTableEnvironment$ (org.apache.Flink.table.api.scala)
  34. main:56, StreamSQLExample$ (spendreport)
  35. main:-1, StreamSQLExample (spendreport)
  36. class FlinkVolcanoProgram[OC <: FlinkOptimizeContext] extends FlinkRuleSetProgram[OC] {
  37. override def optimize(root: RelNode, context: OC): RelNode = {
  38. val targetTraits = root.getTraitSet.plusAll(requiredOutputTraits.get).simplify()
  39. // VolcanoPlanner limits that the planer a RelNode tree belongs to and
  40. // the VolcanoPlanner used to optimize the RelNode tree should be same instance.
  41. // see: VolcanoPlanner#registerImpl
  42. // here, use the planner in cluster directly
  43. // the VolcanoPlanner is used here as well
  44. val planner = root.getCluster.getPlanner.asInstanceOf[VolcanoPlanner]
  45. val optProgram = Programs.ofRules(rules)
  46. }
  47. }
  48. // its call stack
  49. optimize:60, FlinkVolcanoProgram (org.apache.Flink.table.planner.plan.optimize.program)
  50. apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program)
  51. apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program)
  52. apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
  53. apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
  54. foreach:891, Iterator$class (scala.collection)
  55. foreach:1334, AbstractIterator (scala.collection)
  56. foreach:72, IterableLike$class (scala.collection)
  57. foreach:54, AbstractIterable (scala.collection)
  58. foldLeft:157, TraversableOnce$class (scala.collection)
  59. foldLeft:104, AbstractTraversable (scala.collection)
  60. optimize:57, FlinkChainedProgram (org.apache.Flink.table.planner.plan.optimize.program)
  61. optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
  62. doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
  63. optimize:77, CommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
  64. optimize:248, PlannerBase (org.apache.Flink.table.planner.delegation)
  65. translate:151, PlannerBase (org.apache.Flink.table.planner.delegation)
  66. toDataStream:210, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
  67. toAppendStream:107, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
  68. toAppendStream:101, TableConversions (org.apache.Flink.table.api.scala)
  69. main:89, StreamSQLExample$ (spendreport)
  70. main:-1, StreamSQLExample (spendreport)
  71. // Below: VolcanoPlanner-related code and call stacks
  72. // VolcanoPlanner.addRule: matched optimization rules are wrapped as VolcanoRuleMatch and pushed into the RuleQueue, the core class used by the dynamic-programming search that follows.
  73. public class VolcanoPlanner extends AbstractRelOptPlanner {
  74. public boolean addRule(RelOptRule rule) {
  75. ......
  76. }
  77. }
  78. addRule:438, VolcanoPlanner (org.apache.calcite.plan.volcano)
  79. run:315, Programs$RuleSetProgram (org.apache.calcite.tools)
  80. optimize:64, FlinkVolcanoProgram (org.apache.Flink.table.planner.plan.optimize.program)
  81. apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program)
  82. apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program)
  83. apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
  84. apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
  85. foreach:891, Iterator$class (scala.collection)
  86. foreach:1334, AbstractIterator (scala.collection)
  87. foreach:72, IterableLike$class (scala.collection)
  88. foreach:54, AbstractIterable (scala.collection)
  89. foldLeft:157, TraversableOnce$class (scala.collection)
  90. foldLeft:104, AbstractTraversable (scala.collection)
  91. optimize:57, FlinkChainedProgram (org.apache.Flink.table.planner.plan.optimize.program)
  92. optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
  93. doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
  94. optimize:77, CommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
  95. optimize:248, PlannerBase (org.apache.Flink.table.planner.delegation)
  96. translate:151, PlannerBase (org.apache.Flink.table.planner.delegation)
  97. toDataStream:210, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
  98. toAppendStream:107, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
  99. toAppendStream:101, TableConversions (org.apache.Flink.table.api.scala)
  100. main:89, StreamSQLExample$ (spendreport)
  101. main:-1, StreamSQLExample (spendreport)
  102. // VolcanoPlanner changes the traits
  103. public class VolcanoPlanner extends AbstractRelOptPlanner {
  104. public RelNode changeTraits(RelNode rel, RelTraitSet toTraits) {
  105. assert !rel.getTraitSet().equals(toTraits);
  106. assert toTraits.allSimple();
  107. RelSubset rel2 = this.ensureRegistered(rel, (RelNode)null);
  108. return rel2.getTraitSet().equals(toTraits) ? rel2 : rel2.set.getOrCreateSubset(rel.getCluster(), toTraits.simplify());
  109. }
  110. }
  111. changeTraits:529, VolcanoPlanner (org.apache.calcite.plan.volcano)
  112. run:324, Programs$RuleSetProgram (org.apache.calcite.tools)
  113. optimize:64, FlinkVolcanoProgram (org.apache.Flink.table.planner.plan.optimize.program)
  114. apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program)
  115. apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program)
  116. apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
  117. apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
  118. foreach:891, Iterator$class (scala.collection)
  119. foreach:1334, AbstractIterator (scala.collection)
  120. foreach:72, IterableLike$class (scala.collection)
  121. foreach:54, AbstractIterable (scala.collection)
  122. foldLeft:157, TraversableOnce$class (scala.collection)
  123. foldLeft:104, AbstractTraversable (scala.collection)
  124. optimize:57, FlinkChainedProgram (org.apache.Flink.table.planner.plan.optimize.program)
  125. optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
  126. doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
  127. optimize:77, CommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
  128. optimize:248, PlannerBase (org.apache.Flink.table.planner.delegation)
  129. translate:151, PlannerBase (org.apache.Flink.table.planner.delegation)
  130. toDataStream:210, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
  131. toAppendStream:107, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
  132. toAppendStream:101, TableConversions (org.apache.Flink.table.api.scala)
  133. main:89, StreamSQLExample$ (spendreport)
  134. main:-1, StreamSQLExample (spendreport)
  135. // VolcanoPlanner sets the root
  136. public class VolcanoPlanner extends AbstractRelOptPlanner {
  137. public void setRoot(RelNode rel) {
  138. this.registerMetadataRels();
  139. this.root = this.registerImpl(rel, (RelSet)null);
  140. if (this.originalRoot == null) {
  141. this.originalRoot = rel;
  142. }
  143. this.ruleQueue.recompute(this.root);
  144. this.ensureRootConverters();
  145. }
  146. }
  147. setRoot:294, VolcanoPlanner (org.apache.calcite.plan.volcano)
  148. run:326, Programs$RuleSetProgram (org.apache.calcite.tools)
  149. optimize:64, FlinkVolcanoProgram (org.apache.Flink.table.planner.plan.optimize.program)
  150. apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program)
  151. apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program)
  152. apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
  153. apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
  154. foreach:891, Iterator$class (scala.collection)
  155. foreach:1334, AbstractIterator (scala.collection)
  156. foreach:72, IterableLike$class (scala.collection)
  157. foreach:54, AbstractIterable (scala.collection)
  158. foldLeft:157, TraversableOnce$class (scala.collection)
  159. foldLeft:104, AbstractTraversable (scala.collection)
  160. optimize:57, FlinkChainedProgram (org.apache.Flink.table.planner.plan.optimize.program)
  161. optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
  162. doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
  163. optimize:77, CommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
  164. optimize:248, PlannerBase (org.apache.Flink.table.planner.delegation)
  165. translate:151, PlannerBase (org.apache.Flink.table.planner.delegation)
  166. toDataStream:210, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
  167. toAppendStream:107, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
  168. toAppendStream:101, TableConversions (org.apache.Flink.table.api.scala)
  169. main:89, StreamSQLExample$ (spendreport)
  170. main:-1, StreamSQLExample (spendreport)
  171. // VolcanoPlanner finds the cheapest plan; essentially a dynamic-programming implementation.
  172. public class VolcanoPlanner extends AbstractRelOptPlanner {
  173. public RelNode findBestExp() {
  174. this.ensureRootConverters();
  175. this.registerMaterializations();
  176. int cumulativeTicks = 0;
  177. VolcanoPlannerPhase[] var2 = VolcanoPlannerPhase.values();
  178. int var3 = var2.length;
  179. for(int var4 = 0; var4 < var3; ++var4) {
  180. VolcanoPlannerPhase phase = var2[var4];
  181. this.setInitialImportance();
  182. RelOptCost targetCost = this.costFactory.makeHugeCost();
  183. int tick = 0;
  184. int firstFiniteTick = -1;
  185. int splitCount = 0;
  186. int giveUpTick = 2147483647;
  187. while(true) {
  188. ++tick;
  189. ++cumulativeTicks;
  190. if (this.root.bestCost.isLe(targetCost)) {
  191. if (firstFiniteTick < 0) {
  192. firstFiniteTick = cumulativeTicks;
  193. this.clearImportanceBoost();
  194. }
  195. if (!this.ambitious) {
  196. break;
  197. }
  198. targetCost = this.root.bestCost.multiplyBy(0.9D);
  199. ++splitCount;
  200. if (this.impatient) {
  201. if (firstFiniteTick < 10) {
  202. giveUpTick = cumulativeTicks + 25;
  203. } else {
  204. giveUpTick = cumulativeTicks + Math.max(firstFiniteTick / 10, 25);
  205. }
  206. }
  207. } else {
  208. if (cumulativeTicks > giveUpTick) {
  209. break;
  210. }
  211. if (this.root.bestCost.isInfinite() && tick % 10 == 0) {
  212. this.injectImportanceBoost();
  213. }
  214. }
  215. VolcanoRuleMatch match = this.ruleQueue.popMatch(phase);
  216. if (match == null) {
  217. break;
  218. }
  219. assert match.getRule().matches(match);
  220. match.onMatch();
  221. this.root = this.canonize(this.root);
  222. }
  223. this.ruleQueue.phaseCompleted(phase);
  224. }
  225. RelNode cheapest = this.root.buildCheapestPlan(this);
  226. return cheapest;
  227. }
  228. }
  229. // cheapest: the Flink logical node finally chosen by the VolcanoPlanner
  230. cheapest = {FlinkLogicalUnion@6487} "FlinkLogicalUnion#443"
  231. cluster = {FlinkRelOptCluster@6224}
  232. inputs = {RegularImmutableList@6493} size = 2
  233. 0 = {FlinkLogicalCalc@6498} "FlinkLogicalCalc#441"
  234. cluster = {FlinkRelOptCluster@6224}
  235. calcProgram = {RexProgram@6509} "(expr#0..2=[{inputs}], expr#3=[2], expr#4=[>($t2, $t3)], proj#0..2=[{exprs}], $condition=[$t4])"
  236. program = {RexProgram@6509} "(expr#0..2=[{inputs}], expr#3=[2], expr#4=[>($t2, $t3)], proj#0..2=[{exprs}], $condition=[$t4])"
  237. input = {FlinkLogicalDataStreamTableScan@6510} "rel#437:FlinkLogicalDataStreamTableScan.LOGICAL.any.None: 0.false.UNKNOWN(table=[default_catalog, default_database, UnnamedTable$0])"
  238. desc = "FlinkLogicalCalc#441"
  239. rowType = {RelRecordType@6504} "RecordType(BIGINT user, VARCHAR(2147483647) product, INTEGER amount)"
  240. digest = "FlinkLogicalCalc#441"
  241. AbstractRelNode.cluster = {FlinkRelOptCluster@6224}
  242. id = 441
  243. traitSet = {RelTraitSet@5942} size = 5
  244. 1 = {FlinkLogicalCalc@6499} "FlinkLogicalCalc#442"
  245. cluster = {FlinkRelOptCluster@6224}
  246. calcProgram = {RexProgram@6502} "(expr#0..2=[{inputs}], expr#3=[2], expr#4=[<($t2, $t3)], proj#0..2=[{exprs}], $condition=[$t4])"
  247. program = {RexProgram@6502} "(expr#0..2=[{inputs}], expr#3=[2], expr#4=[<($t2, $t3)], proj#0..2=[{exprs}], $condition=[$t4])"
  248. input = {FlinkLogicalDataStreamTableScan@6503} "rel#435:FlinkLogicalDataStreamTableScan.LOGICAL.any.None: 0.false.UNKNOWN(table=[default_catalog, default_database, OrderB])"
  249. desc = "FlinkLogicalCalc#442"
  250. rowType = {RelRecordType@6504} "RecordType(BIGINT user, VARCHAR(2147483647) product, INTEGER amount)"
  251. digest = "FlinkLogicalCalc#442"
  252. AbstractRelNode.cluster = {FlinkRelOptCluster@6224}
  253. id = 442
  254. traitSet = {RelTraitSet@5942} size = 5
  255. kind = {SqlKind@6494} "UNION"
  256. lowerName = "union"
  257. sql = "UNION"
  258. name = "UNION"
  259. ordinal = 18
  260. all = true
  261. desc = "FlinkLogicalUnion#443"
  262. rowType = null
  263. digest = "FlinkLogicalUnion#443"
  264. AbstractRelNode.cluster = {FlinkRelOptCluster@6224}
  265. id = 443
  266. traitSet = {RelTraitSet@5942} size = 5
  267. findBestExp:572, VolcanoPlanner (org.apache.calcite.plan.volcano)
  268. run:327, Programs$RuleSetProgram (org.apache.calcite.tools)
  269. optimize:64, FlinkVolcanoProgram (org.apache.Flink.table.planner.plan.optimize.program)
  270. apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program)
  271. apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program)
  272. apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
  273. apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
  274. foreach:891, Iterator$class (scala.collection)
  275. foreach:1334, AbstractIterator (scala.collection)
  276. foreach:72, IterableLike$class (scala.collection)
  277. foreach:54, AbstractIterable (scala.collection)
  278. foldLeft:157, TraversableOnce$class (scala.collection)
  279. foldLeft:104, AbstractTraversable (scala.collection)
  280. optimize:57, FlinkChainedProgram (org.apache.Flink.table.planner.plan.optimize.program)
  281. optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
  282. doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
  283. optimize:77, CommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
  284. optimize:248, PlannerBase (org.apache.Flink.table.planner.delegation)
  285. translate:151, PlannerBase (org.apache.Flink.table.planner.delegation)
  286. toDataStream:210, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
  287. toAppendStream:107, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
  288. toAppendStream:101, TableConversions (org.apache.Flink.table.api.scala)
  289. main:89, StreamSQLExample$ (spendreport)
  290. main:-1, StreamSQLExample (spendreport)

Next comes the optimization of the Join.

  1. class FlinkLogicalJoin(
  2. cluster: RelOptCluster,
  3. traitSet: RelTraitSet,
  4. left: RelNode,
  5. right: RelNode,
  6. condition: RexNode,
  7. joinType: JoinRelType)
  8. extends FlinkLogicalJoinBase(cluster, traitSet, left, right, condition, joinType)
  9. // FlinkLogicalJoinConverter.convert is the rule method that builds a FlinkLogicalJoin out of a LogicalJoin:
  10. override def convert(rel: RelNode): RelNode = {
  11. val join = rel.asInstanceOf[LogicalJoin]
  12. val traitSet = rel.getTraitSet.replace(FlinkConventions.LOGICAL)
  13. val newLeft = RelOptRule.convert(join.getLeft, FlinkConventions.LOGICAL)
  14. val newRight = RelOptRule.convert(join.getRight, FlinkConventions.LOGICAL)
  15. new FlinkLogicalJoin(
  16. rel.getCluster,
  17. traitSet,
  18. newLeft,
  19. newRight,
  20. join.getCondition,
  21. join.getJoinType)
  22. }
  23. call = {VolcanoRuleMatch@6191} "rule [FlinkLogicalJoinConverter(in:NONE,out:LOGICAL)] rels [rel#100:LogicalJoin.NONE.any.None: 0.false.UNKNOWN(left=RelSubset#98,right=RelSubset#99,condition==($0, $2),joinType=left)]"
  24. targetSet = {RelSet@6193}
  25. targetSubset = null
  26. digest = "rule [FlinkLogicalJoinConverter(in:NONE,out:LOGICAL)] rels [rel#100:LogicalJoin.NONE.any.None: 0.false.UNKNOWN(left=RelSubset#98,right=RelSubset#99,condition==($0, $2),joinType=left)]"
  27. cachedImportance = 0.8019000000000001
  28. volcanoPlanner = {VolcanoPlanner@6194}
  29. generatedRelList = null
  30. id = 71
  31. operand0 = {RelOptRule$ConverterRelOptRuleOperand@6186}
  32. parent = null
  33. rule = {FlinkLogicalJoinConverter@6179} "FlinkLogicalJoinConverter(in:NONE,out:LOGICAL)"
  34. predicate = {ConverterRule$lambda@6246}
  35. solveOrder = {int[1]@6247}
  36. ordinalInParent = 0
  37. ordinalInRule = 0
  38. trait = {Convention$Impl@6184} "NONE"
  39. clazz = {Class@5010} "class org.apache.calcite.rel.logical.LogicalJoin"
  40. children = {RegularImmutableList@6230} size = 0
  41. childPolicy = {RelOptRuleOperandChildPolicy@6248} "ANY"
  42. nodeInputs = {RegularImmutableBiMap@6195} size = 0
  43. rule = {FlinkLogicalJoinConverter@6179} "FlinkLogicalJoinConverter(in:NONE,out:LOGICAL)"
  44. rels = {RelNode[1]@6196}
  45. 0 = {LogicalJoin@6181} "rel#100:LogicalJoin.NONE.any.None: 0.false.UNKNOWN(left=RelSubset#98,right=RelSubset#99,condition==($0, $2),joinType=left)"
  46. semiJoinDone = false
  47. systemFieldList = {RegularImmutableList@6230} size = 0
  48. condition = {RexCall@6231} "=($0, $2)"
  49. variablesSet = {RegularImmutableSet@6232} size = 0
  50. joinType = {JoinRelType@6233} "LEFT"
  51. joinInfo = {JoinInfo@6234}
  52. left = {RelSubset@6235} "rel#98:Subset#0.NONE.any.None: 0.false.UNKNOWN"
  53. right = {RelSubset@6236} "rel#99:Subset#1.NONE.any.None: 0.false.UNKNOWN"
  54. desc = "rel#100:LogicalJoin.NONE.any.None: 0.false.UNKNOWN(left=RelSubset#98,right=RelSubset#99,condition==($0, $2),joinType=left)"
  55. rowType = {RelRecordType@6237} "RecordType(VARCHAR(2147483647) orderId, VARCHAR(2147483647) productName, VARCHAR(2147483647) orderId0, VARCHAR(2147483647) payType)"
  56. digest = "LogicalJoin.NONE.any.None: 0.false.UNKNOWN(left=RelSubset#98,right=RelSubset#99,condition==($0, $2),joinType=left)"
  57. cluster = {FlinkRelOptCluster@6239}
  58. id = 100
  59. traitSet = {RelTraitSet@6240} size = 5
  60. planner = {VolcanoPlanner@6194}
  61. parents = null
  62. // Call stack when the FlinkLogicalJoin is created
  63. create:106, FlinkLogicalJoin$ (org.apache.flink.table.planner.plan.nodes.logical)
  64. convert:92, FlinkLogicalJoinConverter (org.apache.flink.table.planner.plan.nodes.logical)
  65. onMatch:144, ConverterRule (org.apache.calcite.rel.convert)
  66. onMatch:208, VolcanoRuleCall (org.apache.calcite.plan.volcano)
  67. findBestExp:631, VolcanoPlanner (org.apache.calcite.plan.volcano)
  68. run:327, Programs$RuleSetProgram (org.apache.calcite.tools)
  69. optimize:64, FlinkVolcanoProgram (org.apache.flink.table.planner.plan.optimize.program)
  70. apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
  71. apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
  72. apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
  73. apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
  74. foreach:891, Iterator$class (scala.collection)
  75. foreach:1334, AbstractIterator (scala.collection)
  76. foreach:72, IterableLike$class (scala.collection)
  77. foreach:54, AbstractIterable (scala.collection)
  78. foldLeft:157, TraversableOnce$class (scala.collection)
  79. foldLeft:104, AbstractTraversable (scala.collection)
  80. optimize:57, FlinkChainedProgram (org.apache.flink.table.planner.plan.optimize.program)
  81. optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
  82. doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
  83. optimize:77, CommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
  84. optimize:248, PlannerBase (org.apache.flink.table.planner.delegation)
  85. translate:151, PlannerBase (org.apache.flink.table.planner.delegation)
  86. toDataStream:210, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
  87. toRetractStream:127, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
  88. toRetractStream:146, TableConversions (org.apache.flink.table.api.scala)
  89. main:75, SimpleOuterJoin$ (spendreport)
  90. main:-1, SimpleOuterJoin (spendreport)
  91. abstract class FlinkLogicalJoinBase(
  92. cluster: RelOptCluster,
  93. traitSet: RelTraitSet,
  94. left: RelNode,
  95. right: RelNode,
  96. condition: RexNode,
  97. joinType: JoinRelType)
  98. extends Join(
  99. cluster,
  100. traitSet,
  101. left,
  102. right,
  103. condition,
  104. Set.empty[CorrelationId].asJava,
  105. joinType)
  106. with FlinkLogicalRel {
  107. // The cost is also computed here
  108. override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
  109. val leftRowCnt = mq.getRowCount(getLeft)
  110. val leftRowSize = mq.getAverageRowSize(getLeft)
  111. val rightRowCnt = mq.getRowCount(getRight)
  112. joinType match {
  113. case JoinRelType.SEMI | JoinRelType.ANTI =>
  114. val rightRowSize = mq.getAverageRowSize(getRight)
  115. val ioCost = (leftRowCnt * leftRowSize) + (rightRowCnt * rightRowSize)
  116. val cpuCost = leftRowCnt + rightRowCnt
  117. val rowCnt = leftRowCnt + rightRowCnt
  118. planner.getCostFactory.makeCost(rowCnt, cpuCost, ioCost)
  119. case _ =>
  120. val cpuCost = leftRowCnt + rightRowCnt
  121. val ioCost = (leftRowCnt * leftRowSize) + rightRowCnt
  122. planner.getCostFactory.makeCost(leftRowCnt, cpuCost, ioCost)
  123. }
  124. }
  125. }
  126. // Call stack
  127. computeSelfCost:63, FlinkLogicalJoin (org.apache.flink.table.planner.plan.nodes.logical)
  128. getNonCumulativeCost:41, FlinkRelMdNonCumulativeCost (org.apache.flink.table.planner.plan.metadata)
  129. getNonCumulativeCost_$:-1, GeneratedMetadataHandler_NonCumulativeCost
  130. getNonCumulativeCost:-1, GeneratedMetadataHandler_NonCumulativeCost
  131. getNonCumulativeCost:301, RelMetadataQuery (org.apache.calcite.rel.metadata)
  132. getCost:936, VolcanoPlanner (org.apache.calcite.plan.volcano)
  133. propagateCostImprovements0:347, RelSubset (org.apache.calcite.plan.volcano)
  134. propagateCostImprovements:330, RelSubset (org.apache.calcite.plan.volcano)
  135. addRelToSet:1828, VolcanoPlanner (org.apache.calcite.plan.volcano)
  136. registerImpl:1764, VolcanoPlanner (org.apache.calcite.plan.volcano)
  137. register:846, VolcanoPlanner (org.apache.calcite.plan.volcano)
  138. ensureRegistered:868, VolcanoPlanner (org.apache.calcite.plan.volcano)
  139. ensureRegistered:1939, VolcanoPlanner (org.apache.calcite.plan.volcano)
  140. transformTo:129, VolcanoRuleCall (org.apache.calcite.plan.volcano)
  141. transformTo:236, RelOptRuleCall (org.apache.calcite.plan)
  142. onMatch:146, ConverterRule (org.apache.calcite.rel.convert)
  143. onMatch:208, VolcanoRuleCall (org.apache.calcite.plan.volcano)
  144. findBestExp:631, VolcanoPlanner (org.apache.calcite.plan.volcano)
  145. run:327, Programs$RuleSetProgram (org.apache.calcite.tools)
  146. optimize:64, FlinkVolcanoProgram (org.apache.flink.table.planner.plan.optimize.program)
  147. apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
  148. apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
  149. apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
  150. apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
  151. foreach:891, Iterator$class (scala.collection)
  152. foreach:1334, AbstractIterator (scala.collection)
  153. foreach:72, IterableLike$class (scala.collection)
  154. foreach:54, AbstractIterable (scala.collection)
  155. foldLeft:157, TraversableOnce$class (scala.collection)
  156. foldLeft:104, AbstractTraversable (scala.collection)
  157. optimize:57, FlinkChainedProgram (org.apache.flink.table.planner.plan.optimize.program)
  158. optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
  159. doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
  160. optimize:77, CommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
  161. optimize:248, PlannerBase (org.apache.flink.table.planner.delegation)
  162. translate:151, PlannerBase (org.apache.flink.table.planner.delegation)
  163. toDataStream:210, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
  164. toRetractStream:127, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
  165. toRetractStream:146, TableConversions (org.apache.flink.table.api.scala)
  166. main:75, SimpleOuterJoin$ (spendreport)
  167. main:-1, SimpleOuterJoin (spendreport)
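
To make the cost formula above concrete, here is a small worked example for the default (non SEMI/ANTI) branch; the row counts and sizes are made up purely for illustration.

    // Assumed statistics, invented only to illustrate the formula in computeSelfCost above
    val leftRowCnt  = 1000.0
    val leftRowSize = 32.0
    val rightRowCnt = 500.0

    val cpuCost = leftRowCnt + rightRowCnt                  // 1000 + 500  = 1500.0
    val ioCost  = (leftRowCnt * leftRowSize) + rightRowCnt  // 32000 + 500 = 32500.0
    // planner.getCostFactory.makeCost(leftRowCnt, cpuCost, ioCost)
    //   => rowCount = 1000.0, cpu = 1500.0, io = 32500.0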

Calcite optimizes these logical plans based on optimization rules; different rules are applied depending on the execution environment (Flink provides one set of optimization rules for batch and another for streaming).

The optimization rules fall into two categories: one is the built-in rules provided by Calcite (such as predicate push-down and pruning), the other is the rules that convert Logical Nodes into Flink Nodes.

Both of these steps belong to Calcite's optimization phase. The resulting DataStream plan encapsulates the logic for translating each node into the corresponding DataStream / DataSet program; concretely, the various DataStream/DataSet nodes are translated into the final executable DataStream/DataSet program through code generation (CodeGen).
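
For intuition, here is a minimal sketch of how the two kinds of rules could be put into one RuleSet. The two Calcite rules named here are assumed to expose an INSTANCE field in the Calcite version in use, and the import path of the Flink rules is taken from the listings below; this is an illustration, not code from the Flink codebase.

    import org.apache.calcite.rel.rules.{FilterCalcMergeRule, ProjectCalcMergeRule}
    import org.apache.calcite.tools.{RuleSet, RuleSets}
    import org.apache.flink.table.planner.plan.rules.physical.stream.{StreamExecCalcRule, StreamExecUnionRule}

    // Calcite built-in rewrite rules mixed with Flink "Logical Node -> Flink Node" conversion rules
    val mixedRules: RuleSet = RuleSets.ofList(
      FilterCalcMergeRule.INSTANCE,   // Calcite built-in: merge a Filter into a Calc
      ProjectCalcMergeRule.INSTANCE,  // Calcite built-in: merge a Project into a Calc
      StreamExecCalcRule.INSTANCE,    // Flink: FlinkLogicalCalc -> StreamExecCalc
      StreamExecUnionRule.INSTANCE    // Flink: FlinkLogicalUnion -> StreamExecUnion
    )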

The listing below shows the different rules; each rule generates a corresponding physical node. For example, inside such a node, the executable DataSet functions are generated via codegen according to the SQL execution steps that Calcite produced.

  1. package org.apache.flink.table.plan.rules
  2. /**
  3. * RuleSet to optimize plans for batch / DataSet execution
  4. */
  5. val DATASET_OPT_RULES: RuleSet = RuleSets.ofList(
  6. // translate to Flink DataSet nodes
  7. DataSetWindowAggregateRule.INSTANCE,
  8. DataSetAggregateRule.INSTANCE,
  9. DataSetDistinctRule.INSTANCE,
  10. DataSetCalcRule.INSTANCE,
  11. DataSetPythonCalcRule.INSTANCE,
  12. DataSetJoinRule.INSTANCE,
  13. DataSetSingleRowJoinRule.INSTANCE,
  14. DataSetScanRule.INSTANCE,
  15. DataSetUnionRule.INSTANCE,
  16. DataSetIntersectRule.INSTANCE,
  17. DataSetMinusRule.INSTANCE,
  18. DataSetSortRule.INSTANCE,
  19. DataSetValuesRule.INSTANCE,
  20. DataSetCorrelateRule.INSTANCE,
  21. BatchTableSourceScanRule.INSTANCE
  22. )
  23. /**
  24. * RuleSet to optimize plans for stream / DataStream execution
  25. */
  26. val DATASTREAM_OPT_RULES: RuleSet = RuleSets.ofList(
  27. // translate to DataStream nodes
  28. DataStreamSortRule.INSTANCE,
  29. DataStreamGroupAggregateRule.INSTANCE,
  30. DataStreamOverAggregateRule.INSTANCE,
  31. DataStreamGroupWindowAggregateRule.INSTANCE,
  32. DataStreamCalcRule.INSTANCE,
  33. DataStreamScanRule.INSTANCE,
  34. DataStreamUnionRule.INSTANCE,
  35. DataStreamValuesRule.INSTANCE,
  36. DataStreamCorrelateRule.INSTANCE,
  37. DataStreamWindowJoinRule.INSTANCE,
  38. DataStreamJoinRule.INSTANCE,
  39. DataStreamTemporalTableJoinRule.INSTANCE,
  40. StreamTableSourceScanRule.INSTANCE,
  41. DataStreamMatchRule.INSTANCE,
  42. DataStreamTableAggregateRule.INSTANCE,
  43. DataStreamGroupWindowTableAggregateRule.INSTANCE,
  44. DataStreamPythonCalcRule.INSTANCE
  45. )
  46. package org.apache.flink.table.planner.plan.rules
  47. /**
  48. * RuleSet to do physical optimize for stream
  49. */
  50. val PHYSICAL_OPT_RULES: RuleSet = RuleSets.ofList(
  51. FlinkExpandConversionRule.STREAM_INSTANCE,
  52. // source
  53. StreamExecDataStreamScanRule.INSTANCE,
  54. StreamExecTableSourceScanRule.INSTANCE,
  55. StreamExecIntermediateTableScanRule.INSTANCE,
  56. StreamExecWatermarkAssignerRule.INSTANCE,
  57. StreamExecValuesRule.INSTANCE,
  58. // calc
  59. StreamExecCalcRule.INSTANCE,
  60. StreamExecPythonCalcRule.INSTANCE,
  61. // union
  62. StreamExecUnionRule.INSTANCE,
  63. // sort
  64. StreamExecSortRule.INSTANCE,
  65. StreamExecLimitRule.INSTANCE,
  66. StreamExecSortLimitRule.INSTANCE,
  67. StreamExecTemporalSortRule.INSTANCE,
  68. // rank
  69. StreamExecRankRule.INSTANCE,
  70. StreamExecDeduplicateRule.RANK_INSTANCE,
  71. // expand
  72. StreamExecExpandRule.INSTANCE,
  73. // group agg
  74. StreamExecGroupAggregateRule.INSTANCE,
  75. StreamExecGroupTableAggregateRule.INSTANCE,
  76. // over agg
  77. StreamExecOverAggregateRule.INSTANCE,
  78. // window agg
  79. StreamExecGroupWindowAggregateRule.INSTANCE,
  80. StreamExecGroupWindowTableAggregateRule.INSTANCE,
  81. // join
  82. StreamExecJoinRule.INSTANCE,
  83. StreamExecWindowJoinRule.INSTANCE,
  84. StreamExecTemporalJoinRule.INSTANCE,
  85. StreamExecLookupJoinRule.SNAPSHOT_ON_TABLESCAN,
  86. StreamExecLookupJoinRule.SNAPSHOT_ON_CALC_TABLESCAN,
  87. // CEP
  88. StreamExecMatchRule.INSTANCE,
  89. // correlate
  90. StreamExecConstantTableFunctionScanRule.INSTANCE,
  91. StreamExecCorrelateRule.INSTANCE,
  92. // sink
  93. StreamExecSinkRule.INSTANCE
  94. )

A concrete rule example, here the rule for Union:

  1. package org.apache.flink.table.planner.plan.rules.physical.stream
  2. class StreamExecUnionRule
  3. extends ConverterRule(
  4. classOf[FlinkLogicalUnion],
  5. FlinkConventions.LOGICAL,
  6. FlinkConventions.STREAM_PHYSICAL,
  7. "StreamExecUnionRule") {
  8. def convert(rel: RelNode): RelNode = {
  9. val union: FlinkLogicalUnion = rel.asInstanceOf[FlinkLogicalUnion]
  10. val traitSet: RelTraitSet = rel.getTraitSet.replace(FlinkConventions.STREAM_PHYSICAL)
  11. val newInputs = union.getInputs.map(RelOptRule.convert(_, FlinkConventions.STREAM_PHYSICAL))
  12. // This rule generates the corresponding physical node. Inside the node, the stream execution functions are produced via codegen according to the SQL execution steps that Calcite derived.
  13. new StreamExecUnion(
  14. rel.getCluster,
  15. traitSet,
  16. newInputs,
  17. union.all,
  18. rel.getRowType)
  19. }
  20. }
  21. // Matching and firing of this rule is driven by VolcanoPlanner.findBestExp:
  22. public class VolcanoPlanner extends AbstractRelOptPlanner {
  23. public RelNode findBestExp() {
  24. // This is where matched rules are invoked
  25. match.onMatch();
  26. return cheapest;
  27. }
  28. }
  29. match = {VolcanoRuleMatch@6252} "rule [StreamExecUnionRule(in:LOGICAL,out:STREAM_PHYSICAL)] rels [rel#215:FlinkLogicalUnion.LOGICAL.any.None: 0.false.UNKNOWN(input#0=RelSubset#211,input#1=RelSubset#214,all=true)]"
  30. targetSet = {RelSet@6298}
  31. targetSubset = null
  32. digest = "rule [StreamExecUnionRule(in:LOGICAL,out:STREAM_PHYSICAL)] rels [rel#215:FlinkLogicalUnion.LOGICAL.any.None: 0.false.UNKNOWN(input#0=RelSubset#211,input#1=RelSubset#214,all=true)]"
  33. cachedImportance = 0.81
  34. volcanoPlanner = {VolcanoPlanner@6259}
  35. generatedRelList = null
  36. id = 521
  37. operand0 = {RelOptRule$ConverterRelOptRuleOperand@6247}
  38. nodeInputs = {RegularImmutableBiMap@6299} size = 0
  39. rule = {StreamExecUnionRule@6241} "StreamExecUnionRule(in:LOGICAL,out:STREAM_PHYSICAL)"
  40. rels = {RelNode[1]@6300}
  41. planner = {VolcanoPlanner@6259}
  42. parents = null
  43. // Call stack for StreamExecUnionRule.convert
  44. convert:46, StreamExecUnionRule (org.apache.flink.table.planner.plan.rules.physical.stream)
  45. onMatch:144, ConverterRule (org.apache.calcite.rel.convert)
  46. onMatch:208, VolcanoRuleCall (org.apache.calcite.plan.volcano)
  47. findBestExp:631, VolcanoPlanner (org.apache.calcite.plan.volcano)
  48. run:327, Programs$RuleSetProgram (org.apache.calcite.tools)
  49. optimize:64, FlinkVolcanoProgram (org.apache.flink.table.planner.plan.optimize.program)
  50. apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
  51. apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
  52. apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
  53. apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
  54. foreach:891, Iterator$class (scala.collection)
  55. foreach:1334, AbstractIterator (scala.collection)
  56. foreach:72, IterableLike$class (scala.collection)
  57. foreach:54, AbstractIterable (scala.collection)
  58. foldLeft:157, TraversableOnce$class (scala.collection)
  59. foldLeft:104, AbstractTraversable (scala.collection)
  60. optimize:57, FlinkChainedProgram (org.apache.flink.table.planner.plan.optimize.program)
  61. optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
  62. doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
  63. optimize:77, CommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
  64. optimize:248, PlannerBase (org.apache.flink.table.planner.delegation)
  65. translate:151, PlannerBase (org.apache.flink.table.planner.delegation)
  66. toDataStream:210, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
  67. toAppendStream:107, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
  68. toAppendStream:101, TableConversions (org.apache.flink.table.api.scala)
  69. main:89, StreamSQLExample$ (spendreport)
  70. main:-1, StreamSQLExample (spendreport)

Another concrete rule example, this time the Join optimization and the generation of StreamExecJoin.

  1. class StreamExecJoinRule extends RelOptRule(...) { // abridged: it only matches a FlinkLogicalJoin whose join condition has no window bounds
  2. override def onMatch(call: RelOptRuleCall): Unit = {
  3. val newJoin = new StreamExecJoin( // join, providedTraitSet, newLeft and newRight are derived from the matched FlinkLogicalJoin (elided here)
  4. join.getCluster,
  5. providedTraitSet,
  6. newLeft,
  7. newRight,
  8. join.getCondition,
  9. join.getJoinType)
  10. call.transformTo(newJoin)
  11. }
  12. }
  13. newJoin = {StreamExecJoin@6326} "StreamExecJoin#152"
  14. cluster = {FlinkRelOptCluster@5072}
  15. joinType = {JoinRelType@5038} "LEFT"
  16. LOG = null
  17. transformation = null
  18. bitmap$trans$0 = false
  19. CommonPhysicalJoin.joinType = {JoinRelType@5038} "LEFT"
  20. filterNulls = null
  21. keyPairs = null
  22. flinkJoinType = null
  23. inputRowType = null
  24. bitmap$0 = 0
  25. condition = {RexCall@5041} "=($0, $2)"
  26. variablesSet = {RegularImmutableSet@6342} size = 0
  27. Join.joinType = {JoinRelType@5038} "LEFT"
  28. joinInfo = {JoinInfo@6343}
  29. left = {RelSubset@6328} "rel#150:Subset#5.STREAM_PHYSICAL.hash[0]true.None: 0.false.UNKNOWN"
  30. bestCost = {FlinkCost$$anon$1@6344} "{inf}"
  31. set = {RelSet@6348}
  32. best = null
  33. timestamp = 0
  34. boosted = false
  35. desc = "rel#150:Subset#5.STREAM_PHYSICAL.hash[0]true.None: 0.false.UNKNOWN"
  36. rowType = {RelRecordType@6349} "RecordType(VARCHAR(2147483647) orderId, VARCHAR(2147483647) productName)"
  37. digest = "Subset#5.STREAM_PHYSICAL.hash[0]true.None: 0.false.UNKNOWN"
  38. cluster = {FlinkRelOptCluster@5072}
  39. id = 150
  40. traitSet = {RelTraitSet@6336} size = 5
  41. right = {RelSubset@6329} "rel#151:Subset#6.STREAM_PHYSICAL.hash[0]true.None: 0.false.UNKNOWN"
  42. bestCost = {FlinkCost$$anon$1@6344} "{inf}"
  43. set = {RelSet@6345}
  44. best = null
  45. timestamp = 0
  46. boosted = false
  47. desc = "rel#151:Subset#6.STREAM_PHYSICAL.hash[0]true.None: 0.false.UNKNOWN"
  48. rowType = null
  49. digest = "Subset#6.STREAM_PHYSICAL.hash[0]true.None: 0.false.UNKNOWN"
  50. cluster = {FlinkRelOptCluster@5072}
  51. id = 151
  52. traitSet = {RelTraitSet@6336} size = 5
  53. desc = "StreamExecJoin#152"
  54. rowType = null
  55. digest = "StreamExecJoin#152"
  56. AbstractRelNode.cluster = {FlinkRelOptCluster@5072}
  57. id = 152
  58. traitSet = {RelTraitSet@6327} size = 5
  59. // Call stack
  60. <init>:58, StreamExecJoin (org.apache.flink.table.planner.plan.nodes.physical.stream)
  61. onMatch:128, StreamExecJoinRule (org.apache.flink.table.planner.plan.rules.physical.stream)
  62. onMatch:208, VolcanoRuleCall (org.apache.calcite.plan.volcano)
  63. findBestExp:631, VolcanoPlanner (org.apache.calcite.plan.volcano)
  64. run:327, Programs$RuleSetProgram (org.apache.calcite.tools)
  65. optimize:64, FlinkVolcanoProgram (org.apache.flink.table.planner.plan.optimize.program)
  66. apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
  67. apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
  68. apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
  69. apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
  70. foreach:891, Iterator$class (scala.collection)
  71. foreach:1334, AbstractIterator (scala.collection)
  72. foreach:72, IterableLike$class (scala.collection)
  73. foreach:54, AbstractIterable (scala.collection)
  74. foldLeft:157, TraversableOnce$class (scala.collection)
  75. foldLeft:104, AbstractTraversable (scala.collection)
  76. optimize:57, FlinkChainedProgram (org.apache.flink.table.planner.plan.optimize.program)
  77. optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
  78. doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
  79. optimize:77, CommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
  80. optimize:248, PlannerBase (org.apache.flink.table.planner.delegation)
  81. translate:151, PlannerBase (org.apache.flink.table.planner.delegation)
  82. toDataStream:210, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
  83. toRetractStream:127, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
  84. toRetractStream:146, TableConversions (org.apache.flink.table.api.scala)
  85. main:75, SimpleOuterJoin$ (spendreport)
  86. main:-1, SimpleOuterJoin (spendreport)
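
Based on the rule's description above (it only converts a FlinkLogicalJoin whose join condition carries no window bounds), the two queries below sketch which physical rule is expected to fire. They are adapted from the examples later in this article; the routing to StreamExecWindowJoinRule is the expected behaviour rather than something traced here.

    // No time bound in the join condition
    //   -> FlinkLogicalJoin without window bounds -> StreamExecJoinRule -> StreamExecJoin
    val unboundedJoin =
      """
        |SELECT o.orderId, o.productName, p.payType
        |FROM Orders AS o LEFT OUTER JOIN Payment AS p ON o.orderId = p.orderId
        |""".stripMargin

    // A BETWEEN ... INTERVAL bound on the rowtime attributes in the join condition
    //   -> window bounds present -> StreamExecWindowJoinRule is expected to apply instead
    val intervalJoin =
      """
        |SELECT o.orderId, o.productName, p.payType
        |FROM Orders AS o LEFT OUTER JOIN Payment AS p ON o.orderId = p.orderId
        |  AND p.payTime BETWEEN o.orderTime AND o.orderTime + INTERVAL '1' HOUR
        |""".stripMargin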

At this point the roadmap looks like this:

  1. // NOTE : 执行顺序是从上至下," -----> " 表示生成的实例类型
  2. *
  3. * +-----> "left outer JOIN" (SQL statement)
  4. * |
  5. * |
  6. * SqlParser.parseQuery // SQL 解析阶段,生成AST(抽象语法树),作用是SQL–>SqlNode
  7. * |
  8. * |
  9. * +-----> SqlJoin (SqlNode)
  10. * |
  11. * |
  12. * SqlToRelConverter.convertQuery // 语义分析,生成逻辑计划,作用是SqlNode–>RelNode
  13. * |
  14. * |
  15. * +-----> LogicalProject (RelNode) // Abstract Syntax Tree,未优化的RelNode
  16. * |
  17. * |
  18. * FlinkLogicalJoinConverter (RelOptRule) // Flink定制的优化rules
  19. * VolcanoRuleCall.onMatch // 基于Flink定制的一些优化rules去优化 Logical Plan
  20. * |
  21. * |
  22. * +-----> FlinkLogicalJoin (RelNode) // Optimized Logical Plan,逻辑执行计划
  23. * |
  24. * |
  25. * StreamExecJoinRule (RelOptRule) // Rule that converts FlinkLogicalJoin without window bounds in join condition to StreamExecJoin
  26. * VolcanoRuleCall.onMatch // 基于Flink rules将optimized LogicalPlan转成Flink物理执行计划
  27. * |
  28. * |
  29. * +-----> StreamExecJoin (FlinkRelNode) // Stream physical RelNode,物理执行计划
  30. * |
  31. * |
  32. * StreamExecJoin.translateToPlanInternal // 作用是生成 StreamOperator, 即Flink算子
  33. * |
  34. * |
  35. * +-----> StreamingJoinOperator (StreamOperator) // Streaming unbounded Join operator in StreamTask
  36. * |
  37. * |

For different big-data systems, Calcite maps the optimized plan onto the final execution engine; here it is turned into a Flink graph.

This part mainly consists of recursively calling translateToPlan on each DataStreamRel node; that method uses CodeGen (meta-programming) to generate the various Flink operators. At this point the result is equivalent to a program written directly against Flink's DataSet or DataStream API.

  1. class StreamPlanner(
  2. executor: Executor,
  3. config: TableConfig,
  4. functionCatalog: FunctionCatalog,
  5. catalogManager: CatalogManager)
  6. extends PlannerBase(executor, config, functionCatalog, catalogManager, isStreamingMode = true) {
  7. override protected def translateToPlan(
  8. execNodes: util.List[ExecNode[_, _]]): util.List[Transformation[_]] = {
  9. execNodes.map {
  10. case node: StreamExecNode[_] => node.translateToPlan(this)
  11. case _ =>
  12. throw new TableException("Cannot generate DataStream due to an invalid logical plan. " +
  13. "This is a bug and should not happen. Please file an issue.")
  14. }
  15. }
  16. }
  17. package org.apache.flink.table.planner.plan.nodes.physical.stream
  18. class StreamExecUnion(
  19. cluster: RelOptCluster,
  20. traitSet: RelTraitSet,
  21. inputRels: util.List[RelNode],
  22. all: Boolean,
  23. outputRowType: RelDataType)
  24. extends Union(cluster, traitSet, inputRels, all)
  25. with StreamPhysicalRel
  26. with StreamExecNode[BaseRow] {
  27. // This is where the Flink transformation (operator) is generated
  28. override protected def translateToPlanInternal(
  29. planner: StreamPlanner): Transformation[BaseRow] = {
  30. val transformations = getInputNodes.map {
  31. input => input.translateToPlan(planner).asInstanceOf[Transformation[BaseRow]]
  32. }
  33. new UnionTransformation(transformations)
  34. }
  35. }
  36. // Call stack
  37. translateToPlanInternal:85, StreamExecUnion (org.apache.Flink.table.planner.plan.nodes.physical.stream)
  38. translateToPlanInternal:39, StreamExecUnion (org.apache.Flink.table.planner.plan.nodes.physical.stream)
  39. translateToPlan:58, ExecNode$class (org.apache.Flink.table.planner.plan.nodes.exec)
  40. translateToPlan:39, StreamExecUnion (org.apache.Flink.table.planner.plan.nodes.physical.stream)
  41. translateToTransformation:184, StreamExecSink (org.apache.Flink.table.planner.plan.nodes.physical.stream)
  42. translateToPlanInternal:153, StreamExecSink (org.apache.Flink.table.planner.plan.nodes.physical.stream)
  43. translateToPlanInternal:48, StreamExecSink (org.apache.Flink.table.planner.plan.nodes.physical.stream)
  44. translateToPlan:58, ExecNode$class (org.apache.Flink.table.planner.plan.nodes.exec)
  45. translateToPlan:48, StreamExecSink (org.apache.Flink.table.planner.plan.nodes.physical.stream)
  46. apply:60, StreamPlanner$$anonfun$translateToPlan$1 (org.apache.Flink.table.planner.delegation)
  47. apply:59, StreamPlanner$$anonfun$translateToPlan$1 (org.apache.Flink.table.planner.delegation)
  48. apply:234, TraversableLike$$anonfun$map$1 (scala.collection)
  49. apply:234, TraversableLike$$anonfun$map$1 (scala.collection)
  50. foreach:891, Iterator$class (scala.collection)
  51. foreach:1334, AbstractIterator (scala.collection)
  52. foreach:72, IterableLike$class (scala.collection)
  53. foreach:54, AbstractIterable (scala.collection)
  54. map:234, TraversableLike$class (scala.collection)
  55. map:104, AbstractTraversable (scala.collection)
  56. translateToPlan:59, StreamPlanner (org.apache.Flink.table.planner.delegation)
  57. translate:153, PlannerBase (org.apache.Flink.table.planner.delegation)
  58. toDataStream:210, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
  59. toAppendStream:107, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
  60. toAppendStream:101, TableConversions (org.apache.Flink.table.api.scala)
  61. main:89, StreamSQLExample$ (spendreport)
  62. main:-1, StreamSQLExample (spendreport)
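
To illustrate the point that the translated plan is equivalent to a hand-written DataStream program, here is a rough hand-written counterpart of the UNION ALL query from the StreamSQLExample shown below; this is a sketch written for comparison, not generated code.

    import org.apache.flink.streaming.api.scala._

    case class Order(user: Long, product: String, amount: Int)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val orderA = env.fromCollection(Seq(Order(1L, "beer", 3), Order(1L, "diaper", 4), Order(3L, "rubber", 2)))
    val orderB = env.fromCollection(Seq(Order(2L, "pen", 3), Order(2L, "rubber", 3), Order(4L, "beer", 1)))

    // Filter + union, i.e. roughly what the Calc + Union physical nodes translate into at the DataStream level
    val unioned: DataStream[Order] = orderA.filter(_.amount > 2).union(orderB.filter(_.amount < 2))
    unioned.print()
    env.execute()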

Now the roadmap is complete.

  1. // NOTE : 执行顺序是从上至下," -----> " 表示生成的实例类型
  2. *
  3. * +-----> "left outer JOIN" (SQL statement)
  4. * |
  5. * |
  6. * SqlParser.parseQuery // SQL 解析阶段,生成AST(抽象语法树),作用是SQL–>SqlNode
  7. * |
  8. * |
  9. * +-----> SqlJoin (SqlNode)
  10. * |
  11. * |
  12. * SqlToRelConverter.convertQuery // 语义分析,生成逻辑计划,作用是SqlNode–>RelNode
  13. * |
  14. * |
  15. * +-----> LogicalProject (RelNode) // Abstract Syntax Tree,未优化的RelNode
  16. * |
  17. * |
  18. * FlinkLogicalJoinConverter (RelOptRule) // Flink定制的优化rules
  19. * VolcanoRuleCall.onMatch // 基于Flink定制的一些优化rules去优化 Logical Plan
  20. * |
  21. * |
  22. * +-----> FlinkLogicalJoin (RelNode) // Optimized Logical Plan,逻辑执行计划
  23. * |
  24. * |
  25. * StreamExecJoinRule (RelOptRule) // Rule that converts FlinkLogicalJoin without window bounds in join condition to StreamExecJoin
  26. * VolcanoRuleCall.onMatch // 基于Flink rules将optimized LogicalPlan转成Flink物理执行计划
  27. * |
  28. * |
  29. * +-----> StreamExecJoin (FlinkRelNode) // Stream physical RelNode,物理执行计划
  30. * |
  31. * |
  32. * StreamExecJoin.translateToPlanInternal // 作用是生成 StreamOperator, 即Flink算子
  33. * |
  34. * |
  35. * +-----> StreamingJoinOperator (StreamOperator) // Streaming unbounded Join operator in StreamTask
  36. * |
  37. * |
  38. * StreamTwoInputProcessor.processRecord1// 在TwoInputStreamTask调用StreamingJoinOperator,真实的执行
  39. * |
  40. * |

At runtime, the business logic is executed inside a StreamTask, which is the machinery we are already familiar with. An example call stack:

  1. processElement:150, StreamTaskNetworkInput (org.apache.Flink.streaming.runtime.io)
  2. emitNext:128, StreamTaskNetworkInput (org.apache.Flink.streaming.runtime.io)
  3. processInput:69, StreamOneInputProcessor (org.apache.Flink.streaming.runtime.io)
  4. processInput:311, StreamTask (org.apache.Flink.streaming.runtime.tasks)
  5. runDefaultAction:-1, 354713989 (org.apache.Flink.streaming.runtime.tasks.StreamTask$$Lambda$710)
  6. runMailboxLoop:187, MailboxProcessor (org.apache.Flink.streaming.runtime.tasks.mailbox)
  7. runMailboxLoop:487, StreamTask (org.apache.Flink.streaming.runtime.tasks)
  8. invoke:470, StreamTask (org.apache.Flink.streaming.runtime.tasks)
  9. doRun:707, Task (org.apache.Flink.runtime.taskmanager)
  10. run:532, Task (org.apache.Flink.runtime.taskmanager)
  11. run:748, Thread (java.lang)

Below is the code that was used to produce the various execution plans above.

  1. import org.apache.flink.api.java.utils.ParameterTool
  2. import org.apache.flink.api.scala._
  3. import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
  4. import org.apache.flink.table.api.EnvironmentSettings
  5. import org.apache.flink.table.api.scala._
  6. object StreamSQLExample {
  7. // *************************************************************************
  8. // PROGRAM
  9. // *************************************************************************
  10. def main(args: Array[String]): Unit = {
  11. val params = ParameterTool.fromArgs(args)
  12. val planner = if (params.has("planner")) params.get("planner") else "Flink"
  13. // set up execution environment
  14. val env = StreamExecutionEnvironment.getExecutionEnvironment
  15. val tEnv = if (planner == "blink") { // use blink planner in streaming mode
  16. val settings = EnvironmentSettings.newInstance()
  17. .useBlinkPlanner()
  18. .inStreamingMode()
  19. .build()
  20. StreamTableEnvironment.create(env, settings)
  21. } else if (planner == "Flink") { // use Flink planner in streaming mode
  22. StreamTableEnvironment.create(env)
  23. } else {
  24. System.err.println("The planner is incorrect. Please run 'StreamSQLExample --planner <planner>', " +
  25. "where planner (it is either Flink or blink, and the default is Flink) indicates whether the " +
  26. "example uses Flink planner or blink planner.")
  27. return
  28. }
  29. val orderA: DataStream[Order] = env.fromCollection(Seq(
  30. Order(1L, "beer", 3),
  31. Order(1L, "diaper", 4),
  32. Order(3L, "rubber", 2)))
  33. val orderB: DataStream[Order] = env.fromCollection(Seq(
  34. Order(2L, "pen", 3),
  35. Order(2L, "rubber", 3),
  36. Order(4L, "beer", 1)))
  37. // convert DataStream to Table
  38. val tableA = tEnv.fromDataStream(orderA, 'user, 'product, 'amount)
  39. // register DataStream as Table
  40. tEnv.registerDataStream("OrderB", orderB, 'user, 'product, 'amount)
  41. // union the two tables
  42. val result = tEnv.sqlQuery(
  43. s"""
  44. |SELECT * FROM $tableA WHERE amount > 2
  45. |UNION ALL
  46. |SELECT * FROM OrderB WHERE amount < 2
  47. """.stripMargin)
  48. result.toAppendStream[Order].print()
  49. print(tEnv.explain(result))
  50. env.execute()
  51. }
  52. // *************************************************************************
  53. // USER DATA TYPES
  54. // *************************************************************************
  55. case class Order(user: Long, product: String, amount: Int)
  56. }

The overall transformation of the whole flow looks roughly like this:

  1. == Abstract Syntax Tree ==
  2. LogicalUnion(all=[true])
  3. :- LogicalProject(user=[$0], product=[$1], amount=[$2])
  4. : +- LogicalFilter(condition=[>($2, 2)])
  5. : +- LogicalTableScan(table=[[default_catalog, default_database, UnnamedTable$0]])
  6. +- LogicalProject(user=[$0], product=[$1], amount=[$2])
  7. +- LogicalFilter(condition=[<($2, 2)])
  8. +- LogicalTableScan(table=[[default_catalog, default_database, OrderB]])
  9. == Optimized Logical Plan ==
  10. Union(all=[true], union=[user, product, amount])
  11. :- Calc(select=[user, product, amount], where=[>(amount, 2)])
  12. : +- DataStreamScan(table=[[default_catalog, default_database, UnnamedTable$0]], fields=[user, product, amount])
  13. +- Calc(select=[user, product, amount], where=[<(amount, 2)])
  14. +- DataStreamScan(table=[[default_catalog, default_database, OrderB]], fields=[user, product, amount])
  15. == Physical Execution Plan ==
  16. Stage 1 : Data Source
  17. content : Source: Collection Source
  18. Stage 2 : Data Source
  19. content : Source: Collection Source
  20. Stage 10 : Operator
  21. content : SourceConversion(table=[default_catalog.default_database.UnnamedTable$0], fields=[user, product, amount])
  22. ship_strategy : FORWARD
  23. Stage 11 : Operator
  24. content : Calc(select=[user, product, amount], where=[(amount > 2)])
  25. ship_strategy : FORWARD
  26. Stage 12 : Operator
  27. content : SourceConversion(table=[default_catalog.default_database.OrderB], fields=[user, product, amount])
  28. ship_strategy : FORWARD
  29. Stage 13 : Operator
  30. content : Calc(select=[user, product, amount], where=[(amount < 2)])
  31. ship_strategy : FORWARD

Next comes the SimpleOuterJoin example (a simple left outer join); the join-related call stacks shown earlier were captured while running it.

  1. import java.sql.Timestamp
  2. import org.apache.flink.api.java.utils.ParameterTool
  3. import org.apache.flink.api.scala._
  4. import org.apache.flink.streaming.api.TimeCharacteristic
  5. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
  6. import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
  7. import org.apache.flink.table.api.scala._
  8. import org.apache.flink.types.Row
  9. import scala.collection.mutable
  10. object SimpleOuterJoin {
  11. def main(args: Array[String]): Unit = {
  12. val params = ParameterTool.fromArgs(args)
  13. val planner = if (params.has("planner")) params.get("planner") else "Flink"
  14. val env = StreamExecutionEnvironment.getExecutionEnvironment
  15. val tEnv = if (planner == "blink") { // use blink planner in streaming mode
  16. val settings = EnvironmentSettings.newInstance()
  17. .useBlinkPlanner()
  18. .inStreamingMode()
  19. .build()
  20. StreamTableEnvironment.create(env, settings)
  21. } else if (planner == "Flink") { // use Flink planner in streaming mode
  22. StreamTableEnvironment.create(env)
  23. } else {
  24. System.err.println("The planner is incorrect. Please run 'StreamSQLExample --planner <planner>', " +
  25. "where planner (it is either Flink or blink, and the default is Flink) indicates whether the " +
  26. "example uses Flink planner or blink planner.")
  27. return
  28. }
  29. env.setParallelism(1)
  30. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  31. // Build the orders data
  32. val ordersData = new mutable.MutableList[(String, String)]
  33. ordersData.+=(("001", "iphone"))
  34. ordersData.+=(("002", "mac"))
  35. ordersData.+=(("003", "book"))
  36. ordersData.+=(("004", "cup"))
  37. // Build the payments data
  38. val paymentData = new mutable.MutableList[(String, String)]
  39. paymentData.+=(("001", "alipay"))
  40. paymentData.+=(("002", "card"))
  41. paymentData.+=(("003", "card"))
  42. paymentData.+=(("004", "alipay"))
  43. val orders = env
  44. .fromCollection(ordersData)
  45. .toTable(tEnv, 'orderId, 'productName)
  46. val ratesHistory = env
  47. .fromCollection(paymentData)
  48. .toTable(tEnv, 'orderId, 'payType)
  49. tEnv.registerTable("Orders", orders)
  50. tEnv.registerTable("Payment", ratesHistory)
  51. var sqlQuery =
  52. """
  53. |SELECT
  54. | o.orderId,
  55. | o.productName,
  56. | p.payType
  57. |FROM
  58. | Orders AS o left outer JOIN Payment AS p ON o.orderId = p.orderId
  59. |""".stripMargin
  60. tEnv.registerTable("TemporalJoinResult", tEnv.sqlQuery(sqlQuery))
  61. val result = tEnv.scan("TemporalJoinResult").toRetractStream[Row]
  62. result.print()
  63. print(tEnv.explain(tEnv.sqlQuery(sqlQuery)))
  64. env.execute()
  65. }
  66. }

The transformation of the whole flow is as follows:

  1. == Abstract Syntax Tree ==
  2. LogicalProject(orderId=[$0], productName=[$1], payType=[$3])
  3. +- LogicalJoin(condition=[=($0, $2)], joinType=[left])
  4. :- LogicalTableScan(table=[[default_catalog, default_database, Orders]])
  5. +- LogicalTableScan(table=[[default_catalog, default_database, Payment]])
  6. == Optimized Logical Plan ==
  7. Calc(select=[orderId, productName, payType])
  8. +- Join(joinType=[LeftOuterJoin], where=[=(orderId, orderId0)], select=[orderId, productName, orderId0, payType], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
  9. :- Exchange(distribution=[hash[orderId]])
  10. : +- DataStreamScan(table=[[default_catalog, default_database, Orders]], fields=[orderId, productName])
  11. +- Exchange(distribution=[hash[orderId]])
  12. +- DataStreamScan(table=[[default_catalog, default_database, Payment]], fields=[orderId, payType])
  13. == Physical Execution Plan ==
  14. Stage 1 : Data Source
  15. content : Source: Collection Source
  16. Stage 2 : Data Source
  17. content : Source: Collection Source
  18. Stage 11 : Operator
  19. content : SourceConversion(table=[default_catalog.default_database.Orders], fields=[orderId, productName])
  20. ship_strategy : FORWARD
  21. Stage 13 : Operator
  22. content : SourceConversion(table=[default_catalog.default_database.Payment], fields=[orderId, payType])
  23. ship_strategy : FORWARD
  24. Stage 15 : Operator
  25. content : Join(joinType=[LeftOuterJoin], where=[(orderId = orderId0)], select=[orderId, productName, orderId0, payType], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])
  26. ship_strategy : HASH
  27. Stage 16 : Operator
  28. content : Calc(select=[orderId, productName, payType])
  29. ship_strategy : FORWARD
  30. The output is a retract stream: when a matching payment arrives, the earlier null-padded result is first retracted (flag = false) and the joined row is then emitted (flag = true):
  31. (true,001,iphone,null)
  32. (false,001,iphone,null)
  33. (true,001,iphone,alipay)
  34. (true,002,mac,null)
  35. (false,002,mac,null)
  36. (true,002,mac,card)
  37. (true,003,book,null)
  38. (false,003,book,null)
  39. (true,003,book,card)
  40. (true,004,cup,null)
  41. (false,004,cup,null)
  42. (true,004,cup,alipay)
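
A side note on consuming this retract stream: toRetractStream returns a DataStream[(Boolean, Row)], so the retraction messages can simply be filtered out if only the accumulate messages are wanted. A minimal sketch, reusing the result value from the SimpleOuterJoin example above:

    // Keep only the accumulate messages (flag == true) and drop the retractions (flag == false)
    result.filter(_._1).map(_._2).print()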

Below are the call stacks captured while debugging, for reference.

  1. // Invoking the rule during optimization
  2. matches:49, StreamExecJoinRule (org.apache.Flink.table.planner.plan.rules.physical.stream)
  3. matchRecurse:263, VolcanoRuleCall (org.apache.calcite.plan.volcano)
  4. matchRecurse:370, VolcanoRuleCall (org.apache.calcite.plan.volcano)
  5. matchRecurse:370, VolcanoRuleCall (org.apache.calcite.plan.volcano)
  6. match:247, VolcanoRuleCall (org.apache.calcite.plan.volcano)
  7. fireRules:1534, VolcanoPlanner (org.apache.calcite.plan.volcano)
  8. registerImpl:1807, VolcanoPlanner (org.apache.calcite.plan.volcano)
  9. register:846, VolcanoPlanner (org.apache.calcite.plan.volcano)
  10. ensureRegistered:868, VolcanoPlanner (org.apache.calcite.plan.volcano)
  11. ensureRegistered:90, VolcanoPlanner (org.apache.calcite.plan.volcano)
  12. onRegister:329, AbstractRelNode (org.apache.calcite.rel)
  13. registerImpl:1668, VolcanoPlanner (org.apache.calcite.plan.volcano)
  14. register:846, VolcanoPlanner (org.apache.calcite.plan.volcano)
  15. ensureRegistered:868, VolcanoPlanner (org.apache.calcite.plan.volcano)
  16. ensureRegistered:90, VolcanoPlanner (org.apache.calcite.plan.volcano)
  17. onRegister:329, AbstractRelNode (org.apache.calcite.rel)
  18. registerImpl:1668, VolcanoPlanner (org.apache.calcite.plan.volcano)
  19. register:846, VolcanoPlanner (org.apache.calcite.plan.volcano)
  20. ensureRegistered:868, VolcanoPlanner (org.apache.calcite.plan.volcano)
  21. changeTraits:529, VolcanoPlanner (org.apache.calcite.plan.volcano)
  22. run:324, Programs$RuleSetProgram (org.apache.calcite.tools)
  23. optimize:64, FlinkVolcanoProgram (org.apache.Flink.table.planner.plan.optimize.program)
  24. apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program)
  25. apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.Flink.table.planner.plan.optimize.program)
  26. apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
  27. apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
  28. foreach:891, Iterator$class (scala.collection)
  29. foreach:1334, AbstractIterator (scala.collection)
  30. foreach:72, IterableLike$class (scala.collection)
  31. foreach:54, AbstractIterable (scala.collection)
  32. foldLeft:157, TraversableOnce$class (scala.collection)
  33. foldLeft:104, AbstractTraversable (scala.collection)
  34. optimize:57, FlinkChainedProgram (org.apache.Flink.table.planner.plan.optimize.program)
  35. optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
  36. doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
  37. optimize:77, CommonSubGraphBasedOptimizer (org.apache.Flink.table.planner.plan.optimize)
  38. optimize:248, PlannerBase (org.apache.Flink.table.planner.delegation)
  39. translate:151, PlannerBase (org.apache.Flink.table.planner.delegation)
  40. toDataStream:210, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
  41. toRetractStream:127, StreamTableEnvironmentImpl (org.apache.Flink.table.api.scala.internal)
  42. toRetractStream:146, TableConversions (org.apache.Flink.table.api.scala)
  43. main:75, SimpleOuterJoin$ (spendreport)
  44. main:-1, SimpleOuterJoin (spendreport)
  45. // Translating the physical nodes into Flink operators (translateToPlanInternal)
  46. translateToPlanInternal:140, StreamExecJoin (org.apache.flink.table.planner.plan.nodes.physical.stream)
  47. translateToPlanInternal:51, StreamExecJoin (org.apache.flink.table.planner.plan.nodes.physical.stream)
  48. translateToPlan:58, ExecNode$class (org.apache.flink.table.planner.plan.nodes.exec)
  49. translateToPlan:51, StreamExecJoin (org.apache.flink.table.planner.plan.nodes.physical.stream)
  50. translateToPlanInternal:54, StreamExecCalc (org.apache.flink.table.planner.plan.nodes.physical.stream)
  51. translateToPlanInternal:39, StreamExecCalc (org.apache.flink.table.planner.plan.nodes.physical.stream)
  52. translateToPlan:58, ExecNode$class (org.apache.flink.table.planner.plan.nodes.exec)
  53. translateToPlan:38, StreamExecCalcBase (org.apache.flink.table.planner.plan.nodes.physical.stream)
  54. translateToTransformation:184, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
  55. translateToPlanInternal:153, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
  56. translateToPlanInternal:48, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
  57. translateToPlan:58, ExecNode$class (org.apache.flink.table.planner.plan.nodes.exec)
  58. translateToPlan:48, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
  59. apply:60, StreamPlanner$$anonfun$translateToPlan$1 (org.apache.flink.table.planner.delegation)
  60. apply:59, StreamPlanner$$anonfun$translateToPlan$1 (org.apache.flink.table.planner.delegation)
  61. apply:234, TraversableLike$$anonfun$map$1 (scala.collection)
  62. apply:234, TraversableLike$$anonfun$map$1 (scala.collection)
  63. foreach:891, Iterator$class (scala.collection)
  64. foreach:1334, AbstractIterator (scala.collection)
  65. foreach:72, IterableLike$class (scala.collection)
  66. foreach:54, AbstractIterable (scala.collection)
  67. map:234, TraversableLike$class (scala.collection)
  68. map:104, AbstractTraversable (scala.collection)
  69. translateToPlan:59, StreamPlanner (org.apache.flink.table.planner.delegation)
  70. translate:153, PlannerBase (org.apache.flink.table.planner.delegation)
  71. toDataStream:210, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
  72. toRetractStream:127, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
  73. toRetractStream:146, TableConversions (org.apache.flink.table.api.scala)
  74. main:75, SimpleOuterJoin$ (spendreport)
  75. main:-1, SimpleOuterJoin (spendreport)
  76. // At runtime
  77. @Internal
  78. public final class StreamTwoInputProcessor<IN1, IN2> implements StreamInputProcessor {
  79. private void processRecord2(
  80. StreamRecord<IN2> record,
  81. TwoInputStreamOperator<IN1, IN2, ?> streamOperator,
  82. Counter numRecordsIn) throws Exception {
  83. streamOperator.setKeyContextElement2(record);
  84. streamOperator.processElement2(record);
  85. postProcessRecord(numRecordsIn);
  86. }
  87. }
  88. // As can be seen, streamOperator here is the StreamingJoinOperator
  89. streamOperator = {StreamingJoinOperator@10943}
  90. leftIsOuter = true
  91. rightIsOuter = false
  92. outRow = {JoinedRow@10948} "JoinedRow{row1=org.apache.flink.table.dataformat.BinaryRow@dc6a1b67, row2=(+|null,null)}"
  93. leftNullRow = {GenericRow@10949} "(+|null,null)"
  94. rightNullRow = {GenericRow@10950} "(+|null,null)"
  95. leftRecordStateView = {OuterJoinRecordStateViews$InputSideHasNoUniqueKey@10945}
  96. rightRecordStateView = {JoinRecordStateViews$InputSideHasNoUniqueKey@10946}
  97. generatedJoinCondition = {GeneratedJoinCondition@10951}
  98. leftType = {BaseRowTypeInfo@10952} "BaseRow(orderId: STRING, productName: STRING)"
  99. rightType = {BaseRowTypeInfo@10953} "BaseRow(orderId: STRING, payType: STRING)"
  100. leftInputSideSpec = {JoinInputSideSpec@10954} "NoUniqueKey"
  101. rightInputSideSpec = {JoinInputSideSpec@10955} "NoUniqueKey"
  102. nullFilterKeys = {int[1]@10956}
  103. nullSafe = false
  104. filterAllNulls = true
  105. minRetentionTime = 0
  106. stateCleaningEnabled = false
  107. joinCondition = {AbstractStreamingJoinOperator$JoinConditionWithNullFilters@10947}
  108. collector = {TimestampedCollector@10957}
  109. chainingStrategy = {ChainingStrategy@10958} "HEAD"
  110. container = {TwoInputStreamTask@10959} "Join(joinType=[LeftOuterJoin], where=[(orderId = orderId0)], select=[orderId, productName, orderId0, payType], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey]) -> Calc(select=[orderId, productName, payType]) -> SinkConversionToTuple2 -> Sink: Print to Std. Out (1/1)"
  111. config = {StreamConfig@10960} "\n=======================Stream Config=======================\nNumber of non-chained inputs: 2\nNumber of non-chained outputs: 0\nOutput names: []\nPartitioning:\nChained subtasks: [(Join(joinType=[LeftOuterJoin], where=[(orderId = orderId0)], select=[orderId, productName, orderId0, payType], leftInputSpec=[NoUniqueKey], rightInputSpec=[NoUniqueKey])-7 -> Calc(select=[orderId, productName, payType])-8, typeNumber=0, selectedNames=[], outputPartitioner=FORWARD, outputTag=null)]\nOperator: SimpleOperatorFactory\nBuffer timeout: 100\nState Monitoring: false\n\n\n---------------------\nChained task configs\n---------------------\n{8=\n=======================Stream Config=======================\nNumber of non-chained inputs: 0\nNumber of non-chained outputs: 0\nOutput names: []\nPartitioning:\nChained subtasks: [(Calc(select=[orderId, productName, payType])-8 -> SinkConversionToTuple2-9, typeNumber=0, selectedNames=[], outputPartitioner=FORWARD, outputTag=null)]\nOperator: CodeGenOperatorFactory\nBuffer timeout: "
  112. output = {AbstractStreamOperator$CountingOutput@10961}
  113. runtimeContext = {StreamingRuntimeContext@10962}
  114. stateKeySelector1 = {BinaryRowKeySelector@10963}
  115. stateKeySelector2 = {BinaryRowKeySelector@10964}
  116. keyedStateBackend = {HeapKeyedStateBackend@10965} "HeapKeyedStateBackend"
  117. keyedStateStore = {DefaultKeyedStateStore@10966}
  118. operatorStateBackend = {DefaultOperatorStateBackend@10967}
  119. metrics = {OperatorMetricGroup@10968}
  120. latencyStats = {LatencyStats@10969}
  121. processingTimeService = {ProcessingTimeServiceImpl@10970}
  122. timeServiceManager = {InternalTimeServiceManager@10971}
  123. combinedWatermark = -9223372036854775808
  124. input1Watermark = -9223372036854775808
  125. input2Watermark = -9223372036854775808
  126. // Processing table 1
  127. processElement1:118, StreamingJoinOperator (org.apache.Flink.table.runtime.operators.join.stream)
  128. processRecord1:135, StreamTwoInputProcessor (org.apache.Flink.streaming.runtime.io)
  129. lambda$new$0:100, StreamTwoInputProcessor (org.apache.Flink.streaming.runtime.io)
  130. accept:-1, 169462196 (org.apache.Flink.streaming.runtime.io.StreamTwoInputProcessor$$Lambda$733)
  131. emitRecord:362, StreamTwoInputProcessor$StreamTaskNetworkOutput (org.apache.Flink.streaming.runtime.io)
  132. processElement:151, StreamTaskNetworkInput (org.apache.Flink.streaming.runtime.io)
  133. emitNext:128, StreamTaskNetworkInput (org.apache.Flink.streaming.runtime.io)
  134. processInput:182, StreamTwoInputProcessor (org.apache.Flink.streaming.runtime.io)
  135. processInput:311, StreamTask (org.apache.Flink.streaming.runtime.tasks)
  136. runDefaultAction:-1, 1284793893 (org.apache.Flink.streaming.runtime.tasks.StreamTask$$Lambda$713)
  137. runMailboxLoop:187, MailboxProcessor (org.apache.Flink.streaming.runtime.tasks.mailbox)
  138. runMailboxLoop:487, StreamTask (org.apache.Flink.streaming.runtime.tasks)
  139. invoke:470, StreamTask (org.apache.Flink.streaming.runtime.tasks)
  140. doRun:707, Task (org.apache.Flink.runtime.taskmanager)
  141. run:532, Task (org.apache.Flink.runtime.taskmanager)
  142. run:748, Thread (java.lang)
  143. // Processing table 2
  144. processElement2:123, StreamingJoinOperator (org.apache.Flink.table.runtime.operators.join.stream)
  145. processRecord2:145, StreamTwoInputProcessor (org.apache.Flink.streaming.runtime.io)
  146. lambda$new$1:107, StreamTwoInputProcessor (org.apache.Flink.streaming.runtime.io)
  147. accept:-1, 76811487 (org.apache.Flink.streaming.runtime.io.StreamTwoInputProcessor$$Lambda$734)
  148. emitRecord:362, StreamTwoInputProcessor$StreamTaskNetworkOutput (org.apache.Flink.streaming.runtime.io)
  149. processElement:151, StreamTaskNetworkInput (org.apache.Flink.streaming.runtime.io)
  150. emitNext:128, StreamTaskNetworkInput (org.apache.Flink.streaming.runtime.io)
  151. processInput:185, StreamTwoInputProcessor (org.apache.Flink.streaming.runtime.io)
  152. processInput:311, StreamTask (org.apache.Flink.streaming.runtime.tasks)
  153. runDefaultAction:-1, 1284793893 (org.apache.Flink.streaming.runtime.tasks.StreamTask$$Lambda$713)
  154. runMailboxLoop:187, MailboxProcessor (org.apache.Flink.streaming.runtime.tasks.mailbox)
  155. runMailboxLoop:487, StreamTask (org.apache.Flink.streaming.runtime.tasks)
  156. invoke:470, StreamTask (org.apache.Flink.streaming.runtime.tasks)
  157. doRun:707, Task (org.apache.Flink.runtime.taskmanager)
  158. run:532, Task (org.apache.Flink.runtime.taskmanager)
  159. run:748, Thread (java.lang)
  160. // Processing table 1
  161. processRecord1:134, StreamTwoInputProcessor (org.apache.flink.streaming.runtime.io)
  162. lambda$new$0:100, StreamTwoInputProcessor (org.apache.flink.streaming.runtime.io)
  163. accept:-1, 230607815 (org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$$Lambda$735)
  164. emitRecord:362, StreamTwoInputProcessor$StreamTaskNetworkOutput (org.apache.flink.streaming.runtime.io)
  165. processElement:151, StreamTaskNetworkInput (org.apache.flink.streaming.runtime.io)
  166. emitNext:128, StreamTaskNetworkInput (org.apache.flink.streaming.runtime.io)
  167. processInput:182, StreamTwoInputProcessor (org.apache.flink.streaming.runtime.io)
  168. processInput:311, StreamTask (org.apache.flink.streaming.runtime.tasks)
  169. runDefaultAction:-1, 33038573 (org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$718)
  170. runMailboxLoop:187, MailboxProcessor (org.apache.flink.streaming.runtime.tasks.mailbox)
  171. runMailboxLoop:487, StreamTask (org.apache.flink.streaming.runtime.tasks)
  172. invoke:470, StreamTask (org.apache.flink.streaming.runtime.tasks)
  173. doRun:707, Task (org.apache.flink.runtime.taskmanager)
  174. run:532, Task (org.apache.flink.runtime.taskmanager)
  175. run:748, Thread (java.lang)
  176. // Processing table 2
  177. processRecord2:144, StreamTwoInputProcessor (org.apache.flink.streaming.runtime.io)
  178. lambda$new$1:107, StreamTwoInputProcessor (org.apache.flink.streaming.runtime.io)
  179. accept:-1, 212261435 (org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$$Lambda$736)
  180. emitRecord:362, StreamTwoInputProcessor$StreamTaskNetworkOutput (org.apache.flink.streaming.runtime.io)
  181. processElement:151, StreamTaskNetworkInput (org.apache.flink.streaming.runtime.io)
  182. emitNext:128, StreamTaskNetworkInput (org.apache.flink.streaming.runtime.io)
  183. processInput:185, StreamTwoInputProcessor (org.apache.flink.streaming.runtime.io)
  184. processInput:311, StreamTask (org.apache.flink.streaming.runtime.tasks)
  185. runDefaultAction:-1, 33038573 (org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$718)
  186. runMailboxLoop:187, MailboxProcessor (org.apache.flink.streaming.runtime.tasks.mailbox)
  187. runMailboxLoop:487, StreamTask (org.apache.flink.streaming.runtime.tasks)
  188. invoke:470, StreamTask (org.apache.flink.streaming.runtime.tasks)
  189. doRun:707, Task (org.apache.flink.runtime.taskmanager)
  190. run:532, Task (org.apache.flink.runtime.taskmanager)
  191. run:748, Thread (java.lang)

Finally, here is the SimpleTimeIntervalJoinA example, which adds a time bound to the join condition (an interval join):

  1. import java.sql.Timestamp
  2. import org.apache.flink.api.java.utils.ParameterTool
  3. import org.apache.flink.api.scala._
  4. import org.apache.flink.streaming.api.TimeCharacteristic
  5. import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
  6. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
  7. import org.apache.flink.streaming.api.windowing.time.Time
  8. import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}
  9. import org.apache.flink.table.api.scala._
  10. import org.apache.flink.types.Row
  11. import scala.collection.mutable
  12. import java.sql.Timestamp
  13. import org.apache.flink.api.scala._
  14. import org.apache.flink.streaming.api.TimeCharacteristic
  15. import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
  16. import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
  17. import org.apache.flink.streaming.api.windowing.time.Time
  18. import org.apache.flink.table.api.TableEnvironment
  19. import org.apache.flink.table.api.scala._
  20. import org.apache.flink.types.Row
  21. import scala.collection.mutable
  22. object SimpleTimeIntervalJoinA {
  23. def main(args: Array[String]): Unit = {
  24. val params = ParameterTool.fromArgs(args)
  25. val planner = if (params.has("planner")) params.get("planner") else "flink"
  26. val env = StreamExecutionEnvironment.getExecutionEnvironment
  27. val tEnv = if (planner == "blink") { // use blink planner in streaming mode
  28. val settings = EnvironmentSettings.newInstance()
  29. .useBlinkPlanner()
  30. .inStreamingMode()
  31. .build()
  32. StreamTableEnvironment.create(env, settings)
  33. } else if (planner == "flink") { // use flink planner in streaming mode
  34. StreamTableEnvironment.create(env)
  35. } else {
  36. System.err.println("The planner is incorrect. Please run 'StreamSQLExample --planner <planner>', " +
  37. "where planner (it is either flink or blink, and the default is flink) indicates whether the " +
  38. "example uses flink planner or blink planner.")
  39. return
  40. }
  41. env.setParallelism(1)
  42. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  43. // 构造订单数据
  44. val ordersData = new mutable.MutableList[(String, String, Timestamp)]
  45. ordersData.+=(("001", "iphone", new Timestamp(1545800002000L)))
  46. ordersData.+=(("002", "mac", new Timestamp(1545800003000L)))
  47. ordersData.+=(("003", "book", new Timestamp(1545800004000L)))
  48. ordersData.+=(("004", "cup", new Timestamp(1545800018000L)))
  49. // 构造付款表
  50. val paymentData = new mutable.MutableList[(String, String, Timestamp)]
  51. paymentData.+=(("001", "alipay", new Timestamp(1545803501000L)))
  52. paymentData.+=(("002", "card", new Timestamp(1545803602000L)))
  53. paymentData.+=(("003", "card", new Timestamp(1545803610000L)))
  54. paymentData.+=(("004", "alipay", new Timestamp(1545803611000L)))
  55. val orders = env
  56. .fromCollection(ordersData)
  57. .assignTimestampsAndWatermarks(new TimestampExtractor[String, String]())
  58. .toTable(tEnv, 'orderId, 'productName, 'orderTime.rowtime)
  59. val ratesHistory = env
  60. .fromCollection(paymentData)
  61. .assignTimestampsAndWatermarks(new TimestampExtractor[String, String]())
  62. .toTable(tEnv, 'orderId, 'payType, 'payTime.rowtime)
  63. tEnv.registerTable("Orders", orders)
  64. tEnv.registerTable("Payment", ratesHistory)
  65. var sqlQuery =
  66. """
  67. |SELECT
  68. | o.orderId,
  69. | o.productName,
  70. | p.payType,
  71. | o.orderTime,
  72. | cast(payTime as timestamp) as payTime
  73. |FROM
  74. | Orders AS o left outer JOIN Payment AS p ON o.orderId = p.orderId AND
  75. | p.payTime BETWEEN orderTime AND orderTime + INTERVAL '1' HOUR
  76. |""".stripMargin
  77. tEnv.registerTable("TemporalJoinResult", tEnv.sqlQuery(sqlQuery))
  78. val result = tEnv.scan("TemporalJoinResult").toAppendStream[Row]
  79. result.print()
  80. print(tEnv.explain(tEnv.sqlQuery(sqlQuery)))
  81. env.execute()
  82. }
  83. }
  84. class TimestampExtractor[T1, T2]
  85. extends BoundedOutOfOrdernessTimestampExtractor[(T1, T2, Timestamp)](Time.seconds(10)) {
  86. override def extractTimestamp(element: (T1, T2, Timestamp)): Long = {
  87. element._3.getTime
  88. }
  89. }

The output is as follows:

  1. == Abstract Syntax Tree ==
  2. LogicalProject(orderId=[$0], productName=[$1], payType=[$4], orderTime=[$2], payTime=[CAST($5):TIMESTAMP(6)])
  3. +- LogicalJoin(condition=[AND(=($0, $3), >=($5, $2), <=($5, +($2, 3600000:INTERVAL HOUR)))], joinType=[left])
  4. :- LogicalTableScan(table=[[default_catalog, default_database, Orders]])
  5. +- LogicalTableScan(table=[[default_catalog, default_database, Payment]])
  6. == Optimized Logical Plan ==
  7. Calc(select=[orderId, productName, payType, orderTime, CAST(CAST(payTime)) AS payTime])
  8. +- WindowJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, leftLowerBound=-3600000, leftUpperBound=0, leftTimeIndex=2, rightTimeIndex=2], where=[AND(=(orderId, orderId0), >=(payTime, orderTime), <=(payTime, +(orderTime, 3600000:INTERVAL HOUR)))], select=[orderId, productName, orderTime, orderId0, payType, payTime])
  9. :- Exchange(distribution=[hash[orderId]])
  10. : +- DataStreamScan(table=[[default_catalog, default_database, Orders]], fields=[orderId, productName, orderTime])
  11. +- Exchange(distribution=[hash[orderId]])
  12. +- DataStreamScan(table=[[default_catalog, default_database, Payment]], fields=[orderId, payType, payTime])
  13. == Physical Execution Plan ==
  14. Stage 1 : Data Source
  15. content : Source: Collection Source
  16. Stage 2 : Operator
  17. content : Timestamps/Watermarks
  18. ship_strategy : FORWARD
  19. Stage 3 : Data Source
  20. content : Source: Collection Source
  21. Stage 4 : Operator
  22. content : Timestamps/Watermarks
  23. ship_strategy : FORWARD
  24. Stage 13 : Operator
  25. content : SourceConversion(table=[default_catalog.default_database.Orders], fields=[orderId, productName, orderTime])
  26. ship_strategy : FORWARD
  27. Stage 15 : Operator
  28. content : SourceConversion(table=[default_catalog.default_database.Payment], fields=[orderId, payType, payTime])
  29. ship_strategy : FORWARD
  30. Stage 17 : Operator
  31. content : WindowJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, leftLowerBound=-3600000, leftUpperBound=0, leftTimeIndex=2, rightTimeIndex=2], where=[((orderId = orderId0) AND (payTime >= orderTime) AND (payTime <= (orderTime + 3600000:INTERVAL HOUR)))], select=[orderId, productName, orderTime, orderId0, payType, payTime])
  32. ship_strategy : HASH
  33. Stage 18 : Operator
  34. content : Calc(select=[orderId, productName, payType, orderTime, CAST(CAST(payTime)) AS payTime])
  35. ship_strategy : FORWARD
  36. 001,iphone,alipay,2018-12-26T04:53:22,2018-12-26T05:51:41
  37. 002,mac,card,2018-12-26T04:53:23,2018-12-26T05:53:22
  38. 004,cup,alipay,2018-12-26T04:53:38,2018-12-26T05:53:31
  39. 003,book,null,2018-12-26T04:53:24,null
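
In the optimized plan above, the predicate `p.payTime BETWEEN orderTime AND orderTime + INTERVAL '1' HOUR` has been extracted into windowBounds with leftLowerBound=-3600000 and leftUpperBound=0: millisecond offsets that relate the two rowtime fields, i.e. the same one-hour window written relative to the row times. The snippet below is a small self-contained sketch (not Flink code; the helper names and the assumed orientation of the bounds are for illustration only) showing that the bounds form and the original BETWEEN form are equivalent.

    // A small sketch (not Flink code) showing that the extracted windowBounds
    // are just the BETWEEN predicate rewritten in milliseconds.
    object WindowBoundsSketch {
      // values printed in the optimized plan above
      val leftLowerBound: Long = -3600000L
      val leftUpperBound: Long = 0L

      // Assumed reading: the bounds constrain the left (order) time relative to
      // the right (payment) time.
      def withinBounds(orderTime: Long, payTime: Long): Boolean =
        orderTime >= payTime + leftLowerBound && orderTime <= payTime + leftUpperBound

      // The original predicate: payTime BETWEEN orderTime AND orderTime + 1 hour
      def betweenPredicate(orderTime: Long, payTime: Long): Boolean =
        payTime >= orderTime && payTime <= orderTime + 3600000L

      def main(args: Array[String]): Unit = {
        val orderTime = 1545800002000L // order 001
        val payTime   = 1545803501000L // payment 001, about 58 minutes later
        println(withinBounds(orderTime, payTime))                              // true
        println(withinBounds(orderTime, payTime) ==
                betweenPredicate(orderTime, payTime))                          // the two forms agree
      }
    }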

Related classes and call stacks

  1. class StreamExecWindowJoin {
  2. }
  3. class StreamExecWindowJoinRule
  4.   extends ConverterRule(
  5.     classOf[FlinkLogicalJoin],
  6.     FlinkConventions.LOGICAL,
  7.     FlinkConventions.STREAM_PHYSICAL,
  8.     "StreamExecWindowJoinRule") {
  9. }
  10. matches:54, StreamExecWindowJoinRule (org.apache.flink.table.planner.plan.rules.physical.stream)
  11. matchRecurse:263, VolcanoRuleCall (org.apache.calcite.plan.volcano)
  12. match:247, VolcanoRuleCall (org.apache.calcite.plan.volcano)
  13. fireRules:1534, VolcanoPlanner (org.apache.calcite.plan.volcano)
  14. registerImpl:1807, VolcanoPlanner (org.apache.calcite.plan.volcano)
  15. register:846, VolcanoPlanner (org.apache.calcite.plan.volcano)
  16. ensureRegistered:868, VolcanoPlanner (org.apache.calcite.plan.volcano)
  17. ensureRegistered:90, VolcanoPlanner (org.apache.calcite.plan.volcano)
  18. onRegister:329, AbstractRelNode (org.apache.calcite.rel)
  19. registerImpl:1668, VolcanoPlanner (org.apache.calcite.plan.volcano)
  20. register:846, VolcanoPlanner (org.apache.calcite.plan.volcano)
  21. ensureRegistered:868, VolcanoPlanner (org.apache.calcite.plan.volcano)
  22. ensureRegistered:90, VolcanoPlanner (org.apache.calcite.plan.volcano)
  23. onRegister:329, AbstractRelNode (org.apache.calcite.rel)
  24. registerImpl:1668, VolcanoPlanner (org.apache.calcite.plan.volcano)
  25. register:846, VolcanoPlanner (org.apache.calcite.plan.volcano)
  26. ensureRegistered:868, VolcanoPlanner (org.apache.calcite.plan.volcano)
  27. changeTraits:529, VolcanoPlanner (org.apache.calcite.plan.volcano)
  28. run:324, Programs$RuleSetProgram (org.apache.calcite.tools)
  29. optimize:64, FlinkVolcanoProgram (org.apache.flink.table.planner.plan.optimize.program)
  30. apply:62, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
  31. apply:58, FlinkChainedProgram$$anonfun$optimize$1 (org.apache.flink.table.planner.plan.optimize.program)
  32. apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
  33. apply:157, TraversableOnce$$anonfun$foldLeft$1 (scala.collection)
  34. foreach:891, Iterator$class (scala.collection)
  35. foreach:1334, AbstractIterator (scala.collection)
  36. foreach:72, IterableLike$class (scala.collection)
  37. foreach:54, AbstractIterable (scala.collection)
  38. foldLeft:157, TraversableOnce$class (scala.collection)
  39. foldLeft:104, AbstractTraversable (scala.collection)
  40. optimize:57, FlinkChainedProgram (org.apache.flink.table.planner.plan.optimize.program)
  41. optimizeTree:170, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
  42. doOptimize:90, StreamCommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
  43. optimize:77, CommonSubGraphBasedOptimizer (org.apache.flink.table.planner.plan.optimize)
  44. optimize:248, PlannerBase (org.apache.flink.table.planner.delegation)
  45. translate:151, PlannerBase (org.apache.flink.table.planner.delegation)
  46. toDataStream:210, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
  47. toAppendStream:107, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
  48. toAppendStream:101, TableConversions (org.apache.flink.table.api.scala)
  49. main:93, SimpleTimeIntervalJoinA$ (spendreport)
  50. main:-1, SimpleTimeIntervalJoinA (spendreport)
  51. translateToPlanInternal:136, StreamExecWindowJoin (org.apache.flink.table.planner.plan.nodes.physical.stream)
  52. translateToPlanInternal:53, StreamExecWindowJoin (org.apache.flink.table.planner.plan.nodes.physical.stream)
  53. translateToPlan:58, ExecNode$class (org.apache.flink.table.planner.plan.nodes.exec)
  54. translateToPlan:53, StreamExecWindowJoin (org.apache.flink.table.planner.plan.nodes.physical.stream)
  55. translateToPlanInternal:54, StreamExecCalc (org.apache.flink.table.planner.plan.nodes.physical.stream)
  56. translateToPlanInternal:39, StreamExecCalc (org.apache.flink.table.planner.plan.nodes.physical.stream)
  57. translateToPlan:58, ExecNode$class (org.apache.flink.table.planner.plan.nodes.exec)
  58. translateToPlan:38, StreamExecCalcBase (org.apache.flink.table.planner.plan.nodes.physical.stream)
  59. translateToTransformation:184, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
  60. translateToPlanInternal:153, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
  61. translateToPlanInternal:48, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
  62. translateToPlan:58, ExecNode$class (org.apache.flink.table.planner.plan.nodes.exec)
  63. translateToPlan:48, StreamExecSink (org.apache.flink.table.planner.plan.nodes.physical.stream)
  64. apply:60, StreamPlanner$$anonfun$translateToPlan$1 (org.apache.flink.table.planner.delegation)
  65. apply:59, StreamPlanner$$anonfun$translateToPlan$1 (org.apache.flink.table.planner.delegation)
  66. apply:234, TraversableLike$$anonfun$map$1 (scala.collection)
  67. apply:234, TraversableLike$$anonfun$map$1 (scala.collection)
  68. foreach:891, Iterator$class (scala.collection)
  69. foreach:1334, AbstractIterator (scala.collection)
  70. foreach:72, IterableLike$class (scala.collection)
  71. foreach:54, AbstractIterable (scala.collection)
  72. map:234, TraversableLike$class (scala.collection)
  73. map:104, AbstractTraversable (scala.collection)
  74. translateToPlan:59, StreamPlanner (org.apache.flink.table.planner.delegation)
  75. translate:153, PlannerBase (org.apache.flink.table.planner.delegation)
  76. toDataStream:210, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
  77. toAppendStream:107, StreamTableEnvironmentImpl (org.apache.flink.table.api.scala.internal)
  78. toAppendStream:101, TableConversions (org.apache.flink.table.api.scala)
  79. main:93, SimpleTimeIntervalJoinA$ (spendreport)
  80. main:-1, SimpleTimeIntervalJoinA (spendreport)
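
StreamExecWindowJoinRule is a Calcite ConverterRule: the matches call at the top of the first stack decides whether this FlinkLogicalJoin (one whose condition carries window bounds) should be converted from the LOGICAL convention to STREAM_PHYSICAL, and the VolcanoPlanner then asks the rule for the converted node, whose translateToPlanInternal (second stack) later produces the runtime operator. The snippet below is only a simplified model of that matches/convert contract; all types in it are made-up stand-ins, not Calcite's or Flink's actual API.

    // A simplified model of the ConverterRule idea used by StreamExecWindowJoinRule.
    // All types here are stand-ins for illustration, not Calcite / Flink classes.
    object ConverterRuleSketch {

      sealed trait Rel                                    // stand-in for RelNode
      case class LogicalJoin(hasWindowBounds: Boolean) extends Rel
      case class PhysicalWindowJoin(source: LogicalJoin) extends Rel

      // Analogue of a ConverterRule: a guard (matches) plus a rewrite (convert).
      trait Rule[In <: Rel, Out <: Rel] {
        def matches(rel: In): Boolean
        def convert(rel: In): Out
      }

      // Only joins whose condition contains window bounds are converted
      // into the stream-physical window join.
      object WindowJoinRule extends Rule[LogicalJoin, PhysicalWindowJoin] {
        def matches(rel: LogicalJoin): Boolean = rel.hasWindowBounds
        def convert(rel: LogicalJoin): PhysicalWindowJoin = PhysicalWindowJoin(rel)
      }

      def main(args: Array[String]): Unit = {
        val logical = LogicalJoin(hasWindowBounds = true)
        if (WindowJoinRule.matches(logical)) {            // what the planner checks first
          println(WindowJoinRule.convert(logical))        // the physical plan node
        }
      }
    }

In the real planner, setting breakpoints in the rule's matches and convert methods is usually enough to see why a particular join did or did not become a StreamExecWindowJoin.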


Copyright notice: this post is an original article by rossiXYZ, licensed under CC 4.0 BY-SA. Please include a link to the original post and this notice when reposting.
Original link: https://www.cnblogs.com/rossiXYZ/p/12770436.html