aboutsummaryrefslogtreecommitdiffstats
path: root/pig
diff options
context:
space:
mode:
Diffstat (limited to 'pig')
-rw-r--r--pig/join-cdx-sha1.pig43
-rw-r--r--pig/tests/files/example.sha1b324
-rw-r--r--pig/tests/test_join_cdx.py44
3 files changed, 91 insertions, 0 deletions
diff --git a/pig/join-cdx-sha1.pig b/pig/join-cdx-sha1.pig
new file mode 100644
index 0000000..460f8b0
--- /dev/null
+++ b/pig/join-cdx-sha1.pig
@@ -0,0 +1,43 @@
+
+--
+-- Author: Bryan Newbold <bnewbold@archive.org>
+-- Date: December 2020
+--
+-- This pig script is intended to run agains the full (many TByte) GWB CDX, and
+-- catch captures that match exact SHA1 (b32 encoded), regardless of mimetype.
+--
+-- The process is to filter the CDX for non-revisit HTTP 200s, sort this by
+-- SHA1 digest, then join with the (pre-sorted) SHA1 -- b32 input list, and dump
+-- output.
+
+%default INPUT_CDX ''
+%default INPUT_DIGEST ''
+%default OUTPUT ''
+
+set mapreduce.job.queuename default
+
+digests = LOAD '$INPUT_DIGEST' USING PigStorage() AS sha1b32:chararray;
+digests = ORDER digests by sha1b32 ASC PARALLEL 20;
+digests = DISTINCT digests;
+
+cdx = LOAD '$INPUT_CDX' AS cdxline:chararray;
+cdx = FILTER cdx BY not STARTSWITH (cdxline, 'filedesc');
+cdx = FILTER cdx BY not STARTSWITH (cdxline, ' ');
+
+cdx = FOREACH cdx GENERATE STRSPLIT(cdxline,'\\s+') as cols, cdxline;
+cdx = FOREACH cdx GENERATE (chararray)cols.$0 as cdx_surt, (chararray)cols.$1 as timestamp, (chararray)cols.$3 as mimetype, (chararray)cols.$4 as httpstatus, (chararray)cols.$5 as sha1b32, cdxline;
+cdx = FILTER cdx BY not cdx_surt matches '-';
+cdx = FILTER cdx BY httpstatus matches '200';
+cdx = FILTER cdx BY not mimetype matches 'warc/revisit';
+cdx = ORDER cdx by sha1b32 ASC PARALLEL 40;
+
+-- TODO: DISTINCT by (sha1b32, cdx_surt) for efficiency
+
+-- Core JOIN
+full_join = JOIN cdx BY sha1b32, digests BY sha1b32;
+
+-- TODO: at most, say 5 CDX lines per sha1b32?
+
+result = FOREACH full_join GENERATE cdxline;
+
+STORE result INTO '$OUTPUT' USING PigStorage();
diff --git a/pig/tests/files/example.sha1b32 b/pig/tests/files/example.sha1b32
new file mode 100644
index 0000000..20a1357
--- /dev/null
+++ b/pig/tests/files/example.sha1b32
@@ -0,0 +1,4 @@
+EJWYVOPONJRARK7SGG6COFRN7CSTHROY
+V32E3CCO7NMI2M4OHLKG73DXD72LR4B2
+3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ
+E3WSNQ7JAFOW7N3ZJ6GLV27T52T25JDK
diff --git a/pig/tests/test_join_cdx.py b/pig/tests/test_join_cdx.py
new file mode 100644
index 0000000..e6eca6a
--- /dev/null
+++ b/pig/tests/test_join_cdx.py
@@ -0,0 +1,44 @@
+
+import os
+import unittest
+import tempfile
+import subprocess
+from pighelper import PigTestHelper, count_lines
+
+class TestJoinCDXSha1(PigTestHelper):
+
+ def run_pig_join(self, script_path, cdx_file, digest_file, **kwargs):
+ """Convenience helper around run_pig().
+
+ INPUT parameter is set to in_file.
+ OUTPUT parameter is set to a random file.
+ Any keyword args are passed as parameters.
+ """
+
+ pargs = []
+ for key, value in kwargs.items():
+ pargs.append('-p')
+ pargs.append('{}={}'.format(key, value))
+
+ out_file = tempfile.mktemp(dir=self._tmpdir)
+ params = [
+ '-f', script_path,
+ '-p', 'INPUT_CDX={}'.format(cdx_file),
+ '-p', 'INPUT_DIGEST={}'.format(digest_file),
+ '-p', 'OUTPUT={}'.format(out_file),
+ ] + pargs
+ status = self.run_pig_raw(params)
+ assert status.returncode == 0
+ # Capture all the part-r-* files
+ print("out_file: {}".format(out_file))
+ subprocess.run("/bin/ls -la {}/part-*".format(out_file), shell=True)
+ sub = subprocess.run("/bin/cat {}/part-*".format(out_file), stdout=subprocess.PIPE, shell=True)
+ out = sub.stdout.decode('utf-8')
+ print(out)
+ return out
+
+ # TODO: helper to verify that output matches an expected file
+
+ def test_thing(self):
+ r = self.run_pig_join("join-cdx-sha1.pig", "tests/files/example.cdx", "tests/files/example.sha1b32")
+ assert count_lines(r) == 4