aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2019-01-22 22:04:39 -0800
committerBryan Newbold <bnewbold@robocracy.org>2019-01-22 22:04:39 -0800
commite0f70bbbcbcb6232cfb508ad5c0ae637391c4871 (patch)
tree7d7c83a04a3776754476b123d70e23dfa6cf297d
parent09475b87821142c5cd36c6b90fb97deb2a058312 (diff)
downloadfatcat-e0f70bbbcbcb6232cfb508ad5c0ae637391c4871.tar.gz
fatcat-e0f70bbbcbcb6232cfb508ad5c0ae637391c4871.zip
refactor remaining importers
-rw-r--r--python/fatcat_tools/importers/__init__.py2
-rw-r--r--python/fatcat_tools/importers/common.py264
-rw-r--r--python/fatcat_tools/importers/crossref.py11
-rw-r--r--python/fatcat_tools/importers/grobid_metadata.py104
-rw-r--r--python/fatcat_tools/importers/journal_metadata.py49
-rw-r--r--python/fatcat_tools/importers/matched.py150
-rw-r--r--python/fatcat_tools/importers/orcid.py45
-rw-r--r--python/tests/import_crossref.py7
-rw-r--r--python/tests/import_grobid_metadata.py10
-rw-r--r--python/tests/import_journal_metadata.py8
-rw-r--r--python/tests/import_matched.py9
-rw-r--r--python/tests/import_orcid.py19
-rw-r--r--python/tests/transform_tests.py2
13 files changed, 324 insertions, 356 deletions
diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py
index 7b20868c..b709f714 100644
--- a/python/fatcat_tools/importers/__init__.py
+++ b/python/fatcat_tools/importers/__init__.py
@@ -12,7 +12,7 @@ To run an import you combine two classes; one each of:
"""
-from .common import FatcatImporter, JsonLinePusher, make_kafka_consumer
+from .common import EntityImporter, JsonLinePusher, LinePusher, CsvPusher, KafkaJsonPusher, make_kafka_consumer
from .crossref import CrossrefImporter, CROSSREF_TYPE_MAP
from .grobid_metadata import GrobidMetadataImporter
from .journal_metadata import JournalMetadataImporter
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py
index 604aa78b..e7fe2305 100644
--- a/python/fatcat_tools/importers/common.py
+++ b/python/fatcat_tools/importers/common.py
@@ -49,9 +49,11 @@ class EntityImporter:
self.api = api
self.bezerk_mode = kwargs.get('bezerk_mode', False)
- self._editgroup_description = kwargs.get('editgroup_description')
- self._editgroup_extra = kwargs.get('editgroup_extra')
- self.counts = Counter({'skip': 0, 'insert': 0, 'update': 0, 'exists': 0, 'sub.container': 0})
+ self.serial_mode = kwargs.get('serial_mode', False)
+ self.edit_batch_size = kwargs.get('edit_batch_size', 100)
+ self.editgroup_description = kwargs.get('editgroup_description')
+ self.editgroup_extra = kwargs.get('editgroup_extra')
+ self.counts = Counter({'skip': 0, 'insert': 0, 'update': 0, 'exists': 0})
self._edit_count = 0
self._editgroup_id = None
self._entity_queue = []
@@ -69,7 +71,9 @@ class EntityImporter:
self.counts['skip'] += 1
return
entity = self.parse_record(raw_record)
- assert entity
+ if not entity:
+ self.counts['skip'] += 1
+ return
if self.bezerk_mode:
self.push_entity(entity)
return
@@ -85,8 +89,8 @@ class EntityImporter:
if self._entity_queue:
self.insert_batch(self._entity_queue)
- self.counts['insert'] += len(_entity_queue)
- self._entity_queue = 0
+ self.counts['insert'] += len(self._entity_queue)
+ self._entity_queue = []
self.counts['total'] = 0
for key in ('skip', 'insert', 'update', 'exists'):
@@ -101,17 +105,28 @@ class EntityImporter:
if not self._editgroup_id:
eg = self.api.create_editgroup(
- description=self._editgroup_description,
- extra=self._editgroup_extra)
+ fatcat_client.Editgroup(
+ description=self.editgroup_description,
+ extra=self.editgroup_extra))
self._editgroup_id = eg.editgroup_id
self._edit_count += edits
return self._editgroup_id
def create_container(self, entity):
- eg = self._get_editgroup()
- self.counts['sub.container'] += 1
- return self.api.create_container(entity, editgroup_id=eg.editgroup_id)
+ eg_id = self._get_editgroup()
+ self.counts['inserted.container'] += 1
+ return self.api.create_container(entity, editgroup_id=eg_id)
+
+ def create_release(self, entity):
+ eg_id = self._get_editgroup()
+ self.counts['inserted.release'] += 1
+ return self.api.create_release(entity, editgroup_id=eg_id)
+
+ def create_file(self, entity):
+ eg_id = self._get_editgroup()
+ self.counts['inserted.file'] += 1
+ return self.api.create_file(entity, editgroup_id=eg_id)
def updated(self):
"""
@@ -135,10 +150,10 @@ class EntityImporter:
def parse(self, raw_record):
"""
- Returns an entity class type. expected to always succeed, or raise an
- exception (`want()` should be used to filter out bad records). May have
- side-effects (eg, create related entities), but shouldn't update/mutate
- the actual entity.
+ Returns an entity class type, or None if we should skip this one.
+
+ May have side-effects (eg, create related entities), but shouldn't
+ update/mutate the actual entity.
"""
raise NotImplementedError
@@ -148,9 +163,10 @@ class EntityImporter:
update it (PUT), decide we should do nothing (based on the existing
record), or create a new one.
- Implementations should update exists/updated counts appropriately.
+ Implementations must update the exists/updated/skip counts
+ appropriately in this method.
- Returns boolean: True if
+ Returns boolean: True if the entity should still be inserted, False otherwise
"""
raise NotImplementedError
@@ -230,7 +246,6 @@ class EntityImporter:
return self._issn_issnl_map.get(issn)
-
class RecordPusher:
"""
Base class for different importer sources. Pretty trivial interface, just
@@ -252,14 +267,14 @@ class RecordPusher:
raise NotImplementedError
-class JsonLinePusher:
+class JsonLinePusher(RecordPusher):
- def __init__(self, importer, in_file, **kwargs):
+ def __init__(self, importer, json_file, **kwargs):
self.importer = importer
- self.in_file = in_file
+ self.json_file = json_file
def run(self):
- for line in self.in_file:
+ for line in self.json_file:
if not line:
continue
record = json.loads(line)
@@ -267,11 +282,59 @@ class JsonLinePusher:
print(self.importer.finish())
-# from: https://docs.python.org/3/library/itertools.html
-def grouper(iterable, n, fillvalue=None):
- "Collect data into fixed-length chunks or blocks"
- args = [iter(iterable)] * n
- return itertools.zip_longest(*args, fillvalue=fillvalue)
+class CsvPusher(RecordPusher):
+
+ def __init__(self, importer, csv_file, **kwargs):
+ self.importer = importer
+ self.reader = csv.DictReader(csv_file, delimiter=kwargs.get('delimiter', ','))
+
+ def run(self):
+ for line in self.reader:
+ if not line:
+ continue
+ self.importer.push_record(line)
+ print(self.importer.finish())
+
+
+class LinePusher(RecordPusher):
+
+ def __init__(self, importer, text_file, **kwargs):
+ self.importer = importer
+ self.text_file = text_file
+
+ def run(self):
+ for line in self.text_file:
+ if not line:
+ continue
+ self.importer.push_record(line)
+ print(self.importer.finish())
+
+
+class KafkaJsonPusher(RecordPusher):
+
+ def __init__(self, importer, kafka_hosts, kafka_env, topic_suffix, group, **kwargs):
+ self.importer = importer
+ self.consumer = make_kafka_consumer(
+ kafka_hosts,
+ kafka_env,
+ topic_suffix,
+ group,
+ )
+
+ def run(self):
+ count = 0
+ for msg in self.consumer:
+ if not msg:
+ continue
+ record = json.loads(msg.value.decode('utf-8'))
+ self.importer.push_record(record)
+ count += 1
+ if count % 500 == 0:
+ print("Import counts: {}".format(self.importer.counts))
+ # TODO: should catch UNIX signals (HUP?) to shutdown cleanly, and/or
+ # commit the current batch if it has been lingering
+ print(self.importer.finish())
+
def make_kafka_consumer(hosts, env, topic_suffix, group):
topic_name = "fatcat-{}.{}".format(env, topic_suffix).encode('utf-8')
@@ -288,150 +351,3 @@ def make_kafka_consumer(hosts, env, topic_suffix, group):
)
return consumer
-class FatcatImporter:
- """
- Base class for fatcat importers
- """
-
- def __init__(self, api, **kwargs):
-
- eg_extra = kwargs.get('editgroup_extra', dict())
- eg_extra['git_rev'] = eg_extra.get('git_rev',
- subprocess.check_output(["git", "describe", "--always"]).strip()).decode('utf-8')
- eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.FatcatImporter')
-
- self.api = api
- self._editgroup_description = kwargs.get('editgroup_description')
- self._editgroup_extra = kwargs.get('editgroup_extra')
- issn_map_file = kwargs.get('issn_map_file')
-
- self._issnl_id_map = dict()
- self._orcid_id_map = dict()
- self._doi_id_map = dict()
- if issn_map_file:
- self.read_issn_map_file(issn_map_file)
- self._orcid_regex = re.compile("^\\d{4}-\\d{4}-\\d{4}-\\d{3}[\\dX]$")
- self.counts = Counter({'insert': 0, 'update': 0, 'processed_lines': 0})
-
- def _editgroup(self):
- eg = fatcat_client.Editgroup(
- description=self._editgroup_description,
- extra=self._editgroup_extra,
- )
- return self.api.create_editgroup(eg)
-
- def describe_run(self):
- print("Processed {} lines, inserted {}, updated {}.".format(
- self.counts['processed_lines'], self.counts['insert'], self.counts['update']))
-
- def create_row(self, row, editgroup_id=None):
- # sub-classes expected to implement this
- raise NotImplementedError
-
- def create_batch(self, rows, editgroup_id=None):
- # sub-classes expected to implement this
- raise NotImplementedError
-
- def process_source(self, source, group_size=100):
- """Creates and auto-accepts editgroup every group_size rows"""
- eg = self._editgroup()
- i = 0
- for i, row in enumerate(source):
- self.create_row(row, editgroup_id=eg.editgroup_id)
- if i > 0 and (i % group_size) == 0:
- self.api.accept_editgroup(eg.editgroup_id)
- eg = self._editgroup()
- self.counts['processed_lines'] += 1
- if i == 0 or (i % group_size) != 0:
- self.api.accept_editgroup(eg.editgroup_id)
-
- def process_batch(self, source, size=50, decode_kafka=False):
- """Reads and processes in batches (not API-call-per-)"""
- for rows in grouper(source, size):
- if decode_kafka:
- rows = [msg.value.decode('utf-8') for msg in rows]
- self.counts['processed_lines'] += len(rows)
- #eg = self._editgroup()
- #self.create_batch(rows, editgroup_id=eg.editgroup_id)
- self.create_batch(rows)
-
- def process_csv_source(self, source, group_size=100, delimiter=','):
- reader = csv.DictReader(source, delimiter=delimiter)
- self.process_source(reader, group_size)
-
- def process_csv_batch(self, source, size=50, delimiter=','):
- reader = csv.DictReader(source, delimiter=delimiter)
- self.process_batch(reader, size)
-
- def is_issnl(self, issnl):
- return len(issnl) == 9 and issnl[4] == '-'
-
- def lookup_issnl(self, issnl):
- """Caches calls to the ISSN-L lookup API endpoint in a local dict"""
- if issnl in self._issnl_id_map:
- return self._issnl_id_map[issnl]
- container_id = None
- try:
- rv = self.api.lookup_container(issnl=issnl)
- container_id = rv.ident
- except ApiException as ae:
- # If anything other than a 404 (not found), something is wrong
- assert ae.status == 404
- self._issnl_id_map[issnl] = container_id # might be None
- return container_id
-
- def is_orcid(self, orcid):
- return self._orcid_regex.match(orcid) is not None
-
- def lookup_orcid(self, orcid):
- """Caches calls to the Orcid lookup API endpoint in a local dict"""
- if not self.is_orcid(orcid):
- return None
- if orcid in self._orcid_id_map:
- return self._orcid_id_map[orcid]
- creator_id = None
- try:
- rv = self.api.lookup_creator(orcid=orcid)
- creator_id = rv.ident
- except ApiException as ae:
- # If anything other than a 404 (not found), something is wrong
- assert ae.status == 404
- self._orcid_id_map[orcid] = creator_id # might be None
- return creator_id
-
- def is_doi(self, doi):
- return doi.startswith("10.") and doi.count("/") >= 1
-
- def lookup_doi(self, doi):
- """Caches calls to the doi lookup API endpoint in a local dict"""
- assert self.is_doi(doi)
- doi = doi.lower()
- if doi in self._doi_id_map:
- return self._doi_id_map[doi]
- release_id = None
- try:
- rv = self.api.lookup_release(doi=doi)
- release_id = rv.ident
- except ApiException as ae:
- # If anything other than a 404 (not found), something is wrong
- assert ae.status == 404
- self._doi_id_map[doi] = release_id # might be None
- return release_id
-
- def read_issn_map_file(self, issn_map_file):
- print("Loading ISSN map file...")
- self._issn_issnl_map = dict()
- for line in issn_map_file:
- if line.startswith("ISSN") or len(line) == 0:
- continue
- (issn, issnl) = line.split()[0:2]
- self._issn_issnl_map[issn] = issnl
- # double mapping makes lookups easy
- self._issn_issnl_map[issnl] = issnl
- print("Got {} ISSN-L mappings.".format(len(self._issn_issnl_map)))
-
- def issn2issnl(self, issn):
- if issn is None:
- return None
- return self._issn_issnl_map.get(issn)
-
diff --git a/python/fatcat_tools/importers/crossref.py b/python/fatcat_tools/importers/crossref.py
index 6866cb6f..22abd08d 100644
--- a/python/fatcat_tools/importers/crossref.py
+++ b/python/fatcat_tools/importers/crossref.py
@@ -359,9 +359,9 @@ class CrossrefImporter(EntityImporter):
def try_update(self, re):
# lookup existing DOI (don't need to try other ext idents for crossref)
- existing_release = None
+ existing = None
try:
- existing_release = self.api.lookup_release(doi=re.doi)
+ existing = self.api.lookup_release(doi=re.doi)
except fatcat_client.rest.ApiException as err:
if err.status != 404:
raise err
@@ -370,12 +370,15 @@ class CrossrefImporter(EntityImporter):
# eventually we'll want to support "updates", but for now just skip if
# entity already exists
- if existing_release:
+ if existing:
self.counts['exists'] += 1
return False
return True
def insert_batch(self, batch):
- self.api.create_release_batch(batch, autoaccept=True)
+ self.api.create_release_batch(batch,
+ autoaccept=True,
+ description=self.editgroup_description,
+ extra=json.dumps(self.editgroup_extra))
diff --git a/python/fatcat_tools/importers/grobid_metadata.py b/python/fatcat_tools/importers/grobid_metadata.py
index 5e61a154..c1835b9f 100644
--- a/python/fatcat_tools/importers/grobid_metadata.py
+++ b/python/fatcat_tools/importers/grobid_metadata.py
@@ -5,12 +5,22 @@ import json
import base64
import datetime
import fatcat_client
-from .common import FatcatImporter
+from .common import EntityImporter
MAX_ABSTRACT_BYTES=4096
-class GrobidMetadataImporter(FatcatImporter):
+class GrobidMetadataImporter(EntityImporter):
+ """
+ This is a complex case: we need to parse and create both file and release entities.
+
+ The "primary" entity here is really File, not Release. If a matching File
+ exists, we bail in want(); if not we insert the Release during parsing, and
+ insert both.
+
+ TODO: should instead check if the File has any releases; if not, insert and update.
+ TODO: relaxing 'None' constraint on parse_record() might make this refactor-able.
+ """
def __init__(self, api, **kwargs):
@@ -23,10 +33,52 @@ class GrobidMetadataImporter(FatcatImporter):
editgroup_extra=eg_extra)
self.default_link_rel = kwargs.get("default_link_rel", "web")
- def parse_grobid_json(self, obj):
+ def want(self, raw_record):
+
+ fields = raw_record.split('\t')
+ sha1_key = fields[0]
+ sha1 = base64.b16encode(base64.b32decode(sha1_key.replace('sha1:', ''))).decode('ascii').lower()
+ #cdx = json.loads(fields[1])
+ #mimetype = fields[2]
+ #file_size = int(fields[3])
+ grobid_meta = json.loads(fields[4])
+
+ if not grobid_meta.get('title'):
+ return False
+
+ # lookup existing file SHA1
+ try:
+ existing_file = self.api.lookup_file(sha1=sha1)
+ except fatcat_client.rest.ApiException as err:
+ if err.status != 404:
+ raise err
+ existing_file = None
+
+ # if file is already in here, presumably not actually long-tail
+ # TODO: this is where we should check if the file actually has
+ # release_ids and/or URLs associated with it
+ if existing_file and not self.bezerk_mode:
+ return False
+ return True
+
+ def parse_record(self, row):
+
+ fields = row.split('\t')
+ sha1_key = fields[0]
+ cdx = json.loads(fields[1])
+ mimetype = fields[2]
+ file_size = int(fields[3])
+ grobid_meta = json.loads(fields[4])
+ fe = self.parse_file_metadata(sha1_key, cdx, mimetype, file_size)
+ re = self.parse_grobid_json(grobid_meta)
+ assert (fe and re)
- if not obj.get('title'):
- return None
+ release_edit = self.create_release(re)
+ fe.release_ids.append(release_edit.ident)
+ return fe
+
+ def parse_grobid_json(self, obj):
+ assert obj.get('title')
extra = dict()
@@ -122,17 +174,6 @@ class GrobidMetadataImporter(FatcatImporter):
sha1 = base64.b16encode(base64.b32decode(sha1_key.replace('sha1:', ''))).decode('ascii').lower()
- # lookup existing SHA1, or create new entity
- try:
- existing_file = self.api.lookup_file(sha1=sha1)
- except fatcat_client.rest.ApiException as err:
- if err.status != 404:
- raise err
- existing_file = None
-
- if existing_file:
- # if file is already in here, presumably not actually long-tail
- return None
fe = fatcat_client.FileEntity(
sha1=sha1,
size=int(file_size),
@@ -143,6 +184,7 @@ class GrobidMetadataImporter(FatcatImporter):
# parse URLs and CDX
original = cdx['url']
+ assert len(cdx['dt']) >= 8
wayback = "https://web.archive.org/web/{}/{}".format(
cdx['dt'],
original)
@@ -154,23 +196,13 @@ class GrobidMetadataImporter(FatcatImporter):
return fe
- def create_row(self, row, editgroup_id=None):
- if not row:
- return
- fields = row.split('\t')
- sha1_key = fields[0]
- cdx = json.loads(fields[1])
- mimetype = fields[2]
- file_size = int(fields[3])
- grobid_meta = json.loads(fields[4])
- fe = self.parse_file_metadata(sha1_key, cdx, mimetype, file_size)
- re = self.parse_grobid_json(grobid_meta)
- if fe and re:
- release_entity = self.api.create_release(re, editgroup_id=editgroup_id)
- # release ident can't already be in release list because we just
- # created it
- fe.release_ids.append(release_entity.ident)
- file_entity = self.api.create_file(fe, editgroup_id=editgroup_id)
- self.counts['insert'] += 1
-
- # NB: batch mode not implemented
+ def try_update(entity):
+ # we did this in want()
+ return True
+
+ def insert_batch(self, batch):
+ self.api.create_file_batch(batch,
+ autoaccept=True,
+ description=self.editgroup_description,
+ extra=json.dumps(self.editgroup_extra))
+
diff --git a/python/fatcat_tools/importers/journal_metadata.py b/python/fatcat_tools/importers/journal_metadata.py
index cd058889..ff38cd77 100644
--- a/python/fatcat_tools/importers/journal_metadata.py
+++ b/python/fatcat_tools/importers/journal_metadata.py
@@ -3,7 +3,7 @@ import sys
import json
import itertools
import fatcat_client
-from .common import FatcatImporter
+from .common import EntityImporter
def or_none(s):
@@ -25,7 +25,7 @@ def truthy(s):
else:
return None
-class JournalMetadataImporter(FatcatImporter):
+class JournalMetadataImporter(EntityImporter):
"""
Imports journal metadata ("containers") by ISSN, currently from a custom
(data munged) .csv file format
@@ -45,7 +45,12 @@ class JournalMetadataImporter(FatcatImporter):
editgroup_description=eg_desc,
editgroup_extra=eg_extra)
- def parse_journal_metadata_row(self, row):
+ def want(self, raw_record):
+ if raw_record.get('ISSN-L'):
+ return True
+ return False
+
+ def parse_record(self, row):
"""
row is a python dict (parsed from CSV).
returns a ContainerEntity (or None if invalid or couldn't parse)
@@ -72,16 +77,28 @@ class JournalMetadataImporter(FatcatImporter):
extra=extra)
return ce
- def create_row(self, row, editgroup_id=None):
- ce = self.parse_journal_metadata_row(row)
- if ce is not None:
- self.api.create_container(ce, editgroup_id=editgroup_id)
- self.counts['insert'] += 1
-
- def create_batch(self, batch):
- """Reads and processes in batches (not API-call-per-line)"""
- objects = [self.parse_journal_metadata_row(l)
- for l in batch if (l is not None)]
- objects = [o for o in objects if (o is not None)]
- self.api.create_container_batch(objects, autoaccept=True)
- self.counts['insert'] += len(objects)
+ def try_update(self, ce):
+
+ existing = None
+ try:
+ existing = self.api.lookup_container(issnl=ce.issnl)
+ except fatcat_client.rest.ApiException as err:
+ if err.status != 404:
+ raise err
+ # doesn't exist, need to update
+ return True
+
+ # eventually we'll want to support "updates", but for now just skip if
+ # entity already exists
+ if existing:
+ self.counts['exists'] += 1
+ return False
+
+ return True
+
+ def insert_batch(self, batch):
+ self.api.create_container_batch(batch,
+ autoaccept=True,
+ description=self.editgroup_description,
+ extra=json.dumps(self.editgroup_extra))
+
diff --git a/python/fatcat_tools/importers/matched.py b/python/fatcat_tools/importers/matched.py
index 02c0a9c5..2be15860 100644
--- a/python/fatcat_tools/importers/matched.py
+++ b/python/fatcat_tools/importers/matched.py
@@ -4,16 +4,10 @@ import json
import sqlite3
import itertools
import fatcat_client
-from .common import FatcatImporter
+from .common import EntityImporter
-#row = row.split('\t')
-#assert len(row) == 2
-#sha1 = row[0].replace('sha1:')
-#sha1 = base64.b16encode(base64.b32decode(sha1)).lower()
-#print(sha1)
-#dois = [d.lower() for d in json.loads(row[1])]
-class MatchedImporter(FatcatImporter):
+class MatchedImporter(EntityImporter):
"""
Importer for "file to crossref DOI" matches.
@@ -59,26 +53,13 @@ class MatchedImporter(FatcatImporter):
rel = "repository"
elif "//web.archive.org/" in raw or "//archive.is/" in raw:
rel = "webarchive"
- return fatcat_client.FileEntityUrls(url=raw, rel=rel)
+ return (rel, raw)
- def parse_matched_dict(self, obj):
- sha1 = obj['sha1']
- dois = [d.lower() for d in obj.get('dois', [])]
+ def want(self, raw_record):
+ return True
- # lookup sha1, or create new entity
- fe = None
- if not self.skip_file_updates:
- try:
- fe = self.api.lookup_file(sha1=sha1)
- except fatcat_client.rest.ApiException as err:
- if err.status != 404:
- raise err
- if fe is None:
- fe = fatcat_client.FileEntity(
- sha1=sha1,
- release_ids=[],
- urls=[],
- )
+ def parse_record(self, obj):
+ dois = [d.lower() for d in obj.get('dois', [])]
# lookup dois
re_list = set()
@@ -93,67 +74,78 @@ class MatchedImporter(FatcatImporter):
print("DOI not found: {}".format(doi))
else:
re_list.add(re.ident)
- if len(re_list) == 0:
+ release_ids = list(re_list)
+ if len(release_ids) == 0:
return None
- if fe.release_ids == set(re_list):
- return None
- re_list.update(fe.release_ids)
- fe.release_ids = list(re_list)
# parse URLs and CDX
- existing_urls = [feu.url for feu in fe.urls]
+ urls = set()
for url in obj.get('url', []):
- if url not in existing_urls:
- url = self.make_url(url)
- if url != None:
- fe.urls.append(url)
+ url = self.make_url(url)
+ if url != None:
+ urls.add(url)
for cdx in obj.get('cdx', []):
original = cdx['url']
wayback = "https://web.archive.org/web/{}/{}".format(
cdx['dt'],
original)
- if wayback not in existing_urls:
- fe.urls.append(
- fatcat_client.FileEntityUrls(url=wayback, rel="webarchive"))
- if original not in existing_urls:
- url = self.make_url(original)
- if url != None:
- fe.urls.append(url)
-
- if obj.get('size') != None:
- fe.size = int(obj['size'])
- fe.sha256 = obj.get('sha256', fe.sha256)
- fe.md5 = obj.get('md5', fe.sha256)
- if obj.get('mimetype') is None:
- if fe.mimetype is None:
- fe.mimetype = self.default_mime
- else:
- fe.mimetype = obj.get('mimetype')
+ urls.add(("webarchive", wayback))
+ url = self.make_url(original)
+ if url != None:
+ urls.add(url)
+ urls = [fatcat_client.FileEntityUrls(rel, url) for (rel, url) in urls]
+ if len(urls) == 0:
+ return None
+
+ size = obj.get('size')
+ if size:
+ size = int(size)
+
+ fe = fatcat_client.FileEntity(
+ md5=obj.get('md5'),
+ sha1=obj['sha1'],
+ sha256=obj.get('sha256'),
+ size=size,
+ mimetype=obj.get('mimetype'),
+ release_ids=release_ids,
+ urls=urls,
+ )
return fe
- def create_row(self, row, editgroup_id=None):
- obj = json.loads(row)
- fe = self.parse_matched_dict(obj)
- if fe is not None:
- if fe.ident is None:
- self.api.create_file(fe, editgroup_id=editgroup_id)
- self.counts['insert'] += 1
- else:
- self.api.update_file(fe.ident, fe, editgroup_id=editgroup_id)
- self.counts['update'] += 1
-
- def create_batch(self, batch):
- """Reads and processes in batches (not API-call-per-line)"""
- objects = [self.parse_matched_dict(json.loads(l))
- for l in batch if l != None]
- new_objects = [o for o in objects if o != None and o.ident == None]
- update_objects = [o for o in objects if o != None and o.ident != None]
- if len(update_objects):
- update_eg = self._editgroup().editgroup_id
- for obj in update_objects:
- self.api.update_file(obj.ident, obj, editgroup_id=update_eg)
- self.api.accept_editgroup(update_eg)
- if len(new_objects) > 0:
- self.api.create_file_batch(new_objects, autoaccept=True)
- self.counts['update'] += len(update_objects)
- self.counts['insert'] += len(new_objects)
+ def try_update(self, fe):
+ # lookup sha1, or create new entity
+ existing = None
+ if not self.skip_file_updates:
+ try:
+ existing = self.api.lookup_file(sha1=fe.sha1)
+ except fatcat_client.rest.ApiException as err:
+ if err.status != 404:
+ raise err
+
+ if not existing:
+ # new match; go ahead and insert
+ return True
+
+ fe.release_ids = list(set(fe.release_ids + existing.release_ids))
+ if set(fe.release_ids) == set(existing.release_ids) and len(existing.urls) > 0:
+ # no new release matches *and* there are already existing URLs
+ self.counts['exists'] += 1
+ return False
+
+ # merge the existing into this one and update
+ existing.urls = list(set(fe.urls + existing.urls))
+ existing.release_ids = list(set(fe.release_ids + existing.release_ids))
+ existing.mimetype = existing.mimetype or fe.mimetype
+ existing.size = existing.size or fe.size
+ existing.md5 = existing.md5 or fe.md5
+ existing.sha256 = existing.sha256 or fe.sha256
+ self.api.update_file(existing.ident, existing)
+ self.counts['update'] += 1
+ return False
+
+ def insert_batch(self, batch):
+ self.api.create_file_batch(batch,
+ autoaccept=True,
+ description=self.editgroup_description,
+ extra=json.dumps(self.editgroup_extra))
+
diff --git a/python/fatcat_tools/importers/orcid.py b/python/fatcat_tools/importers/orcid.py
index d03b2855..2c39db18 100644
--- a/python/fatcat_tools/importers/orcid.py
+++ b/python/fatcat_tools/importers/orcid.py
@@ -3,7 +3,7 @@ import sys
import json
import itertools
import fatcat_client
-from .common import FatcatImporter
+from .common import EntityImporter
def value_or_none(e):
if type(e) == dict:
@@ -20,7 +20,7 @@ def value_or_none(e):
return None
return e
-class OrcidImporter(FatcatImporter):
+class OrcidImporter(EntityImporter):
def __init__(self, api, **kwargs):
@@ -32,14 +32,16 @@ class OrcidImporter(FatcatImporter):
editgroup_description=eg_desc,
editgroup_extra=eg_extra)
- def parse_orcid_dict(self, obj):
+ def want(self, raw_record):
+ return True
+
+ def parse_record(self, obj):
"""
obj is a python dict (parsed from json).
returns a CreatorEntity
"""
name = obj['person']['name']
- if name is None:
- return None
+ assert name
extra = None
given = value_or_none(name.get('given-names'))
sur = value_or_none(name.get('family-name'))
@@ -67,17 +69,24 @@ class OrcidImporter(FatcatImporter):
extra=extra)
return ce
- def create_row(self, row, editgroup_id=None):
- obj = json.loads(row)
- ce = self.parse_orcid_dict(obj)
- if ce is not None:
- self.api.create_creator(ce, editgroup_id=editgroup_id)
- self.counts['insert'] += 1
+ def try_update(self, raw_record):
+ existing = None
+ try:
+ existing = self.api.lookup_creator(orcid=raw_record.orcid)
+ except fatcat_client.rest.ApiException as err:
+ if err.status != 404:
+ raise err
+
+ # eventually we'll want to support "updates", but for now just skip if
+ # entity already exists
+ if existing:
+ self.counts['exists'] += 1
+ return False
+
+ return True
- def create_batch(self, batch):
- """Reads and processes in batches (not API-call-per-line)"""
- objects = [self.parse_orcid_dict(json.loads(l))
- for l in batch if l != None]
- objects = [o for o in objects if o != None]
- self.api.create_creator_batch(objects, autoaccept=True)
- self.counts['insert'] += len(objects)
+ def insert_batch(self, batch):
+ self.api.create_creator_batch(batch,
+ autoaccept=True,
+ description=self.editgroup_description,
+ extra=json.dumps(self.editgroup_extra))
diff --git a/python/tests/import_crossref.py b/python/tests/import_crossref.py
index 5e0a150f..db49bb7f 100644
--- a/python/tests/import_crossref.py
+++ b/python/tests/import_crossref.py
@@ -17,13 +17,12 @@ def crossref_importer_existing(api):
def test_crossref_importer_batch(crossref_importer):
with open('tests/files/crossref-works.2018-01-21.badsample.json', 'r') as f:
- pusher = JsonLinePusher(crossref_importer, f)
- pusher.run()
+ JsonLinePusher(crossref_importer, f).run()
def test_crossref_importer(crossref_importer):
with open('tests/files/crossref-works.2018-01-21.badsample.json', 'r') as f:
- pusher = JsonLinePusher(crossref_importer, f)
- pusher.run()
+ crossref_importer.bezerk_mode = True
+ JsonLinePusher(crossref_importer, f).run()
# fetch most recent editgroup
changes = crossref_importer.api.get_changelog(limit=1)
eg = changes[0].editgroup
diff --git a/python/tests/import_grobid_metadata.py b/python/tests/import_grobid_metadata.py
index 97ebcaef..f00479d8 100644
--- a/python/tests/import_grobid_metadata.py
+++ b/python/tests/import_grobid_metadata.py
@@ -3,7 +3,7 @@ import os
import json
import base64
import pytest
-from fatcat_tools.importers import GrobidMetadataImporter
+from fatcat_tools.importers import GrobidMetadataImporter, LinePusher
from fixtures import api
"""
@@ -15,10 +15,6 @@ side-effects. Should probably be disabled or re-written.
def grobid_metadata_importer(api):
yield GrobidMetadataImporter(api)
-# TODO: use API to check that entities actually created...
-#def test_grobid_metadata_importer_batch(grobid_metadata_importer):
-# with open('tests/files/example_grobid_metadata_lines.tsv', 'r') as f:
-# grobid_metadata_importer.process_batch(f)
def test_grobid_metadata_parse(grobid_metadata_importer):
with open('tests/files/example_grobid_metadata_lines.tsv', 'r') as f:
@@ -52,9 +48,11 @@ def test_file_metadata_parse(grobid_metadata_importer):
assert fe.urls[0].rel == "webarchive"
assert len(fe.release_ids) == 0
+# TODO: use API to check that entities actually created...
def test_grobid_metadata_importer(grobid_metadata_importer):
with open('tests/files/example_grobid_metadata_lines.tsv', 'r') as f:
- grobid_metadata_importer.process_source(f)
+ grobid_metadata_importer.bezerk_mode = True
+ LinePusher(grobid_metadata_importer, f).run()
# fetch most recent editgroup
changes = grobid_metadata_importer.api.get_changelog(limit=1)
diff --git a/python/tests/import_journal_metadata.py b/python/tests/import_journal_metadata.py
index 81334bc6..0263f706 100644
--- a/python/tests/import_journal_metadata.py
+++ b/python/tests/import_journal_metadata.py
@@ -1,6 +1,6 @@
import pytest
-from fatcat_tools.importers import JournalMetadataImporter
+from fatcat_tools.importers import JournalMetadataImporter, CsvPusher
from fixtures import api
@@ -11,11 +11,13 @@ def journal_metadata_importer(api):
# TODO: use API to check that entities actually created...
def test_journal_metadata_importer_batch(journal_metadata_importer):
with open('tests/files/journal_extra_metadata.snip.csv', 'r') as f:
- journal_metadata_importer.process_csv_batch(f)
+ CsvPusher(journal_metadata_importer, f).run()
def test_journal_metadata_importer(journal_metadata_importer):
with open('tests/files/journal_extra_metadata.snip.csv', 'r') as f:
- journal_metadata_importer.process_csv_source(f)
+ journal_metadata_importer.bezerk_mode = True
+ journal_metadata_importer.serial_mode = True
+ CsvPusher(journal_metadata_importer, f).run()
# fetch most recent editgroup
changes = journal_metadata_importer.api.get_changelog(limit=1)
diff --git a/python/tests/import_matched.py b/python/tests/import_matched.py
index 080674ac..a58c402f 100644
--- a/python/tests/import_matched.py
+++ b/python/tests/import_matched.py
@@ -1,7 +1,7 @@
import json
import pytest
-from fatcat_tools.importers import MatchedImporter
+from fatcat_tools.importers import MatchedImporter, JsonLinePusher
from fixtures import api
@@ -12,11 +12,12 @@ def matched_importer(api):
# TODO: use API to check that entities actually created...
def test_matched_importer_batch(matched_importer):
with open('tests/files/example_matched.json', 'r') as f:
- matched_importer.process_batch(f)
+ JsonLinePusher(matched_importer, f).run()
def test_matched_importer(matched_importer):
with open('tests/files/example_matched.json', 'r') as f:
- matched_importer.process_source(f)
+ matched_importer.bezerk_mode = True
+ JsonLinePusher(matched_importer, f).run()
# fetch most recent editgroup
changes = matched_importer.api.get_changelog(limit=1)
@@ -29,7 +30,7 @@ def test_matched_importer(matched_importer):
def test_matched_dict_parse(matched_importer):
with open('tests/files/example_matched.json', 'r') as f:
raw = json.loads(f.readline())
- f = matched_importer.parse_matched_dict(raw)
+ f = matched_importer.parse_record(raw)
assert f.sha1 == "00242a192acc258bdfdb151943419437f440c313"
assert f.md5 == "f4de91152c7ab9fdc2a128f962faebff"
assert f.mimetype == "application/pdf"
diff --git a/python/tests/import_orcid.py b/python/tests/import_orcid.py
index 717a1328..9e898521 100644
--- a/python/tests/import_orcid.py
+++ b/python/tests/import_orcid.py
@@ -1,7 +1,7 @@
import json
import pytest
-from fatcat_tools.importers import OrcidImporter
+from fatcat_tools.importers import OrcidImporter, JsonLinePusher
from fixtures import api
@@ -9,18 +9,16 @@ from fixtures import api
def orcid_importer(api):
yield OrcidImporter(api)
-# TODO: use API to check that entities actually created...
-def test_orcid_importer_batch(orcid_importer):
- with open('tests/files/0000-0001-8254-7103.json', 'r') as f:
- orcid_importer.process_batch(f)
-
def test_orcid_importer_badid(orcid_importer):
with open('tests/files/0000-0001-8254-710X.json', 'r') as f:
- orcid_importer.process_batch(f)
+ pusher = JsonLinePusher(orcid_importer, f)
+ pusher.run()
+# TODO: use API to check that entities actually created...
def test_orcid_importer(orcid_importer):
with open('tests/files/0000-0001-8254-7103.json', 'r') as f:
- orcid_importer.process_source(f)
+ orcid_importer.bezerk_mode = True
+ JsonLinePusher(orcid_importer, f).run()
# fetch most recent editgroup
changes = orcid_importer.api.get_changelog(limit=1)
@@ -32,14 +30,15 @@ def test_orcid_importer(orcid_importer):
def test_orcid_importer_x(orcid_importer):
with open('tests/files/0000-0003-3953-765X.json', 'r') as f:
- orcid_importer.process_source(f)
+ pusher = JsonLinePusher(orcid_importer, f)
+ pusher.run()
c = orcid_importer.api.lookup_creator(orcid="0000-0003-3953-765X")
assert c is not None
def test_orcid_dict_parse(orcid_importer):
with open('tests/files/0000-0001-8254-7103.json', 'r') as f:
raw = json.loads(f.readline())
- c = orcid_importer.parse_orcid_dict(raw)
+ c = orcid_importer.parse_record(raw)
assert c.given_name == "Man-Hui"
assert c.surname == "Li"
assert c.display_name == "Man-Hui Li"
diff --git a/python/tests/transform_tests.py b/python/tests/transform_tests.py
index e9d23250..6d6c6c82 100644
--- a/python/tests/transform_tests.py
+++ b/python/tests/transform_tests.py
@@ -11,7 +11,7 @@ def test_elasticsearch_convert(crossref_importer):
with open('tests/files/crossref-works.single.json', 'r') as f:
# not a single line
raw = json.loads(f.read())
- (r, c) = crossref_importer.parse_crossref_dict(raw)
+ r = crossref_importer.parse_record(raw)
r.state = 'active'
release_to_elasticsearch(r)