近期致力于总结科研或者工作中用到的主要技术栈,从技术原理到常用语法,这次查缺补漏当作我的小百科。主要技术包括:

以下整理错误或者缺少的部分欢迎指正!!!

大数据处理常用:Pyspark, Pandas

性能对比

Pyspark Pandas
运行环境 分布式计算集群(Hadoop/Apache Spark集群) 单个计算机
数据规模 亿级大规模 百万级小规模
优势 分布式计算->并行处理,处理速度快 API简单->数据处理简单
延迟机制 lazy execution, 执行动作之前不执行任务 eager execution, 任务立即被执行
内存缓存 persist()/cache()将转换的RDDs保存在内存 单机缓存
DataFrame可变性 不可变,修改则返回一个新的DataFrame 可变
可扩展性
列名允许重复 ×

常用语法对比

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# 头文件
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, LongType, StringType, ArrayType # 或者直接导入*
import pandas as pd


# 创建SparkSession对象
spark = SparkSession.builder \
.appName("username") \
.getOrCreate()


# 创建空表
schema = StructType([
StructField('id', LongType()),
StructField('type', StringType()),
]) # spark需要指定列名和类型
spark_df = spark.createDataFrame(spark.sparkContext.emptyRDD(), schema=schema)
pandas_df = pd.DataFrame(columns=['id', 'type'], index=[0, 1, 2])


# 根据现有数据创建
data = [(1, "Alice", 2000), (2, "Bob", 2001), (3, "Charlie", 2002)]
schema = StructType([
StructField("id", IntegerType(), True),
StructField("name", StringType(), True),
StructField("birth_year", IntegerType(), True)
])
spark_df = spark.createDataFrame(data, ["id", "name", "birth_year"])
spark_df = spark.createDataFrame(data, schema)
pandas_df = pd.DataFrame(data=data, columns=["id", "name", "birth_year"])


# 读取csv文件
spark_df = spark.read.csv("data.csv", header=True, inferSchema=True)
pandas_df = pd.read_csv("data.csv", sep="\t") # read_excel
# 保存数据到csv
spark_df.write.csv('data.csv', header=True)
pandas_df.to_csv("data.csv", index=False)

# 读取hive表数据
spark_df = spark.sql('select * from tab')
# 保存数据到hive表
spark_df.write.mode('overwrite').saveAsTable('db_name.tab_name')


# 相互转换
spark_df = SQLContext.createDataFrame(pandas_df)
pandas_df = spark_df.toPandas()


# 转换数据类型
spark_df = spark_df.withColumn("A", col("age").cast(StringType))
pandas_df["A"] = pandas_df['A'].astype("int")


# 重置索引
spark_df = spark_df.withColumn("id", monotonically_increasing_id()) # 生成一个增长的id列
pandas_df.reset_index()


# 切片
pandas_df['a':'c'] # a-c三行
pandas_df.iloc[1:3, 0:2] # 1-2行,0-1列。左闭右开
pandas_df.iloc[[0, 2], [1, 2]] # 第0,2行第0,2列
pandas_df.loc['a':'c', ['A', 'B']] # 第a-c行A,B列


# 选择列
spark_df.select('A', 'B')
pandas_df[['A', 'B']]

# 删除列
spark_df.drop('A', 'B')
pandas_df.drop(['A', 'B'], axis=1, inplace=True) # inplace表示是否创建新对象

# 新增列,设置列值
spark_df = spark_df.withColumn('name', F.lit(0))
pandas_df['name'] = 0

# 修改列值
spark_df.withColumn('name', 1)
pandas_df['name'] = 1
# 使用函数修改列值
spark_df = spark_df.withColumn('code', F.when(F.isnull(spark_df.code), 0).otherwise(spark_df.code))

# 修改列名
spark_df.withColumnRenamed('old_name', 'new_name')
pandas_df.rename(columns={'old_name1': 'new_name1', 'old_name1': 'new_name2'}, inplace=True)


# 显示数据
spark_df.limit(10) # 前10行
spark_df.show/take(10) # collect()返回全部数据
spark_df/pandas_df.first/head/tail(10)


# 表格遍历
saprk_df.collect()[:10]
spark_df.foreach(lambda row: print(row['c1'], row['c2']))
for i, row in pandas_df.iterrows():
print(row["c1"], row["c2"])


# 排序
spark/pandas_df.sort() # 按列值排序
pandas_df.sort_index() # 按轴排序
pandas_df.sort_values(by=["A", "B"], axis=0, ascending=[True, False], inplace=True) # 指定列升序/降序排序


# 过滤
spark_df.filter(df['col_name'] > 1) # spark_df.where(df['col_name'] > 1)
pandas_df[pandas_df['col_name'] > 1]
pandas_df_new = pandas_df[pandas_df["code"].apply(lambda x: len(x) == 11)]


# 去重
spark_df.select('col_name').distinct()
spark_df_filter = spark_df.drop_duplicates(["col_name"])
pandas_df.drop_duplicates(["col_name"], keep='first', inplace=True)

# 缺失数据处理
spark_df.na.fill()
spark_df.na.drop(subset=['A', "B"]) # 同dropna
pandas_df.fillna()
pandas_df.dropna(subset=['A', "B"], how="any", inplace=True)

# 空值过滤 filter=choose
spark_df.filter(~(F.isnull(spark_df.d)))
spark_df.filter(~(spark_df['A'].isNull() | spark_df['B'].isNull())) # 选出列值不为空的行 isnan()=isNull()<->isNOtnan()
pandas_df[pandas_df['A'].isna()] # 选出列值为空的行
pandas_df[pandas_df['A'].notna()] # 选出列值不为空的行


# 统计
spark/pandas_df.count() # spark返回总行数,pandas返回列非空总数
spark/pandas_df.describe() # 描述列的count, mean, min, max...

# 计算某一列均值
average_value = spark_df.select("col_name").agg({"col_name": "avg"}).collect()[0][0]
average_value = pandas_df["col_name"].mean()


# 表合并
# 按行合并,相当于追加
spark_df = spark_df.unionAll(spark_df1)
pandas_df = pd.concat([df_up, df_down], axis=0)
# 按列合并
spark_df = spark_df.join(df1, df1.id==spark_df.id, 'inner').drop(df1.id) # df1.id==spark_df.id也可写成['id](当且仅当列名相同)
pd.merge(df_left, df_right, left_on="a", right_on="b", how="left|right|inner|outer")


# 聚合函数
spark_df_collect = spark_df.groupBy('number').agg(
F.collect_set('province').alias('set_province'),
F.first('city').alias('set_city'),
F.collect_list('district').alias('set_district'),
F.max('report_user').alias('set_report_user'),
F.min('first_type').alias('set_first_type'))
# 分组聚合
spark_df.groupBy('A').agg(F.avg('B'), F.min('B'))
spark/pandas_df.groupby('A').avg('B')

# 根据函数分组聚合
def func(x):
return pd.DataFrame({
"A": x["A"].tolist()[0],
"B": sum(x["B"])}, index=[0])
pandas_df_result = pandas_df.groupby(["A"]).apply(func)


# spark udf函数和pandas apply函数
def func1(a, b):
return a + b
spark_df.withColumn("col_name", F.udf(func1, IntegerType())(spark_df.a, spark_df.b)) # spark_df['a']或F.col("a")))
def func2(x,y):
return 1 if x > np.mean(y) else 0
pandas_df['A'].apply(func2, args=(pandas_df['B'],))
pandas_df['C'] = pandas_df.apply(lambda x: 1 if x['A'] > (x['B']*0.5) else 0, axis=1)


# spark创建临时表
spark_df.createOrReplaceTempView('tmp_table') # 用sql API
res1 = spark.sql('select * from tmp_table')
spark_df.registerTempTable('tmp_table') # 用dataframe API
res2 = spark.table('tmp_table')

其他常用设置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class SparkUtils:
def __init__(self):
self.spark = None

def get_spark(self):
if self.spark is None:
self.spark = SparkSession.builder.appName("username") \
.enableHiveSupport().config("spark.sql.shuffle.partitions", "500") \
.config("spark.sql.broadcastTimeout", "3600") \
.config("spark.driver.memory", "200g") \
.config("spark.executor.memory", "40g") \
.config("spark.yarn.appMasterEnv.yarn.nodemanager.container-executor.class", "DockerLinuxContainer") \
.config("spark.executorEnv.yarn.nodemanager.container-executor.class", "DockerLinuxContainer") \
.config("spark.yarn.appMasterEnv.yarn.nodemanager.docker-container-executor.image-name",
"bdp-docker.jd.com:5000/wise_mart_bag:latest") \
.config("spark.executorEnv.yarn.nodemanager.docker-container-executor.image-name",
"bdp-docker.jd.com:5000/wise_mart_bag:latest") \
.getOrCreate()
self.spark.sql('SET hive.exec.dynamic.partition=true')
self.spark.sql('SET hive.exec.dynamic.partition.mode=nonstrict')
return self.spark

spark = SparkUtils()

# 生成dataframe
spark_data = spark.sql("""
select
id,
username
from
tab1
where
status in (1, 2, 3)
and dt = '{}'
""".format(date))

# pandas常用显示设置
pd.set_option('display.max_rows', 100)
pd.set_option('display.max_columns', None)
pd.set_option('display.width',1000)
pd.set_option('display.max_colwidth',1000)