怎样从spark rdd中得到某个元素,并将他赋值给一个对象

怎样从spark rdd中得到某个元素,并将他赋值给一个对象,第1张

一般来讲,对于陌生的名词,大家的第一个反应都是“What is it?”.

RDD是Spark的核心内容,在Spark的官方文档中解释如下:RDD is a fault-tolerant collection of elements that can be operated on in parallel.由此可见,其中有两个关键词:fault-tolerant &in parallel.首先,容错性是RDD的一个重要特性;其次,它是并行计算的数据.

RDD的中文解释为:d性分布式数据集,全称Resilient Distributed Datasets.宾语是dataset,即内存中的数据库.RDD 只读、可分区,这个数据集的全部或部分可以缓存在内存中,在多次计算间重用.所谓d性,是指内存不够时可以与磁盘进行交换.这涉及到了RDD的另一特性:内存计算,就是将数据保存到内存中.同时,为解决内存容量限制问题,Spark为我们提供了最大的自由度,所有数据均可由我们来进行cache的设置,包括是否cache和如何cache.

如果看到这里,你的思维里对RDD还是没有任何概念的话,或许可以参照我的形象化理RDD,就是一个被武装起来的数据集.

Spark UDF函数可以通过使用array()函数来传入一个数组。这个函数需要一个参数,一个表示需要传入的数组元素的表达式,并将它们作为一个单独的数组返回。例如,下面的示例将一个数组复制到一个Spark UDF函数中:

myArray = array(2, 4, 6, 8)

myUDF = udf(lambda x: x * 2, IntegerType())

# 使用myArray作为参数传入myUDF

result = myUDF(myArray).show()

在这个例子中,myArray将传入myUDF,并返回一个新的数组,数组中的每个元素都是原来的两倍。因此,结果将是:[4, 8, 12, 16]。

Spark UDF函数还可以使用collect_list函数来传入一个数组。该函数接受一个表达式,将其中的元素收集到一个数组中,并返回一个数组。例如,下面的示例将一个数组复制到一个Spark UDF函数中:

myArray = collect_list(2, 4, 6, 8)

myUDF = udf(lambda x: x * 2, IntegerType())

# 使用myArray作为参数传入myUDF

result = myUDF(myArray).show()

在这个例子中,myArray将传入myUDF,并返回一个新的数组,数组中的每个元素都是原来的两倍。因此,结果将是:[4, 8, 12, 16]。

本文介绍基于Spark(2.0+)的Json字符串和DataFrame相互转换。

json字符串转DataFrame

spark提供了将json字符串解析为DF的接口,如果不指定生成的DF的schema,默认spark会先扫码一遍给的json字符串,然后推断生成DF的schema:

* 若列数据全为null会用String类型

* 整数默认会用Long类型

* 浮点数默认会用Double类型 val json1 = """{"a":null, "b": 23.1, "c": 1}""" val json2 =

"""{"a":null, "b": "hello", "d": 1.2}""" val ds =

spark.createDataset(Seq(json1, json2))val df = spark.read.json(ds) df.show

df.printSchema +----+-----+----+----+ | a| b| c| d| +----+-----+----+----+ |null

|23.1| 1|null| |null|hello|null| 1.2| +----+-----+----+----+ root |-- a: string

(nullable =true) |-- b: string (nullable = true) |-- c: long (nullable = true)

|-- d: double (nullable =true)

若指定schema会按照schema生成DF:

* schema中不存在的列会被忽略

* 可以用两种方法指定schema,StructType和String,具体对应关系看后面

*

若数据无法匹配schema中类型:若schema中列允许为null会转为null;若不允许为null会转为相应类型的空值(如Double类型为0.0值),若无法转换为值会抛出异常

val schema = StructType(List( StructField("a", ByteType, true), StructField("b"

, FloatType,false), StructField("c", ShortType, true) )) //或 val schema = "b

float, c short" val df = spark.read.schema(schema).json(ds) df.show

df.printSchema +----+----+----+ | a| b| c| +----+----+----+ |null|23.1| 1| |null

|0|null| +----+----+----+ root |-- a: byte (nullable = true) |-- b: float

(nullable =true) |-- c: short (nullable = true)

json解析相关配置参数

primitivesAsString (default false): 把所有列看作string类型

prefersDecimal(default false): 将小数看作decimal,如果不匹配decimal,就看做doubles.

allowComments (default false): 忽略json字符串中Java/C++风格的注释

allowUnquotedFieldNames (default false): 允许不加引号的列名

allowSingleQuotes (default true): 除双引号外,还允许用单引号

allowNumericLeadingZeros (default false): 允许数字中额外的前导0(如0012)

allowBackslashEscapingAnyCharacter (default false): 允许反斜杠机制接受所有字符

allowUnquotedControlChars (default false):

允许JSON字符串包含未加引号的控制字符(值小于32的ASCII字符,包括制表符和换行字符)。

mode (default PERMISSIVE): 允许在解析期间处理损坏记录的模式。

PERMISSIVE

:当遇到损坏的记录时,将其他字段设置为null,并将格式错误的字符串放入由columnNameOfCorruptRecord配置的字段中。若指定schema,在schema中设置名为columnNameOfCorruptRecord的字符串类型字段。

如果schema中不具有该字段,则会在分析过程中删除损坏的记录。若不指定schema(推断模式),它会在输出模式中隐式添加一个columnNameOfCorruptRecord字段。

DROPMALFORMED : 忽略整条损害记录

FAILFAST : 遇到损坏记录throws an exception

columnNameOfCorruptRecord

(默认值为spark.sql.columnNameOfCorruptRecord的值):允许PERMISSIVE

mode添加的新字段,会重写spark.sql.columnNameOfCorruptRecord

dateFormat (default yyyy-MM-dd): 自定义日期格式,遵循java.text.SimpleDateFormat格式.

只有日期部分(无详细时间)

timestampFormat (default yyyy-MM-dd’T’HH:mm:ss.SSSXXX):

自定义日期格式,遵循java.text.SimpleDateFormat格式. 可以有详细时间部分(到微秒)

multiLine (default false): 解析一个记录,该记录可能跨越多行,每个文件

以上参数可用option方法配置:

val stringDF = spark.read.option("primitivesAsString", "true").json(ds)

stringDF.show stringDF.printSchema +----+-----+----+----+ | a| b| c| d|

+----+-----+----+----+ |null| 23.1| 1|null| |null|hello|null| 1.2|

+----+-----+----+----+ root |-- a: string (nullable =true) |-- b: string

(nullable =true) |-- c: string (nullable = true) |-- d: string (nullable = true)

二进制类型会自动用base64编码方式表示

‘Man’(ascci) base64编码后为:”TWFu”

val byteArr = Array('M'.toByte, 'a'.toByte, 'n'.toByte) val binaryDs =

spark.createDataset(Seq(byteArr))val dsWithB64 = binaryDs.withColumn("b64",

base64(col("value"))) dsWithB64.show(false) dsWithB64.printSchema

+----------+----+ |value |b64 | +----------+----+ |[4D 61 6E]|TWFu|

+----------+----+ root |-- value: binary (nullable =true) |-- b64: string

(nullable =true) //=================================================

dsWithB64.toJSON.show(false) +-----------------------------+ |value |

+-----------------------------+ |{"value":"TWFu","b64":"TWFu"}|

+-----------------------------+

//================================================= val json =

"""{"value":"TWFu"}""" val jsonDs = spark.createDataset(Seq(json)) val binaryDF

= spark.read.schema("value binary").json(jsonDs ) binaryDF.show

binaryDF.printSchema +----------+ | value| +----------+ |[4D 61 6E]|

+----------+ root |-- value: binary (nullable =true)

指定schema示例:

以下是Spark SQL支持的所有基本类型:

val json = """{"stringc":"abc", "shortc":1, "integerc":null, "longc":3,

"floatc":4.5, "doublec":6.7, "decimalc":8.90, "booleanc":true, "bytec":23,

"binaryc":"TWFu", "datec":"2010-01-01", "timestampc":"2012-12-12

11:22:22.123123"}""" val ds = spark.createDataset(Seq(json)) val schema =

"stringc string, shortc short, integerc int, longc long, floatc float, doublec

double, decimalc decimal(10, 3), booleanc boolean, bytec byte, binaryc binary,

datec date, timestampc timestamp" val df = spark.read.schema(schema).json(ds)

df.show(false) df.printSchema

+-------+------+--------+-----+------+-------+--------+--------+-----+----------+----------+-----------------------+

|stringc|shortc|integerc|longc|floatc|doublec|decimalc|booleanc|bytec|binaryc

|datec |timestampc |

+-------+------+--------+-----+------+-------+--------+--------+-----+----------+----------+-----------------------+

|abc |1 |null |3 |4.5 |6.7 |8.900 |true |23 |[4D 61 6E]|2010-01-01|2012-12-12 11

:22:22.123|

+-------+------+--------+-----+------+-------+--------+--------+-----+----------+----------+-----------------------+

root |-- stringc: string (nullable =true) |-- shortc: short (nullable = true)

|-- integerc: integer (nullable =true) |-- longc: long (nullable = true) |--

floatc: float (nullable =true) |-- doublec: double (nullable = true) |--

decimalc: decimal(10,3) (nullable = true) |-- booleanc: boolean (nullable = true

) |-- bytec: byte (nullable =true) |-- binaryc: binary (nullable = true) |--

datec: date (nullable =true) |-- timestampc: timestamp (nullable = true)

复合类型:

val json = """ { "arrayc" : [ 1, 2, 3 ], "structc" : { "strc" : "efg",

"decimalc" : 1.1 }, "mapc" : { "key1" : 1.2, "key2" : 1.1 } } """ val ds =

spark.createDataset(Seq(json))val schema = "arrayc array, structc

struct, mapc map" val df =

spark.read.schema(schema).json(ds) df.show(false) df.printSchema

+---------+--------+--------------------------+ |arrayc |structc |mapc |

+---------+--------+--------------------------+ |[1, 2, 3]|[efg, 1]|[key1 ->1.2

, key2 ->1.1]| +---------+--------+--------------------------+ root |-- arrayc:

array (nullable =true) | |-- element: short (containsNull = true) |-- structc:

struct (nullable =true) | |-- strc: string (nullable = true) | |-- decimalc:

decimal(10,0) (nullable = true) |-- mapc: map (nullable = true) | |-- key:

string | |-- value: float (valueContainsNull =true)

SparkSQL数据类型

基本类型:

DataType simpleString typeName sql defaultSize catalogString json

StringType string string STRING 20 string “string”

ShortType smallint short SMALLINT 2 smallint “short”

IntegerType int integer INT 4 int “integer”

LongType bigint long BIGINT 8 bigint “long”

FloatType float float FLOAT 4 float “float”

DoubleType double double DOUBLE 8 double “double”

DecimalType(10,3) decimal(10,3) decimal(10,3) DECIMAL(10,3) 8 decimal(10,3)

“decimal(10,3)”

BooleanType boolean boolean BOOLEAN 1 boolean “boolean”

ByteType tinyint byte TINYINT 1 tinyint “byte”

BinaryType binary binary BINARY 100 binary “binary”

DateType date date DATE 4 date “date”

TimestampType timestamp timestamp TIMESTAMP 8 timestamp “timestamp”

三个复合类型:

DataType simpleString typeName sql defaultSize catalogString json

ArrayType(IntegerType, true) array array ARRAY 4 array

{“type”:”array”,”elementType”:”integer”,”containsNull”:true}

MapType(StringType, LongType, true) map map MAP

28 map

{“type”:”map”,”keyType”:”string”,”valueType”:”long”,”valueContainsNull”:true}

StructType(StructField(“sf”, DoubleType)::Nil) struct struct

STRUCT 8 struct

{“type”:”struct”,”fields”:[{“name”:”sf”,”type”:”double”,”nullable”:true,”metadata”:{}}]}


欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/bake/11645122.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2023-05-17
下一篇2023-05-17

发表评论

登录后才能评论

评论列表(0条)

    保存