1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
|
"""
A helper class for locally testing Pig scripts.
Include `PigTestHelper` and extend in your test classes, call `self.run_pig()`
with your script and example input file, then look at the output (at returned
path) to check for validity.
TODO: switch to pytest-style fixture generation
author: Bryan Newbold <bnewbold@archive.org>
"""
import os
import shutil
import tempfile
import unittest
import subprocess
def count_lines(s):
return len([l for l in s.strip().split('\n') if len(l) > 0])
class PigTestHelper(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls._pigpath= "./deps/pig/bin/pig"
cls._classpath = "./deps/hadoop/share/hadoop/common/lib"
cls._base = [cls._pigpath,
'-x', 'local',
'-P', './tests/pig.properties']
# Check that pig is functioning
if subprocess.call(cls._base + ['-version']) != 0:
raise unittest.SkipTest("Failed to find and run Pig")
def setUp(self):
self._tmpdir = tempfile.mkdtemp()
def tearDown(self):
pass
# XXX: shutil.rmtree(self._tmpdir)
def run_pig_raw(self, params):
"""Low-level variant with params appended directly. Returns
CompletedProcess, raises an error if return value isn't succes"""
print("Running: {}".format(' '.join(self._base + params)))
retval = subprocess.run(self._base + params,
timeout=20.0,
check=True)
return retval
def run_pig(self, script_path, in_file, **kwargs):
"""Convenience helper around run_pig_raw().
INPUT parameter is set to in_file.
OUTPUT parameter is set to a random file.
Any keyword args are passed as parameters.
"""
pargs = []
for key, value in kwargs.items():
pargs.append('-p')
pargs.append('{}={}'.format(key, value))
out_file = tempfile.mktemp(dir=self._tmpdir)
params = [
'-f', script_path,
'-p', 'INPUT={}'.format(in_file),
'-p', 'OUTPUT={}'.format(out_file),
] + pargs
status = self.run_pig_raw(params)
assert status.returncode == 0
# Capture all the part-r-* files
print("out_file: {}".format(out_file))
subprocess.run("/bin/ls -la {}/part-*".format(out_file), shell=True)
sub = subprocess.run("/bin/cat {}/part-*".format(out_file), stdout=subprocess.PIPE, shell=True)
out = sub.stdout.decode('utf-8')
print(out)
return out
# TODO: helper to verify that output matches an expected file
|