AWS's own example of submitting a Pig job does not work due to an issue with piggybank.jar

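I have been trying to test submitting a Pig job on AWS EMR by following the Amazon guide. I changed the Pig script so that it can find piggybank.jar, as Amazon instructs. When I run the script, I get ERROR 1070, which says that one of the functions available in piggybank could not be resolved. Any ideas about what is going wrong?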

Key part of the error:

2018-03-15 21:47:08,258 ERROR org.apache.pig.PigServer (main): exception during parsing: Error during parsing. Could not resolve org.apache.pig.piggybank.evaluation.string.EXTRACT using imports: [, java.lang., org.apache.pig.builtin., org.apache.pig.impl.builtin.]
Failed to parse: Pig script failed to parse: <file s3://cis442f-data/pigons3/do-reports4.pig, line 26, column 6> Failed to generate logical plan. Nested exception: org.apache.pig.backend.executionengine.ExecException: ERROR 1070: Could not resolve org.apache.pig.piggybank.evaluation.string.EXTRACT using imports: [, java.lang., org.apache.pig.builtin., org.apache.pig.impl.builtin.]

The first part of the script is below.

Line 26, the line mentioned in the error, contains "EXTRACT(".

register file:/usr/lib/pig/lib/piggybank.jar;
DEFINE EXTRACT org.apache.pig.piggybank.evaluation.string.EXTRACT;
DEFINE FORMAT org.apache.pig.piggybank.evaluation.string.FORMAT;
DEFINE REPLACE org.apache.pig.piggybank.evaluation.string.REPLACE;
DEFINE DATE_TIME org.apache.pig.piggybank.evaluation.datetime.DATE_TIME;
DEFINE FORMAT_DT org.apache.pig.piggybank.evaluation.datetime.FORMAT_DT;


--
-- import logs and break into tuples
--
raw_logs =
  -- load the weblogs into a sequence of one element tuples
  LOAD '$INPUT' USING TextLoader AS (line:chararray);

logs_base =
  -- for each weblog string convert the weblog string into a
  -- structure with named fields
  FOREACH
    raw_logs
  GENERATE
    FLATTEN (
      EXTRACT(
        line,
        '^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(.+?)" (\S+) (\S+) "([^"]*)" "([^"]*)"'
      )
    )
    AS (
      remoteAddr: chararray, remoteLogname: chararray, user: chararray, time: chararray,
      request: chararray, status: int, bytes_string: chararray, referrer: chararray,
      browser: chararray
    )
  ;

The correct function name is REGEX_EXTRACT. So either change your DEFINE statement to

DEFINE EXTRACT org.apache.pig.piggybank.evaluation.string.REGEX_EXTRACT;

or use REGEX_EXTRACT directly in your Pig script:

logs_base =
  -- for each weblog string convert the weblog string into a
  -- structure with named fields
  FOREACH
    raw_logs
  GENERATE
    FLATTEN (
      REGEX_EXTRACT(
        line,
        '^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(.+?)" (\S+) (\S+) "([^"]*)" "([^"]*)"'
      )
    )
    AS (
      remoteAddr: chararray, remoteLogname: chararray, user: chararray, time: chararray,
      request: chararray, status: int, bytes_string: chararray, referrer: chararray,
      browser: chararray
    )
  ;
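
One thing worth checking when swapping function names: the two built-in regex functions have different signatures. The following is only a minimal sketch, assuming a Pig version where both REGEX_EXTRACT and REGEX_EXTRACT_ALL are built in. The relation names and the simplified two-field pattern are hypothetical and chosen for illustration; they are not part of Amazon's script.

```pig
-- Hypothetical one-column relation, for illustration only
lines = LOAD '$INPUT' USING TextLoader AS (line:chararray);

-- REGEX_EXTRACT takes three arguments (string, regex, group index)
-- and returns that single capture group as a chararray
first_field =
  FOREACH lines
  GENERATE REGEX_EXTRACT(line, '^([^ ]+) ', 1) AS remoteAddr;

-- REGEX_EXTRACT_ALL takes two arguments (string, regex) and returns a
-- tuple holding every capture group; FLATTEN expands it into fields.
-- The pattern has to match the whole line, hence the trailing .*
both_fields =
  FOREACH lines
  GENERATE
    FLATTEN(REGEX_EXTRACT_ALL(line, '^([^ ]+) ([^ ]+).*'))
    AS (remoteAddr:chararray, remoteLogname:chararray);
```

For a pattern with nine capture groups that are flattened into nine named fields, the tuple-returning REGEX_EXTRACT_ALL is the form that lines up with the AS clause.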

Amazon's original script does not work because it relies on an older version of piggybank. Here is an updated version that does not need piggybank at all.

--
-- import logs and break into tuples
--

raw_logs =
  -- load the weblogs into a sequence of one element tuples
  LOAD '$INPUT' USING TextLoader AS (line:chararray);

logs_base =
  -- for each weblog string convert the weblog string into a
  -- structure with named fields
  FOREACH
    raw_logs
  GENERATE
    FLATTEN (
      REGEX_EXTRACT_ALL(
        line,
        '^(\S+) (\S+) (\S+) \[([\w:/]+\s[+\-]\d{4})\] "(.+?)" (\S+) (\S+) "([^"]*)" "([^"]*)"'
      )
    )
    AS (
      remoteAddr: chararray, remoteLogname: chararray, user: chararray, time: chararray,
      request: chararray, status: int, bytes_string: chararray, referrer: chararray,
      browser: chararray
    )
  ;

logs =
  -- convert from string values to typed values such as date_time and integers
  FOREACH 
    logs_base 
  GENERATE 
    *, 
    ToDate(time, 'dd/MMM/yyyy:HH:mm:ss Z', 'UTC') as dtime,
    (int)REPLACE(bytes_string, '-', '0')          as bytes
  ;


--
-- determine total number of requests and bytes served by UTC hour of day
-- aggregating as a typical day across the total time of the logs
--
by_hour_count =
  -- group logs by their hour of day, counting the number of logs in that hour
  -- and the sum of the bytes of rows for that hour
  FOREACH 
    (GROUP logs BY GetHour(dtime))
  GENERATE 
    $0, 
    COUNT($1) AS num_requests, 
    SUM($1.bytes) AS num_bytes
  ;

STORE by_hour_count INTO '$OUTPUT/total_requests_bytes_per_hour';



--
-- top 50 X.X.X.* blocks
--
by_ip_count =
  -- group weblog entries by the ip address from the remote address field
  -- and count the number of entries for each address block as well as
  -- the sum of the bytes
    FOREACH
      (GROUP logs BY (chararray)REGEX_EXTRACT(remoteAddr, '(\d+\.\d+\.\d+)', 1))
  --     (GROUP logs BY block)
    GENERATE $0,
      COUNT($1) AS num_requests,
      SUM($1.bytes) AS num_bytes
  ; 



by_ip_count_sorted =  ORDER by_ip_count BY num_requests DESC;

by_ip_count_limited =
  -- keep the 50 address blocks with the most requests
  LIMIT by_ip_count_sorted 50;

STORE by_ip_count_limited into '$OUTPUT/top_50_ips';


--
-- top 50 external referrers
--
by_referrer_count =
  -- group by the referrer URL and count the number of requests
  FOREACH
    (GROUP logs BY (chararray)REGEX_EXTRACT(referrer, '(http:\/\/[a-z0-9\.-]+)', 1))
  GENERATE
    FLATTEN($0),
    COUNT($1) AS num_requests
  ;

by_referrer_count_filtered =
  -- exclude matches for example.org
  FILTER by_referrer_count BY NOT $0 matches '.*example\.org';

by_referrer_count_sorted =
  -- order the referrers by number of requests, highest first
  ORDER by_referrer_count_filtered BY $1 DESC;

by_referrer_count_limited =
  -- take the top 50 results
  LIMIT by_referrer_count_sorted 50;

STORE by_referrer_count_limited INTO '$OUTPUT/top_50_external_referrers';


--
-- top search terms coming from bing or google
--
google_and_bing_urls =
  -- find referrer fields that match either bing or google
  FILTER
    (FOREACH logs GENERATE referrer)
  BY
    referrer matches '.*bing.*'
  OR
    referrer matches '.*google.*'
  ;

search_terms =
  -- extract from each referrer url the search phrases
  FOREACH
    google_and_bing_urls
  GENERATE
    FLATTEN(REGEX_EXTRACT_ALL(referrer, '.*[&\?]q=([^&]+).*')) as (term:chararray)
  ;

search_terms_filtered =
  -- reject urls that contained no search terms
  FILTER search_terms BY NOT $0 IS NULL;

search_terms_count =
  -- for each search phrase count the number of weblogs entries that contained it
  FOREACH
    (GROUP search_terms_filtered BY $0)
  GENERATE
    $0,
    COUNT($1) AS num
  ;

search_terms_count_sorted =
  -- order the results
  ORDER search_terms_count BY num DESC;

search_terms_count_limited =
  -- take the top 50 results
  LIMIT search_terms_count_sorted 50;

STORE search_terms_count_limited INTO '$OUTPUT/top_50_search_terms_from_bing_google';