Spark:读取 inputStream 而不是 File
Spark: Read an inputStream instead of File
我在 Java 应用程序中使用 SparkSQL 对 CSV 文件进行一些处理,使用 Databricks 进行解析。
我正在处理的数据来自不同的来源(远程 URL、本地文件、Google 云存储),我习惯于将所有内容都变成一个 InputStream,这样我可以在不知道数据来自哪里的情况下解析和处理数据。
我在 Spark 上看到的所有文档都是从路径读取文件,例如
SparkConf conf = new SparkConf().setAppName("spark-sandbox").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlc = new SQLContext(sc);
DataFrame df = sqlc.read()
.format("com.databricks.spark.csv")
.option("inferSchema", "true")
.option("header", "true")
.load("path/to/file.csv");
DataFrame dfGrouped = df.groupBy("varA","varB")
.avg("varC","varD");
dfGrouped.show();
我想做的是从 InputStream 中读取,或者甚至只是一个已经在内存中的字符串。类似于以下内容:
InputStream stream = new URL(
"http://www.sample-videos.com/csv/Sample-Spreadsheet-100-rows.csv"
).openStream();
DataFrame dfRemote = sqlc.read()
.format("com.databricks.spark.csv")
.option("inferSchema", "true")
.option("header", "true")
.load(stream);
String someString = "imagine,some,csv,data,here";
DataFrame dfFromString = sqlc.read()
.format("com.databricks.spark.csv")
.option("inferSchema", "true")
.option("header", "true")
.read(someString);
这里有什么我遗漏的简单的东西吗?
我已经阅读了一些关于 Spark Streaming 和自定义接收器的文档,但据我所知,这是为了打开一个将持续提供数据的连接。 Spark Streaming 似乎将数据分成块并对其进行一些处理,期望更多数据以无休止的流形式出现。
我最好的猜测是 Spark 作为 Hadoop 的后代,需要大量数据,这些数据可能驻留在某个文件系统中。但由于 Spark 无论如何都会在内存中进行处理,所以 SparkSQL 能够解析内存中已有的数据对我来说很有意义。
如有任何帮助,我们将不胜感激。
您至少可以使用四种不同的方法让您的生活更轻松:
使用您的输入流,写入本地文件(使用 SSD 快速),使用 Spark 读取。
为 S3 使用 Hadoop 文件系统连接器,Google Cloud Storage 并将所有内容转换为文件操作。 (这不会解决从任意 URL 读取的问题,因为没有用于此的 HDFS 连接器。)
将不同的输入类型表示为不同的 URI,并创建一个实用函数来检查 URI 并触发适当的读取操作。
与 (3) 相同,但使用 case 类 而不是 URI,并根据输入类型简单地重载。
我在 Java 应用程序中使用 SparkSQL 对 CSV 文件进行一些处理,使用 Databricks 进行解析。
我正在处理的数据来自不同的来源(远程 URL、本地文件、Google 云存储),我习惯于将所有内容都变成一个 InputStream,这样我可以在不知道数据来自哪里的情况下解析和处理数据。
我在 Spark 上看到的所有文档都是从路径读取文件,例如
SparkConf conf = new SparkConf().setAppName("spark-sandbox").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlc = new SQLContext(sc);
DataFrame df = sqlc.read()
.format("com.databricks.spark.csv")
.option("inferSchema", "true")
.option("header", "true")
.load("path/to/file.csv");
DataFrame dfGrouped = df.groupBy("varA","varB")
.avg("varC","varD");
dfGrouped.show();
我想做的是从 InputStream 中读取,或者甚至只是一个已经在内存中的字符串。类似于以下内容:
InputStream stream = new URL(
"http://www.sample-videos.com/csv/Sample-Spreadsheet-100-rows.csv"
).openStream();
DataFrame dfRemote = sqlc.read()
.format("com.databricks.spark.csv")
.option("inferSchema", "true")
.option("header", "true")
.load(stream);
String someString = "imagine,some,csv,data,here";
DataFrame dfFromString = sqlc.read()
.format("com.databricks.spark.csv")
.option("inferSchema", "true")
.option("header", "true")
.read(someString);
这里有什么我遗漏的简单的东西吗?
我已经阅读了一些关于 Spark Streaming 和自定义接收器的文档,但据我所知,这是为了打开一个将持续提供数据的连接。 Spark Streaming 似乎将数据分成块并对其进行一些处理,期望更多数据以无休止的流形式出现。
我最好的猜测是 Spark 作为 Hadoop 的后代,需要大量数据,这些数据可能驻留在某个文件系统中。但由于 Spark 无论如何都会在内存中进行处理,所以 SparkSQL 能够解析内存中已有的数据对我来说很有意义。
如有任何帮助,我们将不胜感激。
您至少可以使用四种不同的方法让您的生活更轻松:
使用您的输入流,写入本地文件(使用 SSD 快速),使用 Spark 读取。
为 S3 使用 Hadoop 文件系统连接器,Google Cloud Storage 并将所有内容转换为文件操作。 (这不会解决从任意 URL 读取的问题,因为没有用于此的 HDFS 连接器。)
将不同的输入类型表示为不同的 URI,并创建一个实用函数来检查 URI 并触发适当的读取操作。
与 (3) 相同,但使用 case 类 而不是 URI,并根据输入类型简单地重载。