Doris 中join的优化原理(学习笔记) 电脑版发表于:2025/4/6 23:33  >#Doris 中join的优化原理(学习笔记) [TOC] Shuffle Join(Partitioned Join) ------------ tn2>Shuffle Join 是:把 A 表和 B 表的数据都根据 Join Key 做一次大洗牌,送到对应分区的节点上,然后在本地做 Join,再把结果返回。  tn2>订单明细表: ```sql CREATE TABLE test.order_info_shuffle ( `order_id` varchar(20) COMMENT "订单id", `user_id` varchar(20) COMMENT "用户id", `goods_id` VARCHAR(20) COMMENT "商品id", `goods_num` Int COMMENT "商品数量", `price` double COMMENT "商品价格" ) duplicate KEY(`order_id`) DISTRIBUTED BY HASH(`order_id`) BUCKETS 5 properties("replication_num" = "1"); 导入数据: insert into test.order_info_shuffle values ('o001','u001','g001',1,9.9 ), ('o001','u001','g002',2,19.9), ('o001','u001','g003',2,39.9), ('o002','u002','g001',3,9.9 ), ('o002','u002','g002',1,19.9), ('o003','u002','g003',1,39.9), ('o003','u002','g002',2,19.9), ('o003','u002','g004',3,99.9), ('o003','u002','g005',1,99.9), ('o004','u003','g001',2,9.9 ), ('o004','u003','g002',1,19.9), ('o004','u003','g003',4,39.9), ('o004','u003','g004',1,99.9), ('o004','u003','g005',4,89.9); ```  tn2>商品表: ```sql CREATE TABLE test.goods_shuffle ( `goods_id` VARCHAR(20) COMMENT "商品id", `goods_name` VARCHAR(20) COMMENT "商品名称", `category_id` VARCHAR(20) COMMENT "商品品类id" ) duplicate KEY(`goods_id`) DISTRIBUTED BY HASH(`goods_id`) BUCKETS 5 properties("replication_num" = "1") ; --导入数据: insert into test.goods_shuffle values ('g001','iphon13','c001'), ('g002','ipad','c002'), ('g003','xiaomi12','c001'), ('g004','huaweip40','c001'), ('g005','headset','c003'); ```  tn2>Sql示例: ```sql EXPLAIN select oi.order_id, oi.user_id, oi.goods_id, gs.goods_name, gs.category_id, oi.goods_num, oi.price from order_info_shuffle as oi -- 我们可以不指定哪一种join方式,doris会自己根据数据的实际情况帮我们选择 JOIN [shuffle] goods_shuffle as gs on oi.goods_id = gs.goods_id; ```  tn2>说明以上的sql使用的是shuffle Join。 tn>适用场景:不管数据量,不管是大表join大表还是大表join小表都可以用。<br/> 优点: 支持大表 Join,不依赖小表; 数据分布均衡时性能较好。<br/> 缺点: 会产生大量网络 I/O(数据 shuffle); 如果数据倾斜,某些节点负载高,性能会下降; 资源消耗大。 Broadcast Join ------------ tn2>当一个大表join小表的时候,将小表广播到每一个大表所在的每一个节点上(以hash表的形式放在内存中)这样的方式叫做Broadcast Join,类似于mr里面的一个map端join 【map join】  tn2>订单明细表: ```sql CREATE TABLE test.order_info_broadcast ( `order_id` varchar(20) COMMENT "订单id", `user_id` varchar(20) COMMENT "用户id", `goods_id` VARCHAR(20) COMMENT "商品id", `goods_num` Int COMMENT "商品数量", `price` double COMMENT "商品价格" ) duplicate KEY(`order_id`) DISTRIBUTED BY HASH(`goods_id`) BUCKETS 5 properties("replication_num" = "1") ; --导入数据: insert into test.order_info_broadcast values ('o001','u001','g001',1,9.9 ), ('o001','u001','g002',2,19.9), ('o001','u001','g003',2,39.9), ('o002','u002','g001',3,9.9 ), ('o002','u002','g002',1,19.9), ('o003','u002','g003',1,39.9), ('o003','u002','g002',2,19.9), ('o003','u002','g004',3,99.9), ('o003','u002','g005',1,99.9), ('o004','u003','g001',2,9.9 ), ('o004','u003','g002',1,19.9), ('o004','u003','g003',4,39.9), ('o004','u003','g004',1,99.9), ('o004','u003','g005',4,89.9); ```  tn2>商品表: ```sql CREATE TABLE test.goods_broadcast ( `goods_id` VARCHAR(20) COMMENT "商品id", `goods_name` VARCHAR(20) COMMENT "商品名称", `category_id` VARCHAR(20) COMMENT "商品品类id" ) duplicate KEY(`goods_id`) DISTRIBUTED BY HASH(`goods_id`) BUCKETS 5 properties("replication_num" = "1") ; insert into test.goods_broadcast values ('g001','iphon13','c001'), ('g002','ipad','c002'), ('g003','xiaomi12','c001'), ('g004','huaweip40','c001'), ('g005','headset','c003'); ```  ```sql EXPLAIN select oi.order_id, oi.user_id, oi.goods_id, gs.goods_name, gs.category_id, oi.goods_num, oi.price from order_info_broadcast as oi JOIN [broadcast] goods_broadcast as gs on oi.goods_id = gs.goods_id; ```  tn2>他一般用在什么场景下:左表join右表,要求左表的数据量相对来说比较大,右表数据量比较小 优点:避免了shuffle,提高了运算效率 缺点:有限制,必须右表数据量比较小 Bucket Shuffle Join (有点类似smb join) ------------ tn2>利用建表时候分桶的特性,当join的时候,join的条件和左表的分桶字段一样的时候,将右表按照左表分桶的规则进行shuffle操作,使右表中需要join的数据落在左表中需要join数据的BE节点上的join方式叫做`Bucket Shuffle Join`。  tn2>使用从 0.14 版本开始默认为 true,新版本可以不用设置这个参数了! ```sql show variables like '%bucket_shuffle_join%'; set enable_bucket_shuffle_join = true; ``` tn2>订单明细表: ```sql CREATE TABLE test.order_info_bucket ( `order_id` varchar(20) COMMENT "订单id", `user_id` varchar(20) COMMENT "用户id", `goods_id` VARCHAR(20) COMMENT "商品id", `goods_num` Int COMMENT "商品数量", `price` double COMMENT "商品价格" ) duplicate KEY(`order_id`) DISTRIBUTED BY HASH(`goods_id`) BUCKETS 5 properties("replication_num" = "1"); --导入数据: insert into test.order_info_bucket values ('o001','u001','g001',1,9.9 ), ('o001','u001','g002',2,19.9), ('o001','u001','g003',2,39.9), ('o002','u002','g001',3,9.9 ), ('o002','u002','g002',1,19.9), ('o003','u002','g003',1,39.9), ('o003','u002','g002',2,19.9), ('o003','u002','g004',3,99.9), ('o003','u002','g005',1,99.9), ('o004','u003','g001',2,9.9 ), ('o004','u003','g002',1,19.9), ('o004','u003','g003',4,39.9), ('o004','u003','g004',1,99.9), ('o004','u003','g005',4,89.9); ```  tn2>商品表: ```sql CREATE TABLE test.goods_bucket ( `goods_id` VARCHAR(20) COMMENT "商品id", `goods_name` VARCHAR(20) COMMENT "商品名称", `category_id` VARCHAR(20) COMMENT "商品品类id" ) duplicate KEY(`goods_id`) DISTRIBUTED BY HASH(`goods_id`) BUCKETS 3 properties("replication_num" = "1"); --导入数据: insert into test.goods_bucket values ('g001','iphon13','c001'), ('g002','ipad','c002'), ('g003','xiaomi12','c001'), ('g004','huaweip40','c001'), ('g005','headset','c003'); ```  tn2>通过 explain 查看 join 类型 ```sql EXPLAIN select oi.order_id, oi.user_id, oi.goods_id, gs.goods_name, gs.category_id, oi.goods_num, oi.price from goods_bucket as gs -- 目前 Bucket Shuffle Join不能像Shuffle Join那样可以显示指定Join方式, -- 只能让执行引擎自动选择, -- 选择的顺序:Colocate Join -> Bucket Shuffle Join -> Broadcast Join -> Shuffle Join。 JOIN order_info_bucket as oi where oi.goods_id = gs.goods_id; ```  tn>注意事项 1.Bucket Shuffle Join 只生效于 Join 条件为等值的场景 2.Bucket Shuffle Join 要求左表的分桶列的类型与右表等值 join 列的类型需要保持一致,否则无法进行对应的规划。 3.Bucket Shuffle Join 只作用于 Doris 原生的 OLAP 表,对于 ODBC,MySQL,ES 等外表,当其作为左表时是无法规划生效的。 4.Bucket Shuffle Join只能保证左表为单分区时生效。所以在 SQL 执行之中,需要尽量使用 where 条件使分区裁剪的策略能够生效。 Colocation[托管] Join ------------ tn2>中文意思叫位置协同分组join,指需要join的两份数据都在同一个BE节点上,这样在join的时候,直接本地join计算即可,不需要进行shuffle。 ### 名词解释 tn2>● Colocation Group(位置协同组CG):在同一个 CG内的 Table 有着相同的 Colocation Group Schema,并且有着相同的数据分片分布(满足三个条件)。 [一组按照相同规则分片和分布的表,就像水果店里的货架。] ● Colocation Group Schema(CGS):用于描述一个 CG 中的 Table,和 Colocation 相关的通用 Schema 信息。包括分桶列类型,分桶数以及分区的副本数等。[描述这些表如何分片和分布的规则,就像货架的布局说明书。]  ### 使用限制 tn2>1.建表时两张表的分桶列的类型和数量需要完全一致,并且桶数一致,才能保证多张表的数据分片能够一一对应的进行分布控制。 2.同一个 CG 内所有表的所有分区(Partition)的副本数必须一致。 如果不一致,可能出现某一个Tablet 的某一个副本,在同一个 BE 上没有其他的表分片的副本对应 3.同一个 CG 内的表,分区的个数、范围以及分区列的类型不要求一致。 ### 使用案例 tn2>建两张表,分桶列都为 int 类型,且桶的个数都是 5 个。副本数都为默认副本数。 ```sql CREATE TABLE test.order_info_colocation ( `order_id` varchar(20) COMMENT "订单id", `user_id` varchar(20) COMMENT "用户id", `goods_id` VARCHAR(20) COMMENT "商品id", `goods_num` Int COMMENT "商品数量", `price` double COMMENT "商品价格" ) duplicate KEY(`order_id`) DISTRIBUTED BY HASH(`goods_id`) BUCKETS 5 --指定组的定义 PROPERTIES ( "replication_num" = "1", "colocate_with" = "group1" ); --导入数据: insert into test.order_info_colocation values ('o001','u001','g001',1,9.9 ), ('o001','u001','g002',2,19.9), ('o001','u001','g003',2,39.9), ('o002','u002','g001',3,9.9 ), ('o002','u002','g002',1,19.9), ('o003','u002','g003',1,39.9), ('o003','u002','g002',2,19.9), ('o003','u002','g004',3,99.9), ('o003','u002','g005',1,99.9), ('o004','u003','g001',2,9.9 ), ('o004','u003','g002',1,19.9), ('o004','u003','g003',4,39.9), ('o004','u003','g004',1,99.9), ('o004','u003','g005',4,89.9); -- 创建商品表 drop table test.goods_colocation ; CREATE TABLE test.goods_colocation ( `goods_id` VARCHAR(20) COMMENT "商品id", `goods_name` VARCHAR(20) COMMENT "商品名称", `category_id` VARCHAR(20) COMMENT "商品品类id" ) duplicate KEY(`goods_id`) DISTRIBUTED BY HASH(`goods_id`) BUCKETS 5 PROPERTIES ( "replication_num" = "1", "colocate_with" = "group1" ); --导入数据: insert into test.goods_colocation values ('g001','iphon13','c001'), ('g002','ipad','c002'), ('g003','xiaomi12','c001'), ('g004','huaweip40','c001'), ('g005','headset','c003'); ``` tn2>编写查询语句,并查看执行计划  tn2>查看 Group ```sql SHOW PROC '/colocation_group'; ```  tn2>当 Group 中最后一张表彻底删除后(彻底删除是指从回收站中删除。通常,一张表通过DROP TABLE 命令删除后,会在回收站默认停留一天的时间后,再删除),该 Group 也会被自动删除。 修改表 Colocate Group 属性 ```sql ALTER TABLE test.goods_colocation SET ("colocate_with" = "group2"); ALTER TABLE test.order_info_colocation SET ("colocate_with" = "group2"); ``` tn2>删除表的 Colocation 属性 ```sql ALTER TABLE test.goods_colocation SET ("colocate_with" = ""); ALTER TABLE test.order_info_colocation SET ("colocate_with" = ""); ``` tn2>当对一个具有 Colocation 属性的表进行增加分区(ADD PARTITION)、修改副本数时,Doris 会检查修改是否会违反 Colocation Group Schema,如果违反则会拒绝。 Runtime Filter [运行时过滤] ------------ tn2>Runtime Filter会在有join动作的 sql运行时,创建一个HashJoinNode和一个ScanNode来对join的数据进行过滤优化,使得join的时候数据量变少,从而提高效率。  tn2>使用 指定 RuntimeFilter 类型. ```sql set runtime_filter_type="BLOOM_FILTER,IN,MIN_MAX"; set runtime_filter_type="MIN_MAX"; ``` tn2> 参数解释: ● runtime_filter_type: 包括Bloom Filter、MinMax Filter、IN predicate、IN Or Bloom Filter ○ Bloom Filter: 针对右表中的join字段的所有数据标注在一个布隆过滤器中,从而判断左表中需要join的数据在还是不在 ○ MinMax Filter: 获取到右表表中数据的最大值和最小值,看左表中查看,将超出这个最大值最小值范围的数据过滤掉 ○ IN predicate: 将右表中需要join字段所有数据构建一个IN predicate,再去左表表中过滤无意义数据 ● runtime_filter_wait_time_ms: 左表的ScanNode等待每个Runtime Filter的时间,默认1000ms ● runtime_filters_max_num: 每个查询可应用的Runtime Filter中Bloom Filter的最大数量,默认10 ● runtime_bloom_filter_min_size: Runtime Filter中Bloom Filter的最小长度,默认1M ● runtime_bloom_filter_max_size: Runtime Filter中Bloom Filter的最大长度,默认16M ● runtime_bloom_filter_size: Runtime Filter中Bloom Filter的默认长度,默认2M ● runtime_filter_max_in_num: 如果join右表数据行数大于这个值,我们将不生成IN predicate,默认102400 ### 示例实操 tn2>建表 ```sql CREATE TABLE test (t1 INT) DISTRIBUTED BY HASH (t1) BUCKETS 2 PROPERTIES("replication_num" = "1"); INSERT INTO test VALUES (1), (2), (3), (4); CREATE TABLE test2 (t2 INT) DISTRIBUTED BY HASH (t2) BUCKETS 2 PROPERTIES("replication_num" = "1"); INSERT INTO test2 VALUES (3), (4), (5); ``` tn2>查看执行计划 ```sql set runtime_filter_type="BLOOM_FILTER,IN,MIN_MAX"; EXPLAIN SELECT t1 FROM test JOIN test2 where test.t1 = test2.t2; ``` 