Spark SQL

SparkSQL前身 Shark
        基于MapReduce的SQL执行引擎是Hive,而Shark是基于Spark计算框架之上且兼容Hive语法的SQL执行引擎,
        由于底层的计算采用了Spark,性能比MapReduce的Hive普遍快2倍以上,当数据全部load在内存的话,将快10倍以上,
        因此Shark可以作为交互式查询应用服务来使用。除了基于Spark的特性外,Shark是完全兼容Hive的语法,表结构以及UDF函数等,已有的HiveSql可以直接进行迁移至Shark上,Shark底层依赖于Hive的解析器,查询优化器。
        但正是由于SHark的整体设计架构对Hive的依赖性太强,难以支持其长远发展,比如不能和Spark的其他组件进行很好的集成,无法满足Spark的一栈式解决大数据处理的需求。
一、Spark SQL
1、SparkSQL介绍
        Hive是Shark的前身,Shark是SparkSQL的前身,SparkSQL产生的根本原因是其完全脱离了Hive的限制。
        SparkSQL支持查询原生的RDD。 RDD是Spark平台的核心概念,是Spark能够高效的处理大数据的各种场景的基础。
        能够在代码中写SQL语句。支持简单的SQL语法检查,能够在代码中写Hive语句访问Hive数据,并将结果取回作为RDD使用。    
       

2、Spark on Hive和Hive on Spark

        Spark on Hive: Hive只作为储存角色,Spark负责sql解析优化,执行计算。

        Hive on Spark:Hive即作为存储又负责sql的解析优化(使用HiveSQL解析器),Spark负责执行计算。Hive 2.X支持Hive on Spark

3、DataFrame分布式数据框

        

        DataFrame 也是一个分布式数据框(容器)。与RDD类似,然而DataFrame更像传统数据库的二维表格,除了数据以外,还掌握数据的结构信息,即schema。同时,与Hive类似,DataFrame也支持嵌套数据类型(struct、array和map)。

        从API易用性的角度上 看, DataFrame API提供的是一套高层的关系操作(即SQL操作),比函数式的RDD API要更加友好,门槛更低。

        DataFrame的底层是一个个的RDD,只不过 RDD的泛型是Row类型 DataFrame = RDD<Row>

一般来说,都是优先使用DataFrames而不直接使用RDD,因为DataFrames会优化你的执行计划,而RDD则是忠实的按照你代码生成执行计划,并且spark sql 中提供很多方法便利地从多种数据源生成DataFrames。
4、SparkSQL的数据源
        SparkSQL的数据源可以是JSON类型的字符串,JDBC,Parquent,Hive,HDFS等。
        
5、SparkSQL执行流程
        首先拿到sql后解析一批未被解决的逻辑计划,再经过分析得到分析后的逻辑计划,再经过一批优化规则转换成一批最佳优化的逻辑计划,再经过SparkPlanner的策略转化成一批物理计划,随后经过消费模型转换成一个个的Spark任务 执行。
        
6. 断言下推(predicate Pushdown)
        就是即使你写了未优化的SQL代码,SparkSQL会自动帮你优化执行流程。
        如下图本来是先 join 再 filter,自动优化成先 filter 再 join 增加执行效率。
    

二、创建DF的方式

        或者说SparkSQL能够接受的数据源

首先

    Spark SQL 有一个自己的上下文 SQLContext,对于集群的操作需要通过次上下文

JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);

1、json格式的文件

            spark会自动推测 列名,注册成临时表时,表中的列默认按ascii顺序显示列。

            不能使嵌套格式的json,也就是 {} 里不能有 {}

DataFrame df = sqlContext.read().json("people.json");


DataFrame df = sqlContext.read().format("json").load("people.json");
df.registerTempTable("p"); //注册成临时表

2、json格式的RDD

          创建一个 RDD<String> 类型的RDD,其中 String 为 json 格式 字符串

            再通过 sqlContext.read().json( rdd ) 方法得到 DataFrame

List<String> scoreList = Arrays.asList(
				"{'name':'zhangsan','score':100}",
				"{'name':'lisi','score':99}" );
JavaRDD<String> scoreRDD = sc.parallelize(scoreList);
DataFrame scoreDF = sqlContext.read().json(scoreRDD);

3、parquet格式文件

        读取parquet文件会自动推测分区: 即路径指定一个文件夹,会将 key=value 格式的文件夹,会将 列作为key

DataFrame usersDF = sqlContext.read().format("parquet").load("users.parquet");

导出parquet格式文件

        SaveMode指定文件保存时的模式。

resultDF.write().format("json").mode(SaveMode.Ignore).save("result.json"); // SaveMode.Overwrite
resultDF.write().format("json").save("result.json");


4、普通的RDD转成DF

    ①  反射

                对一个RDD,通过传入的自定义类的字段生成schema,创建dataframe

                自定义类注意点:    

                            类与字段修饰符 public

                            实现序列化 Serializable 接口 

public class Person implements Serializable{
	
	private Integer id;
	private String name;
	private Integer age;

.....
JavaRDD<String> lines = sc.textFile("Peoples.txt");

JavaRDD<Person> personsRdd = lines.map(new Function<String, Person>() {

	@Override
	public Person call(String line) throws Exception {
		String[] split = line.split(",");
		Person p = new Person();
		p.setId(Integer.valueOf(split[0].trim()));
		p.setName(split[1]);
		p.setAge(Integer.valueOf(split[2].trim()));
		return p;
	}
	
});

//传入进去Person.class的时候,sqlContext是通过反射的方式创建DataFrame
//在底层通过反射的方式或得Person的所有field,结合RDD本身,就生成了DataFrame
DataFrame df = sqlcontext.createDataFrame(personsRdd, Person.class);

    dataFrame 转 rdd<row>  

            这是要注意在 Java  row.getInt(index)  , index跟person类中 字段的定义顺序  无关  ,而是 字段按ascii码排序  的顺序。

                                   scala 中,则 index 跟 person类中 字段的定义顺序  一致  

            推荐使用 row.getAs(filedName) 

// dataFrame to rdd
JavaRDD<Row> rdd = resultDataFrame.javaRDD();
JavaRDD<Person> pRdd = rdd.map(new Function<Row, Person>() {

	private static final long serialVersionUID = 1L;

	@Override
	public Person call(Row row) throws Exception {
		int age = row.getInt(0);
		int id = row.getInt(1);
		String name = row.getString(2);
//		String name = row.getAs("name")
		Person person = new Person(id, name, age);
		return person;
	}
});

       反射的Scala代码:

val conf = new SparkConf()
conf.setMaster("local").setAppName("rddreflect")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val lineRDD = sc.textFile("./sparksql/person.txt")
/**
 * 将RDD隐式转换成DataFrame
 */
import sqlContext.implicits._

val personRDD = lineRDD.map { x =>{
          val person = Person(x.split(",")(0),x.split(",")(1),Integer.valueOf(x.split(",")(2)))
          person
    }
}
val df = personRDD.toDF(); // RDD隐式转换成DataFrame
df.show()

/**
 * 将DataFrame转换成PersonRDD
 */
val rdd = df.rdd
val result = rdd.map { x => {
          Person(x.getAs("id"),x.getAs("name"),x.getAs("age"))
    }
}
result.foreach { println}
sc.stop()

 ② 动态创建schema的方式

            在RDD的基础上创建类型为Row的RDD

            动态构造DataFrame的元数据,用于构建StructType(DataFrame元数据的描述)

            基于已有的MetaData以及RDD<Row> 调用 createDataFrame(rowRDD : RDD[Row], schema : StructType)  来构造DataFrame

            一般而言,有多少列以及每列的具体类型可能来自于Json,也可能来自于DB(将schema存储在数据库

             动态创建比反射常用,因为dataframe的schema会经常变换,动态创建更灵活

SQLContext sqlcontext = new SQLContext(sc);
/**
 * 在RDD的基础上创建类型为Row的RDD
 */
JavaRDD<String> lines = sc.textFile("Peoples.txt");
JavaRDD<Row> rowRDD = lines.map(new Function<String, Row>() {

	@Override
	public Row call(String line) throws Exception {
		String[] split = line.split(",");
		return RowFactory.create(Integer.valueOf(split[0]),split[1],Integer.valueOf(split[2]));
	}
});

/**
 * 动态构造DataFrame的元数据,一般而言,有多少列以及每列的具体类型可能来自于Json,也可能来自于DB
 */
ArrayList<StructField> structFields = new ArrayList<StructField>();
structFields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));
structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
//构建StructType,用于最后DataFrame元数据的描述
StructType schema = DataTypes.createStructType(structFields);

/**
 * 基于已有的MetaData以及RDD<Row> 来构造DataFrame
 */
DataFrame df = sqlcontext.createDataFrame(rowRDD, schema);

        row.getInt(index),索引依据于 structFields  中 structField 的添加顺序

5、SparkSQL读取 jdbc 中的数据

    ①sqlContext.read().format("jdbc") .options(MapProperties).load();

 Map<String, String> options = new HashMap<String, String>();
 options.put("url", "jdbc:mysql://hadoop1:3306/testdb");
 options.put("driver", "com.mysql.jdbc.Driver"); 
 options.put("user","spark");
 options.put("password", "spark2016");
 options.put("dbtable", "student_info"); 
 DataFrame studentInfosDF = sqlContext.read().format("jdbc").options(options).load();
  
 options.put("dbtable", "student_score"); 
 DataFrame studentScoresDF = sqlContext.read().format("jdbc") .options(options).load();

    sqlContext.read().format("jdbc") .options(PropertyName, PropertyValue).load();

DataFrameReader reader = sqlContext.read().format("jdbc");
reader.option("url", "jdbc:mysql://hadoop1:3306/testdb");
reader.option("dbtable", "student_info");
reader.option("driver", "com.mysql.jdbc.Driver");
reader.option("user", "spark");
reader.option("password", "spark2016");
DataFrame studentInfosDF = reader.load();

reader.option("dbtable", "student_score");
DataFrame studentScoresDF = reader.load();

6、SparkSQL读取hive中的数据 

     Spark on Hive 配置

               1.    在Spark客户端安装包下 spark-1.6.0/conf 中创建文件 hive-site.xml: 配置hive的metastore路径

<configuration>
   <property>
        <name>hive.metastore.uris</name>
        <value>thrift://node1:9083</value>
   </property>
</configuration>

                2. 启动Hive的metastore服务

hive --service metastore start

                3. 启动zookeeper集群,启动HDFS集群

                4. 启动SparkShell 读取Hive中的表总数,对比hive中查询同一表查询总数测试时间。

./spark-shell
import org.apache.spark.sql.hive.HiveContext
val hc = new HiveContext(sc)
hc.sql("show databases").show
hc.sql("user default").show
hc.sql("select count(*) from jizhan").show

                注意:

                            如果使用Spark on Hive  查询数据时,出现错误: 

                            找不到HDFS集群路径,要在客户端机器conf/spark-env.sh中设置 hadoop-conf 的路径:

export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop

     代码:

               通过创建 HiveContext  对象 , 在通过 hiveContext.sql( hivesql ) 执行 hivesql

//HiveContext是SQLContext的子类。
SQLContext hiveContext = new HiveContext(sc);
//删除hive中的student_infos表
hiveContext.sql("DROP TABLE IF EXISTS student_infos");


二、DataFrame的API

show

        默认只是显示这个DF里面的前十行数据   

        show(numRows),指定行数

        类似Spark core里面的action类算子

df.show();

select

df.select("age").show();
		 
//SELECT name,age+10 as plusAge FROM table;
df.select(df.col("name"),df.col("age").plus(10).as("plusAge")).show();

filter

//SELECT * FROM p WHERE age > 10
df.filter(df.col("age").gt(10)).show();

groupBy

//SELECT COUNT(*) FROM p GROUP BY age
df.groupBy(df.col("age")).count().show(); 

Join

/**
 * SELECT nameTable.name,nameTable.age,scoreTable.score
* 		FROM nameTable JOIN nameTable ON (nameTable.name == scoreTable.name)
*/
nameDF.join(scoreDF, nameDF.col("name").$eq$eq$eq(scoreDF.col("name")))


SQL

        直接传入sql,需要先将DataFrame注册成一张临时表

        registerTempTable(tableName)

 /**
  * 将DataFrame注册成一张临时表
  * p这个表,会物化到磁盘吗? 这个表只是逻辑上的。
  */
 df.registerTempTable("p");

           sqlContext.sql("")

sqlContext.sql("SELECT * FROM p WHERE age > 10").show();

           Join例子:

	SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkSQLwithJoin");
	JavaSparkContext sc = new JavaSparkContext(conf);
	SQLContext sqlcontext = new SQLContext(sc);
	DataFrame peopleDF = sqlcontext.read().json("student.json");
	// 基于Json构建的DataFrame来注册临时表
	peopleDF.registerTempTable("peopleTable");
	// 查询出年龄大于xx的人
	DataFrame peopleScore = sqlcontext.sql("select name,score from peopleTable where score > 70");
	// 在DataFrame的基础上转化成RDD,通过Map操作计算出分数大于90的所有人的姓名
	List<String> peopleList = peopleScore.javaRDD().map(new Function<Row, String>() {
		@Override
		public String call(Row row) throws Exception {
			return row.getAs("name");
		}
	}).collect();

	// 动态组拼出Json
	List<String> peopleInfomation = new ArrayList<String>();
	peopleInfomation.add("{\"name\":\"Michael\",\"age\":20}");
	peopleInfomation.add("{\"name\":\"Andy\",\"age\":17}");
	peopleInfomation.add("{\"name\":\"Justin\",\"age\":19}");

	// 通过内容为Json的RDD来构造DataFrame
	JavaRDD<String> peopleInfomationRDD = sc.parallelize(peopleInfomation);
	DataFrame peopleInfomationDF = sqlcontext.read().json(peopleInfomationRDD);
	peopleInfomationDF.registerTempTable("peopleInfomation");

	String sql = "select name,age from peopleInfomation where name in (";
	for (String string : peopleList) {
		sql += "'" + string + "',";
	}
	sql = sql.substring(0, sql.length() - 1);
	sql += ")";
	DataFrame execellentNameAgeDF = sqlcontext.sql(sql);
	JavaPairRDD<String, Tuple2<Integer, Integer>> resultRDD = peopleScore.javaRDD()
			.mapToPair(new PairFunction<Row, String, Integer>() {

				@Override
				public Tuple2<String, Integer> call(Row row) throws Exception {
					// TODO Auto-generated method stub
					return new Tuple2<String, Integer>(String.valueOf(row.getAs("name")),
							Integer.valueOf(String.valueOf(row.getAs("score"))));
				}
			}).join(execellentNameAgeDF.javaRDD().mapToPair(new PairFunction<Row, String, Integer>() {

				@Override
				public Tuple2<String, Integer> call(Row row) throws Exception {
					return new Tuple2<String, Integer>(String.valueOf(row.getAs("name")),
							Integer.valueOf(String.valueOf(row.getAs("age"))));
				}
			}));
	
	JavaRDD<Row> resultRowRDD = resultRDD.map(new Function<Tuple2<String,Tuple2<Integer,Integer>>, Row>() {

		@Override
		public Row call(Tuple2<String, Tuple2<Integer, Integer>> tuple) throws Exception {
			// TODO Auto-generated method stub
			return RowFactory.create(tuple._1,tuple._2._1,tuple._2._2);
		}
	});
	
	/**
	 * 动态构造DataFrame的元数据,一般而言,有多少列以及每列的具体类型可能来自于Json,也可能来自于DB
	 */
	ArrayList<StructField> structFields = new ArrayList<StructField>();
	structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
	structFields.add(DataTypes.createStructField("score", DataTypes.IntegerType, true));
	structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
	//构建StructType,用于最后DataFrame元数据的描述
	StructType structType = DataTypes.createStructType(structFields);
	
	/**
	 * 基于已有的MetaData以及RDD<Row> 来构造DataFrame
	 */
	DataFrame df = sqlcontext.createDataFrame(resultRowRDD, structType);
	df.show();
	
	//方法2:直接在sql语句中join
	DataFrame sql2 = sqlcontext.sql("select * from peopleTable,peopleInfomation where peopleInfomation.name = peopleTable.name and peopleTable.score>70");
	sql2.show();
}


三、算子

        dataFrame也可以执行算子

        执行 map、mapPartitions、flatMap 会返回 RDD<row>

        dataFrame 转 rdd

JavaRDD<Row> rowRDD = df.rdd()

JavaRDD<Row> rowRDD = df. javaRDD()


添加新评论