
Spark的join *** 作可能触发shuffle *** 作。shuffle *** 作要经过磁盘IO,网络传输,对性能影响比较大。本文聊一聊Spark的join在哪些情况下可以避免shuffle过程。
针对Spark DataFrame/DataSet的join,可以通过broadcast join和bucket join来避免shuffle *** 作。
Broadcast join很好理解,小表被分发到所有executors,所以不需要做shuffle就可以完成join Spark SQL控制自动broadcast join的参数是:sparksqlautoBroadcastJoinThreshold , 默认为10MB 就是说当join中的一张表的size小于10MB时,spark会自动将其封装为broadcast发送到所有结点,然后进行broadcast join 当然也可以手动将join中的某张表转化成broadcast :
sparkSessionsparkContextbroadcast(df)
Bucket join其实就是将要join的两张表按照join columns(或join columns的子集)根据相同的partitioner预先做好分区,并将这些分区信息存储到catalog中(比如HiveExternalCatalog);然后在读取这两张表并做join时,spark根据bucket信息将两张表的相同partition进行join即可,从而避免了shuffle的过程。注意,这里是避免了shuffle过程,并没有完全避免网络传输,由于两张表的相同partition不一定在同一台机器上,所以这里仍需要对其中一张表的partition进行网络传输。关于spark bucketing的原理和使用细节可以参见这个 视频 。
笔者这里想讨论的是PairRDDFunctions类的join方法。在RDD对象中有一个隐式转换可以将rdd转换成PairRDDFunctions对象,这样就可以直接在rdd对象上调用join方法:
先来看看PairRDDFunctions的join方法:
PairRDDFunctions有多个重载的join方法,上面这个只带一个RDD对象的参数,我们接着看它调用的另一个重载的join方法:
可以看到,RDD的join实现是由cogroup方法完成的,cogroup完后得到的是类型为RDD[(K, (Iterable[V], Iterable[W]))]的rdd对象,其中K为key的类型,V为第一张join表的value类型,W为第二张join表的value类型;然后,调用flatMapValues将其转换成RDD[(K, V, W)]的rdd对象。
下面来看看PairRDDFunctionscogroup方法的实现:
cogroup中生成了CoGroupedRDD对象,所以关键是这个RDD的getDependencies方法返回的dependencies中是否存在shuffle dependency
看看这个RDD的getDependencies方法:
其中的rdds就是进行cogroup的rdd序列,也就是PairRDDFunctionscogroup方法中传入的 Seq(self, other)
重点来了,对于所有参与cogroup的rdd,如果它的partitioner和结果CoGroupedRDD的partitioner相同,则该rdd会成为CoGroupedRDD的一个oneToOne窄依赖,否则就是一个shuffle依赖,即宽依赖。
我们知道,只有宽依赖才会触发shuffle,所以RDD的join可以避免shuffle的条件是: 参与join的所有rdd的partitioner都和结果rdd的partitioner相同。
那么,结果rdd的partitioner是怎么确定的呢?上文讲到PairRDDFunctionsjoin方法有多个重载,其中就有可以指定partitioner的重载,如果没有指定,则使用默认的partitioner,看看默认的partitioner是怎么确定的:
简单地说就是:
1 如果父rdds中有可用的合格的partitioner,则直接使用其中分区数最大的那个partitioner;
2 如果没有,则根据默认分区数生成HashPartitioner
至于怎样的partitioner是合格的,请读者阅读上面的PartitionerdefaultPartitioner方法和PartitionerisEligiblePartitioner方法。
RDD的compute方法是真正计算得到数据的方法,我们来看看CoGroupedRDD的compute方法是怎么实现的:
可以看到,CoGroupedRDD的数据是根据不同的依赖从父rdd中获取的:
1 对于窄依赖,直接调用父rdd的iterator方法获取对应partition的数据
2 对于宽依赖,从shuffleManager获取shuffleReader对象进行读取,这里就是shuffle read了
还有一个重点是读取多个父rdds的数据后,怎么将这些数据根据key进行cogroup?
这里用到了ExternalAppendOnlyMap来构建key和grouped values的映射。先来看看createExternalMap的实现:
相关类型定义如下:
可以看到,ExternalAppendOnlyMap的构造函数的参数是是三个方法参数:
1 createCombiner : 对每个key创建用于合并values的combiner数据结构,在这里就是一个CoGroup的数据,数组大小就是dependencies的数量
2 mergeValue : 将每个value合并到对应key的combiner数据结构中,在这里就是将一个CoGroupValue对象添加到其所在rdd对应的CoGroup中
3 mergeCombiners : 合并相同key的两个combiner数据结构,在这里就是合并两个CoGroupCombiner对象
CoGroupedRDDcompute会调用ExternalAppendOnlyMapinsertAll方法将从父rdds得到的数据一个一个地插入到ExternalAppendOnlyMap对象中进行合并。
最后,以这个ExternalAppendOnlyMap对象作为参数构造InterruptibleIterator,这个iterator会被调用者用于访问CoGroupedRDD的单个partition的所有数据。
本文简单地介绍了DataFrame/DataSet如何避免join中的shuffle过程,并根据源码详述了RDD的join *** 作的具体实现细节,分析了RDD的join在什么情况下可以避免shuffle过程。
1 源码版本:240
2 水平有限,如有错误,望读者指出
本文介绍基于Spark(20+)的Json字符串和DataFrame相互转换。
json字符串转DataFrame
spark提供了将json字符串解析为DF的接口,如果不指定生成的DF的schema,默认spark会先扫码一遍给的json字符串,然后推断生成DF的schema:
若列数据全为null会用String类型
整数默认会用Long类型
浮点数默认会用Double类型 val json1 = """{"a":null, "b": 231, "c": 1}""" val json2 =
"""{"a":null, "b": "hello", "d": 12}""" val ds =
sparkcreateDataset(Seq(json1, json2))val df = sparkreadjson(ds) dfshow
dfprintSchema +----+-----+----+----+ | a| b| c| d| +----+-----+----+----+ |null
|231| 1|null| |null|hello|null| 12| +----+-----+----+----+ root |-- a: string
(nullable =true) |-- b: string (nullable = true) |-- c: long (nullable = true)
|-- d: double (nullable =true)
若指定schema会按照schema生成DF:
schema中不存在的列会被忽略
可以用两种方法指定schema,StructType和String,具体对应关系看后面
若数据无法匹配schema中类型:若schema中列允许为null会转为null;若不允许为null会转为相应类型的空值(如Double类型为00值),若无法转换为值会抛出异常
val schema = StructType(List( StructField("a", ByteType, true), StructField("b"
, FloatType,false), StructField("c", ShortType, true) )) //或 val schema = "b
float, c short" val df = sparkreadschema(schema)json(ds) dfshow
dfprintSchema +----+----+----+ | a| b| c| +----+----+----+ |null|231| 1| |null
|0|null| +----+----+----+ root |-- a: byte (nullable = true) |-- b: float
(nullable =true) |-- c: short (nullable = true)
json解析相关配置参数
primitivesAsString (default false): 把所有列看作string类型
prefersDecimal(default false): 将小数看作decimal,如果不匹配decimal,就看做doubles
allowComments (default false): 忽略json字符串中Java/C++风格的注释
allowUnquotedFieldNames (default false): 允许不加引号的列名
allowSingleQuotes (default true): 除双引号外,还允许用单引号
allowNumericLeadingZeros (default false): 允许数字中额外的前导0(如0012)
allowBackslashEscapingAnyCharacter (default false): 允许反斜杠机制接受所有字符
allowUnquotedControlChars (default false):
允许JSON字符串包含未加引号的控制字符(值小于32的ASCII字符,包括制表符和换行字符)。
mode (default PERMISSIVE): 允许在解析期间处理损坏记录的模式。
PERMISSIVE
:当遇到损坏的记录时,将其他字段设置为null,并将格式错误的字符串放入由columnNameOfCorruptRecord配置的字段中。若指定schema,在schema中设置名为columnNameOfCorruptRecord的字符串类型字段。
如果schema中不具有该字段,则会在分析过程中删除损坏的记录。若不指定schema(推断模式),它会在输出模式中隐式添加一个columnNameOfCorruptRecord字段。
DROPMALFORMED : 忽略整条损害记录
FAILFAST : 遇到损坏记录throws an exception
columnNameOfCorruptRecord
(默认值为sparksqlcolumnNameOfCorruptRecord的值):允许PERMISSIVE
mode添加的新字段,会重写sparksqlcolumnNameOfCorruptRecord
dateFormat (default yyyy-MM-dd): 自定义日期格式,遵循javatextSimpleDateFormat格式
只有日期部分(无详细时间)
timestampFormat (default yyyy-MM-dd’T’HH:mm:ssSSSXXX):
自定义日期格式,遵循javatextSimpleDateFormat格式 可以有详细时间部分(到微秒)
multiLine (default false): 解析一个记录,该记录可能跨越多行,每个文件
以上参数可用option方法配置:
val stringDF = sparkreadoption("primitivesAsString", "true")json(ds)
stringDFshow stringDFprintSchema +----+-----+----+----+ | a| b| c| d|
+----+-----+----+----+ |null| 231| 1|null| |null|hello|null| 12|
+----+-----+----+----+ root |-- a: string (nullable =true) |-- b: string
(nullable =true) |-- c: string (nullable = true) |-- d: string (nullable = true)
二进制类型会自动用base64编码方式表示
‘Man’(ascci) base64编码后为:”TWFu”
val byteArr = Array('M'toByte, 'a'toByte, 'n'toByte) val binaryDs =
sparkcreateDataset(Seq(byteArr))val dsWithB64 = binaryDswithColumn("b64",
base64(col("value"))) dsWithB64show(false) dsWithB64printSchema
+----------+----+ |value |b64 | +----------+----+ |[4D 61 6E]|TWFu|
+----------+----+ root |-- value: binary (nullable =true) |-- b64: string
(nullable =true) //=================================================
dsWithB64toJSONshow(false) +-----------------------------+ |value |
+-----------------------------+ |{"value":"TWFu","b64":"TWFu"}|
+-----------------------------+
//================================================= val json =
"""{"value":"TWFu"}""" val jsonDs = sparkcreateDataset(Seq(json)) val binaryDF
= sparkreadschema("value binary")json(jsonDs ) binaryDFshow
binaryDFprintSchema +----------+ | value| +----------+ |[4D 61 6E]|
+----------+ root |-- value: binary (nullable =true)
指定schema示例:
以下是Spark SQL支持的所有基本类型:
val json = """{"stringc":"abc", "shortc":1, "integerc":null, "longc":3,
"floatc":45, "doublec":67, "decimalc":890, "booleanc":true, "bytec":23,
"binaryc":"TWFu", "datec":"2010-01-01", "timestampc":"2012-12-12
11:22:22123123"}""" val ds = sparkcreateDataset(Seq(json)) val schema =
"stringc string, shortc short, integerc int, longc long, floatc float, doublec
double, decimalc decimal(10, 3), booleanc boolean, bytec byte, binaryc binary,
datec date, timestampc timestamp" val df = sparkreadschema(schema)json(ds)
dfshow(false) dfprintSchema
+-------+------+--------+-----+------+-------+--------+--------+-----+----------+----------+-----------------------+
|stringc|shortc|integerc|longc|floatc|doublec|decimalc|booleanc|bytec|binaryc
|datec |timestampc |
+-------+------+--------+-----+------+-------+--------+--------+-----+----------+----------+-----------------------+
|abc |1 |null |3 |45 |67 |8900 |true |23 |[4D 61 6E]|2010-01-01|2012-12-12 11
:22:22123|
+-------+------+--------+-----+------+-------+--------+--------+-----+----------+----------+-----------------------+
root |-- stringc: string (nullable =true) |-- shortc: short (nullable = true)
|-- integerc: integer (nullable =true) |-- longc: long (nullable = true) |--
floatc: float (nullable =true) |-- doublec: double (nullable = true) |--
decimalc: decimal(10,3) (nullable = true) |-- booleanc: boolean (nullable = true
) |-- bytec: byte (nullable =true) |-- binaryc: binary (nullable = true) |--
datec: date (nullable =true) |-- timestampc: timestamp (nullable = true)
复合类型:
val json = """ { "arrayc" : [ 1, 2, 3 ], "structc" : { "strc" : "efg",
"decimalc" : 11 }, "mapc" : { "key1" : 12, "key2" : 11 } } """ val ds =
sparkcreateDataset(Seq(json))val schema = "arrayc array, structc
struct, mapc map" val df =
sparkreadschema(schema)json(ds) dfshow(false) dfprintSchema
+---------+--------+--------------------------+ |arrayc |structc |mapc |
+---------+--------+--------------------------+ |[1, 2, 3]|[efg, 1]|[key1 -> 12
, key2 ->11]| +---------+--------+--------------------------+ root |-- arrayc:
array (nullable =true) | |-- element: short (containsNull = true) |-- structc:
struct (nullable =true) | |-- strc: string (nullable = true) | |-- decimalc:
decimal(10,0) (nullable = true) |-- mapc: map (nullable = true) | |-- key:
string | |-- value: float (valueContainsNull =true)
SparkSQL数据类型
基本类型:
DataType simpleString typeName sql defaultSize catalogString json
StringType string string STRING 20 string “string”
ShortType smallint short SMALLINT 2 smallint “short”
IntegerType int integer INT 4 int “integer”
LongType bigint long BIGINT 8 bigint “long”
FloatType float float FLOAT 4 float “float”
DoubleType double double DOUBLE 8 double “double”
DecimalType(10,3) decimal(10,3) decimal(10,3) DECIMAL(10,3) 8 decimal(10,3)
“decimal(10,3)”
BooleanType boolean boolean BOOLEAN 1 boolean “boolean”
ByteType tinyint byte TINYINT 1 tinyint “byte”
BinaryType binary binary BINARY 100 binary “binary”
DateType date date DATE 4 date “date”
TimestampType timestamp timestamp TIMESTAMP 8 timestamp “timestamp”
三个复合类型:
DataType simpleString typeName sql defaultSize catalogString json
ArrayType(IntegerType, true) array array ARRAY 4 array
{“type”:”array”,”elementType”:”integer”,”containsNull”:true}
MapType(StringType, LongType, true) map map MAP
28 map
{“type”:”map”,”keyType”:”string”,”valueType”:”long”,”valueContainsNull”:true}
StructType(StructField(“sf”, DoubleType)::Nil) struct struct
STRUCT 8 struct
{“type”:”struct”,”fields”:[{“name”:”sf”,”type”:”double”,”nullable”:true,”metadata”:{}}]}
Hadoop和Apache Spark两者都是大数据框架,但是各自存在的目的不尽相同。Hadoop实质上更多是一个分布式数据基础设施: 它将巨大的数据集分派到一个由普通计算机组成的集群中的多个节点进行存储,意味着您不需要购买和维护昂贵的服务器硬件。
同时,Hadoop还会索引和跟踪这些数据,让大数据处理和分析效率达到前所未有的高度。Spark,则是那么一个专门用来对那些分布式存储的大数据进行处理的工具,它并不会进行分布式数据的存储。
ADO NET是Net FrameWork SDK中用以 *** 作数据库的类库的总称 而DataSet类则是ADO NET中最核心的成员之一 也是各种开发基于 Net平台程序语言开发数据库应用程序最常接触的类 之所以DataSet类在ADO NET中具有特殊的地位 是因为DataSet在ADO NET实现从数据库抽取数据中起到关键作用 在从数据库完成数据抽取后 DataSet就是数据的存放地 它是各种数据源中的数据在计算机内存中映射成的缓存 所以有时说DataSet可以看成是一个数据容器 同时它在客户端实现读取 更新数据库等过程中起到中间部件的作用(DataReader只能检索数据库中的数据)
各种 Net平台开发语言开发数据库应用程序 一般并不直接对数据库 *** 作(直接在程序中调用存储过程等除外) 而是先完成数据连接和通过数据适配器填充DataSet对象 然后客户端再通过读取DataSet来获得需要的数据 同样更新数据库中数据 也是首先更新DataSet 然后再通过DataSet来更新数据库中对应的数据的 可见了解 掌握ADO NET 首先必须了解 掌握DataSet DataSet主要有三个特性
独立性 DataSet独立于各种数据源 微软公司在推出DataSet时就考虑到各种数据源的多样性 复杂性 在 Net中 无论什么类型数据源 它都会提供一致的关系编程模型 而这就是DataSet
离线(断开)和连接 DataSet既可以以离线方式 也可以以实时连接来 *** 作数据库中的数据 这一点有点像ADO中的RecordSet
DataSet对象是一个可以用XML形式表示的数据视图 是一种数据关系视图
一 DataSet对象的结构模型及和RecordSet的比较
虽说ADO NET是 ADO在 Net平台下得后继版本 但二者的区别是很大的 突出表现在ADO中的RecordSet对象和ADO NET中的DataSet对象 RecordSet其实也是非常灵活的一个对象 微软公司推出它也是煞费苦心 如 RecordSet可以离线 *** 作数据库 性能优良 效率较高等等这些都让当时的程序员为之一振 RecordSet虽然已经很复杂 但DataSet却比RecordSet复杂的多 我们知道每一DataSet往往是一个或多个DataTable 对象的集合 这些对象由数据行和数据列以及主键 外键 约束和有关DataTable对象中数据的关系信息组成 而RecordSet只能存放单张数据表 虽然这张数据表可以由几张数据表JOIN生成 所以有些时候说 RecordSet更类似于DataSet中的DataTable DataSet对象的结构模型如图 所示
图 DataSet对象的结构模型图通过图 可见在DataSet对象结构还是非常复杂的 在DataSet对象的下一层中是DataTableCollection对象 DataRelationCollection对象和ExtendedProperties对象 上文已经说过 每一个DataSet对象是由若干个DataTable对象组成 DataTableCollection就是管理DataSet中的所有DataTable对象 表示DataSet中两个DataTable对象之间的父/子关系是DataRelation对象 它使一个DataTable 中的行与另一个DataTable中的行相关联 这种关联类似于关系数据库中数据表之间的主键列和外键列之间的关联 DataRelationCollection对象就是管理DataSet中所有DataTable之间的DataRelation关系的 在DataSet中DataSet DataTable和DataColumn都具有ExtendedProperties属性 ExtendedProperties其实是一个属性集(PropertyCollection) 用以存放各种自定义数据 如生成数据集的SELECT语句等
二.使用DataSet
DataSet其实就是数据集 上文已经说过DataSet是把数据库中的数据映射到内存缓存中的所构成的数据容器 对于任何数据源 它都提供一致的关系编程模型 在DataSet中既定义了数据表的约束关系以及数据表之间的关系 还可以对数据表中的数据进行排序等 DataSet使用方法一般有三种
把数据库中的数据通过DataAdapter对象填充DataSet
通过DataAdapter对象 *** 作DataSet实现更新数据库
把XML数据流或文本加载到DataSet
下面就来详细探讨以上DataSet使用方法的具体实现 使用语言是C#
把数据库中的数据通过DataAdapter对象填充DataSet
掌握DataSet使用方法必须掌握ADO NET另外一个核心常用成员 数据提供者(Data Provider) 数据提供者(也称为托管提供者Managed Provider)是一个类集合 在 Net FrameWork SDK 中数据提供者分为二种 The SQL Server NET Data Provider和The OLE DB NET Data Provider 而到了 Net FrameWork SDK 时 ADO NET中又增加了The ODBC NET Data Provider和 The Oracle NET Data Provider二个数据提供者 The SQL Server NET Data Provider的 *** 作数据库对象只限于Sql Server 及以上版本 Oracle NET Data Provider的 *** 作数据库对象只限于Oracle 及以上版本 而The OLE DB NET Data Provider和The ODBC NET Data Provider可 *** 作的数据库类型就相对多了许多 只要它们在本地分别提供Ole Db提供程序和ODBC提供程序
在这些数据提供者中都有一个DataAdapter类 如 OLE DB NET Framework 数据提供者中是 OleDbDataAdapter类 The SQL Server NET Framework 数据提供者中是SqlDataAdapter类 The ODBC NET Framework 数据提供者中是OdbcDataAdapter类 通过这些DataAdapter就能够实现从数据库中检索数据并填充 DataSet 中的表
DataAdapter填充DataSet的过程分为二步 首先通过DataAdapter的SelectCommand属性从数据库中检索出需要的数据 SelectCommand其实是一个Command对象 然后再通过DataAdapter的Fill方法把检索来的数据填充 DataSet 代码清单 就是以Microsoft SQL Server 中的Northwind数据库为对象 C#使用The SQL Server NET Data Provider中的SqlDataAdapter填充DataSet的具体实现方法
代码清单
SqlConnection sqlConnection = new SqlConnection ( Data Source=localhost;Integrated Security=SSPI ;Initial Catalog=Northwind ) ;//创建数据连接SqlCommand selectCMD = new SqlCommand ( SELECT CustomerID CompanyName FROMCustomers sqlConnection ) ;//创建并初始化SqlCommand对象SqlDataAdapter sqlDataAdapter = new SqlDataAdapter ( ) ;custDA SelectCommand = selectCMD ;sqlConnection Open ( ) ;//创建SqlDataAdapter对象 并根据SelectCommand属性检索数据DataSet dsDataSet = new DataSet ( ) ;sqlDataAdapter Fill ( dsDataSet Customers ) ;//使用SqlDataAdapter的Fill方法填充DataSetsqlConnection Close ( ) ;//关闭数据连接
对于其他数据提供者的DataAdapter 具体的实现检索数据库中的数据并填充DataSet的实现方法类似于以上方法
通过DataAdapter对象 *** 作DataSet实现更新数据库
DataAdapter是通过其Update方法实现以DataSet中数据来更新数据库的 当DataSet实例中包含数据发生更改后 此时调用Update方法 DataAdapter 将分析已作出的更改并执行相应的命令(INSERT UPDATE 或 DELETE) 并以此命令来更新数据库中的数据 如果DataSet中的DataTable是映射到单个数据库表或从单个数据库表生成 则可以利用 CommandBuilder 对象自动生成 DataAdapter 的 DeleteCommand InsertCommand 和 UpdateCommand 使用DataAdapter对象 *** 作DataSet实现更新数据库具体的实现方法 只需把下面的代码清单 添加到代码清单 之后 二者合并即可实现删除Customers数据表中第一行数据
代码清单
SqlCommandBuilder sqlCommandBuilder = new SqlCommandBuilder(sqlDataAdapter ) ;//以sqlDataAdapter 为参数来初始化SqlCommandBuilder实例dsDataSet Tables[ Customers ] Rows[ ] Delete ( ) ;//删除DataSet中删除数据表Customers中第一行数据sqlDataAdapter Update ( dsDataSet Customers ) ;//调用Update方法 以DataSet中的数据更新从数据库dsDataSet Tables[ Customers ] AcceptChanges ( ) ;
由于不了解DataSet结构和与数据库关系 很多初学者往往只是更新了DataSet中的数据 就认为数据库中的数据也随之更新 所以当打开数据库浏览时发现并没有更新数据 都会比较疑惑 通过上面的介绍 疑惑应当能够消除了
XML和DataSet
DataSet中的数据可以从XML数据流或文档创建 并且 Net Framework可以控制加载XML数据流或文档中那些数据以及如何创建DataSet的关系结构 加载XML数据流和文档到DataSet中是可使用DataSet对象的ReadXml方法(注意 ReadXml来加载非常大的文件 则性能会有所下降) ReadXml 方法将从文件 流或 XmlReader 中进行读取 并将 XML 的源以及可选的 XmlReadMode 参数用作参数 该ReadXml方法读取 XML 流或文档的内容并将数据加载到 DataSet 中 根据所指定的XmlReadMode和关系架构是否已存在 它还将创建DataSet的关系架构
三.DataSet和数据绑定(DataBinding)
数据绑定是数据绑定是绑定技术中使用最频繁 也是最为重要的技术 也可以说是各种 Net开发语言开发数据库应用程序最需要掌握的基本的知识之一 数据绑定之所以很重要 是因为在 Net FrameWork SDK中并没有提供数据库开发的相关组件 即如 DbTextBox DbLabel等用于数据库开发的常用组件在 Net FrameWork SDK中都没有 而数据绑定技术则能够把TextBox组件 改造 成DbTextBox组件 把Label组件 改造 成DbLabel组件等等 所有这些都与DataSet有直接关系
数据绑定分成二类 简单型数据绑定和复杂型数据绑定 适用于简单型数据绑定组件一般有Lable TextBox等 适用于复杂性数据绑定的组件一般有DataGrid ListBox ComboBox等 其实简单型数据绑定和复杂性数据绑定并没有明确的区分 只是在组件进行数据绑定时 一些结构复杂一点的组件在数据绑定时 *** 作步骤相近 而另外一些结构简单一点的组件在数据绑定时也比较类似 于是也就产生了二个类别 以下就结合TextBox组件和DataGrid组件分别探讨DataSet在实现简单型数据绑定和复杂性数据绑定作用和具体实现方法
简单型数据绑定
简单型数据绑定一般使用这些组件中的DataBindings属性的Add方法把DataSet中某一个DataTable中的某一行和组件的某个属性绑定起来 从而达到显示数据的效果 TextBox组件的数据绑定具体实现方法是在代码清单 后 再添加代码清单 中的代码 代码清单 中的代码是把DataSet中的Customers 数据表中的 CustomerID 的数据和TextBox的Text属性绑定起来 这样DbTextBox就产生了 其他适用于简单型数据绑定组件数据绑定的方法类似与此 *** 作
代码清单
textBox DataBindings Add ( Text dsDataSet Customers CustomerID ) ;
复杂性数据绑定
复杂性数据绑定一般是设定组件的DataSource属性和DisplayMember属性来完成数据绑定的 DataSource属性值一般设定为要绑定的DataSet DisplayMember属性值一般设定为要绑定的数据表或数据表中的某一列 DataGrid组件的数据绑定的一般实现方法是在代码清单 后 再添加代码清单 中的代码 代码清单 的功能是把DataSet中的Customers 数据表和DataGrid绑定起来 其他适用于复杂性数据绑定的组件实现数据绑定的方法类似此 *** 作
代码清单
dataGrid DataSource = dsDataSet ;dataGrid DataMember = Customers ;
四.总结
lishixinzhi/Article/program/net/201311/14485
Spark对硬件的要求
估计所有的spark开发者都很关心spark的硬件要求。恰当的硬件配置需要具体情况具体分析,在这里给出以下建议。主要译自官网
一,存储系统
因为大多数Spark工作可能需要从外部存储系统(例如Hadoop文件系统或HBase)中读取输入数据,所以将spark尽可能部署到靠近存储系统很重要。所以,有如下建议:
1,如果可能,在与HDFS相同的节点上运行Spark。最简单的方式是将spark的Standalone集群和hadoop集群安装在相同的节点,同时配置好Spark和hadoop的内存使用,避免相互干扰(对于hadoop,每个task的内存配置参数是mapredchildjavaopts;mapreducetasktrackermaptasksmaximum 和mapreducetasktrackerreducetasksmaximum决定了task的数目)。也可以将hadoop和spark运行在共同的集群管理器上,如mesos和 yarn。
2,如果不可能,请在与HDFS相同的局域网中的不同节点上运行Spark。
3,对于低延迟数据存储(如HBase),可能优先在与存储系统不同的节点上运行计算任务以避免干扰。
二,本地磁盘
虽然Spark可以在内存中执行大量的计算,但它仍然使用本地磁盘来存储不适合RAM的数据,以及在stage之间,也即shuffle的中间结果。建议每个节点至少有4-8块磁盘,并且不需要RAID,仅仅是独立的磁盘挂在节点。在Linux中,使用noatime选项安装磁盘,以减少不必要的写入。在spark任务中,sparklocaldir配置可以十多个磁盘目录,以逗号分开。如果运行在hdfs上,与hdfs保持一致就很好。
使用noatime选项安装磁盘,要求当挂载文件系统时,可以指定标准Linux安装选项(noatime),这将禁用该文件系统上的atime更新。磁盘挂在命令:
mount -t gfs BlockDevice MountPoint -onoatime
BlockDevice 指定GFS文件系统驻留的块设备。
MountPoint 指定GFS文件系统应安装的目录。
例子:
mount -t gfs /dev/vg01/lvol0 /gfs1 -onoatime
三,内存
单台机器内存从8GB到数百GB,spark都能运行良好。在所有情况下,建议仅为Spark分配最多75%的内存;留下其余的 *** 作系统和缓冲区缓存。
需要多少内存取决于你的应用程序。要确定你的应用的特定数据集需要多大内存,请加载部分数据集到内存,然后在Spark UI的Storage界面去看它的内存占用量。
请注意,内存使用受到存储级别和序列化格式的极大影响 - 有关如何减少内存使用的技巧,请参阅另一篇调优的文章。
最后,请注意,对于超过200GB的内存的机器JAVA VM运行状态并不一直表现良好。如果买的机器内存超过了200GB,那么可以在一个节点上运行多个worker。Spark Standalone模式下,可以在配置文件 conf/spark-envsh中设置SPARK_WORKER_INSTANCES的值来设置单节点worker的数目。也可以设置SPARK_WORKER_CORES参数来设置每个Worker的cpu数目。
四,网络
根据以往的经验,假如数据是在内存中,那么spark的应用的瓶颈往往就在网络。用10 Gigabit或者更高的网络,是使spark应用跑的最更快的最佳方式。特别是针对“distributed reduce”应用,如group-bys,reduce-bys和sql joins,就表现的更加明显。在任何给定的应用程序中,可以通过spark ui查看spark shuffle过程夸网络传输了多少数据。
五, cpu
对于每台机器几十个cpu的机器,spark也可以很好的扩展,因为他在线程之间执行最小的共享cpu。应该每台机器至少配置8-16个内核。根据cpu负载,可能需要更多的cpu:一旦数据在内存中,大多数应用程序的瓶颈就在CPU和网络。
推荐阅读:
面试必备|spark 高层通用调优
Spark Adaptive Execution调研
Spark 的硬件配置
从MapReduce的兴起,就带来一种思路,就是希望通过大量廉价的机器来处理以前需要耗费昂贵资源的海量数据。这种方式事实上是一种架构的水平伸缩模式——真正的以量取胜。毕竟,以现在的硬件发展来看,CPU的核数、内存的容量以及海量存储硬盘,都慢慢变得低廉而高效。然而,对于商业应用的海量数据挖掘或分析来看,硬件成本依旧是开发商非常关注的。当然最好的结果是:既要马儿跑得快,还要马儿少吃草。
\\
Spark相对于Hadoop的MapReduce而言,确乎要跑得迅捷许多。然而,Spark这种In-Memory的计算模式,是否在硬件资源尤其是内存资源的消耗上,要求更高呢?我既找不到这么多机器,也无法租用多台虚拟instance,再没法测评的情况下,只要寻求Spark的官方网站,又或者通过Google搜索。从Spark官方网站,Databricks公司Patrick Wendell的演讲以及Matei Zaharia的Spark论文,找到了一些关于Spark硬件配置的支撑数据。
\\
Spark 与存储系统
\\
如果Spark使用HDFS作为存储系统,则可以有效地运用Spark的standalone mode cluster,让Spark与HDFS部署在同一台机器上。这种模式的部署非常简单,且读取文件的性能更高。当然,Spark对内存的使用是有要求的,需要合理分配它与HDFS的资源。因此,需要配置Spark和HDFS的环境变量,为各自的任务分配内存和CPU资源,避免相互之间的资源争用。
\\
若HDFS的机器足够好,这种部署可以优先考虑。若数据处理的执行效率要求非常高,那么还是需要采用分离的部署模式,例如部署在Hadoop YARN集群上。
\\
Spark 对磁盘的要求
\\
Spark是in memory的迭代式运算平台,因此它对磁盘的要求不高。Spark官方推荐为每个节点配置4-8块磁盘,且并不需要配置为RAID(即将磁盘作为单独的mount point)。然后,通过配置sparklocaldir来指定磁盘列表。
\\
Spark 对内存的要求
\\
Spark虽然是in memory的运算平台,但从官方资料看,似乎本身对内存的要求并不是特别苛刻。官方网站只是要求内存在8GB之上即可(Impala要求机器配置在128GB)。当然,真正要高效处理,仍然是内存越大越好。若内存超过200GB,则需要当心,因为JVM对超过200GB的内存管理存在问题,需要特别的配置。
\\
内存容量足够大,还得真正分给了Spark才行。Spark建议需要提供至少75%的内存空间分配给Spark,至于其余的内存空间,则分配给 *** 作系统与buffer cache。这就需要部署Spark的机器足够干净。
\\
考虑内存消耗问题,倘若我们要处理的数据仅仅是进行一次处理,用完即丢弃,就应该避免使用cache或persist,从而降低对内存的损耗。若确实需要将数据加载到内存中,而内存又不足以加载,则可以设置Storage Level。09版本的Spark提供了三种Storage Level:MEMORY_ONLY(这是默认值),MEMORY_AND_DISK,以及DISK_ONLY。
\\
关于数据的持久化,Spark默认是持久化到内存中。但它也提供了三种持久化RDD的存储方式:
\\
• \\t
in-memory storage as deserialized Javaobjects
\\t\\t
• \\t
in-memory storage as serialised data
\\t\\t
• \\t
on-disk storage
\\t\
第一种存储方式性能最优,第二种方式则对RDD的展现方式(Representing)提供了扩展,第三种方式则用于内存不足时。
\\
然而,在最新版(V102)的Spark中,提供了更多的Storage Level选择。一个值得注意的选项是OFF_HEAP,它能够将RDD以序列化格式存储到Tachyon中。相比MEMORY_ONLY_SER,这一选项能够减少执行垃圾回收,使Spark的执行器(executor)更小,并能共享内存池。Tachyon是一个基于内存的分布式文件系统,性能远超HDFS。Tachyon与Spark同源同宗,都烙有伯克利AMPLab的印记。目前,Tachyon的版本为050,还处于实验阶段。
\\
注意,RDDs是Lazy的,在执行Transformation *** 作如map、filter时,并不会提交Job,只有在执行Action *** 作如count、first时,才会执行Job,此时才会进行数据的加载。当然,对于一些shuffle *** 作,例如reduceByKey,虽然仅是Transformation *** 作,但它在执行时会将一些中间数据进行持久化,而无需显式调用persist()函数。这是为了应对当节点出现故障时,能够避免针对大量数据进行重计算。要计算Spark加载的Dataset大小,可以通过Spark提供的Web UI Monitoring工具来帮助分析与判断。
\\
Spark的RDD是具有分区(partition)的,Spark并非是将整个RDD一次性加载到内存中。Spark针对partition提供了eviction
policy,这一Policy采用了LRU(Least Recently Used)机制。当一个新的RDD分区需要计算时,如果没有合适的空间存储,就会根据LRU策略,将最少访问的RDD分区d出,除非这个新分区与最少访问的分区属于同一个RDD。这也在一定程度上缓和了对内存的消耗。
\\
Spark对内存的消耗主要分为三部分:
\\
1 \\t
数据集中对象的大小;
\\t\\t
2 \\t
访问这些对象的内存消耗;
\\t\\t
3 \\t
垃圾回收GC的消耗。
\\t\
一个通常的内存消耗计算方法是:内存消耗大小= 对象字段中原生数据 (2~5)。这是因为Spark运行在JVM之上, *** 作的Java对象都有定义的“object header”,而数据结构(如Map,LinkedList)对象自身也需要占用内存空间。此外,对于存储在数据结构中的基本类型,还需要装箱(Boxing)。Spark也提供了一些内存调优机制,例如执行对象的序列化,可以释放一部分内存空间。还可以通过为JVM设置flag来标记存放的字节数(选择4个字节而非8个字节)。在JDK 7下,还可以做更多优化,例如对字符编码的设置。这些配置都可以在spark-envsh中设置。
\\
Spark 对网络的要求
\\
Spark属于网络绑定型系统,因而建议使用10G及以上的网络带宽。
\\
Spark 对 CPU 的要求
\\
Spark可以支持一台机器扩展至数十个CPU
core,它实现的是线程之间最小共享。若内存足够大,则制约运算性能的就是网络带宽与CPU数。
\\
Spark官方利用Amazon EC2的环境对Spark进行了基准测评。例如,在交互方式下进行数据挖掘(Interative Data Mining),租用Amazon EC2的100个实例,配置为8核、68GB的内存。对1TB的维基百科页面查阅日志(维基百科两年的数据)进行数据挖掘。在查询时,针对整个输入数据进行全扫描,只需要耗费5-7秒的时间。如下图所示:
在Matei Zaharia的Spark论文中还给出了一些使用Spark的真实案例。视频处理公司Conviva,使用Spark将数据子集加载到RDD中。报道说明,对于200GB压缩过的数据进行查询和聚合 *** 作,并运行在两台Spark机器上,占用内存为96GB,执行完全部 *** 作需要耗费30分钟左右的时间。同比情况下,Hadoop需要耗费20小时。注意:之所以200GB的压缩数据只占用96GB内存,是因为RDD的处理方式,使得我们可以只加载匹配客户过滤的行和列,而非所有压缩数据。`
Spark集群硬件配置推荐
计算与存储:
大多数Spark作业可能需要从外部存储系统(例如 :Cassandra、Hadoop文件系统或HBase)读取输入数据,所以要让Spark计算引擎尽可能靠近数据持久层。如果使用HDFS作为数据存储集群,可以在相同的集群上部署Spark集群,并配置Spark和Hadoop的内存和CPU使用率以避免干扰。我们的生产存储使用的是Cassandra集群,spark
master 服务单独部署,其它节点同时部署:Cassandra
+ spark worker,保证spark
worker 节点可以快速从本地读取数据进行计算汇总。
磁盘:
虽然Spark可以在内存中执行大量的计算,但它仍然可能会使用本地磁盘来存储不适用于RAM的数据,建议每个节点配置4-8个磁盘,不需要配置RAID(磁盘阵列),磁盘成本越来越低,可以考虑配置ssd硬盘,可以大幅提升性能。另外;在Linux中,使用noatime选项挂载磁盘,以减少不必要的写入 *** 作。 在Spark中,可以将sparklocaldir变量配置为多个本地磁盘的地址,多个地址之间以逗号分隔。
内存
建议为Spark分配的内存容量不大于机器总内存容量的75%;确保为 *** 作系统和缓冲区留下足够的内存。根据业务特点评估需要多少内存。请注意,当内存容量超过200GB时Java 虚拟机的性能表现会不稳定。如果您购买的RAM大于200G,则可以为每个节点运行多个worker
JVM。在Spark的standalone模式下,您可以通过conf/spark-envsh中的SPARK_WORKER_INSTANCES变量设置每个节点运行的worker进程数,以及通过SPARK_WORKER_CORES变量设置每个worker可用的cpu核心数。
网络
当数据已经存储在内存中时,很多Spark应用程序的性能瓶颈在于网络的传输速率。推荐最低使用10G的网络。
CPU
Spark运行汇总计算任务比较多,推荐配置更多的cpu核数,性能提升还是比较明显,推荐:每台机器至少配置8-16个核。可以根据Spark作业的CPU负载情况,进行配置调整。一旦数据已经在内存中,大多数应用程序的性能瓶颈在于CPU和网络。
参考文档
>
write接口包含Append Overwrite Ignore等等;
优化的API开发的Spark任务包含DataFrame SQL DataSet等等
侧重点不一样,中台更强调服务,是业务和数据的连接层
以上就是关于Spark的join什么情况下可以避免shuffle全部的内容,包括:Spark的join什么情况下可以避免shuffle、sparkdataframe转换成字节流、2 分钟读懂大数据框架 Hadoop 和 Spark 的异同等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!
欢迎分享,转载请注明来源:内存溢出
微信扫一扫
支付宝扫一扫
评论列表(0条)