summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xpython/fatcat_import.py34
-rw-r--r--python/fatcat_tools/importers/__init__.py3
-rw-r--r--python/fatcat_tools/importers/ingest.py134
-rw-r--r--python/tests/files/example_ingest.json1
-rw-r--r--python/tests/import_ingest.py58
5 files changed, 228 insertions, 2 deletions
diff --git a/python/fatcat_import.py b/python/fatcat_import.py
index 2239f179..400b1915 100755
--- a/python/fatcat_import.py
+++ b/python/fatcat_import.py
@@ -89,6 +89,19 @@ def run_arabesque_match(args):
elif args.json_file:
JsonLinePusher(ami, args.json_file).run()
+def run_ingest_file(args):
+ ifri = IngestFileResultImporter(args.api,
+ do_updates=args.do_updates,
+ default_link_rel=args.default_link_rel,
+ require_grobid=(not args.no_require_grobid),
+ edit_batch_size=args.batch_size)
+ if args.kafka_mode:
+ KafkaJsonPusher(ifri, args.kafka_hosts, args.kafka_env, "ingest-file-results",
+ "fatcat-ingest-file-result", kafka_namespace="sandcrawler",
+ consume_batch_size=args.batch_size).run()
+ else:
+ JsonLinePusher(ifri, args.json_file).run()
+
def run_grobid_metadata(args):
fmi = GrobidMetadataImporter(args.api,
edit_batch_size=args.batch_size,
@@ -312,6 +325,27 @@ def main():
default="web",
help="default URL rel for matches (eg, 'publisher', 'web')")
+ sub_ingest_file = subparsers.add_parser('ingest-file-result')
+ sub_ingest_file.set_defaults(
+ func=run_ingest_file,
+ auth_var="FATCAT_AUTH_WORKER_SANDCRAWLER",
+ )
+ sub_ingest_file.add_argument('json_file',
+ help="ingest_file JSON file to import from",
+ default=sys.stdin, type=argparse.FileType('r'))
+ sub_ingest_file.add_argument('--kafka-mode',
+ action='store_true',
+ help="consume from kafka topic (not stdin)")
+ sub_ingest_file.add_argument('--do-updates',
+ action='store_true',
+ help="update pre-existing file entities if new match (instead of skipping)")
+ sub_ingest_file.add_argument('--no-require-grobid',
+ action='store_true',
+ help="whether postproc_status column must be '200'")
+ sub_ingest_file.add_argument('--default-link-rel',
+ default="web",
+ help="default URL rel for matches (eg, 'publisher', 'web')")
+
sub_grobid_metadata = subparsers.add_parser('grobid-metadata')
sub_grobid_metadata.set_defaults(
func=run_grobid_metadata,
diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py
index 5e2948f4..025a111c 100644
--- a/python/fatcat_tools/importers/__init__.py
+++ b/python/fatcat_tools/importers/__init__.py
@@ -26,5 +26,4 @@ from .orcid import OrcidImporter
from .arabesque import ArabesqueMatchImporter, ARABESQUE_MATCH_WHERE_CLAUSE
from .wayback_static import auto_wayback_static
from .cdl_dash_dat import auto_cdl_dash_dat
-#from .kafka_source import KafkaSource
-#from .file_source import FileSource
+from .ingest import IngestFileResultImporter
diff --git a/python/fatcat_tools/importers/ingest.py b/python/fatcat_tools/importers/ingest.py
new file mode 100644
index 00000000..9e75c26f
--- /dev/null
+++ b/python/fatcat_tools/importers/ingest.py
@@ -0,0 +1,134 @@
+
+import sys
+import json
+import base64
+import itertools
+import fatcat_openapi_client
+from .common import EntityImporter, clean, make_rel_url, SANE_MAX_RELEASES, SANE_MAX_URLS, b32_hex
+
+
+class IngestFileResultImporter(EntityImporter):
+
+ def __init__(self, api, require_grobid=True, **kwargs):
+
+ eg_desc = kwargs.get('editgroup_description',
+ "Files crawled from web using sandcrawler ingest tool")
+ eg_extra = kwargs.get('editgroup_extra', dict())
+ eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.IngestFileResultImporter')
+ super().__init__(api,
+ editgroup_description=eg_desc,
+ editgroup_extra=eg_extra,
+ **kwargs)
+ self.default_link_rel = kwargs.get("default_link_rel", "web")
+ assert self.default_link_rel
+ self.default_mimetype = kwargs.get("default_mimetype", None)
+ self.do_updates = kwargs.get("do_updates", False)
+ self.require_grobid = require_grobid
+ if self.require_grobid:
+ print("Requiring GROBID status == 200")
+ else:
+ print("NOT checking GROBID success")
+
+ def want(self, row):
+ if self.require_grobid and not row.get('grobid', {}).get('status_code') == 200:
+ return False
+ if row.get('hit') == True and row.get('file_meta'):
+ return True
+ else:
+ return False
+
+ def parse_record(self, row):
+
+ request = row['request']
+ fatcat = request.get('fatcat')
+ file_meta = row['file_meta']
+
+ # identify release by fatcat ident or extid lookup
+ release_ident = None
+ if fatcat and fatcat.get('release_ident'):
+ release_ident = fatcat.get('release_ident')
+ elif request.get('ext_ids'):
+ # if no fatcat ident, try extids
+ for extid_type in ('doi', 'pmid', 'pmcid', 'arxiv'):
+ extid = request['ext_ids'].get(extid_type)
+ if not extid:
+ continue
+ try:
+ release = self.api.lookup_release(**{extid_type: extid})
+ except fatcat_openapi_client.rest.ApiException as err:
+ if err.status == 404:
+ continue
+ elif err.status == 400:
+ self.counts['warn-extid-invalid'] += 1
+ continue
+ release_ident = release.ident
+ break
+
+ if not release:
+ self.counts['skip-release-not-found'] += 1
+
+ cdx = row.get('cdx')
+ if not cdx:
+ return None
+
+ url = make_rel_url(cdx['url'], self.default_link_rel)
+
+ if not url:
+ self.counts['skip-url'] += 1
+ return None
+ wayback = "https://web.archive.org/web/{}/{}".format(
+ cdx['datetime'],
+ cdx['url'])
+ urls = [url, ("webarchive", wayback)]
+
+ urls = [fatcat_openapi_client.FileUrl(rel=rel, url=url) for (rel, url) in urls]
+
+ fe = fatcat_openapi_client.FileEntity(
+ md5=file_meta['md5hex'],
+ sha1=file_meta['sha1hex'],
+ sha256=file_meta['sha256hex'],
+ size=file_meta['size_bytes'],
+ mimetype=file_meta['mimetype'] or self.default_mimetype,
+ release_ids=[release_ident],
+ urls=urls,
+ )
+ if fatcat and fatcat.get('edit_extra'):
+ fe.edit_extra = fatcat['edit_extra']
+ if request.get('project'):
+ if not fe.edit_extra:
+ fe.edit_extra = dict()
+ fe.edit_extra['project'] = request['project']
+ return fe
+
+ def try_update(self, fe):
+ # lookup sha1, or create new entity
+ existing = None
+ try:
+ existing = self.api.lookup_file(sha1=fe.sha1)
+ except fatcat_openapi_client.rest.ApiException as err:
+ if err.status != 404:
+ raise err
+
+ if not existing:
+ return True
+
+ if (fe.release_ids[0] in existing.release_ids) and existing.urls:
+ # TODO: could still, in theory update with the new URL?
+ self.counts['exists'] += 1
+ return False
+
+ if not self.do_updates:
+ self.counts['skip-update-disabled'] += 1
+ return False
+
+ # TODO: for now, never update
+ self.counts['skip-update-disabled'] += 1
+ return False
+
+ def insert_batch(self, batch):
+ self.api.create_file_auto_batch(fatcat_openapi_client.FileAutoBatch(
+ editgroup=fatcat_openapi_client.Editgroup(
+ description=self.editgroup_description,
+ extra=self.editgroup_extra),
+ entity_list=batch))
+
diff --git a/python/tests/files/example_ingest.json b/python/tests/files/example_ingest.json
new file mode 100644
index 00000000..005d8742
--- /dev/null
+++ b/python/tests/files/example_ingest.json
@@ -0,0 +1 @@
+{"file_meta": {"sha1hex": "00242a192acc258bdfdb151943419437f440c313", "md5hex": "f4de91152c7ab9fdc2a128f962faebff", "sha256hex": "ffc1005680cb620eec4c913437dfabbf311b535cfe16cbaeb2faec1f92afc362", "size_bytes": 255629, "mimetype": "application/pdf"}, "request": {"project": "unit-tests", "ext_ids": {"doi": "10.123/abc"}}, "cdx": { "datetime": "20170227164644", "url": "http://journals.plos.org/plosmedicine/article/file?id=10.1371/journal.pmed.0020124&type=printable" }, "grobid": {"status_code": 200 } }
diff --git a/python/tests/import_ingest.py b/python/tests/import_ingest.py
new file mode 100644
index 00000000..7c0a85cd
--- /dev/null
+++ b/python/tests/import_ingest.py
@@ -0,0 +1,58 @@
+
+import json
+import pytest
+from fatcat_tools.importers import IngestFileResultImporter, JsonLinePusher
+from fixtures import api
+
+
+@pytest.fixture(scope="function")
+def ingest_importer(api):
+ yield IngestFileResultImporter(api)
+
+# TODO: use API to check that entities actually created...
+def test_ingest_importer_basic(ingest_importer):
+ with open('tests/files/example_ingest.json', 'r') as f:
+ JsonLinePusher(ingest_importer, f).run()
+
+@pytest.mark.skip("tests not flushed out yet")
+def test_ingest_importer(ingest_importer):
+ last_index = ingest_importer.api.get_changelog(limit=1)[0].index
+ with open('tests/files/example_ingest.json', 'r') as f:
+ ingest_importer.bezerk_mode = True
+ counts = JsonLinePusher(ingest_importer, f).run()
+ assert counts['insert'] == 2
+ assert counts['exists'] == 0
+ assert counts['skip'] == 11
+
+ # fetch most recent editgroup
+ change = ingest_importer.api.get_changelog_entry(index=last_index+1)
+ eg = change.editgroup
+ assert eg.description
+ assert "crawled from web" in eg.description.lower()
+ assert eg.extra['git_rev']
+ assert "fatcat_tools.IngestFileResultImporter" in eg.extra['agent']
+
+ # re-insert; should skip
+ with open('tests/files/example_ingest.json', 'r') as f:
+ ingest_importer.reset()
+ ingest_importer.bezerk_mode = False
+ counts = JsonLinePusher(ingest_importer, f).run()
+ assert counts['insert'] == 0
+ assert counts['exists'] == 2
+ assert counts['skip'] == 11
+
+def test_ingest_dict_parse(ingest_importer):
+ with open('tests/files/example_ingest.json', 'r') as f:
+ raw = json.loads(f.readline())
+ f = ingest_importer.parse_record(raw)
+ assert f.sha1 == "00242a192acc258bdfdb151943419437f440c313"
+ assert f.md5 == "f4de91152c7ab9fdc2a128f962faebff"
+ assert f.mimetype == "application/pdf"
+ assert f.size == 255629
+ assert len(f.urls) == 2
+ for u in f.urls:
+ if u.rel == "web":
+ assert u.url.startswith("http://journals.plos.org")
+ if u.rel == "webarchive":
+ assert u.url.startswith("https://web.archive.org/")
+ assert len(f.release_ids) == 1