将 Spark Dataframe 保存到 Elasticsearch - 无法处理类型异常
Save Spark Dataframe into Elasticsearch - Can’t handle type exception
我设计了一个简单的作业来从 MySQL 读取数据并使用 Spark 将其保存在 Elasticsearch 中。
代码如下:
JavaSparkContext sc = new JavaSparkContext(
new SparkConf().setAppName("MySQLtoEs")
.set("es.index.auto.create", "true")
.set("es.nodes", "127.0.0.1:9200")
.set("es.mapping.id", "id")
.set("spark.serializer", KryoSerializer.class.getName()));
SQLContext sqlContext = new SQLContext(sc);
// Data source options
Map<String, String> options = new HashMap<>();
options.put("driver", MYSQL_DRIVER);
options.put("url", MYSQL_CONNECTION_URL);
options.put("dbtable", "OFFERS");
options.put("partitionColumn", "id");
options.put("lowerBound", "10001");
options.put("upperBound", "499999");
options.put("numPartitions", "10");
// Load MySQL query result as DataFrame
LOGGER.info("Loading DataFrame");
DataFrame jdbcDF = sqlContext.load("jdbc", options);
DataFrame df = jdbcDF.select("id", "title", "description",
"merchantId", "price", "keywords", "brandId", "categoryId");
df.show();
LOGGER.info("df.count : " + df.count());
EsSparkSQL.saveToEs(df, "offers/product");
你可以看到代码非常简单。它将数据读入 DataFrame,选择一些列,然后执行 count
作为对 Dataframe 的基本操作。到目前为止一切正常。
然后它尝试将数据保存到Elasticsearch 中,但由于无法处理某些类型而失败。可以看到错误日志here.
我不确定为什么它不能处理那种类型。 有人知道为什么会这样吗?
我正在使用 Apache Spark 1.5.0、Elasticsearch 1.4.4 和 elaticsearch-hadoop 2.1.1
编辑:
- 我已经用示例数据集和源代码更新了要点 link。
- 我也尝试使用邮件列表中@costin 提到的 elasticsearch-hadoop dev builds。
这个问题的答案很棘手,但多亏了 samklr,我设法弄清楚了问题所在。
尽管如此,解决方案并不简单,可能会考虑一些“不必要的”转换。
首先来说说序列化。
Spark数据序列化和函数序列化需要考虑序列化的两个方面。在这种情况下,它是关于数据序列化和反序列化的。
从 Spark 的角度来看,唯一需要做的就是设置序列化 - Spark 默认依赖 Java 序列化,这很方便但效率很低。这就是为什么 Hadoop 本身引入了自己的序列化机制和自己的类型——即 Writables
的原因。因此,InputFormat
和 OutputFormats
是 return Writables
所必需的,Spark 开箱即用无法理解。
对于 elasticsearch-spark 连接器,必须启用一种不同的序列化 (Kryo),它可以自动处理转换,而且效率很高。
conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
即使 Kryo 不要求 class 实现要序列化的特定接口,这意味着 POJO 可以在 RDD 中使用,除了启用 Kryo 序列化之外无需任何进一步的工作。
也就是说,@samklr 向我指出 Kryo 在使用它们之前需要注册 classes。
这是因为Kryo写了一个对被序列化对象的class的引用(每写一个对象就写一个引用),如果class已经被序列化,这只是一个整数标识符已注册,但为 class 的全名。 Spark 代表您注册 Scala classes 和许多其他框架 classes(如 Avro Generic 或 Thrift classes)。
在 Kryo 注册 classes 很简单。创建 KryoRegistrator 的子class,并覆盖registerClasses()
方法:
public class MyKryoRegistrator implements KryoRegistrator, Serializable {
@Override
public void registerClasses(Kryo kryo) {
// Product POJO associated to a product Row from the DataFrame
kryo.register(Product.class);
}
}
最后,在您的驱动程序中,将 spark.kryo.registrator 属性 设置为您的 KryoRegistrator 实现的完全限定 class 名称:
conf.set("spark.kryo.registrator", "MyKryoRegistrator")
其次,即使设置了 Kryo 序列化器并注册了 class,对 Spark 1.5 进行了更改,但出于某种原因 Elasticsearch 无法 反序列化 Dataframe 因为它无法将 Dataframe 的 SchemaType
推断到连接器中。
所以我不得不将 Dataframe 转换为 JavaRDD
JavaRDD<Product> products = df.javaRDD().map(new Function<Row, Product>() {
public Product call(Row row) throws Exception {
long id = row.getLong(0);
String title = row.getString(1);
String description = row.getString(2);
int merchantId = row.getInt(3);
double price = row.getDecimal(4).doubleValue();
String keywords = row.getString(5);
long brandId = row.getLong(6);
int categoryId = row.getInt(7);
return new Product(id, title, description, merchantId, price, keywords, brandId, categoryId);
}
});
现在数据已准备好写入 elasticsearch :
JavaEsSpark.saveToEs(products, "test/test");
参考文献:
- Elasticsearch 的 Apache Spark 支持 documentation。
- Hadoop 权威指南,第 19 章。Spark,编辑。 4 – 汤姆·怀特。
- 用户samklr.
我设计了一个简单的作业来从 MySQL 读取数据并使用 Spark 将其保存在 Elasticsearch 中。
代码如下:
JavaSparkContext sc = new JavaSparkContext(
new SparkConf().setAppName("MySQLtoEs")
.set("es.index.auto.create", "true")
.set("es.nodes", "127.0.0.1:9200")
.set("es.mapping.id", "id")
.set("spark.serializer", KryoSerializer.class.getName()));
SQLContext sqlContext = new SQLContext(sc);
// Data source options
Map<String, String> options = new HashMap<>();
options.put("driver", MYSQL_DRIVER);
options.put("url", MYSQL_CONNECTION_URL);
options.put("dbtable", "OFFERS");
options.put("partitionColumn", "id");
options.put("lowerBound", "10001");
options.put("upperBound", "499999");
options.put("numPartitions", "10");
// Load MySQL query result as DataFrame
LOGGER.info("Loading DataFrame");
DataFrame jdbcDF = sqlContext.load("jdbc", options);
DataFrame df = jdbcDF.select("id", "title", "description",
"merchantId", "price", "keywords", "brandId", "categoryId");
df.show();
LOGGER.info("df.count : " + df.count());
EsSparkSQL.saveToEs(df, "offers/product");
你可以看到代码非常简单。它将数据读入 DataFrame,选择一些列,然后执行 count
作为对 Dataframe 的基本操作。到目前为止一切正常。
然后它尝试将数据保存到Elasticsearch 中,但由于无法处理某些类型而失败。可以看到错误日志here.
我不确定为什么它不能处理那种类型。 有人知道为什么会这样吗?
我正在使用 Apache Spark 1.5.0、Elasticsearch 1.4.4 和 elaticsearch-hadoop 2.1.1
编辑:
- 我已经用示例数据集和源代码更新了要点 link。
- 我也尝试使用邮件列表中@costin 提到的 elasticsearch-hadoop dev builds。
这个问题的答案很棘手,但多亏了 samklr,我设法弄清楚了问题所在。
尽管如此,解决方案并不简单,可能会考虑一些“不必要的”转换。
首先来说说序列化。
Spark数据序列化和函数序列化需要考虑序列化的两个方面。在这种情况下,它是关于数据序列化和反序列化的。
从 Spark 的角度来看,唯一需要做的就是设置序列化 - Spark 默认依赖 Java 序列化,这很方便但效率很低。这就是为什么 Hadoop 本身引入了自己的序列化机制和自己的类型——即 Writables
的原因。因此,InputFormat
和 OutputFormats
是 return Writables
所必需的,Spark 开箱即用无法理解。
对于 elasticsearch-spark 连接器,必须启用一种不同的序列化 (Kryo),它可以自动处理转换,而且效率很高。
conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
即使 Kryo 不要求 class 实现要序列化的特定接口,这意味着 POJO 可以在 RDD 中使用,除了启用 Kryo 序列化之外无需任何进一步的工作。
也就是说,@samklr 向我指出 Kryo 在使用它们之前需要注册 classes。
这是因为Kryo写了一个对被序列化对象的class的引用(每写一个对象就写一个引用),如果class已经被序列化,这只是一个整数标识符已注册,但为 class 的全名。 Spark 代表您注册 Scala classes 和许多其他框架 classes(如 Avro Generic 或 Thrift classes)。
在 Kryo 注册 classes 很简单。创建 KryoRegistrator 的子class,并覆盖registerClasses()
方法:
public class MyKryoRegistrator implements KryoRegistrator, Serializable {
@Override
public void registerClasses(Kryo kryo) {
// Product POJO associated to a product Row from the DataFrame
kryo.register(Product.class);
}
}
最后,在您的驱动程序中,将 spark.kryo.registrator 属性 设置为您的 KryoRegistrator 实现的完全限定 class 名称:
conf.set("spark.kryo.registrator", "MyKryoRegistrator")
其次,即使设置了 Kryo 序列化器并注册了 class,对 Spark 1.5 进行了更改,但出于某种原因 Elasticsearch 无法 反序列化 Dataframe 因为它无法将 Dataframe 的 SchemaType
推断到连接器中。
所以我不得不将 Dataframe 转换为 JavaRDD
JavaRDD<Product> products = df.javaRDD().map(new Function<Row, Product>() {
public Product call(Row row) throws Exception {
long id = row.getLong(0);
String title = row.getString(1);
String description = row.getString(2);
int merchantId = row.getInt(3);
double price = row.getDecimal(4).doubleValue();
String keywords = row.getString(5);
long brandId = row.getLong(6);
int categoryId = row.getInt(7);
return new Product(id, title, description, merchantId, price, keywords, brandId, categoryId);
}
});
现在数据已准备好写入 elasticsearch :
JavaEsSpark.saveToEs(products, "test/test");
参考文献:
- Elasticsearch 的 Apache Spark 支持 documentation。
- Hadoop 权威指南,第 19 章。Spark,编辑。 4 – 汤姆·怀特。
- 用户samklr.