如何使用spark提供的CUBE,rollup,pivot,unpivot对多维数据集进行操作,本文进行详细讲解。
1.读取测试数据集数据集
LBJ,34,90,WEST,M
Oven,25,80,EAST,M
KD,23,85,EAST,M
CURRY,24,80,WEST,F
AD,26,86,WEST,M
Giannis,26,85,EAST,F
数据集共五列name,age,salary,department,gender
首先读取数据集:
System.setProperty("hadoop.home.dir","C:\\hadoop")
val sparkConf = new SparkConf().setMaster("local").setAppName("wordCount")
sparkConf.set("spark.network.timeout","600000")
sparkConf.set("spark.executor.heartbeatInterval","500000")
sparkConf.set("spark.executor.memory","500m")
sparkConf.set("spark.driver.memory","200m")
val spark = SparkSession.builder().config(sparkConf).getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
import spark.implicits._
val df = spark.read.csv("data/score.txt")
.toDF("name","age","salary","department","gender")
.selectExpr("name","int(age) as age","int(salary) as salary","department","gender")
println(df.count)
2.Cube和rollup
CUBE是多维数据集的数据立方体,生成所有维度的全部组合的结果。2^n种组合cuboid(n维度个数)。
ROLLUP 生成的结果集显示了所选列中值的某一层次结构的聚合。就是将GROUP BY后面的第一列名称求总和,而其他列并不要求,而CUBE则会将每一个列名称都求总和。
cube
val cdf =df.cube($"department", $"gender").agg(Map( // 2^2 =4
"salary" -> "avg",
"age" -> "max"
))
cdf.show()
cdf.printSchema()
// ---------- ------ ----------------- --------
// |department|gender| avg(salary)|max(age)|
// ---------- ------ ----------------- --------
// | EAST| null|83.33333333333333| 26|
// | EAST| M| 82.5| 25|
// | null| F| 82.5| 26|
// | null| null|84.33333333333333| 34|
// | null| M| 85.25| 34|
// | WEST| M| 88.0| 34|
// | WEST| null|85.33333333333333| 34|
// | EAST| F| 85.0| 26|
// | WEST| F| 80.0| 24|
// ---------- ------ ----------------- --------
rollup
val rdf = df.rollup($"department", $"gender").agg(Map(
"salary" -> "avg",
"age" -> "max"
))
rdf.show()
rdf.printSchema()
// ---------- ------ ----------------- --------
// |department|gender| avg(salary)|max(age)|
// ---------- ------ ----------------- --------
// | EAST| null|83.33333333333333| 26|
// | EAST| M| 82.5| 25|
// | null| null|84.33333333333333| 34|
// | WEST| M| 88.0| 34|
// | WEST| null|85.33333333333333| 34|
// | EAST| F| 85.0| 26|
// | WEST| F| 80.0| 24|
// ---------- ------ ----------------- --------
3.groupBy
groupBy没啥好说的,这里作为跟pivot进行对比。
val pdf1 = df.groupBy($"department",$"gender").agg(Map(
"salary" -> "avg"
))
pdf1.show()
pdf1.printSchema()
// ---------- ------ -----------
// |department|gender|avg(salary)|
// ---------- ------ -----------
// | EAST| F| 85.0|
// | WEST| F| 80.0|
// | EAST| M| 82.5|
// | WEST| M| 88.0|
// ---------- ------ -----------
4.pivot与unpivot
pivot 数据透视,也叫数据重塑,就是把维度值转成列名。还有unpivot就是pivot的反操作。如图:
pivot //pivot数据重塑,
val pdf2 = df.groupBy($"department").pivot($"gender").agg(Map(
"salary" -> "avg"
))
pdf2.show()
pdf2.printSchema()
// ---------- ---- ----
// |department| F| M|
// ---------- ---- ----
// | WEST|80.0|88.0|
// | EAST|85.0|82.5|
// ---------- ---- ----
unpivot
Spark没有提供内置函数来实现unpivot操作,不过我们可以使用Spark SQL提供的stack函数来间接实现需求。有几点需要特别注意:
使用selectExpr在Spark中执行SQL片段;
如果字段名称有中文,要使用反引号` 把字段包起来;
//unpivot
val unpivotDF = pdf2.selectExpr("`department`",
"stack(2, 'F', `F`,'M', `M`) as (`gender`,`avg(salary)`)")
unpivotDF.show()
// ---------- ------ -----------
// |department|gender|avg(salary)|
// ---------- ------ -----------
// | WEST| F| 80.0|
// | WEST| M| 88.0|
// | EAST| F| 85.0|
// | EAST| M| 82.5|
// ---------- ------ -----------
5.groupBy实现pivot功能
使用groupBy也能实现pivot的功能, 可以看到数据的结果跟pivot相同。
val pdf3 = df.groupBy($"department").agg(expr("avg(if(gender = 'F',salary, null))"), expr("avg(if(gender = 'M', salary, null))"))
pdf3.show()
pdf3.printSchema()
// ---------- ----------------------------------- -----------------------------------
// |department|avg(if((gender = F), salary, NULL))|avg(if((gender = M), salary, NULL))|
// ---------- ----------------------------------- -----------------------------------
// | WEST| 80.0| 88.0|
// | EAST| 85.0| 82.5|
// ---------- ----------------------------------- -----------------------------------
//
Copyright © 2024 妖气游戏网 www.17u1u.com All Rights Reserved