From 7c81b7bea3d670876faff1eb290c40656697dddb Mon Sep 17 00:00:00 2001
From: Bryan Newbold
Date: Thu, 29 Mar 2018 20:16:05 -0700
Subject: move to top level

---
 cdx-record-pipeline/README.md                        | 33 ++++++++++++
 cdx-record-pipeline/cdx-record-pipeline.py           | 67 ++++++++++++++++++++++
 cdx-record-pipeline/requirements.txt                 |  3 +
 mapreduce/cdx-record-pipeline/README.md              | 33 ------------
 mapreduce/cdx-record-pipeline/cdx-record-pipeline.py | 67 ----------------------
 mapreduce/cdx-record-pipeline/requirements.txt       |  3 -
 6 files changed, 103 insertions(+), 103 deletions(-)
 create mode 100644 cdx-record-pipeline/README.md
 create mode 100755 cdx-record-pipeline/cdx-record-pipeline.py
 create mode 100644 cdx-record-pipeline/requirements.txt
 delete mode 100644 mapreduce/cdx-record-pipeline/README.md
 delete mode 100755 mapreduce/cdx-record-pipeline/cdx-record-pipeline.py
 delete mode 100644 mapreduce/cdx-record-pipeline/requirements.txt

diff --git a/cdx-record-pipeline/README.md b/cdx-record-pipeline/README.md
new file mode 100644
index 0000000..797b8eb
--- /dev/null
+++ b/cdx-record-pipeline/README.md
@@ -0,0 +1,33 @@
+CDX Record Pipeline (GROBID Edition)
+====================================
+
+Hadoop-based pipeline that reads an IA (Internet Archive) CDX dataset, fetches
+each PDF record it lists, and runs the PDF through a GROBID server.
+
+## Local mode example ##
+
+```
+cat -n /home/bnewbold/100k_random_gwb_pdf.cdx | ./cdx-record-pipeline.py
+```
+
+(`cat -n` prepends a line number to each CDX line, standing in for the offset
+key that NLineInputFormat gives the mapper in cluster mode.)
+
+## Cluster mode example ##
+
+```
+input=100k_random_gwb_pdf.cdx
+output=100k_random_gwb_pdf.out
+lines_per_map=1000
+
+hadoop jar /home/webcrawl/hadoop-2/hadoop-mapreduce/hadoop-streaming.jar \
+    -archives "hdfs://ia802400.us.archive.org:6000/lib/cdx-record-pipeline-venv.zip#cdx-record-pipeline-venv" \
+    -D mapred.reduce.tasks=0 \
+    -D mapred.job.name=Cdx-Record-Pipeline \
+    -D mapreduce.job.queuename=extraction \
+    -D mapred.line.input.format.linespermap=${lines_per_map} \
+    -inputformat org.apache.hadoop.mapred.lib.NLineInputFormat \
+    -input ${input} \
+    -output ${output} \
+    -mapper cdx-record-pipeline.py \
+    -file cdx-record-pipeline.py
+```

diff --git a/cdx-record-pipeline/cdx-record-pipeline.py b/cdx-record-pipeline/cdx-record-pipeline.py
new file mode 100755
index 0000000..9e521bf
--- /dev/null
+++ b/cdx-record-pipeline/cdx-record-pipeline.py
@@ -0,0 +1,67 @@
+#!./cdx-record-pipeline-venv/bin/python
+'''
+GROBID PDF Pipeline Test
+Read in CDX lines and query a GROBID server for each PDF resource
+TODO: Testing / HBase integration -- Bryan will update as needed
+'''
+import os
+import base64
+import hashlib
+import re
+import string
+import sys
+import urllib
+import urlparse
+from StringIO import StringIO
+
+import requests
+from wayback.resource import Resource
+from wayback.resource import ArcResource
+from wayback.resourcestore import ResourceStore
+from gwb.loader import CDXLoaderFactory
+
+def process_pdf_using_grobid(content_buffer, debug_line):
+    """Query the GROBID server and report the outcome for one record"""
+    GROBID_SERVER = "http://wbgrp-svc096.us.archive.org:8070"
+    content = content_buffer.read()
+    r = requests.post(GROBID_SERVER + "/api/processFulltextDocument",
+                      files={'input': content})
+    # != rather than 'is not': identity comparison against an int is not
+    # a reliable equality test
+    if r.status_code != 200:
+        print("FAIL (GROBID: {}): {}".format(r.content.decode('utf8'), debug_line))
+    else:
+        print("SUCCESS: " + debug_line)
+
+class Cdx_Record_Pipeline(object):
+
+    def read_cdx_and_parse(self, parser_func, accepted_mimes=[]):
+        """Read CDX lines from stdin and parse the records they point to"""
+        rstore = ResourceStore(loaderfactory=CDXLoaderFactory())
+        for line in sys.stdin:
+            cdx_line = line.rstrip().split()
+            # NLineInputFormat prepends an offset key, so a standard
+            # 11-field CDX line arrives as 12 fields; drop the key
+            if len(cdx_line) != 12:
+                continue
+            cdx_line = cdx_line[1:]
+            # CDX fields: [1]=timestamp, [2]=original URL, [3]=mimetype,
+            # [-3]=compressed length, [-2]=offset, [-1]=(w)arc filename
+            (src_url, timestamp, mime, record_location, record_offset,
+             record_length) = (cdx_line[2], cdx_line[1], cdx_line[3],
+                               cdx_line[-1], cdx_line[-2], cdx_line[-3])
+            # skip records with no concrete length, records stored outside
+            # .arc.gz/.warc.gz files, and non-accepted mimetypes
+            if record_length == '-' or not record_location.endswith('arc.gz') or mime not in accepted_mimes:
+                continue
+            debug_line = ' '.join(cdx_line)
+            try:
+                record_location = 'http://archive.org/download/' + record_location
+                record_offset = int(record_offset)
+                record_length = int(record_length)
+                resource_data = rstore.load_resource(record_location, record_offset, record_length)
+                parser_func(resource_data.open_raw_content(), debug_line)
+            except Exception:
+                # fetch/parse failures just skip to the next record
+                continue
+
+# main()
+#_______________________________________________________________________________
+if __name__ == '__main__':
+    cdx_record_pipeline = Cdx_Record_Pipeline()
+    cdx_record_pipeline.read_cdx_and_parse(process_pdf_using_grobid,
+                                           ['application/pdf', 'application/x-pdf'])
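For reference, the field extraction and filtering done by read_cdx_and_parse can be exercised standalone. A minimal sketch; the sample record below is invented for illustration, and only the 11-field layout comes from the code above:

```
# Sketch: parse one 11-field CDX line the way read_cdx_and_parse does
# (after the NLine offset key has been dropped). The record is made up.
sample = ("org,example)/paper.pdf 20170801000000 http://example.org/paper.pdf "
          "application/pdf 200 AAAABBBBCCCCDDDDEEEEFFFFGGGGHHHH - - "
          "12345 67890 EXAMPLE-20170801-00000.warc.gz")

fields = sample.split()
assert len(fields) == 11

timestamp, url, mime = fields[1], fields[2], fields[3]
length, offset, filename = fields[-3], fields[-2], fields[-1]

# Same filter as the mapper: a concrete length, a (w)arc.gz container,
# and an accepted mimetype.
if length != '-' and filename.endswith('arc.gz') and mime == 'application/pdf':
    print("would fetch %s bytes at offset %s of %s" % (length, offset, filename))
```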
diff --git a/cdx-record-pipeline/requirements.txt b/cdx-record-pipeline/requirements.txt
new file mode 100644
index 0000000..17b803f
--- /dev/null
+++ b/cdx-record-pipeline/requirements.txt
@@ -0,0 +1,3 @@
+--extra-index-url https://devpi.archive.org/wb/prod/
+wayback==0.2.1.2
+GlobalWayback==0.3
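Before submitting a cluster job, the GROBID endpoint the mapper depends on can be smoke-tested directly. (Note that `requests`, imported by the mapper, is not pinned in requirements.txt; presumably the wayback packages pull it into the venv.) A minimal sketch mirroring process_pdf_using_grobid, assuming the server URL from the code above is reachable; `local.pdf` is a placeholder for any PDF on disk:

```
# Smoke test: POST one local PDF to the same GROBID endpoint the
# mapper uses. The PDF path is a placeholder.
import requests

GROBID_SERVER = "http://wbgrp-svc096.us.archive.org:8070"

with open("local.pdf", "rb") as f:
    r = requests.post(GROBID_SERVER + "/api/processFulltextDocument",
                      files={'input': f.read()})

# 200 means GROBID returned TEI XML; anything else reproduces the
# mapper's FAIL branch.
print(r.status_code)
print(r.content[:200])
```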
diff --git a/mapreduce/cdx-record-pipeline/README.md b/mapreduce/cdx-record-pipeline/README.md
deleted file mode 100644
index 797b8eb..0000000
--- a/mapreduce/cdx-record-pipeline/README.md
+++ /dev/null
@@ -1,33 +0,0 @@
-CDX Record Pipeline (GROBID Edition)
-====================================
-
-Hadoop-based pipeline that reads an IA (Internet Archive) CDX dataset, fetches
-each PDF record it lists, and runs the PDF through a GROBID server.
-
-## Local mode example ##
-
-```
-cat -n /home/bnewbold/100k_random_gwb_pdf.cdx | ./cdx-record-pipeline.py
-```
-
-(`cat -n` prepends a line number to each CDX line, standing in for the offset
-key that NLineInputFormat gives the mapper in cluster mode.)
-
-## Cluster mode example ##
-
-```
-input=100k_random_gwb_pdf.cdx
-output=100k_random_gwb_pdf.out
-lines_per_map=1000
-
-hadoop jar /home/webcrawl/hadoop-2/hadoop-mapreduce/hadoop-streaming.jar \
-    -archives "hdfs://ia802400.us.archive.org:6000/lib/cdx-record-pipeline-venv.zip#cdx-record-pipeline-venv" \
-    -D mapred.reduce.tasks=0 \
-    -D mapred.job.name=Cdx-Record-Pipeline \
-    -D mapreduce.job.queuename=extraction \
-    -D mapred.line.input.format.linespermap=${lines_per_map} \
-    -inputformat org.apache.hadoop.mapred.lib.NLineInputFormat \
-    -input ${input} \
-    -output ${output} \
-    -mapper cdx-record-pipeline.py \
-    -file cdx-record-pipeline.py
-```

diff --git a/mapreduce/cdx-record-pipeline/cdx-record-pipeline.py b/mapreduce/cdx-record-pipeline/cdx-record-pipeline.py
deleted file mode 100755
index 9e521bf..0000000
--- a/mapreduce/cdx-record-pipeline/cdx-record-pipeline.py
+++ /dev/null
@@ -1,67 +0,0 @@
-#!./cdx-record-pipeline-venv/bin/python
-'''
-GROBID PDF Pipeline Test
-Read in CDX lines and query a GROBID server for each PDF resource
-TODO: Testing / HBase integration -- Bryan will update as needed
-'''
-import os
-import base64
-import hashlib
-import re
-import string
-import sys
-import urllib
-import urlparse
-from StringIO import StringIO
-
-import requests
-from wayback.resource import Resource
-from wayback.resource import ArcResource
-from wayback.resourcestore import ResourceStore
-from gwb.loader import CDXLoaderFactory
-
-def process_pdf_using_grobid(content_buffer, debug_line):
-    """Query the GROBID server and report the outcome for one record"""
-    GROBID_SERVER = "http://wbgrp-svc096.us.archive.org:8070"
-    content = content_buffer.read()
-    r = requests.post(GROBID_SERVER + "/api/processFulltextDocument",
-                      files={'input': content})
-    # != rather than 'is not': identity comparison against an int is not
-    # a reliable equality test
-    if r.status_code != 200:
-        print("FAIL (GROBID: {}): {}".format(r.content.decode('utf8'), debug_line))
-    else:
-        print("SUCCESS: " + debug_line)
-
-class Cdx_Record_Pipeline(object):
-
-    def read_cdx_and_parse(self, parser_func, accepted_mimes=[]):
-        """Read CDX lines from stdin and parse the records they point to"""
-        rstore = ResourceStore(loaderfactory=CDXLoaderFactory())
-        for line in sys.stdin:
-            cdx_line = line.rstrip().split()
-            # NLineInputFormat prepends an offset key, so a standard
-            # 11-field CDX line arrives as 12 fields; drop the key
-            if len(cdx_line) != 12:
-                continue
-            cdx_line = cdx_line[1:]
-            # CDX fields: [1]=timestamp, [2]=original URL, [3]=mimetype,
-            # [-3]=compressed length, [-2]=offset, [-1]=(w)arc filename
-            (src_url, timestamp, mime, record_location, record_offset,
-             record_length) = (cdx_line[2], cdx_line[1], cdx_line[3],
-                               cdx_line[-1], cdx_line[-2], cdx_line[-3])
-            # skip records with no concrete length, records stored outside
-            # .arc.gz/.warc.gz files, and non-accepted mimetypes
-            if record_length == '-' or not record_location.endswith('arc.gz') or mime not in accepted_mimes:
-                continue
-            debug_line = ' '.join(cdx_line)
-            try:
-                record_location = 'http://archive.org/download/' + record_location
-                record_offset = int(record_offset)
-                record_length = int(record_length)
-                resource_data = rstore.load_resource(record_location, record_offset, record_length)
-                parser_func(resource_data.open_raw_content(), debug_line)
-            except Exception:
-                # fetch/parse failures just skip to the next record
-                continue
-
-# main()
-#_______________________________________________________________________________
-if __name__ == '__main__':
-    cdx_record_pipeline = Cdx_Record_Pipeline()
-    cdx_record_pipeline.read_cdx_and_parse(process_pdf_using_grobid,
-                                           ['application/pdf', 'application/x-pdf'])

diff --git a/mapreduce/cdx-record-pipeline/requirements.txt b/mapreduce/cdx-record-pipeline/requirements.txt
deleted file mode 100644
index 17b803f..0000000
--- a/mapreduce/cdx-record-pipeline/requirements.txt
+++ /dev/null
@@ -1,3 +0,0 @@
---extra-index-url https://devpi.archive.org/wb/prod/
-wayback==0.2.1.2
-GlobalWayback==0.3
--
cgit v1.2.3