配色: 字号:
Spark5_Spark SQL
2022-09-15 | 阅:  转:  |  分享 
  
SparkSQL2018/12/11版本修改人修改记录修改时间V1.0王守奎编写2018/5/23目录SparkSQL产生的背景3SQL
的重要性3SparkSQL的前身3SparkSQL的诞生3SparkSQL和HiveOnSpark的区别4SparkS
QL简介4SparkSQL的作用5SparkSQL5SparkSQL架构5语言API6模式RDD6数据源6SparkSQL
原理6SqarkSQL操作json16SqarkSQL操作hive数据源20SparkSQL产生的背景SQL的重要性很多传统
的dba人员或者熟悉关系新数据库的人在遇到日益增长的数据量,关系型数据库已经存储不了那么多信息,那么如果想要使用大数据的手段来进行
处理,那么一大批人肯定还是有SQL那最好,如果你要使用mapreduce或者spark,那么这些框架对传统的dba的同学来说,门槛
比较高,要么你要学java,要么你要学python或者Scala编写程序,这个成本还是很高的,如果能把原来关系型数据库的那一套拿
到大数据中那岂不是更好。SparkSQL的前身SparkSQL的前身是Shark,给熟悉RDBMS但又不理解MapReduce的
技术人员提供快速上手的工具,但是说到Shark不得不说一说Hive了。Hive是基于hadoop之上的一个开源的工具,他提供的就是
类似于SQL的一种HQL语言,它的好处就是可以直接把你的SQL语句转换成mapreduce作业,然后提交在集群上运行,好处就是我们
不需要基于mapreduce的api进行编程,你只需要写SQL语句就能完成大数据的统计和分析。这个最大的优势就是对于不太熟悉编程语
言的人来说,也不用了解太多mapreduce底层的东西就能对存储在HDFS上的海量数据进行分析和查询,当然这也是有缺点的:hive
是把sql翻译成mapreduce作业,所以底层还是基于mapreduce,那么mapreduce框架的缺点就是效率太低,那么这样
子我们hive的效率肯定不会高,对于批处理的作业hive进行实现的话,如果很大的话,耗时十几个小时都是很可能的。那么缺点这么明显
,hive有没有改进呢?肯定也有,你不是慢嘛,那我把你底层的执行引擎换掉,后来就产生的TEZ还有spark这些底层的执行引擎,也就
是说hive可以跑在TEZ、SPARK上面。后来慢慢发展就推出了spark,spark是一个基于内存的分布式计算框架,他的执行效率
比mapreduce高了太多。SparkSQL的诞生随着Spark的发展,对于野心勃勃的Spark团队来说,Shark对于Hiv
e的太多依赖(如采用Hive的语法解析器、查询优化器等等),制约了Spark的OneStackRuleThemAll的既定
方针,制约了Spark各个组件的相互集成,所以提出了SparkSQL项目。SparkSQL抛弃原有Shark的代码,汲取了Shar
k的一些优点,如内存列存储(In-MemoryColumnarStorage)、Hive兼容性等,重新开发了SparkSQL代
码;由于摆脱了对Hive的依赖性,SparkSQL无论在数据兼容、性能优化、组件扩展方面都得到了极大的方便,真可谓“退一步,海阔天
空”。SparkSQL和HiveOnSpark的区别其中SparkSQL作为Spark生态的一员继续发展,而不再受限于Hiv
e,只是兼容Hive;而HiveonSpark是一个Hive的发展计划,该计划将Spark作为Hive的底层引擎之一,也就是说
,Hive将不再受限于一个引擎,可以采用Map-Reduce、Tez、Spark等引擎。两者区别:SparkSQL刚开始也是使
用了hive里面一些东西的,但是SparkSQL里面的hive版本肯定要比hive社区理的版本要低一些的,那么hive里面有的东
西,比如说原来跑在mapreduce之上已有的一些功能,如果你使用hiveonspark的话他是能支持的,但是有一些功能想要直
接在SparkSQL上直接用,很可能是没有办法支持,因为SparkSQL里面的一些功能并没有hive完善,毕竟hive已经这么
多年,而Spark只是发展了这两三年而已。那么shark终止以后,在Spark界重心就已经在SparkSQL上了,SparkS
Ql干的事情和原来的shark是有很大的差别的,因为原来的shark依赖了很多hive的东西,那么在sparksql里面就必须要把
这个依赖更好的减轻。SparkSQL简介SparkSQL是Spark处理数据的一个模块,跟基本的SparkRDD的API不同
,SparkSQL中提供的接口将会提供给Spark更多关于结构化数据和计算的信息。其本质是,SparkSQL使用这些额外的信息
去执行额外的优化,这儿有几种和SparkSQL进行交互的方法,包括SQL和DatasetAPI,当使用相同的执行引擎时,API
或其它语言对于计算的表达都是相互独立的,这种统一意味着开发人员可以轻松地在不同的API之间进行切换。SparkSQL的作用Spa
rkSQL的一大用处就是执行SQL查询语句,SparkSQL也可以用来从Hive中读取数据,当我们使用其它编程语言来运行一个S
QL语句,结果返回的是一个Dataset或者DataFrame.你可以使用命令行,JDBC或者ODBC的方式来与SQL进行交互。1
)SparkSQL的应用并不局限于SQL;2)访问hive、json、parquet等文件的数据;3)SQL只是SparkSQ
L的一个功能而已;4)SparkSQL提供了SQL的api、DataFrame和Dataset的API;SparkSQLSpa
rkSQL是处理结构化、半结构化数据的一个spark模块。不同于sparkrdd的基本API,sparksql接口关注数据
结构本身与执行计划等更多信息。在spark内部,sql利用这些信息去更好地进行优化。有如下几种方式执行:SQL、DataFrame
sAPI与DatasetsAPI。主要功能如下:1.从各种结构化数据源加载数据(json/hive/Parquet)2.使用SQ
L查询,使用JDBC机制3.在sql和python/java代码中做了整合,可以进行RDD和tablejoin操作4.Spark
给出了SchemaRDD,和record对应SparkSQL架构此架构包含三个层,即LanguageAPI,SchemaRD
D和数据源。语言APISpark与不同的语言和SparkSQL兼容。它也是由这些语言支持的API(python,scala,j
ava,HiveQL)。模式RDDSparkCore是使用称为RDD的特殊数据结构设计的。通常,SparkSQL适用于模式,
表和记录。因此,我们可以使用SchemaRDD作为临时表。我们可以将此SchemaRDD称为数据帧。数据源通常spark-
core的数据源是文本文件,Avro文件等。但是,SparkSQL的数据源不同。这些是Parquet文件,JSON文档,HIV
E表和Cassandra数据库。SparkSQL原理sparkSQL层级当我们想用sparkSQL来解决我们的需求时,其实说简
单也简单,就经历了三步:读入数据->对数据进行处理->写入最后结果,那么这三个步骤用的主要类其实就三个:读入数据和写入最
后结果用到两个类HiveContext和SQLContext,对数据进行处理用到的是DataFrame类,此类是你把数据从外部读入
到内存后,数据在内存中进行存储的基本数据结构,在对数据进行处理时还会用到一些中间类,用到时在进行讲解。如下图所示:HiveCont
ext和SQLContext把HiveContext和SQLContext放在一起讲解是因为他们是差不多的,因为HiveConte
xt继承自SQLContext,为什么会有两个这样的类,其实与hive和sql有关系的,虽然hive拥有HQL语言,但是它是一个类
sql语言,和sql语言还是有差别的,有些sql语法,HQL是不支持的。所以他们还是有差别的。选择不同的类,最后执行的查询引擎的驱
动是不一样的。但是对于底层是怎么区别的这里不做详细的介绍,你就知道一点,使用不同的读数据的类,底层会进行标记,自动识别是使用哪个类
进行数据操作,然后采用不同的执行计划执行操作。当从hive库中读数据的时候,必须使用HiveContext来进行读取数据,不然在进
行查询的时候会出一些奇怪的错。其他的数据源两者都可以选择,但是最好使用SQLContext来完成。因为其支持的sql语法更多。由于
HiveContext是继承自SQLContext,这里只对SQLContext进行详细的介绍,但是以下这些方法是完全可以用在Hi
veContext中的。其实HiveContext类就扩展了SQLContext的两个我们可以使用的方法(在看源码时以protec
ted和private开头的方法都是我们不能使用的,这个是scala的控制逻辑,相反,不是以这两个关键字标记的方法是我们可以直接使
用的方法):analyze(tableName:String)和refreshTable(tableName:String)。方法
用途analyze方法这个我们一般使用不到,它是来对我们写的sql查询语句进行分析用的,一般用不到。refreshTable方法当
我们在sparkSQL中处理的某个表的存储位置发生了变换,但是我们在内存的metaData中缓存(cache)了这张表,则需要调用
这个方法来使这个缓存无效,需要重新加载。读数据?我们在解决我们的需求时,首先是读入数据,需要把数据读入到内存中去,读数据SQLCo
ntext提供了两个方法,我们提供两个数据表,为了便于演示,我采用的是用JSON格式进行存储的,写成这样的格式,但是可以保存为.t
xt格式的文件。1、第一种数据读入:这种是对数据源文件进行操作。import?org.apache.spark.sql.SQLCo
ntextval?sql?=?new?SQLContext(sc)?//声明一个SQLContext的对象,以便对数据进行操作va
l?peopleInfo?=?sql.read.json("文件路径")//其中peopleInfo返回的结果是:org.apac
he.spark.sql.DataFrame?=//?[age:?bigint,?id:?bigint,?name:?string
],这样就把数据读入到内存中了写了这几行代码后面总共发生了什么,首先sparkSQL先找到文件,以解析json的形式进行解析,同
时通过json的key形成schema,schema的字段的顺序不是按照我们读入数据时期默认的顺序,如上,其字段的顺序是通过字符串
的顺序进行重新组织的。默认情况下,会把整数解析成bigint类型的,把字符串解析成string类型的,通过这个方法读入数据时,返回
值得结果是一个DataFrame数据类型。DataFrame是什么?其实它是sparkSQL处理大数据的基本并且是核心的数据结构,
是来存储sparkSQL把数据读入到内存中,数据在内存中进行存储的基本数据结构。它采用的存储是类似于数据库的表的形式进行存储的。我
们想一想,一个数据表有几部分组成:1、数据,这个数据是一行一行进行存储的,一条记录就是一行,2、数据表的数据字典,包括表的名称,表
的字段和字段的类型等元数据信息。那么DataFrame也是按照行进行存储的,这个类是Row,一行一行的进行数据存储。一般情况下处理
粒度是行粒度的,不需要对其行内数据进行操作,如果想单独操作行内数据也是可以的,只是在处理的时候要小心,因为处理行内的数据容易出错,
比如选错数据,数组越界等。数据的存储的形式有了,数据表的字段和字段的类型都存放在哪里呢,就是schema中。我们可以调用schem
a来看其存储的是什么。peopleInfo.schema//返回的结果是:org.apache.spark.sql.types.S
tructType?=?//StructType(StructField(age,LongType,true),?StructFi
eld(id,LongType,true),//?StructField(name,StringType,true))可以看出pe
opleInfo存储的是数据,schema中存储的是这些字段的信息。需要注意的是表的字段的类型与scala数据类型的对应关系:bi
gint->Long,int->Int,Float->Float,double->Double,string->S
tring等。一个DataFrame是有两部分组成的:以行进行存储的数据和scheam,schema是StructType类型的。
当我们有数据而没有schema时,我们可以通过这个形式进行构造从而形成一个DataFrame。第二种读入数据:这个读入数据的方法,
主要是处理从一个数据表中选择部分字段,而不是选择表中的所有字段。那么这种需求,采用这个数据读入方式比较有优势。这种方式是直接写sq
l的查询语句。把上述json格式的数据保存为数据库中表的格式。需要注意的是这种只能处理数据库表数据。val?peopleInfo?
=?sql.sql("""|select|?id,|?name,|?age|from?peopleInfo""".stripMar
gin)//其中stripMargin方法是来解析我们写的sql语句的。//返回的结果是和read读取返回的结果是一样的://or
g.apache.spark.sql.DataFrame?=//?[age:?bigint,?id:?bigint,?name:?
string]read其它读入数据的接口函数函数用途json(path:String)读取json文件用此方法table(tabl
eName:String)读取数据库中的表jdbc(url:String,table:String,predicates:Ar
ray[String],connectionProperties:Properties)通过jdbc读取数据库中的表orc(pat
h:String)读取以orc格式进行存储的文件parquet(path:String)读取以parquet格式进行存储的文件sc
hema(schema:StructType)这个是一个优化,当我们读入数据的时候指定了其schema,底层就不会再次解析sche
ma从而进行了优化,一般不需要这样的优化,不进行此优化,时间效率还是可以接受写入数据写入数据就比较的简单,因为其拥有一定的模式,按
照这个模式进行数据的写入。一般情况下,我们需要写入的数据是一个DataFrame类型的,如果其不是DataFrame类型的我们需要
把其转换为DataFrame类型,有些人可能会有疑问,数据读入到内存中,其类型是DataFrame类型,我们在处理数据时用到的是D
ataFrame类中的方法,但是DataFrame中的方法不一定返回值仍然是DataFrame类型的,同时有时我们需要构建自己的类
型,所以我们需要为我们的数据构建成DataFrame的类型。把没有schema的数据,构建schema类型,我所知道的就有两种方法
。通过类构建schema,还以上面的peopleInfo为例子。下面的例子是利用了scala的反射技术,生成了一个DataFram
e类型。可以看出我们是把RDD给转换为DataFrame的。val?sql?=?new?SQLContext(sc)?//创建一个
SQLContext对象import?sql.implicits._?//这个sql是上面我们定义的sql,而不是某一个jar包,
网上有很多?//是import?sqlContext.implicits._,那是他们定义的是?//sqlContext?=?SQ
LContext(sc),这个是scala的一个特性val?people?=?sc.textFile("people.txt")/
/我们采用spark的类型读入数据,因为如果用?//SQLContext进行读入,他们自动有了schemacase?clase?P
eople(id:Int,name:String,age:Int)//定义一个类val?peopleInfo?=?people.m
ap(lines?=>?lines.split(","))?.map(p?=>?People(p(0).toInt,p(1),p(
2).toInt)).toDF?//这样的一个toDF就创建了一个DataFrame,如果不导入?//sql.implicits.
_,这个toDF方法是不可以用的。直接构造schema,以peopelInfo为例子。直接构造,我们需要把我们的数据类型进行转化成
Row类型,不然会报错。val?sql?=?new?SQLContext(sc)?//创建一个SQLContext对象val?pe
ople?=?sc.textFile("people.txt").map(lines?=>?lines.split(","))va
l?peopleRow?=?sc.map(p?=>?Row(p(0),p(1),(2)))//把RDD转化成RDD(Row)类型v
al?schema?=?StructType(StructFile("id",IntegerType,true)::?Struct
File("name",StringType,true)::?StructFile("age",IntegerType,true)
::Nil)val?peopleInfo?=?sql.createDataFrame(peopleRow,schema)//peo
pleRow的每一行的数据?//类型一定要与schema的一致?//否则会报错,说类型无法匹配?//同时peopleRow每一行的
长度?//也要和schema一致,否则?//也会报错构造schema用到了两个类StructType和StructFile,其中S
tructFile类的三个参数分别是(字段名称,类型,数据是否可以用null填充)采用直接构造有很大的制约性,字段少了还可以,如果
有几十个甚至一百多个字段,这种方法就比较耗时,不仅要保证Row中数据的类型要和我们定义的schema类型一致,长度也要一样,不然都
会报错,所以要想直接构造schema,一定要细心细心再细心。写数据操作:val?sql?=?new?SQLContext(sc)?
val?people?=?sc.textFile("people.txt").map(lines?=>?lines.split("
,"))val?peopleRow?=?sc.map(p?=>?Row(p(0),p(1),(2)))val?schema?=?S
tructType(StructFile("id",IntegerType,true)::?StructFile("name",S
tringType,true)::?StructFile("age",IntegerType,true)::Nil)val?peo
pleInfo?=?sql.createDataFrame(peopleRow,schema)peopleInfo.registe
rTempTable("tempTable")//只有有了这个注册的表tempTable,我们?//才能通过sql.sql(“”“
?”“”)进行查询?//这个是在内存中注册一个临时表用户查询sql.sql.sql("""|insert?overwrite?ta
ble?tagetTable|select|?id,|?name,|?age|from?tempTable""".stripMar
gin)//这样就把数据写入到了数据库目标表tagetTable中有上面可以看到,sparkSQL的sql()其实就是用来执行我们
写的sql语句的。通过DataFrame中的方法对数据进行操作在介绍DataFrame之前,我们还是要先明确一下,sparkSQL
是用来干什么的,它主要为我们提供了怎样的便捷,我们为什么要用它。它是为了让我们能用写代码的形式来处理sql,这样说可能有点不准确,
如果就这么简单,只是对sql进行简单的替换,要是我,我也不学习它,因为我已经会sql了,会通过sql进行处理数据仓库的etl,我还
学习sparkSQL干嘛,而且学习的成本又那么高。sparkSQL肯定有好处了,我们都知道通过写sql来进行数据逻辑的处理时有限的
,写程序来进行数据逻辑的处理是非常灵活的,所以sparkSQL是用来处理那些不能够用sql来进行处理的数据逻辑或者用sql处理起来
比较复杂的数据逻辑。一般的原则是能用sql来处理的,尽量用sql来处理,毕竟开发起来简单,sql处理不了的,再选择用sparkSQ
L通过写代码的方式来处理。sparkSQL非常强大,它提供了我们sql中的增删改查所有的功能,每一个功能都对应了一个实现此功能的方
法。对schema的操作val?sql?=?new?SQLContext(sc)val?people?=?sql.read.jso
n("people.txt")//people是一个DataFrame类型的对象?//数据读进来了,那我们查看一下其schema吧
people.schema?//返回的类型//org.apache.spark.sql.types.StructType?=?//
StructType(StructField(age,LongType,true),?//???????????StructFie
ld(id,LongType,true),//???????????StructField(name,StringType,tru
e))?//以数组的形式展示schemapeople.dtypes?//返回的结果://Array[(String,?String
)]?=?//???????Array((age,LongType),?(id,LongType),?(name,StringTy
pe))//返回schema中的字段people.columns//返回的结果://Array[String]?=?Array(a
ge,?id,?name)??//以tree的形式打印输出schemapeople.printSchema//返回的结果://ro
ot//?|--?age:?long?(nullable?=?true)//?|--?id:?long?(nullable?=?t
rue)//?|--?name:?string?(nullable?=?true)对表的操作对表的操作,对表的操作语句一般情况下是
不常用的,因为虽然sparkSQL把sql查的每一个功能都封装到了一个方法中,但是处理起来还是不怎么灵活,一般情况下我们采用的是用
sql()方法直接来写sql,这样比较实用,还更灵活,而且代码的可读性也是很高的。那下面就把能用到的方法做一个简要的说明。对表的操
作,对表的操作语句一般情况下是不常用的,因为虽然sparkSQL把sql查的每一个功能都封装到了一个方法中,但是处理起来还是不怎么
灵活一般情况下我们采用的是用sql()方法直接来写sql,这样比较实用,还更灵活,而且代码的可读性也是很高的。那下面就把能用到的方
法做一个简要的说明。方法(sql使我们定义的sql=newSQLContext(sc))df是一个DataFrame对象实
例说明sql.read.table(tableName)读取一张表的数据df.where(),df.filte
r()过滤条件,相当于sql的where部分;用法:选择出年龄字段中年龄大于20的字段。返回值类型:DataFramedf.whe
re("age>=20"),df.filter("age>=20")df.limit()限制输出的行数,对应于sql的li
mit用法:限制输出一百行返回值类型:DataFramedf.limit(100)df.join()链接操作,相当于sql的joi
n对于join操作,下面会单独进行介绍df.groupBy()聚合操作,相当于sql的groupBy用法:对于某几行进行聚合返回值
类型:DataFramedf.groupBy("id")df.agg()求聚合用的相关函数,下面会详细介绍df.intersect
(other:DataFrame)求两个DataFrame的交集df.except(other:DataFrame)求在df中而不
在other中的行df.withColumn(colName:String,col:Column)增加一列df.withColum
nRenamed(exName,newName)对某一列的名字进行重新命名df.map(),这些方法都是spark的RDD的基本操
作,其中在DataFrame类中也封装了这些方法,需要注意的是这些方法的返回值是RDD类型的,不是DataFrame类型的,在这些
方法的使用上,一定要记清楚返回值类型,不然就容易出现错误df.flatMap,df.mapPartitions(),df.fore
ach()df.foreachPartition()df.collect()df.collectAsList()df.repart
ition()df.distinct()df.count()df.select()选取某几列元素,这个方法相当于sql的selec
t的功能用法:返回选择的某几列数据返回值类型:DataFramedf.select("id","name")sparkSQL的jo
in操作spark的join操作就没有直接写sql的join操作来的灵活,在进行链接的时候,不能对两个表中的字段进行重新命名,这样
就会出现同一张表中出现两个相同的字段。下面就一点一点的进行展开用到的两个表,一个是用户信息表,一个是用户的收入薪资表:1、内连接,
等值链接,会把链接的列合并成一个列val?sql?=?new?SQLContext(sc)val?pInfo?=?sql.read
.json("people.txt")val?pSalar?=?sql.read.json("salary.txt")val?in
fo_salary?=?pInfo.join(pSalar,"id")//单个字段进行内连接val?info_salary1?=?
pInfo.join(pSalar,Seq("id","name"))//多字段链接返回的结果如下图:单个id进行链接(一张表出
现两个name字段)????????两个字段进行链接2、join还支持左联接和右链接,但是其左联接和右链接和我们
sql的链接的意思是一样的,同样也是在链接的时候不能对字段进行重新命名,如果两个表中有相同的字段,则就会出现在同一个join的表中
,同事左右链接,不会合并用于链接的字段。链接用的关键词:outer,inner,left_outer,right_outer//单
字段链接val?left?=?pInfo.join(pSalar,pInfo("id")?===?pSalar("id"),"le
ft_outer")//多字段链接val?left2?=?pInfo.join(pSalar,pInfo("id")?===?pS
alar("id")?and??pInfo("name")?===?pSalar("name"),"left_outer")返回的
结果:单字段链接多字段链接由上可以发现,sparkSQL的join操作还是没有sql的join灵活,容易出现重复的字段在同一张
表中,一般我们进行链接操作时,我们都是先利用registerTempTable()函数把此DataFrame注册成一个内部表,然后
通过sql.sql("")写sql的方法进行链接,这样可以更好的解决了重复字段的问题。sparkSQL的agg(聚合)操作其中sp
arkSQL的agg是sparkSQL聚合操作的一种表达式,当我们调用agg时,其一般情况下都是和groupBy()的一起使用的,
选择操作的数据表为:val?pSalar?=?new?SQLContext(sc).read.json("salary.txt")
val?group?=?pSalar.groupBy("name").agg("salary"?->?"avg")val?grou
p2?=?pSalar.groupBy("id","name").agg("salary"?->?"avg")val?group3
?=?pSalar.groupBy("name").agg(Map("id"?->?"avg","salary"->"max"))
得到的结过如下:group的结果????????group2????????????
???group3??使用agg时需要注意的是,同一个字段不能进行两次操作比如:agg(Map("salary"->"a
vg","salary"->"max"),他只会计算max的操作,原因很简单,agg接入的参数是Map类型的key-value
对,当key相同时,会覆盖掉之前的value。同时还可以直接使用agg,这样是对所有的行而言的。聚合所用的计算参数有:avg,ma
x,min,sum,count,而不是只有例子中用到的avg。sparkSQL的na操作?sparkSQL的na方法,返回的是一个
DataFrameFuctions对象,此类主要是对DataFrame中值为null的行的操作,只提供三个方法,drop()删除行
,fill()填充行,replace()代替行的操作。很简单不做过多的介绍。原理总结我们使用sparkSQL的目的就是为了解决用写
sql不能解决的或者解决起来比较困难的问题,在平时的开发过程中,我们不能为了高逼格什么样的sql问题都是用sparkSQL,这样不
是最高效的。使用sparkSQL,主要是利用了写代码处理数据逻辑的灵活性,但是我们也不能完全的只使用sparkSQL提供的sql方
法,这样同样是走向了另外一个极端,有上面的讨论可知,在使用join操作时,如果使用sparkSQL的join操作,有很多的弊端。为
了能结合sql语句的优越性,我们可以先把要进行链接的DataFrame对象,注册成内部的一个中间表,然后在通过写sql语句,用SQ
LContext提供的sql()方法来执行我们写的sql,这样处理起来更加的合理而且高效。在工作的开发过程中,我们要结合写代码和写
sql的各自的所长来处理我们的问题,这样会更加的高效。SqarkSQL操作json因为spark-2.1.1-bin-witho
ut-hadoop.tgz中sqlContext初始化错,(没有错误则无需理会)改用spark-2.1.1-bin-hadoop2
.7.tgz或者spark-2.1.1-bin-hadoop2解压spark-2.1.1-bin-hadoop2.7.tgz,并使
得/soft/spark指针指向spark-2.1.1-bin-hadoop2.71.使用sqlContext读取json文件1.
创建json文件[/home/hadoop/spark/employee.json][{"id":"1201","name":"s
atish","age":"25"},{"id":"1202","name":"krishna","age":"28"},{"id
":"1203","name":"amith","age":"29"},{"id":"1204","name":"javed","
age":"23"},{"id":"1205","name":"prudvi","age":"23"}]读入json文件scala
>importorg.apache.spark.sql.SQLContextscala>valsqlContext=new
SQLContext(sc)scala>valmyRdd=sqlContext.read.json("file:///ho
me/hadoop/spark/employee.json")操作dataFramescala>myRdd.collectscal
a>myRdd.show//查询数据scala>myRdd.printSchema//查询表结构scala>myRdd
.select("name").show//选中指定列进行展示scala>myRdd.filter(myRdd("age")
>23).show//条件查询,过滤年龄大于23scala>myRdd.filter("age>23").select("i
d","name").showscala>myRdd.groupBy("age").count().show//分组scal
a>varg=myRdd.groupBy("age")scala>g.count()4.将dataFrame注册成临时表sc
ala>myRdd.registerTempTable("t1")scala>sqlContext.sql("select
fromt1").showscala>sqlContext.sql("selectagefromt1groupbya
ge").show2.根据反射推断schema0.准备文本文件[employee.txt]1201,satu,251202,sa
to,261203,satp,291204,satq,231205,satqe,231.通过sc加载txt文件scala>val
rdd=sc.textFile("file:///home/hadoop/spark/employee.txt")2.创建sc
hema串scala>valschema="idnameage"3.导入相关的数据类型scala>importorg.
apache.spark.sql.Row;scala>importorg.apache.spark.sql.types.{Str
uctType,StructField,StringType};4.生成schemascala>valsch=StructT
ype(schema.split("").map(fieldName=>StructField(fieldName,Stri
ngType,true)))5.对文本数据进行变换scala>valrowRDD=rdd.map(_.split(","))
.map(e=>Row(e(0).trim.toInt,e(1),e(2).trim.toInt))6.结合rowRDD和sche
ma创建dataFramescala>valdfs=sqlContext.createDataFrame(rowRDD,sc
h)scala>dfs.printSchemaSqarkSQL操作hive数据源查看安装的spark是否支持hive登录Linu
x系统,打开一个终端,然后,执行下面命令:返回错误信息,也就是spark无法识别org.apache.spark.sql.hive
.HiveContext,这就说明你当前电脑上的Spark版本不包含Hive支持。如果你当前电脑上的Spark版本包含Hive支持
,那么应该显示下面的正确信息:spark-hive集成-安装包自带hive数据源此时默认使用hive自带的元数据管理:Derby数
据库。安装spark使用spark-2.1.1-bin-hadoop2.7.tgz来作为spark的安装文件(自带hive数据源
)。而不是用spark-2.1.1-bin-without-hadoop.tgz。创建spark软链接。修改spark-env
.sh配置文件,增加如下的配置。exportJAVA_HOME=/usr/java/jdk1.8.0_172exportSP
ARK_DIST_CLASSPATH=$(/soft/hadoop/bin/hadoopclasspath)2.启动shell>
spark-shell3.导入HiveContextscala>importorg.apache.spark.sql.hive.
HiveContextscala>valhcContext=newHiveContext(sc);使用HiveQL创建表自
带的hive数据创建于本地文件系统。scala>hcContext.sql("CREATETABLEIFNOTEXISTS
employee(idINT,nameSTRING,ageINT)ROWFORMATDELIMITEDFIELDS
TERMINATEDBY'',''LINESTERMINATEDBY''\n''")5.使用Hiveql加载和保存数据sca
la>hcContext.sql("LOADDATALOCALINPATH''file:///home/hadoop/spa
rk/employee.txt''INTOTABLEemployee")选择查询scala>valdataFrame=h
cContext.sql("FROMemployeeSELECTid,name,age")显示记录scala>dataFra
me.show()spark-hive集成-第三方数据源1.安装spark同上操作集成配置根据hive的配置参数hive.meta
store.uris的情况,采用不同的集成方式分别:1.hive.metastore.uris没有给定配置值,为空(默认
情况)SparkSQL通过hive配置的javax.jdo.option.XXX相关配置值直接连接metastore数据库直接获
取hive表元数据但是,需要将连接数据库的驱动添加到Spark应用的classpath中2.hive.metastore.u
ris给定了具体的参数值SparkSQL通过连接hive提供的metastore服务来获取hive表的元数据直接启动hive的
metastore服务即可完成SparkSQL和Hive的集成$hive--servicemetastore&配置操作H
IVE_HOME/conf/hive-site.xml中增加关于hive.metastore.uris的配置信息,如下:operty>hive.metastore.uristhrift://namenode
:9083
IPaddress(orfully-qualifieddomain
name)andportofthemetastorehost
执行:H
IVE_HOME/bin/hive--servicemetastore&,启动元数据存储服务将HIVE_HOME/conf
/hive-site.xml拷贝至SPARK_HOME/conf/目录下或创建软链接4.需要启动hadoop集群>start-al
l.sh5.启动spark-shell,使用--jars参数指定所有第三方依赖类库>spark-shell6.使用HiveQL创建
表scala>importorg.apache.spark.sql.hive.HiveContextscala>valhcCo
ntext=newHiveContext(sc);scala>hcContext.tables().collect//
查表scala>hcContext.tables("default").collect//指定数据库查表scala>hcCo
ntext.sql("CREATETABLEIFNOTEXISTSemployee1(idINT,nameSTRIN
G,ageINT)ROWFORMATDELIMITEDFIELDSTERMINATEDBY'',''LINESTE
RMINATEDBY''\n''")7.使用Hiveql加载和保存数据scala>hcContext.sql("LOADDATA
LOCALINPATH''file:///home/hadoop/spark/employee.txt''INTOTABLE
employee1")ERROR信息不明,不影响作业的执行,据说是一个BUG8.从表中选择字段进行查询scala>valdat
aFrame=hcContext.sql("FROMemployee1SELECTid,name,age")9.显示记录
scala>dataFrame.show()10.drop表scala>valdataFrame=hcContext.sql
("droptableemployee1")11.用sqlContext替代以上6-10步操作可以用sqlConText替代h
cContext,命令如下:scala>importorg.apache.spark.sql.SQLContextscala>v
alsqlContext=newSQLContext(sc)scala>sqlContext.tables().colle
ctscala>sqlContext.sql("CREATETABLEIFNOTEXISTSemployee(idIN
T,nameSTRING,ageINT)ROWFORMATDELIMITEDFIELDSTERMINATEDBY
'',''LINESTERMINATEDBY''\n''")scala>sqlContext.tables().collectsc
ala>sqlContext.sql("LOADDATALOCALINPATH''file:///home/hadoop/spark/employee.txt''INTOTABLEemployee")scala>valdataFrame=sqlContext.sql("FROMemployeeSELECTid,name,age")scala>dataFrame.show()常见错误问题:Thespecifieddatastoredriver("com.mysql.cj.jdbc.Driver")wasnotfoundintheCLASSPATH解决方案:在/soft/spark/jars目录下添加mysql-connector-java-6.0.6.jar问题:启动spark-shell报一堆HiveConfofname找不到解决方案:spark的hiveConf并不接受这些配置,相当于多扔给它一些参数。于是把spark配置目录下的hive-site.xml中对应WARN配置项都删除了,因为是直接拷贝的hive配置,配置项比较多。可以先vim处理下输出日志截出配置项。问题:使用mysql-connector-java.jar连接MySql时出现:Errorwhileretrievingmetadataforprocedurecolumns:java.sql.SQLException:Parameter/ColumnnamepatterncannotbeNULLorempty.解决方案:mysql连接驱动的版本太高,换一个低版本的驱动即可。Spark-sql操作输入spark-sql进入spark-sql窗口,可以向在hive中一样,编写sql。spark-sql>CREATETABLEIFNOTEXISTSemployee(idINT,nameSTRING,ageINT)ROWFORMATDELIMITEDFIELDSTERMINATEDBY'',''LINESTERMINATEDBY''\n'';spark-sql>showtables;spark-sql>selectfromemployee;spark-sql>LOADDATALOCALINPATH''file:///home/hadoop/spark/employee.txt''INTOTABLEemployeespark-sql>selectfromemployee;常见错误问题:MetaException(message:Versioninformationnotfoundinmetastore.)解决方案:将hive-site.xml中的属相hive.metastore.schema.verification值改为false魁魁语录:宝剑锋从磨砺出梅花香自苦寒来魁魁语录:宝剑锋从磨砺出梅花香自苦寒来江湖一哥版权所有江湖一哥版权所有
献花(0)
+1
(本文系王守奎的图...原创)