Symfony3:如何尽可能快地从 CSV 文件进行大量导入?

Symfony3 : How to do a massive import from a CSV file as fast as possible?

我有一个包含超过 690 000 行.

的 .csv 文件

我找到了一个导入数据的解决方案,效果很好,但有点慢...(每 3 秒大约 100 条记录 = 63 小时!!)。

如何改进我的代码以使其更快?

我通过控制台命令进行导入。

另外,我想只导入数据库中不存在的处方者(以节省时间)。更复杂的是,没有字段是真正唯一的(id 除外)。

两位开处方者可以有相同的姓氏、名字、居住在同一个城市并且具有相同的 RPPS 和专业代码。但是,正是这 6 个字段的组合使它们独一无二!

这就是我在创建新字段之前检查每个字段的原因。

<?php

namespace AppBundle\Command;

use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Helper\ProgressBar;
use AppBundle\Entity\Prescriber;

class PrescribersImportCommand extends ContainerAwareCommand
{
    protected function configure()
    {
        $this
            // the name of the command (the part after "bin/console")
            ->setName('import:prescribers')
            ->setDescription('Import prescribers from .csv file')
        ;
    }

    protected function execute(InputInterface $input, OutputInterface $output)
    {
        // Show when the script is launched
        $now = new \DateTime();
        $output->writeln('<comment>Start : ' . $now->format('d-m-Y G:i:s') . ' ---</comment>');

        // Import CSV on DB via Doctrine ORM
        $this->import($input, $output);

        // Show when the script is over
        $now = new \DateTime();
        $output->writeln('<comment>End : ' . $now->format('d-m-Y G:i:s') . ' ---</comment>');
    }

    protected function import(InputInterface $input, OutputInterface $output)
    {
      $em = $this->getContainer()->get('doctrine')->getManager();

      // Turning off doctrine default logs queries for saving memory
      $em->getConnection()->getConfiguration()->setSQLLogger(null);

      // Get php array of data from CSV
      $data = $this->getData();

      // Start progress
      $size = count($data);
      $progress = new ProgressBar($output, $size);
      $progress->start();

      // Processing on each row of data
      $batchSize = 100; # frequency for persisting the data
      $i = 1;               # current index of records

      foreach($data as $row) {
         $p = $em->getRepository('AppBundle:Prescriber')->findOneBy(array(
                'rpps'       => $row['rpps'],
                'lastname'   => $row['nom'],
                'firstname'  => $row['prenom'],
                'profCode'   => $row['code_prof'],
                'postalCode' => $row['code_postal'],
                'city'       => $row['ville'],
         ));

         # If the prescriber doest not exist we create one
         if(!is_object($p)){
            $p = new Prescriber();
            $p->setRpps($row['rpps']);
            $p->setLastname($row['nom']);
            $p->setFirstname($row['prenom']);
            $p->setProfCode($row['code_prof']);
            $p->setPostalCode($row['code_postal']);
            $p->setCity($row['ville']);
            $em->persist($p);
         }    

         # flush each 100 prescribers persisted
         if (($i % $batchSize) === 0) {
            $em->flush();
            $em->clear();   // Detaches all objects from Doctrine!

            // Advancing for progress display on console
            $progress->advance($batchSize);
            $progress->display();
         }
         $i++;
      }

      // Flushing and clear data on queue
      $em->flush();
      $em->clear();

      // Ending the progress bar process
      $progress->finish();
    }

    protected function getData()
    {
        // Getting the CSV from filesystem
        $fileName = 'web/docs/prescripteurs.csv';

        // Using service for converting CSV to PHP Array
        $converter = $this->getContainer()->get('app.csvtoarray_converter');
        $data = $converter->convert($fileName);

        return $data;
    }
}

编辑

根据@Jake N的回答,这里是最终代码。

速度非常非常快! 10 分钟导入 653 727 / 693 230 行(39 503 个重复项!)

1) 在我的 table 中添加两列:created_atupdated_at

2) 在我的 table 的每一列(id 和日期除外)中添加一个类型为 UNIQUE 的单个 index,以防止与 phpMyAdmin 重复项目。

3) 在我的查询中添加 ON DUPLICATE KEY UPDATE,以仅更新 updated_at 列。

foreach($data as $row) {
    $sql = "INSERT INTO prescripteurs (rpps, nom, prenom, code_prof, code_postal, ville)
        VALUES(:rpps, :nom, :prenom, :codeprof, :cp, :ville)
        ON DUPLICATE KEY UPDATE updated_at = NOW()";

    $stmt = $em->getConnection()->prepare($sql);
    $r = $stmt->execute(array(
        'rpps'      => $row['rpps'],
        'nom'       => $row['nom'],
        'prenom'    => $row['prenom'],
        'codeprof'  => $row['code_prof'],
        'cp'        => $row['code_postal'],
        'ville'     => $row['ville'],
    ));

    if (!$r) {
        $progress->clear();
        $output->writeln('<comment>An error occured.</comment>');
        $progress->display();

    } elseif (($i % $batchSize) === 0) {
        $progress->advance($batchSize);
        $progress->display();
    }
    $i++;
}

// Ending the progress bar process
$progress->finish();

1.不要使用 Doctrine

如果可以的话,尽量不要使用 Doctrine,它会占用内存,而且你发现它很慢。尝试仅使用原始 SQL 通过简单的 INSERT 语句进行导入:

$sql = <<<SQL
INSERT INTO `category` (`label`, `code`, `is_hidden`) VALUES ('Hello', 'World', '1');
SQL;
$stmt = $this->getDoctrine()->getManager()->getConnection()->prepare($sql);
$stmt->execute();

或者您可以使用以下值准备语句:

$sql = <<<SQL
INSERT INTO `category` (`label`, `code`, `is_hidden`) VALUES (:label, :code, :hidden);
SQL;
$stmt = $this->getDoctrine()->getManager()->getConnection()->prepare($sql);
$stmt->execute(['label' => 'Hello', 'code' => 'World', 'hidden' => 1);

未经测试的代码,但它应该可以帮助您入门,因为我以前就是这样做的。

2。指数

此外,对于您的支票,您是否有所有这些字段的索引?以便尽快查找。