模版

学习日期:

项目任务

BUG

日总结

1
2
3
4
5
6
7
8
{% tabs 分栏%}
<!-- tab BUG1@fas fa-bomb -->
Any content (support inline tags too).
<!-- endtab -->
<!-- tab BUG2@fas fa-bomb -->
Any content (support inline tags too).
<!-- endtab -->
{% endtabs %}

Py综合推荐指标算法

学习日期: 6.1

项目任务

综合指标计算

​ 默认推荐指标数,因为还没想好需要推荐的字段,有哪些,所以就先把结合和景区有关的所有字段找出进行加权处理,算出一个比较合适的rating推荐指标系数,将该系数存放到spot表中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import pandas as pd
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType
from commentDmTest.DmConnect import *
os.environ["PYSPARK_PYTHON"] = "E:\\software\\anaconda3\\envs\\pyspark\\python.exe"
# 建立连接
conn = connDm()
cur = conn.cursor() # 获取游标
# 得到景区的各指标信息,方便推荐加权处理
cur.execute("""
with t1 as
(select SPOT_ID, round(avg(FEEL_SCORE), 3) as avgScore
from COMMENT_INFO_NLP
group by SPOT_ID)
select SPOT_NAME, GRADE, HOT, t2.SPOT_ID,
replace(SUM, substring(SUM, locate('条', SUM)), '') as sum,
avgScore
from SPOT_INFO t2
join t1 on t1.SPOT_ID=t2.SPOT_ID
""")
spot_info_df = pd.DataFrame(cur.fetchall(), columns=['SPOT_NAME', 'GRADE', 'HOT', 'SPOT_ID', 'SUM', 'AVG'])
# 算出各个景区的总评论数的中位数 填充为sum为nal的行
cur.execute("""
select MEDIAN(convert(int,t1.sum)) avgCommentSum
from (select replace(SUM, substring(SUM, locate('条', SUM)), '') as sum
from SPOT_INFO) t1
""")
# 得到平均值decimal类型的
avg_comment_sum = cur.fetchall()[0][0]
# 修改sum列的数据类型转为float才能使用max方法
spot_info_df['SUM'] = spot_info_df['SUM'].apply(pd.to_numeric)
# 得到最大值decimal类型的
max_comment_sum = spot_info_df['SUM'].max()


# 对spark的dataframe加权处理
def process_row(r):
# 处理每一行数据:r表示row对象
# 注意这里要全部设为浮点数,spark运算时对类型比较敏感,要保持数据类型都一致
grade_count = r.GRADE if r.GRADE else 0.0
hot_count = r.HOT if r.HOT else 0.0
sum_count = r.SUM if r.SUM else avg_comment_sum # 如果为空 使用平均值填充
avg_count = r.AVG if r.AVG else 0.0

grade_score = 0.2 * grade_count / 5 * 100 if grade_count <= 5 else 20.0
hit_score = 0.4 * hot_count / 10 * 100 if hot_count <= 10 else 40.0
sum_score = 0.2 * sum_count / max_comment_sum * 100 if sum_count <= max_comment_sum else 20
avg_score = 0.2 * avg_count / 5 * 100 if avg_count <= 5 else 20.0

rating = round(grade_score, 2) + round(hit_score, 2) + round(sum_score, 2) + round(avg_score, 2)
# 返回用户ID、分类ID、用户对分类的偏好打分
return r.SPOT_ID, r.SPOT_NAME, round(rating, 2)


# 创建spark类
spark = connSpark()
# pandas转pyspark需要设置类型
schema = StructType([
StructField("SPOT_NAME", StringType(), True),
StructField("GRADE", DoubleType(), True),
StructField("HOT", DoubleType(), True),
StructField("SPOT_ID", IntegerType(), True),
StructField("SUM", DoubleType(), True),
StructField("AVG", DoubleType(), True),
])
# 创建spark的dataframe类
spot_info_df_spark = spark.createDataFrame(spot_info_df, schema=schema)
# 展示df的架构
spot_info_df_spark.printSchema()
# 创建rdd对象执行 指标加权
rating_rdd = spot_info_df_spark.rdd.map(process_row)
# 对rdd的类型进行转换 防止toDF会报错
rating_rdd = rating_rdd.map(lambda x: (int(x[0]), str(x[1]), float(x[2])))
# rdd转回dataframe
rating_df = rating_rdd.toDF(["spot_id_rat", "spot_name_rat", "RATING"])
# 读取景区表进行连接 给景区表添加RATING字段
spot_into = dmRead(spark, "SPOT_INFO")
# 获取spot_info表的所有列名 方便连接以后取值
need_col: list = spot_into.toPandas().columns.values.tolist()
# 添加RATING列
need_col.append("RATING")
# join连接根据spot_id 只要原来景区表的字段+新的rating字段
spot_final = rating_df.join(spot_into, rating_df.spot_id_rat == spot_into.SPOT_ID) \
.selectExpr(need_col)
# 写入SPOT_INFO表 (添加了rating字段)
dmWrite(spot_final, "SPOT_INFO")

根据关键字找对应的评论

查询方法

1
2
3
4
5
6
7
8
9
10
11
12
13
# 一个sql就能解决,效率和效果都能达到预期要求
def keyWordFindComment(spot_id, comment_key_word):
cur.execute("""
select spot_id, evaluation, evaluation_grade, evaluation_time, feel_score, feel from COMMENT_INFO_NLP
where
SPOT_ID={}
and
EVALUATION like '%{}%'
""".format(spot_id, comment_key_word))
list_test = cur.fetchall()
df_test = pd.DataFrame(list_test, columns=["spot_id", "evaluation", "evaluation_grade",
"evaluation_time", "feel_score", "feel"])
return eval(df_test.to_json(orient='records', force_ascii=False).replace("null", "'未填'"))

接口

1
2
3
4
5
6
7
8
9
10
11
@app.route('/spot/comment/<spot_id>', methods=['POST', 'GET'])
def spot_comments_high_word_find(spot_id):
if request.method != 'POST':
return json.dumps({'msg': '查询失败,请使用POST传参', 'code': 500, 'data': 'null'}, ensure_ascii=False)
# 获取post中传递的值
keyWord = request.args.to_dict()
data = dmToDfpkuseg.keyWordFindComment(eval(spot_id), keyWord.get('keyWord'))
if len(data) == 0:
return json.dumps({'msg': '查询失败,没有该景区关键字对应的评论', 'code': 500, 'data': 'null'},
ensure_ascii=False)
return json.dumps({'msg': '查询成功', 'code': 200, 'data': data}, ensure_ascii=False)

BUG

​ 这个是flask中一个serser.py中的方法不能过多,我之前是有两个方法名的存在,而且能正常使用的情况的,不知道为什么现在不行,后面经过我测试,发现方法最多存在两个,而且需要一个有参一个无参的,不能两个有参的,即使参数的数量和类型不一样也不被允许。

image-20230601175133870

​ 我使用pyspark连接dm数据库的时候报错,看样子是找不到driver,搜索了方法,发现其实跟之前MySQL连接spark差不多,需要把driver的jar包放入spark的环境下面,将dm的jar包放到pyspark的jars包下面问题解决。

image-20230601175111970

日总结

​ 今天完成的任务还是挺多的,而且梳理好了后面几天的计划,后几天打算把预测学会,看能不能举一反三把智能推荐部分解决了,这几天还需要想下推荐部分需要什么样的筛选条件。今天的任务主要有两个,根据关键字找到相关的评论,然后套加权算法把rating,也就是景区的综合推荐度计算出来,并放入数据库中,本来是打算写成接口的,但是spark接口响应实在是太慢了,直接存入数据库中了,如果spark写入接口一直都很慢的话,可能后期要弃用spark写推荐算法,要用pandas和numpy。

数据清洗方法整合

学习日期: 6.2

项目任务

起因

​ 因为数据有些缺失值是没有补的,有些只能python进行填补,有部分是要spark补的,但是一般自己处理完一部分直接存入数据库,这样我要顺利清洗完,需要跑好几个jar包,打算利用spark临时表的特性,尽量把运行起来,只需要两三个jar包就能把数据搞好。

导入

​ 步骤以及思路:现将提供的所有文件传送到一个表中,代码也进行了很大的优化,前期准备工作一个scala代码就能解决,本来是因为txt文件是有java跑的,但是其实可以用scala调java的main方法,这样就可以放一个文件中。

finalClearOne

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package clearDf

import clearDf.Utils.{dmWriterOverwrite, getSpark}
import clearDf.fileClearTest.{csvToDf, xlsxToDf}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.{DataFrame, SparkSession}

object finalClearOne {
def main(args: Array[String]): Unit = {
val spark: SparkSession = getSpark
import spark.implicits._
import spark.sql
sql(" set spark.executor.processTreeMetrics=true") //解决报错警告导致代码跑不出来
val frame2: DataFrame = csvToDf(spark, "E:\\study\\中软杯\\样数据\\1679624200845105\\初始旅游数据\\2.csv")
val frame3: DataFrame = xlsxToDf(spark, "E:\\study\\中软杯\\样数据\\1679624200845105\\初始旅游数据\\3.xlsx")
val frame4: DataFrame = csvToDf(spark, "E:\\study\\中软杯\\样数据\\1679624200845105\\初始旅游数据\\4.csv")
//缺失字段 PHONE INTRO HOT COMMENT_TIME COMMENT_GRADE
val frameSmallFill: DataFrame = frame2.unionByName(frame3)
.withColumn("GRADE",round(substring($"GRADE",1,2).cast(IntegerType) / 20,1))
//补充缺失字段
.withColumn("PHONE",lit(""))
.withColumn("INTRO",lit(""))
.withColumn("HOT",lit(""))
.withColumn("COMMENT_GRADE",lit(""))
.withColumn("COMMENT_TIME",lit(""))
val frameAllData:DataFrame = frame4.unionByName(frameSmallFill)

dmWriterOverwrite(frameAllData,"U7U7.All_TEMP")
//需要先java把txt保存入all_temp表
import clearTxt.txtClearArrayList
txtClearArrayList.txtIntoDmData.main()
}
}

BUG

很困难解决的bug没有出现

日总结

​ 因为据说下周服务器平台就会发放了,为了处理平台可能提供的hdfs的新数据,打算将自己之前数据处理的代码整合一下,放到一个jar包中,方法都设置参数通用,这样在平台执行,也只需要两个spark提交即可。没什么困难的,后面整合表抽取部分数据以及挂join城市表挂id这类的方法,重新梳理好方法可能会麻烦一些,需要把代码都理清楚。

清洗方法整合all

学习日期: 6.5

项目任务

任务

将csv和txt文件读取以后的all_temp分表处理,先是对自己电脑上对dm添加数据,然后移植到了服务器中。然后使用人工智能这块对缺失的数据预测掉。

代码如下

全部整合了分表的代码,并利用sparkml将hot字段进行缺失值填充。机器学习这部分我使用了随机森林算法,依据hot的皮平均值,和对应的grade和sum字段,推算出可能的hot值

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
package clearDf

import clearDf.Utils.{dmRead, dmWriterOverwrite, getSpark}
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.regression.{RandomForestRegressionModel, RandomForestRegressor}
import org.apache.spark.sql.expressions.{Window, WindowSpec}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.sql.{DataFrame, SparkSession}

object finalClearTwo {
val spark: SparkSession = getSpark

import spark.implicits._
import spark.sql

def main(args: Array[String]): Unit = {
// 将所有文件都导入到all_temp表
finalClearOne.main(args)
//对all_temp表进行处理
val all_data: DataFrame = dmRead(spark, "ALL_TEMP")
.withColumn("SPOT", regexp_replace($"SPOT", "\\?", "")) //将脏数据中景区名带?的去掉
.where("SPOT <> 'null'")

//创建景区dataframe导入景区表
val spot_info: DataFrame = spotInfoClear(all_data)
//创建并导入评论表 (后python还需要处理)
val comment_info: DataFrame = commentWithSpot(all_data, spot_info)
println(comment_info.count())
// dmWriterOverwrite(comment_info,"comment_temp")
//临时的城市表(不带省份)
val city_temp:DataFrame = cityInfoClear(spot_info)
//先导入 方便java读取
dmWriterOverwrite(city_temp,"city_info_temp")
val spot_info_temp: DataFrame = spotWithCityId(spot_info, city_temp) //带城市id的景区表
//创建城市空表 添加省份字段
val create_temp_city: DataFrame = city_temp
.withColumn("PROVINCE_NAME", lit(""))
.where("CITY_ID=0")
dmWriterOverwrite(create_temp_city,"CITY_TEMP") //空表结构到dm
//执行java中根据城市名判断省份代码
import clearTxt.jsTest.jsonToDataTest
jsonToDataTest.provinceNameIntoCity()
//读取城市表(省份名)
val city_temp_two:DataFrame = dmRead(spark, "CITY_TEMP")
//根据城市表(省份名) 创建省份表(完整版)
val province_data: DataFrame = createToProvince(city_temp_two)
dmWriterOverwrite(province_data,"PROVINCE_INFO")

//将城市表的省份名替换为id 并导入dm(完整版)
val city_info_final:DataFrame = cityJoinProvince(city_temp_two,province_data)
dmWriterOverwrite(city_info_final,"CITY_INFO")
//hot字段填充 景区表导入 还需要python配rating字段
val spot_Prediction: DataFrame = spotPredictionFinalTable(spot_info_temp)
dmWriterOverwrite(spot_Prediction, "spot_info_temp") //导入dm
}
//随机森林 hot的null填充
def spotPredictionFinalTable(spot_info_temp: DataFrame): DataFrame ={
// 读取数据
val data: DataFrame = spot_info_temp.selectExpr("SPOT_ID","GRADE","HOT","SUM")
.withColumn("GRADE",$"GRADE".cast(DoubleType))
.withColumn("HOT",$"HOT".cast(DoubleType))
.withColumn("SUM",regexp_extract($"SUM","[^条]+()",0).cast(DoubleType))
val data2: DataFrame = spot_info_temp
.withColumn("HOT",$"HOT".cast(DoubleType))
val meanValues: Double = data.agg(round(avg(col("HOT")),1)).head().getDouble(0)
val filledData: DataFrame = data.na.fill(meanValues, Seq("HOT"))
// 将特征转化为向量
val assembler: VectorAssembler = new VectorAssembler()
.setInputCols(Array("GRADE", "SUM"))
.setOutputCol("features")
val assembledData: DataFrame = assembler.transform(filledData)
// 创建随机森林回归模型
val rf: RandomForestRegressor = new RandomForestRegressor()
.setLabelCol("HOT")
.setFeaturesCol("features")
// 拟合模型
val model: RandomForestRegressionModel = rf.fit(assembledData)
// 使用模型填充缺失值
val predictedData: DataFrame = data.na.fill(meanValues, Seq("HOT"))
val assembledPredictedData: DataFrame = assembler.transform(predictedData)
val filledPredictedData: DataFrame = model.transform(assembledPredictedData)
.withColumn("prediction", round(col("prediction"), 1))
//所有景区的hot预测
val predict: DataFrame = filledPredictedData.selectExpr("SPOT_ID","prediction").withColumnRenamed("SPOT_ID","id")
//将hot的预测值 填充null 并将hot和grade类型转换成double
data2.join(predict,predict("id") === data2("SPOT_ID"))
.withColumn("HOT",
when(col("HOT").isNull, col("prediction")).otherwise(col("hot")))
.withColumn("HOT",round($"HOT",1))
.withColumn("GRADE",round($"GRADE",1))
.drop("id","prediction")
}
//TODO 将城市表的省份名换成省份id
def cityJoinProvince(city_data:DataFrame,province_data:DataFrame): DataFrame ={
//两表连接取出需要的字段
val city_temp: DataFrame = city_data.join(province_data, city_data("PROVINCE_NAME") === province_data("NAME"))
.selectExpr("CITY_NAME", "CITY_ID", "PROVINCE_ID")
//把数据先放临时表
city_temp
}

//TODO 创建省份表
def createToProvince(city_data:DataFrame): DataFrame ={
//读取省份表并去重 递增挂上省份id
val province_data: DataFrame = city_data
.selectExpr("PROVINCE_NAME")
.distinct()
.withColumnRenamed("PROVINCE_NAME", "NAME")
.withColumn("PROVINCE_ID", row_number() over Window.orderBy($"NAME"))
//传入省份表
province_data
}

//景区表
def spotInfoClear(all_data: DataFrame): DataFrame = {
val SPOT_ID_ORDER: WindowSpec = Window.orderBy($"SPOT_NAME".desc)
val spot_info_temp: DataFrame = all_data //景区名不为null
.withColumnRenamed("SPOT", "SPOT_NAME") //修改字段名 SPOT 为 SPOT_NAME
//删除不需要的字段
.drop("COMMENT")
.drop("COMMENT_GRADE")
.drop("COMMENT_TIME")
.distinct() //去重
.withColumn("SPOT_ID", row_number() over SPOT_ID_ORDER)
.where("SPOT_ID<>9") //因为txt中和csv中都有武当山景区,但是介绍多了空格,所以也要去重
.withColumn("SPOT_ID", row_number() over SPOT_ID_ORDER) //去重以后重新增加列
spot_info_temp
}
//评论表
def commentWithSpot(all_data: DataFrame, spot_data: DataFrame): DataFrame = {
val comment_info_temp: DataFrame = all_data.join(spot_data, all_data("SPOT") === spot_data("SPOT_NAME"))
.selectExpr("SPOT_ID", "COMMENT", "COMMENT_GRADE", "COMMENT_TIME")
.withColumnRenamed("COMMENT", "EVALUATION")
.withColumnRenamed("COMMENT_GRADE", "EVALUATION_GRADE")
.withColumnRenamed("COMMENT_TIME", "EVALUATION_TIME")
.where("EVALUATION<>'NULL'").distinct()
.withColumn("EVALUATION_ID", row_number() over Window.orderBy($"SPOT_ID")) //去重以后重新增加列
comment_info_temp
}
def cityInfoClear(spot_info:DataFrame): DataFrame ={
spot_info.createOrReplaceTempView("spot_info")
//[\u4e00-\u9fa5]{2}(?=市)+市
val city_final_frame: DataFrame = sql(
"""
|select
|if(locate('市',LOCATION)<>'0',regexp_extract(LOCATION,"([^市]{2}()+市)"),regexp_extract(LOCATION,"([^省]+省)?([^自治州]+自治州)", 2)) as CITY_NAME
|from
|spot_info
|""".stripMargin)
.distinct()
//填充CITY_Id
.withColumn("CITY_ID", row_number() over Window.orderBy($"CITY_NAME".desc))
city_final_frame
}
//景区表挂城市id
def spotWithCityId(spot_info:DataFrame,city_temp:DataFrame): DataFrame ={
spot_info.createOrReplaceTempView("spot_info")
city_temp.createOrReplaceTempView("city_info")
val spot_test: DataFrame = sql(
s"""
|with t1 as (
|select
|*,
|if(locate('市',LOCATION)<>'0',regexp_extract(LOCATION,"([^市]{2}()+市)"),regexp_extract(LOCATION,"([^省]+省)?([^自治州]+自治州)", 2)) as NAME
|from
|spot_info)
|select
|t1.*,
|ci.CITY_ID
|from
|t1
|join city_info ci on ci.CITY_NAME = t1.NAME
|""".stripMargin).drop("NAME")
spot_test
}
}

随机森林对hot填充

这块自己研究了一下午+一晚上才搞定

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//随机森林 hot的null填充
def spotPredictionFinalTable(spot_info_temp: DataFrame): DataFrame ={
// 读取数据
val data: DataFrame = spot_info_temp.selectExpr("SPOT_ID","GRADE","HOT","SUM")
.withColumn("GRADE",$"GRADE".cast(DoubleType))
.withColumn("HOT",$"HOT".cast(DoubleType))
.withColumn("SUM",regexp_extract($"SUM","[^条]+()",0).cast(DoubleType))
val data2: DataFrame = spot_info_temp
.withColumn("HOT",$"HOT".cast(DoubleType))
val meanValues: Double = data.agg(round(avg(col("HOT")),1)).head().getDouble(0)
val filledData: DataFrame = data.na.fill(meanValues, Seq("HOT"))
// 将特征转化为向量
val assembler: VectorAssembler = new VectorAssembler()
.setInputCols(Array("GRADE", "SUM"))
.setOutputCol("features")
val assembledData: DataFrame = assembler.transform(filledData)
// 创建随机森林回归模型
val rf: RandomForestRegressor = new RandomForestRegressor()
.setLabelCol("HOT")
.setFeaturesCol("features")
// 拟合模型
val model: RandomForestRegressionModel = rf.fit(assembledData)
// 使用模型填充缺失值
val predictedData: DataFrame = data.na.fill(meanValues, Seq("HOT"))
val assembledPredictedData: DataFrame = assembler.transform(predictedData)
val filledPredictedData: DataFrame = model.transform(assembledPredictedData)
.withColumn("prediction", round(col("prediction"), 1))
//所有景区的hot预测
val predict: DataFrame = filledPredictedData.selectExpr("SPOT_ID","prediction").withColumnRenamed("SPOT_ID","id")
//将hot的预测值 填充null 并将hot和grade类型转换成double
data2.join(predict,predict("id") === data2("SPOT_ID"))
.withColumn("HOT",
when(col("HOT").isNull, col("prediction")).otherwise(col("hot")))
.withColumn("HOT",round($"HOT",1))
.withColumn("GRADE",round($"GRADE",1))
.drop("id","prediction")
}

BUG

​ 预测这边的报错还是挺多的,但是主要报错都是自己设置模型的时候字段没看清楚,都是xxx字段不存在之类的,慢慢修改以后发现预测效果还可以。

日总结

​ 今天的任务还是整合和预测填充,本来打算用中位数或者avg平均值填充的,但是数据展示的话,一行平的不太好看,所以使用了预测,预测那块还是挺难懂的,简单说就是利用已知的会影响缺失值的数据,做模型的支撑数据,算出缺失的hot。原理就是回归算法,简单说就是你对一个数据打分了,数据其实也会反映出你的习惯之类的,是一个相互的过程,随机森林就是利用这点,反向推测出数据。

python评论表优化

学习日期: 6.6

项目任务

起因

​ 因为评论表中grade字段很多是缺失的,又因为之前自己是对语言进行过情绪分分析的所以,对于这部分缺失值可以填充,然后就是ip以及时间了,发现了一个很好用的类faker,他可以随机生成省份以及地址,又因为其他数据的省份好像都是两字而且不带省和自治州这些的,就稍加处理了一下用了递归。

代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import random

import pandas as pd
import radar
from faker import Faker
from finalMain.DmConnect import connDm
from snownlp import SnowNLP
from snownlp import sentiment
import os
from sqlalchemy import create_engine


def provinceRandom() -> str:
f = Faker('zh_CN')
province = f.province()
if province.find("省") != -1 or province.find("市") != -1:
return province[0:-1]
return provinceRandom()


def provinceList():
province_list = []
while len(province_list) <= 26:
province = provinceRandom()
if province not in province_list:
province_list.append(province)
return province_list


# 判断情绪评分 并返回相对应的值
def scoreSnow(sentiments):
# sentiments = SnowNLP(comment).sentiments
if 0.9 <= sentiments <= 1:
feelMood = "超棒"
elif 0.8 <= sentiments <= 0.9:
feelMood = "满意"
elif 0.5 <= sentiments <= 0.8:
feelMood = "不错"
elif 0.2 <= sentiments <= 0.5:
feelMood = "一般"
else:
feelMood = "不佳"
return feelMood


# 加入dm数据库
def intoDm(data, tableName):
# 创建url
conn_url = 'dm+dmPython://SYSDBA:SYSDBA@47.120.9.247:5236/'
# 创建Engine对象
engine = create_engine(conn_url)
# 到dm8中
data.to_sql(name=tableName, con=engine, if_exists='append', index=False)




def commentDataFinalIntoDm():
data_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'snownlpDrill/sentiment.marshal')
sentiment.load(data_path)

conn = connDm()
# 获取游标
cur = conn.cursor()
# 查询评论表全部数据
cur.execute("select * from COMMENT_TEMP")
comment_list: list = cur.fetchall()
comment_df = pd.DataFrame(comment_list,
columns=["SPOT_ID", "EVALUATION", "EVALUATION_GRADE", "EVALUATION_TIME", "EVALUATION_ID"])
# 将EVALUATION_ID作为行索引
# comment_df.set_index('EVALUATION_ID', inplace=True)
# 添加两列为空值 存放情绪分和对应的情绪
comment_df['FEEL_SCORE'] = ''
comment_df['FEEL'] = ''
# 省份列表
province_list = provinceList()

# 遍历评论id
for index in list(comment_df.index):

# 获取对应的评论id的评论
comment_values = comment_df['EVALUATION']
comment_value = comment_values.loc[index]
if comment_df['EVALUATION_TIME'].loc[index] == "":
random_index = random.randrange(len(province_list))
comment_df.iloc[index - 1, 3] = str(radar.random_date("2020-09-13", "2023-02-02").date()) + "IP属地: " + \
province_list[random_index]
# 计算出该评论的情绪分值
score = round(SnowNLP(comment_value).sentiments, 2)
# 添加到feelScore列
feelScore = round(score * 5, 1)
comment_df.iloc[index - 1, 5] = feelScore
# 评论情绪
feel = scoreSnow(score)
comment_df.iloc[index - 1, 6] = feel
if comment_df['EVALUATION_GRADE'].loc[index] == "":
comment_df.iloc[index - 1, 2] = str(int(round(feelScore, 0))) + "分 " + feel
intoDm(comment_df, "COMMENT_INFO")
commentDataFinalIntoDm()

数据处理效果

​ 预测不准的情况还在解决,但不是现在的重点。

image-20230607202246243

BUG

​ 没找到原因,应该是自己的中位数是string类型的吧可能,因为这块本来是对sum字段用中位数填充的,但是后面发现数据都是有sum的,就直接注释了。

image-20230609215202562

日总结

​ 今日任务主要是优化,然后测试了多次,从空表直接执行一次spark代码一次py代码就将数据处理好并存入dm的数据库中了。今天主要是收获是发现了一个很好用的能填充随机参数了类,python faker库能随机生成很多需要的个人信息或者编码之类的挺多数据的,而且格式可以调,还是很不错的。

使用spark-submit提交

学习日期: 6.7

项目任务

​ 将自己电脑内的hadoop启动,用spark提交尝试提高清洗运行速度。大概需要1分钟一次,问题出现在过程中使用了java的代码,java又不支持分布式计算平台,所以导致时间画得比较久,而且spark启动也会花费时间。

​ 电脑又装了一个伪分布式的hadoop,花了一上午的时间,下午就在测试了,大概洗完样数据,只要2分钟,一次jar包,一次python执行。安装hadoop步骤就不写了,挂个网址好了,http://t.csdn.cn/SuQmN,找hadoop3.2.2花费了点时间,还需要对应版本的window.exe

BUG

​ 使用spark-submit提交jar包读取发现有部分乱码,仔细看了看都是在后面的数据,好像就是3000条,那就是java洗的,去找java那部分代码的格式问题,发现读txt文件的时候就已经乱码了,起初还以为是导入数据库才乱码,还修改了url结果是IO流的问题reader = new InputStreamReader(file, StandardCharsets.UTF_8);读文件的时候加编码格式就行了。

image-20230609203416091

​ java部分的报错,一直说找不到city.json,打完jar包也没搞懂他读取文件的路径相对路径到底怎么写,直接写绝对路径了,等放服务器中再改好了。

image-20230609203922865

日总结

​ 今天刚把平台发放,还咩有研究明白,现测试自己的清洗代码打jar包能不能顺利执行,上午配置了hadoop以及spark的环境,下午对打完jar包以后,一些路径和编码这样的小细节进行了处理,目前感觉最主要的是平台的使用,清洗这部分对样数据是没什么问题了,还得看hdfs的数据,如果相差很大就糟糕了。

清洗txt文件夹

学习日期: 6.8

项目任务

起因

​ 因为关系到清洗测试以及速度问题,之前自己1w条数据就花费了1分钟清洗,这一些增加到了百万条,自己那种代码肯定不行了尤其是java处理txt那边,估计是内存都要爆,得换成pandas了,之前spark代码就要推翻重新搞了,要纯python清洗了。

清洗txt

​ 因为txt是之前我觉得最难的,所以先使用一些python对txt能不能很好的处理以及效果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import random
import pandas as pd
import re
import os
import radar
from faker import Faker
from hdfs3 import HDFileSystem
import glob
from snownlp import SnowNLP

#处理评论 情绪化打分
def scoreSnow(sentiments):
# sentiments = SnowNLP(comment).sentiments
if 0.9 <= sentiments <= 1:
feelMood = "超棒"
elif 0.8 <= sentiments <= 0.9:
feelMood = "满意"
elif 0.5 <= sentiments <= 0.8:
feelMood = "不错"
elif 0.2 <= sentiments <= 0.5:
feelMood = "一般"
else:
feelMood = "不佳"
return feelMood

# 将所有的省份 去掉最后的省和市 递归找到只为省和直辖市的地址
def provinceRandom() -> str:
f = Faker('zh_CN')
province = f.province()
if province.find("省") != -1 or province.find("市") != -1:
return province[0:-1]
return provinceRandom()

# 将地址都放入列表,这样填充会快很多,因为递归算法只用执行到26个都取到即可。
def provinceList():
province_list = []
while len(province_list) <= 26:
province = provinceRandom()
if province not in province_list:
province_list.append(province)
return province_list


# 读取hdfs文件
def hdfsFileRead():
# 创建一个 HDFileSystem 对象
hdfs = HDFileSystem(host='hadoopb-namenode.damengb-zone.svc', port=9000)
# 读取 HDFS 文件夹中的所有文件
file_list = hdfs.ls('/data/txt')
hdfsConfig = hdfsFileRead()
hdfs = hdfsConfig[0]
file_list = hdfsConfig[1]
# 读取文本文件
for f in file_list:
with hdfs.open(f) as file:
text = file.read()
return hdfs, file_list


# 本地测试读取文件
def LoadFileRead():
folder_path = r'/data/txt'
file_names = os.listdir(folder_path)
return file_names
# 处理景区表
def clearSpot():
spot_df = df[df.columns[0:11]].drop_duplicates()
spot_df['rating'] = 4 * spot_df['GRADE'].astype(float) + 4 * spot_df['HOT'].astype(float)


if __name__ == '__main__':
# 用于存放每个文件数据
listDf = []
# 文件夹位置
folder_path = r'../data/txt'
# 打算将每个文件都按照切割成字典,最后放列表中再合并
for file_path in glob.glob(os.path.join(folder_path, '*.txt')):
# 读取文件 字符集为utf-8 存入text
with open(file_path, 'r', encoding="utf-8") as f:
text = f.read()
# 根据txt文件关键字处理成key:value
spot = text[text.find('[SPOT]') + 7:text.find('[LOCATION]')].strip()
location = text[text.find('[LOCATION]') + 11:text.find('[OPENTIME]')].strip()
opentime = text[text.find('[OPENTIME]') + 10:text.find('[PHONE]')].strip()
phone = text[text.find('[PHONE]') + 7:text.find('[INTRO]')].strip()
intro = text[text.find('[INTRO]') + 7:text.find('[GRADE]')].strip()
grade = text[text.find('[GRADE]') + 7:text.find('[HOT]')].strip()
hot = text[text.find('[HOT]') + 5:text.find('[SUM]')].strip()
sum_ = text[text.find('[SUM]') + 5:text.find('[COMMENT]')].strip()
comment = text[text.find('[COMMENT]') + 10:text.find('[COMMENT_GRADE]')].strip()
comment_grade = text[text.find('[COMMENT_GRADE]') + 15:text.find('[COMMENT_TIME]')].strip()
comment_time = text[text.find('[COMMENT_TIME]') + 14:].strip()

# 将数据转换成key:value格式
data = {
'SPOT_NAME': spot,
'LEVEL': '',
'LOCATION': location,
'OPENTIME': opentime,
'PHONE': phone,
'INTRO': intro,
'NOTICE': '',
'ST': '',
'GRADE': grade,
'HOT': hot,
'QTY': sum_,
'EVALUATION': comment,
'EVALUATION_GRADE': comment_grade,
'EVALUATION_TIME': comment_time,
'EVALUATION_NAME': '',
'FEEL_SCORE': '',
'FEEL': ''
}
# 存入数组
listDf.append(data)
# 转换成dataframe
df = pd.DataFrame(listDf)
# 下面三行是将介绍那列的数据切割成三列 后面这部分考虑到景区表去重以后在执行
df['ST'] = df['INTRO'].apply(lambda x: re.search(r'服务设施([\s\S]*)', x, re.S).group(1))
df['NOTICE'] = df['INTRO'].apply(lambda x: re.search(r'优待政策\s*(.+?)\s*服务设施', x, re.S).group(1))
df['INTRO'] = df['INTRO'].apply(lambda x: re.search(r'介绍\s*(.+?)\s*全文', x, re.S).group(1))
df['SPOT_NAME'] = df['SPOT_NAME'].apply(lambda x: x.replace("?", ""))

结论

​ 发现python使用pandas效果出奇的好,不仅速度快,而且可以直接使用一些预测的模型,不需要scala那边搞好python这里再把表读出来再填充了。读取txt加一些处理大概也就一秒的时间,后期导入dm的话,也就是不到一面。

BUG

​ 读取的时候直接报错, 一看信息就是编码的问题,去搜了一下python处理文件的编码问题只要在外层读取的open方法中设置encoding就好了,with open(file_path, ‘r’, encoding=“utf-8”) 。

image-20230609210537386

日总结

​ 今天算是得到个坏消息,基本一下子将之前辛辛苦苦写的代码全部作废了,以为hdfs上面的数据蛮大的,主要是格式的问题,也不打算再头铁强行搞了,本来清洗就不应该是三个语言的使用,不仅拖慢了速度,执行起来也很奇怪,要跑好几次。速度方面测试都不敢测,1w就花费了那么长时间,这一下就是百万,直接打算换成python,先测试了一下最难的txt,发现处理起来还是很简单的,虽然方法有点笨,但是好在txt文件格式都是固定的,都拿到了存成dataframe了,后面慢慢处理好再导入dm就行了。

清洗csv文件夹

学习日期: 6.9

项目任务

梳理数据

​ 新给的数据中csv和xlsx的字段都是有改变的,所有要分析好字段,因为后端接口都写很多了,不能大改,但是有些提供的字段还是要添加的。计划spot景区表14个,评论表7个前后没有改变多少,后面的dataframe就按照这个格式慢慢处理。看了一些HDFS的数据大致分为4种,csv有两种,字段是有些许不一样的,xlsx又一种,也是有些字段有,有些没有,没有的字段就预测得出,txt是最少的。打算就是能预测的就预测处理填充,一些没发预测又缺失了,要么丢弃,要么不显示。

清洗csv代码

​ 都先把数据处理成所有需要的字段了,都先添加上 ,然后首要任务,关联好景区id,不知道能不能group by分组,明天再研究一下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import os
import pandas as pd
from faker import Faker

f = Faker('zh_CN')


def readCsvToDf(folder_path):
# 遍历文件夹中所有文件
all_files = os.listdir(folder_path)
csv_files = [file for file in all_files if file.endswith('.csv')]

# 合并每个 CSV 文件为一个 DataFrame,并将它们存储在一个列表中
dfs = []
for file in csv_files:
file_path = os.path.join(folder_path, file)
df = pd.read_csv(file_path)
dfs.append(df)

# 将所有 DataFrame 合并为一个 DataFrame
return pd.concat(dfs, ignore_index=True)


# 对换行符以及空格去除
def test(x):
stringApply = ''.join(str(x).split()).strip('\n')
return stringApply


def clearAllDf(merged_df):
# 修改列名以及换行和空白字段处理
merged_df.rename(columns={'SPOT': 'SPOT_NAME', 'CO_TIME': 'EVALUATION_TIME', 'CO_NAME': 'EVALUATION_NAME',
'COMMENT': 'EVALUATION'}, inplace=True)
merged_df_copy = merged_df.applymap(test).drop(columns=['TGA'])
# 去除景区名带?
merged_df_copy['SPOT_NAME'] = merged_df_copy['SPOT_NAME'].apply(lambda x: x.replace("?", ""))
# 添加空列 后面景区表需要的字段
merged_df_copy.insert(9,'RATING','')
merged_df_copy.insert(9,'CITY_ID','')
merged_df_copy.insert(9,'HOT','')
return merged_df_copy


def clearSpotDf(allDf):
# 添加需要列 随机填充电话
allDf.insert(3, 'PHONE', [f.phone_number() for _ in range(allDf.iloc[:, 0].size)])


if __name__ == '__main__':
# csv文件其中一个文件夹
folder_qnecsv_path = "../data/csv/qnecsv"
merged_df = readCsvToDf(folder_qnecsv_path)
allDf = clearAllDf(merged_df)
# 需要先把城市的df清洗出来

BUG

在使用pandas的时候,出现如下的警告。虽然不会影响程序的正常运行,但是看着就很烦。

1
2
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

搜索以后发现是因为我的dataframe是其他dataframe赋值的,也就是a = b ,我对a进行了修改,这样不太好,最好直接对b进行修改,解决方案有三个
解决方案:

  1. 新建一个dataframe,在新的上面进行操作。
  2. 在复制dataframe的时候,使用.copy()。
  3. 使用.loc来赋值

日总结

​ 今天打算先初步把每种文件都试着读成dataframe,先将一个文件夹能洗成自己需要的四个表,也就是四个dataframe,后面换文件也就大差不大,目前思路梳理清除了先将id挂好,后面还是很容易的,因为之前对缺失值的填充的代码都做好了,只要再重新对hot填充一下就行了。从spark的机器学习算法改成pandas预测即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import requests

def get_province(city_name):
ak = '你的百度地图API key'
url = 'http://api.map.baidu.com/geocoder?address={}&output=json&ak={}'.format(city_name, ak)
res = requests.get(url).json()
if res['status'] == 0:
province = res['result']['addressComponent']['province']
else:
province = None
return province

# 测试
cities = ['金华市', '苏州市', '舟山市', '武汉市', '桐乡市', '杭州市', '无锡市', '扬州市', '恩施市']
province = [get_province(city) for city in cities]
df = pd.DataFrame({'city': cities, 'province': province})
print(df)

读取hdfs文件

学习日期: 6.12

项目任务

​ 因为考虑到hdfs3使用不了,所以使用了hdfs这个库,这个库是基于web访问hdfs的,又考虑到本地的数据读取应该比hdfs快上不少,所以第一步直接下载下来。后面只要写自己本地路径处理文件就好了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import os
from hdfs import Client, InsecureClient

# 创建客户端连接hdfs
print("启动")
client = InsecureClient('http://hadoopb-namenode.damengb-zone.svc:9870')
# # 列出HDFS上的目录
hdfs_csv_path = "/data/csv/qnecsv"
csv_data_list = client.list(hdfs_path=hdfs_csv_path)
local_path = "/home/PyCode/data/hdfscsv/qnecsv"
print(len(csv_data_list))

# 定义递归下载函数
def download_folder(client, hdfs_path, local_path):
# 如果HDFS路径是文件,则下载到本地
if client.status(hdfs_path)['type'] == 'FILE':
client.download(hdfs_path, local_path)
# 如果HDFS路径是文件夹,则递归下载其中的子文件和文件夹
elif client.status(hdfs_path)['type'] == 'DIRECTORY':
if not os.path.exists(local_path):
os.makedirs(local_path)
for file in client.list(hdfs_path):
sub_hdfs_path = hdfs_path + '/' + file
sub_local_path = local_path + '/' + file
download_folder(client, sub_hdfs_path, sub_local_path)

# 文件暂下载
download_folder(client, "/data/csv/qnecsv", "/home/PyCode/data/hdfscsv/qnecsv")
download_folder(client, "/data/csv/xccsv", "/home/PyCode/data/hdfscsv/xccsv")
download_folder(client, "/data/txt", "/home/PyCode/data/hdfstxt")
download_folder(client, "/data/excel", "/home/PyCode/data/hdfsexcel")

BUG

​ 使用hdfs3的时候,一直说找不到libhdfs3.so依赖,但是自己的确是装了,而且使用sys指定了都,但是都没用,搞一上午没解决,换成hdfs了。

image-20230615151700455

日总结

​ 今天真的搞了半天,没敲什么代码,从平台发放账户以后,这个平台一直没整明白,今天想着去把hdfs数据拿一下,当我在平台上ping提供好的说hdfs存放的位置的时候发现是好的,能ping通应该,就能拿,结果发现他那个9000端口一直说web丢失之类的,我又去测试自己虚拟机上面的hdfs,发现能取到,搜索说是hadoop的配置文件需要查看,但是数据是比赛方提供的,我们又不能操作他们的机子,后面就试着换换其他端口,结果就试出来了。

优化pandas效率

学习日期: 6.13

项目任务

​ hdfs取到以后发现数据是非常的大,而且每当我处理数据的时候,内存会超出,而且会自动把自己杀死,就必须要优化效率了,毕竟处理评论那边的代码雀氏是不太好,是使用for循环的,效率会低百倍。

优化以后的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
from snownlp import SnowNLP
import numpy as np
def clearSpotComment(allInfoDf, province_list):
print("评论表开始处理")
comment_df = allInfoDf[allInfoDf.columns[12:]].copy()
comment_df.insert(2, 'EVALUATION_GRADE', '')
comment_df['FEEL_SCORE'] = ''
comment_df['FEEL'] = ''
order = ['SPOT_ID', 'EVALUATION', 'EVALUATION_GRADE', 'EVALUATION_TIME',
'EVALUATION_NAME', 'FEEL_SCORE', 'FEEL']
comment_df = comment_df[order]
# 时间处理 这组是对2018/9/16 这类没-而且ip没有的数据处理
comment_df['EVALUATION_TIME'] = comment_df['EVALUATION_TIME'].apply(lambda x: x.replace("/", "-"))

comment_df['EVALUATION_TIME'] = comment_df['EVALUATION_TIME'].apply(lambda x: x + " IP属地: " + np.random.choice(province_list) if "IP" not in x else x)

def calculate_scores(row):
if row.EVALUATION == "用户未点评。系统默认好评。":
score = 1.0
else:
score = round(SnowNLP(row.EVALUATION).sentiments, 2)
feelScore = round(score * 5, 1)
# 评论情绪
feel = scoreSnow(score)
if row.EVALUATION_GRADE == "":
row.EVALUATION_GRADE = str(int(round(feelScore, 0))) + "分 " + feel

row.FEEL_SCORE = feelScore
row.FEEL = feel
return row

comment_df = comment_df.apply(calculate_scores, axis=1)
print("评论表结束处理")
return comment_df

总结一下如何优化pandas效率。

1. 选择合适的数据类型

在数据处理中,选择正确的数据类型有可能利用内存空间并改善性能。

  • 对象类型对于所有类型的对象都是一种通用数据类型。在你不确定数据类型,或者数据类型混合的情况下,Pandas会默认采用这种数据类型。然而,对于大数据集,你可能更希望选择一个更加具体和紧凑的数据类型。

    DataFrame[‘column_name’] = DataFrame[‘column_name’].astype(‘datatype’)

例如:

DataFrame[‘column_name’] = DataFrame[‘column_name’].astype(‘category’)

  • 对于数值型数据,尽量使用’int’类型,根据数值大小的不同,还可以选择’int64’, ‘int32’, ‘int16’, ‘int8’。
  • 对于类别型数据,使用’category’类型可以节省大量内存。
  • 对于时间序列数据,你可以使用Pandas自带的’datetime64[ns]'类型。

2. 利用向量化操作,尽可能避免循环

Pandas的基础包Numpy,对向量运算做了大量的优化。因此,在编写代码时,我们建议你尽可能使用向量的操作,而不是使用Python的for循环。

1
2
3
4
5
6
# Python的for循环做法
for idx in df.index:
df.loc[idx, 'column_name'] = df.loc[idx, 'column_name'] + 1

# Pandas向量操作优化后
df['column_name'] = df['column_name'] + 1

3. 使用inplace 参数可以避免复制

对于一些可以改变原始DataFrame对象的操作,例如:drop, sort_values。 可以设置参数’inplace=True’,来保证操作在原数据上进行,避免创建新的复制品。

4. 避免链式操作

尽量避免使用链式操作,如df[df[‘A’]>0][‘B’] = 1,这样的操作会返回DataFrame的副本而非视图,对副本的操作不会反映到原始数据上。

5. 使用.isin()

当我们需要筛选出某列中属于某个list里的元素行时,使用.isin()是最有效率的方式。

1
chosen_ones = df[df.column_name.isin(list_of_values)]

6. 使用.loc(), .at(),.iat()

对数据的访问和修改,loc和iloc函数速度相对于普通方法要快,at 和iat为访问单个元素提供了更快的方法。

7. 对于大数据集,优选fillna()和interpolate()

对于大的数据集,使用fillna和interpolate来处理缺失值会比使用dropna快很多。

BUG

​ 无困难bug

日总结

​ 对于Pandas的性能优化, 主要的方向是减少内存消耗和增加计算效率,通过选择合适的数据类型,避免不必要的循环,合理使用Pandas提供的函数和方法,都是实现优化的有效策略。在编写代码前,明确了解数据的类型,选择最佳的数据处理方式,都可以帮助优化Pandas的性能。今天优化了很久,才将时间缩减到能成功运行。后面测试导入dm了,又要重新配置环境。

平台测试导入dm

学习日期: 6.14

项目任务

总结了一套如何将数据导入dm,从平台安装开始。

  • 首先安装python,步骤如下

    http://t.csdn.cn/ls0Qx

  • 安装dm数据库,好像不全安装也是可以的,怕后期需要用到就全安装了

    http://t.csdn.cn/3ptEe基本全复制就可以了

  • 然后需要去dm安装目录driver下面的两个目录,一个dmPython,一个sqlalchemy,执行python3 set.up install

    然后就能使用了,导入的时候可能会有些问题,一般都是环境变量的问题,搜一下就完活了。

四表整合

将几个文件全部整成同意的样式,这样后面分表的时候,代码就全部都能用了,也只用执行一遍即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
import os
import re
import time
import glob
import json
import random
import time

import os
import radar
from snownlp import SnowNLP
import numpy as np
import pandas as pd
import arrow
from faker import Faker

# faker随机值
fakerCn = Faker('zh_CN')

order = ['SPOT_NAME', 'LEVEL', 'LOCATION', 'GRADE', 'QTY', 'INTRO', 'OPENTIME',
'NOTICE', 'ST', 'HOT', 'CITY_ID', 'RATING', 'SPOT_ID',
'EVALUATION_NAME', 'EVALUATION_TIME', 'EVALUATION']
dataType = {'SPOT_NAME': str, 'LEVEL': str, 'LOCATION': str, 'GRADE': float, 'QTY': str, 'INTRO': str, 'OPENTIME': str,
'NOTICE': str, 'ST': str, 'HOT': float, 'CITY_ID': str, 'RATING': float, 'SPOT_ID': str,
'EVALUATION_NAME': str, 'EVALUATION_TIME': str, 'EVALUATION': str}


def clearToAllDf(merged_df):
# 修改列名以及换行和空白字段处理
merged_df_copy = merged_df.copy()
merged_df_copy = merged_df_copy
spot_names = merged_df_copy['SPOT_NAME'].unique()
spot_id_dict = {name: i + 1 for i, name in enumerate(spot_names)}
merged_df_copy['SPOT_ID'] = merged_df_copy['SPOT_NAME'].apply(lambda x: spot_id_dict[x])
return merged_df_copy


def csvqne(folder_path):
# 遍历文件夹中所有文件
all_files = os.listdir(folder_path)
# csv_files = [file for file in all_files if file.endswith('.csv')]

# 合并每个 CSV 文件为一个 DataFrame,并将它们存储在一个列表中
dfs = []
for file in all_files:
file_path = os.path.join(folder_path, file)
df = pd.read_csv(file_path)
# 删除包含空值的行
df.dropna(subset=['COMMENT'], inplace=True)
# 添加到数组
df.rename(columns={'SPOT': 'SPOT_NAME', 'CO_TIME': 'EVALUATION_TIME', 'CO_NAME': 'EVALUATION_NAME',
'COMMENT': 'EVALUATION'}, inplace=True)
df = df.drop(columns=['TGA']).reindex(columns=order)
df['SPOT_NAME'] = df['SPOT_NAME'].apply(lambda x: x.replace("?", ""))
dfs.append(df)

# 将所有 DataFrame 合并为一个 DataFrame
# return pd.concat(dfs, ignore_index=True)
return dfs


def csvxc(folder_path):
all_files = os.listdir(folder_path)
dfs = []
for file in all_files:
file_path = os.path.join(folder_path, file)
df = pd.read_csv(file_path)
# 删除包含空值的行
df.dropna(subset=['COMMENT'], inplace=True)
# df['ST'] = df['INTRO'].apply(lambda x: re.search(r'服务设施([\s\S]*)', x, re.S).group(1))
# df['NOTICE'] = df['INTRO'].apply(lambda x: re.search(r'优待政策\s*(.+?)\s*服务设施', x, re.S).group(1))
# df['INTRO'] = df['INTRO'].apply(lambda x: re.search(r'介绍\s*(.+?)\s*全文', x, re.S).group(1))
df.rename(columns={'SPOT': 'SPOT_NAME', 'CO_TIME': 'EVALUATION_TIME', 'CO_NAME': 'EVALUATION_NAME',
'COMMENT': 'EVALUATION'}, inplace=True)
df = df.reindex(columns=order)
df['SPOT_NAME'] = df['SPOT_NAME'].apply(lambda x: x.replace("?", ""))
# 添加到数组
dfs.append(df)

# 将所有 DataFrame 合并为一个 DataFrame
return dfs


def xlsx(folder_path):
all_files = os.listdir(folder_path)
dfs = []
for file in all_files:
file_path = os.path.join(folder_path, file)
df = pd.read_excel(file_path, engine='openpyxl')
# 删除包含空值的行
df.dropna(subset=['COMMENT'], inplace=True)

df.rename(columns={'SPOT': 'SPOT_NAME', 'CO_TIME': 'EVALUATION_TIME', 'CO_NAME': 'EVALUATION_NAME',
'COMMENT': 'EVALUATION'}, inplace=True)
df = df.reindex(columns=order)
df['SPOT_NAME'] = df['SPOT_NAME'].apply(lambda x: x.replace("?", ""))

# 添加到数组
dfs.append(df)

# 将所有 DataFrame 合并为一个 DataFrame
return dfs


def txt(folder_path):
listDf = []
# 文件夹位置
# folder_path = r'../data/txt'
# 打算将每个文件都按照切割成字典,最后放列表中再合并
for file_path in glob.glob(os.path.join(folder_path, '*.txt')):
# 读取文件 字符集为utf-8 存入text
with open(file_path, 'r', encoding="utf-8") as f:
text = f.read()
# 根据txt文件关键字处理成key:value
spot = text[text.find('[SPOT]') + 7:text.find('[LOCATION]')].strip()
location = text[text.find('[LOCATION]') + 11:text.find('[OPENTIME]')].strip()
opentime = text[text.find('[OPENTIME]') + 10:text.find('[PHONE]')].strip()
phone = text[text.find('[PHONE]') + 7:text.find('[INTRO]')].strip()
intro = text[text.find('[INTRO]') + 7:text.find('[GRADE]')].strip()
grade = text[text.find('[GRADE]') + 7:text.find('[HOT]')].strip()
hot = text[text.find('[HOT]') + 5:text.find('[SUM]')].strip()
sum_ = text[text.find('[SUM]') + 5:text.find('[COMMENT]')].strip()
comment = text[text.find('[COMMENT]') + 10:text.find('[COMMENT_GRADE]')].strip()
comment_grade = text[text.find('[COMMENT_GRADE]') + 15:text.find('[COMMENT_TIME]')].strip()
comment_time = text[text.find('[COMMENT_TIME]') + 14:].strip()

# 将数据转换成key:value格式
data = {
'SPOT_NAME': spot,
'LEVEL': np.nan,
'LOCATION': location,
'OPENTIME': opentime,
'PHONE': phone,
'INTRO': intro,
'NOTICE': np.nan,
'ST': np.nan,
'GRADE': grade,
'HOT': hot,
'QTY': sum_,
'EVALUATION': comment,
'EVALUATION_GRADE': comment_grade,
'EVALUATION_TIME': comment_time,
# 用户名填充
'EVALUATION_NAME': fakerCn.user_name()
}
# 存入数组
listDf.append(data)
df = pd.DataFrame(listDf).reindex(columns=order)
df['SPOT_NAME'] = df['SPOT_NAME'].apply(lambda x: x.replace("?", ""))
df['ST'] = df['INTRO'].apply(lambda x: re.search(r'服务设施([\s\S]*)', x, re.S).group(1))
df['NOTICE'] = df['INTRO'].apply(lambda x: re.search(r'优待政策\s*(.+?)\s*服务设施', x, re.S).group(1))
df['INTRO'] = df['INTRO'].apply(lambda x: re.search(r'介绍\s*(.+?)\s*全文', x, re.S).group(1))
return df


def dataTopkl():
csvqne_list = csvqne("../data/hdfscsv/qnecsv")
csvxc_list = csvxc("../data/hdfscsv/xccsv")
excel_list = xlsx('../data/hdfsexcel')
txt_list = [txt(r'../data/hdfstxt').astype(dataType)]
all_list = csvxc_list + csvqne_list + excel_list + txt_list
all_df = pd.concat(all_list, ignore_index=True)
all_df.to_pickle('allTemp.pkl')


if __name__ == '__main__':
a = time.time()
dataTopkl()
a1 = time.time()
finalTime = arrow.get(a1 - a).format('mm分:ss秒:SSS毫秒')
print("执行速度", finalTime)

BUG

​ 将数据导入dm的时候,因为大致四种文件,格式都不太一样,介绍那块有个表是分三列,三列确实是最优的选择,原本txt文件分成三列已经写好了,但是发现在xlsx文件是用不了的,因为很多都是没有这些字段,导致无法使用,也咩有很好的解决办法,暂定那些无法处理的,就将其他两列先为空了,后面再想办法。

image-20230615170149713

日总结

​ 今天试将数据导入数据库,安装dm独有的依赖花费了很长一段时候,晚上总结了一下怎么linux系统重装,怕以后还用得到,今天大部分的时候都是在安装依赖,因为这平台对外限流,yum和pip都很慢,又将总表的数据绘制了一下,梳理了一下导入步骤,打算先将评论导入,然后是残缺的spot,因为如果spot有问题,读残缺版的就行了,这样效率会快很多,不用重新50w条重新导入。

服务器重装 评论表导入dm

学习日期: 6.15

项目任务

起因

​ 今天跑评论表的时候发现实在是太慢了,检查top运行进程的时候发现,CPU和内存都是在100%状态,后面检查机器发现,最大限制在了1核,那怎么玩,就先暂停了,修改成了8G 4核,但是重新启动的时候,发现平台镜像又清空了,真受不了一点,又花了一上午重装。

服务器重装

按照昨天的步骤基本没有问题,版本换成了3.8.8,安装依赖的时候主要pandas1.3.5 就行了。

评论表导入dm

测试dataframe导入dm没有问题了。 值得注意的是,因为数据量有点大,to_sql一次进不去,就分批导入了,大概需要1分钟,不如不分批会直接报错

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import time
import warnings
from sqlalchemy import create_engine
import numpy as np
import pandas as pd
import radar
from snownlp import SnowNLP
import arrow
from faker import Faker

# faker随机值
fakerCn = Faker('zh_CN')
# 忽略警告
warnings.filterwarnings("ignore", message="Could not import the lzma module.")


def scoreSnow(sentiments):
# sentiments = SnowNLP(comment).sentiments
if 0.9 <= sentiments <= 1:
feelMood = "超棒"
elif 0.8 <= sentiments <= 0.9:
feelMood = "满意"
elif 0.5 <= sentiments <= 0.8:
feelMood = "不错"
elif 0.2 <= sentiments <= 0.5:
feelMood = "一般"
else:
feelMood = "不佳"
return feelMood


def clearSpotComment(allInfoDf, province_list):
print("评论表开始处理")
comment_df = allInfoDf[allInfoDf.columns[12:]].copy()
comment_df.insert(2, 'EVALUATION_GRADE', '')
comment_df['FEEL_SCORE'] = ''
comment_df['FEEL'] = ''
order = ['SPOT_ID', 'EVALUATION', 'EVALUATION_GRADE', 'EVALUATION_TIME',
'EVALUATION_NAME', 'FEEL_SCORE', 'FEEL']
comment_df = comment_df[order]
# 时间处理 这组是对2018/9/16 这类没-而且ip没有的数据处理
comment_df['EVALUATION_TIME'] = comment_df['EVALUATION_TIME'].apply(lambda x: x.replace("/", "-"))
comment_df['EVALUATION_TIME'] = comment_df['EVALUATION_TIME'].apply(
lambda x: str(radar.random_date("2020-09-13", "2023-02-02").date()) if x == "" else (
x + "IP属地: " + np.random.choice(province_list) if "IP" not in x else x))

def calculate_scores(row):
row.EVALUATION = ''.join(str(row.EVALUATION).split())
if (row.EVALUATION == "用户未点评。系统默认好评。") or (type(row.EVALUATION) != str):
score = 1.0
else:
score = round(SnowNLP(row.EVALUATION).sentiments, 2)
feelScore = round(score * 5, 1)
# 评论情绪
feel = scoreSnow(score)
if row.EVALUATION_GRADE == "":
row.EVALUATION_GRADE = str(int(round(feelScore, 0))) + "分 " + feel

row.FEEL_SCORE = feelScore
row.FEEL = feel
return row

comment_df = comment_df.apply(calculate_scores, axis=1)
print("评论表结束处理")
return comment_df


def provinceRandom() -> str:
province = fakerCn.province()
if province.find("省") != -1 or province.find("市") != -1:
return province[0:-1]
return provinceRandom()


def provinceList():
province_list = []
while len(province_list) <= 33:
province = fakerCn.province()[0:2]
if province not in province_list:
province_list.append(province)
return province_list


def intoDm(data, tableName):
# 创建url
conn_url = 'dm+dmPython://SYSDBA:SYSDBA001@47.120.9.247:5237'
# 创建Engine对象
engine = create_engine(conn_url)
# 到dm8中
data.to_sql(name=tableName, con=engine, if_exists='append', index=False)


if __name__ == '__main__':
a = time.time()
provinces = provinceList()
provinces.remove("黑龙")
provinces.append("黑龙江")
print("省份筛选完成")

allDf = pd.read_pickle('allClearData.pkl')

comment_temp_df = clearSpotComment(allDf, provinces)
comment_temp_df.to_pickle('commentFinalData.pkl')
path = 'commentFinalData.pkl'
comment_temp = pd.read_pickle(path)
# 修改object的类型
for col in comment_temp.select_dtypes(include=[object]):
comment_temp[col] = comment_temp[col].astype('string')
# 分批导入1000一次
chunkSize = 1000
for i in range(0, len(comment_temp), chunkSize):
data_chunk = comment_temp[i:i + chunkSize]
intoDm(data_chunk, 'comment_info')
a1 = time.time()
finalTime = arrow.get(a1 - a).format('mm分:ss秒:SSS毫秒')
print("执行速度", finalTime)

BUG

​ 之前python一直莫名其妙的自己停止,一直推测是内存溢出的问题,但是服务器中内存是有12G的,不应该啊,自己程序内存撑死也就占用4个G,后来top查看实时运行进程的时候,发现有很多python3的任务还在跑,但是没有停止,占用了大量的内存,导致今天内存都崩了,print都不行。杀死就好了。

1
2
pkill -9 python3 #杀死进程
echo 3 > /proc/sys/vm/drop_caches #清除缓存

image-20230615130509275

进程如下

image-20230615130959138

日总结

​ 今天花点时间处理掉了最麻烦的评论表,之所以麻烦是因为评论这边需要的技术点是最多的,一些数据的填充都需要用到ai,数据挖掘,语言处理,进行缺失值的填充。而且数据量是最多的,因为我是策略是45w条一起的,所以会导致挺慢的。效果不错的,评论我大致分了7个字段,所有的nan缺失值都填充了,而且对语言情绪进行了分析。不出意外明天就能把hdfs的新数据,清洗完毕了。

数据清洗工作完成

学习日期: 6.16

项目任务

​ 目前我分了四个文件,改天会合并成一个py文件,利用to_pickle处理成,pandas处理最快的方式,这样就不会在内存中占用资源,而是将数据放入磁盘中,不仅解决了内存超出问题,而且优化了速度。代码比较多就不放这篇笔记中了,会传到博客中,等比赛结束再公开。

level缺失值填充

用了回归模型,但是不是很准确,如果作为4A这样的景区评定的话,还是绰绰有余的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
level_dict = {'3A景区': 1, '4A景区': 2, '5A景区': 3}
df['LEVEL'] = df['LEVEL'].map(level_dict)
# 将等级缺失的填充为-1
df['LEVEL'].fillna(-1, inplace=True)
# 创建LEVEL有缺失值的表
predict_data = df[df['LEVEL'] == -1]
# 数据增强
listData = []
for _ in range(10):
listData.append(df[df['LEVEL'] != -1].sample(frac=0.8))
enhance = pd.concat(listData, ignore_index=True)
# 创建线性回归模型
lr = LinearRegression()
# 建立训练数据集和测试数据集 (使用的是80/20划分)
train_data = enhance.sample(frac=0.8)
# 把大部分数据留给测试集,只使用一小部分作为训练集
test_data = df[df['LEVEL'] != -1]
# fit模型
lr.fit(train_data[['GRADE', 'QTY','HOT','RATING']].values, train_data['LEVEL'].values)

# 测试模型
print('模型得分: ', lr.score(test_data[['GRADE', 'QTY','HOT','RATING']].values, test_data['LEVEL'].values))
prediction= lr.predict(predict_data[['GRADE', 'QTY','HOT','RATING']].values)
prediction[prediction > 3] = 3
prediction[prediction < 1] = 1

# 预测缺失的景区等级
predict_data['LEVEL'] = prediction.round(0)
df.update(predict_data)
# 景区等级还原
level_dict = {1:'3A景区', 2:'4A景区', 3:'5A景区'}
df['LEVEL'] = df['LEVEL'].map(level_dict)

BUG

​ 修改表类型将text换成varchar的了以后,我再导入spot景区数据,发现报错了,一看就是string类型太短了,存不住景区介绍那几块,那几块换成了text,其他依旧varchar,就没问题了。

image-20230617235306492

日总结

​ 今天把spot和其他城市表和省份表都重新导入了,而且建立了备份文件,防止数据丢失,因为现在已经是在平台了,等后面清洗的时候大概只需要20秒。明天还是要整理一下自己的代码包,注释需要添加的添加一下,文档是需要写代码如何清洗和运行的,所以为了不繁琐,还是需要整理一下的,速度方面评论那部分实在是太大了,也没有很好的优化方案,就暂定如此了。如果有空再修改一下。

景区评论接口修改

学习日期: 6.19

项目任务

添加所有景区的评论接口

​ 这个其实挺简单的,根据spot_id查询对应景区的所有评论就行了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 查询对应景区的所有评论
@app.route('/spot/commentAll/<spot_id>', methods=['GET'])
def spot_comments_from_id(spot_id):
data = dmToDfpkuseg.spotAllComment(eval(spot_id))
return json.dumps({'msg': '查询成功', 'code': 200, 'data': data}, ensure_ascii=False)

# dmToDfpkuseg代码
def spotAllComment(spot_id):
cur.execute("""
select spot_id, evaluation_name,evaluation, evaluation_grade, evaluation_time, feel_score, feel from COMMENT_INFO
where
SPOT_ID={}
""".format(spot_id))
list_test = cur.fetchall()
df_test = pd.DataFrame(list_test, columns=["spot_id","evaluation_name", "evaluation", "evaluation_grade",
"evaluation_time", "feel_score", "feel"])
return eval(df_test.to_json(orient='records', force_ascii=False).replace("null", "'未填'"))

修改景区接口

​ 因为增加了字段,加上一些不需要的词要加到停用词中,又进行处理了一下。添加了停用词,模型又训练了一遍

1
2
3
4
5
6
7
8
9
10
11
12
def keyWordFindComment(spot_id, comment_key_word):
cur.execute("""
select spot_id, evaluation_name,evaluation, evaluation_grade, evaluation_time, feel_score, feel from COMMENT_INFO
where
SPOT_ID={}
and
EVALUATION like '%{}%'
""".format(spot_id, comment_key_word))
list_test = cur.fetchall()
df_test = pd.DataFrame(list_test, columns=["spot_id", "evaluation_name", "evaluation", "evaluation_grade",
"evaluation_time", "feel_score", "feel"])
return eval(df_test.to_json(orient='records', force_ascii=False).replace("null", "'未填'"))

BUG

​ 跑py代码的时候,服务器报错,看起来是连接的问题,之前好像是遇到过,然后我用driver工具测试,发现连接不上数据库了,去数据库的服务器查看,发现是服务死了,前台启动以后报错解决。

image-20230621132405661

image-20230621132515527

日总结

​ 今天主要是对评论部分接口的修改以及添加,python跑一遍flask接口的确有点慢,是先加载再调用的,所以启动python的接口我就需要七八分钟,也是因为数据量大的原因,没有做优化,调用的速度还是很快的,所以就没有做优化,分词的效果还需要慢慢调整。

代码整合注释添加

学习日期: 6.20

项目任务

代码整合

​ 整合的目录如下,清洗的代码还是分两部来跑了,不然内存会崩。先执行alldataToPkl,将所有hdfs数据保存到本地然后再合成一个allClearData.pkl,然后allDataClearTableIntoDm就是将所有数据拆分成四个表。

image-20230621155433408

代码如下

​ 因为代码很长,博客中会代码展开栏的,为了博客的显示,就直接将代码放入了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
import os
import json
import time
import warnings
from sqlalchemy import create_engine
import numpy as np
import pandas as pd
import radar
from snownlp import SnowNLP, sentiment
import arrow
from faker import Faker
from sklearn.linear_model import LinearRegression

# faker随机值
fakerCn = Faker('zh_CN')
# 将警告去除
pd.set_option('mode.chained_assignment', None)
warnings.filterwarnings("ignore", message="Could not import the lzma module.")

# 指定使用模型
sentiment.load("sentiment.marshal")


# 对于不同的情绪评分 判断出所给评论
def scoreSnow(sentiments):
if 0.9 <= sentiments <= 1:
feelMood = "超棒"
elif 0.8 <= sentiments <= 0.9:
feelMood = "满意"
elif 0.5 <= sentiments <= 0.8:
feelMood = "不错"
elif 0.2 <= sentiments <= 0.5:
feelMood = "一般"
else:
feelMood = "不佳"
# 返回评论字符
return feelMood


def clearSpotComment(allInfoDf, province_list):
print("评论表开始处理")
# 评论所需字段为全部中索引12以后的
comment_df = allInfoDf[allInfoDf.columns[12:]].copy()
# 添加新增字段
comment_df['FEEL_SCORE'] = ''
comment_df['FEEL'] = ''
# 排好序
order = ['SPOT_ID', 'EVALUATION', 'EVALUATION_GRADE', 'EVALUATION_TIME',
'EVALUATION_NAME', 'FEEL_SCORE', 'FEEL']
comment_df = comment_df[order]
# 时间处理 这组是对2018/9/16 这类没-而且ip没有的数据处理
comment_df['EVALUATION_TIME'] = comment_df['EVALUATION_TIME'].apply(lambda x: x.replace("/", "-"))
# 对time字段为空的进行随机值填充
comment_df['EVALUATION_TIME'] = comment_df['EVALUATION_TIME'].apply(
lambda x: str(radar.random_date("2020-09-13", "2023-02-02").date()) if x == "" else (
x + "IP属地: " + np.random.choice(province_list) if "IP" not in x else x))

# 添加feel以及feel_score两列并对grade为空的行填充
def calculate_scores(row):
# 对评论有换行空格的去除
row.EVALUATION = ''.join(str(row.EVALUATION).split())
# 未点评的默认好评
if (row.EVALUATION == "用户未点评。系统默认好评。") or (type(row.EVALUATION) != str):
score = 1.0
else:
# nlp自然语言处理
score = round(SnowNLP(row.EVALUATION).sentiments, 2)
# 赋值 权重平衡
feelScore = round(score * 5, 1)
# 评论情绪
feel = scoreSnow(score)
# 为空填充
if pd.isna(row.EVALUATION_GRADE):
print(row.EVALUATION_GRADE)
row.EVALUATION_GRADE = str(int(round(feelScore, 0))) + "分 " + feel
# 填充字段
row.FEEL_SCORE = feelScore
row.FEEL = feel
return row
# 进行自然语言处理 以及缺失值预测填充
comment_df = comment_df.apply(calculate_scores, axis=1)
print("评论表结束处理")
return comment_df


def provinceList():
# 存放34个省的集合 集合是唯一性
province_set = set()
# 去重导入
while len(province_set) <= 33:
province = fakerCn.province()[0:2]
# 因为两字,所以黑龙需要修改成黑龙江
if province == "黑龙":
province = "黑龙江"
province_set.add(province)
return list(province_set)


def intoDm(data, tableName):
# 创建url
conn_url = 'dm+dmPython://SYSDBA:SYSDBA001@47.120.9.36:5236'
# 创建Engine对象
engine = create_engine(conn_url)
# 到dm8中
data.to_sql(name=tableName, con=engine, if_exists='append', index=False)


def cityFindProvince(cityName):
# 处理io 并使用utf-8字符
with open("../data/city.json", "r", encoding="utf-8") as f:
content: dict = json.load(f)
# province_name省份名 city_data是省内城市是list(dict)
for province_name, city_name_datas in content.items():
# 遍历list内部数据dict
for city_name_data in city_name_datas:
# 取字典的键值对
for city_name, county_name_list in city_name_data.items():
# 找到对应城市则返回
if city_name == cityName:
return province_name
else:
# 否则在县里找
for county_name in county_name_list:
if county_name == cityName:
return province_name
# 没找到返回未知省
return "未知省"


def clearSpotProvince(cityWithProvinceDf):
# 省份名为城市名在cityFindProvince方法中找
cityWithProvinceDf['PROVINCE_NAME'] = cityWithProvinceDf['CITY_NAME'].apply(cityFindProvince)
# 省份名去重 排出省份id
province_dict = {name: rank + 1 for rank, name in enumerate(cityWithProvinceDf['PROVINCE_NAME'].unique())}
# 省份id填充
cityWithProvinceDf['PROVINCE_ID'] = cityWithProvinceDf['PROVINCE_NAME'].apply(lambda x: province_dict[x])
# 创建省份df 字段为name和id
province_temp_df = cityWithProvinceDf.copy()[['PROVINCE_NAME', 'PROVINCE_ID']].drop_duplicates()
# 删除城市表中省份名字段
cityWithProvinceDf.drop(columns=['PROVINCE_NAME'], inplace=True)
return province_temp_df


def clearSpotDf(allInfoDf):
# 所有信息从取景区相关信息并去重
spot_df = allInfoDf[['SPOT_NAME', 'LEVEL', 'LOCATION', 'GRADE', 'QTY', 'INTRO', 'OPENTIME',
'NOTICE', 'ST', 'HOT', 'CITY_ID', 'RATING', 'SPOT_ID']].drop_duplicates().copy()
# 添加需要列 随机填充电话
spot_df.insert(3, 'PHONE', [fakerCn.phone_number() for _ in range(spot_df.iloc[:, 0].size)])
return spot_df


def clearSpotCity(spotWithCityDf):
# 城市表先将城市名添加景区表
spotWithCityDf['CITY_NAME'] = \
spotWithCityDf['LOCATION'].str.extract('([^市]{2}()+市)')[
0].fillna('未知市')
# 挂上城市id
city_id_dict = {city_name: rank + 1 for rank, city_name in enumerate(spotWithCityDf['CITY_NAME'].unique())}
# 设置唯一的id
spotWithCityDf['CITY_ID'] = spotWithCityDf['CITY_NAME'].apply(lambda x: city_id_dict[x])
# 复制字段到城市df 并去重
city_temp_df = spotWithCityDf.copy()[['CITY_NAME', 'CITY_ID']].drop_duplicates()
# 删除城市名
spotWithCityDf.drop(columns=['CITY_NAME'], inplace=True)
return city_temp_df


# 该方法是处理一些景区名相同的景区,保留nan值少的,如果nan值一样取QTY大的
def keep_row_with_less_missing_values_and_less_qty(group):
# 计算每行的缺失值数量和QTY的数量
missing_values = group.isnull().sum(axis=1)
qty_sum = group['QTY']
# 合并两个Series
combined = pd.DataFrame({'Missing': missing_values, 'QTY': qty_sum})
# 先对缺失值数量排序,有平局的话就按照QTY的排序
combined_sorted = combined.sort_values(by=['Missing', 'QTY'])
# 返回最佳行的index
return group.loc[combined_sorted.index[0]]


def clearSpotFinal(df):
# 尝试将评论数字段转换为整数
try:
df['QTY'] = df['QTY'].str[:-3].astype(int)
except ValueError:
print("评论数字段包含一些无法转换为数字的值!")
# 将缺失的热度(nan值)赋予一个负数,以便于后面处理
df['HOT'].fillna(-1, inplace=True)
# 取出缺失数据
predict_data = df[df['HOT'] == -1]
listData = []
for _ in range(10):
listData.append(df[df['HOT'] != -1].sample(frac=0.8))
enhance = pd.concat(listData, ignore_index=True)
# 创建线性回归模型
lr = LinearRegression()
# 建立训练数据集和测试数据集 (使用的是80/20划分)
train_data = enhance.sample(frac=0.8)
# 把大部分数据留给测试集,只使用一小部分作为训练集
test_data = df[df['HOT'] != -1]
# # fit模型
lr.fit(train_data[['GRADE', 'QTY']].values, train_data['HOT'].values)
print('模型得分: ', lr.score(test_data[['GRADE', 'QTY']].values, test_data['HOT'].values))
# 测试模型
prediction = lr.predict(predict_data[['GRADE', 'QTY']].values)
prediction[prediction > 10] = 10
prediction[prediction < 0] = 0
# 预测缺失的热度
predict_data['HOT'] = prediction.round(1)
# 把预测结果填充回原数据集
df.update(predict_data)
# 找出所有重复的spot_name,我们只保留第一个(默认情况)
duplicated_mask = df.duplicated(subset='SPOT_NAME', keep=False)
# 获取所有重复的行
duplicated_rows = df[duplicated_mask]
# 景区名分组
new_rows = duplicated_rows.groupby('SPOT_NAME').apply(keep_row_with_less_missing_values_and_less_qty)
# 找出不重复的行
non_duplicated_rows = df[~duplicated_mask]
# 将两部分的数据合并
result = pd.concat([non_duplicated_rows, new_rows])
# 算出评论数的最大值
max_comment_sum = result['QTY'].max()
# 加权做默认推荐指标
result['RATING'] = round((result['GRADE'] * 6) + (result['HOT'] * 4) + (result['QTY'] / max_comment_sum * 30), 2)
# 缺少景区等级的表 XA景区
result.to_pickle('allClearDataSpot.pkl')
# 读取数据
df = pd.read_pickle('allClearDataSpot.pkl')
# 将LEVEL转换为数字标签
level_dict = {'3A景区': 1, '4A景区': 2, '5A景区': 3}
df['LEVEL'] = df['LEVEL'].map(level_dict)
# 将等级缺失的填充为-1
df['LEVEL'].fillna(-1, inplace=True)
# 创建LEVEL有缺失值的表
predict_data = df[df['LEVEL'] == -1]
# 数据增强
listData = []
for _ in range(10):
listData.append(df[df['LEVEL'] != -1].sample(frac=0.8))
enhance = pd.concat(listData, ignore_index=True)
# 创建线性回归模型
lr = LinearRegression()
# 建立训练数据集和测试数据集 (使用的是80/20划分)
train_data = enhance.sample(frac=0.8)
# 把大部分数据留给测试集,只使用一小部分作为训练集
test_data = df[df['LEVEL'] != -1]
# fit模型
lr.fit(train_data[['GRADE', 'QTY', 'HOT', 'RATING']].values, train_data['LEVEL'].values)
print('模型得分: ', lr.score(test_data[['GRADE', 'QTY', 'HOT', 'RATING']].values, test_data['LEVEL'].values))
# 测试模型 缺失值导入
prediction = lr.predict(predict_data[['GRADE', 'QTY', 'HOT', 'RATING']].values)
prediction[prediction > 3] = 3
prediction[prediction < 1] = 1

# 预测缺失的景区等级
predict_data['LEVEL'] = prediction.round(0)
df.update(predict_data)
# 景区等级还原
level_dict = {1: '3A景区', 2: '4A景区', 3: '5A景区'}
df['LEVEL'] = df['LEVEL'].map(level_dict)
# 最终成品景区表
df.to_pickle('spotFinalData.pkl')
df[df.select_dtypes(include=['object']).columns] = df.select_dtypes(include=['object']).astype('string')


if __name__ == '__main__':
a = time.time()
# 省份经纬度表
province_city = pd.read_csv("../data/GT_CITYS.csv",encoding = 'gbk')
intoDm(province_city,"GT_CITYS")
# 构建出所有省份表 方便给IP添加省份信息
provinces = provinceList()
print("省份筛选完成")
# 读取处理好的所有评论信息
allDf = pd.read_pickle('allClearData.pkl')
# 处理评论表 字段添加以及缺失值填充
comment_temp_df = clearSpotComment(allDf, provinces)
# 保存磁盘
path = 'commentFinalData.pkl'
comment_temp_df.to_pickle(path)
print("开始导入")
# 读取评论表
comment_temp = pd.read_pickle(path)
comment_temp[comment_temp.select_dtypes(include=['object']).columns] = comment_temp.select_dtypes(include=['object']).astype('string')

# 修改object的类型 转换为string
for col in comment_temp.select_dtypes(include=[object]):
comment_temp[col] = comment_temp[col].astype('string')
# 分批导入5000一次
chunkSize = 5200
for i in range(0, len(comment_temp), chunkSize):
data_chunk = comment_temp[i:i + chunkSize]
intoDm(data_chunk, 'comment_info')
# 清洗景区表 并保存磁盘
spot_info = clearSpotDf(allDf)
spot_info.to_pickle('allClearSpot.pkl')
# 清洗城市表 并导入dm数据库
city_df = clearSpotCity(spot_info)
intoDm(city_df, 'city_info')
# 清洗省份表 并导入dm数据库
province_df = clearSpotProvince(city_df)
intoDm(province_df, 'province_info')
# 最后对景区表的处理+填充+预测
spot_final_info = pd.read_pickle('allClearSpot.pkl').reset_index().drop(columns=['index'])
clearSpotFinal(spot_final_info)
intoDm(spot_final_info, "spot_info")
a1 = time.time()
finalTime = arrow.get(a1 - a).format('mm分:ss秒:SSS毫秒')
print("执行速度", finalTime)

BUG

​ 报出编码错误,已解决,问题点在pycharm编译器的问题,我刚开始是用windows中的python跑的,不管怎样都会报这个错误,然后就想到是不是之前编译器设置的时候utf-8的问题,导致没法处理gbk的一些数据,就换了平台中的python编译器,最后运行成功了。

image-20230621145347196

日总结

​ 今日的任务就是梳理清晰部分以及接口部分的代码,因为要求注释需要25%,之前其实已经达到了,因为代码有些部分比较冗余,修改封装一下,接口nohup 挂载一下,清洗需要两次,看后面性能,可以放到一个里面,处理评论表实在是太慢了,单单评论表需要20分钟,其他加起来不超过一分钟。后期写文档的时候看看能不能优化。