为推荐系统处理 Firebase 中的大量数据

Dealing with lots of data in Firebase for a recommender system

我正在构建一个推荐系统,我在其中使用 Firebase 存储和检索有关 电影 用户偏好.[=12= 的数据]

每部电影可以有几个属性,数据如下:

{ 
    "titanic": 
    {"1997": 1, "english": 1, "dicaprio": 1,    "romance": 1, "drama": 1 }, 
    "inception": 
    { "2010": 1, "english": 1, "dicaprio": 1, "adventure": 1, "scifi": 1}
...
}

为了提出建议,我的算法需要输入所有数据(电影)并与用户个人资料进行匹配。

但是,在生产模式下,我需要检索超过 10,000 部电影。虽然该算法可以相对较快地处理此问题,但从 Firebase 加载此数据需要花费大量时间。

我检索数据如下:

firebase.database().ref(moviesRef).on('value', function(snapshot) {
    // snapshot.val();
}, function(error){
    console.log(error)
});

我想知道您是否对如何加快速度有任何想法?是否有已知的插件或技术可以解决此问题?

我知道非规范化有助于拆分数据,但问题是我需要所有电影和所有相应的属性。

虽然您说您的算法需要所有电影和所有属性,但这并不意味着它一次处理它们。任何计算单元都有其局限性,在您的算法中,您可能会将数据分成计算单元可以处理的更小部分。

话虽如此,如果您想加快速度,可以修改算法以并行处理 data/movies:

| fetch  | -> |process | -> | fetch  | ...
|chunk(1)|    |chunk(1)|    |chunk(3)|

(in parallel) | fetch  | -> |process | ...
              |chunk(2)|    |chunk(2)|

使用这种方法,如果处理真的比获取快(但你没有说 "relatively fast" 你的算法如何 运行,你几乎可以节省整个处理时间(但最后一个块),与获取所有电影相比)

如果获取电影真的很慢,这种 "high level" 解决您问题的方法可能是您更好的机会,尽管它需要比简单地激活库的假设 "speed up" 按钮更多的工作。虽然在处理大量数据时这是一种合理的方法。

我的建议是使用 Cloud Functions 来处理这个问题。

解决方案 1(理想情况下)

如果你能每小时/每天/每周计算一次建议

您可以使用 Cloud Functions Cron 每天/每周启动并每周/每天计算每个用户的推荐。通过这种方式,您可以获得或多或少类似于 Spotify 每周播放列表/推荐的结果。

这样做的主要优点是您的用户不必等待下载所有 10,000 部电影,因为这会发生在云功能中,每个星期天晚上,编制一个包含 25 部推荐的列表,然后保存到您用户的数据节点,您可以在用户访问他们的个人资料时下载它。

您的云函数代码如下所示:

var movies, allUsers; 

exports.weekly_job = functions.pubsub.topic('weekly-tick').onPublish((event) => {
  getMoviesAndUsers();
});  

function getMoviesAndUsers () {
  firebase.database().ref(moviesRef).on('value', function(snapshot) {
    movies = snapshot.val();
    firebase.database().ref(allUsersRef).on('value', function(snapshot) {
        allUsers = snapshot.val();
        createRecommendations();
    });
});
}

function createRecommendations () {
  // do something magical with movies and allUsers here.

  // then write the recommendations to each user's profiles kind of like 
  userRef.update({"userRecommendations" : {"reco1" : "Her", "reco2", "Black Mirror"}});
  // etc. 
}

原谅伪代码。不过,我希望这能提供一个思路。

然后在您的前端,您只需为每个用户获取 userRecommendations。通过这种方式,您可以将带宽和计算从用户设备转移到云功能。而且在效率方面,不知道你是怎么算推荐量的,我也提不出什么建议。

解决方案 2

如果您无法每小时/每天/每周计算一次建议,并且每次用户访问他们的建议面板时都必须这样做

然后你可以在用户每次访问他们的推荐页面时触发一个云函数。我为此使用的一个快速作弊解决方案是在用户配置文件中写入一个值,例如:{getRecommendations:true},一次在页面加载时,然后在云函数中监听 getRecommendations 中的变化。只要你有这样的结构:

用户 ID > getRecommendations : true

并且如果您有适当的安全规则以便每个用户只能写入他们的路径,则此方法也会为您提供发出请求的正确用户 ID。因此,您将知道要为哪个用户计算推荐。云功能很可能可以更快地提取 10,000 条记录并节省用户带宽,最后将 推荐写入用户配置文件。 (类似于上面的解决方案 1)您的设置如下:

[前端代码]

//on pageload
userProfileRef.update({"getRecommendations" : true});
userRecommendationsRef.on('value', function(snapshot) {  gotUserRecos(snapshot.val());  });

[云函数(后端代码)]

exports.userRequestedRecommendations = functions.database.ref('/users/{uid}/getRecommendations').onWrite(event => {
  const uid = event.params.uid;
  firebase.database().ref(moviesRef).on('value', function(snapshot) {
    movies = snapshot.val();
    firebase.database().ref(userRefFromUID).on('value', function(snapshot) {
        usersMovieTasteInformation = snapshot.val();
        // do something magical with movies and user's preferences here.
        // then 
        return userRecommendationsRef.update({"getRecommendations" : {"reco1" : "Her", "reco2", "Black Mirror"}});
    });
  });
});

由于您的前端将在 userRecommendationsRef 监听更改,一旦您的云功能完成,您的用户就会看到结果。这可能需要几秒钟,因此请考虑使用加载指示器。

P.S 1:我最终使用了比原先预期更多的伪代码,并删除了错误处理等。希望这通常能说明问题。如果有任何不清楚的地方,请发表评论,我很乐意澄清。

P.S。 2:我正在为我的一个客户构建的迷你内部服务使用非常相似的流程,现在它已经愉快地运行了一个多月。

Firebase NoSQL JSON 结构最佳实践是 "Avoid nesting data",但您说过,您不想更改数据。因此,根据您的情况,您可以对 firebase 的任何特定节点(每部电影的节点)进行 REST 调用。

解决方案 1) 你可以通过 ThreadPoolExecutors 创建一些固定数量的线程。从每个工作线程,您可以执行 HTTP(REST 调用请求),如下所示。根据您的设备性能和内存容量,您可以决定要通过 ThreadPoolExecutors 操作多少个工作线程。您可以使用如下代码片段:

/* creates threads on demand */
    ThreadFactory threadFactory = Executors.defaultThreadFactory(); 

/* Creates a thread pool that creates new threads as needed, but will reuse previously constructed threads when they are available */

    ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(10); /* you have 10 different worker threads */  

for(int i = 0; i<100; i++) { /* you can load first 100 movies */
/* you can use your 10 different threads to read first 10 movies */
threadPoolExecutor.execute(() -> {



        /* OkHttp Reqeust */
        /* urlStr can be something like "https://earthquakesenotifications.firebaseio.com/movies?print=pretty" */
                Request request = new Request.Builder().url(urlStr+"/i").build(); 

    /* Note: Firebase, by default, store index for every array. 
Since you are storing all your movies in movies JSON array, 
it would be easier, you read first (0) from the first worker thread, 
second (1) from the second worker thread and so on. */

                try {
                    Response response = new OkHttpClient().newCall(request).execute(); 
    /* OkHttpClient is HTTP client to request */
                    String str = response.body().string();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                return myStr;
            });
            }
                threadPoolExecutor.shutdown();

解决方案2)解决方案1不是基于Listener-Observer模式。其实Firebase是有PUSH技术的。意思是,每当 Firebase NoSQL JSON 中的某个特定节点发生变化时,具有 JSON 特定节点的连接侦听器的相应客户端将通过 onDataChange(DataSnapshot dataSnapshot) { } 获取新数据。为此,您可以创建一个 DatabaseReferences 数组,如下所示:

      Iterable<DataSnapshot> databaseReferenceList = FirebaseDatabase.getInstance().getReference().getRoot().child("movies").getChildren();

for(DataSnapshot o : databaseReferenceList) { 
 @Override
            public void onDataChange(DataSnapshot o) {



      /* show your ith movie in ListView. But even you use RecyclerView, showing each Movie in your RecyclerView's item is still show. */
/* so you can store movie in Movies ArrayList. When everything completes, then you can update RecyclerView */

                }

            @Override
            public void onCancelled(DatabaseError databaseError) {
            }
}