From 28de71e714c1f5d70adcfd3213dc2433a701a430 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Sun, 22 Dec 2019 14:26:04 -0800 Subject: pig: first rev of join-cdx-sha1 script --- pig/join-cdx-sha1.pig | 43 ++++++++++++++++++++++++++++++++++++++++ pig/tests/files/example.sha1b32 | 4 ++++ pig/tests/test_join_cdx.py | 44 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 91 insertions(+) create mode 100644 pig/join-cdx-sha1.pig create mode 100644 pig/tests/files/example.sha1b32 create mode 100644 pig/tests/test_join_cdx.py diff --git a/pig/join-cdx-sha1.pig b/pig/join-cdx-sha1.pig new file mode 100644 index 0000000..460f8b0 --- /dev/null +++ b/pig/join-cdx-sha1.pig @@ -0,0 +1,43 @@ + +-- +-- Author: Bryan Newbold +-- Date: December 2020 +-- +-- This pig script is intended to run agains the full (many TByte) GWB CDX, and +-- catch captures that match exact SHA1 (b32 encoded), regardless of mimetype. +-- +-- The process is to filter the CDX for non-revisit HTTP 200s, sort this by +-- SHA1 digest, then join with the (pre-sorted) SHA1 -- b32 input list, and dump +-- output. + +%default INPUT_CDX '' +%default INPUT_DIGEST '' +%default OUTPUT '' + +set mapreduce.job.queuename default + +digests = LOAD '$INPUT_DIGEST' USING PigStorage() AS sha1b32:chararray; +digests = ORDER digests by sha1b32 ASC PARALLEL 20; +digests = DISTINCT digests; + +cdx = LOAD '$INPUT_CDX' AS cdxline:chararray; +cdx = FILTER cdx BY not STARTSWITH (cdxline, 'filedesc'); +cdx = FILTER cdx BY not STARTSWITH (cdxline, ' '); + +cdx = FOREACH cdx GENERATE STRSPLIT(cdxline,'\\s+') as cols, cdxline; +cdx = FOREACH cdx GENERATE (chararray)cols.$0 as cdx_surt, (chararray)cols.$1 as timestamp, (chararray)cols.$3 as mimetype, (chararray)cols.$4 as httpstatus, (chararray)cols.$5 as sha1b32, cdxline; +cdx = FILTER cdx BY not cdx_surt matches '-'; +cdx = FILTER cdx BY httpstatus matches '200'; +cdx = FILTER cdx BY not mimetype matches 'warc/revisit'; +cdx = ORDER cdx by sha1b32 ASC PARALLEL 40; + +-- TODO: DISTINCT by (sha1b32, cdx_surt) for efficiency + +-- Core JOIN +full_join = JOIN cdx BY sha1b32, digests BY sha1b32; + +-- TODO: at most, say 5 CDX lines per sha1b32? + +result = FOREACH full_join GENERATE cdxline; + +STORE result INTO '$OUTPUT' USING PigStorage(); diff --git a/pig/tests/files/example.sha1b32 b/pig/tests/files/example.sha1b32 new file mode 100644 index 0000000..20a1357 --- /dev/null +++ b/pig/tests/files/example.sha1b32 @@ -0,0 +1,4 @@ +EJWYVOPONJRARK7SGG6COFRN7CSTHROY +V32E3CCO7NMI2M4OHLKG73DXD72LR4B2 +3I42H3S6NNFQ2MSVX7XZKYAYSCX5QBYJ +E3WSNQ7JAFOW7N3ZJ6GLV27T52T25JDK diff --git a/pig/tests/test_join_cdx.py b/pig/tests/test_join_cdx.py new file mode 100644 index 0000000..e6eca6a --- /dev/null +++ b/pig/tests/test_join_cdx.py @@ -0,0 +1,44 @@ + +import os +import unittest +import tempfile +import subprocess +from pighelper import PigTestHelper, count_lines + +class TestJoinCDXSha1(PigTestHelper): + + def run_pig_join(self, script_path, cdx_file, digest_file, **kwargs): + """Convenience helper around run_pig(). + + INPUT parameter is set to in_file. + OUTPUT parameter is set to a random file. + Any keyword args are passed as parameters. + """ + + pargs = [] + for key, value in kwargs.items(): + pargs.append('-p') + pargs.append('{}={}'.format(key, value)) + + out_file = tempfile.mktemp(dir=self._tmpdir) + params = [ + '-f', script_path, + '-p', 'INPUT_CDX={}'.format(cdx_file), + '-p', 'INPUT_DIGEST={}'.format(digest_file), + '-p', 'OUTPUT={}'.format(out_file), + ] + pargs + status = self.run_pig_raw(params) + assert status.returncode == 0 + # Capture all the part-r-* files + print("out_file: {}".format(out_file)) + subprocess.run("/bin/ls -la {}/part-*".format(out_file), shell=True) + sub = subprocess.run("/bin/cat {}/part-*".format(out_file), stdout=subprocess.PIPE, shell=True) + out = sub.stdout.decode('utf-8') + print(out) + return out + + # TODO: helper to verify that output matches an expected file + + def test_thing(self): + r = self.run_pig_join("join-cdx-sha1.pig", "tests/files/example.cdx", "tests/files/example.sha1b32") + assert count_lines(r) == 4 -- cgit v1.2.3