diff options
| author | Bryan Newbold <bnewbold@robocracy.org> | 2019-11-13 00:27:48 -0800 | 
|---|---|---|
| committer | Bryan Newbold <bnewbold@robocracy.org> | 2019-11-15 16:46:26 -0800 | 
| commit | a4db9ee2e18a18b23eb7ece484f95914421f877d (patch) | |
| tree | 6d4856c8fb7854a75dbd43c983179d4492769039 | |
| parent | 169477c39dc772c0eb1d45f8097215e73f0f6044 (diff) | |
| download | fatcat-a4db9ee2e18a18b23eb7ece484f95914421f877d.tar.gz fatcat-a4db9ee2e18a18b23eb7ece484f95914421f877d.zip | |
ingest file result importer
| -rwxr-xr-x | python/fatcat_import.py | 34 | ||||
| -rw-r--r-- | python/fatcat_tools/importers/__init__.py | 3 | ||||
| -rw-r--r-- | python/fatcat_tools/importers/ingest.py | 134 | ||||
| -rw-r--r-- | python/tests/files/example_ingest.json | 1 | ||||
| -rw-r--r-- | python/tests/import_ingest.py | 58 | 
5 files changed, 228 insertions, 2 deletions
| diff --git a/python/fatcat_import.py b/python/fatcat_import.py index 2239f179..400b1915 100755 --- a/python/fatcat_import.py +++ b/python/fatcat_import.py @@ -89,6 +89,19 @@ def run_arabesque_match(args):      elif args.json_file:          JsonLinePusher(ami, args.json_file).run() +def run_ingest_file(args): +    ifri = IngestFileResultImporter(args.api, +        do_updates=args.do_updates, +        default_link_rel=args.default_link_rel, +        require_grobid=(not args.no_require_grobid), +        edit_batch_size=args.batch_size) +    if args.kafka_mode: +        KafkaJsonPusher(ifri, args.kafka_hosts, args.kafka_env, "ingest-file-results", +            "fatcat-ingest-file-result", kafka_namespace="sandcrawler", +            consume_batch_size=args.batch_size).run() +    else: +        JsonLinePusher(ifri, args.json_file).run() +  def run_grobid_metadata(args):      fmi = GrobidMetadataImporter(args.api,          edit_batch_size=args.batch_size, @@ -312,6 +325,27 @@ def main():          default="web",          help="default URL rel for matches (eg, 'publisher', 'web')") +    sub_ingest_file = subparsers.add_parser('ingest-file-result') +    sub_ingest_file.set_defaults( +        func=run_ingest_file, +        auth_var="FATCAT_AUTH_WORKER_SANDCRAWLER", +    ) +    sub_ingest_file.add_argument('json_file', +        help="ingest_file JSON file to import from", +        default=sys.stdin, type=argparse.FileType('r')) +    sub_ingest_file.add_argument('--kafka-mode', +        action='store_true', +        help="consume from kafka topic (not stdin)") +    sub_ingest_file.add_argument('--do-updates', +        action='store_true', +        help="update pre-existing file entities if new match (instead of skipping)") +    sub_ingest_file.add_argument('--no-require-grobid', +        action='store_true', +        help="whether postproc_status column must be '200'") +    sub_ingest_file.add_argument('--default-link-rel', +        default="web", +        help="default URL rel for matches (eg, 'publisher', 'web')") +      sub_grobid_metadata = subparsers.add_parser('grobid-metadata')      sub_grobid_metadata.set_defaults(          func=run_grobid_metadata, diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py index 5e2948f4..025a111c 100644 --- a/python/fatcat_tools/importers/__init__.py +++ b/python/fatcat_tools/importers/__init__.py @@ -26,5 +26,4 @@ from .orcid import OrcidImporter  from .arabesque import ArabesqueMatchImporter, ARABESQUE_MATCH_WHERE_CLAUSE  from .wayback_static import auto_wayback_static  from .cdl_dash_dat import auto_cdl_dash_dat -#from .kafka_source import KafkaSource -#from .file_source import FileSource +from .ingest import IngestFileResultImporter diff --git a/python/fatcat_tools/importers/ingest.py b/python/fatcat_tools/importers/ingest.py new file mode 100644 index 00000000..9e75c26f --- /dev/null +++ b/python/fatcat_tools/importers/ingest.py @@ -0,0 +1,134 @@ + +import sys +import json +import base64 +import itertools +import fatcat_openapi_client +from .common import EntityImporter, clean, make_rel_url, SANE_MAX_RELEASES, SANE_MAX_URLS, b32_hex + + +class IngestFileResultImporter(EntityImporter): + +    def __init__(self, api, require_grobid=True, **kwargs): + +        eg_desc = kwargs.get('editgroup_description', +            "Files crawled from web using sandcrawler ingest tool") +        eg_extra = kwargs.get('editgroup_extra', dict()) +        eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.IngestFileResultImporter') +        super().__init__(api, +            editgroup_description=eg_desc, +            editgroup_extra=eg_extra, +            **kwargs) +        self.default_link_rel = kwargs.get("default_link_rel", "web") +        assert self.default_link_rel +        self.default_mimetype = kwargs.get("default_mimetype", None) +        self.do_updates = kwargs.get("do_updates", False) +        self.require_grobid = require_grobid +        if self.require_grobid: +            print("Requiring GROBID status == 200") +        else: +            print("NOT checking GROBID success") + +    def want(self, row): +        if self.require_grobid and not row.get('grobid', {}).get('status_code') == 200: +            return False +        if row.get('hit') == True and row.get('file_meta'): +            return True +        else: +            return False + +    def parse_record(self, row): + +        request = row['request'] +        fatcat = request.get('fatcat') +        file_meta = row['file_meta'] + +        # identify release by fatcat ident or extid lookup +        release_ident = None +        if fatcat and fatcat.get('release_ident'): +            release_ident = fatcat.get('release_ident') +        elif request.get('ext_ids'): +            # if no fatcat ident, try extids +            for extid_type in ('doi', 'pmid', 'pmcid', 'arxiv'): +                extid = request['ext_ids'].get(extid_type) +                if not extid: +                    continue +                try: +                    release = self.api.lookup_release(**{extid_type: extid}) +                except fatcat_openapi_client.rest.ApiException as err: +                    if err.status == 404: +                        continue +                    elif err.status == 400: +                        self.counts['warn-extid-invalid'] += 1 +                        continue +                release_ident = release.ident +                break + +        if not release: +            self.counts['skip-release-not-found'] += 1 + +        cdx = row.get('cdx') +        if not cdx: +            return None + +        url = make_rel_url(cdx['url'], self.default_link_rel) + +        if not url: +            self.counts['skip-url'] += 1 +            return None +        wayback = "https://web.archive.org/web/{}/{}".format( +            cdx['datetime'], +            cdx['url']) +        urls = [url, ("webarchive", wayback)] + +        urls = [fatcat_openapi_client.FileUrl(rel=rel, url=url) for (rel, url) in urls] + +        fe = fatcat_openapi_client.FileEntity( +            md5=file_meta['md5hex'], +            sha1=file_meta['sha1hex'], +            sha256=file_meta['sha256hex'], +            size=file_meta['size_bytes'], +            mimetype=file_meta['mimetype'] or self.default_mimetype, +            release_ids=[release_ident], +            urls=urls, +        ) +        if fatcat and fatcat.get('edit_extra'): +            fe.edit_extra = fatcat['edit_extra'] +        if request.get('project'): +            if not fe.edit_extra: +                fe.edit_extra = dict() +            fe.edit_extra['project'] = request['project'] +        return fe + +    def try_update(self, fe): +        # lookup sha1, or create new entity +        existing = None +        try: +            existing = self.api.lookup_file(sha1=fe.sha1) +        except fatcat_openapi_client.rest.ApiException as err: +            if err.status != 404: +                raise err + +        if not existing: +            return True + +        if (fe.release_ids[0] in existing.release_ids) and existing.urls: +            # TODO: could still, in theory update with the new URL? +            self.counts['exists'] += 1 +            return False + +        if not self.do_updates: +            self.counts['skip-update-disabled'] += 1 +            return False + +        # TODO: for now, never update +        self.counts['skip-update-disabled'] += 1 +        return False + +    def insert_batch(self, batch): +        self.api.create_file_auto_batch(fatcat_openapi_client.FileAutoBatch( +            editgroup=fatcat_openapi_client.Editgroup( +                description=self.editgroup_description, +                extra=self.editgroup_extra), +            entity_list=batch)) + diff --git a/python/tests/files/example_ingest.json b/python/tests/files/example_ingest.json new file mode 100644 index 00000000..005d8742 --- /dev/null +++ b/python/tests/files/example_ingest.json @@ -0,0 +1 @@ +{"file_meta": {"sha1hex": "00242a192acc258bdfdb151943419437f440c313", "md5hex": "f4de91152c7ab9fdc2a128f962faebff", "sha256hex": "ffc1005680cb620eec4c913437dfabbf311b535cfe16cbaeb2faec1f92afc362", "size_bytes": 255629, "mimetype": "application/pdf"}, "request": {"project": "unit-tests", "ext_ids": {"doi": "10.123/abc"}}, "cdx": { "datetime": "20170227164644", "url": "http://journals.plos.org/plosmedicine/article/file?id=10.1371/journal.pmed.0020124&type=printable" }, "grobid": {"status_code": 200 } } diff --git a/python/tests/import_ingest.py b/python/tests/import_ingest.py new file mode 100644 index 00000000..7c0a85cd --- /dev/null +++ b/python/tests/import_ingest.py @@ -0,0 +1,58 @@ + +import json +import pytest +from fatcat_tools.importers import IngestFileResultImporter, JsonLinePusher +from fixtures import api + + +@pytest.fixture(scope="function") +def ingest_importer(api): +    yield IngestFileResultImporter(api) + +# TODO: use API to check that entities actually created... +def test_ingest_importer_basic(ingest_importer): +    with open('tests/files/example_ingest.json', 'r') as f: +        JsonLinePusher(ingest_importer, f).run() + +@pytest.mark.skip("tests not flushed out yet") +def test_ingest_importer(ingest_importer): +    last_index = ingest_importer.api.get_changelog(limit=1)[0].index +    with open('tests/files/example_ingest.json', 'r') as f: +        ingest_importer.bezerk_mode = True +        counts = JsonLinePusher(ingest_importer, f).run() +    assert counts['insert'] == 2 +    assert counts['exists'] == 0 +    assert counts['skip'] == 11 + +    # fetch most recent editgroup +    change = ingest_importer.api.get_changelog_entry(index=last_index+1) +    eg = change.editgroup +    assert eg.description +    assert "crawled from web" in eg.description.lower() +    assert eg.extra['git_rev'] +    assert "fatcat_tools.IngestFileResultImporter" in eg.extra['agent'] + +    # re-insert; should skip +    with open('tests/files/example_ingest.json', 'r') as f: +        ingest_importer.reset() +        ingest_importer.bezerk_mode = False +        counts = JsonLinePusher(ingest_importer, f).run() +    assert counts['insert'] == 0 +    assert counts['exists'] == 2 +    assert counts['skip'] == 11 + +def test_ingest_dict_parse(ingest_importer): +    with open('tests/files/example_ingest.json', 'r') as f: +        raw = json.loads(f.readline()) +        f = ingest_importer.parse_record(raw) +        assert f.sha1 == "00242a192acc258bdfdb151943419437f440c313" +        assert f.md5 == "f4de91152c7ab9fdc2a128f962faebff" +        assert f.mimetype == "application/pdf" +        assert f.size == 255629 +        assert len(f.urls) == 2 +        for u in f.urls: +            if u.rel == "web": +                assert u.url.startswith("http://journals.plos.org") +            if u.rel == "webarchive": +                assert u.url.startswith("https://web.archive.org/") +        assert len(f.release_ids) == 1 | 
