From e9e8d455f2dbb98960c32d7a651f46f904ee3a0d Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Mon, 23 Mar 2020 20:53:22 -0700 Subject: commit CORD19 munging scripts --- scripts/deliver_file2disk.py | 231 +++++++++++++++++++++++++++++++++++++++++++ scripts/parse_cord19_csv.py | 15 +++ scripts/who_enrich.py | 110 +++++++++++++++++++++ 3 files changed, 356 insertions(+) create mode 100755 scripts/deliver_file2disk.py create mode 100755 scripts/parse_cord19_csv.py create mode 100755 scripts/who_enrich.py (limited to 'scripts') diff --git a/scripts/deliver_file2disk.py b/scripts/deliver_file2disk.py new file mode 100755 index 0000000..d661acc --- /dev/null +++ b/scripts/deliver_file2disk.py @@ -0,0 +1,231 @@ +#!/usr/bin/env python3 +""" +Tool for downloading fatcat release PDFs to disk (assuming there is at least +one accessible PDF file entity for each release). + +Behavior: +- if no file, or not accessible, skip release +- filter files, then iterate through: + - if already exists locally on disk, skip + - try downloading from any archive.org or web.archive.org URLs + - verify SHA-1 + - write out to disk + +TODO: +x blob_path(sha1hex) -> returns relative/local path file would be saved to +x filter_files(files) -> list of files to try +- fetch_release(release) -> tries to download PDF bytes +- fetch_file(file) -> returns bytes of fetched file +- fetch_content(url) -> tries to download PDF bytes + +LATER: +- GRBOID XML as well, from minio? +""" + +# XXX: some broken MRO thing going on in here due to python3 object wrangling +# in `wayback` library. Means we can't run pylint. +# pylint: skip-file + +import os +import sys +import json +import magic +import hashlib +import argparse +import requests +from requests.adapters import HTTPAdapter +from requests.packages.urllib3.util.retry import Retry # pylint: disable=import-error +from collections import Counter + + +def gen_file_metadata(blob): + """ + Takes a file blob (bytestream) and returns hashes and other metadata. + + Returns a dict: size_bytes, md5hex, sha1hex, sha256hex, mimetype + """ + assert blob + mimetype = magic.Magic(mime=True).from_buffer(blob) + hashes = [ + hashlib.sha1(), + hashlib.sha256(), + hashlib.md5(), + ] + for h in hashes: + h.update(blob) + return dict( + size_bytes=len(blob), + sha1hex=hashes[0].hexdigest(), + sha256hex=hashes[1].hexdigest(), + md5hex=hashes[2].hexdigest(), + mimetype=mimetype, + ) + +def requests_retry_session(retries=10, backoff_factor=3, + status_forcelist=(500, 502, 504), session=None): + """ + From: https://www.peterbe.com/plog/best-practice-with-retries-with-requests + """ + session = session or requests.Session() + retry = Retry( + total=retries, + read=retries, + connect=retries, + backoff_factor=backoff_factor, + status_forcelist=status_forcelist, + ) + adapter = HTTPAdapter(max_retries=retry) + session.mount('http://', adapter) + session.mount('https://', adapter) + return session + + +class DeliverFatcatDisk: + + def __init__(self, disk_dir, **kwargs): + self.count = Counter() + self.disk_dir = disk_dir + self.disk_prefix = kwargs.get('disk_prefix', 'pdf/') + self.disk_suffix = kwargs.get('disk_suffix', '.pdf') + self.session = requests_retry_session() + + + def run(self, release_json_file): + sys.stderr.write("Ensuring all 256 base directories exist...\n") + for i in range(256): + fpath = "{}/{}{:02x}".format( + self.disk_dir, + self.disk_prefix, + i) + os.makedirs(fpath, exist_ok=True) + sys.stderr.write("Starting...\n") + for line in release_json_file: + self.count['total'] += 1 + if not line.startswith('{'): + self.count['skip-no-release'] += 1 + continue + #print(line) + release = json.loads(line) + assert 'ident' in release + self.fetch_release(release) + sys.stderr.write("{}\n".format(self.count)) + + def blob_path(self, sha1hex): + fpath = "{}/{}{}/{}{}".format( + self.disk_dir, + self.disk_prefix, + sha1hex[0:2], + sha1hex, + self.disk_suffix) + return fpath + + def does_file_already_exist(self, sha1hex): + return os.path.isfile(self.blob_path(sha1hex)) + + def filter_files(self, files): + """ + Takes a list of file entities and only returns the ones which are PDFs + we can download. + """ + good = [] + for f in files: + if f['mimetype'] and not 'pdf' in f['mimetype'].lower(): + continue + for url in f['urls']: + if 'archive.org/' in url['url']: + good.append(f) + break + return good + + def fetch_content(self, url): + """ + Returns tuple: (str:status, content) + Content contains bytes only if status is "success", otherwise None + """ + if '://web.archive.org/' in url: + # add id_ to URL to avoid wayback re-writing + l = url.split('/') + if l[2] == 'web.archive.org' and l[3] == 'web' and not '_' in l[4]: + l[4] = l[4] + 'id_' + url = '/'.join(l) + + try: + resp = self.session.get(url) + except requests.exceptions.RetryError: + return ('wayback-error', None) + if resp.status_code != 200: + return ('fetch:{}'.format(resp.status_code), None) + else: + return ('success', resp.content) + + def fetch_file(self, f): + """ + Returns tuple: (status, sha1hex, file_meta) + + file_meta is a dict on success, or None otherwise + """ + sha1hex = f['sha1'] + if self.does_file_already_exist(sha1hex): + return ('exists', sha1hex, None) + status = None + for url in f['urls']: + url = url['url'] + if not 'archive.org' in url: + continue + status, content = self.fetch_content(url) + if status == 'success': + # TODO: verify sha1hex + file_meta = gen_file_metadata(content) + if file_meta['sha1hex'] != sha1hex: + status = 'sha1-mismatch' + continue + with open(self.blob_path(sha1hex), 'wb') as outf: + outf.write(content) + return ('success', sha1hex, file_meta) + if status: + return (status, sha1hex, None) + else: + return ('no-urls', sha1hex, None) + + def fetch_release(self, release): + good_files = self.filter_files(release['files']) + status = 'no-file' + sha1hex = None + for f in good_files: + status, sha1hex, file_meta = self.fetch_file(f) + if status in ('success', 'exists'): + break + else: + continue + if sha1hex: + print("{}\t{}".format(status, sha1hex)) + else: + print(status) + self.count[status] += 1 + +def main(): + + parser = argparse.ArgumentParser() + parser.add_argument('--disk-dir', + required=True, + type=str, + help='local base directory to save into') + parser.add_argument('--disk-prefix', + type=str, + default="pdf/", + help='directory prefix for items created in bucket') + parser.add_argument('--disk-suffix', + type=str, + default=".pdf", + help='file suffix for created files') + parser.add_argument('release_json_file', + help="JSON manifest of fatcat release entities", + default=sys.stdin, + type=argparse.FileType('r')) + args = parser.parse_args() + + worker = DeliverFatcatDisk(**args.__dict__) + worker.run(args.release_json_file) + +if __name__ == '__main__': # pragma: no cover + main() diff --git a/scripts/parse_cord19_csv.py b/scripts/parse_cord19_csv.py new file mode 100755 index 0000000..536e5d3 --- /dev/null +++ b/scripts/parse_cord19_csv.py @@ -0,0 +1,15 @@ +#!/usr/bin/env python3 + +import sys +import csv +import json + +CSVFILE = sys.argv[1] + +with open(CSVFILE, newline='') as csvfile: + reader = csv.DictReader(csvfile) + for row in reader: + row = dict(row) + row['mag_id'] = row.pop('Microsoft Academic Paper ID') + row['who_covidence_id'] = row.pop('WHO #Covidence').replace('#', '') + print(json.dumps(row, sort_keys=True)) diff --git a/scripts/who_enrich.py b/scripts/who_enrich.py new file mode 100755 index 0000000..b445927 --- /dev/null +++ b/scripts/who_enrich.py @@ -0,0 +1,110 @@ +#!/usr/bin/env python3 + +""" +This script takes a "Paper" MAG TSV file which has been joined with (at most) a +single "PaperExtendedAttributes", parses it into JSON, and does fatcat fetches +to "enrich" the output. Outputs a single JSON object per line with attributes: + + mag_id + mag_paper + release_id + fatcat_release + +Input columns: + +""" + +import sys +import json +import argparse +import datetime + +import requests +from requests.adapters import HTTPAdapter +from requests.packages.urllib3.util.retry import Retry # pylint: disable=import-error + + +def requests_retry_session(retries=10, backoff_factor=3, + status_forcelist=(500, 502, 504), session=None): + """ + From: https://www.peterbe.com/plog/best-practice-with-retries-with-requests + """ + session = session or requests.Session() + retry = Retry( + total=retries, + read=retries, + connect=retries, + backoff_factor=backoff_factor, + status_forcelist=status_forcelist, + ) + adapter = HTTPAdapter(max_retries=retry) + session.mount('http://', adapter) + session.mount('https://', adapter) + return session + + +def do_line(row, args): + + pubmed_id = row.get('pubmed_id') or None + pmcid = row.get('pmcid') or None + doi = row.get('doi') or None + fatcat_release = None + + if not fatcat_release and pmcid: + resp = args.session.get('https://api.fatcat.wiki/v0/release/lookup', + params={ + 'pmcid': pmcid, + 'expand': 'container,files,filesets,webcaptures', + 'hide': 'abstracts,references', + }) + if resp.status_code == 200: + fatcat_release = resp.json() + if not fatcat_release and doi: + resp = args.session.get('https://api.fatcat.wiki/v0/release/lookup', + params={ + 'doi': doi, + 'expand': 'container,files,filesets,webcaptures', + 'hide': 'abstracts,references', + }) + if resp.status_code == 200: + fatcat_release = resp.json() + if not fatcat_release and pubmed_id: + resp = args.session.get('https://api.fatcat.wiki/v0/release/lookup', + params={ + 'pmid': pubmed_id, + 'expand': 'container,files,filesets,webcaptures', + 'hide': 'abstracts,references', + }) + if resp.status_code == 200: + fatcat_release = resp.json() + + obj = dict( + who_paper=row, + ) + if fatcat_release: + obj['fatcat_release'] = fatcat_release + obj['release_id'] = fatcat_release['ident'] + obj['fatcat_url'] = "https://fatcat.wiki/release/{}".format(obj['release_id']) + print(json.dumps(obj, sort_keys=True)) + +def run(args): + for l in sys.stdin: + l = json.loads(l) + do_line(l, args) + +def main(): + parser = argparse.ArgumentParser( + formatter_class=argparse.ArgumentDefaultsHelpFormatter) + parser.add_argument('json_file', + help="WHO/S2 parsed JSON file", + type=argparse.FileType('r')) + subparsers = parser.add_subparsers() + + args = parser.parse_args() + args.session = requests_retry_session() + + run(args) + +if __name__ == '__main__': + main() + -- cgit v1.2.3