博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
2.sparkSQL--DataFrames与RDDs的相互转换
阅读量:7247 次
发布时间:2019-06-29

本文共 3801 字,大约阅读时间需要 12 分钟。

Spark SQL支持两种RDDs转换为DataFrames的方式
使用反射获取RDD内的Schema
    当已知类的Schema的时候,使用这种基于反射的方法会让代码更加简洁而且效果也很好。
通过编程接口指定Schema
    通过Spark SQL的接口创建RDD的Schema,这种方式会让代码比较冗长。
    这种方法的好处是,在运行时才知道数据的列以及列的类型的情况下,可以动态生成Schema。

原文和作者一起讨论:

微信:intsmaze

使用反射获取Schema(Inferring the Schema Using Reflection)
import org.apache.spark.sql.{DataFrameReader, SQLContext}import org.apache.spark.{SparkConf, SparkContext}object InferringSchema {  def main(args: Array[String]) {    //创建SparkConf()并设置App名称    val conf = new SparkConf().setAppName("SQL-intsmaze")    //SQLContext要依赖SparkContext    val sc = new SparkContext(conf)    //创建SQLContext    val sqlContext = new SQLContext(sc)    //从指定的地址创建RDD    val lineRDD = sc.textFile("hdfs://192.168.19.131:9000/person.tzt").map(_.split(","))    //创建case class    //将RDD和case class关联    val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))    //导入隐式转换,如果不导入无法将RDD转换成DataFrame    //将RDD转换成DataFrame    import sqlContext.implicits._    val personDF = personRDD.toDF    //注册表    personDF.registerTempTable("intsmaze")    //传入SQL    val df = sqlContext.sql("select * from intsmaze order by age desc limit 2")    //将结果以JSON的方式存储到指定位置    df.write.json("hdfs://192.168.19.131:9000/personresult")    //停止Spark Context    sc.stop()  }}//case class一定要放到外面case class Person(id: Int, name: String, age: Int)
spark shell中不需要导入sqlContext.implicits._是因为spark shell默认已经自动导入了。
打包提交到yarn集群:
/home/hadoop/app/spark/bin/spark-submit --class InferringSchema \--master yarn \--deploy-mode cluster \--driver-memory 512m \--executor-memory 512m \--executor-cores 2 \--queue default \/home/hadoop/sparksql-1.0-SNAPSHOT.jar

 

通过编程接口指定Schema(Programmatically Specifying the Schema)

当JavaBean不能被预先定义的时候,编程创建DataFrame分为三步:

从原来的RDD创建一个Row格式的RDD.

创建与RDD中Rows结构匹配的StructType,通过该StructType创建表示RDD的Schema.

通过SQLContext提供的createDataFrame方法创建DataFrame,方法参数为RDD的Schema.

 

import org.apache.spark.sql.{Row, SQLContext}import org.apache.spark.sql.types._import org.apache.spark.{SparkContext, SparkConf}object SpecifyingSchema {  def main(args: Array[String]) {    //创建SparkConf()并设置App名称    val conf = new SparkConf().setAppName("SQL-intsmaze")    //SQLContext要依赖SparkContext    val sc = new SparkContext(conf)    //创建SQLContext    val sqlContext = new SQLContext(sc)    //从指定的地址创建RDD    val personRDD = sc.textFile(args(0)).map(_.split(","))    //通过StructType直接指定每个字段的schema    val schema = StructType(      List(        StructField("id", IntegerType, true),        StructField("name", StringType, true),        StructField("age", IntegerType, true)      )    )    //将RDD映射到rowRDD    val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))    //将schema信息应用到rowRDD上    val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)    //注册表    personDataFrame.registerTempTable("intsmaze")    //执行SQL    val df = sqlContext.sql("select * from intsmaze order by age desc ")    //将结果以JSON的方式存储到指定位置    df.write.json(args(1))    //停止Spark Context    sc.stop()  }}
将程序打成jar包,上传到spark集群,提交Spark任务
/home/hadoop/app/spark/bin/spark-submit --class SpecifyingSchema \ --master yarn \ --deploy-mode cluster \ --driver-memory 512m \ --executor-memory 512m \ --executor-cores 2 \ --queue default \ /home/hadoop/sparksql-1.0-SNAPSHOT.jar \ hdfs://192.168.19.131:9000/person.txt hdfs://192.168.19.131:9000/intsmazeresult

 

/home/hadoop/app/spark/bin/spark-submit --class SpecifyingSchema \--master yarn \--deploy-mode client \--driver-memory 512m \--executor-memory 512m \--executor-cores 2 \--queue default \/home/hadoop/sparksql-1.0-SNAPSHOT.jar \hdfs://192.168.19.131:9000/person.txt hdfs://192.168.19.131:9000/intsmazeresult

maven项目的pom.xml中添加Spark SQL的依赖

  
org.apache.spark
  
spark-sql_2.10
  
1.6.2

 

转载地址:http://rjnbm.baihongyu.com/

你可能感兴趣的文章
求两个数的最大公约数(辗转相除法)
查看>>
Linux 中gdb调试工具的使用
查看>>
设计模式系列 - 策略模式
查看>>
Windows 2012R2安装KB2919355失败
查看>>
系统集成网络工程师所具备的知识
查看>>
正则表达式
查看>>
Vue.js学习笔记: 插值
查看>>
linux常用命令
查看>>
WooCommerce 支付宝扫码支付与银行直连
查看>>
mysql慢查询日志
查看>>
Office 365系列之九:使用Windows PowerShell管理O365平台
查看>>
CenOS 6.0配置本地yum源
查看>>
小作文_通知和备忘录
查看>>
06-Windows Server 2012 R2 会话远程桌面-标准部署-RD网关(RemoteApp)
查看>>
Alcatel 7750 常用维护命令
查看>>
mysql AUTO_INCREMENT 字符 删除后重1开始
查看>>
IOS应用从容地崩溃
查看>>
CMS之图片管理(1)
查看>>
vue组件
查看>>
生活热水循环泵选型怎么选,如何选型计算?
查看>>