具有包含前导零的字符串字段的数据集的镶木地板文件 returns 该字段没有前导零,如果它被它分割
A parquet file of a dataset having a String field containing leading zeroes returns that field without leading zeroes, if it is paritionned by it
我有一个 Dataset
收集有关法国城市的信息,
/**
* Obtenir un Dataset des communes.
* @param session Session Spark.
* @param anneeCOG Année du Code Officiel Géographique de référence.
* @param verifications Vérifications demandées.
* @return Dataset des communes.
* @throws TechniqueException si un incident survient.
*/
public Dataset<Row> rowCommunes(SparkSession session, int anneeCOG, Verification... verifications) throws TechniqueException {
String nomStore = "communes_par_codeCommune";
Dataset<Row> communes = loadFromStore(session, "{0}_{1,number,#0}", nomStore, anneeCOG, verifications);
if (communes != null) {
return communes;
}
LOGGER.info("Constitution du dataset des communes depuis pour le Code Officiel Géographique (COG) de l'année {}...", anneeCOG);
Dataset<Row> c = loadAndRenameCommmunesCSV(session, anneeCOG, false, verifications);
LOGGER.info("X01");
c.printSchema();
c.show(false);
Dataset<Row> s = this.datasetSirenCommunaux.rowSirenCommunes(session, anneeCOG, TriSirenCommunaux.CODE_COMMUNE);
LOGGER.info("X02");
s.printSchema();
s.show(false);
Column condition1 = c.col("codeCommune").equalTo(s.col("codeCommune"));
Column condition2 = c.col("codeCommuneParente").equalTo(s.col("codeCommune"));
verifications("jonction communes et siren par codeCommune", c, null, s, condition1, verifications, SHOW_REJETS, COMPTAGES_ET_STATISTIQUES);
Dataset<Row> join1 = c.join(s, condition1)
.drop(s.col("codeCommune"))
.drop(s.col("nomCommune"))
.drop(s.col("codeRegion"))
.drop(s.col("codeDepartement"));
verifications("jonction communes et siren par codeCommune, join1", c, null, null, null, verifications);
verifications("jonction communes et siren par codeCommuneParente", c, null, s, condition2, verifications, SHOW_REJETS, COMPTAGES_ET_STATISTIQUES);
Dataset<Row> join2 = c.join(s, condition2)
.drop(s.col("codeCommune"))
.drop(s.col("nomCommune"))
.drop(s.col("codeRegion"))
.drop(s.col("codeDepartement"));
verifications("jonction communes et siren par codeCommuneParente, join2", c, null, null, null, verifications);
communes = join1.union(join2);
LOGGER.info("X03");
communes.printSchema();
communes.show(false);
// La strate communale doit concorder avec celle des comptes individuels des communes.
communes = communes.withColumn("strateCommune",
when(s.col("populationTotale").between(0, 249), lit(1)) // communes de moins de 250 hab
.when(s.col("populationTotale").between(250, 499), lit(2)) // communes de 250 à 500 hab
.when(s.col("populationTotale").between(500, 1999), lit(3)) // communes de 500 à 2 000 hab
.when(s.col("populationTotale").between(2000, 3499), lit(4)) // communes de 2 000 à 3 500 hab
.when(s.col("populationTotale").between(3500, 4999), lit(5)) // communes de 3 500 à 5 000 hab
.when(s.col("populationTotale").between(5000, 9999), lit(6)) // communes de 5 000 à 10 000 hab
.when(s.col("populationTotale").between(10000, 19999), lit(7)) // communes de 10 000 à 20 000 hab
.when(s.col("populationTotale").between(20000, 49999), lit(8)) // communes de 20 000 à 50 000 hab
.when(s.col("populationTotale").between(50000, 99999), lit(9)) // communes de 50 000 à 100 000 hab
.otherwise(lit(10))); // communes de plus de 100 000 hab
// Obtenir les contours des communes.
// "(requête SQL) contours" est la forme de substitution pour Spark. cf
String format = "(select insee as codecommuneosm, nom as nomcommuneosm, surf_ha as surface2, st_x(st_centroid(geom)) as longitude, st_y(st_centroid(geom)) as latitude from communes_{0,number,#0}) contours";
String sql = MessageFormat.format(format, anneeCOG);
Dataset<Row> contours = sql(session, sql).load();
contours = contours.withColumn("surface", col("surface2").cast(DoubleType)).drop(col("surface2"))
.orderBy("codecommuneosm");
Column conditionJoinContours = col("codeCommune").equalTo(col("codecommuneosm"));
verifications("jonction communes et contours communaux OSM (centroïde, surface)", communes, null, contours, conditionJoinContours, verifications, SHOW_REJETS, COMPTAGES_ET_STATISTIQUES);
communes = communes.join(contours, conditionJoinContours, "left_outer")
.drop(col("codecommuneosm")).drop(col("nomcommuneosm"));
verifications("jonction communes et contours communaux OSM (centroïde, surface)", communes, null, null, null, verifications);
LOGGER.info("X04");
communes.printSchema();
communes.show(false);
// Associer à chaque commune son code intercommunalité, si elle en a un (les communes-communautés peuvent ne pas en avoir).
Dataset<Row> perimetres = this.datasetPerimetres.rowPerimetres(session, anneeCOG, EPCIPerimetreDataset.TriPerimetresEPCI.CODE_COMMUNE_MEMBRE).selectExpr("sirenCommuneMembre", "sirenGroupement as codeEPCI", "nomGroupement as nomEPCI");
Column conditionJoinPerimetres = communes.col("sirenCommune").equalTo(perimetres.col("sirenCommuneMembre"));
verifications("jonction communes et périmètres", communes, null, perimetres, conditionJoinPerimetres, verifications, SHOW_REJETS, COMPTAGES_ET_STATISTIQUES);
communes = communes.join(perimetres, conditionJoinPerimetres, "left");
LOGGER.info("X05");
communes.printSchema();
communes.show(false);
// Y associer les départements.
communes = this.datasetDepartements.withDepartement(session, "codeDepartementRetabli", communes, "codeDepartement", null, true, anneeCOG)
.drop("codeRegionDepartement");
LOGGER.info("X06");
communes.printSchema();
communes.show(false);
//communes = communes.repartition(col("codeDepartement"));
communes = communes.sortWithinPartitions(col("codeCommune"));
communes = communes.persist(); // Important : améliore les performances.
LOGGER.info("X07");
communes.printSchema();
communes.show(false);
saveToStore(communes, new String[] {/*"codeDepartement"*/}, "{0}_{1,number,#0}", nomStore, anneeCOG);
LOGGER.info("Le dataset des communes du Code Officiel Géographique de l'année {} est prêt et stocké.", anneeCOG);
return communes;
}
而困扰我的领域是一系(CodeDepartement
)。
当数据集未按此字符串字段 codeDepartement 分区时:一切正常
当该函数运行时,如果我不尝试对数据集进行分区(分区所需的语句已在此处注释),一切正常:
控制台显示的内容是这样的:
2022-01-18 08:47:50.995 INFO 25913 --- [nio-9090-exec-1] f.e.spark.dataset.cog.CogDataset : X07
root
|-- typeCommune: string (nullable = true)
|-- codeCommune: string (nullable = true)
|-- codeRegion: string (nullable = true)
|-- codeDepartement: string (nullable = true)
|-- CTCD: string (nullable = true)
|-- arrondissement: string (nullable = true)
|-- typeNomEtCharniere: string (nullable = true)
|-- nomMajuscules: string (nullable = true)
|-- nomCommune: string (nullable = true)
|-- LIBELLE: string (nullable = true)
|-- codeCanton: string (nullable = true)
|-- codeCommuneParente: string (nullable = true)
|-- sirenCommune: string (nullable = true)
|-- populationTotale: integer (nullable = true)
|-- populationMunicipale: integer (nullable = true)
|-- populationCompteApart: integer (nullable = true)
|-- strateCommune: integer (nullable = false)
|-- longitude: double (nullable = true)
|-- latitude: double (nullable = true)
|-- surface: double (nullable = true)
|-- sirenCommuneMembre: string (nullable = true)
|-- codeEPCI: string (nullable = true)
|-- nomEPCI: string (nullable = true)
|-- codeDepartementRetabli: string (nullable = true)
|-- codeCommuneChefLieuDepartement: string (nullable = true)
|-- typeNomEtCharniereDepartement: string (nullable = true)
|-- nomMajusculesDepartement: string (nullable = true)
|-- nomDepartement: string (nullable = true)
|-- libelleDepartement: string (nullable = true)
+-----------+-----------+----------+---------------+----+--------------+------------------+-------------------------+-------------------------+-------------------------+----------+------------------+------------+----------------+--------------------+---------------------+-------------+------------------+------------------+-------+------------------+---------+---------------------------------------+----------------------+------------------------------+-----------------------------+------------------------+-----------------------+-----------------------+
|typeCommune|codeCommune|codeRegion|codeDepartement|CTCD|arrondissement|typeNomEtCharniere|nomMajuscules |nomCommune |LIBELLE |codeCanton|codeCommuneParente|sirenCommune|populationTotale|populationMunicipale|populationCompteApart|strateCommune|longitude |latitude |surface|sirenCommuneMembre|codeEPCI |nomEPCI |codeDepartementRetabli|codeCommuneChefLieuDepartement|typeNomEtCharniereDepartement|nomMajusculesDepartement|nomDepartement |libelleDepartement |
+-----------+-----------+----------+---------------+----+--------------+------------------+-------------------------+-------------------------+-------------------------+----------+------------------+------------+----------------+--------------------+---------------------+-------------+------------------+------------------+-------+------------------+---------+---------------------------------------+----------------------+------------------------------+-----------------------------+------------------------+-----------------------+-----------------------+
|COM |02053 |32 |02 |02D |021 |0 |VALLEES EN CHAMPAGNE |Vallées en Champagne |Vallées en Champagne |0204 |null |200056307 |576 |570 |6 |3 |3.616431731194736 |49.007989478704616|4168.0 |200056307 |200072031|CA de la Région de Château-Thierry |02 |02408 |5 |AISNE |Aisne |Aisne |
|COM |02057 |32 |02 |02D |023 |0 |BEAUREVOIR |Beaurevoir |Beaurevoir |0201 |null |210200564 |1433 |1415 |18 |3 |3.330083095945232 |49.996641228776475|2180.0 |210200564 |240200493|CC du Pays du Vermandois |02 |02408 |5 |AISNE |Aisne |Aisne |
|COM |02070 |32 |02 |02D |025 |0 |BERNOT |Bernot |Bernot |0207 |null |210200697 |461 |448 |13 |2 |3.4866867514904327|49.87884521170864 |1672.0 |210200697 |200071983|CC Thiérache Sambre et Oise |02 |02408 |5 |AISNE |Aisne |Aisne |
|COM |02090 |32 |02 |02D |024 |0 |BILLY SUR OURCQ |Billy-sur-Ourcq |Billy-sur-Ourcq |0221 |null |210200887 |220 |210 |10 |1 |3.2992043452730138|49.22146881666656 |1021.0 |210200887 |240200519|CC du Canton d'Oulchy le Château |02 |02408 |5 |AISNE |Aisne |Aisne |
|COM |02291 |32 |02 |02D |023 |1 |ESTREES |Estrées |Estrées |0201 |null |210202743 |419 |411 |8 |2 |3.2807599426576335|49.97077968837569 |704.0 |210202743 |240200493|CC du Pays du Vermandois |02 |02408 |5 |AISNE |Aisne |Aisne |
|COM |02527 |32 |02 |02D |024 |0 |MORSAIN |Morsain |Morsain |0220 |null |210205043 |464 |455 |9 |2 |3.1908402513250707|49.46152187457162 |1434.0 |210205043 |200071991|CC Retz en Valois |02 |02408 |5 |AISNE |Aisne |Aisne |
|COM |03098 |84 |03 |03D |031 |0 |DESERTINES |Désertines |Désertines |0310 |null |210300984 |4519 |4432 |87 |5 |2.627485866618113 |46.357035806459585|827.0 |210300984 |200071082|CA Montluçon Communauté |03 |03190 |5 |ALLIER |Allier |Allier |
|COM |04138 |93 |04 |04D |044 |0 |NIOZELLES |Niozelles |Niozelles |0406 |null |210401386 |284 |278 |6 |2 |5.847058475050684 |43.931981561717315|1061.0 |210401386 |240400440|CC Pays Forcalquier et Montagne de Lure|04 |04070 |4 |ALPES DE HAUTE PROVENCE |Alpes-de-Haute-Provence|Alpes-de-Haute-Provence|
|COM |05107 |93 |05 |05D |051 |0 |PUY SAINT ANDRE |Puy-Saint-André |Puy-Saint-André |0502 |null |210501078 |480 |457 |23 |2 |6.566597158877383 |44.891314199898126|1530.0 |210501078 |240500439|CC du Briançonnais |05 |05061 |4 |HAUTES ALPES |Hautes-Alpes |Hautes-Alpes |
|COM |05163 |93 |05 |05D |052 |2 |SAUZE DU LAC |Sauze-du-Lac |Le Sauze-du-Lac |0504 |null |210501631 |149 |145 |4 |1 |6.323608116359499 |44.48601132516764 |1223.0 |210501631 |200067742|CC Serre-Ponçon |05 |05061 |4 |HAUTES ALPES |Hautes-Alpes |Hautes-Alpes |
|COM |05176 |93 |05 |05D |052 |0 |VALSERRES |Valserres |Valserres |0514 |null |210501763 |271 |267 |4 |2 |6.1382720630480465|44.48971119562803 |1123.0 |210501763 |200067320|CC Serre-Ponçon Val d'Avance |05 |05061 |4 |HAUTES ALPES |Hautes-Alpes |Hautes-Alpes |
|COM |07038 |84 |07 |07D |071 |0 |BORNE |Borne |Borne |0713 |null |210700381 |49 |49 |0 |1 |4.041460041609979 |44.61694459940681 |3199.0 |210700381 |200072007|CC de la Montagne d'Ardèche |07 |07186 |5 |ARDECHE |Ardèche |Ardèche |
|COM |09120 |76 |09 |09D |093 |0 |FABAS |Fabas |Fabas |0911 |null |210901203 |356 |352 |4 |2 |1.1143034959106055|43.11978973617488 |2327.0 |210901203 |200067940|CC Couserans-Pyrénées |09 |09122 |5 |ARIEGE |Ariège |Ariège |
|COM |09231 |76 |09 |09D |093 |2 |PORT |Port |Le Port |0903 |null |210902318 |160 |159 |1 |1 |1.3862330079391603|42.83569938864616 |5016.0 |210902318 |200067940|CC Couserans-Pyrénées |09 |09122 |5 |ARIEGE |Ariège |Ariège |
|COM |09276 |76 |09 |09D |092 |0 |SAINT VICTOR ROUZAUD |Saint-Victor-Rouzaud |Saint-Victor-Rouzaud |0907 |null |210902763 |232 |222 |10 |1 |1.5478697013903504|43.087886607776255|1270.0 |210902763 |200066231|CC des Portes d'Ariège Pyrénées |09 |09122 |5 |ARIEGE |Ariège |Ariège |
+-----------+-----------+----------+---------------+----+--------------+------------------+-------------------------+-------------------------+-------------------------+----------+------------------+------------+----------------+--------------------+---------------------+-------------+------------------+------------------+-------+------------------+---------+---------------------------------------+----------------------+------------------------------+-----------------------------+------------------------+-----------------------+-----------------------+
调用了子函数saveToStore
,它们只是在做这个:
protected boolean saveToStore(Dataset<Row> ds, String[] colonnesPartionnement, String format, Object... args) {
return saveToStore(this.tempDir, isCacheUtilise(), ds, colonnesPartionnement, format, args);
}
public static boolean saveToStore(String tempDir, boolean useCache, Dataset<Row> ds, String[] colonnesPartionnement, String format, Object... args) {
if (useCache && ds.isEmpty() == false) {
String store = tempDir + "/" + MessageFormat.format(format, args);
ds.write().partitionBy(colonnesPartionnement).parquet(store);
LOGGER.info("Un dataset a été sauvegardé dans le fichier parquet {}.", store);
return true;
}
if (useCache == false) {
LOGGER.debug("Le cache est désactivé, le dataset n'a pas été sauvegardé dans un fichier parquet.");
}
else {
LOGGER.warn("Le dataset est vide et n'a pas été sauvegardé dans un fichier parquet.");
}
return false;
}
创建了一个 parquet 文件(一个包含 200 个文件的文件夹,如 part-00000-522c936f-7b72-4ed8-bab9-9d4acee6bc7c-c000.snappy.parquet
,未分区),如果我使用 Zeppelin 加载生成的 parquet 文件,我接收该内容:
val communes = spark.read.parquet("/data/tmp/communes_par_codeCommune_2021")
communes.printSchema()
communes.select("codeDepartement").distinct()
.orderBy("codeDepartement")
.show(100)
root
|-- typeCommune: string (nullable = true)
|-- codeCommune: string (nullable = true)
|-- codeRegion: string (nullable = true)
|-- codeDepartement: string (nullable = true)
|-- CTCD: string (nullable = true)
|-- arrondissement: string (nullable = true)
|-- typeNomEtCharniere: string (nullable = true)
|-- nomMajuscules: string (nullable = true)
|-- nomCommune: string (nullable = true)
|-- LIBELLE: string (nullable = true)
|-- codeCanton: string (nullable = true)
|-- codeCommuneParente: string (nullable = true)
|-- sirenCommune: string (nullable = true)
|-- populationTotale: integer (nullable = true)
|-- populationMunicipale: integer (nullable = true)
|-- populationCompteApart: integer (nullable = true)
|-- strateCommune: integer (nullable = true)
|-- longitude: double (nullable = true)
|-- latitude: double (nullable = true)
|-- surface: double (nullable = true)
|-- sirenCommuneMembre: string (nullable = true)
|-- codeEPCI: string (nullable = true)
|-- nomEPCI: string (nullable = true)
|-- codeDepartementRetabli: string (nullable = true)
|-- codeCommuneChefLieuDepartement: string (nullable = true)
|-- typeNomEtCharniereDepartement: string (nullable = true)
|-- nomMajusculesDepartement: string (nullable = true)
|-- nomDepartement: string (nullable = true)
|-- libelleDepartement: string (nullable = true)
+---------------+
|codeDepartement|
+---------------+
| 01|
| 02|
| 03|
| 04|
| 05|
| 06|
| 07|
| 08|
| 09|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
| 21|
| 22|
| 23|
| 24|
| 25|
| 26|
| 27|
| 28|
| 29|
| 2A|
| 2B|
| 30|
| 31|
| 32|
| 33|
| 34|
| 35|
| 36|
| 37|
| 38|
| 39|
| 40|
| 41|
| 42|
| 43|
| 44|
| 45|
| 46|
| 47|
| 48|
| 49|
| 50|
| 51|
| 52|
| 53|
| 54|
| 55|
| 56|
| 57|
| 58|
| 59|
| 60|
| 61|
| 62|
| 63|
| 64|
| 65|
| 66|
| 67|
| 68|
| 69|
| 70|
| 71|
| 72|
| 73|
| 74|
| 75|
| 76|
| 77|
| 78|
| 79|
| 80|
| 81|
| 82|
| 83|
| 84|
| 85|
| 86|
| 87|
| 88|
| 89|
| 90|
| 91|
| 92|
| 93|
| 94|
| 95|
| 971|
| 972|
| 973|
| 974|
+---------------+
only showing top 100 rows
codeDepartement
字段的前导零在这里,这是正常的,这个字段是一个字符串,由一些 2A
和 2B
证明是 Corse(南北)部门。
我注意到在显示的架构中,codeDepartment
排在第四位。
当数据集使用 codeDepartement 分区时:加载 parquet 文件时前导零丢失:
如果我在我的 CogDataset
源文件中激活数据集的 codeDepartement 分区,方法是取消注释以下行:
communes = communes.repartition(col("codeDepartement"));
[...]
saveToStore(communes, new String[] {"codeDepartement"}, "{0}_{1,number,#0}", nomStore, anneeCOG);
转储“X07”的内容是相同的,除了 codeDepartment
是有序的,并保留它们的前导零,镶木地板文件现在有像 codeDepartement=02
、(和 02
有它的前导零,所以它很有希望),但是当我用 Zeppelin 加载那个镶木地板文件时,事情出了问题:
root
|-- typeCommune: string (nullable = true)
|-- codeCommune: string (nullable = true)
|-- codeRegion: string (nullable = true)
|-- CTCD: string (nullable = true)
|-- arrondissement: string (nullable = true)
|-- typeNomEtCharniere: string (nullable = true)
|-- nomMajuscules: string (nullable = true)
|-- nomCommune: string (nullable = true)
|-- LIBELLE: string (nullable = true)
|-- codeCanton: string (nullable = true)
|-- codeCommuneParente: string (nullable = true)
|-- sirenCommune: string (nullable = true)
|-- populationTotale: integer (nullable = true)
|-- populationMunicipale: integer (nullable = true)
|-- populationCompteApart: integer (nullable = true)
|-- strateCommune: integer (nullable = true)
|-- longitude: double (nullable = true)
|-- latitude: double (nullable = true)
|-- surface: double (nullable = true)
|-- sirenCommuneMembre: string (nullable = true)
|-- codeEPCI: string (nullable = true)
|-- nomEPCI: string (nullable = true)
|-- codeDepartementRetabli: string (nullable = true)
|-- codeCommuneChefLieuDepartement: string (nullable = true)
|-- typeNomEtCharniereDepartement: string (nullable = true)
|-- nomMajusculesDepartement: string (nullable = true)
|-- nomDepartement: string (nullable = true)
|-- libelleDepartement: string (nullable = true)
|-- codeDepartement: string (nullable = true)
+---------------+
|codeDepartement|
+---------------+
| 1|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
| 2|
| 21|
| 22|
| 23|
| 24|
| 25|
| 26|
| 27|
| 28|
| 29|
| 2A|
| 2B|
| 3|
| 30|
| 31|
| 32|
| 33|
| 34|
| 35|
| 36|
| 37|
| 38|
| 39|
| 4|
| 40|
| 41|
| 42|
| 43|
| 44|
| 45|
| 46|
| 47|
| 48|
| 49|
| 5|
| 50|
| 51|
| 52|
| 53|
| 54|
| 55|
| 56|
| 57|
| 58|
| 59|
| 6|
| 60|
| 61|
| 62|
| 63|
| 64|
| 65|
| 66|
| 67|
| 68|
| 69|
| 7|
| 70|
| 71|
| 72|
| 73|
| 74|
| 75|
| 76|
| 77|
| 78|
| 79|
| 8|
| 80|
| 81|
| 82|
| 83|
| 84|
| 85|
| 86|
| 87|
| 88|
| 89|
| 9|
| 90|
| 91|
| 92|
| 93|
| 94|
| 95|
| 971|
| 972|
| 973|
| 974|
+---------------+
only showing top 100 rows
codeDepartement
的前导零丢失,
而 2A
和 2B
部门代码仍然在这里,表明该字段仍然是 string
.
我注意到 Parquet 给出的 codeDepartment
位于架构的最后位置,就好像它重新创建了该字段本身 (?)。
你知道是什么影响了我吗?
看起来我缺少一些在存储到 Parquet 或为其重新加载我的内容之前应该设置的选项?
我正在使用 Spark 3.2.0
。
我找到了答案。
问题不是 parquet 文件本身,而是这些语句的事实:
val communes = spark.read.parquet("/data/tmp/communes_par_codeCommune_2021")
communes.printSchema()
即使他们展示了良好的数据集模式,也不要考虑这个模式真的,并尝试从数据中推断它,同时阅读,只要可能 (?!),
因此,将我的 codeDepartement
字段视为数字,直到偶然发现值 2A
和 2B
迫使它将其字段类型切换为字符串。
我想知道为什么 parquet 文件以这种方式加载。如果我的解释是好的。
但确保 parquet 文件的模式被考虑在内的正确方法是明确要求首先从该 parquet 文件中提取它,然后要求加载函数使用它:
val parquetFile = "/data/tmp/communes_par_codeCommune_2021";
val schema = spark.read.parquet(parquetFile).schema;
val communes = spark.read.schema(schema).parquet(parquetFile);
然后读取parquet文件的行为就正常了:
+---------------+
|codeDepartement|
+---------------+
| 01|
| 02|
| 03|
| 04|
| 05|
| 06|
也许有更简单的方法来实现,如果你知道,我很乐意学习它,因为这有点累赘。
我有一个 Dataset
收集有关法国城市的信息,
/**
* Obtenir un Dataset des communes.
* @param session Session Spark.
* @param anneeCOG Année du Code Officiel Géographique de référence.
* @param verifications Vérifications demandées.
* @return Dataset des communes.
* @throws TechniqueException si un incident survient.
*/
public Dataset<Row> rowCommunes(SparkSession session, int anneeCOG, Verification... verifications) throws TechniqueException {
String nomStore = "communes_par_codeCommune";
Dataset<Row> communes = loadFromStore(session, "{0}_{1,number,#0}", nomStore, anneeCOG, verifications);
if (communes != null) {
return communes;
}
LOGGER.info("Constitution du dataset des communes depuis pour le Code Officiel Géographique (COG) de l'année {}...", anneeCOG);
Dataset<Row> c = loadAndRenameCommmunesCSV(session, anneeCOG, false, verifications);
LOGGER.info("X01");
c.printSchema();
c.show(false);
Dataset<Row> s = this.datasetSirenCommunaux.rowSirenCommunes(session, anneeCOG, TriSirenCommunaux.CODE_COMMUNE);
LOGGER.info("X02");
s.printSchema();
s.show(false);
Column condition1 = c.col("codeCommune").equalTo(s.col("codeCommune"));
Column condition2 = c.col("codeCommuneParente").equalTo(s.col("codeCommune"));
verifications("jonction communes et siren par codeCommune", c, null, s, condition1, verifications, SHOW_REJETS, COMPTAGES_ET_STATISTIQUES);
Dataset<Row> join1 = c.join(s, condition1)
.drop(s.col("codeCommune"))
.drop(s.col("nomCommune"))
.drop(s.col("codeRegion"))
.drop(s.col("codeDepartement"));
verifications("jonction communes et siren par codeCommune, join1", c, null, null, null, verifications);
verifications("jonction communes et siren par codeCommuneParente", c, null, s, condition2, verifications, SHOW_REJETS, COMPTAGES_ET_STATISTIQUES);
Dataset<Row> join2 = c.join(s, condition2)
.drop(s.col("codeCommune"))
.drop(s.col("nomCommune"))
.drop(s.col("codeRegion"))
.drop(s.col("codeDepartement"));
verifications("jonction communes et siren par codeCommuneParente, join2", c, null, null, null, verifications);
communes = join1.union(join2);
LOGGER.info("X03");
communes.printSchema();
communes.show(false);
// La strate communale doit concorder avec celle des comptes individuels des communes.
communes = communes.withColumn("strateCommune",
when(s.col("populationTotale").between(0, 249), lit(1)) // communes de moins de 250 hab
.when(s.col("populationTotale").between(250, 499), lit(2)) // communes de 250 à 500 hab
.when(s.col("populationTotale").between(500, 1999), lit(3)) // communes de 500 à 2 000 hab
.when(s.col("populationTotale").between(2000, 3499), lit(4)) // communes de 2 000 à 3 500 hab
.when(s.col("populationTotale").between(3500, 4999), lit(5)) // communes de 3 500 à 5 000 hab
.when(s.col("populationTotale").between(5000, 9999), lit(6)) // communes de 5 000 à 10 000 hab
.when(s.col("populationTotale").between(10000, 19999), lit(7)) // communes de 10 000 à 20 000 hab
.when(s.col("populationTotale").between(20000, 49999), lit(8)) // communes de 20 000 à 50 000 hab
.when(s.col("populationTotale").between(50000, 99999), lit(9)) // communes de 50 000 à 100 000 hab
.otherwise(lit(10))); // communes de plus de 100 000 hab
// Obtenir les contours des communes.
// "(requête SQL) contours" est la forme de substitution pour Spark. cf
String format = "(select insee as codecommuneosm, nom as nomcommuneosm, surf_ha as surface2, st_x(st_centroid(geom)) as longitude, st_y(st_centroid(geom)) as latitude from communes_{0,number,#0}) contours";
String sql = MessageFormat.format(format, anneeCOG);
Dataset<Row> contours = sql(session, sql).load();
contours = contours.withColumn("surface", col("surface2").cast(DoubleType)).drop(col("surface2"))
.orderBy("codecommuneosm");
Column conditionJoinContours = col("codeCommune").equalTo(col("codecommuneosm"));
verifications("jonction communes et contours communaux OSM (centroïde, surface)", communes, null, contours, conditionJoinContours, verifications, SHOW_REJETS, COMPTAGES_ET_STATISTIQUES);
communes = communes.join(contours, conditionJoinContours, "left_outer")
.drop(col("codecommuneosm")).drop(col("nomcommuneosm"));
verifications("jonction communes et contours communaux OSM (centroïde, surface)", communes, null, null, null, verifications);
LOGGER.info("X04");
communes.printSchema();
communes.show(false);
// Associer à chaque commune son code intercommunalité, si elle en a un (les communes-communautés peuvent ne pas en avoir).
Dataset<Row> perimetres = this.datasetPerimetres.rowPerimetres(session, anneeCOG, EPCIPerimetreDataset.TriPerimetresEPCI.CODE_COMMUNE_MEMBRE).selectExpr("sirenCommuneMembre", "sirenGroupement as codeEPCI", "nomGroupement as nomEPCI");
Column conditionJoinPerimetres = communes.col("sirenCommune").equalTo(perimetres.col("sirenCommuneMembre"));
verifications("jonction communes et périmètres", communes, null, perimetres, conditionJoinPerimetres, verifications, SHOW_REJETS, COMPTAGES_ET_STATISTIQUES);
communes = communes.join(perimetres, conditionJoinPerimetres, "left");
LOGGER.info("X05");
communes.printSchema();
communes.show(false);
// Y associer les départements.
communes = this.datasetDepartements.withDepartement(session, "codeDepartementRetabli", communes, "codeDepartement", null, true, anneeCOG)
.drop("codeRegionDepartement");
LOGGER.info("X06");
communes.printSchema();
communes.show(false);
//communes = communes.repartition(col("codeDepartement"));
communes = communes.sortWithinPartitions(col("codeCommune"));
communes = communes.persist(); // Important : améliore les performances.
LOGGER.info("X07");
communes.printSchema();
communes.show(false);
saveToStore(communes, new String[] {/*"codeDepartement"*/}, "{0}_{1,number,#0}", nomStore, anneeCOG);
LOGGER.info("Le dataset des communes du Code Officiel Géographique de l'année {} est prêt et stocké.", anneeCOG);
return communes;
}
而困扰我的领域是一系(CodeDepartement
)。
当数据集未按此字符串字段 codeDepartement 分区时:一切正常
当该函数运行时,如果我不尝试对数据集进行分区(分区所需的语句已在此处注释),一切正常:
控制台显示的内容是这样的:
2022-01-18 08:47:50.995 INFO 25913 --- [nio-9090-exec-1] f.e.spark.dataset.cog.CogDataset : X07
root
|-- typeCommune: string (nullable = true)
|-- codeCommune: string (nullable = true)
|-- codeRegion: string (nullable = true)
|-- codeDepartement: string (nullable = true)
|-- CTCD: string (nullable = true)
|-- arrondissement: string (nullable = true)
|-- typeNomEtCharniere: string (nullable = true)
|-- nomMajuscules: string (nullable = true)
|-- nomCommune: string (nullable = true)
|-- LIBELLE: string (nullable = true)
|-- codeCanton: string (nullable = true)
|-- codeCommuneParente: string (nullable = true)
|-- sirenCommune: string (nullable = true)
|-- populationTotale: integer (nullable = true)
|-- populationMunicipale: integer (nullable = true)
|-- populationCompteApart: integer (nullable = true)
|-- strateCommune: integer (nullable = false)
|-- longitude: double (nullable = true)
|-- latitude: double (nullable = true)
|-- surface: double (nullable = true)
|-- sirenCommuneMembre: string (nullable = true)
|-- codeEPCI: string (nullable = true)
|-- nomEPCI: string (nullable = true)
|-- codeDepartementRetabli: string (nullable = true)
|-- codeCommuneChefLieuDepartement: string (nullable = true)
|-- typeNomEtCharniereDepartement: string (nullable = true)
|-- nomMajusculesDepartement: string (nullable = true)
|-- nomDepartement: string (nullable = true)
|-- libelleDepartement: string (nullable = true)
+-----------+-----------+----------+---------------+----+--------------+------------------+-------------------------+-------------------------+-------------------------+----------+------------------+------------+----------------+--------------------+---------------------+-------------+------------------+------------------+-------+------------------+---------+---------------------------------------+----------------------+------------------------------+-----------------------------+------------------------+-----------------------+-----------------------+
|typeCommune|codeCommune|codeRegion|codeDepartement|CTCD|arrondissement|typeNomEtCharniere|nomMajuscules |nomCommune |LIBELLE |codeCanton|codeCommuneParente|sirenCommune|populationTotale|populationMunicipale|populationCompteApart|strateCommune|longitude |latitude |surface|sirenCommuneMembre|codeEPCI |nomEPCI |codeDepartementRetabli|codeCommuneChefLieuDepartement|typeNomEtCharniereDepartement|nomMajusculesDepartement|nomDepartement |libelleDepartement |
+-----------+-----------+----------+---------------+----+--------------+------------------+-------------------------+-------------------------+-------------------------+----------+------------------+------------+----------------+--------------------+---------------------+-------------+------------------+------------------+-------+------------------+---------+---------------------------------------+----------------------+------------------------------+-----------------------------+------------------------+-----------------------+-----------------------+
|COM |02053 |32 |02 |02D |021 |0 |VALLEES EN CHAMPAGNE |Vallées en Champagne |Vallées en Champagne |0204 |null |200056307 |576 |570 |6 |3 |3.616431731194736 |49.007989478704616|4168.0 |200056307 |200072031|CA de la Région de Château-Thierry |02 |02408 |5 |AISNE |Aisne |Aisne |
|COM |02057 |32 |02 |02D |023 |0 |BEAUREVOIR |Beaurevoir |Beaurevoir |0201 |null |210200564 |1433 |1415 |18 |3 |3.330083095945232 |49.996641228776475|2180.0 |210200564 |240200493|CC du Pays du Vermandois |02 |02408 |5 |AISNE |Aisne |Aisne |
|COM |02070 |32 |02 |02D |025 |0 |BERNOT |Bernot |Bernot |0207 |null |210200697 |461 |448 |13 |2 |3.4866867514904327|49.87884521170864 |1672.0 |210200697 |200071983|CC Thiérache Sambre et Oise |02 |02408 |5 |AISNE |Aisne |Aisne |
|COM |02090 |32 |02 |02D |024 |0 |BILLY SUR OURCQ |Billy-sur-Ourcq |Billy-sur-Ourcq |0221 |null |210200887 |220 |210 |10 |1 |3.2992043452730138|49.22146881666656 |1021.0 |210200887 |240200519|CC du Canton d'Oulchy le Château |02 |02408 |5 |AISNE |Aisne |Aisne |
|COM |02291 |32 |02 |02D |023 |1 |ESTREES |Estrées |Estrées |0201 |null |210202743 |419 |411 |8 |2 |3.2807599426576335|49.97077968837569 |704.0 |210202743 |240200493|CC du Pays du Vermandois |02 |02408 |5 |AISNE |Aisne |Aisne |
|COM |02527 |32 |02 |02D |024 |0 |MORSAIN |Morsain |Morsain |0220 |null |210205043 |464 |455 |9 |2 |3.1908402513250707|49.46152187457162 |1434.0 |210205043 |200071991|CC Retz en Valois |02 |02408 |5 |AISNE |Aisne |Aisne |
|COM |03098 |84 |03 |03D |031 |0 |DESERTINES |Désertines |Désertines |0310 |null |210300984 |4519 |4432 |87 |5 |2.627485866618113 |46.357035806459585|827.0 |210300984 |200071082|CA Montluçon Communauté |03 |03190 |5 |ALLIER |Allier |Allier |
|COM |04138 |93 |04 |04D |044 |0 |NIOZELLES |Niozelles |Niozelles |0406 |null |210401386 |284 |278 |6 |2 |5.847058475050684 |43.931981561717315|1061.0 |210401386 |240400440|CC Pays Forcalquier et Montagne de Lure|04 |04070 |4 |ALPES DE HAUTE PROVENCE |Alpes-de-Haute-Provence|Alpes-de-Haute-Provence|
|COM |05107 |93 |05 |05D |051 |0 |PUY SAINT ANDRE |Puy-Saint-André |Puy-Saint-André |0502 |null |210501078 |480 |457 |23 |2 |6.566597158877383 |44.891314199898126|1530.0 |210501078 |240500439|CC du Briançonnais |05 |05061 |4 |HAUTES ALPES |Hautes-Alpes |Hautes-Alpes |
|COM |05163 |93 |05 |05D |052 |2 |SAUZE DU LAC |Sauze-du-Lac |Le Sauze-du-Lac |0504 |null |210501631 |149 |145 |4 |1 |6.323608116359499 |44.48601132516764 |1223.0 |210501631 |200067742|CC Serre-Ponçon |05 |05061 |4 |HAUTES ALPES |Hautes-Alpes |Hautes-Alpes |
|COM |05176 |93 |05 |05D |052 |0 |VALSERRES |Valserres |Valserres |0514 |null |210501763 |271 |267 |4 |2 |6.1382720630480465|44.48971119562803 |1123.0 |210501763 |200067320|CC Serre-Ponçon Val d'Avance |05 |05061 |4 |HAUTES ALPES |Hautes-Alpes |Hautes-Alpes |
|COM |07038 |84 |07 |07D |071 |0 |BORNE |Borne |Borne |0713 |null |210700381 |49 |49 |0 |1 |4.041460041609979 |44.61694459940681 |3199.0 |210700381 |200072007|CC de la Montagne d'Ardèche |07 |07186 |5 |ARDECHE |Ardèche |Ardèche |
|COM |09120 |76 |09 |09D |093 |0 |FABAS |Fabas |Fabas |0911 |null |210901203 |356 |352 |4 |2 |1.1143034959106055|43.11978973617488 |2327.0 |210901203 |200067940|CC Couserans-Pyrénées |09 |09122 |5 |ARIEGE |Ariège |Ariège |
|COM |09231 |76 |09 |09D |093 |2 |PORT |Port |Le Port |0903 |null |210902318 |160 |159 |1 |1 |1.3862330079391603|42.83569938864616 |5016.0 |210902318 |200067940|CC Couserans-Pyrénées |09 |09122 |5 |ARIEGE |Ariège |Ariège |
|COM |09276 |76 |09 |09D |092 |0 |SAINT VICTOR ROUZAUD |Saint-Victor-Rouzaud |Saint-Victor-Rouzaud |0907 |null |210902763 |232 |222 |10 |1 |1.5478697013903504|43.087886607776255|1270.0 |210902763 |200066231|CC des Portes d'Ariège Pyrénées |09 |09122 |5 |ARIEGE |Ariège |Ariège |
+-----------+-----------+----------+---------------+----+--------------+------------------+-------------------------+-------------------------+-------------------------+----------+------------------+------------+----------------+--------------------+---------------------+-------------+------------------+------------------+-------+------------------+---------+---------------------------------------+----------------------+------------------------------+-----------------------------+------------------------+-----------------------+-----------------------+
调用了子函数saveToStore
,它们只是在做这个:
protected boolean saveToStore(Dataset<Row> ds, String[] colonnesPartionnement, String format, Object... args) {
return saveToStore(this.tempDir, isCacheUtilise(), ds, colonnesPartionnement, format, args);
}
public static boolean saveToStore(String tempDir, boolean useCache, Dataset<Row> ds, String[] colonnesPartionnement, String format, Object... args) {
if (useCache && ds.isEmpty() == false) {
String store = tempDir + "/" + MessageFormat.format(format, args);
ds.write().partitionBy(colonnesPartionnement).parquet(store);
LOGGER.info("Un dataset a été sauvegardé dans le fichier parquet {}.", store);
return true;
}
if (useCache == false) {
LOGGER.debug("Le cache est désactivé, le dataset n'a pas été sauvegardé dans un fichier parquet.");
}
else {
LOGGER.warn("Le dataset est vide et n'a pas été sauvegardé dans un fichier parquet.");
}
return false;
}
创建了一个 parquet 文件(一个包含 200 个文件的文件夹,如 part-00000-522c936f-7b72-4ed8-bab9-9d4acee6bc7c-c000.snappy.parquet
,未分区),如果我使用 Zeppelin 加载生成的 parquet 文件,我接收该内容:
val communes = spark.read.parquet("/data/tmp/communes_par_codeCommune_2021")
communes.printSchema()
communes.select("codeDepartement").distinct()
.orderBy("codeDepartement")
.show(100)
root
|-- typeCommune: string (nullable = true)
|-- codeCommune: string (nullable = true)
|-- codeRegion: string (nullable = true)
|-- codeDepartement: string (nullable = true)
|-- CTCD: string (nullable = true)
|-- arrondissement: string (nullable = true)
|-- typeNomEtCharniere: string (nullable = true)
|-- nomMajuscules: string (nullable = true)
|-- nomCommune: string (nullable = true)
|-- LIBELLE: string (nullable = true)
|-- codeCanton: string (nullable = true)
|-- codeCommuneParente: string (nullable = true)
|-- sirenCommune: string (nullable = true)
|-- populationTotale: integer (nullable = true)
|-- populationMunicipale: integer (nullable = true)
|-- populationCompteApart: integer (nullable = true)
|-- strateCommune: integer (nullable = true)
|-- longitude: double (nullable = true)
|-- latitude: double (nullable = true)
|-- surface: double (nullable = true)
|-- sirenCommuneMembre: string (nullable = true)
|-- codeEPCI: string (nullable = true)
|-- nomEPCI: string (nullable = true)
|-- codeDepartementRetabli: string (nullable = true)
|-- codeCommuneChefLieuDepartement: string (nullable = true)
|-- typeNomEtCharniereDepartement: string (nullable = true)
|-- nomMajusculesDepartement: string (nullable = true)
|-- nomDepartement: string (nullable = true)
|-- libelleDepartement: string (nullable = true)
+---------------+
|codeDepartement|
+---------------+
| 01|
| 02|
| 03|
| 04|
| 05|
| 06|
| 07|
| 08|
| 09|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
| 21|
| 22|
| 23|
| 24|
| 25|
| 26|
| 27|
| 28|
| 29|
| 2A|
| 2B|
| 30|
| 31|
| 32|
| 33|
| 34|
| 35|
| 36|
| 37|
| 38|
| 39|
| 40|
| 41|
| 42|
| 43|
| 44|
| 45|
| 46|
| 47|
| 48|
| 49|
| 50|
| 51|
| 52|
| 53|
| 54|
| 55|
| 56|
| 57|
| 58|
| 59|
| 60|
| 61|
| 62|
| 63|
| 64|
| 65|
| 66|
| 67|
| 68|
| 69|
| 70|
| 71|
| 72|
| 73|
| 74|
| 75|
| 76|
| 77|
| 78|
| 79|
| 80|
| 81|
| 82|
| 83|
| 84|
| 85|
| 86|
| 87|
| 88|
| 89|
| 90|
| 91|
| 92|
| 93|
| 94|
| 95|
| 971|
| 972|
| 973|
| 974|
+---------------+
only showing top 100 rows
codeDepartement
字段的前导零在这里,这是正常的,这个字段是一个字符串,由一些 2A
和 2B
证明是 Corse(南北)部门。
我注意到在显示的架构中,codeDepartment
排在第四位。
当数据集使用 codeDepartement 分区时:加载 parquet 文件时前导零丢失:
如果我在我的 CogDataset
源文件中激活数据集的 codeDepartement 分区,方法是取消注释以下行:
communes = communes.repartition(col("codeDepartement"));
[...]
saveToStore(communes, new String[] {"codeDepartement"}, "{0}_{1,number,#0}", nomStore, anneeCOG);
转储“X07”的内容是相同的,除了 codeDepartment
是有序的,并保留它们的前导零,镶木地板文件现在有像 codeDepartement=02
、(和 02
有它的前导零,所以它很有希望),但是当我用 Zeppelin 加载那个镶木地板文件时,事情出了问题:
root
|-- typeCommune: string (nullable = true)
|-- codeCommune: string (nullable = true)
|-- codeRegion: string (nullable = true)
|-- CTCD: string (nullable = true)
|-- arrondissement: string (nullable = true)
|-- typeNomEtCharniere: string (nullable = true)
|-- nomMajuscules: string (nullable = true)
|-- nomCommune: string (nullable = true)
|-- LIBELLE: string (nullable = true)
|-- codeCanton: string (nullable = true)
|-- codeCommuneParente: string (nullable = true)
|-- sirenCommune: string (nullable = true)
|-- populationTotale: integer (nullable = true)
|-- populationMunicipale: integer (nullable = true)
|-- populationCompteApart: integer (nullable = true)
|-- strateCommune: integer (nullable = true)
|-- longitude: double (nullable = true)
|-- latitude: double (nullable = true)
|-- surface: double (nullable = true)
|-- sirenCommuneMembre: string (nullable = true)
|-- codeEPCI: string (nullable = true)
|-- nomEPCI: string (nullable = true)
|-- codeDepartementRetabli: string (nullable = true)
|-- codeCommuneChefLieuDepartement: string (nullable = true)
|-- typeNomEtCharniereDepartement: string (nullable = true)
|-- nomMajusculesDepartement: string (nullable = true)
|-- nomDepartement: string (nullable = true)
|-- libelleDepartement: string (nullable = true)
|-- codeDepartement: string (nullable = true)
+---------------+
|codeDepartement|
+---------------+
| 1|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
| 2|
| 21|
| 22|
| 23|
| 24|
| 25|
| 26|
| 27|
| 28|
| 29|
| 2A|
| 2B|
| 3|
| 30|
| 31|
| 32|
| 33|
| 34|
| 35|
| 36|
| 37|
| 38|
| 39|
| 4|
| 40|
| 41|
| 42|
| 43|
| 44|
| 45|
| 46|
| 47|
| 48|
| 49|
| 5|
| 50|
| 51|
| 52|
| 53|
| 54|
| 55|
| 56|
| 57|
| 58|
| 59|
| 6|
| 60|
| 61|
| 62|
| 63|
| 64|
| 65|
| 66|
| 67|
| 68|
| 69|
| 7|
| 70|
| 71|
| 72|
| 73|
| 74|
| 75|
| 76|
| 77|
| 78|
| 79|
| 8|
| 80|
| 81|
| 82|
| 83|
| 84|
| 85|
| 86|
| 87|
| 88|
| 89|
| 9|
| 90|
| 91|
| 92|
| 93|
| 94|
| 95|
| 971|
| 972|
| 973|
| 974|
+---------------+
only showing top 100 rows
codeDepartement
的前导零丢失,
而 2A
和 2B
部门代码仍然在这里,表明该字段仍然是 string
.
我注意到 Parquet 给出的 codeDepartment
位于架构的最后位置,就好像它重新创建了该字段本身 (?)。
你知道是什么影响了我吗?
看起来我缺少一些在存储到 Parquet 或为其重新加载我的内容之前应该设置的选项?
我正在使用 Spark 3.2.0
。
我找到了答案。 问题不是 parquet 文件本身,而是这些语句的事实:
val communes = spark.read.parquet("/data/tmp/communes_par_codeCommune_2021")
communes.printSchema()
即使他们展示了良好的数据集模式,也不要考虑这个模式真的,并尝试从数据中推断它,同时阅读,只要可能 (?!),
因此,将我的 codeDepartement
字段视为数字,直到偶然发现值 2A
和 2B
迫使它将其字段类型切换为字符串。
我想知道为什么 parquet 文件以这种方式加载。如果我的解释是好的。
但确保 parquet 文件的模式被考虑在内的正确方法是明确要求首先从该 parquet 文件中提取它,然后要求加载函数使用它:
val parquetFile = "/data/tmp/communes_par_codeCommune_2021";
val schema = spark.read.parquet(parquetFile).schema;
val communes = spark.read.schema(schema).parquet(parquetFile);
然后读取parquet文件的行为就正常了:
+---------------+
|codeDepartement|
+---------------+
| 01|
| 02|
| 03|
| 04|
| 05|
| 06|
也许有更简单的方法来实现,如果你知道,我很乐意学习它,因为这有点累赘。