如何找到路径流并使用 pig 或 hive 对它们进行排名?

how to find the pathing flow and rank them using pig or hive?

下面是我的用例示例。

您可以参考 OP 询问类似问题的地方。如果我正确理解你的问题,你想从路径中删除重复项,但只有当它们彼此相邻时。所以 1 -> 1 -> 2 -> 1 会变成 1 -> 2 -> 1。如果这是正确的,那么你不能只分组和 distinct (我相信你已经注意到了)因为它会删除 all 重复项。一个简单的解决方案是编写一个 UDF 来删除这些重复项,同时保留用户的不同路径。

UDF:

package something;

import java.util.ArrayList;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class RemoveSequentialDuplicatesUDF extends UDF {
    public ArrayList<Text> evaluate(ArrayList<Text> arr) {
        ArrayList<Text> newList = new ArrayList<Text>();
        newList.add(arr.get(0));
        for (int i = 1; i < arr.size(); i++) {

            String front = arr.get(i).toString();
            String back  = arr.get(i-1).toString();

            if (!back.equals(front)) {
                newList.add(arr.get(i));
            }
        }
        return newList;
    }
}

要构建此 jar,您需要 hive-core.jarhadoop-core.jar,您可以在 Maven Repository 中找到它们。确保您获得在您的环境中使用的 Hive 和 Hadoop 版本。此外,如果您计划在生产环境中使用 运行,我建议您向 UDF 添加一些异常处理。构建 jar 后,将其导入并 运行 此查询:

查询:

add jar /path/to/jars/brickhouse-0.7.1.jar;
add jar /path/to/jars/hive_common-SNAPSHOT.jar;
create temporary function collect as "brickhouse.udf.collect.CollectUDAF";
create temporary function remove_dups as "something.RemoveSequentialDuplicatesUDF";

select screen_flow, count
  , dense_rank() over (order by count desc) rank
from (
  select screen_flow
    , count(*) count
  from (
    select session_id
      , concat_ws("->", remove_dups(screen_array)) screen_flow
    from (
      select session_id
        , collect(screen_name) screen_array
      from (
        select *
        from database.table
        order by screen_launch_time ) a
      group by session_id ) b
    ) c
  group by screen_flow ) d

输出:

s1->s2->s3      2       1
s1->s2          1       2
s1->s2->s3->s1  1       2

希望对您有所帮助。

输入

990004916946605-1404157897784,S1,1404157898275
990004916946605-1404157897784,S1,1404157898286
990004916946605-1404157897784,S2,1404157898337
990004947764274-1435162269418,S1,1435162274044
990004947764274-1435162269418,S2,1435162274057
990004947764274-1435162269418,S3,1435162274081
990004947764274-1435162287965,S2,1435162690002
990004947764274-1435162287965,S1,1435162690001
990004947764274-1435162287965,S3,1435162690003
990004947764274-1435162287965,S1,1435162690004
990004947764274-1435162212345,S1,1435168768574
990004947764274-1435162212345,S2,1435168768585
990004947764274-1435162212345,S3,1435168768593


register /home/cloudera/jar/ScreenFilter.jar;

screen_records =  LOAD '/user/cloudera/inputfiles/screen.txt' USING PigStorage(',') AS(session_id:chararray,screen_name:chararray,launch_time:long);

screen_rec_order =  ORDER screen_records  by launch_time ASC;

session_grped = GROUP screen_rec_order BY session_id;

eached = FOREACH session_grped
                      {
                         ordered = ORDER screen_rec_order by launch_time;

                        GENERATE group as session_id, REPLACE(BagToString(ordered.screen_name),'_','-->') as screen_str;

                      };

screen_each  =  FOREACH eached GENERATE session_id, GetOrderedScreen(screen_str) as screen_pattern;

screen_grp   = GROUP screen_each by screen_pattern;

screen_final_each = FOREACH screen_grp GENERATE group as screen_pattern, COUNT(screen_each) as pattern_cnt;

ranker = RANK screen_final_each BY pattern_cnt DESC DENSE;

output_data = FOREACH ranker GENERATE screen_pattern, pattern_cnt, [=10=] as rank_value;

dump output_data;

我无法找到一种方法来使用 Pig 内置函数删除相同 session_id 的相邻屏幕,因此我使用 JAVA UDF 来删除相邻的屏幕名称。

我创建了一个名为 GetOrderedScreen 的 JAVA UDF 并将该 UDF 复制到 jar 中并将该 jar 命名为 ScreenFilter.jar 并在此 Pig 脚本中注册了该 jar

下面是 GetOrderedScreen Java UDF

的代码
public class GetOrderedScreen extends EvalFunc<String> {


@Override
public String exec(Tuple input) throws IOException {


    String incoming_screen_str= (String)input.get(0);
    String outgoing_screen_str ="";
    String screen_array[] =incoming_screen_str.split("-->");

    String full_screen=screen_array[0];

  for (int i=0; i<screen_array.length;i++)
  {
     String prefix_screen=  screen_array[i];
     String suffix_screen="";
     int j=i+1;

     if(j< screen_array.length)
     {
         suffix_screen  = screen_array[j];
     }


 if (!prefix_screen.equalsIgnoreCase(suffix_screen))
     {
     full_screen = full_screen+ "-->" +suffix_screen;
     }

  }
  outgoing_screen_str =full_screen.substring(0, full_screen.lastIndexOf("-->")); 


  return outgoing_screen_str;

}

}

输出

(S1-->S2-->S3,2,1)
(S1-->S2,1,2)
(S1-->S2-->S3-->S1,1,2)

希望这对你有帮助!..再等一段时间,一些看到这个问题的好脑子会有效地回答(没有JAVA UDF)