Spark has the concept of a partition (the same concept as a slice; the Spark 1.2 documentation states this explicitly), and in general each partition corresponds to one task. In my testing, when spark.default.parallelism was not set, Spark computed an enormous number of partitions, wildly out of proportion to my cores. On two machines (2 × 8 cores, 2 × 6 GB), Spark produced as many as 28,000 partitions, i.e. roughly 29,000 tasks, each finishing in a few milliseconds or fractions of a millisecond, and the whole job ran very slowly. After I set spark.default.parallelism, the task count dropped to 10, and a single computation run went from minutes down to about 20 seconds.
The parameter can be set in the configuration file $SPARK_HOME/conf/spark-defaults.conf, e.g.:
spark.master spark://master:7077
spark.default.parallelism 10
spark.driver.memory 2g
spark.serializer org.apache.spark.serializer.KryoSerializer
spark.sql.shuffle.partitions 50
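The same property can also be set programmatically through SparkConf, or at submit time with --conf spark.default.parallelism=10. Below is a minimal sketch of the programmatic route; the master URL and values mirror the config example above, while the data and the expected counts are illustrative, assuming no other partitioning hints are in play:

import org.apache.spark.{SparkConf, SparkContext}

object ParallelismDemo {
  def main(args: Array[String]): Unit = {
    // Mirrors the spark-defaults.conf entries above.
    val conf = new SparkConf()
      .setAppName("ParallelismDemo")
      .setMaster("spark://master:7077")
      .set("spark.default.parallelism", "10")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)

    // parallelize() with no explicit numSlices falls back to
    // spark.default.parallelism, so this RDD gets 10 partitions.
    val nums = sc.parallelize(1 to 1000000)
    println(s"parallelize partitions: ${nums.partitions.length}") // 10

    // reduceByKey() with no explicit partition count also uses the
    // property, so the shuffle runs as 10 tasks instead of thousands.
    val counts = nums.map(x => (x % 100, 1)).reduceByKey(_ + _)
    println(s"reduceByKey partitions: ${counts.partitions.length}") // 10

    sc.stop()
  }
}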
Below are the relevant descriptions from the official documentation.

From: http://spark.apache.org/docs/latest/configuration.html
Property Name: spark.default.parallelism

Default: For distributed shuffle operations like reduceByKey and join, the largest number of partitions in a parent RDD. For operations like parallelize with no parent RDDs, it depends on the cluster manager:
- Local mode: number of cores on the local machine
- Mesos fine grained mode: 8
- Others: total number of cores on all executor nodes or 2, whichever is larger

Meaning: Default number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when not set by user.
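As a quick illustration of the fallback rule in the Default column, here is a sketch to run in spark-shell (where sc already exists), assuming spark.default.parallelism is not set:

// Two parent RDDs with different partition counts.
val a = sc.parallelize(1 to 100, 4).map(x => (x, x))   // 4 partitions
val b = sc.parallelize(1 to 100, 16).map(x => (x, x))  // 16 partitions

// With the property unset, the shuffle picks the largest parent's
// partition count rather than a fixed default.
println(a.join(b).partitions.length) // 16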
From: http://spark.apache.org/docs/latest/tuning.html
Level of Parallelism
Clusters will not be fully utilized unless you set the level of parallelism for each operation high enough. Spark automatically sets the number of "map" tasks to run on each file according to its size (though you can control it through optional parameters to SparkContext.textFile, etc.), and for distributed "reduce" operations, such as groupByKey and reduceByKey, it uses the largest parent RDD's number of partitions. You can pass the level of parallelism as a second argument (see the spark.PairRDDFunctions documentation), or set the config property spark.default.parallelism to change the default. In general, we recommend 2-3 tasks per CPU core in your cluster.
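The "second argument" mentioned there looks like this in practice, again in spark-shell; the 16-core total is a hypothetical figure for the example:

// 2-3 tasks per core: with 16 cores in the cluster, aim for ~48 partitions.
val pairs = sc.parallelize(1 to 1000000).map(x => (x % 100, 1))
val counts = pairs.reduceByKey(_ + _, 16 * 3) // explicit parallelism for this one shuffle
println(counts.partitions.length) // 48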
Original post: http://www.cnblogs.com/wrencai/p/4231966.html