spark on hbase 读写

spark on hbase 读写,第1张

本文主要讲述了spark对hbase进行独写的两种方式,这两种方式分别为:

1利用spark提供的 newAPIHadoopRDD api 对hbase进行读写

2SparkOnHbase,这种方式其实是利用Cloudera-labs开源的一个HbaseContext的工具类来支持spark用RDD的方式批量读写hbase

hbase 表格式如下:

部分数据集如下:

文中的spark 的版本为232,hbase 的版本为126

因为hbase数据集的数据都是序列化的,所以spark 默认读取Hbase的数据时会报数据序列化的错误,不管哪种方式,在读取hbase数据之前,为spark配置序列化方式,如图所示:

主要是利用TableInputFormat,TableOutPutFormat的方式对hbase进行读写。

下边是对hbase进行读

运行结果如图:

通过maven 将hbase-spark jar 报 导入

由于hbase-spark 运用的spark 版本为16 而实际的spark 版本为232,所以执行spark 任务会报 没有 orgapachesparklogging 类没有定义,这是因为 spark 232 这个类名已经改变,因此需要重新构造这个类并打成jar包放入到spark 的jar目录里即可

以下为读方式:

sparkOnHbase 对于第一种方式的优势在于:

1>无缝的使用Hbase connection

2>和Kerberos无缝集成

3>通过get或者scan直接生成rdd

4>利用RDD支持hbase的任何组合 *** 作

5>为通用 *** 作提供简单的方法,同时通过API允许不受限制的未知高级 *** 作

6>支持java和scala

7>为spark和 spark streaming提供相似的API

不知道你问的是框架层还是应用层?应用层无法获取Spark的JobID。

框架层,action *** 作触发SparkContext执行runJob方法,SparkContext会维护一个AtomicInteger对象,对象名称为nextJobId,该对象通过调用getAndIncrement()生成JobID。

# 大坑

# pyspark所有的结果只要不show结果,看行数等 *** 作,都是定义表,并没有计算结果

# 所以在join时,为了保证数据的准确性,养成好习惯:1、小表关联大表  2、大表关联小表[‘A_KEY’ rename]

from pysparksql import functions as F

from pysparksqlfunctions import

from pysparksqltypes import DateType

from  pysparksqlwindow import

from pysparksqlfunctions import lower, col # 小写

from pysparksqlfunctions import upper, col  # 大写

from pysparksqlfunctions import lit # 增加列

from pysparksqlfunctions import when # ifelse

from pysparksqlfunctions import split, explode, concat, concat_ws  # split(列数据的分割), explode(一行分成多行) concat,concat_ws(列数据合并)

from pysparksqltypes import StringType # 导入数据类型

from pysparksqlfunctions import UserDefinedFunction # 定义函数

from pysparksqlfunctions import desc #降序排列

from pysparksqlfunctions import trim # 去空格

acreateOrReplaceTempView("a")

a = sparksql("select from a")cache()

ashow(10)

atake(10)

df = dfdropDuplicates()

dfselect('A_field')distinct()count()

acount()

len(ORD_procollect()) #运行速度会更快

acolumns

adtypes

aprintSchema()

PriceBook_STBT = STselect( ST["BT"]cast("int")cast("string"),ST["ST"]cast("int")cast("string"))

priceBook_BT  = kadlselect(kadl["BT"]cast("int")cast("string"),kadl["AICPGP"]cast("string"))

ST_SKU_1= ST_SKU_1withColumnRenamed("CUST_ID",'ShipToNumber')withColumnRenamed("SKU",'SKUNumber')

ORD_tableselectExpr("SVC_NOTIF_KEY as SVC_NOTIF_KEY_1", "NOTIF_NBR as NOTIF_NBR_1")show()

aselect('CONTACT_KEY')describe()show()

b1 = bdrop("CONTACT_KEY")show()

afilter(aCONTACT_KEY == 504943)

EQUIP_pro_1filter( (EQUIP_pro_1INSTALL_DT >= '20200101') & (EQUIP_pro_1INSTALL_DT <= '20200331')) # 条件之间用括号分开

Total_Booking5 = Total_Booking4filter(~ col('Login')like('%thermofishercom%') )

df_1 = dffilter(lower(dfcurrent_pagename_new)like('products:%')

EQUIP_profilter("INSTALL_DT is NULL")select('INSTALL_DT')count()

Q3_Campaign_1filter(Q3_Campaign_1loginisNotNull())                                         

dfwithColumn("is_person_name_null",col("USER_NM")isNull()/isNotNull() 

framewithColumn("contant", Flit(10))

framewithColumn("name_length", Flength(framename))

方法一:

df= df('UPDT_DT',Fto_date(df))

df= dfwithColumn('CRT_DT',Fto_date(dfCRT_DT))

方法二:

dfselect('UPDT_DT')withColumn("UPDT_DT_1",col("UPDT_DT")cast("date"))

dfwithColumn("BPEFTJ_1",df['BPEFTJ']cast(DateType()))                                         

from pysparksqlfunctions import lower, col

df = sparktable('df')withColumn('CONTACT_ID_1', lower(col('dfCONTACT_ID')))

dfwithColumn("USER_NM", upper(trim(col("USER_NM"))))show()  # *** 作在dataframe上

dataframeColnamescreateOrReplaceTempView("dataframeColnames")

import pysparksqlfunctions as F

from pysparksqlfunctions import col

def single_space(col):

    return Ftrim(Fregexp_replace(col, " +", " "))

def remove_all_whitespace(col):

    return Fregexp_replace(col, "\\s+", "")

sparktable('WEB_USER')withColumn('USER_NM_1', lower(remove_all_whitespace(single_space(col("USER_NM")))))show()  # *** 作Table上

WEB_USER_3sort('CONTACT_ID_1','USER_NM_1',ascending = False)show() #降序排列  默认为升序 (同升同降)

WEB_USER_3sort(WEB_USER_3CONTACT_ID_1desc(),WEB_USER_3USER_NM_1asc())show() # 自定义升降

WEB_USER_3groupBy('CONTACT_ID_1')agg(fcount/countDistinct('CONTACT_ID_1')alias('count'))sort(desc('count'))show()

dfgroupBy("login")count()sort(desc("count"))show()                                                                   

dfgroupBy('level')agg(sfconcat_ws(',', sfcollect_list(dfname)))show()

dfgroupBy("SKU")agg(Fmax("BPEFTJ")alias("BPEFTJ"))show()  # 只会显示SKU 与 BPEFTJ两列

dfjoin(dfgroupBy("SKU")agg(Fmax("BPEFTJ")alias("BPEFTJ")),["SKU","BPEFTJ"],how="inner") # 会显示所有列   

WOP_PricewithColumn("OP_Data",current_date())

TO_CHAR(COMPLT_DATETIME, 'YYYYMMDD' ) COMPLT_DATETIME                                                                   

方法一:            

from pysparksqlfunctions import when

df = dfwithColumn("profile", when(dfage >= 40,"Senior") otherwise("Executive"))

order3 = order2withColumn("Order Cancelled(Y/N/P)", when( (order2status == -1) & (order2received_quantity > 0) ,"P")when(order2status == -1,"Y")otherwise("N"))

方法二:             

#定义函数

def somefunc(value):

  if (value=='a') | (value=='b') :

    return 'Yes'

  else:

    return 'No'

# Fudf(函数,输出类型)

udfsomefunc = Fudf(somefunc, StringType())             

a2 = a1withColumn("abc", udfsomefunc("SCb_Name"))select('SCb_Name','SGN','abc')             

frame3_1 = WEB_USER_3withColumn("name_length", flength(WEB_USER_3USER_NM_1))

ST_SKU_1withColumn('Input',Flit('Viewed'))show()                                                                   

from pysparksqlfunctions import lit

new_df = df1withColumn('newCol', lit(0))show() # 新列为0

new_df = dfwithColumn('new_column_1', lit(None)cast(StringType()))  #新列为NULL

df = df1join(df2, taname == tbname, how='inner'/'outer'/'left'/'right'/left_anti)

df = dfjoin(df,['BT'],how='inner')

dfshow()

jd = dfjoin(defaults, on="foo", how='outer')

from pysparksqlfunctions import split, explode, concat, concat_ws

df_split = dfwithColumn("s", split(df['score'], " ")) #切分字段score,生成为s

df_splitshow()                           

ST_SKU_2withColumn('STSKU',concat(ST_SKU_2['ShipToNumber'],ST_SKU_2['SKUNumber'])) (没有分隔符)

ST_SKU_2withColumn('STSKU',concat_ws("-",ST_SKU_2['ShipToNumber'],ST_SKU_2['SKUNumber'])) (指定分隔符)                                                                   

from pysparksqlfunctions import split, explode, concat, concat_ws

df1 = dfwithColumn("SKU", explode(split(df['prod_list'], ",")))

from pysparksqlfunctions import pandas_udf,pandasUDFType

@pandas_udf("user string,PL string,Order_Number integer",pandasUDFTypeGROUPED_MAP)

def data_partiotion(df):

  V=dfselect('Order_Number')

  return sparkcreateDataFrame()

dfwithColumn("datetime", col("datetime")cast("timestamp"))

    groupBy("userId", "memberId")

    agg(max_("datetime"))                                                                   

#注意事项

1 filter (命名)                                                                   

test = WEB_USERgroupBy('USER_NM')agg(Fcount('USER_NM')alias('count'))sort(desc('count'))

testfilter(testcount > 1)show()  会报错:'>' not supported between instances of 'method' and 'int'

修改成:testfilter(test['count'] > 1)show()

报错原因:'count'为默认方法,名字冲突                                                                 

def remove_all_whitespace(col):

    return Fregexp_replace(col, "\\s+", "")

df = dfwithColumn('Materia_Number',remove_all_whitespace(trim(upper(col('Materia_Number')))))

sentenceData = sparkcreateDataFrame(df)                                         

Delear = dfunion(df1)union(df2)union(df3)union(df4)distinct()withColumn("op_date",current_date())

dfwritemode("append")saveAsTable("dbdf_a")

dfwritemode(SaveModeOverwrite)saveAsTable("testdbtesttable")                                         

a1 = afilter(~col('USER_NM')isin(['1468706287@qqcom']))

#方法一、

mvv_list = bb1select('USER_NM')collect()

mvv_array = [iUSER_NM for i in mvv_list]

df = awithColumn('is_user_nm_null',col('USER_NM')isin(mvv_array))

display(dfgroupBy('is_user_nm_null')count()sort(desc('count')))           

# a1 = afilter(~col('USER_NM')isin(mvv_array))

#方法二、

df = dfwithColumn("AddCol", when(dfcol("Pclass")contains("3"),"three")otherwise("notthree"))           

fwithColumn('cat',

Fwhen(dfdeviceisin(phone_list), 'phones')otherwise(

Fwhen(dfdeviceisin(pc_list), 'pc')otherwise(

Fwhen(dfdeviceisin(security_list), 'security')))

)groupBy('id')pivot('cat')agg(Fcount('cat'))show()           

#方法三

a1 = ajoin(bb1,['USER_NM'],how='outer') 并集

jd = a1withColumn("ShipToNumber", coalesce("ShipToNumber", "shipToName_1"))withColumn("ShipToName", coalesce("ShipToName", "shipToName_1"))

-- 主表仍然是a1表,当ShipToNumber 与 shipToName_1 同时存在/同时不存在,选择当ShipToNumber。当当ShipToNumber为null,而当ShipToNumber_1 非空,选择当ShipToNumber_1

Customer_info_5_2withColumn("row_num", row_number()over(WindowpartitionBy("ShipToNumber")orderBy(desc("Login_Last_Order_Date"))))filter(col('row_num') == '1')

###

a2withColumn("row_number",Frow_number()over(WindowpartitionBy("CONTACT_KEY")orderBy(desc("CITY"))))filter(col('row_number') == 1)

"','"join(df_1columns)

D_Equip_2schemanames           

for item in aaacolumns:

  aaa=aaawithColumn(item, col(item)cast("string"))

aaa

df=dfastype(str)           

for item in dfcolumns:

df=dfwithColumnRenamed(item, itemreplace(" ",''))

print(dfcolumns)

%scala

val df = Seq(

  (1, "First Value", javasqlDatevalueOf("2010-01-01"))

)toDF("int_column", "string_column", "date_column")

dfcreateOrReplaceTempView("df")           

wop_latest1 = wop_latest[['SKU Number','Sales Order Number']]rename(columns={"SKU Number": "SKUNumber", "Sales Order Number": "SalesOrderNumber"})

            result_4drop('Source')join(result_4groupBy("CONTACT_KEY")agg(concat_ws("/",collect_set("Source"))alias('Source')),['CONTACT_KEY'],how='left')

以上就是关于spark on hbase 读写全部的内容,包括:spark on hbase 读写、spark 怎么获取jobid、Py-Spark 常用语句(命令)等相关内容解答,如果想了解更多相关内容,可以关注我们,你们的支持是我们更新的动力!

欢迎分享,转载请注明来源:内存溢出

原文地址:https://54852.com/web/10148062.html

(0)
打赏 微信扫一扫微信扫一扫 支付宝扫一扫支付宝扫一扫
上一篇 2023-05-05
下一篇2023-05-05

发表评论

登录后才能评论

评论列表(0条)

    保存