9.1-9.3 Data Cleaning and Metric Calculation

1. Header: date and source of the material

https://www.bilibili.com/video/BV1WY4y1H7d3?p=28&share_source=copy_web

2. Overview of what was learned

Data cleaning: query and load

package BIGDATA

import org.apache.spark.sql.{Row, SparkSession}

import java.text.SimpleDateFormat
import java.util.{Calendar, Date}

object subjectThree {

val MYSQLIP = "192.168.23.89"
val DATABASE = "shtd_store"
val MYSQLDBURL = "jdbc:mysql://" + MYSQLIP + ":3306/" + DATABASE + "?characterEncoding=UTF-8"
val MYSQLDRIVER = "com.mysql.jdbc.Driver"
val MYSQLUSER = "root"
val MYSQLPASSWORD = "123456"

def main(args: Array[String]): Unit = {
val builder: SparkSession.Builder = SparkSession.builder().master("local[*]")
val spark: SparkSession = builder.appName("subjectThree")
.enableHiveSupport()
.getOrCreate()
import spark.sql

val sdf = new SimpleDateFormat("yyyyMMdd")
val calendar: Calendar = Calendar.getInstance
calendar.add(Calendar.DATE,0)
val date: Date = calendar.getTime
val dateStr: String = sdf.format(date)

// For the DWD layer
//newer Spark enforces strict store-assignment checks on type conversion; fall back to the legacy policy
spark.sql(" set spark.sql.storeAssignmentPolicy=LEGACY")

//custom UDF that strips the '-' characters (kept commented out)
// val makeDT: String => String = (date: String) => {
// date.replaceAll("-", "")
// }
// spark.udf.register("makeDt", makeDT(_: String)) //register the anonymous function as makeDt in Spark SQL

//dynamic partition settings
spark.sql("set hive.exec.dynamic.partition=true")
spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")
spark.sql("set hive.exec.max.dynamic.partitions=50000")
spark.sql("set hive.exec.max.dynamic.partitions.pernode=10000")


/**
* from_unixtime converts a Unix timestamp into a date-time string; the default output looks like 2019-05-02 21:19:59.
* The second parameter sets the output format.
* unix_timestamp(concat(orderdate," 00:00:00"))
* concat turns the orderdate column (e.g. 1996-01-02) into 1996-01-02 00:00:00,
* and unix_timestamp converts that string into a Unix timestamp.
*
*
* !!!!!!!
* from_unixtime(unix_timestamp(), 'yyyy-MM-dd HH:mm:ss') is deprecated in this Hive version and throws an error; replace it with
* date_format(current_timestamp,'yyyy-MM-dd HH:mm:ss')
* date_format() reformats a date:
* select date_format('2015-04-08 10:10:01', 'yyyy-MM');
* 2015-04
* current_timestamp returns the current date and time down to the millisecond, e.g. 2022-08-31 10:55:37.551
*/

sql(
s"""
|insert overwrite table dwd.dim_customer partition (etldate='$dateStr')
|select custkey,name,address,nationkey,
|phone,acctbal,mktsegment,comment,
|'user1',date_format(current_timestamp,'yyyy-MM-dd HH:mm:ss'),
|'user1',date_format(current_timestamp,'yyyy-MM-dd HH:mm:ss')
|from ods.customer
|""".stripMargin)

sql(
s"""
|insert overwrite table dwd.dim_part partition (etldate=$dateStr)
|select partkey,name,mfgr,brand,type,size,container,retailprice,comment,
|'user1',date_format(current_timestamp,'yyyy-MM-dd HH:mm:ss'),
|'user1',date_format(current_timestamp,'yyyy-MM-dd HH:mm:ss')
|from ods.part
|""".stripMargin)
sql(
s"""
|insert overwrite table dwd.dim_nation partition (etldate=$dateStr)
|select nationkey,name,regionkey,comment,
|'user1',date_format(current_timestamp,'yyyy-MM-dd HH:mm:ss'),
|'user1',date_format(current_timestamp,'yyyy-MM-dd HH:mm:ss')
|from ods.nation
|""".stripMargin)

sql(
s"""
|insert overwrite table dwd.dim_region partition (etldate=$dateStr)
|select regionkey,name,comment,
|'user1',date_format(current_timestamp,'yyyy-MM-dd HH:mm:ss'),
|'user1',date_format(current_timestamp,'yyyy-MM-dd HH:mm:ss')
|from ods.region
|""".stripMargin)

sql(
s"""
|select orderkey,custkey,orderstatus,totalprice,
|from_unixtime(unix_timestamp(concat(orderdate," 00:00:00"))) as myorderdate,
|orderpriority,clerk,shippriority,comment,
|'user1',date_format(current_timestamp,'yyyy-MM-dd HH:mm:ss'),
|'user1',date_format(current_timestamp,'yyyy-MM-dd HH:mm:ss'),etldate
|from ods.orders
|""".stripMargin).coalesce(50).createOrReplaceTempView("my_orders_test")

sql(
"""
|insert overwrite table dwd.fact_orders partition (etldate)
|select * from my_orders_test
|""".stripMargin)

// //drop partitions: the partitions of ods.orders must be dropped, keeping only the three most recent ones

/**
* alter table orders drop partition (etldate<='19980730');
*/

// val partitions: Array[String] = spark.sql("show partitions ods.orders").collect.filter((x: Row) => {
// null != x && x.length != 0
// }).map((x: Row) => {
// println("zheshi____________________________分区:" + x)
// x.toString().replace("]", "")
// .replace("[", "").split('=')(1)
// }).sortBy[Int]((x: String) => {
// x.toInt
// })
//
// if (partitions.length > 3) {
// for (index <- 0 until partitions.length - 3) {
// spark.sql(s"ALTER TABLE ods.orders DROP PARTITION (etldate='${partitions(index)}')")
// }
// }

sql(
s"""
|insert overwrite table dwd.fact_lineitem partition (etldate)
|select
|orderkey,partkey,suppkey,lienumber,quantity,extendedprice,discount,
|tax,returnflag,linestatus,
|from_unixtime(unix_timestamp(concat(shipdate," 00:00:00")),'yyyy-MM-dd HH:mm:ss') as myshopdate,
|from_unixtime(unix_timestamp(concat(commentdate," 00:00:00")),'yyyy-MM-dd HH:mm:ss') as mycommitdate,
|from_unixtime(unix_timestamp(concat(receiptdate," 00:00:00")),'yyyy-MM-dd HH:mm:ss') as myreceiptdate,
|shipstruct,shipmode,comment,
|'user1',date_format(current_timestamp,'yyyy-MM-dd HH:mm:ss'),
|'user1',date_format(current_timestamp,'yyyy-MM-dd HH:mm:ss'),etldate
|from ods.lineitem
""".stripMargin)
spark.stop()
}
}

Metric calculation

package BIGDATA.math

import org.apache.spark.sql.SparkSession
/**
Metric calculation
*/
object indicatorCalculation {
//MySQL connection parameters (kept commented out)
// val MYSQLIP = "192.168.23.89"
// val DATABASE = "shtd_store"
// val MYSQLDBURL = "jdbc:mysql://" + MYSQLIP + ":3306/" + DATABASE + "?characterEncoding=UTF-8"
// val MYSQLDRIVER = "com.mysql.jdbc.Driver"
// val MYSQLUSER = "root"
// val MYSQLPASSWORD = "123456"

def main(args: Array[String]): Unit = {
val builder: SparkSession.Builder = SparkSession.builder().master("local[*]")
val spark: SparkSession = builder.appName("indicatorCalculation")
.enableHiveSupport()
.getOrCreate()
import spark.sql

/**
* connect to MySQL (kept commented out)
* */
// spark.read.format("jdbc")
// .option("url", MYSQLDBURL)
// .option("driver", MYSQLDRIVER)
// .option("user", MYSQLUSER)
// .option("password", MYSQLPASSWORD)
// .option("dbtable", "nationeverymonth").load().createTempView("mysql_nationeverymonth")
//
// spark.read.format("jdbc")
// .option("url", MYSQLDBURL)
// .option("driver", MYSQLDRIVER)
// .option("user", MYSQLUSER)
// .option("password", MYSQLPASSWORD)
// .option("dbtable", "usercontinueorder").load().createTempView("mysql_usercontinueorder")
// // For the DWD layer
//newer Spark enforces strict store-assignment checks on type conversion; fall back to the legacy policy
spark.sql(" set spark.sql.storeAssignmentPolicy=LEGACY")

/**
* Metric 1: each customer's daily order count and total order amount, written into dws.customer_consumption_day_aggr
*
*/

sql(
"""
|with a as ( --table a: the needed columns from the orders fact table
|select cust_key,
| total_price,
| date_format(TO_DATE(order_date), 'yyyy') as year,
| date_format(TO_DATE(order_date), 'MM') as month,
| date_format(TO_DATE(order_date), 'dd') as day
|from dwd.fact_orders),
|b as ( -- per-customer, per-day information
| select fo.cust_key as keys,
| max(name) as name,
| a.year,a.month,a.day
| from dwd.fact_orders fo
| left join dwd.dim_customer cust on cust.cust_key=fo.cust_key
| left join a on a.cust_key=fo.cust_key
| group by fo.cust_key,year,month,day),
|c as ( -- monthly order count and amount
| select sum(total_price) as totalconsumption,
| count(1) as totalorder,
| year,month
| from a
| group by year,month)
|select keys,name,
| totalconsumption,
| totalorder,
| b.year,b.month,b.day
|from b
|left join c on b.month=c.month
|order by keys
|""".stripMargin).coalesce(50).createOrReplaceTempView("my_aggr")
sql(
"""
|insert overwrite table dws.customer_consumption_day_aggr
|select * from my_aggr
|""".stripMargin)
spark.stop()
}
}

3. Summary

Key points: which knowledge matters most; difficulties: what felt tedious during learning and was a bit hard to master.

Over the last two days I finished the data-cleaning part of the task book. The cleaning itself is fairly simple: process a few columns and convert dates, which meant looking up how Hive handles dates, namely from_unixtime, current_timestamp and date_format. The no-argument unix_timestamp() I had been using is deprecated: it kept throwing errors in Hive, and I first assumed my code was wrong, until I found that Hive 2 deprecates it. It fails when used inside a larger query but works fine on its own; when I first tried it I only queried the current time, so nothing failed, but it errored out once I submitted the jar. Switching to current_timestamp fixed it.

September 5: Data, Metric Calculation 3

1. Header: date and source of the material

https://www.bilibili.com/video/BV1WY4y1H7d3?p=28&share_source=copy_web

2. Overview of what was learned

Completed question 3 (metric calculation) of provincial competition task book 2.

3. Detailed description by topic

SQL statement

//compare each nation's average consumption in a given year with the average across all nations
sql(
"""
|-- a: each customer's total consumption plus related information
|with a as (select fo.cust_key,
|max(nation.nation_key) as nation_key,
|max(nation.name) as name ,
|sum(total_price) as total_price from fact_orders fo
|left join dim_customer cust on cust.cust_key=fo.cust_key
|left join dim_nation nation on cust.nation_key=nation.nation_key
|group by fo.cust_key),
|--b: per-nation average consumption per customer
|b as (select nation_key as nationkey ,max(name) as name ,sum(total_price) as sumprice,count(1) as countperson,
|round(avg(total_price),2) as avgnationperson
|from a group by nation_key)
|select nationkey,name,round(avgnationperson,2),306980.73,(
|case when avgnationperson > 306980.73 then '+'
| when avgnationperson = 306980.73 then '='
| when avgnationperson < 306980.73 then '-' end)
|from b
|""".stripMargin).coalesce(50).createOrReplaceTempView("my_nation_avg")

4. Bugs

Difficulties (key code or configuration, bug screenshot + solution)

I had not written SQL logic like this for a long time.

A bug appeared in the CASE expression; it happened on another machine, so the screenshot is not included here.

The cause was a forgotten end at the close of the CASE, and there must be no comma between the when branches; I have hit that error before as well.

To test whether an expression equals a specific value or character, use the simple form case <expression> when <value> then ...; a short sketch of that form follows the snippet below.

case when avgnationperson > 306980.73 then '+'
when avgnationperson = 306980.73 then '='
when avgnationperson < 306980.73 then '-' end)
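A minimal Scala sketch of that simple-CASE form, run through Spark SQL. The dwd.fact_orders table is taken from this task, but the status values 'F' and 'O' and the labels are made-up illustrations:

import org.apache.spark.sql.SparkSession

object SimpleCaseSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]")
      .appName("SimpleCaseSketch").enableHiveSupport().getOrCreate()

    // Simple CASE compares one expression against literal values;
    // note there is no comma between the when branches and it must close with end.
    spark.sql(
      """
        |select orderkey,
        |       case orderstatus
        |            when 'F' then 'finished'
        |            when 'O' then 'open'
        |            else 'other'
        |       end as status_label
        |from dwd.fact_orders
        |""".stripMargin).show(5)

    spark.stop()
  }
}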

6. Summary

Key points: which knowledge matters most; difficulties: what felt tedious during learning and was a bit hard to master.

This task blew way past my target: I had planned to finish it in half an hour but spent nearly a whole day on it. The trouble came from the Hive aggregate functions. At first I joined all the shared columns together, and the data multiplied several times over: rows that should have appeared once showed up nine or more times. Searching online pointed at a Cartesian-product problem; after trying to fix that there were still issues, so I checked my tables in the dwd database and found every row appeared three times, under three different partitions. When I had run the jar earlier I assumed it overwrote everything, but the old partitions were still there, so the table held three days' worth of cleaned data. After deleting the old data, rebuilding the jar and re-cleaning, that was solved. Then came the fourth column: the task asks for the average across every nation, so grouping would break the data, yet the first three columns cannot be produced without grouping. The fourth column is the per-customer average order consumption, i.e. every row of that column should hold the same value, but I could not find a way in Hive to combine a multi-row result with a single-row one. For question 2 I asked Li Hao, who said Spark can do it, but lit() must be given a constant. So I simply computed the result first and typed the constant in directly; Hive fills it into every row. That is how I finished question 3. I have now been through the provincial competition material once; tomorrow I will do another similar task book on my own. (A small sketch of the Spark lit() approach follows the link below.)

The Spark approach (withColumn) is described here: https://blog.csdn.net/manba_yqq/article/details/122019749?ops_request_misc=%257B%2522request%255Fid%2522%253A%2522166243159716782427468889%2522%252C%2522scm%2522%253A%252220140713.130102334.pc%255Fall.%2522%257D&request_id=166243159716782427468889&biz_id=&utm_medium=distribute.pc_search_result.none-task-code-2~all~first_rank_ecpm_v1~rank_v31_ecpm-3-122019749-0-null-null.142^v46^control&utm_term=sparksql%E4%B8%ADwithColumn
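A minimal sketch of the Spark-side alternative mentioned in the summary: adding a constant column with lit. The dws table name comes from this task; the column name global_avg_consumption is made up:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit

object LitColumnSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]")
      .appName("LitColumnSketch").enableHiveSupport().getOrCreate()

    // Append the pre-computed global average as a constant column,
    // so every row carries the same value (what the task's 4th column needs).
    val withGlobalAvg = spark.table("dws.customer_consumption_day_aggr") // assumed table
      .withColumn("global_avg_consumption", lit(306980.73))

    withGlobalAvg.show(5)
    spark.stop()
  }
}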

9.6-9.7 Task Book 4

1. Header: date and source of the material

https://www.bilibili.com/video/BV1WY4y1H7d3?p=28&share_source=copy_web

2. Overview of what was learned

Finished provincial competition task book 4.

Provincial competition rules.

3. Detailed description by topic

Competition rules

I noticed that the parts I am responsible for account for most of the score weight and of the time (see the table below),

so fluency with SQL statements is critical.

| Stage | Competition time | Score weight |
| --- | --- | --- |
| Big data platform environment setup | 4 hours | 10% |
| Offline data extraction | | 15% |
| Offline data statistics | | 20% |
| Data collection and real-time computation | | 20% |
| Data visualization | | 20% |
| Comprehensive analysis report | | 10% |
| Clear and reasonable team division of labor, standard operation, civilized competition | | 5% |


Data extraction

package BIGDATA.subject4Test

import org.apache.spark.sql.SparkSession

object DataChouQu {
def main(args: Array[String]): Unit = {
val builder: SparkSession.Builder = SparkSession.builder().master("local[*]")
val spark: SparkSession = builder.appName("DataChouQu")
.enableHiveSupport() //config 设置动态分区的参数 开启动态分区支持
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("hive.exec.max.dynamic.partitions", "100000")
.config("hive.exec.max.dynamic.partitions.pernode", "100000")
.getOrCreate()
import spark.sql

//连接到数据库 并给表格创建临时表名
ConnectMysql(spark,"CUSTOMER","mysql_customer")
ConnectMysql(spark,"NATION","mysql_nation")
ConnectMysql(spark,"PART","mysql_part")
ConnectMysql(spark,"PARTSUPP","mysql_partsupp")
ConnectMysql(spark,"REGION","mysql_region")
ConnectMysql(spark,"SUPPLIER","mysql_supplier")
ConnectMysql(spark,"ORDERS","mysql_orders")
ConnectMysql(spark,"LINEITEM","mysql_lineitem")

sql("truncate table ods.customer")
sql(
"""
|insert overwrite table ods.customer partition (etldate='20220906')
|select * from mysql_customer
|""".stripMargin)

sql("truncate table ods.nation")
sql(
"""
|insert overwrite table ods.nation partition (etldate='20220906')
|select * from mysql_nation
|""".stripMargin)

sql("truncate table ods.part")
sql(
"""
|insert overwrite table ods.part partition (etldate='20220906')
|select * from mysql_part
|""".stripMargin)

sql("truncate table ods.partsupp")
sql(
"""
|insert overwrite table ods.partsupp partition (etldate='20220906')
|select * from mysql_partsupp
|""".stripMargin)

sql("truncate table ods.region")
sql(
"""
|insert overwrite table ods.region partition (etldate='20220906')
|select * from mysql_region
|""".stripMargin)

sql("truncate table ods.supplier")
sql(
"""
|insert overwrite table ods.supplier partition (etldate='20220906')
|select * from mysql_supplier
|""".stripMargin)

sql(" truncate table ods.orders")

/**
* Requirement: only take data on or after a given date (inclusive),
* using ORDERKEY in the ORDERS table as the incremental field (hint: compare the ORDERKEY ranges in MySQL and Hive),
* load only the newly added rows with field types unchanged, and add a dynamic partition
* whose field type is String and whose value comes from ORDERDATE
* (ORDERDATE format yyyy-MM-dd, partition field format yyyyMMdd)
*/
sql(
"""
|insert overwrite table ods.orders partition (etldate)
|select a.*,date_format(orderdate,'yyyyMMdd') from mysql_orders a
|where
|regexp_replace(a.orderdate,"-","") >= '19960909'
|and
|orderkey not in (select orderkey from ods.orders)
|""".stripMargin)

sql("truncate table ods.lineitem")

/**
* Use orderkey in the LINEITEM table as the incremental field and load only the new rows
*/
sql(
"""
|insert overwrite table ods.lineitem partition (etldate='20220906')
|select * from mysql_lineitem
|where orderkey not in (select orderkey from ods.lineitem)
|""".stripMargin)

//创建连接mysql表格函数 --连接 参数
//1.SparkSession提供连接 dbtable 连接到的表格 temporaryName创建临时表的名字
def ConnectMysql(spark:SparkSession,dbtable:String,temporaryName:String):Unit ={
val URL="jdbc:mysql://192.168.23.89:3306/shtd_store"
val driver="com.mysql.jdbc.Driver"
val user="root"
val password="123456"
spark.read.format("jdbc")
.option("url",URL)
. option("driver",driver)
.option("user",user)
.option("password",password)
.option("dbtable",dbtable).load().createTempView(temporaryName)
}
}
}


Providing SQL files for the visualization interface

Query 1 (Table 2): with the orders table as the main table, show the key information of each order:

region, nation, customer name, purchase order consumption amount, supplier name, time

select region.name as region_name,
nation.name as nation_name,
cust.name as cust_name,
--sales amount
t1.sumprice,
supplier.name as supplier_name,
fo.order_date
from dwd.fact_orders fo
left join dwd.dim_customer cust on cust.cust_key=fo.cust_key
left join dwd.dim_nation nation on cust.nation_key=nation.nation_key
left join dwd.dim_region region on nation.region_key=region.region_key
join dwd.fact_lineitem lineitem on lineitem.order_key=fo.order_key
left join ods.supplier supplier on supplier.suppkey=lineitem.supp_key
join --total order amount per customer
(select cust2.cust_key,sum(total_price) as sumprice from dwd.fact_orders fo2
left join dwd.dim_customer cust2 on cust2.cust_key= fo2.cust_key
group by cust2.cust_key) t1
on t1.cust_key=fo.cust_key

To export a MySQL table or a whole database, run the dump command from outside the mysql client (the screenshot of the command is not included here).


4. Bugs

Difficulties (key code or configuration, bug screenshot + solution)

The format looked like yyyyMMdd, but on closer inspection I had forgotten to wrap it in '' quotes: the second argument of date_format must be a string literal (a minimal sketch is below).

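A minimal sketch of the quoting point, assuming the ods.orders table used in these task books:

import org.apache.spark.sql.SparkSession

object DateFormatQuotingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]")
      .appName("DateFormatQuotingSketch").enableHiveSupport().getOrCreate()

    // Wrong: without quotes, yyyyMMdd is parsed as an identifier, not a format string.
    // spark.sql("select date_format(orderdate, yyyyMMdd) from ods.orders")

    // Right: the second argument of date_format is a quoted string literal.
    spark.sql("select date_format(orderdate, 'yyyyMMdd') from ods.orders").show(5)

    spark.stop()
  }
}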

5. Extended learning

Summary of commonly used Hive functions (to memorize)

/**
* from_unixtime converts a Unix timestamp into a date-time string; the default output looks like 2019-05-02 21:19:59.
* The second parameter sets the output format.
* unix_timestamp(concat(orderdate," 00:00:00"))
* concat turns the orderdate column (e.g. 1996-01-02) into 1996-01-02 00:00:00,
* and unix_timestamp converts that string into a Unix timestamp.
*
*
* !!!!!!!
* from_unixtime(unix_timestamp(), 'yyyy-MM-dd HH:mm:ss') is deprecated in this Hive version and throws an error; replace it with
* date_format(current_timestamp,'yyyy-MM-dd HH:mm:ss')
* date_format() reformats a date:
* select date_format('2015-04-08 10:10:01', 'yyyy-MM');
* 2015-04
* current_timestamp returns the current date and time down to the millisecond, e.g. 2022-08-31 10:55:37.551
*/

regexp_replace

Replaces characters; it takes three arguments (column or string, pattern, replacement).

--usage
regexp_replace(orderdate,"-","")
--turns every "-" in the orderdate column into an empty string

date_format

Date function that formats a date into the given pattern.

select date_format('2019-12-12','yyyyMMdd');
--20191212; note: the first argument must be a date/timestamp, or a string Hive can parse as one, such as yyyy-MM-dd or yyyy-MM-dd HH:mm:ss

from_unixtime

Converts a Unix timestamp into a time string in the specified format.

select from_unixtime(cast(1567896035000/1000 as bigint),'yyyy-MM-dd');
--2019-09-08
--from_unixtime expects seconds; a millisecond timestamp has to be divided by 1000 and cast to bigint first

current_timestamp

--get the current time
select current_timestamp;
--2022-09-07 15:58:36.102
--date_format can be used to change the format
select date_format(current_timestamp,'yyyy-MM-dd HH:mm:ss');
--2022-09-07 15:58:36

unix_timestamp

--with no argument it returns the current Unix timestamp
--if the date string is in yyyy-MM-dd HH:mm:ss form, unix_timestamp(string date) can be used directly
--otherwise the format of date has to be specified for the conversion
select unix_timestamp('2009-03-20') --1237507200
select unix_timestamp('2009-03-20 00:00:00', 'yyyy-MM-dd HH:mm:ss') --1237507200
select unix_timestamp('2009-03-20 00:00:01', 'yyyy-MM-dd HH:mm:ss') --1237507201

truncate table

Clears the data in a table while keeping its structure. When practicing it is best to run it every time before re-loading, to avoid surprises from the submitted jar: if the old data is still there, the rows end up duplicated.

alter table ods.orders drop partition (etldate < '19980731')

(A short Scala sketch of this cleanup follows.)
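A minimal Scala sketch of that cleanup pattern for ods.orders; it mirrors the commented-out partition-dropping code in the 9.1-9.3 entry and assumes numeric yyyyMMdd partition values:

import org.apache.spark.sql.{Row, SparkSession}

object PartitionCleanupSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]")
      .appName("PartitionCleanupSketch").enableHiveSupport().getOrCreate()

    // The range form (alter table ods.orders drop partition (etldate < '19980731'))
    // is what I run in the Hive CLI; from Spark it is safer to drop exact partitions
    // in a loop, here keeping only the three most recent ones.
    val partitions: Array[String] = spark.sql("show partitions ods.orders").collect()
      .map((r: Row) => r.getString(0).split("=")(1)) // rows look like "etldate=19980101"
      .sortBy(_.toInt)                               // assumes numeric yyyyMMdd values

    if (partitions.length > 3) {
      partitions.dropRight(3).foreach { p =>
        spark.sql(s"alter table ods.orders drop partition (etldate='$p')")
      }
    }
    spark.stop()
  }
}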

spark-submit

spark-submit \
--master spark://192.168.23.89:7077 \
--executor-memory 2g \
--total-executor-cores 10 \
--driver-memory 4G \
--class BIGDATA.subject4Test.DataChouQu \
--name DataChouQu \
/opt/xxx.jar

6. Summary

Key points: which knowledge matters most; difficulties: what felt tedious during learning and was a bit hard to master.

Over these two days I summarized the workflow for the task books and the methods I use most often. First the competition rules: the part I am responsible for carries a large share of the score. To guarantee both speed and accuracy I summarized the Hive functions that keep coming up in the task books; they have to be memorized, since nothing will be available for reference during the competition. In the evening I also provided the interface data for the visualization teammates and learned the command for exporting from MySQL.

9.8 Provincial Competition Mock Exam

1. Header: date and source of the material

https://www.bilibili.com/video/BV1WY4y1H7d3?p=28&share_source=copy_web

2. Overview of what was learned

Mock exam.

3. Detailed description by topic

Offline data processing

Data extraction

package SubjectTwo

import org.apache.spark.sql.SparkSession

object DataChouQu2 {
def main(args: Array[String]): Unit = {
val builder: SparkSession.Builder = SparkSession.builder().master("local[*]")
val spark: SparkSession = builder.appName("DataChouQu2").enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("hive.exec.max.dynamic.partitions", "100000")
.config("hive.exec.max.dynamic.partitions.pernode", "100000")
.getOrCreate()
spark.sql("set spark.sql.storeAssignmentPolicy=LEGACY")
val date="20220907"//静态分区日期


//mysql表明绑定临时表
localMysqlByMyself(spark,"CUSTOMER","mysql_customer")
localMysqlByMyself(spark,"NATION","mysql_nation")
localMysqlByMyself(spark,"PART","mysql_part")
localMysqlByMyself(spark,"PARTSUPP","mysql_partsupp")
localMysqlByMyself(spark,"REGION","mysql_region")
localMysqlByMyself(spark,"SUPPLIER","mysql_supplier")
localMysqlByMyself(spark,"ORDERS","mysql_orders")
localMysqlByMyself(spark,"LINEITEM","mysql_lineitem")

//hive全量抽取的数据
hiveT6(spark,"customer")
hiveT6(spark,"nation")
hiveT6(spark,"part")
hiveT6(spark,"partsupp")
hiveT6(spark,"region")
hiveT6(spark,"supplier")

//增量抽取的两条
//orders 取19970802以后的 动态分区
spark.sql(
"""
|insert overwrite table ods.orders partition (etldate)
|select o.*,
|date_format(orderdate,'yyyyMMdd')
|from mysql_orders o
|where regexp_replace(o.orderdate,"-","") >= '19970802'
|and
|orderkey not in
|(select orderkey from ods.orders)
|""".stripMargin)

// //orderkey 增量字段 静态分区
spark.sql(
s"""
|insert overwrite table ods.lineitem partition (etldate=$date)
|select * from mysql_lineitem
|where orderkey not in
|(select orderkey from ods.lineitem)
|""".stripMargin)

def hiveT6(spark: SparkSession,hiveName:String): Unit ={
spark.sql(
s"""
|insert overwrite table ods.$hiveName partition (etldate=$date)
|select * from mysql_$hiveName
|""".stripMargin)
}
def localMysqlByMyself(Spark:SparkSession,mysqlName:String,tempName:String): Unit ={
val url="jdbc:mysql://192.168.23.89:3306/shtd_store"
val driver="com.mysql.jdbc.Driver"
val user="root"
val password="123456"

Spark.read.format("jdbc")
.option("url",url)
.option("driver",driver)
.option("user",user)
.option("password",password)
.option("dbtable",mysqlName)
.load().createTempView(tempName)
}
spark.stop()
}
}

Data cleaning

package SubjectTwo

import org.apache.spark.sql.SparkSession

/**
* 数据清洗
*/
object DataClear2 {
def main(args: Array[String]): Unit = {

val builder: SparkSession.Builder = SparkSession.builder().master("local[*]")
val spark: SparkSession = builder.appName("DataChouQu2").enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("hive.exec.max.dynamic.partitions", "100000")
.config("hive.exec.max.dynamic.partitions.pernode", "100000")
.getOrCreate()
spark.sql("set spark.sql.storeAssignmentPolicy=LEGACY")
import spark.sql
val date="20220907" //分区


sql("truncate table dwd.dim_customer")
sql(
s"""
|insert overwrite table dwd.dim_customer partition (etldate=$date)
|select
|custkey,name,address,nationkey,phone,acctbal,mktsegment,comment,
|'user1',from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:mm:ss'),
|'user1',from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:mm:ss')
|from ods.customer
|""".stripMargin)

sql("truncate table dwd.dim_part")
sql(
s"""
|insert overwrite table dwd.dim_part partition (etldate=$date)
|select
|partkey,name,mfgr,brand,type,size,container,retailprice,comment,
|'user1',from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:mm:ss'),
|'user1',from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:mm:ss')
|from ods.part
|""".stripMargin)

sql("truncate table dwd.dim_nation")
sql(
s"""
|insert overwrite table dwd.dim_nation partition (etldate=$date)
|select
|nationkey,name,regionkey,comment,
|'user1',from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:mm:ss'),
|'user1',from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:mm:ss')
|from ods.nation
|""".stripMargin)

sql("truncate table dwd.dim_region")
sql(
s"""
|insert overwrite table dwd.dim_region partition (etldate=$date)
|select
|regionkey,name,comment,
|'user1',from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:mm:ss'),
|'user1',from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:mm:ss')
|from ods.region
|""".stripMargin)


sql("truncate table dwd.fact_orders")
sql(
"""
|insert overwrite table dwd.fact_orders partition (etldate)
|select
|orderkey,custkey,orderstatus,totalprice,
|from_unixtime(unix_timestamp(concat(orderdate," 00:00:00")),'yyyy-MM-dd HH:mm:ss'),
|orderpriority,clerk,shippriority,comment,
|'user1',from_unixtime(unix_timestamp(),"yyyy-MM-dd HH:mm:ss"),
|'user1',from_unixtime(unix_timestamp(),"yyyy-MM-dd HH:mm:ss"),
|etldate
|from ods.orders
|""".stripMargin)
// alter table ods.orders drop partition (etldate < '19980731')


sql("truncate table dwd.fact_lineitem")
sql(
s"""
|insert overwrite table dwd.fact_lineitem partition (etldate=$date)
|select
|orderkey,partkey,suppkey,lienumber,quantity,
|extendedprice,discount,tax,returnflag,linestatus,
|from_unixtime(unix_timestamp(concat(shipdate," 00:00:00")),'yyyy-MM-dd HH:mm:ss'),
|from_unixtime(unix_timestamp(concat(commentdate," 00:00:00")),'yyyy-MM-dd HH:mm:ss'),
|from_unixtime(unix_timestamp(concat(receiptdate," 00:00:00")),'yyyy-MM-dd HH:mm:ss'),
|shipstruct,shipmode,comment,
|'user1',from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:mm:ss'),
|'user1',from_unixtime(unix_timestamp(),'yyyy-MM-dd HH:mm:ss')
|from ods.lineitem
|""".stripMargin)

spark.stop()
}
}

Metric calculation (the part that took the time)

package SubjectTwo

import org.apache.spark.sql.SparkSession

object DataMath2 {
def main(args: Array[String]): Unit = {
val builder: SparkSession.Builder = SparkSession.builder().master("local[*]")
val spark: SparkSession = builder.appName("DataChouQu2").enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("hive.exec.max.dynamic.partitions", "100000")
.config("hive.exec.max.dynamic.partitions.pernode", "100000")
.getOrCreate()
import spark.sql
loadMysql(spark,"nationeverymonth","mysql1")
loadMysql(spark,"nationavgcmp","mysql2")

sql("truncate table dws.customer_consumption_day_aggr")
sql(
"""
|with t1 as (
|select fo.cust_key as cust_key,
|cust.name as name,
|total_price,
|date_format(to_date(order_date),'yyyy') as year,
|date_format(to_date(order_date),'MM') as month,
|date_format(to_date(order_date),'dd') as day
|from dwd.fact_orders fo
|left join dwd.dim_customer cust on cust.cust_key=fo.cust_key)
|select cust_key,max(name),sum(total_price),count(1),year,month,day
|from t1
|group by t1.cust_key,name,year,month,day
|""".stripMargin).coalesce(50).createTempView("sub1")
sql(
"""
|insert overwrite table dws.customer_consumption_day_aggr
|select * from sub1
|""".stripMargin)

sql(
"""
|with t1 as (
|select cust_key,
|sum(totalconsumption) as sumprice,
|count(1) as countprice,
|year,month from dws.customer_consumption_day_aggr
|group by cust_key,year,month),
|t2 as (
|select
|ccda.cust_key,
|ccda.cust_name,
|nation.nation_key,
|nation.name as nation_name,
|region.region_key,
|region.name as region_name,
|t1.sumprice as totalconsumption,
|t1.countprice as totalorder,
|t1.year,t1.month
|from dws.customer_consumption_day_aggr ccda
|left join dwd.dim_customer cust on ccda.cust_key=cust.cust_key
|left join dwd.dim_nation nation on cust.nation_key=nation.nation_key
|left join dwd.dim_region region on nation.region_key=region.region_key
|join t1 on t1.cust_key=ccda.cust_key
|group by ccda.cust_key,
|ccda.cust_name,
|nation.nation_key,
|nation.name,
|region.region_key,
|region.name ,
|t1.sumprice,
|t1.countprice,
|t1.year,t1.month)
|select t2.*,rank() over(partition by t2.cust_key order by totalconsumption,totalorder,month desc)
|from t2
|""".stripMargin).coalesce(50).createTempView("sub2")

sql(
"""
|insert overwrite table mysql1
|select * from sub2
|""".stripMargin)

sql(
"""
|-- a 获得每个消费人的总消费额 以及信息
|with a as (select fo.cust_key,
|max(nation.nation_key) as nation_key,
|max(nation.name) as name ,
|sum(total_price) as total_price from dwd.fact_orders fo
|left join dwd.dim_customer cust on cust.cust_key=fo.cust_key
|left join dwd.dim_nation nation on cust.nation_key=nation.nation_key
|group by fo.cust_key),
|--b 每个国家 人均消费
|b as (select nation_key as nationkey ,max(name) as name ,sum(total_price) as sumprice,count(1) as countperson,
|round(avg(total_price),2) as avgnationperson
|from a group by nation_key)
|select nationkey,name,round(avgnationperson,2),306980.73,(
|case when avgnationperson > 306980.73 then '+'
| when avgnationperson = 306980.73 then '='
| when avgnationperson < 306980.73 then '-' end)
|from b
|""".stripMargin).coalesce(50).createTempView("sub3")

sql(
"""
|insert overwrite table mysql2
|select * from sub3
|""".stripMargin)



def loadMysql(Spark:SparkSession,totalName:String,tempName:String): Unit ={
val url="jdbc:mysql://192.168.23.89/shtd_store"
val user="root"
val password="123456"
val driver="com.mysql.jdbc.Driver"
Spark.read.format("jdbc")
.option("url",url)
.option("user",user)
.option("password",password)
.option("driver",driver)
.option("dbtable",totalName).load().createTempView(tempName)
}
}
}

4. Summary

Key points: which knowledge matters most; difficulties: what felt tedious during learning and was a bit hard to master.

Today was my first semi-formal provincial competition mock, starting at 8 a.m. I finished the questions just within the required time, right on the buzzer, but plenty went wrong along the way. During data cleaning, one step is to drop partitions from the orders table before cleaning it; I dropped them in Hive before running the cleaning jar, so after cleaning the fact_orders table was missing data. Only at the step that checks the number of partitions did I realize I had done things in the wrong order. I went back and re-ran the extraction jar but forgot to truncate the tables first, so the next round of cleaning produced bad data again. After several rounds of this I simply reset the platform VM and only then did question 3. It shook my confidence a bit; the questions clearly need more practice if I want to avoid these problems in the real competition.

9.12 Solving a Problem That Had Bothered Me for a Long Time

1. Header: date and source of the material

https://www.bilibili.com/video/BV1WY4y1H7d3?p=28&share_source=copy_web

2. Overview of what was learned

Task 4, metric calculation: combining a single-row result with a multi-row result.

3. Detailed description by topic

The sticking point (what had stumped both me and Li Hao):

The query result of table t1 is a multi-row, multi-column data set.

(screenshot omitted)

The task requires the fourth column to be the global median.

The aggregate query result looks like this:

(screenshot omitted)

It has only one row.

The two result sets have different row counts, so an ordinary join cannot produce the effect shown below; a Cartesian product is bound to appear, and here that is exactly the point: the Cartesian product is what attaches the single-row result to every row.

(screenshot omitted)

The working code is as follows.

Cartesian-product support has to be enabled first, otherwise it reports an error.

select t1.*,t2.* from t1,t2
-- writing columns from two tables together like this is something I only stumbled on today; it ran without errors
-- my guess: this is essentially a Cartesian product, so it only gives a clean result when one side has a single row (or the row counts line up)
-- enabling Cartesian-product support is all that is needed

Then, when submitting the jar through spark-submit, the Cartesian-product problem came up again; the bug is explained below.

//percentile_approx(col, p): with p = 0.5 it returns the (approximate) median of column col
//note: percentile() requires an integer column, while percentile_approx() accepts numeric (double) columns
//hive (default)> set hive.strict.checks.cartesian.product=true;
//hive (default)> set hive.mapred.mode=nonstrict;
sql("set hive.strict.checks.cartesian.product=true") //Hive-side Cartesian-product setting
sql("set hive.mapred.mode=nonstrict") //switch to non-strict mode
sql(
"""
|with t1 as (
|select nation.nation_key as nation_key,max(nation.name) as name ,
|round(percentile_approx(total_price,0.5),2) as per
|from dwd.fact_orders fo
|left join dwd.dim_customer cust on cust.cust_key=fo.cust_key
|left join dwd.dim_nation nation on nation.nation_key=cust.nation_key
|group by nation.nation_key),
|t2 as (
|select round(percentile_approx(per,0.5),2) as per1
|from t1
|)
|select t1.*,t2.* from t1,t2
|""".stripMargin).coalesce(50).createTempView("hive2")
mysqlExtract(spark,"nationmedian","mysql2")

spark.conf.set("spark.sql.crossJoin.enabled", "true")//spark开启笛卡尔积支持
sql(
"""
|insert overwrite table mysql2
|select * from hive2
|""".stripMargin)
spark.stop()

4. Bugs

Difficulties (key code or configuration, bug screenshot + solution)

use the CROSS JOIN syntax to allow cartesian products between these relations;;

The error message above says to use the CROSS JOIN syntax to allow Cartesian products between these relations.

A CSDN search explained that Spark does not allow Cartesian products by default, so it has to be enabled by adding the following statement before the insert that writes to MySQL:

spark.conf.set("spark.sql.crossJoin.enabled","true")

6. Summary

Key points: which knowledge matters most; difficulties: what felt tedious during learning and was a bit hard to master.

The commands that enable this support are a bit long and hard to remember, so it is worth spending some time memorizing them (a consolidated sketch follows). Across the three or four task books I have done, this situation keeps coming up; until now I always computed the value in advance and hard-coded it as a string, but the task book requires screenshots of the SQL, and a hard-coded constant does not look good. Today it is finally solved, so later tasks that need a whole column to hold the same value can be handled properly.
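For reference, a minimal sketch that collects the three settings from this entry in one place, with toy tables standing in for t1 and t2 (whether the two Hive settings are still required when everything runs through Spark is an assumption I have not verified):

import org.apache.spark.sql.SparkSession

object CrossJoinSettingsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]")
      .appName("CrossJoinSettingsSketch").enableHiveSupport().getOrCreate()
    import spark.implicits._

    // The three settings used in this entry: two Hive-side switches plus the Spark
    // switch that actually allows the Cartesian product when the jar is submitted.
    spark.sql("set hive.strict.checks.cartesian.product=true")
    spark.sql("set hive.mapred.mode=nonstrict")
    spark.conf.set("spark.sql.crossJoin.enabled", "true")

    // Toy stand-ins for t1 (per-nation rows) and t2 (one global value):
    Seq(("CHINA", 100.0), ("FRANCE", 90.0)).toDF("name", "per").createOrReplaceTempView("t1")
    Seq(95.0).toDF("per1").createOrReplaceTempView("t2")

    // Every t1 row picks up the single t2 value, which is the effect the task needs.
    spark.sql("select t1.*, t2.* from t1, t2").show()
    spark.stop()
  }
}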

9.13 Provincial Competition Task 1

1. Header: date and source of the material

https://www.bilibili.com/video/BV1WY4y1H7d3?p=28&share_source=copy_web

2. Overview of what was learned

Finished provincial competition task 1.

3. Detailed description by topic

Data extraction

package BIGDATA.subject1

import org.apache.spark.sql.SparkSession

object DataExtract {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[*]")
.appName("DataExtract")
.enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("hive.exec.max.dynamic.partitions", "100000")
.config("hive.exec.max.dynamic.partitions.pernode", "100000")
.getOrCreate()
val date="20220912"

import spark.sql
//mysql
mysqlExtract(spark,"CUSTOMER","mysql_customer")
mysqlExtract(spark,"NATION","mysql_nation")
mysqlExtract(spark,"PART","mysql_part")
mysqlExtract(spark,"PARTSUPP","mysql_partsupp")
mysqlExtract(spark,"REGION","mysql_region")
mysqlExtract(spark,"SUPPLIER","mysql_supplier")
mysqlExtract(spark,"LINEITEM","mysql_lineitem")

//hive sql 6
hiveExtract(spark,"customer")
hiveExtract(spark,"nation")
hiveExtract(spark,"part")
hiveExtract(spark,"partsupp")
hiveExtract(spark,"region")
hiveExtract(spark,"supplier")
spark.sql(
s"""
|insert overwrite table ods.lineitem partition (etldate=$date)
|select l.* from mysql_lineitem l
|where
|l.orderkey not in
|(select orderkey from ods.lineitem)
|""".stripMargin)
spark.stop()


def hiveExtract(spark:SparkSession,hiveName:String): Unit ={
spark.sql(
s"""
|insert overwrite table ods.$hiveName partition (etldate=$date)
|select * from mysql_$hiveName
|""".stripMargin)
}

def mysqlExtract(spark:SparkSession,mysqlName:String,tempName:String): Unit ={
val url="jdbc:mysql://192.168.23.89/shtd_store"
val driver="com.mysql.jdbc.Driver"
val user="root"
val password="123456"
spark.read.format("jdbc")
.option("url",url)
.option("driver",driver)
.option("user",user)
.option("password",password)
.option("dbtable",mysqlName).load().createTempView(tempName)
}
}
}

The orders table

package BIGDATA.subject1

import org.apache.spark.sql.SparkSession

object DataExtractOrders {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[*]")
.appName("DataExtract")
.enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("hive.exec.max.dynamic.partitions", "100000")
.config("hive.exec.max.dynamic.partitions.pernode", "100000")
.getOrCreate()
import spark.sql
mysqlExtract(spark,"ORDERS","mysql_orders")
//19970913
sql(
s"""
|insert overwrite table ods.orders partition (etldate)
|select o.*,
|date_format(orderdate,'yyyyMMdd')
|from mysql_orders o
|where
|o.orderkey not in
|(select orderkey from ods.orders)
|and
|regexp_replace(orderdate,"-","") >= '19970913'
|""".stripMargin)
spark.stop()

def mysqlExtract(spark: SparkSession, mysqlName: String, tempName: String): Unit = {
val url = "jdbc:mysql://192.168.23.89/shtd_store"
val driver = "com.mysql.jdbc.Driver"
val user = "root"
val password = "123456"
spark.read.format("jdbc")
.option("url", url)
.option("driver", driver)
.option("user", user)
.option("password", password)
.option("dbtable", mysqlName).load().createTempView(tempName)
}
}
}

alter table ods.orders drop partition (etldate>0) drops all partitions together with their data.

Data cleaning

package BIGDATA.subject1

import org.apache.spark.sql.SparkSession

object DataClear {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[*]")
.appName("DataExtract")
.enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("hive.exec.max.dynamic.partitions", "100000")
.config("hive.exec.max.dynamic.partitions.pernode", "100000")
.getOrCreate()
val date = "20220912"
import spark.sql
/*
custkey string
name string
address string
nationkey string
phone string
acctbal string
mktsegment string
comment string
etldate string
*/
sql(
s"""
|insert overwrite table dwd.dim_customer partition (etldate=$date)
|select custkey,name,address,nationkey,phone,acctbal,mktsegment,comment,
|'user1',date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss'),
|'user1',date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss')
|from ods.customer
|""".stripMargin)
/*
partkey string
name string
mfgr string
brand string
type string
size string
container string
retailprice string
comment string
etldate string
*/
sql(
s"""
|insert overwrite table dwd.dim_part partition (etldate=$date)
|select partkey,name,mfgr,brand,
|type,size,container,retailprice,comment,
|'user1',date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss'),
|'user1',date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss')
|from ods.part
|""".stripMargin)
/*
nationkey string
name string
regionkey string
comment string
etldate string
*/
sql(
s"""
|insert overwrite table dwd.dim_nation partition (etldate=$date)
|select nationkey,name,regionkey,comment,
|'user1',date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss'),
|'user1',date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss')
|from ods.nation
|""".stripMargin)
sql(
s"""
|insert overwrite table dwd.dim_region partition (etldate=$date)
|select regionkey,name,comment,
|'user1',date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss'),
|'user1',date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss')
|from ods.region
|""".stripMargin)
/*
orderkey string
custkey string
orderstatus string
totalprice string
orderdate string
orderpriority string
clerk string
shippriority string
comment string
etldate string
*/
sql(
"""
|insert overwrite table dwd.fact_orders partition (etldate)
|select orderkey,custkey,orderstatus,totalprice,
|date_format(orderdate,'yyyy-MM-dd HH:mm:ss'),
|orderpriority,clerk,shippriority,comment,
|'user1',date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss'),
|'user1',date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss'),etldate
|from ods.orders
|""".stripMargin)
/*
orderkey string
partkey string
suppkey string
lienumber string
quantity string
extendedprice string
discount string
tax string
returnflag string
linestatus string
shipdate string
commentdate string
receiptdate string
shipstruct string
shipmode string
comment string
etldate string
*/
sql(
s"""
|insert overwrite table dwd.fact_lineitem partition (etldate)
|select orderkey,partkey,suppkey,lienumber,quantity,extendedprice,
|discount,tax,returnflag,linestatus,
|date_format(shipdate,'yyyy-MM-dd HH:mm:ss'),
|date_format(commentdate,'yyyy-MM-dd HH:mm:ss'),
|date_format(receiptdate,'yyyy-MM-dd HH:mm:ss'),
|shipstruct,shipmode,comment,
|'user1',date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss'),
|'user1',date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss'),etldate
|from ods.lineitem
|""".stripMargin)

spark.stop()
}
}

The orders table

package BIGDATA.subject1

import org.apache.spark.sql.SparkSession

object DataClearOrders {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[*]")
.appName("DataExtract")
.enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("hive.exec.max.dynamic.partitions", "100000")
.config("hive.exec.max.dynamic.partitions.pernode", "100000")
.getOrCreate()

spark.sql(
"""
|insert overwrite table dwd.fact_orders partition (etldate)
|select orderkey,custkey,orderstatus,totalprice,
|date_format(orderdate,'yyyy-MM-dd HH:mm:ss'),
|orderpriority,clerk,shippriority,comment,
|'user1',date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss'),
|'user1',date_format(current_timestamp(),'yyyy-MM-dd HH:mm:ss'),etldate
|from ods.orders
|""".stripMargin)
spark.stop()
}
}

Metric calculation 1

package BIGDATA.subject1

import org.apache.spark.sql.SparkSession

object DataMath1 {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[*]")
.appName("DataExtract")
.enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("hive.exec.max.dynamic.partitions", "100000")
.config("hive.exec.max.dynamic.partitions.pernode", "100000")
.getOrCreate()

spark.sql(
"""
|WITH t1 as (
|select c.cust_key as custkey,
|n.nation_key as nationkey,
|n.name as nationname,
|r.region_key as regionkey,
|r.name as regionname,
|total_price,
|date_format(order_date,'yyyy') as year,
|date_format(order_date,'MM') as month
|from dwd.fact_orders fo
|left join dwd.dim_customer c on fo.cust_key=c.cust_key
|left join dwd.dim_nation n on c.nation_key=n.nation_key
|left join dwd.dim_region r on n.region_key=r.region_key)
|select nationkey,max(nationname),max(regionkey),max(regionname),
|sum(total_price),count(*),year,month
|from t1
|group by nationkey,year,month
|""".stripMargin).coalesce(50).createTempView("hive1")
mysqlExtract(spark,"nationeverymonth","mysql1")
spark.sql(
"""
|insert overwrite table mysql1
|select * from hive1
|""".stripMargin)

def mysqlExtract(spark: SparkSession, mysqlName: String, tempName: String): Unit = {
val url = "jdbc:mysql://192.168.23.89/shtd_store"
val driver = "com.mysql.jdbc.Driver"
val user = "root"
val password = "123456"
spark.read.format("jdbc")
.option("url", url)
.option("driver", driver)
.option("user", user)
.option("password", password)
.option("dbtable", mysqlName).load().createTempView(tempName)
}
}
}

Metric calculation 2

package BIGDATA.subject1

import org.apache.spark.sql.SparkSession

object DataMath2 {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[*]")
.appName("DataExtract")
.enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("hive.exec.max.dynamic.partitions", "100000")
.config("hive.exec.max.dynamic.partitions.pernode", "100000")
.getOrCreate()
import spark.sql

/*
hive (default)> set hive.strict.checks.cartesian.product=true;
hive (default)> set hive.mapred.mode=nonstrict;
*/
sql("set hive.strict.checks.cartesian.product=true")
sql("set hive.mapred.mode=nonstrict")
sql(
"""
|with t1 as (
|select c.cust_key as custkey,
|n.nation_key as nationkey,
|n.name as nationname,
|total_price
|from dwd.fact_orders fo
|left join dwd.dim_customer c on fo.cust_key=c.cust_key
|left join dwd.dim_nation n on c.nation_key=n.nation_key),
|t2 as (
|select custkey,max(nationkey) as nationkey, max(nationname) as nationname,
|sum(total_price) as sumprice
|from t1
|group by t1.custkey),
|t3 as (
|select nationkey,
|max(nationname) as nationname,
|round(avg(sumprice),2) as avgprice
|from t2
|group by nationkey),
|t4 as (
|select round(avg(sumprice),2) as allavgprice from t2)
|select t3.*,t4.* from t3,t4
|""".stripMargin).coalesce(50).createTempView("hive4")

mysqlExtract(spark,"nationavgcmp","nationavgcmp")
spark.conf.set("spark.sql.crossJoin.enabled","true")
sql(
"""
|insert overwrite table nationavgcmp
|select h.*,
|(case
|when h.avgprice > h.allavgprice then '+'
|when h.avgprice = h.allavgprice then '='
|when h.avgprice < h.allavgprice then '-' end
|)
|from hive4 h
|""".stripMargin)


def mysqlExtract(spark: SparkSession, mysqlName: String, tempName: String): Unit = {
val url = "jdbc:mysql://192.168.23.89/shtd_store"
val driver = "com.mysql.jdbc.Driver"
val user = "root"
val password = "123456"
spark.read.format("jdbc")
.option("url", url)
.option("driver", driver)
.option("user", user)
.option("password", password)
.option("dbtable", mysqlName).load().createTempView(tempName)
}
}
}

Metric calculation 3

package BIGDATA.subject1

import org.apache.spark.sql.SparkSession

object DataMath3 {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().master("local[*]")
.appName("DataExtract")
.enableHiveSupport()
.config("hive.exec.dynamic.partition", "true")
.config("hive.exec.dynamic.partition.mode", "nonstrict")
.config("hive.exec.max.dynamic.partitions", "100000")
.config("hive.exec.max.dynamic.partitions.pernode", "100000")
.getOrCreate()
import spark.sql

sql(
"""
|with t1 as (
|select cust_key,
|cast(date_format(order_date,'yyyyMM') as int) as month,
|sum(total_price) as total_price,
|count(1) as total_order
|from dwd.fact_orders
|group by cust_key,cast(date_format(order_date,'yyyyMM') as int)
|),
|t2 as (
|select
|a.cust_key,
|concat(a.month,'_',b.month) as month,
|a.total_price + b.total_price as total_price,
|a.total_order + b.total_order as total_order
|from t1 a
|left join t1 b
| on --判断连续两个月增长
|a.cust_key=b.cust_key
|where
|a.month + 1 = b.month
|and
|a.total_price < b.total_price)
|select c.cust_key,c.name,month,total_price,total_order
|from t2
|left join dwd.dim_customer c on c.cust_key=t2.cust_key
|""".stripMargin).coalesce(50).createTempView("hiveOrders")


mysqlExtract(spark,"usercontinueorder","order1")

sql(
"""
|insert overwrite table order1
|select * from hiveOrders
|""".stripMargin)

def mysqlExtract(spark: SparkSession, mysqlName: String, tempName: String): Unit = {
val url = "jdbc:mysql://192.168.23.89/shtd_store"
val driver = "com.mysql.jdbc.Driver"
val user = "root"
val password = "123456"
spark.read.format("jdbc")
.option("url", url)
.option("driver", driver)
.option("user", user)
.option("password", password)
.option("dbtable", mysqlName).load().createTempView(tempName)
}
}
}

4. Bugs

Difficulties (key code or configuration, bug screenshot + solution)

No noteworthy bugs this time.

6. Summary

Key points: which knowledge matters most; difficulties: what felt tedious during learning and was a bit hard to master.

Data extraction and cleaning went smoothly this time, with only a minor hiccup. Because the orders table causes problems so often, I changed my strategy and handle orders separately during extraction and cleaning, since its dynamic partitioning is more troublesome; jars often fail exactly at orders even though the earlier tables have already loaded. In date_format(), typing one extra M (yyyyMMMdd instead of yyyyMMdd) turns the month into its English name (a small sketch of the difference follows). I also found that partition metadata from earlier runs was still there: my old way of deleting table data only removed the rows, the partitions remained, so wrong partitions still showed up when listing them. Only alter table ods.orders drop partition (etldate>0) cleans them out completely; rebuilding the jar cost me about twenty minutes. It took two and a half hours to nearly finish, with only the last sub-question of the last question left, which I wrapped up after dinner. Today's metric calculation hit roughly the same problem as yesterday, which I had already solved, so similar questions should not trip me up again. Metric calculations 2 and 3 took some time. Overall a good day of studying.
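A tiny sketch of the yyyyMMMdd slip mentioned above; the output assumes an English locale:

import org.apache.spark.sql.SparkSession

object MonthPatternSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]")
      .appName("MonthPatternSketch").getOrCreate()

    // 'MM' gives the two-digit month; the extra M in 'MMM' switches to the month's English abbreviation.
    spark.sql(
      "select date_format('2022-09-12','yyyyMMdd') as ok, date_format('2022-09-12','yyyyMMMdd') as oops"
    ).show()
    // ok = 20220912, oops = 2022Sep12
    spark.stop()
  }
}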

9.14 onward: New Tasks

1. Header: date and source of the material

https://www.bilibili.com/video/BV1WY4y1H7d3?p=28&share_source=copy_web

2. Overview of what was learned

3. Detailed description by topic

greatest

greatest(a,b,c,d) returns the largest of its arguments.

coalesce

coalesce(null,a,b) returns a.

It returns the first argument that is not null; a short sketch of both is below.
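A minimal sketch of both functions through Spark SQL:

import org.apache.spark.sql.SparkSession

object GreatestCoalesceSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]")
      .appName("GreatestCoalesceSketch").getOrCreate()

    // greatest picks the largest of its arguments; coalesce picks the first non-null one.
    spark.sql("select greatest(1, 7, 3, 4) as g, coalesce(null, 'a', 'b') as c").show()
    // g = 7, c = a
    spark.stop()
  }
}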

(screenshot omitted)

The goal is a table like the one in the screenshot below: many rows collapsed into a single row, keyed by a shared column.

(screenshot of the desired result omitted)

1) First use collect_set to turn each group's rows into a single array:

hive> select name,collect_set(subject)  res1 ,
> collect_set(cast(score as string)) res2
> from stu group by name;

(screenshot of the result omitted)

2) Adding concat_ws then takes each element of the array and joins them with a separator:

hive> select name,concat_ws('@',collect_set(subject))  res1 ,
> concat_ws('-',collect_set(cast(score as string))) res2
> from stu group by name;

(screenshot of the result omitted)
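Since the result screenshots are not included, here is a runnable Scala sketch of the same two steps on a made-up stu table:

import org.apache.spark.sql.SparkSession

object CollectSetSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]")
      .appName("CollectSetSketch").getOrCreate()
    import spark.implicits._

    // Toy stand-in for the stu table in the screenshots.
    Seq(("zs", "chinese", 90), ("zs", "math", 80), ("ls", "math", 95))
      .toDF("name", "subject", "score").createOrReplaceTempView("stu")

    // Step 1: collect_set turns each group's rows into one array per column;
    // Step 2: concat_ws joins the array elements with a separator.
    spark.sql(
      """
        |select name,
        |       concat_ws('@', collect_set(subject))               res1,
        |       concat_ws('-', collect_set(cast(score as string))) res2
        |from stu
        |group by name
        |""".stripMargin).show()
    spark.stop()
  }
}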

hive> select substr('qwertyuio',0,4);
--qwer
substr works on strings:
the second argument is the start position (in Hive, 0 and 1 both point at the first character),
the third argument is how many characters to take.

4. Bugs

Difficulties (key code or configuration, bug screenshot + solution)

When importing from Hive into MySQL, Chinese text shows up as ???. This is a character-encoding problem.

Solution:

-- check the database character set
show create database shtd_result;
-- change it to utf8
alter database shtd_result default character set utf8;

-- check the table character set
show create table regiontopthree;
-- change it to utf8
alter table regiontopthree default character set utf8;

When creating the MySQL temp view in Scala, the JDBC URL also needs these encoding parameters (a short sketch follows the URL below):

jdbc:mysql://192.168.23.89/shtd_result?useUnicode=true&characterEncoding=utf-8
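A minimal sketch of how that URL plugs into the JDBC read used throughout these task books; the host, credentials and the regiontopthree table are copied from earlier entries and should be treated as placeholders:

import org.apache.spark.sql.SparkSession

object JdbcUtf8Sketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]")
      .appName("JdbcUtf8Sketch").enableHiveSupport().getOrCreate()

    // useUnicode/characterEncoding keep Chinese text intact when reading from and writing back to MySQL.
    val url = "jdbc:mysql://192.168.23.89/shtd_result?useUnicode=true&characterEncoding=utf-8"
    spark.read.format("jdbc")
      .option("url", url)
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "root")
      .option("password", "123456")
      .option("dbtable", "regiontopthree")
      .load()
      .createTempView("mysql_regiontopthree")

    spark.stop()
  }
}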

6. Summary

Key points: which knowledge matters most; difficulties: what felt tedious during learning and was a bit hard to master.

Today I worked on the new July data set, which is much more logic-heavy than the earlier data. The first two questions were harder too, and I cannot claim to have finished them entirely on my own: I looked up how to use several functions on CSDN, and I also solved the earlier problem of Chinese text in query results. Stumbling along, it took nearly a whole day to get through one task book; it should go much better from here, though I am not sure whether my reading of the questions matches what they actually mean.