kylin

Cube & Cuboid

创建docker镜像

1
2
3
4
5
6
7
8
9
10
11
12
13
docker run --name kylin
-d \
-m 8G \
-p 7070:7070 \
-p 7080:7080 \
-p 8088:8088 \
-p 50070:50070 \
-p 8032:8032 \
-p 8042:8042 \
-p 2181:2181 \
-p 7080:7080 \
-p 8080:8080 \
apachekylin/apache-kylin-standalone:kylin-4.0.1-mondrian
1
sudo docker exec -it -u root kylin /bin/bash

kylin注意: 只能按照构建MOdel的连接写sql

​ 只能按照构建cube时选择的维度字段分组统计

​ 只能统计构建 Cube 时选择的度量值字段

kylin API

用python获取用户解密

python -c "import base64; print base64.standard_b64encode('ADMIN:KYLIN')"

curl运行SQL

curl -X POST -H "Authorization: Basic QURNSU46S1lMSU4=" -H "Content-Type: application/json" -d '{ "sql":"select dname,sum(sal) from emp e join dept d on e.deptno = d.deptno group by dname;", "project":"firstobj" }' http://192.168.56.130:7070/kylin/api/query

kylin_cube_build.sh 脚本如下

#!/bin/bash
#从第 1 个参数获取 cube_name
cube_name=$1
#从第 2 个参数获取构建 cube 时间
if [ -n "$2" ]
then
do_date=$2
else
do_date=date -d '-1 day' +%F
fi
#获取执行时间的 00:00:00 时间戳(0 时区)
start_date_unix=date -d "$do_date 08:00:00" +%s
#秒级时间戳变毫秒级
start_date=$(($start_date_unix*1000))
#获取执行时间的 24:00 的时间戳
stop_date=$(($start_date+86400000))
curl -X PUT -H "Authorization: Basic QURNSU46S1lMSU4=" -H
'Content-Type: application/json' -d '{"startTime":'$start_date',
"endTime":'$stop_date', "buildType":"BUILD"}'
http://192.168.56.130:7070/kylin/api/cubes/$cube_name/build

kylin查询下压

kylin4.0支持开启,由于mr查询引擎换为spark,将其取消注释。

image-20230508174507612

kylin查询引擎

sparder(sparder context)由spark application后端实现的新型分布式查询引擎

HDFS目录

子目录:

临时文件存储目录:/project_name/job_tmp

Cuboid 文件存储目录: /project_name /parquet/cube_name/segment_name_XXX

维度表快照存储目录:/project_name /table_snapshot

Spark 运行日志目录:/project_name/spark_logs

衍生维度

录维度表主键与维度表其他维度之间的映射关系

聚合组

剪枝工具

强制维度:不包含该维度的被剔除,自己不能单独出现

层级维度:每个层级包含两个或更多个维度。假设一个层级中包含D1,D2…Dn 这 n 个维度,那么在该分组产生的任何 Cuboid 中, 这 n 个维度只会以(),(D1),(D1,D2)…(D1,D2…Dn)这 n+1 种形式中的一种出现

联合维度:这些联合维度要么一起出现,要么都不出现

如何设置:构建cube高级设置

cube构建优化

参数 说明
spark.executor.instances Spark 应用程序的 Executor 数量。
spark.executor.cores 每个 Executor 使用的核心数,Executor 数量乘以Executor 使用的核心数就是 Spark 程序运行的最大并行度。
spark.executor.memory 每个 Executor 使用的内存。
spark.executor.memoryOverhead 每个 Executor 使用的堆外内存。
spark.sql.files.maxPartitionBytes 读取文件时要打包到单个分区中的最大字节数,默认值为 128M。如果源表(Hive source)中有许多小文件,spark 会自动将许多小文件打包到单个分区中,以避免执行太多的小任务。
spark.sql.shuffle.partitions 配置为联接 Join 或聚合 Shuffle 时要使用的分区数,默认值为 200。较大的值需要更多的 CPU 资源,而较小的值需要更多的内存资源。

Kylin 根据 Cube 情况自动设置 Spark 参数

根据源文件中最大文件的大小以及 Cube 是否具有准确的去重计数度量来设置的

(1) \Executor** \内存规则**

如 果 ${ 最 大 文 件 大 小 }>=100G and ${ 存 在 准 确 去 重 度 量 值 }, 设 置

‘spark.executor.memory’为 20G;

如果${最大文件大小}>=100G or (如果${最大文件大小}>=10G and ${存在准确去重度量值}), 设置 ‘spark.executor.memory’ 为 16G;

如果${最大文件大小}>=10G or (如果${最大文件大小}>=1G and ${存在准确去重度量值}), 设置 ‘spark.executor.memory’ 为 10G;

如果${最大文件大小}>=1G or ${存在准确去重度量值}, 设置 ‘spark.executor.memory’

为 4G;

否则设置’spark.executor.memory’ 为 1G。

(2) \Executor** \核心数规则**

如果${最大文件大小}>=1G or ${存在准确去重度量值},设置 ‘spark.executor.cores’ 为 5; 否则 设置 ‘spark.executor.cores’ to 1。

(3) \Executor** \堆外内存规则**

如 果 ${ 最 大 文 件 大 小 }>=100G and ${ 存 在 准 确 去 重 度 量 值 }, 设 置

‘spark.executor.memoryOverhead’ 为 6G, 所以这种情况下,每个 Executor 的内存为 20G + 6G

= 26G;

如果${最大文件大小}>=100G or (如果${最大文件大小}>=10G and ${存在准确去重度量值}), 设置’spark.executor.memoryOverhead’为 4G;

如果${最大文件大小}>=10G or (如果${最大文件大小}>=1G and ${存在准确去重度量值}), 设置’spark.executor.memoryOverhead’ 为 2G;

如 果 ${ 最 大 文 件 大 小 }>=1G or ${ 存 在 准 确 去 重 度 量 值 }, 设 置

‘spark.executor.memoryOverhead’ 为 1G;

否则设置 ‘spark.executor.memoryOverhead’ 为 512M。

(4) \Executor** \实例数量规则**

①读取参数’kylin.engine.base-executor-instance’的值作为基本 Executor 数量,默认值为 5

② 根 据 Cuboid 个 数来计 算 所需 的 Executor 个 数 , 配 置 文件 中读取 参 数’kylin.engine.executor-instance-strategy’的值,默认为’100,2,500,3,1000,4’,即 Cuboid 个数为 0- 100 时,因数为 1;100-500 时,因数为 2;500-1000 时,因数为 3;1000 以上时,因数为 4。然后用这个因数乘以第一步的基本Executor 数量就是 Executor 的预估总数量。

③从 Yarn 资源池中的得到可用的总核心数和总内存数,然后用总核心数和总内存数除以 kylin 任务所需的核心数和内存数,两者求个最小值,就是Executor 的可用总数量。

④最后在 Executor 的预估总数量和 Executor 的可用总数量之间取最小值作为 Executor

的实际最终总数量。

(5) \Shuffle** \分区数量规则**

设置’spark.sql.shuffle.partitions’为’max(2, ${最大文件大小 MB} / 32)’。

img

在应用上述所有分配规则后,可以在“kylin.log”文件中找到一些日志消息:

1) \根据实际情况手动设置** \Spark** \参数**

根据 Kylin 自动调整的配置值,如果仍存在一些 Cube 构建性能问题,可以适当更改这些配置的值以进行尝试,例如:

(1) 如果您从 spark ui 观察到某些任务中存在严重的 GC 现象,或者发现大量 executor

丢失或获取失败错误,您可以更改这两个配置的值,以增加每个 executor 的内存:

· spark.executor.memory=

· spark.executor.memoryOverhead=

一般调整策略是将参数的值调整为 2 倍。如果问题解决了,您可以适当地调整以避免浪费资源。在增加每个 Executor 的内存后, 如果仍有严重的内存问题,可以考虑调整’spark.executor.cores’为 1,此调整可以使单个任务是每个 Executor 的独家任务,并且执行效率相对较低,但它可以通过这种方式来避免构建失败。

(2) 如果您从 spark ui 观察到,有大量任务需要为多轮计划(每轮都用掉了所有内核),

您可以更改这两个配置的值,以增加 spark 应用程序的内核数:

· spark.executor.cores=

· spark.executor.instances=

(3) 如果有一些 Executor 丢失或获取数据错误,并且仅仅因为 Shuffle 期间的减速器数量太小,或者数据倾斜,可以尝试增加’spark.sql.shuffle.partitions’的值。

· spark.sql.shuffle.partitions=

全局字典构建性能调优

\原理:**

每个构建任务都将生成一个新的全局字典

每个新的构建任务的字典会根据版本号保存,旧的全局字典会逐渐删除

一个全局字典包含一个元数据文件和多个字典文件,每个字典文件称为一个 bucket (bucket)。

每个 bucket 被划分为两个映射(Map),并将这两个映射组合成一个完整的映射关系

1) \调优参数**

如果 cube 定义了精确去重(即 count(distinct)语法)的度量值,Kylin4.0 将基于 Spark 为这些度量值分布式地构建全局字段的值(之前版本是单点构建)。这部分的优化主要是调整一个参数:

​ kylin.dictionary.globalV2-threshold-bucket-size (默认值 500000)

如果 CPU 资源充足,减少此配置的值可以减少单个分区中的数据量,从而加快构建全局字典。

2) \使用全局字典**

在已有的 Model 中, 创建一个新的 Cube 用于测试全局字典, 设置度量为

COUNT_DISTINCT,返回类型选择 Precisely

如果构建失败,可能是yarn 资源限制,构建时单个容器申请的 cpu 核数超过 yarn 单个容器

img

默认最大 4 核,修改 hadoop 的 yarn-site.xml,分发配置文件,重启 yarn

快照表构建性能调优

img

Snapshot Table - 快照表:每一张快照表对应一个 Hive 维度表,为了实时记录 Hive 维度表的数据变化,Kylin 的 cube 每次构建都会对 hive 维度表创建一个新的快照,以下是快照表的调优参数。

构建 Snapshot table 时,主要调整 2 个参数来调优:

参数名 默认值 说明
kylin.snapshot.parallel-build-enabled true 使用并行构建,保持开启
kylin.snapshot.shard-size-mb 128MB 如果CPU 资源充足,可以减少值来增加并行度,建议并行度在 Spark 应用CPU 核数的 3 倍以内。并行度=原表数据量/该参数

查询优化

saprk计算引擎 sparderContext查询引擎

排序优化

在构建cube设置rowkey

shardby 列来裁剪 parquet 文件

分区表,shardby列进行分片,选择基数高的为列

减少小的或不均匀的parquet文件

文件大小不均匀,设置parquet文件

重分区检查策略

小文件读取到同一个分区

当 已 经 构 建 的 segments 中 有 很 多 小 文 件 时 , 可 以 修 改 参 数’spark.sql.files.maxPartitionBytes’ (默认值为 128MB)为合适的值,这样可以让 spark 引擎将一些小文件读取到单个分区中,从而避免需要太多的小任务。

如果有足够的资源, 可以减少该参数的值来增加并行度,

但需要同时减少’spark.hadoop.parquet.block.size’(默认值为 128MB)的值,因为 parquet 文件的最小分割单元是RowGroup,这个 blocksize 参数表示parquet 的 RowGroup 的最大大小。