8.1 Hive压缩和存储 以及优化

1.头:日期、所学内容出处

https://www.bilibili.com/video/BV1WY4y1H7d3?p=28&share_source=copy_web

2.标题

1
2
第九章 压缩和存储
第十章 hive优化

3.所学内容概述

hadoop压缩配置

压缩参数配置

开启Map输出阶段压缩 利用MR

企业调优

4.根据概述分章节描述

个人理解

压缩和解压和我们平常window使用的差不多,虽然是在hive中的压缩和解压 但是底层还是利用hadoop中的MR,所以压缩编码也是MR所支持的。

有哪些我截图出来

image-20220904125544572

image-20220904125555816

根据性能比较,可以看出 LZO的性能远远领先于其他两个

在实际的项目开发当中,hive 表的数据存储格式一般选择:orc 或 parquet。压缩方式一般选择 snappy,lzo。

开启Map输出阶段压缩

因为map输出阶段压缩中job,数据传输量很大,所以我们根据效率需要适量减少,在job中工作的是map和reduce task间数据传输。利用hive代码开启

分4步 最后一步是查询

1
2
3
4
5
6
7
8
9
--1.开启hive中间传输数据压缩功能
hive (default)>set hive.exec.compress.intermediate=true;
--2.开启mapreduce中map的输出压缩功能
hive (default)>set mapreduce.map.output.compress=true;
--3.设置mapreduce中map输出数据的压缩方式
hive (default)>set mapreduce.map.output.compress.codec=
org.apache.hadoop.io.compress.SnappyCodec;
--4.执行查询语句
hive (default)> select count(ename) name from emp;

开启Reduce输出阶段压缩

如果hive已经把数据导入的表中没这个时候怎么压缩,就需要用Reduce

hive.exec.compress.output在hive中 默认设置为false 意思就是输出是纯文本文件

设置为true 就是压缩文件

也是四条命令 最后一条查询 可以查询输出文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
--(1)开启 hive 最终输出数据压缩功能
hive (default)>set hive.exec.compress.output=true;
--(2)开启 mapreduce 最终输出数据压缩
hive (default)>set mapreduce.output.fileoutputformat.compress=true;
--(3)设置 mapreduce 最终数据输出压缩方式
hive (default)> set mapreduce.output.fileoutputformat.compress.codec =
org.apache.hadoop.io.compress.SnappyCodec;
--(4)设置 mapreduce 最终数据输出压缩为块压缩
hive (default)> set
mapreduce.output.fileoutputformat.compress.type=BLOCK;
--(5)测试一下输出结果是否是压缩文件
hive (default)> insert overwrite local directory
'/opt/module/data/distribute-result' select * from emp distribute by
deptno sort by empno desc;

企业级调优

在尚硅谷给的文档中有,比较多,而且目前对自己学习用处不大。就不计入笔记了。

5.总结

重点是哪些知识比较重要,难点是你在学习过程中觉得比较繁琐,掌握起来有一点

今天的学习任务不是很重,基本算是hive的完结了。 压缩和存储,跟着文档试着整了一下,花费的时间比较多,三个来小时,然后静下心去看了优化的文档,看完,认为对以后hive操作,应该用处挺大的,主要就是提高效率问题,但是有一些麻烦,而且比赛也不会设置这些参数,就先不去纠结了,今天倒是没有什么难点,明天试着去做hive题库了。

8.2 Hive复习 sqoop完结

1-头:日期、所学内容出处

https://www.bilibili.com/video/BV1WY4y1H7d3?p=28&share_source=copy_web

2.标题

1
2
3
4
5
6
7
8
9
10
P101_尚硅谷_Sqoop_课程介绍
P202_尚硅谷_Sqoop_安装
P303_尚硅谷_Sqoop_全部数据导入
P404_尚硅谷_Sqoop_查询导入
P505_尚硅谷_Sqoop_导入指定列
P606_尚硅谷_Sqoop_查询条件导入
P707_尚硅谷_Sqoop_导入数据到Hive
P808_尚硅谷_Sqoop_导入数据到HBase
P909_尚硅谷_Sqoop_导出数据
P1010_尚硅谷_Sqoop_脚本调用

3-所学内容概述

安装sqoop

导入的几种方式

导出数据

脚本调用

根据概述分章节描述

什么是Sqoop

Sqoop是一款开源的工具,主要用于在Hadoop(Hive)与传统的数据库mysql间进行数据的传递,可以将一个关系型数据库Mysql中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中。

我理解就是导入导出数据库方便

Sqoop安装

1
2
3
4
5
下载并解压
修改配置文件
拷贝JDBC驱动
验证Sqoop
测试Sqoop是否能够成功连接数据库

导入数据

导入数据就是从非大数据集群 也激素RDBMS 向大数据集群 HDFS Hive Hbase 这些 传输数据 俗称导入

从RDBMS到HDFS

1.全部导入

1
2
3
4
5
6
7
8
9
bin/sqoop import \
--connect jdbc:mysql://hadoop86:3306/company \
--username root \
--password 000000 \
--table staff \
--target-dir /user/company \
--delete-target-dir \
--num-mappers 1 \
--fields-terminated-by "\t"

2.查询导入

1
2
3
4
5
6
7
8
9
bin/sqoop import \
--connect jdbc:mysql://hadoop86:3306/company \
--username root \
--password 000000 \
--target-dir /user/company \
--delete-target-dir \
--num-mappers 1 \
--fields-terminated-by "\t" \
--query 'select name,sex from staff where id <=1 and $CONDITIONS;'

如果query后使用的是双引号,则$CONDITIONS前必须加转移符,防止shell识别为自己的变量。

3.导入指定列

1
2
3
4
5
6
7
8
9
10
bin/sqoop import \
--connect jdbc:mysql://hadoop86:3306/company \
--username root \
--password 000000 \
--target-dir /user/company \
--delete-target-dir \
--num-mappers 1 \
--fields-terminated-by "\t" \
--columns id,sex \
--table staff

columns中如果涉及到多列,用逗号分隔,分隔时不要添加空格

4.使用sqoop关键字筛选查询导入数据

可以自定义筛选或者对数据进行处理

1
2
3
4
5
6
7
8
9
10
bin/sqoop import \
--connect jdbc:mysql://hadoop86:3306/company \
--username root \
--password 000000 \
--target-dir /user/company \
--delete-target-dir \
--num-mappers 1 \
--fields-terminated-by "\t" \
--table staff \
--where "id=1"

RDBMS到Hive

1
2
3
4
5
6
7
8
9
10
bin/sqoop import \
--connect jdbc:mysql://hadoop102:3306/company \
--username root \
--password 000000 \
--table staff \
--num-mappers 1 \
--hive-import \
--fields-terminated-by "\t" \
--hive-overwrite \
--hive-table staff_hive

该过程分为两步,第一步将数据导入到HDFS,第二步将导入到HDFS的数据迁移到Hive仓库,第一步默认的临时目录是/user/root/表名

RDBMS到Hbase

1
2
3
4
5
6
7
8
9
10
11
12
bin/sqoop import \
--connect jdbc:mysql://hadoop102:3306/company \
--username root \
--password 000000 \
--table company \
--columns "id,name,sex" \
--column-family "info" \
--hbase-create-table \
--hbase-row-key "id" \
--hbase-table "hbase_company" \
--num-mappers 1 \
--split-by id

手动创建HBase表

1
hbase> create 'hbase_company,'info'

HBase查看表scan命令

1
scan 'hbase_company'

导出数据

就和导入反过来了 从大数据集群到非大数据集群传输数据 叫做导出 export 关键字

1
2
3
4
5
6
7
8
bin/sqoop export \
--connect jdbc:mysql://hadoop102:3306/company \
--username root \
--password 000000 \
--table staff \
--num-mappers 1 \
--export-dir /user/hive/warehouse/staff_hive \
--input-fields-terminated-by "\t"

脚本打包

使用opt格式的文件打包sqoop命令 我的理解就是把命令封装 然后调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
vi opt/job_HDFS2RDBMS.opt

export
--connect
jdbc:mysql://hadoop102:3306/company
--username
root
--password
000000
--table
staff
--num-mappers
1
--export-dir
/user/hive/warehouse/staff_hive
--input-fields-terminated-by
"\t"

扩展学习部分

Hive练习

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
--查询平均成绩大于等于60分的同学
--的学生编号和学生姓名和平均成绩
SELECT stu.id,stu.name,AVG(sco.scores)
from student stu join score sco
on stu.id = sco.sid
group by stu.id,stu.name
having AVG(sco.scores) >= 60;


4.查询平均成绩小于60分的同学的学生编号和学生
姓名和平均成绩 – (包括有成绩的和无成绩的)

SELECT stu.id, stu.name,ROUND(AVG(sco.scores)) as avg_scores
from student stu join score sco
on stu.id = sco.sid
group by stu.id, stu.name
having AVG(sco.scores) < 60
union all --合并查询结果
SELECT stu1.id, stu1.name,0 as avg_scores
from student stu1
where stu1.id not IN
(select sid from score);

总结

重点是哪些知识比较重要,难点是你在学习过程中觉得比较繁琐,掌握起来有一点

上午都在安装环境,因为sqoop之前要安装zook和hbase,去看了尚硅谷之前的安装视频,还是挺顺利的,一上午就搞好了,但是在sqoop,从HDFS导出数据到Hive的练习的时候,出现了问题,就是运行过程,到一半卡住不动了。去搜解决问题说是虚拟机CPU分配问题,以后是在服务器上使用的,就没去管了,知道怎么导出就好了。今天的学习状态很不错的,都很顺利,没有之前安装Hive的那种烦躁感,sqoop的使用也比较简单,重点在导入数据,也是蛮容易的,全都掌握了。

8.4-8.7 省赛题目分析

1.头:日期、所学内容出处

https://www.bilibili.com/video/BV1WY4y1H7d3?p=28&share_source=copy_web

2.所学内容概述

大数据省赛和国赛spark代码分析

3.总结

重点是哪些知识比较重要,难点是你在学习过程中觉得比较繁琐,掌握起来有一点

这几天都是看比赛流程,读懂任务书什么的,还有李昊清洗的代码和sql语句,先看大致的流程,每个案例都怎么弄。每一个函数的作用什么的 , 在刚看到这些代码都懵逼了,一些关键字都不知道是干嘛用的。不知道的就一个一个去搜,是什么作用,反正去搞懂大致先了,难懂的点在spark的自定义的函数,读取文件什么的,这些弄了很久才搞明白,这两天去问了李昊很多问题,sql啊spark什么的。

8.8-8.9 SparkSQL深度集成

1.头:日期、所学内容出处

https://www.bilibili.com/video/BV1WY4y1H7d3?p=28&share_source=copy_web

3.所学内容概述

SparkSQL用IDEA操作Hive的进阶操作

4.根据概述分章节描述

封装SparkSQL连Hive方法

因为每次写SparkSQL代码的时候都需要,写一大串连接Hive的代码,所以自己为了省时间将代码封装到一个工具类了,写SparkSQL代码前调用一下就好了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.atguigu.bigdata.sparksql

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object SQLUntilTools {
def main(args: Array[String]): Unit = {

}

def Conf: SparkSession ={
val conf: SparkConf = new SparkConf().setMaster("local").setAppName("SparkSQL1")

//获取SparkSession
val sparkSession: SparkSession = SparkSession.builder()
.config(conf)
//开启对Hive的支持
.enableHiveSupport()
.getOrCreate()
sparkSession
}
}

具体使用

image-20220808093331230

使用insertInto向hive表中写入数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
package com.atguigu.bigdata.sparksql

import org.apache.spark.sql.{DataFrame, SparkSession}

object SparkSQLWriteHive {
def main(args: Array[String]): Unit = {
val sparkSessionConf: SparkSession = SQLUntilTools.SparkSessionConf
import sparkSessionConf.sql
//查询数据
val resDf: DataFrame = sql("select * from test6")

resDf.write
//指定数据写入格式 append 追加 overwrite覆盖
.mode("append")
//这里需要指定数据格式 parquet orc json csv text 不指定默认是parquet
//针对insertInto而言 因为是向已经存在的表写入数据 所以format和options参数会被忽略
// .format()
// .options()
.insertInto("test7")

sparkSessionConf.stop()
}
}

7.总结

重点是哪些知识比较重要,难点是你在学习过程中觉得比较繁琐,掌握起来有一点

这两天大部分时间都在调配置,代码是没有问题的,但是IDEA一直连不上虚拟机中的hive,晚上才发现是版本问题,第二天就去重装了三台,linux中spark也因为版本问题,导致要全部重装,因为自己把Hive的计算模块从MR换成了Spark,换Linux的spark换了一上午的时间,下午才去在IDEA学SparkSQL操作hive,内容倒是不难,基本就是把hive中的代码放到SparkSQL的语句中去,sql(“show tables”),这种类似,唯一的难点就是IDEA连上hive了。

8.10 Hive实战之影音数据分析

1.头:日期、所学内容出处

https://www.bilibili.com/video/BV1WY4y1H7d3?p=28&share_source=copy_web

2.标题

1
2
3
4
5
6
7
P119119-尚硅谷-Hive-案例实操 需求一
P120120-尚硅谷-Hive-案例实操 需求二
P121121-尚硅谷-Hive-案例实操 需求三
P122122-尚硅谷-Hive-案例实操 需求四
P123123-尚硅谷-Hive-案例实操 需求五
P124124-尚硅谷-Hive-案例实操 需求六
P125125-尚硅谷-Hive-案例实操 需求七

3.所学内容概述

今日学习部分

SparkSQL商品

Hive影音

4.根据概述分章节描述

Spark项目实战

各区域热门商品Top3

效果图

注意是要自定义udaf函数 比较难 不太好掌握 所以是跟着老师敲的

image-20220810194109044

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
package com.atguigu.bigdata.sparksql

import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.expressions.{Aggregator, UserDefinedAggregateFunction}

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

object subjectTest2 {
def main(args: Array[String]): Unit = {
val sparkSessionConf: SparkSession = SQLUntilTools.SparkSessionConf
import sparkSessionConf.sql

sql("use atguigu")

//查询基本数据
sql(
"""
|select p.product_name,ci.area,ci.city_name,a.* from user_visit_action a
|join product_info p on p.product_id=a.click_product_id
|join city_info ci on a.city_id = ci.city_id where product_id<>-1
|""".stripMargin).createOrReplaceTempView("t1")


//根据区域 商品 进行数据的聚合
// sparkSessionConf.udf.register("city_remark",new CityRemarkUDAF())

sql(
"""
|select area,product_name,count(*) as ct,
|cityRemark(city_name) as city_remark
|from t1 group by area, product_name
|""".stripMargin).createOrReplaceTempView("t2")

//区域内对点击数量排行
sql(
"""
|select *,rank() over (partition by area order by ct desc) as rank from t2
|""".stripMargin).createOrReplaceTempView("t3")

//取前三名
sql("select * from t3 where t3.rank<=3").show(false) //false 全部显示

sparkSessionConf.stop()

}

case class Buffer( var total : Long, var cityMap:mutable.Map[String, Long] )


// 自定义聚合函数:实现城市备注功能
// 1. 继承Aggregator, 定义泛型
// IN : 城市名称
// BUF : Buffer =>【总点击数量,Map[(city, cnt), (city, cnt)]】
// OUT : 备注信息
// 2. 重写方法(6)
class CityRemarkUDAF extends Aggregator[String, Buffer, String]{
// 缓冲区初始化
override def zero: Buffer = {
Buffer(0, mutable.Map[String, Long]())
}

// 更新缓冲区数据
override def reduce(buff: Buffer, city: String): Buffer = {
buff.total += 1
val newCount: Long = buff.cityMap.getOrElse(city, 0L) + 1
buff.cityMap.update(city, newCount)
buff
}

// 合并缓冲区数据
override def merge(buff1: Buffer, buff2: Buffer): Buffer = {
buff1.total += buff2.total

val map1: mutable.Map[String, Long] = buff1.cityMap
val map2: mutable.Map[String, Long] = buff2.cityMap

// 两个Map的合并操作
// buff1.cityMap = map1.foldLeft(map2) {
// case ( map, (city, cnt) ) => {
// val newCount = map.getOrElse(city, 0L) + cnt
// map.update(city, newCount)
// map
// }
// }
map2.foreach{
case (city , cnt) => {
val newCount = map1.getOrElse(city, 0L) + cnt
map1.update(city, newCount)
}
}
buff1.cityMap = map1
buff1
}
// 将统计的结果生成字符串信息
override def finish(buff: Buffer): String = {
val remarkList = ListBuffer[String]()

val totalcnt = buff.total
val cityMap = buff.cityMap

// 降序排列
val cityCntList = cityMap.toList.sortWith(
(left, right) => {
left._2 > right._2
}
).take(2)

val hasMore = cityMap.size > 2
var rsum = 0L
cityCntList.foreach{
case ( city, cnt ) => {
val r = cnt * 100 / totalcnt
remarkList.append(s"${city} ${r}%")
rsum += r
}
}
if ( hasMore ) {
remarkList.append(s"其他 ${100 - rsum}%")
}

remarkList.mkString(", ")
}

override def bufferEncoder: Encoder[Buffer] = Encoders.product

override def outputEncoder: Encoder[String] = Encoders.STRING
}

}

Hive影音

数据清洗

写好ETL代码

利用maven将运行的java代码打包

1
2
3
bin/yarn jar /opt/module/software/subject/sparksql-1.0-SNAPSHOT.jar com.atguigu.bigdata.sparksql.VideoETLRunner  --运行代码包名
/opt/module/software/subject/video/video --hdfs 需要清洗的文件
/opt/module/software/subject/output --清洗后 文件放的位置

创建表导入数据

image-20220810195710564

做题

答案和步骤都在IDEA里面 包在 bigdata package com.atguigu.bigdata.sparksql.day7SubjectTest

–统计视频观看数Top10

–统计视频类别热度Top10

–统计视频观看数Top20所属类别

–统计视频观看数Top50所关联视频的所属类别Rank

–统计每个类别中的视频热度Top10

–统计每个类别中视频流量Top10

–统计上传视频最多的用户Top10以及他们上传的视频

–统计每个类别视频观看数Top10

7.总结

重点是哪些知识比较重要,难点是你在学习过程中觉得比较繁琐,掌握起来有一点

今天学习的任务不是很重,实战性的,上午把上次的SparkSQL操作的商品表的查询筛选复习了一下,把Hive实战的电影表和用户表建了一下,下午去敲代码,还是很简单,轻松的。因为之前反复琢磨了50题的hive,做了四五天呢,所以看清楚表的结构和要求,敲出来不难。

8.11 SparkSQL向hive表写入数据

1.头:日期、所学内容出处

https://www.bilibili.com/video/BV1WY4y1H7d3?p=28&share_source=copy_web

2.标题

1
2
3
1-2-2 使用saveAsTable向Hive表中写入数据-1
1-2-3 使用saveAsTable向Hive表中写入数据-2
1-2-4 使用SparkSQL向Hive表中写入数据

3.所学内容概述

SparkSQL向Hive表写入数据的几种方式

4.根据概述分章节描述

使用saveAsTabel向Hive表中写入数据

自己大部分对代码的解释都 在 注解那边

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package com.atguigu.bigdata.sparksql

import org.apache.spark.sql.{DataFrame, SparkSession}

/**
* 使用saveAsTable()向Hive表中写入数据
* Created by xuwei
*/
object SparkSQLWriteHive_2 {
def main(args: Array[String]): Unit = {
val sparkSessionConf: SparkSession = SQLUntilTools.SparkSessionConf

import sparkSessionConf.sql
//查询数据
val resDf: DataFrame = sql("select * from student_score")
//2:saveAsTable()
/**
* 分为两种情况,表不存在和表存在
* 1:表不存在,则会根据DataFrame中的Schema自动创建目标表并写入数据
* 2:表存在
* 2.1:如果mode=append,当DataFrame中的Schema和表中的Schema相同(字段顺序可以不同),则执行追加操作。
* 当DataFrame中的Schema和表中的Schema不相同,则报错。
* 2.2:如果mode=overwrite,当DataFrame中的Schema和表中的Schema相同(字段顺序可以不同),则直接覆盖。
* 当DataFrame中的Schema和表中的Schema不相同,则会删除之前的表,然后按照DataFrame中的Schema重新创建表并写入数据。
*/
//表不存在
/*resDf.write
//指定数据写入格式append:追加。overwrite:覆盖。
.mode("overwrite")
//这里需要指定数据格式:parquet, orc, avro(需要添加外部依赖), json, csv, text。不指定的话默认是parquet格式。
//注意:text数据格式在这里不支持int数据类型
//针对普通文本文件数据格式(json、csv),默认创建的Hive表示SequenceFile格式的,无法读取生成的普通文件,不要使用这种格式。
//parquet、orc数据格式可以正常使用
.format("parquet")
.saveAsTable("student_score_bak")*/

//表存在
sql(
"""
|create table if not exists student_score_bak(
| id int,
| name string,
| sub string,
| score int
|)using hive
| OPTIONS(
| fileFormat 'parquet'
| )
|""".stripMargin)
resDf.write
//指定数据写入格式append:追加。overwrite:覆盖。
.mode("overwrite")
//这里需要指定数据格式:parquet, orc, avro(需要添加外部依赖), json, csv, text。不指定的话默认是parquet格式。
//注意:text数据格式在这里不支持int数据类型
//针对已存在的表,当mode为append时,这里必须指定为hive。
//针对已存在的表,当mode为overwrite时:
// 这里如果指定为hive,则会生成默认数据存储格式(TextFile)的Hive表
// 这里如果指定为普通文本文件数据格式(json、csv),默认创建的Hive表是SequenceFile格式的,无法读取生成的普通文件,不需要使用这种方式
//parquet、orc数据格式可以正常使用
.format("parquet")
.saveAsTable("student_score_bak")
sparkSessionConf.stop()
}

}

将代码打包 放linux执行 需要修改一些配置 以及在pom.xml文件中 增加 provided

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package com.atguigu.bigdata.sparksql

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
* 使用saveAsTable()向Hive表中写入数据
* Created by xuwei
*/
object SparkSQLWriteHive_2_cluster {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()

//获取SparkSession,为了操作SparkSQL
val sparkSession = SparkSession
.builder()
.appName("SparkSQLWriteHive_2_cluster")
.config(conf)
//开启对Hive的支持,支持连接Hive的MetaStore、Hive的序列化、Hive的自定义函数
.enableHiveSupport()
.getOrCreate()

import sparkSession.sql
//查询数据
val resDf = sql("select * from student_score")
//2:saveAsTable()
/**
* 分为两种情况,表不存在和表存在
* 1:表不存在,则会根据DataFrame中的Schema自动创建目标表并写入数据
* 2:表存在
* 2.1:如果mode=append,当DataFrame中的Schema和表中的Schema相同(字段顺序可以不同),则执行追加操作。
* 当DataFrame中的Schema和表中的Schema不相同,则报错。
* 2.2:如果mode=overwrite,当DataFrame中的Schema和表中的Schema相同(字段顺序可以不同),则直接覆盖。
* 当DataFrame中的Schema和表中的Schema不相同,则会删除之前的表,然后按照DataFrame中的Schema重新创建表并写入数据。
*/

//表不存在
/*resDf.write
//指定数据写入格式append:追加。overwrite:覆盖。
.mode("overwrite")
//这里需要指定数据格式:parquet, orc, avro(需要添加外部依赖), json, csv, text。不指定的话默认是parquet格式。
//注意:text数据格式在这里不支持int数据类型
//针对普通文本文件数据格式(json、csv),默认创建的Hive表示SequenceFile格式的,无法读取生成的普通文件,不要使用这种格式。
//parquet、orc数据格式可以正常使用
.format("parquet")
.saveAsTable("student_score_bak")*/

//表存在
sql(
"""
|create table if not exists student_score_bak(
| id int,
| name string,
| sub string,
| score int
|)using hive
| OPTIONS(
| fileFormat 'parquet'
| )
|""".stripMargin)
resDf.write
//指定数据写入格式append:追加。overwrite:覆盖。
.mode("overwrite")
//这里需要指定数据格式:parquet, orc, avro(需要添加外部依赖), json, csv, text。不指定的话默认是parquet格式。
//注意:text数据格式在这里不支持int数据类型
//针对已存在的表,当mode为append时,这里必须指定为hive。
//针对已存在的表,当mode为overwrite时:
// 这里如果指定为hive,则会生成默认数据存储格式(TextFile)的Hive表
// 这里如果指定为普通文本文件数据格式(json、csv),默认创建的Hive表是SequenceFile格式的,无法读取生成的普通文件,不需要使用这种方式
//parquet、orc数据格式可以正常使用
.format("parquet")
.saveAsTable("student_score_bak")


sparkSession.stop()
}

}

利用SparkSQL向hive表写入数据 (推荐)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package com.atguigu.bigdata.sparksql

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

/**
* 使用SparkSQL向Hive表中写入数据
* Created by xuwei
*/
object SparkSQLWriteHive_3 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local")

//获取SparkSession,为了操作SparkSQL
val sparkSession = SparkSession
.builder()
.appName("SparkSQLWriteHive_3")
.config(conf)
//开启对Hive的支持,支持连接Hive的MetaStore、Hive的序列化、Hive的自定义函数
.enableHiveSupport()
.getOrCreate()

import sparkSession.sql

//3:SparkSQL语句
/**
* 分为两种情况:表不存在和表存在
* 1:表不存在,使用create table as select
* 2:表存在,使用insert into select
*/
//表不存在
/*sql(
"""
|create table student_score_bak
|as
|select * from student_score
|""".stripMargin)*/

//表存在
sql(
"""
|create table if not exists student_score_bak(
| id int,
| name string,
| sub string,
| score int
|)using hive
| OPTIONS(
| fileFormat 'textfile',
| fieldDelim ','
| )
|""".stripMargin)

sql(
"""
|insert into student_score_bak
|select * from student_score
|""".stripMargin)


sparkSession.stop()
}

}

5. BUG点

难点(关键代码或关键配置,BUG截图+解决方案)

看报错点是最后一行的stored as orc 翻译说是sql不能用orc文件 CSDN搜索

官方建议在hive中建表 在SparkSQL中使用,因为在spaek只能默认textfile 想要指定需要使用using

有效指定 create tablet1(id int) using hive options(fileformat’textfile’)

fileformat支持的六种参数 sequencefile rcfile orc parquet textfile avro

image-20220811091010618

7.总结

重点是哪些知识比较重要,难点是你在学习过程中觉得比较繁琐,掌握起来有一点

今天学习了Spark的进阶操作吧,向hive表中添加处理数据,这样的添加方式。可以在将数据过滤以后, 把新过滤好的数据,建立成新的表。 老师讲课说saveAtTable这种方法太麻烦了,平常还是使用SparkSQL比较多。但是看大三的省赛大数据数据清洗的代码是有用到saveAtTable的.

8.12 with … as Hive sql理解

1.头:日期、所学内容出处

http://t.csdn.cn/KSKLW

http://t.csdn.cn/WMTZN

3.所学内容概述

with … as

参考博客如上

with as 也叫做子查询部分,首先定义一个sql片段,该sql片段会被整个sql语句所用到,为了让sql语句的可读性更高些,作为提供数据的部分,也常常用在union等集合操作中。

with as就类似于一个视图或临时表,可以用来存储一部分的sql语句作为别名,不同的是with as 属于一次性的,而且必须要和其他sql一起使用才可以

其最大的好处就是适当的提高代码可读性,而且如果with子句在后面要多次使用到,这可以大大的简化SQL;更重要的是:一次分析,多次使用,这也是为什么会提供性能的地方,达到了“少读”的目标。

简单使用 语法

1
2
3
4
5
6
7
8
9
10
WITH t1 AS (
SELECT *
FROM carinfo
),
t2 AS (
SELECT *
FROM car_blacklist
)
SELECT *
FROM t1, t2

注意点

1
2
1--with子句必须在引用的select语句之前定义,同级with关键字只能使用一次,多个只能用逗号分割;最后一个with 子句与下面的查询之间不能有逗号,只通过右括号分割,with 子句的查询必须用括号括起来.
2--如果定义了with子句,但其后没有跟select查询,则会报错!

自己使用分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
/*不使用with as*/
select * from
(select *,rank() over (partition by area order by ct desc) as rank from
(select area,product_name,count(*) as ct from
(select p.product_name,ci.area,ci.city_name,a.* from user_visit_action a
join product_info p on p.product_id=a.click_product_id
join city_info ci on a.city_id = ci.city_id where product_id<>-1) t1
group by area, product_name) t2
) t3 where t3.rank<=3;

/*--使用with ..as*/
with t1 as ( select p.product_name,ci.area,ci.city_name,a.* from user_visit_action a
join product_info p on p.product_id=a.click_product_id
join city_info ci on a.city_id = ci.city_id where product_id<>-1 ),
t2 as ( select area,product_name,count(*) as ct from t1 group by area, product_name),
t3 as ( select *,rank() over (partition by area order by ct desc) as rank from t2 )
select * from t3 where t3.rank<=3;

以上案例可见,with 对Hive可读性的提升是显而易见的。

其实一些时候是可以用with as 代替 join on 的但是我觉得join on更方便而且代码量比较少 所以像表格嵌套去查的时候,我才会去使用with as来提高可读性

Hive查询结构完整

1
2
3
4
5
6
7
8
9
select distinct....
from leftTable....
join rightTable.... on ....
where ....
group by ....
having ....
order by ....
limit ....
union [all]....

hive执行顺序

1
2
3
4
5
6
7
8
9
10
11
1. 先执行 from 子句 
2. 再执行 on 关联条件
3. 然后再关联右表,也就是执行 join....
4. 再执行 where....
5. 执行 group by.....
6. 再执行 having....
7. 然后执行 select....
8. 如果有 distinct,则会对 select 选择的所有字段的值的组合去重
9. 再执行 order by....
10. 再执行 limit 进行限制条数查询(分页查询)
11. 最后执行两个 sql 的合并。

注意:列别名和表别名尽量正确使用,在hive中,非常严格,有的时候必须加别名。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
1. hql什么时候不会被翻译成mr程序
-- select * from tablename;
-- select * from tablename limit [m,]n;
2. hql对子查询支持的不够友好
(1) hql有子查询时,通常都会有表别名和列别名的应用
(2) 子查询可以在from子句中,与mysql的用法一致
(3) 子查询可以在where子句中,但是只能应用[not] in 或者 [not] exists
(4) 子查询可以在having子句中,但是只能应用[not] in 或者 [not] exists
(5) 子查询在having子句中时,如果用的是in,并且有聚合函数,那么聚合函数应该在select子句出现
(6) 子查询不支持在select子句中。
--3. 查询原则
(1)能不使用子查询,就不用子查询,通常子查询都可以使用join来替换
(2)能不用join就别用join,但是通常避免不了
(3)[not] in 通常要替换成 [not] exists
(4)join时,一定要注意小表驱动大表
--4,聚合函数的count的优化
#1. 执行效果上:
- count(*)包括了所有的列,相当于行数,在统计结果的时候不会忽略null值
- count(1)包括了所有列,用1代表行,在统计结果的时候也不会忽略null值
- count(列名)只包括列名那一列,在统计结果时,会忽略null值

#2. 执行效率上:
- 列名为主键,count(列名)会比count(1)快,count(主键)效率是最高的
- 列名不为主键,count(1)会比count(列名)快
- 如果表中有多个列并且没有主键,count(1)的效率高于count(*)
- 如果表中只有一个字段count(*)效率最高

5. BUG点

难点(关键代码或关键配置,BUG截图+解决方案)

当我筛选count 取大于5的时候,报错了。后面注意到 ,count是聚合函数 需要分组才能使用。忘记加了group by 分组

image-20220812192832083

总结

重点是哪些知识比较重要,难点是你在学习过程中觉得比较繁琐,掌握起来有一点

今天上午在看省赛数据清洗中sparksql那边的代码答案,学长写的,发现比较常用的with as这部分视频没讲,能很好的增强可读性,就去CSDN去搜了一下,sql下面还有read读数据的代码。总之就是代码看不懂的地方就一个一个去搜过来,搞懂作用,不行就问学长,下午又去做了几道hive的案例,多用了with as 原本 join 里面()t1这样的方式,查询嵌套真的不太看得懂,用了with as 自己写的sql语句,思路也清晰不少,别名出现的几次BUG也去看了一下,以后要多加注意。

8.13-8.17 hive 50题

1.头:日期、所学内容出处

http://t.csdn.cn/kVN4d

2.所学内容概述

hive 50题自写 改进 用with as

3.根据概述分章节描述

因为之前hive刚看完的时候,直接去看省赛题目,李昊省赛的spark代码只能看个七八十,自己写估计是不大写的出来,问学长说sql语句比较重要,csdn搜hive sql 练习题 很多都是50道经典题,但是都是用很麻烦的方法,看学长的with as 句,就直接打算自己用with as 自己写。

sql 语句 如下

像25题有很多种解法,我自己就列出来了3种。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
-- 4.查询平均成绩小于60分的同学的学生编号和学生
-- 姓名和平均成绩 – (包括有成绩的和无成绩的)
select stu.name,stu.id,round(avg(score.scores),2) as avg_scores from student stu
left join score on score.sid=stu.id
group by stu.name, stu.id
having avg(score.scores) < 60
union all
select stu1.id, stu1.name,0 as avg_scores from student stu1
where stu1.id not in (select sid from score);

-- 5.查询所有同学的学生编号、学生姓名、选课总数、所有课程的总成绩
select stu.id,stu.name,count(cid),nvl(sum(scores),0) from student stu
left join score sco on sco.sid=stu.id
group by id,name;

-- 6.查询"李"姓老师的数量
select *,count(tname) as num from teacher
where tname like "李%"
group by tid,tname;

-- 7.查询学过"张三"老师授课的同学的信息
select student.* from student
left join score on score.sid=student.id
left join course c on score.cid = c.cid
left join teacher t on c.tid = t.tid
where t.tname="张三";

-- 8.查询没学过"张三"老师授课的同学的信息
select *
from student where student.id not in (
select student.id from student
left join score on score.sid=student.id
left join course c on score.cid = c.cid
left join teacher t on c.tid = t.tid
where t.tname="张三");

-- 9.查询学过编号为"01"并且也学过编号为"02"的课程的同学的信息
select student.* from student
left join(
select * from score
where cid=1
union all
select * from score
where cid=2) cc on cc.sid=student.id
group by student.id,name,sex,birthday
having count(cc.sid)=2;


-- 10.查询学过编号为"01"但是没有学过编号为"02"的课程的同学的信息
select student.* from student
join score on score.sid=student.id
where cid=1 and score.sid not in (
select sid from score sco where sco.cid=2);


-- 11、查询没有学全所有课程的同学的信息
select student.* from student
left join (select sid from score
group by sid
having count(cid)=3) tmp on tmp.sid=student.id
where tmp.sid is null;



-- 12、查询至少有一门课与学号为"01"
-- 的同学所学相同的同学的信息
select student.* from score
right join student on student.id=score.sid
where score.cid in (
select sco.cid
from score sco
where sco.sid=1) and score.sid<>1
group by student.id,name,birthday,sex;


-- 15、查询两门及其以上不及格课程
-- 的同学的学号,姓名及其平均成绩
select student.id,name,pingjun from student
join (
select sid,round(avg(scores),1) pingjun from score
where scores<60
group by sid
having count(scores)>=2) tmp on tmp.sid=student.id;


--16、检索"01"课程分数小于80,按分数降序排列的学生信息
select student.*,tmp.scores from student
join (select sid,scores from score where cid=1 and scores<60) tmp on tmp.sid=student.id
ORDER BY tmp.scores desc ;

-- 17、按平均成绩从高到低显示所有学生
-- 的所有课程的成绩以及平均成绩
select student.id,student.name,nvl(tmp1.scores,"未考") as c1,nvl(tmp2.scores,"未考") as c2,nvl(tmp3.scores,"未考") as c3,tmp.avg_sc
from student
left join (select sid,round(avg(scores),1) avg_sc from score group by sid) tmp on tmp.sid=student.id
left join (select sid,scores from score s1 where cid='01')tmp1 on tmp1.sid=student.id
left join (select sid,scores from score s2 where cid='02')tmp2 on tmp2.sid=student.id
left join (select sid,scores from score s3 where cid='03')tmp3 on tmp3.sid=student.id
order by tmp.avg_sc desc;


-- 18.查询各科成绩最高分、最低分和平均分:以如下形式显示:
-- 课程ID,课程name,最高分,最低分,平均分,及格率,中等率,优良率,优秀率
select score.cid,c.cname,
max(scores) as maxScore,
min(scores) as minScore,
round(avg(scores),2) avgScore,
round(sum(if(scores >= 60, 1, 0))/count(c.cid),2) passRate,
round(sum(if(scores >= 60 and scores < 70, 1, 0))/ count(c.cid), 2) moderate,
round(sum(case when scores>=70 and scores<80 then 1 else 0 end)/count(c.cid),2) goodRate,
round(sum(case when scores>=80 and scores<90 then 1 else 0 end)/count(c.cid),2) excellentRates
from score
join course c on score.cid = c.cid
group by score.cid,c.cname;


-- 19、按各科成绩进行排序,并显示排名:
select c.cid,c.cname,s.name,rank() over (partition by score.cid order by scores) from score
left join course c on score.cid = c.cid
left join student s on s.id=score.sid;

-- 20、查询学生的总成绩并进行排名
select student.id,student.name,nvl(sum(scores),'缺考') as sum_sc,rank() over (order by sum(scores) desc )from score
right join student on student.id=score.sid
group by student.id, student.name;

-- 21、查询不同老师所教不同课
-- 程平均分从高到低显示
select score.cid,c.cname,t.tname,round(avg(scores),1) as avg_sc from score
join course c on score.cid = c.cid
join teacher t on c.tid = t.tid
group by score.cid,cname,tname
order by avg_sc desc;

-- 22、查询所有课程的成绩第2名到第3名的学生信息及该课程成绩
select course.cname,student.*,tmp.cid,tmp.scores,tmp.cno from student
join (select *,row_number() over (partition by cid order by scores desc)cno from score) tmp on tmp.sid=student.id
join course on course.cid=tmp.cid
where tmp.cno between 2 and 3;

-- 23、统计各科成绩各分数段人数:课程编号,课程名称,[100-85],[85-70],[70-60],[0-60]及所占百分比
select score.cid,cname,
sum(if(score.scores >= 85 and score.scores <= 100, 1, 0)) as sum_85,
round(sum(if(score.scores >= 85 and score.scores <= 100, 1, 0))/ count(c.cid), 2),
sum(if(score.scores >= 70 and score.scores <= 85, 1, 0)) as sum_70,
round(sum(if(score.scores >= 70 and score.scores <= 85, 1, 0))/ count(c.cid), 2),
sum(case when score.scores>=60 and score.scores<=70 then 1 else 0 end) as sum_60,
round(sum(case when score.scores>=60 and score.scores<=70 then 1 else 0 end)/count(c.cid),2),
sum(if(scores>=0 and scores<60,1,0)) as sum_0,
round(sum(if(score.scores >= 0 and score.scores <= 60, 1, 0))/ count(c.cid), 2)

from score
join course c on score.cid = c.cid
group by score.cid,cname;

-- 24、查询学生平均成绩及其名次
select id,name,round(avg(scores),2),rank() over (order by round(avg(scores),2) desc) from score
right join student on student.id=score.sid
group by id,name;

-- 25、查询各科成绩前三名的记录
select course.cname,student.*,tmp.cid,tmp.scores,tmp.cno from student
join (select *,row_number() over (partition by cid order by scores desc)cno from score) tmp on tmp.sid=student.id
join course on course.cid=tmp.cid
where tmp.cno <= 3;


select * from
(select *,rank() over (partition by area order by ct desc) as rank from
(select area,product_name,count(*) as ct from
(select p.product_name,ci.area,ci.city_name,a.* from user_visit_action a
join product_info p on p.product_id=a.click_product_id
join city_info ci on a.city_id = ci.city_id where product_id<>-1) t1
group by area, product_name) t2
) t3 where t3.rank<=3;


with t1 as ( select p.product_name,ci.area,ci.city_name,a.* from user_visit_action a
join product_info p on p.product_id=a.click_product_id
join city_info ci on a.city_id = ci.city_id where product_id<>-1 ),
t2 as ( select area,product_name,count(*) as ct from t1 group by area, product_name),
t3 as ( select *,rank() over (partition by area order by ct desc) as rank from t2 )
select * from t3 where t3.rank<=3;

use default;

-- 26、查询每门课程被选修的学生数
select cid,count(*) `学生数` from score group by cid


-- 27、查询出只有两门课程的全部学生的学号和姓名
select id,name from score
join student on student.id=score.sid
group by id,name
having count(cid)=2;


-- 28、查询男生、女生人数
select sex,count(sex) from student
group by sex;


-- 29、查询名字中含有"风"字的学生信息
select * from student
where name like "%风%";


-- 30、查询同名同性学生名单,并统计同名人数
select stu1.name,count(*) from student stu1
join student stu2 on stu1.name=stu2.name
where stu1.id<>stu2.id and stu1.sex=stu2.sex
group by stu1.name;

-- 31、查询1990年出生的学生名单
select * from student
where year(student.birthday)="1990";

-- 32、查询每门课程的平均成绩,结果按平均成绩降序排列,平均成绩相同时,按课程编号升序排列
select cid,avg(scores) `平均成绩`
from score
group by cid
order by `平均成绩` desc ,cid;

-- 33、查询平均成绩大于等于85的所有学生的学号、姓名和平均成绩
select id,name,round(avg(scores),2) `平均成绩`from score
join student on student.id=score.sid
group by id,name
having round(avg(scores),2) >= 85;


-- 34、查询课程名称为"数学",且分数低于60的学生姓名和分数

select sid,name,scores from score
left join course c on score.cid = c.cid
join student on student.id=score.sid
where cname="数学" and scores<=60;


-- 35、查询所有学生的课程及分数情况;
with t1 as ( select sid,
sum(case cid when '1' then scores else 0 end ) as `语文`,
sum(case cid when '2' then scores else 0 end ) as `数学`,
sum(case cid when '3' then scores else 0 end ) as `英语`
from score
group by sid)
select id,name,t1. `语文`, `数学`, `英语` from student
left join t1 on t1.sid=student.id;


-- 36、查询任何一门课程成绩在70分以上的姓名、课程名称和分数;
with t as ( select sid,
sum(case cid when '1' then scores else 0 end ) as `语文`,
sum(case cid when '2' then scores else 0 end ) as `数学`,
sum(case cid when '3' then scores else 0 end ) as `英语`
from score
group by sid)
select id,name,t. `语文`, `数学`, `英语` from student
left join t on t.sid=student.id
where t.`语文`>70 or t.`数学`>70 or t.`英语`>70;

-- 37、查询不及格的课程
select name,cid,scores from score
join student on student.id=score.sid
where scores < 60;

-- 38、查询课程编号为01且课程成绩在80分以上的学生的学号和姓名;
select id,name,cid,scores from
student stu
join score sco on sco.sid=stu.id
where sco.scores>60 and sco.cid="1";

-- 39、求每门课程的学生人数
select cid,count(*) from score
group by cid;

-- 40、查询选修"张三"老师所授课程的学生中,成绩最高的学生信息及其成绩
with tmp as (
select stu.*, scores,rank() over (order by scores desc ) as rkdesc from student stu
join score sc on sc.sid=stu.id
join course c on sc.cid = c.cid
join teacher t on c.tid = t.tid
where tname="张三")
select * from tmp
where tmp.rkdesc="1";



-- 41、查询不同课程成绩相同的学生的学生编号、课程编号、学生成绩
select sco1.sid, sco1.cid, sco1.scores from score sco1
join score sco2 on sco2.sid=sco1.sid
where sco1.cid<>sco2.cid and sco1.scores=sco2.scores
group by sco1.sid, sco1.cid, sco1.scores;

-- 42、查询每门成绩最好的前两名
with tmp as
(select name,cname,scores,row_number() over (partition by score.cid order by scores desc ) Toptwo from score
join student on student.id=score.sid
join course c on score.cid = c.cid)
select * from tmp
where tmp.Toptwo<=2;



-- 43、统计每门课程的学生选修人数(超过5人的课程才统计)。
-- 要求输出课程号和选修人数,查询结果按人数降序排列,若人数相同,按课程号升序排列
select c.cid,cname,count(1) c from score
join course c on score.cid = c.cid
group by c.cid, cname
having count(1) > 5
order by c desc , cid;

-- 44、检索至少选修两门课程的学生学号
select sid,name,count(cid) `次数`
from score
join student on student.id=score.sid
group by sid,name
having count(cid) >= 2;



-- 45、查询选修了全部课程的学生信息
with tmp as ( select count(1) num from course )
select student.*,count(1) `次数`
from score
join student
join tmp
on student.id=score.sid
group by student.id,birthday,name,sex,tmp.num
having count(1)=tmp.num;


-- 46、查询各学生的年龄:按照出生日期来算,当前月日 < 出生年月的月日则,年龄减一
select *,
year(current_date())-year(birthday)-
(case when month(current_date()) < month(birthday) then 1
when month(current_date()) = month(birthday) and day(current_date())>day(birthday) then 1 else 0 end ) as age
from student;

select * ,
year(current_date())-year(birthday)-if(current_date()>to_date(concat("2022-",substr(birthday,6))),0,1) as `age`
from student;


-- 47、查询本周过生日的学生
select *,weekofyear(birthday) from student
where weekofyear(birthday)=weekofyear(current_date());

-- 48、查询下周过生日的学生
select *,weekofyear(birthday) from student
where weekofyear(birthday)=weekofyear(current_date())+1;
-- 49、查询本月过生日的学生
select *,month(birthday) from student
where month(birthday)=month(current_date());
-- 50、查询上月过生日的学生
select *,month(birthday) from student
where month(birthday)=month(current_date())-1;

4.总结

重点是哪些知识比较重要,难点是你在学习过程中觉得比较繁琐,掌握起来有一点

这几天任务量也不算大,其实超出了自己的时间,本来打算3天写完,然后1天去总结的。但是发现hive这些语句和B站讲的都不太一样,老师所讲的自己没有去实操过,老师给的题目都是很简单的。像这种稍微有点难度的,需要用大量的join 和 over() 这两个比较难的点,自己去找了很多文档,再看了几遍,觉得差不多了才去做题目。前面第一题就做了三题,到后面上手了一天能做15 道,最后一天17号早上写完了,下午又把语句一个一个运行,去分析每一个关键字的作用,当时是怎么想的,写出来的,模糊的,就再写一遍。

8.18-8.25 hive提高题 15

1.头:日期、所学内容出处

https://www.bilibili.com/video/BV1WY4y1H7d3?p=28&share_source=copy_web

2.根据概述分章节描述

题目出自http://t.csdn.cn/ojSHo

ti1

image-20220825154825546

建表语句和SparkSQL代码

后面虚拟机打开,再开IDEA有点吃不消了,hive sql语句就放虚拟机里面了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.atguigu.bigdata.SparksqlTest15

import com.atguigu.bigdata.sparksql.SQLUntilTools
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object CreateTable {
def main(args: Array[String]): Unit = {
//获取spark连接
val sparkSession: SparkSession = SQLUntilTools.SparkSessionConf

import sparkSession.sql
//切换db_hive_1数据库
sql("use db_hive_1")
//建立表格
sql("""create table action
|(userId string,
|visitDate string,
|visitCount int)
|row format delimited fields terminated by "\t"
|""".stripMargin)
//添加数据
sql(
"""
|insert into action values('u01','2017/1/21',5)
|insert into action values('u02','2017/1/23',6)
|insert into action values('u03','2017/1/22',8)
|insert into action values('u04','2017/1/20',3)
|insert into action values('u01','2017/1/23',6)
|insert into action values('u01','2017/2/21',8)
|insert into action values('u02','2017/1/23',6)
|insert into action values('u01','2017/2/22',4)
|""".stripMargin)
//查看action数据是否建立
sql("select * from action").show()
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package com.atguigu.bigdata.SparksqlTest15
import com.atguigu.bigdata.sparksql.SQLUntilTools
import org.apache.spark.sql.SparkSession

object subject1 {
def main(args: Array[String]): Unit = {
val conf: SparkSession = SQLUntilTools.SparkSessionConf
import conf.sql

sql("use db_hive_1")

//任务一
sql(
"""
|with t1 as (
| select
| userId,
| date_format(regexp_replace(visitDate,'/','-'),'yyyy-MM') mn,
| visitCount
| from
| action),
| t2 as (
| select userid,mn,sum(visitcount) mn_count
| from t1 group by userid, mn)
|select userid,mn,mn_count,sum(mn_count) over (partition by userid order by mn) count_mn from t2 order by userid
|""".stripMargin).show()
//任务二
sql(
"""
|select shop,
| user_id,
| ct
|from (select shop,
| user_id,
| ct,
| rank() over (partition by shop order by ct) rk
| from (select shop,
| user_id,
| count(*) ct
| from visit
| group by shop,
| user_id) t1
| ) t2
|where rk <= 3;
|""".stripMargin).show()
}
}

3. BUG点

难点(关键代码或关键配置,BUG截图+解决方案)

SparkSQL添加数据BUG

刚开始用sparkSQL向action表添加数据报错, 就直接去hive里面直接输入语句了

image-20220818103459749

虚拟机Linux中Hive启动直接添加

image-20220818103652166

成功

image-20220818103705385

4.总结

重点是哪些知识比较重要,难点是你在学习过程中觉得比较繁琐,掌握起来有一点

因为自己的任务主要是对数据的处理,也就是sql语句用的比较多,刚好CSDN看到hive提高题,点进去看,感觉还不错,数据也蛮经典的,就想着剩下来的时间刷省赛题之前,刷一下hive sql语句,能节省一点做题时间。这星期都在刷hive的题目,难度还是很大我觉得,前面几天基本一天一道,后面差不多一天两三题,去做完,自己敲完,和答案对一对,看看结果有什么不一样的地方。用spark添加数据这一块会报错,就直接把语句放到文件中,传到虚拟机里面,直接用hive执行,把数据添加好就行了。

8.26-8.27 sqoop增量导入

1.头:日期、所学内容出处

https://www.bilibili.com/video/BV1WY4y1H7d3?p=28&share_source=copy_web

2.标题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
01--Apache Sqoop--软件介绍
02--Apache Sqoop--安装部署
03--Apache Sqoop--导入import--全量数据导入hdfs
04--Apache Sqoop--导入import--全量数据导入hdfs--并行度设置&注意事项
05--Apache Sqoop--导入import--全量数据导入hive
06--Apache Sqoop--导入import--表子集数据导入
07--Apache Sqoop--导入import--增量导入--append模式
08--Apache Sqoop--导入import--增量导入--lastmodified模式(附加数据)
10--Apache Sqoop--导出export--默认模式导出(insert)
11--Apache Sqoop--导出export--默认模式导出(insert)--配置参数
12--Apache Sqoop--导出export--更新模式导出(insert)--updateonly
13--Apache Sqoop--导出export--更新模式导出(insert)--allowinsert
14--Apache Sqoop--job作业的使用--创建、查看、执行、删除
15--Apache Sqoop--job作业的使用--免密执行

4.根据概述分章节描述

在实际工作当中,数据的导入,很多时候都是只需要导入增量数据即可,并不需要将表中的数据每次都全部导入到 hive 或者 hdfs 当中去,这样会造成数据重复的问题。因此一般都是选用一些字段进行增量的导入, sqoop 支持增量的导入据。

1
2
大白话讲 就是数据随着增加的,导入
导入表的行
1
2
3
4
5
6
7
8
9
10
--incremental append \
#上面的选模式 sqoop有两种
#append:追加,比如对大于 last-value 指定的值之后的记录进行追加导入。
#lastmodified:最后的修改时间,追加 last-value 指定的日期之后的记录
--check-column id \
#根据id这列 增量数据 !这些被指定的列的类型不能使任意字符类型,如 charvarchar 等类
#型都是不可以的,同时-- check-column 可以去指定多个列。
--last-value 1205
#指定列的值以后的数据才能导入
#指定自从上次导入后列的最大值(大于该指定的值),也可以自己设定某一值

Append模式导入

1
2
3
4
5
6
7
8
bin/sqoop import \
--connect jdbc:mysql://node-1:3306/userdb \
--username root --password hadoop \
--table emp --m 1 \
--target-dir /appendresult \
--incremental append \
--check-column id \
--last-value 1205

导入过后hdfs会多出一个文件 这个文件就是增量以后的数据 也就是ID 1205后面的数据 不包括1205

lastmodified模式导入

1
2
3
4
5
6
7
8
9
10
11
bin/sqoop import \
--connect jdbc:mysql://node-1:3306/userdb \
--username root \
--password hadoop \
--table customertest \
--target-dir /lastmodifiedresult \
--check-column last_mod \
--incremental lastmodified \
--last-value "2019-05-28 18:42:06" \
--m 1 \
--append

导入也会有一个文件

check-column列的数据需要是时间戳

last-value的值 增量导入以后 是时间后面的值 包括该时间

lastmodified 模式去处理增量时,会将大于等于 lastvalue 值的数据当做增量插入。

7.总结

重点是哪些知识比较重要,难点是你在学习过程中觉得比较繁琐,掌握起来有一点

因为自己在看sqoop的那个视频很短,就两个小时,没有讲过增量导入这一章节,但是省赛任务书是第一点是数据抽取,而且是要求使用sqoop进行抽取,因为要抽8个表格,前面6个都是比较普通的全量抽取,之前自己学的也能做出来,但是到增量导入的时候, 我用sqoop就不知道怎么弄了,问李昊,他们比赛是给后面可视化的,所以没学sqoop,自己整也整不出来,任务书描述的还模糊,两天也没整出来。

8.28-8.31spark离线数据抽取

1.头:日期、所学内容出处

https://www.bilibili.com/video/BV1WY4y1H7d3?p=28&share_source=copy_web

2.所学内容概述

IDEA中spark 代码 package打包

sparkSQL jar在linux执行

1
spark-submit --master spark://192.168.23.89:7077 --executor-memory 2g --total-executor-cores 10  --driver-memory 4G  --class com.atguigu.bigdata.Test.subjectTwoData --name subjectTwoData /opt/XXXX.jar

打包过程

要加Maven插件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>bigdata</artifactId>
<groupId>org.example</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>

<artifactId>dataClear</artifactId>

<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<build>
<resources>
<resource>
<directory>src/main/scala</directory>
</resource>
<resource>
<directory>src/main/java</directory>
</resource>

</resources>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<configuration>
<recompileMode>incremental</recompileMode>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>

<dependencies>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.12</artifactId>
<version>2.4.4</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.12</artifactId>
<version>2.4.4</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn_2.12</artifactId>
<version>2.4.4</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.12</artifactId>
<version>2.4.4</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.13.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala</artifactId>
<version>0.10.1</version>
</dependency>
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.10</artifactId>
<version>1.0</version>
</dependency>
</dependencies>
</project>

数据抽取的代码

省赛任务书模块D

spark增量 只用sql 这种where 就比sqoop 我觉得方便很多而且容易理解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
package BIGDATA
import org.apache.spark.sql.SparkSession
import java.text.SimpleDateFormat
import java.util.{Calendar, Date}


object subjectTwoData {

val MYSQLIP = "192.168.23.89"
val DATABASE = "shtd_store"
val MYSQLDBURL: String = "jdbc:mysql://" + MYSQLIP + ":3306/" + DATABASE + "?characterEncoding=UTF-8"
val MYSQLDRIVER = "com.mysql.jdbc.Driver"
val MYSQLUSER = "root"
val MYSQLPASSWORD = "123456"

def main(args: Array[String]): Unit = {
//MYSQL参数

//建立spark连接 hive支持
val sparkBuilder: SparkSession.Builder = SparkSession.builder()
if ((args.length > 0 && args(0).equals("local")) || args.length == 0) {
sparkBuilder.master("local[*]")
}
val spark: SparkSession = sparkBuilder.appName("subjectTwoData")
.enableHiveSupport()
.getOrCreate()
import spark.sql

/**
连接mysql
给表格起别名
*/

spark.read.format("jdbc")
.option("url", MYSQLDBURL)
.option("driver", MYSQLDRIVER)
.option("user", MYSQLUSER)
.option("password", MYSQLPASSWORD)
.option("dbtable", "CUSTOMER").load().createTempView("mysql_customer")

spark.read.format("jdbc")
.option("url", MYSQLDBURL)
.option("driver", MYSQLDRIVER)
.option("user", MYSQLUSER)
.option("password", MYSQLPASSWORD)
.option("dbtable", "NATION").load().createTempView("mysql_nation")

spark.read.format("jdbc")
.option("url", MYSQLDBURL)
.option("driver", MYSQLDRIVER)
.option("user", MYSQLUSER)
.option("password", MYSQLPASSWORD)
.option("dbtable", "PART").load().createTempView("mysql_part")

spark.read.format("jdbc")
.option("url", MYSQLDBURL)
.option("driver", MYSQLDRIVER)
.option("user", MYSQLUSER)
.option("password", MYSQLPASSWORD)
.option("dbtable", "PARTSUPP").load().createTempView("mysql_partsupp")

spark.read.format("jdbc")
.option("url", MYSQLDBURL)
.option("driver", MYSQLDRIVER)
.option("user", MYSQLUSER)
.option("password", MYSQLPASSWORD)
.option("dbtable", "REGION").load().createTempView("mysql_region")

spark.read.format("jdbc")
.option("url", MYSQLDBURL)
.option("driver", MYSQLDRIVER)
.option("user", MYSQLUSER)
.option("password", MYSQLPASSWORD)
.option("dbtable", "SUPPLIER").load().createTempView("mysql_supplier")

spark.read.format("jdbc")
.option("url", MYSQLDBURL)
.option("driver", MYSQLDRIVER)
.option("user", MYSQLUSER)
.option("password", MYSQLPASSWORD)
.option("dbtable", "ORDERS").load().createTempView("mysql_orders")

spark.read.format("jdbc")
.option("url", MYSQLDBURL)
.option("driver", MYSQLDRIVER)
.option("user", MYSQLUSER)
.option("password", MYSQLPASSWORD)
.option("dbtable", "LINEITEM").load().createTempView("mysql_lineitem")


//设置时间为今天
val sdf = new SimpleDateFormat("yyyyMMdd")
val calendar: Calendar = Calendar.getInstance()
calendar.add(Calendar.DATE,0) //0代表今天 正数是未来 负数是以前的日期
val todayTime: Date = calendar.getTime //获取时间 格式 :Mon Aug 29 00:01:10 CST 2022
val dateStr: String = sdf.format(todayTime) //string 类型时间 作分区etldate

//hive开启动态分区支持
sql("set hive.exec.dynamic.partition=true")
sql("set hive.exec.dynamic.partition.mode=nonstrict")
sql("set hive.exec.max.dynamic.partitions=50000")
sql("set hive.exec.max.dynamic.partitions.pernode=10000")

//抽取CUSTOMER 到hive的ods设置分区dateStr
//以下都是全量抽取
sql(
s"""
|insert overwrite table ods.customer partition (etldate='$dateStr')
|select * from mysql_customer
|""".stripMargin)

//抽取NATION 到hive的ods设置分区
sql(
s"""
|insert overwrite table ods.nation partition (etldate='$dateStr')
|select * from mysql_nation
|""".stripMargin)

//抽取PART 到hive的ods设置分区
sql(
s"""
|insert overwrite table ods.part partition (etldate='$dateStr')
|select * from mysql_part
|""".stripMargin)

//抽取PARTSUPP 到hive的ods设置分区
sql(
s"""
|insert overwrite table ods.partsupp partition (etldate='$dateStr')
|select * from mysql_partsupp
|""".stripMargin)

//抽取REGION 到hive的ods设置分区
sql(
s"""
|insert overwrite table ods.region partition (etldate='$dateStr')
|select * from mysql_region
|""".stripMargin)
//抽取SUPPLIER
sql(
s"""
|insert overwrite table ods.supplier partition (etldate='$dateStr')
|select * from mysql_supplier
|""".stripMargin)

//增量抽取 1997 12 1以后的数据orderkey为增量字段 etldate为动态分区列
spark.sql(
s"""
|select
|orderkey,custkey,orderstatus,totalprice,orderdate,
|orderpriority,clerk,shippriority,comment,regexp_replace(orderdate,"-","")
| from mysql_orders a
|left join
|(select COALESCE(max(orderkey),-1) as maxid from ods.orders ) b
|on 1=1 where cast(a.orderkey as int)>cast(b.maxid as int)
|and regexp_replace(a.orderdate,"-","")>= '19971201'
|
""".stripMargin).coalesce(50).createOrReplaceTempView("mysql_order_coalesce")

/**
* 出过几次BUG的点 任务书的要求是看dealdate这列为动态分区 但是hive表中明显作为动态分区的列 最后一列
* 是orders.etldate 就把原来代码中的dealdate改成etldate
* 报错信息
* exception in thread main org.apache.spark.sql.analysisException dealdate is not a valid partition column in table `ods`.`orders`
* 线程主org.apache.spark.sql中出现异常。analysisException dealdate不是表“ods”中的有效分区列
*/
spark.sql(
"""
|insert overwrite table ods.orders partition (etldate)
|select * from mysql_order_coalesce
|
""".stripMargin)

//增量抽取 lineitem orderkey为增量字段 静态分区为当天日期dateStr
sql(
s"""
|insert into table ods.lineitem partition (etldate='$dateStr')
|select a.* from mysql_lineitem a
|left join
|(select coalesce(max(orderkey),-1) as maxid from ods.lineitem) b on 1=1
|where cast(a.orderkey as int)>cast(b.maxid as int)
|""".stripMargin)
spark.stop()
}
}

3.总结

重点是哪些知识比较重要,难点是你在学习过程中觉得比较繁琐,掌握起来有一点

前段时间因为sqoop增量自己整不来,然后李昊让我基本的sqoop学会就好了,练习题目就用spark,而且省赛说是不考sqoop的,所以我就直接用sparksql连hive,抽取数据了,就是直接用hive 查询导入到MySQL 或者 从MySQL到hive hive到hive都是可以的,动态分区也很方便,都会默认分区。这几天sparksql使用的比较多,也出现了很多大大小小的问题吧,麻烦的点在打包放linux,里面之前因为没加Maven插件,导致报错了,重新打包还是会报错,代码并没有更新。