推荐系统学习笔记目录

  1. 推荐系统介绍
  2. 推荐算法
  3. Hadoop
  4. Hive & HBase
  5. Spark core
  6. Spark SQL & Spark streaming
  7. 推荐系统案例

Spark SQL

Spark SQL概念

  • Spark SQL is Apache Spark’s module for working with structured data.
    • 它是spark中用于处理结构化数据的一个模块

Spark SQL历史

  • Hive是目前大数据领域,事实上的数据仓库标准。

s9

  • Shark:shark底层使用spark的基于内存的计算模型,从而让性能比Hive提升了数倍到上百倍。
  • 底层很多东西还是依赖于Hive,修改了内存管理、物理计划、执行三个模块
  • 2014年6月1日的时候,Spark宣布了不再开发Shark,全面转向Spark SQL的开发

Spark SQL优势

  • Write Less Code
    s10
  • Performance
    s11

python操作RDD,转换为可执行代码,运行在java虚拟机,涉及两个不同语言引擎之间的切换,进行进程间 通信很耗费性能。

DataFrame

  • 是RDD为基础的分布式数据集,类似于传统关系型数据库的二维表,dataframe记录了对应列的名称和类型
  • dataFrame引入schema和off-heap(使用操作系统层面上的内存)
    • 1、解决了RDD的缺点
    • 序列化和反序列化开销大
    • 频繁的创建和销毁对象造成大量的GC
    • 2、丢失了RDD的优点
    • RDD编译时进行类型检查
    • RDD具有面向对象编程的特性

用scala/python编写的RDD比Spark SQL编写转换的RDD慢,涉及到执行计划

  • CatalystOptimizer:Catalyst优化器
  • ProjectTungsten:钨丝计划,为了提高RDD的效率而制定的计划
  • Code gen:代码生成器

s12

直接编写RDD也可以自实现优化代码,但是远不及SparkSQL前面的优化操作后转换的RDD效率高,快1倍左右

优化引擎:类似mysql等关系型数据库基于成本的优化器

首先执行逻辑执行计划,然后转换为物理执行计划(选择成本最小的),通过Code Generation最终生成为RDD

  • Language-independent API

    用任何语言编写生成的RDD都一样,而使用spark-core编写的RDD,不同的语言生成不同的RDD

  • Schema

    结构化数据,可以直接看出数据的详情

    在RDD中无法看出,解释性不强,无法告诉引擎信息,没法详细优化。

**为什么要学习sparksql **

sparksql特性

  • 1、易整合
  • 2、统一的数据源访问
  • 3、兼容hive
  • 4、提供了标准的数据库连接(jdbc/odbc)

DataFrame

介绍

在Spark语义中,DataFrame是一个分布式的行集合,可以想象为一个关系型数据库的表,或者一个带有列名的Excel表格。它和RDD一样,有这样一些特点:

  • Immuatable:一旦RDD、DataFrame被创建,就不能更改,只能通过transformation生成新的RDD、DataFrame
  • Lazy Evaluations:只有action才会触发Transformation的执行
  • Distributed:DataFrame和RDD一样都是分布式的
  • dataframe和dataset统一,dataframe只是dataset[ROW]的类型别名。由于Python是弱类型语言,只能使用DataFrame

DataFrame vs RDD

  • RDD:分布式的对象的集合,Spark并不知道对象的详细模式信息
  • DataFrame:分布式的Row对象的集合,其提供了由列组成的详细模式信息,使得Spark SQL可以进行某些形式的执行优化。
  • DataFrame和普通的RDD的逻辑框架区别如下所示:

s13

  • 左侧的RDD Spark框架本身不了解 Person类的内部结构。

  • 右侧的DataFrame提供了详细的结构信息(schema——每列的名称,类型)

  • DataFrame还配套了新的操作数据的方法,DataFrame API(如df.select())和SQL(select id, name from xx_table where …)。

  • DataFrame还引入了off-heap,意味着JVM堆以外的内存, 这些内存直接受操作系统管理(而不是JVM)。

  • RDD是分布式的Java对象的集合。DataFrame是分布式的Row对象的集合。DataFrame除了提供了比RDD更丰富的算子以外,更重要的特点是提升执行效率、减少数据读取以及执行计划的优化。

  • DataFrame的抽象后,我们处理数据更加简单了,甚至可以用SQL来处理数据了

  • 通过DataFrame API或SQL处理数据,会自动经过Spark 优化器(Catalyst)的优化,即使你写的程序或SQL不仅高效,也可以运行的很快。

  • DataFrame相当于是一个带着schema的RDD

Pandas DataFrame vs Spark DataFrame

  • Cluster Parallel:集群并行执行
  • Lazy Evaluations: 只有action才会触发Transformation的执行
  • Immutable:不可更改
  • Pandas rich API:比Spark SQL api丰富

创建DataFrame

  1. 创建dataFrame的步骤
    调用方法例如:spark.read.xxx方法

  2. 其他方式创建dataframe

  • createDataFrame:pandas dataframe、list、RDD

  • 数据源:RDD、csv、json、parquet、orc、jdbc

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
      jsonDF = spark.read.json("xxx.json")

    jsonDF = spark.read.format('json').load('xxx.json')

    parquetDF = spark.read.parquet("xxx.parquet")

    jdbcDF = spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/db_name").option("dbtable","table_name").option("user","xxx").option("password","xxx").load()
    PNG

    - Transformation:延迟性操作

    - action:立即操作

    ![s14](s14.PNG)

    ## DataFrame API实现

    **基于RDD创建**

    ```python
    from pyspark.sql import SparkSession
    from pyspark.sql import Row

    spark = SparkSession.builder.appName('test').getOrCreate()
    sc = spark.sparkContext
    # spark.conf.set("spark.sql.shuffle.partitions", 6)
    # ================直接创建==========================
    l = [('Ankit',25),('Jalfaizy',22),('saurabh',20),('Bala',26)]
    rdd = sc.parallelize(l)
    #为数据添加列名
    people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
    #创建DataFrame
    schemaPeople = spark.createDataFrame(people)

从csv中读取数据

1
2
3
4
5
6
7
8
9
10
11
12
13
# ==================从csv读取======================
#加载csv类型的数据并转换为DataFrame
df = spark.read.format("csv"). \
option("header", "true") \
.load("iris.csv")
#显示数据结构
df.printSchema()
#显示前10条数据
df.show(10)
#统计总量
df.count()
#列名
df.columns

增加一列

1
2
3
4
# ===============增加一列(或者替换) withColumn===========
#定义一个新的列,数据为其他某列数据的两倍
#如果操作的是原有列,可以替换原有列的数据
df.withColumn('newWidth',df.SepalWidth * 2).show()

删除一列

1
2
3
# ==========删除一列  drop=========================
#删除一列
df.drop('cls').show()

统计信息

1
2
3
4
#================ 统计信息 describe================
df.describe().show()
#计算某一列的描述信息
df.describe('cls').show()

提取部分列

1
2
# ===============提取部分列 select==============
df.select('SepalLength','SepalWidth').show()

基本统计功能

1
2
# ==================基本统计功能 distinct count=====
df.select('cls').distinct().count()

分组统计

1
2
3
4
5
6
# 分组统计 groupby(colname).agg({'col':'fun','col2':'fun2'})
df.groupby('cls').agg({'SepalWidth':'mean','SepalLength':'max'}).show()

# avg(), count(), countDistinct(), first(), kurtosis(),
# max(), mean(), min(), skewness(), stddev(), stddev_pop(),
# stddev_samp(), sum(), sumDistinct(), var_pop(), var_samp() and variance()

自定义的汇总方法

1
2
3
4
# 自定义的汇总方法
import pyspark.sql.functions as fn
#调用函数并起一个别名
df.agg(fn.count('SepalWidth').alias('width_count'),fn.countDistinct('cls').alias('distinct_cls_count')).show()

拆分数据集

1
2
3
#====================数据集拆成两部分 randomSplit ===========
#设置数据比例将数据划分为两部分
trainDF, testDF = df.randomSplit([0.6, 0.4])

采样数据

1
2
3
4
5
# ================采样数据 sample===========
#withReplacement:是否有放回的采样
#fraction:采样比例
#seed:随机种子
sdf = df.sample(False,0.2,100)

查看两个数据集在类别上的差异

1
2
3
#查看两个数据集在类别上的差异 subtract,确保训练数据集覆盖了所有分类
diff_in_train_test = testDF.select('cls').subtract(trainDF.select('cls'))
diff_in_train_test.distinct().count()

交叉表

1
2
# ================交叉表 crosstab=============
df.crosstab('cls','SepalLength').show()

udf

udf:自定义函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
#================== 综合案例 + udf================
# 测试数据集中有些类别在训练集中是不存在的,找到这些数据集做后续处理
trainDF,testDF = df.randomSplit([0.99,0.01])

diff_in_train_test = trainDF.select('cls').subtract(testDF.select('cls')).distinct().show()

#首先找到这些类,整理到一个列表
not_exist_cls = trainDF.select('cls').subtract(testDF.select('cls')).distinct().rdd.map(lambda x :x[0]).collect()

#定义一个方法,用于检测
def should_remove(x):
if x in not_exist_cls:
return -1
else :
return x

#创建udf,udf函数需要两个参数:
# Function
# Return type (in my case StringType())

#在RDD中可以直接定义函数,交给rdd的transformatioins方法进行执行
#在DataFrame中需要通过udf将自定义函数封装成udf函数再交给DataFrame进行调用执行

from pyspark.sql.types import StringType
from pyspark.sql.functions import udf


check = udf(should_remove,StringType())

resultDF = trainDF.withColumn('New_cls',check(trainDF['cls'])).filter('New_cls <> -1')

resultDF.show()

JSON数据的处理

介绍

JSON数据

  • Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrame

    Spark SQL能够自动将JSON数据集以结构化的形式加载为一个DataFrame

  • This conversion can be done using SparkSession.read.json on a JSON file

    读取一个JSON文件可以用SparkSession.read.json方法

从JSON到DataFrame

  • 指定DataFrame的schema

    1. 通过反射自动推断,适合静态数据
    2. 程序指定,适合程序运行中动态生成的数据

加载json数据

1
2
3
4
5
6
#使用内部的schema
jsonDF = spark.read.json("xxx.json")
jsonDF = spark.read.format('json').load('xxx.json')

#指定schema
jsonDF = spark.read.schema(jsonSchema).json('xxx.json')

嵌套结构的JSON

  • 重要的方法

    1. get_json_object
    2. get_json
    3. explode

实践

静态json数据的读取和操作

无嵌套结构的json数据

1
2
3
4
5
6
7
8
9
10
11
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('json_demo').getOrCreate()
sc = spark.sparkContext

# ==========================================
# 无嵌套结构的json
# ==========================================
jsonString = [
"""{ "id" : "01001", "city" : "AGAWAM", "pop" : 15338, "state" : "MA" }""",
"""{ "id" : "01002", "city" : "CUSHMAN", "pop" : 36963, "state" : "MA" }"""
]

从json字符串数组得到DataFrame

1
2
3
4
5
6
7
8
# 从json字符串数组得到rdd有两种方法
# 1. 转换为rdd,再从rdd到DataFrame
# 2. 直接利用spark.createDataFrame(),见后面例子

jsonRDD = sc.parallelize(jsonString) # stringJSONRDD
jsonDF = spark.read.json(jsonRDD) # convert RDD into DataFrame
jsonDF.printSchema()
jsonDF.show()

直接从文件生成DataFrame

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# -- 直接从文件生成DataFrame
#只有被压缩后的json文件内容,才能被spark-sql正确读取,否则格式化后的数据读取会出现问题
jsonDF = spark.read.json("xxx.json")
# or
# jsonDF = spark.read.format('json').load('xxx.json')

jsonDF.printSchema()
jsonDF.show()

jsonDF.filter(jsonDF.pop>4000).show(10)
#依照已有的DataFrame,创建一个临时的表(相当于mysql数据库中的一个表),这样就可以用纯sql语句进行数据操作
jsonDF.createOrReplaceTempView("tmp_table")

resultDF = spark.sql("select * from tmp_table where pop>4000")
resultDF.show(10)

动态json数据的读取和操作

指定DataFrame的Schema

3.1节中的例子为通过反射自动推断schema,适合静态数据

下面我们来讲解如何进行程序指定schema

没有嵌套结构的json

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
jsonString = [
"""{ "id" : "01001", "city" : "AGAWAM", "pop" : 15338, "state" : "MA" }""",
"""{ "id" : "01002", "city" : "CUSHMAN", "pop" : 36963, "state" : "MA" }"""
]

jsonRDD = sc.parallelize(jsonString)

from pyspark.sql.types import *

#定义结构类型
#StructType:schema的整体结构,表示JSON的对象结构
#XXXStype:指的是某一列的数据类型
jsonSchema = StructType() \
.add("id", StringType(),True) \
.add("city", StringType()) \
.add("pop" , LongType()) \
.add("state",StringType())

jsonSchema = StructType() \
.add("id", LongType(),True) \
.add("city", StringType()) \
.add("pop" , DoubleType()) \
.add("state",StringType())

reader = spark.read.schema(jsonSchema)

jsonDF = reader.json(jsonRDD)
jsonDF.printSchema()
jsonDF.show()

带有嵌套结构的json

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from pyspark.sql.types import *
jsonSchema = StructType([
StructField("id", StringType(), True),
StructField("city", StringType(), True),
StructField("loc" , ArrayType(DoubleType())),
StructField("pop", LongType(), True),
StructField("state", StringType(), True)
])

reader = spark.read.schema(jsonSchema)
jsonDF = reader.json('data/nest.json')
jsonDF.printSchema()
jsonDF.show(2)
jsonDF.filter(jsonDF.pop>4000).show(10)

数据清洗

前面我们处理的数据实际上都是已经被处理好的规整数据,但是在大数据整个生产过程中,需要先对数据进行数据清洗,将杂乱无章的数据整理为符合后面处理要求的规整数据。

数据去重

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
'''
1.删除重复数据

groupby().count():可以看到数据的重复情况
'''
df = spark.createDataFrame([
(1, 144.5, 5.9, 33, 'M'),
(2, 167.2, 5.4, 45, 'M'),
(3, 124.1, 5.2, 23, 'F'),
(4, 144.5, 5.9, 33, 'M'),
(5, 133.2, 5.7, 54, 'F'),
(3, 124.1, 5.2, 23, 'F'),
(5, 129.2, 5.3, 42, 'M'),
], ['id', 'weight', 'height', 'age', 'gender'])

# 查看重复记录
#无意义重复数据去重:数据中行与行完全重复
# 1.首先删除完全一样的记录
df2 = df.dropDuplicates()

#有意义去重:删除除去无意义字段之外的完全重复的行数据
# 2.其次,关键字段值完全一模一样的记录(在这个例子中,是指除了id之外的列一模一样)
# 删除某些字段值完全一样的重复记录,subset参数定义这些字段
df3 = df2.dropDuplicates(subset = [c for c in df2.columns if c!='id'])
# 3.有意义的重复记录去重之后,再看某个无意义字段的值是否有重复(在这个例子中,是看id是否重复)
# 查看某一列是否有重复值
import pyspark.sql.functions as fn
df3.agg(fn.count('id').alias('id_count'),fn.countDistinct('id').alias('distinct_id_count')).collect()
# 4.对于id这种无意义的列重复,添加另外一列自增id

df3.withColumn('new_id',fn.monotonically_increasing_id()).show()

缺失值处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
'''
2.处理缺失值
2.1 对缺失值进行删除操作(行,列)
2.2 对缺失值进行填充操作(列的均值)
2.3 对缺失值对应的行或列进行标记
'''
df_miss = spark.createDataFrame([
(1, 143.5, 5.6, 28,'M', 100000),
(2, 167.2, 5.4, 45,'M', None),
(3, None , 5.2, None, None, None),
(4, 144.5, 5.9, 33, 'M', None),
(5, 133.2, 5.7, 54, 'F', None),
(6, 124.1, 5.2, None, 'F', None),
(7, 129.2, 5.3, 42, 'M', 76000),],
['id', 'weight', 'height', 'age', 'gender', 'income'])

# 1.计算每条记录的缺失值情况

df_miss.rdd.map(lambda row:(row['id'],sum([c==None for c in row]))).collect()
[(1, 0), (2, 1), (3, 4), (4, 1), (5, 1), (6, 2), (7, 0)]

# 2.计算各列的缺失情况百分比
df_miss.agg(*[(1 - (fn.count(c) / fn.count('*'))).alias(c + '_missing') for c in df_miss.columns]).show()

# 3、删除缺失值过于严重的列
# 其实是先建一个DF,不要缺失值的列
df_miss_no_income = df_miss.select([
c for c in df_miss.columns if c != 'income'
])

# 4、按照缺失值删除行(threshold是根据一行记录中,缺失字段的百分比的定义)
df_miss_no_income.dropna(thresh=3).show()

# 5、填充缺失值,可以用fillna来填充缺失值,
# 对于bool类型、或者分类类型,可以为缺失值单独设置一个类型,missing
# 对于数值类型,可以用均值或者中位数等填充

# fillna可以接收两种类型的参数:
# 一个数字、字符串,这时整个DataSet中所有的缺失值都会被填充为相同的值。
# 也可以接收一个字典{列名:值}这样

# 先计算均值,并组织成一个字典
means = df_miss_no_income.agg( *[fn.mean(c).alias(c) for c in df_miss_no_income.columns if c != 'gender']).toPandas().to_dict('records')[0]
# 然后添加其它的列
means['gender'] = 'missing'

df_miss_no_income.fillna(means).show()

异常值处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
'''
3、异常值处理
异常值:不属于正常的值 包含:缺失值,超过正常范围内的较大值或较小值
分位数去极值
中位数绝对偏差去极值
正态分布去极值
上述三种操作的核心都是:通过原始数据设定一个正常的范围,超过此范围的就是一个异常值
'''
df_outliers = spark.createDataFrame([
(1, 143.5, 5.3, 28),
(2, 154.2, 5.5, 45),
(3, 342.3, 5.1, 99),
(4, 144.5, 5.5, 33),
(5, 133.2, 5.4, 54),
(6, 124.1, 5.1, 21),
(7, 129.2, 5.3, 42),
], ['id', 'weight', 'height', 'age'])
# 设定范围 超出这个范围的 用边界值替换

# approxQuantile方法接收三个参数:参数1,列名;参数2:想要计算的分位点,可以是一个点,也可以是一个列表(0和1之间的小数),第三个参数是能容忍的误差,如果是0,代表百分百精确计算。

cols = ['weight', 'height', 'age']

bounds = {}
for col in cols:
quantiles = df_outliers.approxQuantile(col, [0.25, 0.75], 0.05)
IQR = quantiles[1] - quantiles[0]
bounds[col] = [
quantiles[0] - 1.5 * IQR,
quantiles[1] + 1.5 * IQR
]

>>> bounds
{'age': [-11.0, 93.0], 'height': [4.499999999999999, 6.1000000000000005], 'weight': [91.69999999999999, 191.7]}

# 为异常值字段打标志
outliers = df_outliers.select(*['id'] + [( (df_outliers[c] < bounds[c][0]) | (df_outliers[c] > bounds[c][1]) ).alias(c + '_o') for c in cols ])
outliers.show()
#
# +---+--------+--------+-----+
# | id|weight_o|height_o|age_o|
# +---+--------+--------+-----+
# | 1| false| false|false|
# | 2| false| false|false|
# | 3| true| false| true|
# | 4| false| false|false|
# | 5| false| false|false|
# | 6| false| false|false|
# | 7| false| false|false|
# +---+--------+--------+-----+

# 再回头看看这些异常值的值,重新和原始数据关联

df_outliers = df_outliers.join(outliers, on='id')
df_outliers.filter('weight_o').select('id', 'weight').show()
# +---+------+
# | id|weight|
# +---+------+
# | 3| 342.3|
# +---+------+

df_outliers.filter('age_o').select('id', 'age').show()
# +---+---+
# | id|age|
# +---+---+
# | 3| 99|
# +---+---+

sparkStreaming

sparkStreaming概述

SparkStreaming是什么

  • 它是一个可扩展,高吞吐具有容错性的流式计算框架

    吞吐量:单位时间内成功传输数据的数量

之前我们接触的spark-core和spark-sql都是处理属于离线批处理任务,数据一般都是在固定位置上,通常我们写好一个脚本,每天定时去处理数据,计算,保存数据结果。这类任务通常是T+1(一天一个任务),对实时性要求不高。

ss1

但在企业中存在很多实时性处理的需求,例如:双十一的京东阿里,通常会做一个实时的数据大屏,显示实时订单。这种情况下,对数据实时性要求较高,仅仅能够容忍到延迟1分钟或几秒钟。

ss2

实时计算框架对比

Storm

  • 流式计算框架
  • 以record为单位处理数据
  • 也支持micro-batch方式(Trident)

Spark

  • 批处理计算框架
  • 以RDD为单位处理数据
  • 支持micro-batch流式处理数据(Spark Streaming)

对比:

  • 吞吐量:Spark Streaming优于Storm
  • 延迟:Spark Streaming差于Storm

SparkStreaming的组件

  • Streaming Context
    • 一旦一个Context已经启动(调用了Streaming Context的start()),就不能有新的流算子(Dstream)建立或者是添加到context中
    • 一旦一个context已经停止,不能重新启动(Streaming Context调用了stop方法之后 就不能再次调 start())
    • 在JVM(java虚拟机)中, 同一时间只能有一个Streaming Context处于活跃状态, 一个SparkContext创建一个Streaming Context
    • 在Streaming Context上调用Stop方法, 也会关闭SparkContext对象, 如果只想仅关闭Streaming Context对象,设置stop()的可选参数为false
    • 一个SparkContext对象可以重复利用去创建多个Streaming Context对象(不关闭SparkContext前提下), 但是需要关一个再开下一个
  • DStream (离散流)
    • 代表一个连续的数据流
    • 在内部, DStream由一系列连续的RDD组成
    • DStreams中的每个RDD都包含确定时间间隔内的数据
    • 任何对DStreams的操作都转换成了对DStreams隐含的RDD的操作
    • 数据源
      • 基本源
        • TCP/IP Socket
        • FileSystem
      • 高级源
        • Kafka
        • Flume

Spark Streaming编码实践

Spark Streaming编码步骤:

  • 创建一个StreamingContext
  • 从StreamingContext中创建一个数据对象
  • 对数据对象进行Transformations操作
  • 输出结果
  • 开始和停止

利用Spark Streaming实现WordCount

需求:监听某个端口上的网络数据,实时统计出现的不同单词个数。

  1. 需要安装一个nc工具:sudo yum install -y nc
  2. 执行指令:nc -lk 9999 -v
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import os
# 配置spark driver和pyspark运行时,所使用的python解释器路径
PYSPARK_PYTHON = "/home/hadoop/miniconda3/envs/datapy365spark23/bin/python"
JAVA_HOME='/home/hadoop/app/jdk1.8.0_191'
SPARK_HOME = "/home/hadoop/app/spark-2.3.0-bin-2.6.0-cdh5.7.0"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
os.environ['JAVA_HOME']=JAVA_HOME
os.environ["SPARK_HOME"] = SPARK_HOME

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":

sc = SparkContext("local[2]",appName="NetworkWordCount")
#参数2:指定执行计算的时间间隔
ssc = StreamingContext(sc, 1)
#监听ip,端口上的上的数据
lines = ssc.socketTextStream('localhost',9999)
#将数据按空格进行拆分为多个单词
words = lines.flatMap(lambda line: line.split(" "))
#将单词转换为(单词,1)的形式
pairs = words.map(lambda word:(word,1))
#统计单词个数
wordCounts = pairs.reduceByKey(lambda x,y:x+y)
#打印结果信息,会使得前面的transformation操作执行
wordCounts.pprint()
#启动StreamingContext
ssc.start()
#等待计算结束
ssc.awaitTermination()

可视化查看效果:http://192.168.199.188:4040

点击streaming,查看效果

Spark Streaming的状态操作

在Spark Streaming中存在两种状态操作

  • UpdateStateByKey
  • Windows操作

使用有状态的transformation,需要开启Checkpoint

  • spark streaming 的容错机制
  • 它将足够多的信息checkpoint到某些具备容错性的存储系统如hdfs上,以便出错时能够迅速恢复

updateStateByKey

Spark Streaming实现的是一个实时批处理操作,每隔一段时间将数据进行打包,封装成RDD,是无状态的。

无状态:指的是每个时间片段的数据之间是没有关联的。

需求:想要将一个大时间段(1天),即多个小时间段的数据内的数据持续进行累积操作

一般超过一天都是用RDD或Spark SQL来进行离线批处理

如果没有UpdateStateByKey,我们需要将每一秒的数据计算好放入mysql中取,再用mysql来进行统计计算

Spark Streaming中提供这种状态保护机制,即updateStateByKey

步骤:

  • 首先,要定义一个state,可以是任意的数据类型
  • 其次,要定义state更新函数–指定一个函数如何使用之前的state和新值来更新state
  • 对于每个batch,Spark都会为每个之前已经存在的key去应用一次state更新函数,无论这个key在batch中是否有新的数据。如果state更新函数返回none,那么key对应的state就会被删除
  • 对于每个新出现的key,也会执行state更新函数

举例:词统计。

案例:updateStateByKey

需求:监听网络端口的数据,获取到每个批次的出现的单词数量,并且需要把每个批次的信息保留下来

代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import os
# 配置spark driver和pyspark运行时,所使用的python解释器路径
PYSPARK_PYTHON = "/home/hadoop/miniconda3/envs/datapy365spark23/bin/python"
JAVA_HOME='/home/hadoop/app/jdk1.8.0_191'
SPARK_HOME = "/home/hadoop/app/spark-2.3.0-bin-2.6.0-cdh5.7.0"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
os.environ['JAVA_HOME']=JAVA_HOME
os.environ["SPARK_HOME"] = SPARK_HOME
from pyspark.streaming import StreamingContext
from pyspark.sql.session import SparkSession

# 创建SparkContext
spark = SparkSession.builder.master("local[2]").getOrCreate()
sc = spark.sparkContext

ssc = StreamingContext(sc, 3)
#开启检查点
ssc.checkpoint("checkpoint")

#定义state更新函数
def updateFunc(new_values, last_sum):
return sum(new_values) + (last_sum or 0)

lines = ssc.socketTextStream("localhost", 9999)
# 对数据以空格进行拆分,分为多个单词
counts = lines.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.updateStateByKey(updateFunc=updateFunc)#应用updateStateByKey函数

counts.pprint()

ssc.start()
ssc.awaitTermination()

Windows

  • 窗口长度L:运算的数据量
  • 滑动间隔G:控制每隔多长时间做一次运算

每隔G秒,统计最近L秒的数据

ss14

操作细节

  • Window操作是基于窗口长度和滑动间隔来工作的
  • 窗口的长度控制考虑前几批次数据量
  • 默认为批处理的滑动间隔来确定计算结果的频率

相关函数

ss15

  • Smart computation
  • invAddFunc

reduceByKeyAndWindow(func,invFunc,windowLength,slideInterval,[num,Tasks])

func:正向操作,类似于updateStateByKey

invFunc:反向操作

ss16

例如在热词时,在上一个窗口中可能是热词,这个一个窗口中可能不是热词,就需要在这个窗口中把该次剔除掉

典型案例:热点搜索词滑动统计,每隔10秒,统计最近60秒钟的搜索词的搜索频次,并打印出最靠前的3个搜索词出现次数。

ss17

案例

监听网络端口的数据,每隔3秒统计前6秒出现的单词数量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import os
# 配置spark driver和pyspark运行时,所使用的python解释器路径
PYSPARK_PYTHON = "/home/hadoop/miniconda3/envs/datapy365spark23/bin/python"
JAVA_HOME='/home/hadoop/app/jdk1.8.0_191'
SPARK_HOME = "/home/hadoop/app/spark-2.3.0-bin-2.6.0-cdh5.7.0"
# 当存在多个版本时,不指定很可能会导致出错
os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
os.environ['JAVA_HOME']=JAVA_HOME
os.environ["SPARK_HOME"] = SPARK_HOME
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql.session import SparkSession

def get_countryname(line):
country_name = line.strip()

if country_name == 'usa':
output = 'USA'
elif country_name == 'ind':
output = 'India'
elif country_name == 'aus':
output = 'Australia'
else:
output = 'Unknown'

return (output, 1)

if __name__ == "__main__":
#定义处理的时间间隔
batch_interval = 1 # base time unit (in seconds)
#定义窗口长度
window_length = 6 * batch_interval
#定义滑动时间间隔
frequency = 3 * batch_interval

#获取StreamingContext
spark = SparkSession.builder.master("local[2]").getOrCreate()
sc = spark.sparkContext
ssc = StreamingContext(sc, batch_interval)

#需要设置检查点
ssc.checkpoint("checkpoint")

lines = ssc.socketTextStream('localhost', 9999)
addFunc = lambda x, y: x + y
invAddFunc = lambda x, y: x - y
#调用reduceByKeyAndWindow,来进行窗口函数的调用
window_counts = lines.map(get_countryname) \
.reduceByKeyAndWindow(addFunc, invAddFunc, window_length, frequency)
#输出处理结果信息
window_counts.pprint()

ssc.start()
ssc.awaitTermination()

Spark Streaming对接Kafka

对接数据的两种方式

在前面的案例中,我们监听了来自网络端口的数据,实现了WordCount,但是在实际开发中并不是这样。我们更多的是接收来自高级数据源的数据,例如Kafka。

下面我们来介绍如何利用Spark Streaming对接Kafka

以下两种方式都是为了数据可靠性:

  • Receiver-based Approach:由Receiver来对接数据,Receiver接收到数据后会将日志预先写入到hdfs上(WAL),同时也会将数据做副本传输到其他的Worker节点。在读取数据的过程中,Receiver是从Zookeeper中获取数据的偏移信息。
  • Direct Approach(No Receivers):没有Receiver接收信息,由Spark Streaming直接对接Kafka的broker,获取数据和数据的偏移信息。

上述两种方式中,Direct Approach方式更加可靠,不需要Spark Streaming自己去保证维护数据的可靠性,而是由善于处理这类工作的Kafka来做。

对应代码

  • KafkaUtils.createStream(ssc,zkQuorum,”spark-streaming-consumer”,{topic:1})
  • KafkaUtils.createDirectStream(ssc,[topic],{“metadata.broker.list”:’localhost:9092’})

Direct API的好处

  • 简化的并行:在Receiver的方式中我们提到创建多个Receiver之后利用union来合并成一个Dstream的方式提高数据传输并行度。而在Direct方式中,Kafka中的partition与RDD中的partition是一一对应的并行读取Kafka数据,这种映射关系也更利于理解和优化。
  • 高效:在Receiver的方式中,为了达到0数据丢失需要将数据存入Write Ahead Log中,这样在Kafka和日志中就保存了两份数据,浪费!而第二种方式不存在这个问题,只要我们Kafka的数据保留时间足够长,我们都能够从Kafka进行数据恢复。
  • 精确一次:在Receiver的方式中,使用的是Kafka的高阶API接口从Zookeeper中获取offset值,这也是传统的从Kafka中读取数据的方式,但由于Spark Streaming消费的数据和Zookeeper中记录的offset不同步,这种方式偶尔会造成数据重复消费。而第二种方式,直接使用了简单的低阶Kafka API,Offsets则利用Spark Streaming的checkpoints进行记录,消除了这种不一致性。

准备工作

步骤:

  • 配置spark streaming kafka开发环境

      1. 下载spark streaming集成kafka的jar包
        spark-streaming-kafka-0-8-assembly_2.11-2.3.0.jar
      1. 将jar包放置到spark的jars目录下
      1. 编辑spark/conf目录下的spark-defaults.conf,添加如下两条配置
      1
      2
      3
      spark.driver.extraClassPath=$SPAKR_HOME/jars/spark-streaming-kafka-0-8-assembly_2.11-2.3.0.jar
      spark.executor.extraClassPath=$SPARK_HOME/jars/spark-streaming-kafka-0-8-assembly_2.11-2.3.0.jar
      #driver和executor对应的两个路径一致
  • 测试配置是否成功

    • 启动zookeeper

      1
      zkServer.sh start
    • 启动kafka

      1
      kafka-server-start.sh config/server.properties
    • 创建topic

      • bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic test

        replication-factor:副本数量

        partitions:分区数量

        出现Created topic “test”,说明创建成功

    • 查看所有topic

      • bin/kafka-topics.sh –list –zookeeper localhost:2181
    • 通过Pycharm远程连接Centos 创建代码

    • 通过KafkaUtils 成功连接Kafka 创建DStream对象说明连接成功

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      import os
      # 配置spark driver和pyspark运行时,所使用的python解释器路径
      PYSPARK_PYTHON = "/home/hadoop/miniconda3/envs/datapy365spark23/bin/python"
      JAVA_HOME='/home/hadoop/app/jdk1.8.0_191'
      SPARK_HOME = "/home/hadoop/app/spark-2.3.0-bin-2.6.0-cdh5.7.0"
      # 当存在多个版本时,不指定很可能会导致出错
      os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON
      os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON
      os.environ['JAVA_HOME']=JAVA_HOME
      os.environ["SPARK_HOME"] = SPARK_HOME
      from pyspark.streaming import StreamingContext
      from pyspark.streaming.kafka import KafkaUtils
      from pyspark.sql.session import SparkSession

      sc = sparkContext('master[2]','kafkastreamingtest'
      ssc = StreamingContext(sc,3)
      #createDirectStream 连接kafka数据源获取数据
      # 参数1 streamingcontext
      #参数2 topic的名字
      # 参数3 kafka broker地址
      ks = KafkaUtils.createDirectStream(ssc,["topic1"],{"metadata.broker.list":"localhost:9092"})

案例实现

需求:利用Spark Streaming不断处理来自Kafka生产者生产的数据,并统计出现的单词数量

    1. 编写producer.py,用于生产数据
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    from kafka import KafkaProducer
    import time

    #创建KafkaProducer,连接broker
    producer = KafkaProducer(bootstrap_servers='localhost:9092')

    #每隔一段时间发送一端字符串数据到broker
    def send_data():
    for i in range(60):
    # (key,value) 参数2 是value
    producer.send('topic_name',"hello,kafka,spark,streaming,kafka")
    time.sleep(2)
    send_data()
    1. 编辑Spark Streaming代码,统计单词出现的数量
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils
    from pyspark.sql.session import SparkSession

    topic="topic_name"

    spark = SparkSession.builder.master("local[2]").getOrCreate()
    sc = spark.sparkContext
    ssc = StreamingContext(sc,3)

    #创建direct连接,指定要连接的topic和broker地址
    ks = KafkaUtils.createDirectStream(ssc,[topic],{"metadata.broker.list":"localhost:9092"})
    #(None,内容)
    ks.pprint()
    #(key,value)
    #以下代码每操作一次,就打印输出一次
    lines = ks.map(lambda x:x[1])
    lines.pprint()

    words = lines.flatMap(lambda line:line.split(","))
    #words.pprint()

    pairs = words.map(lambda word:(word,1))
    #pairs.pprint()

    counts = pairs.reduceByKey(lambda x,y:x+y)
    counts.pprint()

    ssc.start()
    #等待计算结束
    ssc.awaitTermination()
    1. 开启Spark Streaming消费数据,将产生的日志结果存储到日志中

    spark-submit xxx.py>a.log

    1. 开启producer.py,生产数据

    python3 producer.py

    1. 通过浏览器观察运算过程

    http://node-teach:4040

    1. 分析生成的日志内容
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    -------------------------------------------
    Time: 2018-12-11 01:31:21
    -------------------------------------------
    (None, 'hello,kafka,spark,streaming,kafka')
    (None, 'hello,kafka,spark,streaming,kafka')
    (None, 'hello,kafka,spark,streaming,kafka')
    (None, 'hello,kafka,spark,streaming,kafka')

    -------------------------------------------
    Time: 2018-12-11 01:02:33
    -------------------------------------------
    hello,kafka,spark,streaming,kafka
    hello,kafka,spark,streaming,kafka

    -------------------------------------------
    Time: 2018-12-11 01:02:33
    -------------------------------------------
    hello
    kafka
    spark
    streaming
    kafka
    hello
    kafka
    spark
    streaming
    kafka

    -------------------------------------------
    Time: 2018-12-11 01:02:33
    -------------------------------------------
    ('hello', 1)
    ('kafka', 1)
    ('spark', 1)
    ('streaming', 1)
    ('kafka', 1)
    ('hello', 1)
    ('kafka', 1)
    ('spark', 1)
    ('streaming', 1)
    ('kafka', 1)

    -------------------------------------------
    Time: 2018-12-11 01:02:33
    -------------------------------------------
    ('streaming', 2)
    ('hello', 2)
    ('kafka', 4)
    ('spark', 2)

    -------------------------------------------
    Time: 2018-12-11 01:02:36
    -------------------------------------------

    -------------------------------------------
    Time: 2018-12-11 01:02:36
    -------------------------------------------

Spark Streaming对接flume

flume作为日志实时采集的框架,可以与SparkStreaming实时处理框架进行对接,flume实时产生数据,sparkStreaming做实时处理。

Spark Streaming对接FlumeNG有两种方式,一种是FlumeNG将消息Push推给Spark Streaming,还有一种是Spark Streaming从flume 中Pull拉取数据。

Pull方式

    1. 安装flume1.6以上
    1. 下载依赖包

    spark-streaming-flume-assembly_2.11-2.3.0.jar放入到flume的lib目录下

    1. 写flume的agent,注意既然是拉取的方式,那么flume向自己所在的机器上产数据就行
    1. 编写flume-pull.conf配置文件
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    simple-agent.sources = netcat-source
    simple-agent.sinks = spark-sink
    simple-agent.channels = memory-channel

    # source
    simple-agent.sources.netcat-source.type = netcat
    simple-agent.sources.netcat-source.bind = localhost
    simple-agent.sources.netcat-source.port = 44444

    # Describe the sink
    simple-agent.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink
    simple-agent.sinks.spark-sink.hostname = localhost
    simple-agent.sinks.spark-sink.port = 41414

    # Use a channel which buffers events in memory
    simple-agent.channels.memory-channel.type = memory

    # Bind the source and sink to the channel
    simple-agent.sources.netcat-source.channels = memory-channel
    simple-agent.sinks.spark-sink.channel=memory-channel
  • 5,启动flume

    flume-ng agent -n simple-agent -f flume-pull.conf -Dflume.root.logger=INFO,console

  • 6,编写word count代码

    代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils

sc=SparkContext("local[2]","FlumeWordCount_Pull")
#处理时间间隔为2s
ssc=StreamingContext(sc,2)

#利用flume工具类创建pull方式的流
lines = FlumeUtils.createPollingStream(ssc, [("localhost",41414)])

lines1=lines.map(lambda x:x[1])
counts = lines1.flatMap(lambda line:line.split(" "))\
.map(lambda word:(word,1))\
.reduceByKey(lambda a,b:a+b)
counts.pprint()
#启动spark streaming应用
ssc.start()
#等待计算终止
ssc.awaitTermination()

启动

bin/spark-submit --jars xxx/spark-streaming-flume-assembly_2.11-2.3.0.jar xxx/flume_pull.py

push方式

大部分操作和之前一致

flume配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
simple-agent.sources = netcat-source
simple-agent.sinks = avro-sink
simple-agent.channels = memory-channel

simple-agent.sources.netcat-source.type = netcat
simple-agent.sources.netcat-source.bind = localhost
simple-agent.sources.netcat-source.port = 44444

simple-agent.sinks.avro-sink.type = avro
simple-agent.sinks.avro-sink.hostname = localhost
simple-agent.sinks.avro-sink.port = 41414
simple-agent.channels.memory-channel.type = memory
simple-agent.sources.netcat-source.channels = memory-channel

simple-agent.sources.netcat-source.channels = memory-channel
simple-agent.sinks.avro-sink.channel=memory-channel

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils

sc=SparkContext("local[2]","FlumeWordCount_Push")
#处理时间间隔为2s
ssc=StreamingContext(sc,2)
#创建push方式的DStream
lines = FlumeUtils.createStream(ssc, "localhost",41414)
lines1=lines.map(lambda x:x[1].strip())
#对1s内收到的字符串进行分割
words=lines1.flatMap(lambda line:line.split(" "))
#映射为(word,1)元祖
pairs=words.map(lambda word:(word,1))
wordcounts=pairs.reduceByKey(lambda x,y:x+y)
wordcounts.pprint()
#启动spark streaming应用
ssc.start()
#等待计算终止
ssc.awaitTermination()