1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73
# Distribute three (id, name, age) tuples as an RDD.
rdd = sc.parallelize([
    (1, 'Alice', 18),
    (2, 'Andy', 19),
    (3, 'Bob', 17),
])

# Explicit schema for the tuples above; the third StructField argument (True)
# marks each column as nullable.
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
])

# Build the DataFrame from the RDD plus schema and print its rows.
sp_test = spark.createDataFrame(rdd, schema)
sp_test.show()
# Ask Spark to keep sp_test available for reuse by later actions.
# NOTE(review): cache() is documented as persist() with the default storage
# level, so issuing both looks redundant - presumably they are shown here as
# alternatives; confirm and keep only one.
sp_test.cache()
sp_test.persist()
def func(a, b):
    """Return ``a + b``, or ``None`` when either operand is ``None``.

    This function is wrapped with ``F.udf`` and applied to DataFrame columns
    in this file. A NULL cell reaches a Python UDF as ``None``, and
    ``None + x`` raises ``TypeError``; returning ``None`` instead mirrors how
    SQL arithmetic propagates NULL and keeps the job from failing on such rows.

    Args:
        a: Left operand.
        b: Right operand.

    Returns:
        The result of ``a + b``, or ``None`` if ``a`` or ``b`` is ``None``.
    """
    if a is None or b is None:
        return None
    return a + b
# Add a "price_detail" column by wrapping func as an integer-typed UDF.
# The three statements differ only in how columns "a" and "b" are referenced.
# NOTE(review): none of the resulting DataFrames is bound to a name, and
# sp_test is assumed to have columns "a" and "b" - TODO confirm.

# 1) Attribute access on the DataFrame.
sp_test.withColumn(
    "price_detail",
    F.udf(func, IntegerType())(sp_test.a, sp_test.b),
)

# 2) Item access on the DataFrame.
sp_test.withColumn(
    "price_detail",
    F.udf(func, IntegerType())(sp_test['a'], sp_test['b']),
)

# 3) Column references built with F.col.
sp_test.withColumn(
    "price_detail",
    F.udf(func, IntegerType())(F.col("a"), F.col("b")),
)

# Rename column "old_name" to "new_name" (result likewise not bound).
sp_test.withColumnRenamed("old_name", "new_name")
# Left-join sp_data_new with sp_data_old on two column equalities.
# Each comparison needs its own parentheses: in Python "&" binds tighter than
# "==", so without them the expression would not mean "cond1 AND cond2".
# (The original statement was missing the opening parenthesis of the first
# comparison, which made it a syntax error.)
# NOTE(review): the condition reads its columns from sp_data_new_filter /
# sp_data_old_filter while the join itself is between sp_data_new and
# sp_data_old - confirm which pair of DataFrames is intended.
sp_data_join = sp_data_new.join(
    sp_data_old,
    (
        sp_data_new_filter.begin_org_name_new
        == sp_data_old_filter.begin_org_name_old
    )
    & (
        sp_data_new_filter.real_vehicle_type_new
        == sp_data_old_filter.vehicle_type_old
    ),
    how="left",
)
# Left-join sp_data_new with sp_data_old, keyed on the column name "id".
sp_data_join = sp_data_new.join(
    sp_data_old,
    on=['id'],
    how='left',
)
def filter_milage(milage_old, milage_new):
    """Tell whether the two values differ by at most 5.

    The function is wrapped as a boolean Spark UDF below. A NULL cell reaches
    a Python UDF as ``None``, and ``None - x`` raises ``TypeError``; such rows
    are reported as ``False`` so the filter drops them instead of failing.

    Args:
        milage_old: First value, or ``None``.
        milage_new: Second value, or ``None``.

    Returns:
        ``True`` when ``abs(milage_old - milage_new) <= 5``; ``False``
        otherwise, including when either argument is ``None``.
    """
    if milage_old is None or milage_new is None:
        return False
    return abs(milage_old - milage_new) <= 5


# Keep only the rows of sp_data_new for which filter_milage is true.
sp_data_join = sp_data_new.filter(
    F.udf(filter_milage, BooleanType())(
        sp_data_new["milage_old"],
        sp_data_new["milage_new"],
    )
)
# Project only the "code" and "name" columns into a separate DataFrame.
sp_test_filter = sp_test.select(['code', 'name'])

# Remove the "name" and "code" columns from sp_test.
sp_test = sp_test.drop('name', "code")

# Set "name" to a constant empty-string literal on every row.
sp_test = sp_test.withColumn('name', F.lit(''))
# Keep only the rows whose "d" column is not NULL.
sp_test = sp_test.filter(sp_test.d.isNotNull())

# Keep only the rows whose "code" is neither NULL nor the empty string.
# This is the De Morgan form of ~(isNull | == ""); the comparison keeps its
# parentheses because "&" binds tighter than "!=".
sp_test = sp_test.filter(sp_test.code.isNotNull() & (sp_test.code != ""))
# Distinct values of "code"; the resulting DataFrame is not bound to a name.
sp_test.select('code').distinct()

# De-duplicate sp_test considering only the "code" column.
sp_test_filter = sp_test.drop_duplicates(["code"])

# Replace NULL "code" values with 0 and leave every other value as it is.
sp_test = sp_test.withColumn(
    'code',
    F.when(sp_test.code.isNull(), 0).otherwise(sp_test.code),
)
# Per-"number" aggregation: one aliased output column per aggregate.
# NOTE(review): F.first presumably depends on row order within each group -
# confirm whether a deterministic choice is needed.
sp_test_collect = (
    sp_test
    .groupBy('number')
    .agg(
        F.collect_set('province').alias('set_province'),
        F.first('city').alias('set_city'),
        F.collect_list('district').alias('set_district'),
        F.max('report_user').alias('set_report_user'),
        F.min('first_type').alias('set_first_type'),
    )
)

# Whole-table sum of "code" (groupBy with no keys), collected as rows.
# This rebinds sp_test_collect, replacing the grouped DataFrame above.
sp_test_collect = (
    sp_test
    .groupBy()
    .agg({'code': 'sum'})
    .collect()
)

# Print the number of rows for each distinct "code" value.
sp_test.groupBy('code').count().show()
|