去重大量数据?

Deduplicate large amount of data?

我们有一个包含数亿到数十亿个 url 的文件。

我想用 c# 对 url 进行重复数据删除,但是将其放入 HashSet 集合或类似的集合中会很快抛出 OutOfMemoryException

如果我流式传输它,我可以避免内存问题,但我需要在集合中维护一些状态,我想这也会导致内存问题。

如何使用 c# 快速删除此数据,而不会 运行 出现内存问题?我试图避免将其批量加载到数据库中只是为了再次将其拉回,并将所有内容都保存在机器本地。

我不关心 OutOfMemoryException;我只是在解释为什么那行不通。我要去重

为了了解更多背景信息,我们将此文件发送给对其进行分析的大数据供应商,他们根据处理的数据量向我们收费。我们的系统管理小组不喜欢为最终大量临时数据设置数据库的想法,并问我们,"can't you just do it in code?" 我需要试一试。

假设您有十亿个 URL,并且您可以在哈希集中保存一百万个而不会出现内存问题。

将前一百万个 URL 放入哈希集中。记录哈希集大小 s1。将它们写入新文件。读取其余的 URL,根据哈希集检查每个 URL,然后将它们写入新文件。您现在知道前 s1 个 URL 是唯一的。

将新文件的位置 s1s1 + 1m 的 URL 放入新的哈希集中。记录大小s2。写下您知道的新文件唯一的前 s1 个 URL。现在写入哈希集内容。现在读取其余的 URL,根据哈希集检查它们。您现在知道前 s1 + s2 个 URL 是唯一的。

将位置 s1 + s2s1 + s2 + 1m 的 URL 放入新的哈希集中。依此类推,直到您知道它们都是独一无二的。

我认为您应该复习一下 URL 的独特之处。就像 的回答一样,我认为拆分数据是个好主意,但是,您可以通过将数据拆分为较小的文件来大大缩短路径,例如:

  • url
  • 的域
  • url
  • 的长度
  • url
  • 的第一个和最后一个字符

所以程序的流程会是这样的(伪代码,随便想想):

while (url = largeFile.ReadLine()) {
    string directory = GetDirectoryForUrl( url );
    int wordLength = url.Length;
    string filename = url[0] + url[1];
    string fullName = Path.Combine( directory, wordLength.ToString(), fileName);

    var set = LoadSet( fullName );
    if (!set.Contains(url)) {
        AppendToFile( fullName, url );
    }
}

然后对于子方法,类似于

string GetDirectoryForUrl( string url ) {
    return GetDomain(url);
}

ISet<string> LoadSet( string fullName ) {
    // check if directories exists...
    if (!File.Exists( fullName )) { 
        return new HashSet<string>();
    }
    // load the hashset based on the file
}

void AppendToFile(string fullName, string url) {
    // add or create the file (check if directories exist)
}

这当然会创建许多小文件,但它的优点是您只检查数据的一小部分(尽管坦率地说,我不知道您的数据看起来如何,也许只有一些字符不同)

这里的好处是你可以根据你知道的标准细分数据,并且你可以及时调整它

没有时间创建完整的代码(但由于您主要是在寻找算法 :))

更新

我创建了一个小型控制台程序,它既创建了一个文件进行测试,又创建了对大文件的分析。这是代码

有问题的测试生成了大约 100 万 url 秒,但是由于随机数据集如此有限,重复项可能比您描述的数据看起来要多 20%。

我机器上的分析本身花了大约 26 分钟才完成,我无法估计这个时间是否合适,因为我没有测试任何其他编写方法。

到目前为止,代码与我的初始设置稍有不同,我将 url 的部分内容用于我的目录结构。到目前为止,在使用数据集的情况下,我没有看到程序接近尾声时速度变慢,但我还必须提到 Comodo 将我的程序保存在沙箱中。

也没有实现将所有数据组合回 1 个大文件,不过,我真的没有看到任何大问题。

类 需要 运行 程序

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

namespace analyzeduplicates {
  public interface IFileAnalyzer {
    IStringToDirectoryHelper StringToDirectoryHelper { set; }
    ISetFileHelper SetFileLoader { set; }
    void Analyze( string targetFile, string targetDirectory );
  }

  public interface IStringToDirectoryHelper {
    string[] GetPathFromString( string value );
  }

  public class StringToDirectoryHelper : IStringToDirectoryHelper {
    public string[] GetPathFromString( string value ) {
      string item = value.Trim();
      return item
        .Trim()
        .Split( new[] { "\", "/", ":", "@", "%", ":", "?", "&", ";", "." }, StringSplitOptions.RemoveEmptyEntries )
        .Take( 3 )
        .Concat(new string[] { item.Length.ToString(), item[0].ToString() + item[value.Length-1].ToString() } )
        .ToArray();
    }
  }

  public interface ISetFileHelper {
    IReadOnlyCollection<string> GetSetFromFile( string path );
    void AddToSetFile( string path, string value );
  }

  public class SetFileHelper : ISetFileHelper {
    public IReadOnlyCollection<string> GetSetFromFile( string path ) {
      if (!Directory.Exists(Path.GetDirectoryName(path))) {
        return new List<string>();
      }
      if (!File.Exists(path)) {
        return new List<string>();
      }
      return File.ReadAllLines( path );
    }

    public void AddToSetFile( string path, string value) {
      if (!Directory.Exists(Path.GetDirectoryName(path))) {
        Directory.CreateDirectory( Path.GetDirectoryName( path ) );
      }
      File.AppendAllLines( path, new string[] { value } );
    }
  }

  public class FileAnalyzer: IFileAnalyzer {
    public IStringToDirectoryHelper StringToDirectoryHelper { get; set; }
    public ISetFileHelper SetFileLoader { get; set; }

    public FileAnalyzer() {

    }

    public FileAnalyzer(
      IStringToDirectoryHelper stringToDirectoryHelper, 
      ISetFileHelper setFileLoader) : this() {
      StringToDirectoryHelper = stringToDirectoryHelper;
      SetFileLoader = setFileLoader;
    }

    private void EnsureParametersSet() {
      if (StringToDirectoryHelper == null) {
        throw new InvalidOperationException( $"Cannot start analyzing without {nameof(StringToDirectoryHelper)}" );
      }
      if (SetFileLoader == null) {
        throw new InvalidOperationException( $"Cannot start analyzing without {nameof( SetFileLoader )}" );
      }
    }

    public void Analyze( string targetFile, string targetDirectory ) {
      EnsureParametersSet();
      using (var reader = new StreamReader(targetFile, true)) {
        long count = 0;
        while (!reader.EndOfStream) {
          if (count % 1000 == 0) {
            Console.WriteLine( $"Analyzing line {count}-{count + 1000}" );
          }
          count++;
          string line = reader.ReadLine();
          if (string.IsNullOrWhiteSpace(line)) {
            // nothing meaningfull can be done
            continue;
          }
          var path = StringToDirectoryHelper.GetPathFromString( line );
          string targetPath = Path.Combine( new[] { targetDirectory }.Concat( path ).ToArray() );
          var set = SetFileLoader.GetSetFromFile( targetPath );
          if (set.Contains(line)) {
            // duplicate, don't care for it
            continue;
          }
          SetFileLoader.AddToSetFile( targetPath, line );
        }
      }
    }
  }
}

控制台程序本身

using System;
using System.Diagnostics;
using System.IO;

namespace analyzeduplicates {
  class Program {

    static void Main( string[] args ) {
      string targetFile = Path.Combine( Environment.CurrentDirectory, "source.txt" );
      if (File.Exists(targetFile)) {
        File.Delete( targetFile );
      }
      if ( !File.Exists( targetFile ) ) {
        Console.WriteLine( "Generating extensive list of urls" );
        Stopwatch generateWatch = Stopwatch.StartNew();
        GenerateList( targetFile );
        generateWatch.Stop();
        Console.WriteLine( "Generating took {0:hh\:mm\:ss}", generateWatch.Elapsed );
      }
      Console.WriteLine( "Analyzing file" );
      Stopwatch analyzeWatch = Stopwatch.StartNew();
      IFileAnalyzer analyzer = new FileAnalyzer(new StringToDirectoryHelper(), new SetFileHelper());
      analyzer.Analyze( targetFile, Environment.CurrentDirectory );
      analyzeWatch.Stop();
      Console.WriteLine( "Analyzing took {0:hh\:mm\:ss}", analyzeWatch.Elapsed );
      Console.WriteLine( "done, press enter to clean up" );
      Console.ReadLine();
      File.Delete( targetFile );
      foreach (var dir in Directory.GetDirectories( Environment.CurrentDirectory )) {
        Directory.Delete( dir, true );
      }
      Console.WriteLine( "Cleanup completed, press enter to exit" );
      Console.ReadLine();
    }

    public static void GenerateList( string targetFile ) {
      string[] domains = new[] {
        "www.google.com",
        "www.google.de",
        "www.google.ca",
        "www.google.uk",
        "www.google.co.uk",
        "www.google.nl",
        "www.google.be",
        "www.google.fr",
        "www.google.sa",
        "www.google.me",
        "www.youtube.com",
        "www.youtube.de",
        "www.youtube.ca",
        "www.youtube.uk",
        "www.youtube.co.uk",
        "www.youtube.nl",
        "www.youtube.be",
        "www.youtube.fr",
        "www.youtube.sa",
        "www.youtube.me"
      };
      string[] paths = new[] {
        "search","indicate", "test", "generate", "bla", "bolognes", "macaroni", "part", "of", "web", "site", "index", "main", "nav"
      };
      string[] extensions = new[] {
        "", ".html", ".php", ".aspx", ".aspx", "htm"
      };
      string[] query = new[] {
        "", "?s=test", "?s=query&b=boloni", "?isgreat", "#home", "#main", "#nav"
      };
      string[] protocols = new[] {
        "http://", "https://", "ftp://", "ftps://"
      };
      using (var writer = new StreamWriter(targetFile)) {
        var rnd = new Random();
        for (long i = 0; i < 1000000; i++) {
          int pathLength = rnd.Next( 5 );
          string path = "/";
          if (pathLength > 0) {
            for (int j = 0; j< pathLength; j++ ) {
              path += paths[rnd.Next( paths.Length )] + "/";
            }
          }
          writer.WriteLine( "{0}{1}{2}{3}{4}{5}", protocols[rnd.Next( protocols.Length )], domains[rnd.Next(domains.Length)], path, paths[rnd.Next(paths.Length)], extensions[rnd.Next(extensions.Length)], query[rnd.Next(query.Length)] );
        }
      }
    }
  }
}

我不确定现在的完整答案是否有点大,但我想我可以分享一下。我不知道该程序如何在像您描述的那样大的数据集上执行,我很想花点时间以防它对您有用:)