pyspark API
https://spark.apache.org/docs/2.2.1/api/python/search.html?q=dataframe.write
简明教程
https://sparkbyexamples.com/pyspark/pyspark-orderby-and-sort-explained/
http://www.learnbymarketing.com/1100/pyspark-joins-by-example/
Spark调优
https://tech.meituan.com/2016/04/29/spark-tuning-basic.html
资源参数调优
了解完了Spark作业运行的基本原理之后,对资源相关的参数就容易理解了。所谓的Spark资源参数调优,其实主要就是对Spark运行过程中各个使用资源的地方,通过调节各种参数,来优化资源使用的效率,从而提升Spark作业的执行性能。以下参数就是Spark中主要的资源参数,每个参数都对应着作业运行原理中的某个部分,我们同时也给出了一个调优的参考值。
- num-executors
参数说明:该参数用于设置Spark作业总共要用多少个Executor进程来执行。Driver在向YARN集群管理器申请资源时,YARN集群管理器会尽可能按照你的设置来在集群的各个工作节点上,启动相应数量的Executor进程。这个参数非常之重要,如果不设置的话,默认只会给你启动少量的Executor进程,此时你的Spark作业的运行速度是非常慢的。
参数调优建议:每个Spark作业的运行一般设置50~100个左右的Executor进程比较合适,设置太少或太多的Executor进程都不好。设置的太少,无法充分利用集群资源;设置的太多的话,大部分队列可能无法给予充分的资源。 - executor-memory
参数说明:该参数用于设置每个Executor进程的内存。Executor内存的大小,很多时候直接决定了Spark作业的性能,而且跟常见的JVM OOM异常,也有直接的关联。
参数调优建议:每个Executor进程的内存设置4G~8G较为合适。但是这只是一个参考值,具体的设置还是得根据不同部门的资源队列来定。可以看看自己团队的资源队列的最大内存限制是多少,num-executors乘以executor-memory,是不能超过队列的最大内存量的。此外,如果你是跟团队里其他人共享这个资源队列,那么申请的内存量最好不要超过资源队列最大总内存的1/3~1/2,避免你自己的Spark作业占用了队列所有的资源,导致别的同学的作业无法运行。 - executor-cores
参数说明:该参数用于设置每个Executor进程的CPU core数量。这个参数决定了每个Executor进程并行执行task线程的能力。因为每个CPU core同一时间只能执行一个task线程,因此每个Executor进程的CPU core数量越多,越能够快速地执行完分配给自己的所有task线程。
参数调优建议:Executor的CPU core数量设置为2~4个较为合适。同样得根据不同部门的资源队列来定,可以看看自己的资源队列的最大CPU core限制是多少,再依据设置的Executor数量,来决定每个Executor进程可以分配到几个CPU core。同样建议,如果是跟他人共享这个队列,那么num-executors * executor-cores不要超过队列总CPU core的1/3~1/2左右比较合适,也是避免影响其他同学的作业运行。 - driver-memory
参数说明:该参数用于设置Driver进程的内存。
参数调优建议:Driver的内存通常来说不设置,或者设置1G左右应该就够了。唯一需要注意的一点是,如果需要使用collect算子将RDD的数据全部拉取到Driver上进行处理,那么必须确保Driver的内存足够大,否则会出现OOM内存溢出的问题。 - spark.default.parallelism
参数说明:该参数用于设置每个stage的默认task数量。这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能。
参数调优建议:Spark作业的默认task数量为500~1000个较为合适。很多同学常犯的一个错误就是不去设置这个参数,那么此时就会导致Spark自己根据底层HDFS的block数量来设置task的数量,默认是一个HDFS block对应一个task。通常来说,Spark默认设置的数量是偏少的(比如就几十个task),如果task数量偏少的话,就会导致你前面设置好的Executor的参数都前功尽弃。试想一下,无论你的Executor进程有多少个,内存和CPU有多大,但是task只有1个或者10个,那么90%的Executor进程可能根本就没有task执行,也就是白白浪费了资源!因此Spark官网建议的设置原则是,设置该参数为num-executors * executor-cores的2~3倍较为合适,比如Executor的总CPU core数量为300个,那么设置1000个task是可以的,此时可以充分地利用Spark集群的资源。 - spark.storage.memoryFraction
参数说明:该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。也就是说,默认Executor 60%的内存,可以用来保存持久化的RDD数据。根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘。
参数调优建议:如果Spark作业中,有较多的RDD持久化操作,该参数的值可以适当提高一些,保证持久化的数据能够容纳在内存中。避免内存不够缓存所有的数据,导致数据只能写入磁盘中,降低了性能。但是如果Spark作业中的shuffle类操作比较多,而持久化操作比较少,那么这个参数的值适当降低一些比较合适。此外,如果发现作业由于频繁的gc导致运行缓慢(通过spark web ui可以观察到作业的gc耗时),意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。 - spark.shuffle.memoryFraction
参数说明:该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操作。shuffle操作在进行聚合时,如果发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。
参数调优建议:如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的内存占比比例,避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的gc导致运行缓慢,意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。
资源参数的调优,没有一个固定的值,需要同学们根据自己的实际情况(包括Spark作业中的shuffle操作数量、RDD持久化操作数量以及spark web ui中显示的作业gc情况),同时参考本篇文章中给出的原理以及调优建议,合理地设置上述参数。
资源参数参考示例
以下是一份spark-submit命令的示例,大家可以参考一下,并根据自己的实际情况进行调节:
1 | /bin/spark-submit \ |
spark 本地执行
1 | # 本机4个CPU核心上执行 |
spark tutorial
spark-shell 不用创建sparkContext,默认已经启用了一个,如果再次生成会提示:”ValueError: Cannot run multiple SparkContexts at once”.
SparkSession和SparkContext的关系
1 | . |
SparkSession是Spark 2.0引入的新概念。SparkSession为用户提供了统一的切入点,来让用户学习spark的各项功能。
在spark的早期版本中,SparkContext是spark的主要切入点,由于RDD是主要的API,我们通过SparkContext来创建和操作RDD。对于每个其他的API,我们需要使用不同的context:
- Streaming使用StreamingContext
- sql使用SqlContext
- hive使用HiveContext
但是随着DataSet和DataFrame的API逐渐成为标准的API,就需要为他们建立接入点。所以在spark2.0中,引入SparkSession作为DataSet和DataFrame API的切入点。
SparkSession封装了SparkContext和SQLContext。为了向后兼容,SQLContext和HiveContext也被保存下来。
在大多数情况下,我们不需要显式初始化SparkContext; 而尽量通过SparkSession来访问它。
https://www.jianshu.com/p/4705988b0c84
1 | # Creating a SparkSession in Python |
Spark Cluster Components
Spark applications run as independent sets of processes on a cluster, coordinated by the SparkContext object in your main program (called the driver program).
driver program是协调集群进程的hub,所以当driver不在集群里时(client模式),网络带宽延迟会很影响通信与状态更新。
- Specifically, to run on a cluster, the SparkContext can connect to several types of cluster managers (either Spark’s own standalone cluster manager, Mesos or YARN), which allocate resources across applications.
- Once connected, Spark acquires executors on nodes in the cluster, which are processes that run computations and store data for your application.
- Next, it sends your application code (defined by JAR or Python files passed to SparkContext) to the executors.
- Finally, SparkContext sends tasks to the executors to run.
Spark Config
spark.sql.shuffle.partitions
http://blog.madhukaraphatak.com/dynamic-spark-shuffle-partitions/
Spark Submit
1 | /bin/spark-submit \ |
注意:当提交spark 任务时,如果jar包有多个,用逗号隔开,逗号前后不能有空格!,有逗号会导致后续参数解析错误,可能提交的scalar程序提示class notFound!
具体的例子参见《Submitting Applications》
python的例子:
对于 main.py 依赖的 util.py, module1.py, module2.py,需要先压缩成一个 .zip 文件,再通过 spark-submit 的 —py—files 选项上传到 yarn,mail.py 才能 import 这些子模块。命令如下:
1 | spark-submit |
deploy-mode
具体参考《standalone mode》
总结起来就一句话:culster/client的主要区别就是driver是否在cluster里
cluster 集群模式
1 | /bin/spark-submit \ |
- client模式提交任务后,会在客户端启动Driver进程。
- Driver会向Master申请启动Application启动的资源。
- 资源申请成功,Driver端将task发送到worker端执行。
- worker将task执行结果返回到Driver端。(由代码设置)
总结:client模式适用于测试调试程序。Driver进程是在客户端启动的,这里的客户端就是指提交应用程序的当前节点。在Driver端可以看到task执行的情况。生产环境下不能使用client模式,是因为:假设要提交100个application到集群运行,Driver每次都会在client端启动,那么就会导致客户端100次网卡流量暴增的问题。(因为要监控task的运行情况,会占用很多端口,如上图的结果图)客户端网卡通信,都被task监控信息占用。
集群模式如果是用来本地文件,需要添加—files参数
例如:
1 |
|
client
1 | /bin/spark-submit \ |
- 客户端使用命令spark-submit —deploy-mode cluster 后会启动spark-submit进程
- 此进程为Driver向Master申请资源,Driver进程默认需要1G内存和1Core,
- Master会随机选择一台worker节点来启动Driver进程(这样通信会较近,利于信息、状态收集)
- Driver启动成功后,spark-submit关闭,然后Driver向Master申请资源
- Master接收到请求后,会在资源充足的worker节点上启动Executor进程
- Driver分发Task到Executor中执行
总结:这种模式会将单节点的网卡流量激增问题分散到集群中。在客户端看不到task执行情况和结果,要去webui中看。cluster模式适用于生产环境,Master模式先启动Driver,再启动Application
spark shell 模式是以client提交的(第一种),所以不能加入—deploy-mode cluster的(第二种) client方式用于测试环境,用于方便查看结果,因为 spark shell 模式以client方式提交,所以 spark shell 模式不支持—deploy-mode cluster提
RDD and DataFrame
Transform函数
flatMap
1 | df = spark.sql("select id_feat from table where dt = '2020-10-01'") |
DataFrame 速查表
https://sparkbyexamples.com/pyspark/pyspark-structtype-and-structfield/
https://www.cnblogs.com/liaowuhen1314/p/12792202.html
DataFrame Split 列生成新column
https://sparkbyexamples.com/spark/spark-split-dataframe-column-into-multiple-columns/
[split 教程][2]
1 | df = spark.sql("select * from xxx where dt = '2020-10-20'") |
DataFrame 筛选
https://blog.csdn.net/sinat_26917383/article/details/80500349
join
例子
1 | a_rdd = sc.parallelize([('a', 13132), ('b', 121212),('c',56577)]) |
Where
https://stackoverflow.com/questions/35870760/filtering-a-pyspark-dataframe-with-sql-like-in-clause
1 | df = sc.parallelize([(1, "foo"), (2, "x"), (3, "bar")]).toDF(("k", "v")) |
DataFrame 统计
GroupBy
1 | df = df.where((df['dt'] >= '2020-09-14') & (df['dt'] <= '2020-10-05')).groupby('dt').count() |
DataFrame 转 rdd
参考资料
DataFrame的表相关操作不能处理一些问题,例如需要对一些数据利用指定的函数进行计算时,就需要将DataFrame转换为RDD。DataFrame可以直接利用df.rdd获取对应的RDD对象,此RDD对象的每个元素使用Row对象来表示,每列值会成为Row对象的一个域=>值映射。例如:
1 | >>> lists = [['a', 1], ['b', 2]] |
DataFrame写Hive表
1 | # 2. 处理数据 |
rdd 转 dataframe
1 | df = sqlContext.createDataFrame(data, schema=None, samplingRatio=None, verifySchema=True) |
schema:DataFrame各列类型信息,在提前知道RDD所有类型信息时设定。例如:
1 | schema = StructType([StructField('col1', StringType()), StructField('col2', IntegerType())]) |
读写文件
读文件
1 | data = spark.read.csv(filepath, sep=',', header=True, inferSchema=True) |
rdd写文件
1 | ## 单独一个文件 |
通常rdd直接write到文件,内容会是如下形式:
1 | >>cat rdd_res.txt |
rdd想没有括号输出到文本文件可以使用如下方式:
1 | data.map(lambda (k,v): "{0} {1}".format(k,v)).coalesce(1).write.csv('path') |
DataFrame写文件
1 | # dataframe 写csv文件 |
option 支持参数
path: csv文件的路径。支持通配符;
header: csv文件的header。默认值是false;
delimiter: 分隔符。默认值是’,’;
quote: 引号。默认值是””;
mode: 解析的模式。支持的选项有:
dataframe 添加一列
如果数据很多,想分块处理,以打到如下目的:
添加 id 选择区间
1 | sku_img_df = spark.sql(sql) |
读取TXT文件
- spark.sparkContext.textFile(file_path)
读取到的是RDD (pyspark.rdd.RDD),每一个元素直接是字符串
1 | >> rdd = spark.sparkContext.textFile(file_path) |
- spark.read.text(file_path)
读取到的是DataFrame (pyspark.sql.dataframe.DataFrame),每一个元素是 Row (pyspark.sql.types.Row)
1 | >> df = spark.read.text(file_path) |
所以这里如果直接rdd.map(lambda line: line.split(“ “)) 会报错,因为操作的元素是Row,Row对象没有split方法。
DataFrame 写入hive表
一般步骤
参考:https://blog.csdn.net/a2639491403/article/details/80044121
1.创建数据集的spark DattaFrame
df_tmp = spark.createDataFrame(RDD,schema)
这里schema是由StructFied函数定义的
2.将数据集的DataFrames格式映射到零时表
#### 3.用spark sql语句将零时表的数据导入hive的tmp_table表中
```sqlContext.sql('insert overwrite table des_table select *from tempTable')
写入分区表
参考:https://xinancsd.github.io/Python/pyspark_save_hive_table.html
df.write.saveAsTable() 方法
需要注意的是hive表名是不明感大小写的,经历过如下现象:
1 | df = spark.sql("select tmpr.abc") |
hive (hive默认格式,数据文件纯文本无压缩存储)
parquet (spark默认采用格式)
orc
json
csv
text(若用saveAsTable只能保存只有一个列的df)
jdbc
libsvm
1 | df.write.saveAsTable(save_table, mode='append', partitionBy=['pt_day']) |
from pyspark.sql.types import StringType, StructType, BooleanType, StructField
schema = StructType([
StructField(“vin”, StringType(), True),
StructField(“cust_id”, StringType(), True),
StructField(“is_maintain”, BooleanType(), True),
StructField(“is_wash”, BooleanType(), True),
StructField(“pt_day”, StringType(), True),
]
)
data = pd.read_csv(‘/path/to/data.csv’, header=0)
df = spark.createDataFrame(data, schema=schema)
写入hive表时就是指定的数据类型了
df.write.saveAsTable(save_table, mode=’append’, partitionBy=[‘pt_day’]
1 | #### option("delimiter", '\t') |
df.write.mode(“overwrite”).format(“orc”).partitionBy(“dt”).saveAsTable(“aicloud.cust_features”)
1 | 第二种:这种只会重写覆盖的分区,其他旧分区不会被删除 |
Scala
Scala匿名函数不可用return
Scala - return in anonymous function
https://www.jianshu.com/p/2053634328d3
[2]: https://sparkbyexamples.com/pyspark/pyspark-split-dataframe-column-into-multiple-columns/ 2019-12-25 14:26:31
vin Javascript & JQuery # Javascript & JQuery
标签(空格分隔): javascript
JQuery选择器
https://blog.csdn.net/qq_38225558/article/details/83780618
chrome 脚本编辑器
https://www.jianshu.com/p/87adbf88e2e3
https://www.cnblogs.com/liun1994/p/7265828.html