
This article covers two ways for Spark to read and write HBase:
1. Using the newAPIHadoopRDD API that Spark provides to read and write HBase.
2. SparkOnHBase, which uses the HBaseContext utility class open-sourced by Cloudera Labs to let Spark read and write HBase in bulk through RDDs.
The HBase table layout and a sample of the dataset were shown as figures in the original post (omitted here).
The Spark version used in this article is 2.3.2 and the HBase version is 1.2.6.
Because the data in HBase is stored serialized, Spark's default serializer throws serialization errors when reading HBase data. Whichever approach you take, configure Spark's serialization (Kryo) before reading from HBase, as shown below.
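A minimal sketch of that configuration in PySpark (the app name is a placeholder; registering specific HBase classes with Kryo is optional unless registration is required):

from pyspark import SparkConf
from pyspark.sql import SparkSession

# Switch from Java serialization to Kryo so HBase objects can be
# shipped between executors without serialization errors.
conf = SparkConf().set("spark.serializer",
                       "org.apache.spark.serializer.KryoSerializer")

spark = (SparkSession.builder
         .appName("spark-on-hbase")   # placeholder name
         .config(conf=conf)
         .getOrCreate())
sc = spark.sparkContext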
The first approach uses TableInputFormat and TableOutputFormat to read and write HBase.
Below is the read side (the original post showed the code as a screenshot).
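As a stand-in, here is a hedged PySpark sketch modeled on the hbase_inputformat.py example that shipped with older Spark releases; the converter classes come from the spark-examples jar and may need to be supplied separately, and the table name and ZooKeeper quorum are placeholders:

# Read an HBase table as an RDD via TableInputFormat.
conf = {
    "hbase.zookeeper.quorum": "zk-host",        # placeholder
    "hbase.mapreduce.inputtable": "my_table",   # placeholder
}
hbase_rdd = sc.newAPIHadoopRDD(
    "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
    "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    "org.apache.hadoop.hbase.client.Result",
    keyConverter="org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter",
    valueConverter="org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter",
    conf=conf)
print(hbase_rdd.take(3))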
The output of the run was shown as a screenshot (omitted here).
Import the hbase-spark jar via Maven.
Because hbase-spark was built against Spark 1.6 while the Spark in use here is 2.3.2, running a Spark job fails with an error that the org.apache.spark.Logging class is not defined. In Spark 2.3.2 that class no longer exists under this name (it was moved to org.apache.spark.internal.Logging), so you need to recreate the class, package it into a jar, and drop it into Spark's jars directory.
The SparkOnHBase read code was shown as a screenshot in the original post (omitted here).
Advantages of SparkOnHBase over the first approach:
1> Seamless use of HBase connections
2> Seamless integration with Kerberos
3> RDDs can be created directly from gets or scans
4> Supports any combination of HBase operations on RDDs
5> Provides simple methods for common operations, while the API still allows unrestricted advanced operations
6> Supports both Java and Scala
7> Offers similar APIs for Spark and Spark Streaming
It depends on whether you are asking about the framework level or the application level: the application level has no direct way to get Spark's JobID.
At the framework level, an action triggers SparkContext's runJob method; SparkContext maintains an AtomicInteger named nextJobId, and each JobID is generated by calling getAndIncrement() on it.
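From the application side you can still observe the IDs the framework has already assigned, for example through PySpark's status tracker; a small sketch (the job group name is arbitrary):

sc = spark.sparkContext
sc.setJobGroup("demo-group", "watch job ids")
sc.parallelize(range(1000)).count()              # an action: triggers one job
tracker = sc.statusTracker()
print(tracker.getJobIdsForGroup("demo-group"))   # e.g. [0]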
# Big pitfall
# In PySpark nothing is evaluated until an action such as show() or a row count runs; every transformation only defines a table and does not compute results (see the two-line sketch below).
# So when joining, to keep the data correct, build good habits: 1. join the small table to the large one; 2. when joining the large table to the small one, rename the join key (e.g. 'A_KEY').
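A tiny illustration of that laziness, using a hypothetical frame df with columns amount and A_KEY:

df2 = df.filter(df.amount > 0).select('A_KEY')   # transformations: only builds a plan
df2.count()                                      # action: this is what triggers computation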
from pyspark.sql import functions as F
from pyspark.sql.functions import *
from pyspark.sql.types import DateType
from pyspark.sql.window import *
from pyspark.sql.functions import lower, col  # lowercase
from pyspark.sql.functions import upper, col  # uppercase
from pyspark.sql.functions import lit  # add a column
from pyspark.sql.functions import when  # if/else
from pyspark.sql.functions import split, explode, concat, concat_ws  # split (split a column), explode (one row into many), concat/concat_ws (merge columns)
from pyspark.sql.types import StringType  # import data types
from pyspark.sql.functions import UserDefinedFunction  # define UDFs
from pyspark.sql.functions import desc  # descending sort
from pyspark.sql.functions import trim  # strip whitespace
a.createOrReplaceTempView("a")
a = spark.sql("select * from a").cache()
a.show(10)
a.take(10)
df = df.dropDuplicates()
df.select('A_field').distinct().count()
a.count()
len(ORD_pro.collect())  # runs faster
a.columns
a.dtypes
a.printSchema()
PriceBook_STBT = ST.select(ST["BT"].cast("int").cast("string"), ST["ST"].cast("int").cast("string"))
priceBook_BT = kadl.select(kadl["BT"].cast("int").cast("string"), kadl["AICPGP"].cast("string"))
ST_SKU_1 = ST_SKU_1.withColumnRenamed("CUST_ID", 'ShipToNumber').withColumnRenamed("SKU", 'SKUNumber')
ORD_table.selectExpr("SVC_NOTIF_KEY as SVC_NOTIF_KEY_1", "NOTIF_NBR as NOTIF_NBR_1").show()
a.select('CONTACT_KEY').describe().show()
b1 = b.drop("CONTACT_KEY")  # note: don't assign the result of .show(), it returns None
a.filter(a.CONTACT_KEY == 504943)
EQUIP_pro_1.filter((EQUIP_pro_1.INSTALL_DT >= '20200101') & (EQUIP_pro_1.INSTALL_DT <= '20200331'))  # wrap each condition in parentheses
Total_Booking5 = Total_Booking4.filter(~col('Login').like('%thermofisher.com%'))
df_1 = df.filter(lower(df.current_pagename_new).like('products:%'))
EQUIP_pro.filter("INSTALL_DT is NULL").select('INSTALL_DT').count()
Q3_Campaign_1.filter(Q3_Campaign_1.login.isNotNull())
df.withColumn("is_person_name_null", col("USER_NM").isNull())  # or .isNotNull()
frame.withColumn("contant", F.lit(10))
frame.withColumn("name_length", F.length(frame.name))
Method 1:
df = df.withColumn('UPDT_DT', F.to_date(df.UPDT_DT))
df = df.withColumn('CRT_DT', F.to_date(df.CRT_DT))
Method 2:
df.select('UPDT_DT').withColumn("UPDT_DT_1", col("UPDT_DT").cast("date"))
df.withColumn("BPEFTJ_1", df['BPEFTJ'].cast(DateType()))
from pyspark.sql.functions import lower, col
df = spark.table('df').withColumn('CONTACT_ID_1', lower(col('CONTACT_ID')))
df.withColumn("USER_NM", upper(trim(col("USER_NM")))).show()  # operates on a DataFrame
dataframeColnames.createOrReplaceTempView("dataframeColnames")
import pyspark.sql.functions as F
from pyspark.sql.functions import col

def single_space(col):
    return F.trim(F.regexp_replace(col, " +", " "))

def remove_all_whitespace(col):
    return F.regexp_replace(col, "\\s+", "")

spark.table('WEB_USER').withColumn('USER_NM_1', lower(remove_all_whitespace(single_space(col("USER_NM"))))).show()  # operates on a table
WEB_USER_3.sort('CONTACT_ID_1', 'USER_NM_1', ascending=False).show()  # descending; default is ascending (all columns in the same direction)
WEB_USER_3.sort(WEB_USER_3.CONTACT_ID_1.desc(), WEB_USER_3.USER_NM_1.asc()).show()  # per-column sort directions
WEB_USER_3.groupBy('CONTACT_ID_1').agg(F.count('CONTACT_ID_1').alias('count')).sort(desc('count')).show()  # or F.countDistinct
df.groupBy("login").count().sort(desc("count")).show()
df.groupBy('level').agg(F.concat_ws(',', F.collect_list(df.name))).show()
df.groupBy("SKU").agg(F.max("BPEFTJ").alias("BPEFTJ")).show()  # shows only the SKU and BPEFTJ columns
df.join(df.groupBy("SKU").agg(F.max("BPEFTJ").alias("BPEFTJ")), ["SKU", "BPEFTJ"], how="inner")  # keeps all columns
WOP_Price.withColumn("OP_Data", current_date())
TO_CHAR(COMPLT_DATETIME, 'YYYYMMDD') COMPLT_DATETIME  -- database SQL, not PySpark
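A PySpark equivalent of that formatting, assuming COMPLT_DATETIME is a timestamp column:

from pyspark.sql.functions import date_format, col
df.withColumn("COMPLT_DATETIME", date_format(col("COMPLT_DATETIME"), "yyyyMMdd"))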
Method 1:
from pyspark.sql.functions import when
df = df.withColumn("profile", when(df.age >= 40, "Senior").otherwise("Executive"))
order3 = order2.withColumn("Order Cancelled(Y/N/P)", when((order2.status == -1) & (order2.received_quantity > 0), "P").when(order2.status == -1, "Y").otherwise("N"))
Method 2:
# define a function
def somefunc(value):
    if (value == 'a') | (value == 'b'):
        return 'Yes'
    else:
        return 'No'
# F.udf(function, return type)
udfsomefunc = F.udf(somefunc, StringType())
a2 = a1.withColumn("abc", udfsomefunc("SCb_Name")).select('SCb_Name', 'SGN', 'abc')
frame3_1 = WEB_USER_3.withColumn("name_length", F.length(WEB_USER_3.USER_NM_1))
ST_SKU_1.withColumn('Input', F.lit('Viewed')).show()
from pyspark.sql.functions import lit
new_df = df1.withColumn('newCol', lit(0))  # new column filled with 0
new_df = df.withColumn('new_column_1', lit(None).cast(StringType()))  # new column is NULL
df = df1.join(df2, ta.name == tb.name, how='inner')  # or 'outer' / 'left' / 'right' / 'left_anti'
df = df.join(df, ['BT'], how='inner')
df.show()
jd = df.join(defaults, on="foo", how='outer')
from pyspark.sql.functions import split, explode, concat, concat_ws
df_split = df.withColumn("s", split(df['score'], " "))  # split the score column into a new column s
df_split.show()
ST_SKU_2.withColumn('STSKU', concat(ST_SKU_2['ShipToNumber'], ST_SKU_2['SKUNumber']))  # no separator
ST_SKU_2.withColumn('STSKU', concat_ws("-", ST_SKU_2['ShipToNumber'], ST_SKU_2['SKUNumber']))  # with a specified separator
from pyspark.sql.functions import split, explode, concat, concat_ws
df1 = df.withColumn("SKU", explode(split(df['prod_list'], ",")))
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf("user string, PL string, Order_Number integer", PandasUDFType.GROUPED_MAP)
def data_partiotion(pdf):
    # A GROUPED_MAP UDF receives one group as a pandas DataFrame and must
    # return a pandas DataFrame matching the declared schema.
    return pdf[["user", "PL", "Order_Number"]]

df.withColumn("datetime", col("datetime").cast("timestamp")) \
    .groupBy("userId", "memberId") \
    .agg(max_("datetime"))  # assumes: from pyspark.sql.functions import max as max_
# Gotchas
# 1. filter and column naming
test = WEB_USER.groupBy('USER_NM').agg(F.count('USER_NM').alias('count')).sort(desc('count'))
test.filter(test.count > 1).show()  # raises: '>' not supported between instances of 'method' and 'int'
# Fix: test.filter(test['count'] > 1).show()
# Cause: count is a built-in DataFrame method, so the column name 'count' collides with it
def remove_all_whitespace(col):
    return F.regexp_replace(col, "\\s+", "")

df = df.withColumn('Materia_Number', remove_all_whitespace(trim(upper(col('Materia_Number')))))
sentenceData = spark.createDataFrame(df)
Delear = df.union(df1).union(df2).union(df3).union(df4).distinct().withColumn("op_date", current_date())
df.write.mode("append").saveAsTable("db.df_a")
df.write.mode("overwrite").saveAsTable("testdb.testtable")  # SaveMode.Overwrite is the Scala spelling; PySpark takes the string
a1 = a.filter(~col('USER_NM').isin(['1468706287@qq.com']))
# Method 1
mvv_list = bb1.select('USER_NM').collect()
mvv_array = [i.USER_NM for i in mvv_list]
df = a.withColumn('is_user_nm_null', col('USER_NM').isin(mvv_array))
display(df.groupBy('is_user_nm_null').count().sort(desc('count')))
# a1 = a.filter(~col('USER_NM').isin(mvv_array))
# Method 2
df = df.withColumn("AddCol", when(col("Pclass").contains("3"), "three").otherwise("notthree"))  # note: df.col() does not exist in PySpark; use col() or df["Pclass"]
df.withColumn('cat',
    F.when(df.device.isin(phone_list), 'phones').otherwise(
    F.when(df.device.isin(pc_list), 'pc').otherwise(
    F.when(df.device.isin(security_list), 'security')))
).groupBy('id').pivot('cat').agg(F.count('cat')).show()
# Method 3
a1 = a.join(bb1, ['USER_NM'], how='outer')  # outer join: union of keys
jd = a1.withColumn("ShipToNumber", coalesce("ShipToNumber", "ShipToNumber_1")).withColumn("ShipToName", coalesce("ShipToName", "ShipToName_1"))
# The base table is still a1: when ShipToNumber and ShipToNumber_1 are both present (or both null), ShipToNumber is taken; when ShipToNumber is null and ShipToNumber_1 is not, ShipToNumber_1 is taken.
Customer_info_5_2.withColumn("row_num", row_number().over(Window.partitionBy("ShipToNumber").orderBy(desc("Login_Last_Order_Date")))).filter(col('row_num') == 1)
###
a2.withColumn("row_number", F.row_number().over(Window.partitionBy("CONTACT_KEY").orderBy(desc("CITY")))).filter(col('row_number') == 1)
"','".join(df_1.columns)
D_Equip_2.schema.names
for item in aaa.columns:
    aaa = aaa.withColumn(item, col(item).cast("string"))
aaa
df = df.astype(str)  # pandas, not PySpark
for item in df.columns:
    df = df.withColumnRenamed(item, item.replace(" ", ''))
print(df.columns)
%scala
val df = Seq(
  (1, "First Value", java.sql.Date.valueOf("2010-01-01"))
).toDF("int_column", "string_column", "date_column")
df.createOrReplaceTempView("df")
wop_latest1 = wop_latest[['SKU Number', 'Sales Order Number']].rename(columns={"SKU Number": "SKUNumber", "Sales Order Number": "SalesOrderNumber"})  # pandas
result_4.drop('Source').join(result_4.groupBy("CONTACT_KEY").agg(concat_ws("/", collect_set("Source")).alias('Source')), ['CONTACT_KEY'], how='left')
That covers everything on Spark-on-HBase reads and writes: the two read/write approaches, how Spark job IDs are generated, and common PySpark statements (commands).