具有包含前导零的字符串字段的数据集的镶木地板文件 returns 该字段没有前导零,如果它被它分割

A parquet file of a dataset having a String field containing leading zeroes returns that field without leading zeroes, if it is paritionned by it

我有一个 Dataset 收集有关法国城市的信息,

* Obtenir un Dataset des communes.
* @param session Session Spark.
* @param anneeCOG Année du Code Officiel Géographique de référence.
* @param verifications Vérifications demandées.
* @return Dataset des communes.
* @throws TechniqueException si un incident survient.
public Dataset<Row> rowCommunes(SparkSession session, int anneeCOG, Verification... verifications) throws TechniqueException {
  String nomStore = "communes_par_codeCommune";
  Dataset<Row> communes = loadFromStore(session, "{0}_{1,number,#0}", nomStore, anneeCOG, verifications);
  if (communes != null) {
     return communes;

  LOGGER.info("Constitution du dataset des communes depuis pour le Code Officiel Géographique (COG) de l'année {}...", anneeCOG);
  Dataset<Row> c = loadAndRenameCommmunesCSV(session, anneeCOG, false, verifications);

  Dataset<Row> s = this.datasetSirenCommunaux.rowSirenCommunes(session, anneeCOG, TriSirenCommunaux.CODE_COMMUNE);

  Column condition1 = c.col("codeCommune").equalTo(s.col("codeCommune"));
  Column condition2 = c.col("codeCommuneParente").equalTo(s.col("codeCommune"));
  verifications("jonction communes et siren par codeCommune", c, null, s, condition1, verifications, SHOW_REJETS, COMPTAGES_ET_STATISTIQUES);
  Dataset<Row> join1 = c.join(s, condition1)

  verifications("jonction communes et siren par codeCommune, join1", c, null, null, null, verifications);
  verifications("jonction communes et siren par codeCommuneParente", c, null, s, condition2, verifications, SHOW_REJETS, COMPTAGES_ET_STATISTIQUES);
  Dataset<Row> join2 = c.join(s, condition2)

  verifications("jonction communes et siren par codeCommuneParente, join2", c, null, null, null, verifications);
  communes = join1.union(join2);
  // La strate communale doit concorder avec celle des comptes individuels des communes.
  communes = communes.withColumn("strateCommune", 
     when(s.col("populationTotale").between(0, 249), lit(1))       // communes de moins de 250 hab
     .when(s.col("populationTotale").between(250, 499), lit(2))     // communes de 250 à 500 hab
     .when(s.col("populationTotale").between(500, 1999), lit(3))    // communes de 500 à 2 000 hab
     .when(s.col("populationTotale").between(2000, 3499), lit(4))   // communes de 2 000 à 3 500 hab
     .when(s.col("populationTotale").between(3500, 4999), lit(5))   // communes de 3 500 à 5 000 hab
     .when(s.col("populationTotale").between(5000, 9999), lit(6))   // communes de 5 000 à 10 000 hab
     .when(s.col("populationTotale").between(10000, 19999), lit(7)) // communes de 10 000 à 20 000 hab
     .when(s.col("populationTotale").between(20000, 49999), lit(8)) // communes de 20 000 à 50 000 hab
     .when(s.col("populationTotale").between(50000, 99999), lit(9)) // communes de 50 000 à 100 000 hab
     .otherwise(lit(10)));                                          // communes de plus de 100 000 hab

  // Obtenir les contours des communes.
  // "(requête SQL) contours" est la forme de substitution pour Spark. cf 
  String format = "(select insee as codecommuneosm, nom as nomcommuneosm, surf_ha as surface2, st_x(st_centroid(geom)) as longitude, st_y(st_centroid(geom)) as latitude from communes_{0,number,#0}) contours";
  String sql = MessageFormat.format(format, anneeCOG);
  Dataset<Row> contours = sql(session, sql).load();
  contours = contours.withColumn("surface", col("surface2").cast(DoubleType)).drop(col("surface2"))
  Column conditionJoinContours = col("codeCommune").equalTo(col("codecommuneosm"));
  verifications("jonction communes et contours communaux OSM (centroïde, surface)", communes, null, contours, conditionJoinContours, verifications, SHOW_REJETS, COMPTAGES_ET_STATISTIQUES);
  communes = communes.join(contours, conditionJoinContours, "left_outer")
  verifications("jonction communes et contours communaux OSM (centroïde, surface)", communes, null, null, null, verifications);


  // Associer à chaque commune son code intercommunalité, si elle en a un (les communes-communautés peuvent ne pas en avoir).
  Dataset<Row> perimetres = this.datasetPerimetres.rowPerimetres(session, anneeCOG, EPCIPerimetreDataset.TriPerimetresEPCI.CODE_COMMUNE_MEMBRE).selectExpr("sirenCommuneMembre", "sirenGroupement as codeEPCI", "nomGroupement as nomEPCI");
  Column conditionJoinPerimetres = communes.col("sirenCommune").equalTo(perimetres.col("sirenCommuneMembre"));

  verifications("jonction communes et périmètres", communes, null, perimetres, conditionJoinPerimetres, verifications, SHOW_REJETS, COMPTAGES_ET_STATISTIQUES);
  communes = communes.join(perimetres, conditionJoinPerimetres, "left");


  // Y associer les départements.
  communes = this.datasetDepartements.withDepartement(session, "codeDepartementRetabli", communes, "codeDepartement", null, true, anneeCOG)


  //communes = communes.repartition(col("codeDepartement"));
  communes = communes.sortWithinPartitions(col("codeCommune"));
  communes = communes.persist(); // Important : améliore les performances.


  saveToStore(communes, new String[] {/*"codeDepartement"*/}, "{0}_{1,number,#0}", nomStore, anneeCOG);
  LOGGER.info("Le dataset des communes du Code Officiel Géographique de l'année {} est prêt et stocké.", anneeCOG);

  return communes;


当数据集未按此字符串字段 codeDepartement 分区时:一切正常



2022-01-18 08:47:50.995  INFO 25913 --- [nio-9090-exec-1] f.e.spark.dataset.cog.CogDataset         : X07
 |-- typeCommune: string (nullable = true)
 |-- codeCommune: string (nullable = true)
 |-- codeRegion: string (nullable = true)
 |-- codeDepartement: string (nullable = true)
 |-- CTCD: string (nullable = true)
 |-- arrondissement: string (nullable = true)
 |-- typeNomEtCharniere: string (nullable = true)
 |-- nomMajuscules: string (nullable = true)
 |-- nomCommune: string (nullable = true)
 |-- LIBELLE: string (nullable = true)
 |-- codeCanton: string (nullable = true)
 |-- codeCommuneParente: string (nullable = true)
 |-- sirenCommune: string (nullable = true)
 |-- populationTotale: integer (nullable = true)
 |-- populationMunicipale: integer (nullable = true)
 |-- populationCompteApart: integer (nullable = true)
 |-- strateCommune: integer (nullable = false)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- surface: double (nullable = true)
 |-- sirenCommuneMembre: string (nullable = true)
 |-- codeEPCI: string (nullable = true)
 |-- nomEPCI: string (nullable = true)
 |-- codeDepartementRetabli: string (nullable = true)
 |-- codeCommuneChefLieuDepartement: string (nullable = true)
 |-- typeNomEtCharniereDepartement: string (nullable = true)
 |-- nomMajusculesDepartement: string (nullable = true)
 |-- nomDepartement: string (nullable = true)
 |-- libelleDepartement: string (nullable = true)

|typeCommune|codeCommune|codeRegion|codeDepartement|CTCD|arrondissement|typeNomEtCharniere|nomMajuscules            |nomCommune               |LIBELLE                  |codeCanton|codeCommuneParente|sirenCommune|populationTotale|populationMunicipale|populationCompteApart|strateCommune|longitude         |latitude          |surface|sirenCommuneMembre|codeEPCI |nomEPCI                                |codeDepartementRetabli|codeCommuneChefLieuDepartement|typeNomEtCharniereDepartement|nomMajusculesDepartement|nomDepartement         |libelleDepartement     |
|COM        |02053      |32        |02             |02D |021           |0                 |VALLEES EN CHAMPAGNE     |Vallées en Champagne     |Vallées en Champagne     |0204      |null              |200056307   |576             |570                 |6                    |3            |3.616431731194736 |49.007989478704616|4168.0 |200056307         |200072031|CA de la Région de Château-Thierry     |02                    |02408                         |5                            |AISNE                   |Aisne                  |Aisne                  |
|COM        |02057      |32        |02             |02D |023           |0                 |BEAUREVOIR               |Beaurevoir               |Beaurevoir               |0201      |null              |210200564   |1433            |1415                |18                   |3            |3.330083095945232 |49.996641228776475|2180.0 |210200564         |240200493|CC du Pays du Vermandois               |02                    |02408                         |5                            |AISNE                   |Aisne                  |Aisne                  |
|COM        |02070      |32        |02             |02D |025           |0                 |BERNOT                   |Bernot                   |Bernot                   |0207      |null              |210200697   |461             |448                 |13                   |2            |3.4866867514904327|49.87884521170864 |1672.0 |210200697         |200071983|CC Thiérache Sambre et Oise            |02                    |02408                         |5                            |AISNE                   |Aisne                  |Aisne                  |
|COM        |02090      |32        |02             |02D |024           |0                 |BILLY SUR OURCQ          |Billy-sur-Ourcq          |Billy-sur-Ourcq          |0221      |null              |210200887   |220             |210                 |10                   |1            |3.2992043452730138|49.22146881666656 |1021.0 |210200887         |240200519|CC du Canton d'Oulchy le Château       |02                    |02408                         |5                            |AISNE                   |Aisne                  |Aisne                  |
|COM        |02291      |32        |02             |02D |023           |1                 |ESTREES                  |Estrées                  |Estrées                  |0201      |null              |210202743   |419             |411                 |8                    |2            |3.2807599426576335|49.97077968837569 |704.0  |210202743         |240200493|CC du Pays du Vermandois               |02                    |02408                         |5                            |AISNE                   |Aisne                  |Aisne                  |
|COM        |02527      |32        |02             |02D |024           |0                 |MORSAIN                  |Morsain                  |Morsain                  |0220      |null              |210205043   |464             |455                 |9                    |2            |3.1908402513250707|49.46152187457162 |1434.0 |210205043         |200071991|CC Retz en Valois                      |02                    |02408                         |5                            |AISNE                   |Aisne                  |Aisne                  |
|COM        |03098      |84        |03             |03D |031           |0                 |DESERTINES               |Désertines               |Désertines               |0310      |null              |210300984   |4519            |4432                |87                   |5            |2.627485866618113 |46.357035806459585|827.0  |210300984         |200071082|CA Montluçon Communauté                |03                    |03190                         |5                            |ALLIER                  |Allier                 |Allier                 |
|COM        |04138      |93        |04             |04D |044           |0                 |NIOZELLES                |Niozelles                |Niozelles                |0406      |null              |210401386   |284             |278                 |6                    |2            |5.847058475050684 |43.931981561717315|1061.0 |210401386         |240400440|CC Pays Forcalquier et Montagne de Lure|04                    |04070                         |4                            |ALPES DE HAUTE PROVENCE |Alpes-de-Haute-Provence|Alpes-de-Haute-Provence|
|COM        |05107      |93        |05             |05D |051           |0                 |PUY SAINT ANDRE          |Puy-Saint-André          |Puy-Saint-André          |0502      |null              |210501078   |480             |457                 |23                   |2            |6.566597158877383 |44.891314199898126|1530.0 |210501078         |240500439|CC du Briançonnais                     |05                    |05061                         |4                            |HAUTES ALPES            |Hautes-Alpes           |Hautes-Alpes           |
|COM        |05163      |93        |05             |05D |052           |2                 |SAUZE DU LAC             |Sauze-du-Lac             |Le Sauze-du-Lac          |0504      |null              |210501631   |149             |145                 |4                    |1            |6.323608116359499 |44.48601132516764 |1223.0 |210501631         |200067742|CC Serre-Ponçon                        |05                    |05061                         |4                            |HAUTES ALPES            |Hautes-Alpes           |Hautes-Alpes           |
|COM        |05176      |93        |05             |05D |052           |0                 |VALSERRES                |Valserres                |Valserres                |0514      |null              |210501763   |271             |267                 |4                    |2            |6.1382720630480465|44.48971119562803 |1123.0 |210501763         |200067320|CC Serre-Ponçon Val d'Avance           |05                    |05061                         |4                            |HAUTES ALPES            |Hautes-Alpes           |Hautes-Alpes           |
|COM        |07038      |84        |07             |07D |071           |0                 |BORNE                    |Borne                    |Borne                    |0713      |null              |210700381   |49              |49                  |0                    |1            |4.041460041609979 |44.61694459940681 |3199.0 |210700381         |200072007|CC de la Montagne d'Ardèche            |07                    |07186                         |5                            |ARDECHE                 |Ardèche                |Ardèche                |
|COM        |09120      |76        |09             |09D |093           |0                 |FABAS                    |Fabas                    |Fabas                    |0911      |null              |210901203   |356             |352                 |4                    |2            |1.1143034959106055|43.11978973617488 |2327.0 |210901203         |200067940|CC Couserans-Pyrénées                  |09                    |09122                         |5                            |ARIEGE                  |Ariège                 |Ariège                 |
|COM        |09231      |76        |09             |09D |093           |2                 |PORT                     |Port                     |Le Port                  |0903      |null              |210902318   |160             |159                 |1                    |1            |1.3862330079391603|42.83569938864616 |5016.0 |210902318         |200067940|CC Couserans-Pyrénées                  |09                    |09122                         |5                            |ARIEGE                  |Ariège                 |Ariège                 |
|COM        |09276      |76        |09             |09D |092           |0                 |SAINT VICTOR ROUZAUD     |Saint-Victor-Rouzaud     |Saint-Victor-Rouzaud     |0907      |null              |210902763   |232             |222                 |10                   |1            |1.5478697013903504|43.087886607776255|1270.0 |210902763         |200066231|CC des Portes d'Ariège Pyrénées        |09                    |09122                         |5                            |ARIEGE                  |Ariège                 |Ariège                 |


protected boolean saveToStore(Dataset<Row> ds, String[] colonnesPartionnement, String format, Object... args) {
   return saveToStore(this.tempDir, isCacheUtilise(), ds, colonnesPartionnement, format, args);

public static boolean saveToStore(String tempDir, boolean useCache, Dataset<Row> ds, String[] colonnesPartionnement, String format, Object... args) {
   if (useCache && ds.isEmpty() == false) {
      String store = tempDir + "/" + MessageFormat.format(format, args);
      LOGGER.info("Un dataset a été sauvegardé dans le fichier parquet {}.", store);
      return true;
   if (useCache == false) {
      LOGGER.debug("Le cache est désactivé, le dataset n'a pas été sauvegardé dans un fichier parquet.");
   else {
      LOGGER.warn("Le dataset est vide et n'a pas été sauvegardé dans un fichier parquet.");
   return false;

创建了一个 parquet 文件(一个包含 200 个文件的文件夹,如 part-00000-522c936f-7b72-4ed8-bab9-9d4acee6bc7c-c000.snappy.parquet,未分区),如果我使用 Zeppelin 加载生成的 parquet 文件,我接收该内容:

val communes = spark.read.parquet("/data/tmp/communes_par_codeCommune_2021")

 |-- typeCommune: string (nullable = true)
 |-- codeCommune: string (nullable = true)
 |-- codeRegion: string (nullable = true)
 |-- codeDepartement: string (nullable = true)
 |-- CTCD: string (nullable = true)
 |-- arrondissement: string (nullable = true)
 |-- typeNomEtCharniere: string (nullable = true)
 |-- nomMajuscules: string (nullable = true)
 |-- nomCommune: string (nullable = true)
 |-- LIBELLE: string (nullable = true)
 |-- codeCanton: string (nullable = true)
 |-- codeCommuneParente: string (nullable = true)
 |-- sirenCommune: string (nullable = true)
 |-- populationTotale: integer (nullable = true)
 |-- populationMunicipale: integer (nullable = true)
 |-- populationCompteApart: integer (nullable = true)
 |-- strateCommune: integer (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- surface: double (nullable = true)
 |-- sirenCommuneMembre: string (nullable = true)
 |-- codeEPCI: string (nullable = true)
 |-- nomEPCI: string (nullable = true)
 |-- codeDepartementRetabli: string (nullable = true)
 |-- codeCommuneChefLieuDepartement: string (nullable = true)
 |-- typeNomEtCharniereDepartement: string (nullable = true)
 |-- nomMajusculesDepartement: string (nullable = true)
 |-- nomDepartement: string (nullable = true)
 |-- libelleDepartement: string (nullable = true)

|             01|
|             02|
|             03|
|             04|
|             05|
|             06|
|             07|
|             08|
|             09|
|             10|
|             11|
|             12|
|             13|
|             14|
|             15|
|             16|
|             17|
|             18|
|             19|
|             21|
|             22|
|             23|
|             24|
|             25|
|             26|
|             27|
|             28|
|             29|
|             2A|
|             2B|
|             30|
|             31|
|             32|
|             33|
|             34|
|             35|
|             36|
|             37|
|             38|
|             39|
|             40|
|             41|
|             42|
|             43|
|             44|
|             45|
|             46|
|             47|
|             48|
|             49|
|             50|
|             51|
|             52|
|             53|
|             54|
|             55|
|             56|
|             57|
|             58|
|             59|
|             60|
|             61|
|             62|
|             63|
|             64|
|             65|
|             66|
|             67|
|             68|
|             69|
|             70|
|             71|
|             72|
|             73|
|             74|
|             75|
|             76|
|             77|
|             78|
|             79|
|             80|
|             81|
|             82|
|             83|
|             84|
|             85|
|             86|
|             87|
|             88|
|             89|
|             90|
|             91|
|             92|
|             93|
|             94|
|             95|
|            971|
|            972|
|            973|
|            974|
only showing top 100 rows

codeDepartement 字段的前导零在这里,这是正常的,这个字段是一个字符串,由一些 2A2B 证明是 Corse(南北)部门。

我注意到在显示的架构中,codeDepartment 排在第四位。

当数据集使用 codeDepartement 分区时:加载 parquet 文件时前导零丢失:

如果我在我的 CogDataset 源文件中激活数据集的 codeDepartement 分区,方法是取消注释以下行:

communes = communes.repartition(col("codeDepartement"));
saveToStore(communes, new String[] {"codeDepartement"}, "{0}_{1,number,#0}", nomStore, anneeCOG);

转储“X07”的内容是相同的,除了 codeDepartment 是有序的,并保留它们的前导零,镶木地板文件现在有像 codeDepartement=02、(和 02 有它的前导零,所以它很有希望),但是当我用 Zeppelin 加载那个镶木地板文件时,事情出了问题:

 |-- typeCommune: string (nullable = true)
 |-- codeCommune: string (nullable = true)
 |-- codeRegion: string (nullable = true)
 |-- CTCD: string (nullable = true)
 |-- arrondissement: string (nullable = true)
 |-- typeNomEtCharniere: string (nullable = true)
 |-- nomMajuscules: string (nullable = true)
 |-- nomCommune: string (nullable = true)
 |-- LIBELLE: string (nullable = true)
 |-- codeCanton: string (nullable = true)
 |-- codeCommuneParente: string (nullable = true)
 |-- sirenCommune: string (nullable = true)
 |-- populationTotale: integer (nullable = true)
 |-- populationMunicipale: integer (nullable = true)
 |-- populationCompteApart: integer (nullable = true)
 |-- strateCommune: integer (nullable = true)
 |-- longitude: double (nullable = true)
 |-- latitude: double (nullable = true)
 |-- surface: double (nullable = true)
 |-- sirenCommuneMembre: string (nullable = true)
 |-- codeEPCI: string (nullable = true)
 |-- nomEPCI: string (nullable = true)
 |-- codeDepartementRetabli: string (nullable = true)
 |-- codeCommuneChefLieuDepartement: string (nullable = true)
 |-- typeNomEtCharniereDepartement: string (nullable = true)
 |-- nomMajusculesDepartement: string (nullable = true)
 |-- nomDepartement: string (nullable = true)
 |-- libelleDepartement: string (nullable = true)
 |-- codeDepartement: string (nullable = true)

|              1|
|             10|
|             11|
|             12|
|             13|
|             14|
|             15|
|             16|
|             17|
|             18|
|             19|
|              2|
|             21|
|             22|
|             23|
|             24|
|             25|
|             26|
|             27|
|             28|
|             29|
|             2A|
|             2B|
|              3|
|             30|
|             31|
|             32|
|             33|
|             34|
|             35|
|             36|
|             37|
|             38|
|             39|
|              4|
|             40|
|             41|
|             42|
|             43|
|             44|
|             45|
|             46|
|             47|
|             48|
|             49|
|              5|
|             50|
|             51|
|             52|
|             53|
|             54|
|             55|
|             56|
|             57|
|             58|
|             59|
|              6|
|             60|
|             61|
|             62|
|             63|
|             64|
|             65|
|             66|
|             67|
|             68|
|             69|
|              7|
|             70|
|             71|
|             72|
|             73|
|             74|
|             75|
|             76|
|             77|
|             78|
|             79|
|              8|
|             80|
|             81|
|             82|
|             83|
|             84|
|             85|
|             86|
|             87|
|             88|
|             89|
|              9|
|             90|
|             91|
|             92|
|             93|
|             94|
|             95|
|            971|
|            972|
|            973|
|            974|
only showing top 100 rows

codeDepartement 的前导零丢失,
2A2B 部门代码仍然在这里,表明该字段仍然是 string.

我注意到 Parquet 给出的 codeDepartment 位于架构的最后位置,就好像它重新创建了该字段本身 (?)。

看起来我缺少一些在存储到 Parquet 或为其重新加载我的内容之前应该设置的选项?

我正在使用 Spark 3.2.0

我找到了答案。 问题不是 parquet 文件本身,而是这些语句的事实:

val communes = spark.read.parquet("/data/tmp/communes_par_codeCommune_2021")

即使他们展示了良好的数据集模式,也不要考虑这个模式真的,并尝试从数据中推断它,同时阅读,只要可能 (?!),
因此,将我的 codeDepartement 字段视为数字,直到偶然发现值 2A2B 迫使它将其字段类型切换为字符串。

我想知道为什么 parquet 文件以这种方式加载。如果我的解释是好的。

但确保 parquet 文件的模式被考虑在内的正确方法是明确要求首先从该 parquet 文件中提取它,然后要求加载函数使用它:

val parquetFile = "/data/tmp/communes_par_codeCommune_2021";
val schema = spark.read.parquet(parquetFile).schema;
val communes = spark.read.schema(schema).parquet(parquetFile);


|             01|
|             02|
|             03|
|             04|
|             05|
|             06|
