From a4db9ee2e18a18b23eb7ece484f95914421f877d Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Wed, 13 Nov 2019 00:27:48 -0800 Subject: ingest file result importer --- python/fatcat_import.py | 34 ++++++++ python/fatcat_tools/importers/__init__.py | 3 +- python/fatcat_tools/importers/ingest.py | 134 ++++++++++++++++++++++++++++++ python/tests/files/example_ingest.json | 1 + python/tests/import_ingest.py | 58 +++++++++++++ 5 files changed, 228 insertions(+), 2 deletions(-) create mode 100644 python/fatcat_tools/importers/ingest.py create mode 100644 python/tests/files/example_ingest.json create mode 100644 python/tests/import_ingest.py diff --git a/python/fatcat_import.py b/python/fatcat_import.py index 2239f179..400b1915 100755 --- a/python/fatcat_import.py +++ b/python/fatcat_import.py @@ -89,6 +89,19 @@ def run_arabesque_match(args): elif args.json_file: JsonLinePusher(ami, args.json_file).run() +def run_ingest_file(args): + ifri = IngestFileResultImporter(args.api, + do_updates=args.do_updates, + default_link_rel=args.default_link_rel, + require_grobid=(not args.no_require_grobid), + edit_batch_size=args.batch_size) + if args.kafka_mode: + KafkaJsonPusher(ifri, args.kafka_hosts, args.kafka_env, "ingest-file-results", + "fatcat-ingest-file-result", kafka_namespace="sandcrawler", + consume_batch_size=args.batch_size).run() + else: + JsonLinePusher(ifri, args.json_file).run() + def run_grobid_metadata(args): fmi = GrobidMetadataImporter(args.api, edit_batch_size=args.batch_size, @@ -312,6 +325,27 @@ def main(): default="web", help="default URL rel for matches (eg, 'publisher', 'web')") + sub_ingest_file = subparsers.add_parser('ingest-file-result') + sub_ingest_file.set_defaults( + func=run_ingest_file, + auth_var="FATCAT_AUTH_WORKER_SANDCRAWLER", + ) + sub_ingest_file.add_argument('json_file', + help="ingest_file JSON file to import from", + default=sys.stdin, type=argparse.FileType('r')) + sub_ingest_file.add_argument('--kafka-mode', + action='store_true', + help="consume from kafka topic (not stdin)") + sub_ingest_file.add_argument('--do-updates', + action='store_true', + help="update pre-existing file entities if new match (instead of skipping)") + sub_ingest_file.add_argument('--no-require-grobid', + action='store_true', + help="whether postproc_status column must be '200'") + sub_ingest_file.add_argument('--default-link-rel', + default="web", + help="default URL rel for matches (eg, 'publisher', 'web')") + sub_grobid_metadata = subparsers.add_parser('grobid-metadata') sub_grobid_metadata.set_defaults( func=run_grobid_metadata, diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py index 5e2948f4..025a111c 100644 --- a/python/fatcat_tools/importers/__init__.py +++ b/python/fatcat_tools/importers/__init__.py @@ -26,5 +26,4 @@ from .orcid import OrcidImporter from .arabesque import ArabesqueMatchImporter, ARABESQUE_MATCH_WHERE_CLAUSE from .wayback_static import auto_wayback_static from .cdl_dash_dat import auto_cdl_dash_dat -#from .kafka_source import KafkaSource -#from .file_source import FileSource +from .ingest import IngestFileResultImporter diff --git a/python/fatcat_tools/importers/ingest.py b/python/fatcat_tools/importers/ingest.py new file mode 100644 index 00000000..9e75c26f --- /dev/null +++ b/python/fatcat_tools/importers/ingest.py @@ -0,0 +1,134 @@ + +import sys +import json +import base64 +import itertools +import fatcat_openapi_client +from .common import EntityImporter, clean, make_rel_url, SANE_MAX_RELEASES, SANE_MAX_URLS, b32_hex + + +class IngestFileResultImporter(EntityImporter): + + def __init__(self, api, require_grobid=True, **kwargs): + + eg_desc = kwargs.get('editgroup_description', + "Files crawled from web using sandcrawler ingest tool") + eg_extra = kwargs.get('editgroup_extra', dict()) + eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.IngestFileResultImporter') + super().__init__(api, + editgroup_description=eg_desc, + editgroup_extra=eg_extra, + **kwargs) + self.default_link_rel = kwargs.get("default_link_rel", "web") + assert self.default_link_rel + self.default_mimetype = kwargs.get("default_mimetype", None) + self.do_updates = kwargs.get("do_updates", False) + self.require_grobid = require_grobid + if self.require_grobid: + print("Requiring GROBID status == 200") + else: + print("NOT checking GROBID success") + + def want(self, row): + if self.require_grobid and not row.get('grobid', {}).get('status_code') == 200: + return False + if row.get('hit') == True and row.get('file_meta'): + return True + else: + return False + + def parse_record(self, row): + + request = row['request'] + fatcat = request.get('fatcat') + file_meta = row['file_meta'] + + # identify release by fatcat ident or extid lookup + release_ident = None + if fatcat and fatcat.get('release_ident'): + release_ident = fatcat.get('release_ident') + elif request.get('ext_ids'): + # if no fatcat ident, try extids + for extid_type in ('doi', 'pmid', 'pmcid', 'arxiv'): + extid = request['ext_ids'].get(extid_type) + if not extid: + continue + try: + release = self.api.lookup_release(**{extid_type: extid}) + except fatcat_openapi_client.rest.ApiException as err: + if err.status == 404: + continue + elif err.status == 400: + self.counts['warn-extid-invalid'] += 1 + continue + release_ident = release.ident + break + + if not release: + self.counts['skip-release-not-found'] += 1 + + cdx = row.get('cdx') + if not cdx: + return None + + url = make_rel_url(cdx['url'], self.default_link_rel) + + if not url: + self.counts['skip-url'] += 1 + return None + wayback = "https://web.archive.org/web/{}/{}".format( + cdx['datetime'], + cdx['url']) + urls = [url, ("webarchive", wayback)] + + urls = [fatcat_openapi_client.FileUrl(rel=rel, url=url) for (rel, url) in urls] + + fe = fatcat_openapi_client.FileEntity( + md5=file_meta['md5hex'], + sha1=file_meta['sha1hex'], + sha256=file_meta['sha256hex'], + size=file_meta['size_bytes'], + mimetype=file_meta['mimetype'] or self.default_mimetype, + release_ids=[release_ident], + urls=urls, + ) + if fatcat and fatcat.get('edit_extra'): + fe.edit_extra = fatcat['edit_extra'] + if request.get('project'): + if not fe.edit_extra: + fe.edit_extra = dict() + fe.edit_extra['project'] = request['project'] + return fe + + def try_update(self, fe): + # lookup sha1, or create new entity + existing = None + try: + existing = self.api.lookup_file(sha1=fe.sha1) + except fatcat_openapi_client.rest.ApiException as err: + if err.status != 404: + raise err + + if not existing: + return True + + if (fe.release_ids[0] in existing.release_ids) and existing.urls: + # TODO: could still, in theory update with the new URL? + self.counts['exists'] += 1 + return False + + if not self.do_updates: + self.counts['skip-update-disabled'] += 1 + return False + + # TODO: for now, never update + self.counts['skip-update-disabled'] += 1 + return False + + def insert_batch(self, batch): + self.api.create_file_auto_batch(fatcat_openapi_client.FileAutoBatch( + editgroup=fatcat_openapi_client.Editgroup( + description=self.editgroup_description, + extra=self.editgroup_extra), + entity_list=batch)) + diff --git a/python/tests/files/example_ingest.json b/python/tests/files/example_ingest.json new file mode 100644 index 00000000..005d8742 --- /dev/null +++ b/python/tests/files/example_ingest.json @@ -0,0 +1 @@ +{"file_meta": {"sha1hex": "00242a192acc258bdfdb151943419437f440c313", "md5hex": "f4de91152c7ab9fdc2a128f962faebff", "sha256hex": "ffc1005680cb620eec4c913437dfabbf311b535cfe16cbaeb2faec1f92afc362", "size_bytes": 255629, "mimetype": "application/pdf"}, "request": {"project": "unit-tests", "ext_ids": {"doi": "10.123/abc"}}, "cdx": { "datetime": "20170227164644", "url": "http://journals.plos.org/plosmedicine/article/file?id=10.1371/journal.pmed.0020124&type=printable" }, "grobid": {"status_code": 200 } } diff --git a/python/tests/import_ingest.py b/python/tests/import_ingest.py new file mode 100644 index 00000000..7c0a85cd --- /dev/null +++ b/python/tests/import_ingest.py @@ -0,0 +1,58 @@ + +import json +import pytest +from fatcat_tools.importers import IngestFileResultImporter, JsonLinePusher +from fixtures import api + + +@pytest.fixture(scope="function") +def ingest_importer(api): + yield IngestFileResultImporter(api) + +# TODO: use API to check that entities actually created... +def test_ingest_importer_basic(ingest_importer): + with open('tests/files/example_ingest.json', 'r') as f: + JsonLinePusher(ingest_importer, f).run() + +@pytest.mark.skip("tests not flushed out yet") +def test_ingest_importer(ingest_importer): + last_index = ingest_importer.api.get_changelog(limit=1)[0].index + with open('tests/files/example_ingest.json', 'r') as f: + ingest_importer.bezerk_mode = True + counts = JsonLinePusher(ingest_importer, f).run() + assert counts['insert'] == 2 + assert counts['exists'] == 0 + assert counts['skip'] == 11 + + # fetch most recent editgroup + change = ingest_importer.api.get_changelog_entry(index=last_index+1) + eg = change.editgroup + assert eg.description + assert "crawled from web" in eg.description.lower() + assert eg.extra['git_rev'] + assert "fatcat_tools.IngestFileResultImporter" in eg.extra['agent'] + + # re-insert; should skip + with open('tests/files/example_ingest.json', 'r') as f: + ingest_importer.reset() + ingest_importer.bezerk_mode = False + counts = JsonLinePusher(ingest_importer, f).run() + assert counts['insert'] == 0 + assert counts['exists'] == 2 + assert counts['skip'] == 11 + +def test_ingest_dict_parse(ingest_importer): + with open('tests/files/example_ingest.json', 'r') as f: + raw = json.loads(f.readline()) + f = ingest_importer.parse_record(raw) + assert f.sha1 == "00242a192acc258bdfdb151943419437f440c313" + assert f.md5 == "f4de91152c7ab9fdc2a128f962faebff" + assert f.mimetype == "application/pdf" + assert f.size == 255629 + assert len(f.urls) == 2 + for u in f.urls: + if u.rel == "web": + assert u.url.startswith("http://journals.plos.org") + if u.rel == "webarchive": + assert u.url.startswith("https://web.archive.org/") + assert len(f.release_ids) == 1 -- cgit v1.2.3