spark如何从数据库提取数据 SparkSQL通用数据源加载
SparkSQL能用数据加载(load)和保存(save)
对于Spark SQL的DataFrame来说,无论是从什么数据源创建出来的DataFrame,都有一些共同的load和save操作。load操作主要用于加载数据,创建出DataFrame;save操作,主要用于将DataFrame中的数据保存到文件中。
一、数据加载(Load)- 1.数据加载默认选项
在SparkSQL中,默认的数据源是parquet, 除非配置了spark.sql.sources.default选项为其它格式的数据源。
//SparkSQL中读取时并没有指定数据源加载的option选项,默认读取的数据格式是parquet
//由配置项spark.sql.sources.default选项配置
val sanguoDF: DataFrame = spark.read.load("./dataset/sanguo.parquet")
- 2.手动指定选项(Manually Specifying Options)
除了SparkSQL默认提供的parquet, 还可以手动指定数据源的选项。通常,要加载的数据源需要使用完全限定名(fully qualified name), 例如org.apche.spark.sql.parquet,但是SparkSQL提供了内置的源,可以使用它们的短名称(shortnames), 例如,json, csv, jdbc, orc, parquet, text, libsvm等。
- 指定json加载选项
# 注:这晨的filename.json不一定是json文件,也可能是存放json格式文件的目录
val df: DataFrame = spark.read.format("json").load("path/filename.json")
- 指定csv加载选项
// 注:people.csv不一定是csv文件,也可以是存放csv文件的目录
val peopleDFCsv = spark.read.format("csv")
.option("sep", ",") //指定分隔符
.option("inferSchema", "true") //推断schema信息
.option("header", "true") //是否读取csv头部
.load("examples/src/main/resources/people.csv") //csv文件所在的路径
- 指定jdbc加载选项
//load这个方法做了哪些事情呢?
//1.会和数据库建立连接,读取元数据信息
//2.调用load时并没读取数据
val employeeDF: DataFrame = spark.read.format("jdbc")
.option("url", "jdbc:mysql://node5:3306/spark") // MySQL数据库的URL
.option("driver", "com.mysql.jdbc.Driver") //MySQL的jdbc区动
.option("dbtable", "employee") //需要读取的表名
.option("user", "root") //MySQL数据库用户名
.option("password", "XXXXXXXX") //MySQL数据库的密码
.load()
- 直接在文件上运行SQL(Run SQL on files directly)
SparkSQL不仅可以使用数据读取的API将文件加载到DataFrame进行查询,也可以直接使用SQL查询文件,例如:
//也可以直接使用SQL查询
val sanguoSQLDF = spark.sql("SELECT * FROM parquet.`./dataset/parquet/sanguo.parquet`")
sanguoSQLDF.show()
直接在文件上SQL查询
二、数据保存(Save)Spark SQL对于save操作,提供了不同的save mode。主要用来处理,当目标位置,已经有数据时,应该如何处理。而且save操作并不会执行锁操作,并且不是原子的,因此是有一定风险出现脏数据的。
SaveMode几种模式的区别
- 1.常见数据格式保存
- parquet
// 1.保存为parquet文件
//sanguoSQLDF.write.mode(SaveMode.Append).parquet("./dataset/output/sanguo.parquet")
sanguoSQLDF.write.format("parquet").mode(SaveMode.Append).save("./dataset/output/sanguo.parquet")
- json
//2.保存为json文件
// sanguoSQLDF.write.mode(SaveMode.Overwrite).json("./dataset/output/sanguo.json")
sanguoSQLDF.write.format("json").mode(SaveMode.Overwrite).save("./dataset/output/sanguo.json")
- csv
//3.保存为csv文件
// sanguoSQLDF.write.mode(SaveMode.Ignore).csv("./dataset/output/sanguo.csv")
sanguoSQLDF.write.format("csv").mode(SaveMode.Ignore).save("./dataset/output/sanguo.csv")
- jdbc(以MySQL)为例
//4.保存为jdbc数据库,以MySQL为例
// sanguoSQLDF.write.mode(SaveMode.ErrorIfExists)
// .option("driver","com.mysql.jdbc.Driver")
// .option("url","jdbc:mysql://node5:3306/spark")
// .option("dbtable","t_sanguo")
// .option("user","root")
// .option("password","Love=-.,me1314")
// .save()
sanguoSQLDF.write.format("jdbc")
.option("driver","com.mysql.jdbc.Driver")
.option("url","jdbc:mysql://node5:3306/spark")
.option("dbtable","t_sanguo")
.option("user","root")
.option("password","Love=-.,me1314")
.mode(SaveMode.Overwrite)
.save()
- 2. 四种SaveMode验证与详细讲解
- SaveMode.ErrorIfExists: 假设数据库中开始没有表,执行下面代码两次:
package com.dvtn.spark.sql.load
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
object SparkSQLGenericLoad {
def main(args: Array[String]): Unit = {
//创建SparkSQL通往集群的切入点
val spark: SparkSession = SparkSession.builder()
.appName(SparkSQLGenericLoad.getClass.getSimpleName)
.master("local")
.getOrCreate()
//设置log的运行级别为"WARN"
spark.sparkContext.setLogLevel("WARN")
/**
* 默认加载数据源格式:parquet
* 在SparkSQL中,默认的加载的数据是parquet
* 由配置项spark.sql.sources.default选项配置
*/
//SparkSQL中读取时并没有指定数据源加载的option选项,默认读取的数据格式是parquet
//由配置项spark.sql.sources.default选项配置
//val sanguoDF: DataFrame = spark.read.load("./dataset/parquet/sanguo.parquet")
//sanguoDF.show()
println("----------------------------------------------------------------------------")
//也可以直接使用SQL查询
val sanguoSQLDF = spark.sql("SELECT * FROM parquet.`./dataset/parquet/sanguo.parquet`")
/**
* 数据的保存
*/
// // 1.保存为parquet文件
// //sanguoSQLDF.write.mode(SaveMode.Append).parquet("./dataset/output/sanguo.parquet")
// sanguoSQLDF.write.format("parquet").mode(SaveMode.Append).save("./dataset/output/sanguo.parquet")
//
// //2.保存为json文件
// // sanguoSQLDF.write.mode(SaveMode.Overwrite).json("./dataset/output/sanguo.json")
// sanguoSQLDF.write.format("json").mode(SaveMode.Overwrite).save("./dataset/output/sanguo.json")
//
// //3.保存为csv文件
// // sanguoSQLDF.write.mode(SaveMode.Ignore).csv("./dataset/output/sanguo.csv")
// sanguoSQLDF.write.format("csv").mode(SaveMode.Ignore).save("./dataset/output/sanguo.csv")
//4.保存为jdbc数据库,以MySQL为例
// sanguoSQLDF.write.mode(SaveMode.ErrorIfExists)
// .option("driver","com.mysql.jdbc.Driver")
// .option("url","jdbc:mysql://node5:3306/spark")
// .option("dbtable","t_sanguo")
// .option("user","root")
// .option("password","Love=-.,me1314")
// .save()
sanguoSQLDF.write.format("jdbc")
.option("driver","com.mysql.jdbc.Driver")
.option("url","jdbc:mysql://node5:3306/spark")
.option("dbtable","t_sanguo")
.option("user","root")
.option("password","Love=-.,me1314")
.mode(SaveMode.ErrorIfExists)
.save()
//释放资源
spark.stop()
}
}
- 运行第一次,在MySQL中创建了表:t_sanguo
SaveMode.ErrorIfExists
- 运行第二次,因为该表在MySQL数据库已存在,此时依然用SaveMode.ErrorIfExists, 会报错,说明SaveMode.ErrorIfExists表明如果数据不存在, 会保存数据; 如果数据存在,会报错。
SaveMode.ErrorIfExists表明如果数据不存在会保存数据,如果数据存在,会报错
- SaveMode.Overwrite: 通过下面的实验证明,SaveMode.Overwrite会把原表的数据全部覆盖写入, 就是类似于drop/recreate
- 在第一次保存在数据的表上手动添加一条记录:
- 然后修改代码SaveMode.Overwrite, 再次运行:
SaveMode.Overwrite会覆盖原表数据
- SaveMode.Append: SaveMode.Append会在原有数据下面追加写入
- MySQL数据表原始记录如下:
MySQL现有表t_sanguo的记录
- 修改代码中SaveMode.Append, 再次执行, 发现数据在后面追加写入:
SaveMode.Append会在原有数据下面追加写入
- SaveMode.Ignore: 如果数据整个不存在,则写入;如果文件或表存在,则不执行任何操作
- 在运行前,表中的数据如下:
原表数据
- 修改代码中SaveMode为SaveMode.Ignore, 运行, 发现表中数据没有变化。
- 有Hive数据的加载与保存因为非常重要,后面会有专门章节详细讲解,在此先不展开。
本文所有文章都是原创,相当于个人在学习时整理的笔记分享到大家,欢迎大家阅读,如转载请标明出处。
如您觉得本人写的东西对您还有所帮助,请给予关注或点个赞,送人玫瑰,留有余香。我相信在您的鼓励下,本人会在后面将所有精华干货都慢慢分享出来。
本人在此非常感谢大家的理解和支持!
,免责声明:本文仅代表文章作者的个人观点,与本站无关。其原创性、真实性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容文字的真实性、完整性和原创性本站不作任何保证或承诺,请读者仅作参考,并自行核实相关内容。文章投诉邮箱:anhduc.ph@yahoo.com