如何在 spark sql 中创建永久 table

how to create permanent table in spark sql

在我的项目中,我将数据从 MongoDB 传输到 SparkSQL table,用于基于 SQL 的查询。但是 Spark SQL 让我创建临时文件。当我想查询一些东西时,执行时间非常长,因为数据传输和映射操作需要太多时间。

那么,我可以减少执行时间吗?我可以创建永久性 Spark SQL tables 吗?我可以用 JDBC 查询永久 tables 吗?

我正在添加我的代码和执行时间结果。我在独立模式下做所有事情。

package com.mongodb.spark.sql;

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.bson.BSONObject;

import com.mongodb.hadoop.MongoInputFormat;
import com.mongodb.spark.demo.Observation;
import com.mongodb.spark.demo.Sensor;

import scala.Tuple2;

public class SparkSqlMongo {

public static void main(String[] args) {

    Configuration conf = new Configuration();

    conf.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat");
    conf.set("mongo.input.uri", "mongodb://localhost:27017/test.observations");

    Configuration sensConf = new Configuration();

    sensConf.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat");
    sensConf.set("mongo.input.uri", "mongodb://localhost:27017/test.sens");

    SparkConf sconf = new SparkConf().setMaster("local[2]").setAppName("SQL DENEME").set("nsmc.connection.host",
            "mongodb:");

    JavaSparkContext sc = new JavaSparkContext(sconf);
    SQLContext sql = new SQLContext(sc);

    JavaRDD<Observation> obs = sc.newAPIHadoopRDD(conf, MongoInputFormat.class, Object.class, BSONObject.class)
            .map(new Function<Tuple2<Object, BSONObject>, Observation>() {

                private static final long serialVersionUID = 1L;

                @Override
                public Observation call(Tuple2<Object, BSONObject> v1) throws Exception {

                    int id = (int) v1._2.get("_id");
                    double value = (double) v1._2.get("Value");
                    // Date time = (Date) v1._2.get("Time");
                    int sensor = (int) v1._2.get("SensorId");
                    int stream = (int) v1._2.get("DataStreamId");

                    Observation obs = new Observation(id, value, sensor, stream);
                    return obs;

                }
            });

    DataFrame obsi = sql.createDataFrame(obs, Observation.class);

    obsi.registerTempTable("obsi");

    JavaRDD<Sensor> sens = sc.newAPIHadoopRDD(sensConf, MongoInputFormat.class, Object.class, BSONObject.class)
            .map(new Function<Tuple2<Object, BSONObject>, Sensor>() {

                /**
                 * 
                 */
                private static final long serialVersionUID = 1L;

                @Override
                public Sensor call(Tuple2<Object, BSONObject> v1) throws Exception {

                    int id = (int) v1._2.get("_id");
                    String name = (String) v1._2.get("Name");
                    String description = (String) v1._2.get("Description");

                    Sensor s = new Sensor(id, name, description);

                    System.out.println(s.getName());
                    return s;

                }
            });

    DataFrame sensi = sql.createDataFrame(sens, Sensor.class);

    sensi.registerTempTable("sensi");

    sensi.show();

    long start = System.currentTimeMillis();

    DataFrame obser = sql
            .sql("SELECT obsi.value, obsi.id, sensi.name FROM obsi, sensi WHERE obsi.sensorID = sensi.id  and sensi.id = 107")
            .cache();
    long stop = System.currentTimeMillis();

    // System.out.println("count ====>>> " + a.toString());
    System.out.println("toplam sorgu zamani : " + (stop - start));
    ;
    //
    // while(!obser.equals(null)){
    // System.out.println(obser);
    // }

    List<String> names = obser.javaRDD().map(new Function<Row, String>() {

        private static final long serialVersionUID = 1L;

        public String call(Row row) {

            // System.out.println(row);
            // System.out.println("value : " + row.getDouble(0) + " id : " +
            // row.getInt(1) + " name : " + row.getString(0));
            return "Name: " + row;
        }
    }).collect();

}

}

对于大约5M的观察和1K的sns数据,所有执行时间大约为120秒。我加入了这些 tables 并且这个执行时间非常高并且 unacceptable.

Spark SQL 不是数据库,只要创建它们的 spark 上下文可用,其中发生的数据操作就会存在。有几种 spark 作业服务器实现可以让您保存一个作业的结果并针对同一数据集发送其他作业。如果服务器(即 spark 上下文)关闭,它仍然是暂时的并且必须重新加载

那是说您可以保留计算结果并稍后检索(返回 Mongo,存入 Hadoop/other 文件系统上的文件)

  1. 是的,您可以通过 Caching your Table、Dataframe 或 Rdd 来缩短程序执行时间。
  2. 而且,如果您想将数据保存为永久 table,您可以使用 df.saveAsTable 方法,但应通过 HiveContext 创建数据框。
  3. 对于 JDBC 连接,您需要启动 Thrift service 然后您可以在寄存器 tables 上执行 Spark Sql