SparkSQL数据加载和保存实战
一:前置知识详解:
SparkSQL重要是操作DataFrame,DataFrame本身提供了save和load的操作,
Load:可以创建DataFrame,
Save:把DataFrame中的数据保存到文件或者说与具体的格式来指明我们要读取的文件的类型以及与具体的格式来指出我们要输出的文件是什么类型。
二:SparkSQL读写数据代码实战:
importorg.apache.spark.SparkConf;
importorg.apache.spark.api.java.JavaRDD;
importorg.apache.spark.api.java.JavaSparkContext;
importorg.apache.spark.api.java.function.Function;
importorg.apache.spark.sql.;
importorg.apache.spark.sql.types.DataTypes;
importorg.apache.spark.sql.types.StructField;
importorg.apache.spark.sql.types.StructType;
importjava.util.ArrayList;
importjava.util.List;
publicclassSparkSQLLoadSaveOps{
publicstaticvoidmain(String[]args){
SparkConfconf=newSparkConf().setMaster("local").setAppName("SparkSQLLoadSaveOps");
JavaSparkContextsc=newJavaSparkContext(conf);
SQLContext=newSQLContext(sc);
/
read()是DataFrameReader类型,load可以将数据读取出来
/
DataFramepeopleDF=sqlContext.read().format("json").load("E:\\Spark\\Sparkinstanll_package\\Big_Data_Software\\spark-1.6.0-bin-hadoop2.6\\examples\\src\\main\\resources\\people.json");
/
直接对DataFrame进行操作
Json:是一种自解释的格式,读取Json的时候怎么判断其是什么格式?
通过扫描整个Json。扫描之后才会知道元数据
/
//通过mode来指定输出文件的是append。创建新文件来追加文件
peopleDF.select("name").write().mode(SaveMode.Append).save("E:\\personNames");
}
}
读取过程源码分析如下:
1.read方法返回DataFrameReader,用于读取数据。
/
::Experimental::
Returnsa[[DataFrameReader]]thatcanbeusedtoreaddatainasa[[DataFrame]].
{{{
sqlContext.read.parquet("/path/to/file.parquet")
sqlContext.read.schema(schema).json("/path/to/file.json")
}}}
@groupgenericdata
@since1.4.0
/
@Experimental
//创建DataFrameReader实例,获得了DataFrameReader引用
defread:DataFrameReader=newDataFrameReader(this)
2.然后再调用DataFrameReader类中的format,指出读取文件的格式。
/
Specifiestheinputdatasourceformat.
@since1.4.0
/
defformat(source:String):DataFrameReader={
this.source=source
this
}
3.通过DtaFrameReader中load方法通过路径把传入过来的输入变成DataFrame。
/
Loadsinputinasa[[DataFrame]],fordatasourcesthatrequireapath(e.g.databackedby
alocalordistributedfilesystem).
@since1.4.0
/
//TODO:RemovethisoneinSpark2.0.
defload(path:String):DataFrame={
option("path",path).load()
}
至此,数据的读取工作就完成了,下面就对DataFrame进行操作。
下面就是写操作!!!
1.调用DataFrame中select函数进行对列筛选
/
Selectsasetofcolumns.Thisisavariantof`select`thatcanonlyselect
existingcolumnsusingcolumnnames(i.e.cannotconstructexpressions).
{{{
//Thefollowingtwoareequivalent:
df.select("colA","colB")
df.select($"colA",$"colB")
}}}
@groupdfops
@since1.3.0
/
@scala.annotation.varargs
defselect(col:String,cols:String):DataFrame=select((col+:cols).map(Column(_)):_)
2.然后通过write将结果写入到外部存储系统中。
/
::Experimental::
Interfaceforsavingthecontentofthe[[DataFrame]]outintoexternalstorage.
@groupoutput
@since1.4.0
/
@Experimental
defwrite:DataFrameWriter=newDataFrameWriter(this)
3.在保持文件的时候mode指定追加文件的方式
/
Specifiesthebehaviorwhendataortablealreadyexists.Optionsinclude:
//Overwrite是覆盖
-`SaveMode.Overwrite`:overwritetheexistingdata.
//创建新的文件,然后追加
-`SaveMode.Append`:appendthedata.
-`SaveMode.Ignore`:ignoretheoperation(i.e.no-op).
-`SaveMode.ErrorIfExists`:defaultoption,throwanexceptionatruntime.
@since1.4.0
/
defmode(saveMode:SaveMode):DataFrameWriter={
this.mode=saveMode
this
}
4.最后,save()方法触发action,将文件输出到指定文件中。
/
Savesthecontentofthe[[DataFrame]]atthespecifiedpath.
@since1.4.0
/
defsave(path:String):Unit={
this.extraOptions+=("path"->path)
save()
}
三:SparkSQL读写整个流程图如下:
这里写图片描述
四:对于流程中部分函数源码详解:
DataFrameReader.Load()
1.Load()返回DataFrame类型的数据集合,使用的数据是从默认的路径读取。
/
ReturnsthedatasetstoredatpathasaDataFrame,
usingthedefaultdatasourceconfiguredbyspark.sql.sources.default.
@groupgenericdata
@deprecatedAsof1.4.0,replacedby`read().load(path)`.ThiswillberemovedinSpark2.0.
/
@deprecated("Useread.load(path).ThiswillberemovedinSpark2.0.","1.4.0")
defload(path:String):DataFrame={
//此时的read就是DataFrameReader
read.load(path)
}
2.追踪load源码进去,源码如下:
在DataFrameReader中的方法。Load()通过路径把输入传进来变成一个DataFrame。
/
Loadsinputinasa[[DataFrame]],fordatasourcesthatrequireapath(e.g.databackedby
alocalordistributedfilesystem).
@since1.4.0
/
//TODO:RemovethisoneinSpark2.0.
defload(path:String):DataFrame={
option("path",path).load()
}
3.追踪load源码如下:
/
Loadsinputinasa[[DataFrame]],fordatasourcesthatdon''trequireapath(e.g.external
key-valuestores).
@since1.4.0
/
defload():DataFrame={
//对传入的Source进行解析
valresolved=ResolvedDataSource(
sqlContext,
userSpecifiedSchema=userSpecifiedSchema,
partitionColumns=Array.empty[String],
provider=source,
options=extraOptions.toMap)
DataFrame(sqlContext,LogicalRelation(resolved.relation))
}
DataFrameReader.format()
1.Format:具体指定文件格式,这就获得一个巨大的启示是:如果是Json文件格式可以保持为Parquet等此类操作。
SparkSQL在读取文件的时候可以指定读取文件的类型。例如,Json,Parquet.
/
Specifiestheinputdatasourceformat.Built-inoptionsinclude“parquet”,”json”,etc.
@since1.4.0
/
defformat(source:String):DataFrameReader={
this.source=source//FileType
this
}
DataFrame.write()
1.创建DataFrameWriter实例
/
::Experimental::
Interfaceforsavingthecontentofthe[[DataFrame]]outintoexternalstorage.
@groupoutput
@since1.4.0
/
@Experimental
defwrite:DataFrameWriter=newDataFrameWriter(this)
2.追踪DataFrameWriter源码如下:
以DataFrame的方式向外部存储系统中写入数据。
/
::Experimental::
Interfaceusedtowritea[[DataFrame]]toexternalstoragesystems(e.g.filesystems,
key-valuestores,etc).Use[[DataFrame.write]]toaccessthis.
@since1.4.0
/
@Experimental
finalclassDataFrameWriterprivate[sql](df:DataFrame){
DataFrameWriter.mode()
1.Overwrite是覆盖,之前写的数据全都被覆盖了。
Append:是追加,对于普通文件是在一个文件中进行追加,但是对于parquet格式的文件则创建新的文件进行追加。
/
Specifiesthebehaviorwhendataortablealreadyexists.Optionsinclude:
-`SaveMode.Overwrite`:overwritetheexistingdata.
-`SaveMode.Append`:appendthedata.
-`SaveMode.Ignore`:ignoretheoperation(i.e.no-op).
//默认操作
-`SaveMode.ErrorIfExists`:defaultoption,throwanexceptionatruntime.
@since1.4.0
/
defmode(saveMode:SaveMode):DataFrameWriter={
this.mode=saveMode
this
2.通过模式匹配接收外部参数
/
Specifiesthebehaviorwhendataortablealreadyexists.Optionsinclude:
-`overwrite`:overwritetheexistingdata.
-`append`:appendthedata.
-`ignore`:ignoretheoperation(i.e.no-op).
-`error`:defaultoption,throwanexceptionatruntime.
@since1.4.0
/
defmode(saveMode:String):DataFrameWriter={
this.mode=saveMode.toLowerCasematch{
case"overwrite"=>SaveMode.Overwrite
case"append"=>SaveMode.Append
case"ignore"=>SaveMode.Ignore
case"error"|"default"=>SaveMode.ErrorIfExists
case_=>thrownewIllegalArgumentException(s"Unknownsavemode:$saveMode."+
"Acceptedmodesare''overwrite'',''append'',''ignore'',''error''.")
}
this
}
DataFrameWriter.save()
1.save将结果保存传入的路径。
/
Savesthecontentofthe[[DataFrame]]atthespecifiedpath.
@since1.4.0
/
defsave(path:String):Unit={
this.extraOptions+=("path"->path)
save()
}
2.追踪save方法。
/
Savesthecontentofthe[[DataFrame]]asthespecifiedtable.
@since1.4.0
/
defsave():Unit={
ResolvedDataSource(
df.sqlContext,
source,
partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
mode,
extraOptions.toMap,
df)
3.其中source是SQLConf的defaultDataSourceName
privatevarsource:String=df.sqlContext.conf.defaultDataSourceName
1
1
其中DEFAULT_DATA_SOURCE_NAME默认参数是parquet。
//Thisisusedtosetthedefaultdatasource
valDEFAULT_DATA_SOURCE_NAME=stringConf("spark.sql.sources.default",
defaultValue=Some("org.awww.wang027.compache.spark.sql.parquet"),
doc="Thedefaultdatasourcetouseininput/output.")
DataFrame.Scala中部分函数详解:
1.toDF函数是将RDD转换成DataFrame
/
Returnstheobjectitself.
@groupbasic
@since1.3.0
/
//ThisisdeclaredwithparenthesestopreventtheScalacompilerfromtreating
//`rdd.toDF("1")`asinvokingthistoDFandthenapplyonthereturnedDataFrame.
deftoDF():DataFrame=this
2.show()方法:将结果显示出来
/
Displaysthe[[DataFrame]]inatabularform.Forexample:
{{{
yearmonthAVG(''AdjClose)MAX(''AdjClose)
1980120.5032180.595103
1981010.5232890.570307
1982020.4365040.475256
1983030.4105160.442194
1984040.4500900.483521
}}}
@paramnumRowsNumberofrowstoshow
@paramtruncateWhethertruncatelongstrings.Iftrue,stringsmorethan20characterswill
betruncatedandallcellswillbealignedright
@groupaction
@since1.5.0
/
//scalastyle:offprintln
defshow(numRows:Int,truncate:Boolean):Unit=println(showString(numRows,truncate))
//scalastyle:onprintln
追踪showString源码如下:showString中触发action收集数据。
/
Composethestringrepresentingrowsforoutput
@param_numRowsNumberofrowstoshow
@paramtruncateWhethertruncatelongstringsandaligncellsright
/
private[sql]defshowString(_numRows:Int,truncate:Boolean=true):String={
valnumRows=_numRows.max(0)
valsb=newStringBuilder
valtakeResult=take(numRows+1)
valhasMoreData=takeResult.length>numRows
valdata=takeResult.www.baiyuewang.nettake(numRows)
valnumCols=schema.fieldNames.length
|
|