spark如何从数据库提取数据 SparkSQL通用数据源加载

spark如何从数据库提取数据 SparkSQL通用数据源加载(1)

SparkSQL能用数据加载(load)和保存(save)

对于Spark SQL的DataFrame来说,无论是从什么数据源创建出来的DataFrame,都有一些共同的load和save操作。load操作主要用于加载数据,创建出DataFrame;save操作,主要用于将DataFrame中的数据保存到文件中

一、数据加载(Load)
  • 1.数据加载默认选项

在SparkSQL中,默认的数据源是parquet, 除非配置了spark.sql.sources.default选项为其它格式的数据源。

//SparkSQL中读取时并没有指定数据源加载的option选项,默认读取的数据格式是parquet //由配置项spark.sql.sources.default选项配置 val sanguoDF: DataFrame = spark.read.load("./dataset/sanguo.parquet")

  • 2.手动指定选项(Manually Specifying Options)

除了SparkSQL默认提供的parquet, 还可以手动指定数据源的选项。通常,要加载的数据源需要使用完全限定名(fully qualified name), 例如org.apche.spark.sql.parquet,但是SparkSQL提供了内置的源,可以使用它们的短名称(shortnames), 例如,json, csv, jdbc, orc, parquet, text, libsvm等。

    • 指定json加载选项

# 注:这晨的filename.json不一定是json文件,也可能是存放json格式文件的目录 val df: DataFrame = spark.read.format("json").load("path/filename.json")

    • 指定csv加载选项

// 注:people.csv不一定是csv文件,也可以是存放csv文件的目录 val peopleDFCsv = spark.read.format("csv") .option("sep", ",") //指定分隔符 .option("inferSchema", "true") //推断schema信息 .option("header", "true") //是否读取csv头部 .load("examples/src/main/resources/people.csv") //csv文件所在的路径

    • 指定jdbc加载选项

//load这个方法做了哪些事情呢? //1.会和数据库建立连接,读取元数据信息 //2.调用load时并没读取数据 val employeeDF: DataFrame = spark.read.format("jdbc") .option("url", "jdbc:mysql://node5:3306/spark") // MySQL数据库的URL .option("driver", "com.mysql.jdbc.Driver") //MySQL的jdbc区动 .option("dbtable", "employee") //需要读取的表名 .option("user", "root") //MySQL数据库用户名 .option("password", "XXXXXXXX") //MySQL数据库的密码 .load()

  • 直接在文件上运行SQL(Run SQL on files directly)

SparkSQL不仅可以使用数据读取的API将文件加载到DataFrame进行查询,也可以直接使用SQL查询文件,例如:

//也可以直接使用SQL查询 val sanguoSQLDF = spark.sql("SELECT * FROM parquet.`./dataset/parquet/sanguo.parquet`") sanguoSQLDF.show()

spark如何从数据库提取数据 SparkSQL通用数据源加载(2)

直接在文件上SQL查询

二、数据保存(Save)

Spark SQL对于save操作,提供了不同的save mode。主要用来处理,当目标位置,已经有数据时,应该如何处理。而且save操作并不会执行锁操作,并且不是原子的,因此是有一定风险出现脏数据的。

spark如何从数据库提取数据 SparkSQL通用数据源加载(3)

SaveMode几种模式的区别

  • 1.常见数据格式保存
    • parquet

// 1.保存为parquet文件 //sanguoSQLDF.write.mode(SaveMode.Append).parquet("./dataset/output/sanguo.parquet") sanguoSQLDF.write.format("parquet").mode(SaveMode.Append).save("./dataset/output/sanguo.parquet")

    • json

//2.保存为json文件 // sanguoSQLDF.write.mode(SaveMode.Overwrite).json("./dataset/output/sanguo.json") sanguoSQLDF.write.format("json").mode(SaveMode.Overwrite).save("./dataset/output/sanguo.json")

    • csv

//3.保存为csv文件 // sanguoSQLDF.write.mode(SaveMode.Ignore).csv("./dataset/output/sanguo.csv") sanguoSQLDF.write.format("csv").mode(SaveMode.Ignore).save("./dataset/output/sanguo.csv")

    • jdbc(以MySQL)为例

//4.保存为jdbc数据库,以MySQL为例 // sanguoSQLDF.write.mode(SaveMode.ErrorIfExists) // .option("driver","com.mysql.jdbc.Driver") // .option("url","jdbc:mysql://node5:3306/spark") // .option("dbtable","t_sanguo") // .option("user","root") // .option("password","Love=-.,me1314") // .save() sanguoSQLDF.write.format("jdbc") .option("driver","com.mysql.jdbc.Driver") .option("url","jdbc:mysql://node5:3306/spark") .option("dbtable","t_sanguo") .option("user","root") .option("password","Love=-.,me1314") .mode(SaveMode.Overwrite) .save()

  • 2. 四种SaveMode验证与详细讲解
    • SaveMode.ErrorIfExists: 假设数据库中开始没有表,执行下面代码两次:

package com.dvtn.spark.sql.load import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} object SparkSQLGenericLoad { def main(args: Array[String]): Unit = { //创建SparkSQL通往集群的切入点 val spark: SparkSession = SparkSession.builder() .appName(SparkSQLGenericLoad.getClass.getSimpleName) .master("local") .getOrCreate() //设置log的运行级别为"WARN" spark.sparkContext.setLogLevel("WARN") /** * 默认加载数据源格式:parquet * 在SparkSQL中,默认的加载的数据是parquet * 由配置项spark.sql.sources.default选项配置 */ //SparkSQL中读取时并没有指定数据源加载的option选项,默认读取的数据格式是parquet //由配置项spark.sql.sources.default选项配置 //val sanguoDF: DataFrame = spark.read.load("./dataset/parquet/sanguo.parquet") //sanguoDF.show() println("----------------------------------------------------------------------------") //也可以直接使用SQL查询 val sanguoSQLDF = spark.sql("SELECT * FROM parquet.`./dataset/parquet/sanguo.parquet`") /** * 数据的保存 */ // // 1.保存为parquet文件 // //sanguoSQLDF.write.mode(SaveMode.Append).parquet("./dataset/output/sanguo.parquet") // sanguoSQLDF.write.format("parquet").mode(SaveMode.Append).save("./dataset/output/sanguo.parquet") // // //2.保存为json文件 // // sanguoSQLDF.write.mode(SaveMode.Overwrite).json("./dataset/output/sanguo.json") // sanguoSQLDF.write.format("json").mode(SaveMode.Overwrite).save("./dataset/output/sanguo.json") // // //3.保存为csv文件 // // sanguoSQLDF.write.mode(SaveMode.Ignore).csv("./dataset/output/sanguo.csv") // sanguoSQLDF.write.format("csv").mode(SaveMode.Ignore).save("./dataset/output/sanguo.csv") //4.保存为jdbc数据库,以MySQL为例 // sanguoSQLDF.write.mode(SaveMode.ErrorIfExists) // .option("driver","com.mysql.jdbc.Driver") // .option("url","jdbc:mysql://node5:3306/spark") // .option("dbtable","t_sanguo") // .option("user","root") // .option("password","Love=-.,me1314") // .save() sanguoSQLDF.write.format("jdbc") .option("driver","com.mysql.jdbc.Driver") .option("url","jdbc:mysql://node5:3306/spark") .option("dbtable","t_sanguo") .option("user","root") .option("password","Love=-.,me1314") .mode(SaveMode.ErrorIfExists) .save() //释放资源 spark.stop() } }

      • 运行第一次,在MySQL中创建了表:t_sanguo

spark如何从数据库提取数据 SparkSQL通用数据源加载(4)

SaveMode.ErrorIfExists

      • 运行第二次,因为该表在MySQL数据库已存在,此时依然用SaveMode.ErrorIfExists, 会报错,说明SaveMode.ErrorIfExists表明如果数据不存在, 会保存数据; 如果数据存在,会报错。

spark如何从数据库提取数据 SparkSQL通用数据源加载(5)

SaveMode.ErrorIfExists表明如果数据不存在会保存数据,如果数据存在,会报错

    • SaveMode.Overwrite: 通过下面的实验证明,SaveMode.Overwrite会把原表的数据全部覆盖写入, 就是类似于drop/recreate
      • 在第一次保存在数据的表上手动添加一条记录:

spark如何从数据库提取数据 SparkSQL通用数据源加载(6)

      • 然后修改代码SaveMode.Overwrite, 再次运行:

spark如何从数据库提取数据 SparkSQL通用数据源加载(7)

SaveMode.Overwrite会覆盖原表数据

  • SaveMode.Append: SaveMode.Append会在原有数据下面追加写入
    • MySQL数据表原始记录如下:

spark如何从数据库提取数据 SparkSQL通用数据源加载(8)

MySQL现有表t_sanguo的记录

    • 修改代码中SaveMode.Append, 再次执行, 发现数据在后面追加写入:

spark如何从数据库提取数据 SparkSQL通用数据源加载(9)

SaveMode.Append会在原有数据下面追加写入

  • SaveMode.Ignore: 如果数据整个不存在,则写入;如果文件或表存在,则不执行任何操作
    • 在运行前,表中的数据如下:

spark如何从数据库提取数据 SparkSQL通用数据源加载(10)

原表数据

    • 修改代码中SaveMode为SaveMode.Ignore, 运行, 发现表中数据没有变化。
后话
  • 有Hive数据的加载与保存因为非常重要,后面会有专门章节详细讲解,在此先不展开。

本文所有文章都是原创,相当于个人在学习时整理的笔记分享到大家,欢迎大家阅读,如转载请标明出处。

如您觉得本人写的东西对您还有所帮助,请给予关注或点个赞,送人玫瑰,留有余香。我相信在您的鼓励下,本人会在后面将所有精华干货都慢慢分享出来。

本人在此非常感谢大家的理解和支持!

,

免责声明:本文仅代表文章作者的个人观点,与本站无关。其原创性、真实性以及文中陈述文字和内容未经本站证实,对本文以及其中全部或者部分内容文字的真实性、完整性和原创性本站不作任何保证或承诺,请读者仅作参考,并自行核实相关内容。文章投诉邮箱:anhduc.ph@yahoo.com

    分享
    投诉
    首页