aboutsummaryrefslogtreecommitdiffstats
path: root/mapreduce
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@archive.org>2018-03-29 12:01:03 -0700
committerBryan Newbold <bnewbold@archive.org>2018-03-29 12:01:03 -0700
commit4a7c27bcbc98f2b7420a78cd0b83ba23764ff747 (patch)
tree07b316a825acda769536f17501fc896cf96902d5 /mapreduce
parent63186a218e2e10848d4b014eacbc4ad3a51a20ca (diff)
downloadsandcrawler-4a7c27bcbc98f2b7420a78cd0b83ba23764ff747.tar.gz
sandcrawler-4a7c27bcbc98f2b7420a78cd0b83ba23764ff747.zip
import vinay's cdx-record-pipeline
Diffstat (limited to 'mapreduce')
-rw-r--r--mapreduce/cdx-record-pipeline/README.md33
-rwxr-xr-xmapreduce/cdx-record-pipeline/cdx-record-pipeline.py67
-rw-r--r--mapreduce/cdx-record-pipeline/requirements.txt3
3 files changed, 103 insertions, 0 deletions
diff --git a/mapreduce/cdx-record-pipeline/README.md b/mapreduce/cdx-record-pipeline/README.md
new file mode 100644
index 0000000..797b8eb
--- /dev/null
+++ b/mapreduce/cdx-record-pipeline/README.md
@@ -0,0 +1,33 @@
+CDX Record Pipeline (GrobId Edition)
+=====================================
+
+Hadoop-based pipeline to process PDFs from a specified IA CDX dataset
+
+## Local mode example ##
+
+```
+cat -n /home/bnewbold/100k_random_gwb_pdf.cdx | ./cdx-record-pipeline.py
+
+```
+
+## Cluster mode example ##
+
+```
+input=100k_random_gwb_pdf.cdx
+output=100k_random_gwb_pdf.out
+lines_per_map=1000
+
+hadoop jar /home/webcrawl/hadoop-2/hadoop-mapreduce/hadoop-streaming.jar
+ -archives "hdfs://ia802400.us.archive.org:6000/lib/cdx-record-pipeline-venv.zip#cdx-record-pipeline-venv"
+ -D mapred.reduce.tasks=0
+ -D mapred.job.name=Cdx-Record-Pipeline
+ -D mapreduce.job.queuename=extraction
+ -D mapred.line.input.format.linespermap=${lines_per_map}
+ -inputformat org.apache.hadoop.mapred.lib.NLineInputFormat
+ -input ${input}
+ -output ${output}
+ -mapper cdx-record-pipeline.py
+ -file cdx-record-pipeline.py
+
+```
+
diff --git a/mapreduce/cdx-record-pipeline/cdx-record-pipeline.py b/mapreduce/cdx-record-pipeline/cdx-record-pipeline.py
new file mode 100755
index 0000000..9e521bf
--- /dev/null
+++ b/mapreduce/cdx-record-pipeline/cdx-record-pipeline.py
@@ -0,0 +1,67 @@
+#!./cdx-record-pipeline-venv/bin/python
+'''
+GrobId PDF Pipeline Test
+Read in CDX lines and query GROBID server for each PDF resource
+TODO: Testing / HBase integration -- Bryan will update as needed
+'''
+import os
+import re
+import sys
+import base64
+import hashlib
+import urllib
+import urlparse
+import re
+import string
+from wayback.resource import Resource
+from wayback.resource import ArcResource
+from wayback.resourcestore import ResourceStore
+from gwb.loader import CDXLoaderFactory
+from StringIO import StringIO
+import requests
+import sys
+
+def process_pdf_using_grobid(content_buffer, debug_line):
+ """Query GrobId server & process response
+ """
+ GROBID_SERVER="http://wbgrp-svc096.us.archive.org:8070"
+ content = content_buffer.read()
+ r = requests.post(GROBID_SERVER + "/api/processFulltextDocument",
+ files={'input': content})
+ if r.status_code is not 200:
+ print("FAIL (Grobid: {}): {}".format(r.content.decode('utf8'), debug_line))
+ else:
+ print("SUCCESS: " + debug_line)
+
+class Cdx_Record_Pipeline(object):
+
+ def read_cdx_and_parse(self, parser_func, accepted_mimes = []):
+ """Read in CDX lines and process PDF records fetched over HTTP
+ """
+ rstore = ResourceStore(loaderfactory=CDXLoaderFactory())
+ for line in sys.stdin:
+ line = line.rstrip()
+ cdx_line = line.split()
+ #ignoring NLine offset
+ if len(cdx_line) != 12:
+ continue
+ cdx_line = cdx_line[1:]
+ (src_url, timestamp, mime, record_location, record_offset, record_length) = (cdx_line[2], cdx_line[1], cdx_line[3], cdx_line[-1], cdx_line[-2], cdx_line[-3])
+ if '-' == record_length or not record_location.endswith('arc.gz') or mime not in accepted_mimes:
+ continue
+ orig_url = cdx_line[2]
+ debug_line = ' '.join(cdx_line)
+ try:
+ record_location = 'http://archive.org/download/' + record_location
+ record_offset = int(record_offset)
+ record_length = int(record_length)
+ resource_data = rstore.load_resource(record_location, record_offset, record_length)
+ parser_func(resource_data.open_raw_content(), debug_line)
+ except:
+ continue
+
+# main()
+#_______________________________________________________________________________
+if __name__ == '__main__':
+ cdx_record_pipeline = Cdx_Record_Pipeline()
+ cdx_record_pipeline.read_cdx_and_parse(process_pdf_using_grobid, ['application/pdf', 'application/x-pdf'])
diff --git a/mapreduce/cdx-record-pipeline/requirements.txt b/mapreduce/cdx-record-pipeline/requirements.txt
new file mode 100644
index 0000000..17b803f
--- /dev/null
+++ b/mapreduce/cdx-record-pipeline/requirements.txt
@@ -0,0 +1,3 @@
+--extra-index-url https://devpi.archive.org/wb/prod/
+wayback==0.2.1.2
+GlobalWayback==0.3