aboutsummaryrefslogtreecommitdiffstats
path: root/pig/filter-cdx-join-urls.pig
blob: 3d06804be261a9812d9bab9da5c8a5b9808a4c85 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43

--
-- Author: Bryan Newbold <bnewbold@archive.org>
-- Date: May 2018

%default INPUT_CDX ''
%default INPUT_URLS ''
%default OUTPUT ''

REGISTER /home/webcrawl/pig-scripts/jars/ia-web-commons-jar-with-dependencies-CDH3.jar;
REGISTER /home/webcrawl/pig-scripts/jars/pigtools.jar;
DEFINE SURTURL pigtools.SurtUrlKey();

set mapreduce.job.queuename default

urls = LOAD '$INPUT_URLS' USING PigStorage() AS url:chararray;
surts = FOREACH urls GENERATE SURTURL(url) AS url_surt;
surts = ORDER surts by url_surt ASC PARALLEL 10;
surts = DISTINCT surts;

cdx = LOAD '$INPUT_CDX' AS cdxline:chararray;
cdx = FILTER cdx BY not STARTSWITH (cdxline, 'filedesc');
cdx = FILTER cdx BY not STARTSWITH (cdxline, ' ');

cdx = FOREACH cdx GENERATE STRSPLIT(cdxline,'\\s+') as cols, cdxline;
cdx = FOREACH cdx GENERATE (chararray)cols.$0 as cdx_surt, (chararray)cols.$1 as timestamp, (chararray)cols.$3 as mimetype, (chararray)cols.$4 as httpstatus, (chararray)cols.$5 as sha1sum, cdxline;
cdx = FILTER cdx BY not cdx_surt matches '-';
cdx = FILTER cdx BY httpstatus matches '200';
cdx = FILTER cdx BY mimetype matches '.*pdf.*';

-- Core JOIN
full_join = JOIN cdx BY cdx_surt, surts BY url_surt;

-- DISTINCT by sha1 column
full_uniq = FOREACH (GROUP full_join BY sha1sum) {
    r = TOP(1, 0, $1);
    GENERATE FLATTEN(r);
};

result = FOREACH full_uniq GENERATE cdxline;
result = DISTINCT result;

STORE result INTO '$OUTPUT' USING PigStorage();