Common PySpark operations

Spark connection

from pyspark.sql import SparkSession
from pyspark.shell import sc  # SparkContext from the PySpark shell, used for sc.parallelize below
from pyspark.sql.types import *
import pyspark.sql.functions as F

class SparkUtils:
    def __init__(self):
        self.spark = None

    def get_spark(self):
        if self.spark is None:
            self.spark = SparkSession.builder.appName("username") \
                .enableHiveSupport().config("spark.sql.shuffle.partitions", "500") \
                .config("spark.sql.broadcastTimeout", "3600") \
                .config("spark.driver.memory", "200g") \
                .config("spark.executor.memory", "40g") \
                .config("spark.yarn.appMasterEnv.yarn.nodemanager.container-executor.class", "DockerLinuxContainer") \
                .config("spark.executorEnv.yarn.nodemanager.container-executor.class", "DockerLinuxContainer") \
                .config("spark.yarn.appMasterEnv.yarn.nodemanager.docker-container-executor.image-name",
                        "bdp-docker.jd.com:5000/wise_mart_bag:latest") \
                .config("spark.executorEnv.yarn.nodemanager.docker-container-executor.image-name",
                        "bdp-docker.jd.com:5000/wise_mart_bag:latest") \
                .getOrCreate()
        return self.spark

spark = SparkUtils().get_spark()

# Build a DataFrame from a SQL query (date is assumed to be defined elsewhere)
spark_data = spark.sql("""
    select
        id,
        username,
        num
    from
        table1
    where
        status in (1, 2, 3)
        and dt = '{}'
    """.format(date))

# Register the DataFrame as a temporary SQL view
sp_test.createOrReplaceTempView('data')
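
A minimal usage sketch: once the view is registered, it can be queried through spark.sql like a regular table (the column names here follow the sp_test example further below and are illustrative):

# Query the temporary view with plain SQL
spark.sql("select id, name from data where age >= 18").show()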

Common commands
Reference:

# Create a first DataFrame
rdd = sc.parallelize([(1, 'Alice', 18), (2, 'Andy', 19), (3, 'Bob', 17)])
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
sp_test = spark.createDataFrame(rdd, schema)
sp_test.show()

# Cache the data (cache() is persist() with the default storage level; one call is enough)
sp_test.cache()
sp_test.persist()
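
A small sketch of an explicit storage level and of releasing the cache (MEMORY_AND_DISK is just one possible choice here):

from pyspark import StorageLevel

sp_test.persist(StorageLevel.MEMORY_AND_DISK)  # spill partitions to disk when memory is full
sp_test.unpersist()  # release the cached data when it is no longer needed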

# Add a column via a UDF
def func(a, b):
    return a + b

# The three calls below are equivalent ways of referencing the input columns
sp_test.withColumn("price_detail", F.udf(func, IntegerType())(sp_test.a, sp_test.b))
sp_test.withColumn("price_detail", F.udf(func, IntegerType())(sp_test['a'], sp_test['b']))
sp_test.withColumn("price_detail", F.udf(func, IntegerType())(F.col("a"), F.col("b")))
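
For simple arithmetic like this, a built-in column expression gives the same result without the Python UDF serialization overhead; a minimal sketch:

# Same result as the UDF above, executed natively by Spark
sp_test.withColumn("price_detail", F.col("a") + F.col("b"))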

# Rename a column
sp_test.withColumnRenamed("old_name", "new_name")

# Join on multiple conditions (how defaults to "inner")
sp_data_join = sp_data_new.join(
    sp_data_old,
    (sp_data_new.begin_org_name_new == sp_data_old.begin_org_name_old) &
    (sp_data_new.real_vehicle_type_new == sp_data_old.vehicle_type_old),
    how="left")
# Join on a single column
sp_data_join = sp_data_new.join(sp_data_old, ['id'], 'left')

# Filter rows with a UDF
def filter_milage(milage_old, milage_new):
    return abs(milage_old - milage_new) <= 5

sp_data_join = sp_data_new.filter(
    F.udf(filter_milage, BooleanType())(sp_data_new["milage_old"], sp_data_new["milage_new"]))
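
The same condition can be written with built-in functions, avoiding the round trip through Python; a sketch using the columns above:

sp_data_join = sp_data_new.filter(
    F.abs(F.col("milage_old") - F.col("milage_new")) <= 5)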

# Select two columns
sp_test_filter = sp_test.select('code', 'name')

# Drop columns
sp_test = sp_test.drop('name', "code")

# Set a column to a constant value
sp_test = sp_test.withColumn('name', F.lit(''))

# Filter out rows where d is null
sp_test = sp_test.filter(~(F.isnull(sp_test.d)))

# Keep rows where code is neither null nor an empty string
sp_test = sp_test.filter(~((sp_test.code.isNull()) | (sp_test.code == "")))

# Deduplicate
sp_test.select('code').distinct()
sp_test_filter = sp_test.drop_duplicates(["code"])

# Replace nulls with 0 (or "")
sp_test = sp_test.withColumn('code', F.when(F.isnull(sp_test.code), 0).otherwise(sp_test.code))
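
fillna is a shorter equivalent for replacing nulls in one or more columns:

# Pass a dict of column -> fill value
sp_test = sp_test.fillna({'code': 0})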

# Aggregation
sp_test_collect = sp_test.groupBy('number').agg(
    F.collect_set('province').alias('set_province'),
    F.first('city').alias('set_city'),
    F.collect_list('district').alias('set_district'),
    F.max('report_user').alias('set_report_user'),
    F.min('first_type').alias('set_first_type'))
sp_test_collect = sp_test.groupby().agg({'code': 'sum'}).collect()

# Count rows per value of a column
sp_test.groupBy(sp_test.code).count().show()

Common pandas operations

import pandas as pd

# Display options for interactive inspection
pd.set_option('display.max_rows', 100)
pd.set_option('display.max_columns', None)
pd.set_option('display.width', 1000)
pd.set_option('display.max_colwidth', 1000)

# Load and save data
df = pd.read_excel("file_name")
df = pd.read_csv("file_name", sep="\t")
df.to_csv("data.csv", index=False)

# Create an empty DataFrame and append a row
df_empty = pd.DataFrame()
# DataFrame.append returns a new frame (and was removed in pandas 2.0); use pd.concat and assign the result
df_empty = pd.concat([df_empty, pd.DataFrame([[3, 4, 5, "哈哈"]])], ignore_index=True)
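
When building a table row by row, collecting the rows in a plain list and constructing the DataFrame once is usually cheaper than repeated concatenation; a minimal sketch with made-up columns:

rows = []
for i in range(3):
    rows.append({"a": i, "b": i * 2})
df_built = pd.DataFrame(rows)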

# Filter rows
df_new = df[df["code"].apply(lambda x: len(x) == 11)]
df_new2 = df[df["code"] == 1]

# Drop columns; rename columns
df.drop(['column1', "column2"], axis=1, inplace=True)
df.rename(columns={"column1": "new_name1", "column2": "new_name2"}, inplace=True)

# Transform a column
df["电话"] = df["电话"].apply(lambda x: x == "")

# Transform a column based on other columns
def func(x):
    if x["所属市"] == "赣州市":
        return "宁都县"
    return x["所属县"]  # keep the existing value for all other rows

df["所属县"] = df.apply(func, axis=1)

# Concatenate tables vertically
df_rule = pd.concat([df_rule_1, df_rule_2], axis=0)

# Merge tables horizontally; how is one of "left", "right", "inner", "outer"
pd.merge(df_left, df_right, left_on="a", right_on="b", how="left")


# Group and aggregate with a custom function
def func1(gg):
    return pd.DataFrame({
        "上游客户电话号码": gg["上游电话号码修改"].tolist()[0],
        "上游商家单量": sum(gg["上游商家单量"])}, index=[0])

df_result = df_new.groupby(["上游电话号码修改"]).apply(func1)
df_result2 = df.groupby("tag").apply(func1)
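
For a plain sum like this, groupby().agg() is simpler and faster than apply; a sketch reusing the column names above:

df_result = df_new.groupby("上游电话号码修改", as_index=False).agg({"上游商家单量": "sum"})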

# Turn the index back into columns
df_result.reset_index()

# Iterate over rows with iterrows
for i, row in df.iterrows():
    print(row["c1"], row["c2"])

# Zero-pad a number
num = 233
str_num = str(num).zfill(4)
print(str_num)  # prints 0233

# Deduplicate
df.drop_duplicates(["列名"], keep='first', inplace=True)

# Sort
df.sort_values(by=["A", "D"], axis=0, ascending=[True, False], inplace=True)

# Drop rows with missing values
df.dropna(subset=['trader_province_name', "trader_county_name"], how="any", inplace=True)

# Convert the data type of a column
df["Customer"] = df['Customer'].astype("int")

# Register a pandas DataFrame as a Spark SQL temp view
df_data = spark.createDataFrame(df[["Customer"]])
df_data.createOrReplaceTempView('df_data')  # registerTempTable is deprecated
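
A quick check that the registered view is queryable from Spark SQL:

spark.sql("select count(*) from df_data").show()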