summaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2019-01-22 19:19:31 -0800
committerBryan Newbold <bnewbold@robocracy.org>2019-01-22 19:19:31 -0800
commit83d5c49f5093b1820b625e1b3a1e21fc7242f79e (patch)
tree6f45ed910c0d4f589739da3584c72ca722d5b9a9 /python/fatcat_tools
parenta2086616c23320153eacec7e4f0d3c6e1c6d7790 (diff)
downloadfatcat-83d5c49f5093b1820b625e1b3a1e21fc7242f79e.tar.gz
fatcat-83d5c49f5093b1820b625e1b3a1e21fc7242f79e.zip
refactored crossref importer to new style
Diffstat (limited to 'python/fatcat_tools')
-rw-r--r--python/fatcat_tools/importers/__init__.py6
-rw-r--r--python/fatcat_tools/importers/common.py124
-rw-r--r--python/fatcat_tools/importers/crossref.py125
3 files changed, 166 insertions, 89 deletions
diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py
index f2caca5c..7b20868c 100644
--- a/python/fatcat_tools/importers/__init__.py
+++ b/python/fatcat_tools/importers/__init__.py
@@ -12,11 +12,11 @@ To run an import you combine two classes; one each of:
"""
-from .common import FatcatImporter, make_kafka_consumer
+from .common import FatcatImporter, JsonLinePusher, make_kafka_consumer
from .crossref import CrossrefImporter, CROSSREF_TYPE_MAP
from .grobid_metadata import GrobidMetadataImporter
from .journal_metadata import JournalMetadataImporter
from .matched import MatchedImporter
from .orcid import OrcidImporter
-from .kafka_source import KafkaSource
-from .file_source import FileSource
+#from .kafka_source import KafkaSource
+#from .file_source import FileSource
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py
index 25ee4727..604aa78b 100644
--- a/python/fatcat_tools/importers/common.py
+++ b/python/fatcat_tools/importers/common.py
@@ -31,10 +31,13 @@ class EntityImporter:
This class exposes helpers for implementations:
self.api
- self.create_related_*(entity) for all entity types
+ self.create_<entity>(entity) -> EntityEdit
+ for related entity types
self.push_entity(entity)
- self.counts['exits'] += 1 (if didn't update or insert because of existing)
- self.counts['update'] += 1 (if updated an entity)
+ self.counts['exits'] += 1
+ if didn't update or insert because of existing)
+ self.counts['update'] += 1
+ if updated an entity
"""
def __init__(self, api, **kwargs):
@@ -53,14 +56,20 @@ class EntityImporter:
self._editgroup_id = None
self._entity_queue = []
+ self._issnl_id_map = dict()
+ self._orcid_id_map = dict()
+ self._orcid_regex = re.compile("^\\d{4}-\\d{4}-\\d{4}-\\d{3}[\\dX]$")
+ self._doi_id_map = dict()
+
def push_record(self, raw_record):
"""
Returns nothing.
"""
- if (not raw_record) or (not self.want(raw_record):
+ if (not raw_record) or (not self.want(raw_record)):
self.counts['skip'] += 1
return
entity = self.parse_record(raw_record)
+ assert entity
if self.bezerk_mode:
self.push_entity(entity)
return
@@ -68,7 +77,7 @@ class EntityImporter:
self.push_entity(entity)
return
- def finish(self, raw_record):
+ def finish(self):
if self._edit_count > 0:
self.api.accept_editgroup(self._editgroup_id)
self._editgroup_id = None
@@ -79,8 +88,9 @@ class EntityImporter:
self.counts['insert'] += len(_entity_queue)
self._entity_queue = 0
- self.counts['total'] = counts['skip'] + counts['insert'] + \
- counts['update'] + counts['exists']
+ self.counts['total'] = 0
+ for key in ('skip', 'insert', 'update', 'exists'):
+ self.counts['total'] += self.counts[key]
return self.counts
def _get_editgroup(self, edits=1):
@@ -100,8 +110,8 @@ class EntityImporter:
def create_container(self, entity):
eg = self._get_editgroup()
- self.api.create_container(entity, editgroup_id=eg.editgroup_id)
self.counts['sub.container'] += 1
+ return self.api.create_container(entity, editgroup_id=eg.editgroup_id)
def updated(self):
"""
@@ -147,6 +157,79 @@ class EntityImporter:
def insert_batch(self, raw_record):
raise NotImplementedError
+ def is_orcid(self, orcid):
+ return self._orcid_regex.match(orcid) is not None
+
+ def lookup_orcid(self, orcid):
+ """Caches calls to the Orcid lookup API endpoint in a local dict"""
+ if not self.is_orcid(orcid):
+ return None
+ if orcid in self._orcid_id_map:
+ return self._orcid_id_map[orcid]
+ creator_id = None
+ try:
+ rv = self.api.lookup_creator(orcid=orcid)
+ creator_id = rv.ident
+ except ApiException as ae:
+ # If anything other than a 404 (not found), something is wrong
+ assert ae.status == 404
+ self._orcid_id_map[orcid] = creator_id # might be None
+ return creator_id
+
+ def is_doi(self, doi):
+ return doi.startswith("10.") and doi.count("/") >= 1
+
+ def lookup_doi(self, doi):
+ """Caches calls to the doi lookup API endpoint in a local dict"""
+ assert self.is_doi(doi)
+ doi = doi.lower()
+ if doi in self._doi_id_map:
+ return self._doi_id_map[doi]
+ release_id = None
+ try:
+ rv = self.api.lookup_release(doi=doi)
+ release_id = rv.ident
+ except ApiException as ae:
+ # If anything other than a 404 (not found), something is wrong
+ assert ae.status == 404
+ self._doi_id_map[doi] = release_id # might be None
+ return release_id
+
+ def is_issnl(self, issnl):
+ return len(issnl) == 9 and issnl[4] == '-'
+
+ def lookup_issnl(self, issnl):
+ """Caches calls to the ISSN-L lookup API endpoint in a local dict"""
+ if issnl in self._issnl_id_map:
+ return self._issnl_id_map[issnl]
+ container_id = None
+ try:
+ rv = self.api.lookup_container(issnl=issnl)
+ container_id = rv.ident
+ except ApiException as ae:
+ # If anything other than a 404 (not found), something is wrong
+ assert ae.status == 404
+ self._issnl_id_map[issnl] = container_id # might be None
+ return container_id
+
+ def read_issn_map_file(self, issn_map_file):
+ print("Loading ISSN map file...")
+ self._issn_issnl_map = dict()
+ for line in issn_map_file:
+ if line.startswith("ISSN") or len(line) == 0:
+ continue
+ (issn, issnl) = line.split()[0:2]
+ self._issn_issnl_map[issn] = issnl
+ # double mapping makes lookups easy
+ self._issn_issnl_map[issnl] = issnl
+ print("Got {} ISSN-L mappings.".format(len(self._issn_issnl_map)))
+
+ def issn2issnl(self, issn):
+ if issn is None:
+ return None
+ return self._issn_issnl_map.get(issn)
+
+
class RecordPusher:
"""
@@ -155,15 +238,7 @@ class RecordPusher:
"""
def __init__(self, importer, **kwargs):
-
- eg_extra = kwargs.get('editgroup_extra', dict())
- eg_extra['git_rev'] = eg_extra.get('git_rev',
- subprocess.check_output(["git", "describe", "--always"]).strip()).decode('utf-8')
- eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.EntityImporter')
-
- self.api = api
- self.bezerk_mode = kwargs.get('bezerk_mode', False)
- self._editgroup_description = kwargs.get('editgroup_description')
+ self.importer = importer
def run(self):
"""
@@ -177,6 +252,21 @@ class RecordPusher:
raise NotImplementedError
+class JsonLinePusher:
+
+ def __init__(self, importer, in_file, **kwargs):
+ self.importer = importer
+ self.in_file = in_file
+
+ def run(self):
+ for line in self.in_file:
+ if not line:
+ continue
+ record = json.loads(line)
+ self.importer.push_record(record)
+ print(self.importer.finish())
+
+
# from: https://docs.python.org/3/library/itertools.html
def grouper(iterable, n, fillvalue=None):
"Collect data into fixed-length chunks or blocks"
diff --git a/python/fatcat_tools/importers/crossref.py b/python/fatcat_tools/importers/crossref.py
index 8953dd82..6866cb6f 100644
--- a/python/fatcat_tools/importers/crossref.py
+++ b/python/fatcat_tools/importers/crossref.py
@@ -6,7 +6,7 @@ import datetime
import itertools
import subprocess
import fatcat_client
-from .common import FatcatImporter
+from .common import EntityImporter
# The docs/guide should be the cannonical home for these mappings; update there
@@ -57,7 +57,7 @@ LICENSE_SLUG_MAP = {
# http://www.springer.com/tdm doesn't seem like a license
}
-class CrossrefImporter(FatcatImporter):
+class CrossrefImporter(EntityImporter):
"""
Importer for Crossref metadata.
@@ -76,9 +76,9 @@ class CrossrefImporter(FatcatImporter):
issn_map_file=issn_map_file,
editgroup_description=eg_desc,
editgroup_extra=eg_extra)
+
+ self.create_containers = kwargs.get('create_containers')
extid_map_file = kwargs.get('extid_map_file')
- create_containers = kwargs.get('create_containers')
- check_existing = kwargs.get('check_existing')
self.extid_map_db = None
if extid_map_file:
db_uri = "file:{}?mode=ro".format(extid_map_file)
@@ -86,8 +86,8 @@ class CrossrefImporter(FatcatImporter):
self.extid_map_db = sqlite3.connect(db_uri, uri=True)
else:
print("Not using external ID map")
- self.create_containers = create_containers
- self.check_existing = check_existing
+
+ self.read_issn_map_file(issn_map_file)
def lookup_ext_ids(self, doi):
if self.extid_map_db is None:
@@ -110,38 +110,38 @@ class CrossrefImporter(FatcatImporter):
def map_release_type(self, crossref_type):
return CROSSREF_TYPE_MAP.get(crossref_type)
- def parse_crossref_dict(self, obj):
- """
- obj is a python dict (parsed from json).
- returns a ReleaseEntity
- """
+ def map_container_type(self, crossref_type):
+ return CONTAINER_TYPE_MAP.get(release_type)
- # Do require the 'title' keys to exsit, as release entities do
- if (not 'title' in obj) or (not obj['title']):
- return None
+ def want(self, obj):
# Ways to be out of scope (provisionally)
# journal-issue and journal-volume map to None, but allowed for now
if obj.get('type') in (None, 'journal', 'proceedings',
'standard-series', 'report-series', 'book-series', 'book-set',
'book-track', 'proceedings-series'):
- return None
+ return False
- release_type = self.map_release_type(obj['type'])
+ # Do require the 'title' keys to exsit, as release entities do
+ if (not 'title' in obj) or (not obj['title']):
+ return False
- # lookup existing DOI
- existing_release = None
- if self.check_existing:
- try:
- existing_release = self.api.lookup_release(doi=obj['DOI'].lower())
- except fatcat_client.rest.ApiException as err:
- if err.status != 404:
- raise err
+ # Can't handle such large lists yet
+ authors = len(obj.get('author', []))
+ abstracts = len(obj.get('abstract', []))
+ refs = len(obj.get('reference', []))
+ if max(authors, abstracts, refs) > 750:
+ return False
- # eventually we'll want to support "updates", but for now just skip if
- # entity already exists
- if existing_release:
- return None
+ return True
+
+ def parse_record(self, obj):
+ """
+ obj is a python dict (parsed from json).
+ returns a ReleaseEntity
+ """
+
+ release_type = self.map_release_type(obj['type'])
# contribs
def do_contribs(obj_list, ctype):
@@ -195,14 +195,15 @@ class CrossrefImporter(FatcatImporter):
container_id = self.lookup_issnl(issnl)
publisher = obj.get('publisher')
- ce = None
if (container_id is None and self.create_containers and (issnl is not None)
and obj.get('container-title') and len(obj['container-title']) > 0):
ce = fatcat_client.ContainerEntity(
issnl=issnl,
publisher=publisher,
- container_type=CONTAINER_TYPE_MAP.get(release_type),
+ container_type=self.map_container_type(release_type),
name=obj['container-title'][0])
+ ce_edit = self.create_container(ce)
+ container_id = ce_edit.ident
# license slug
license_slug = None
@@ -309,8 +310,7 @@ class CrossrefImporter(FatcatImporter):
# TODO: filter out huge releases; we'll get them later (and fix bug in
# fatcatd)
- if max(len(contribs), len(refs), len(abstracts)) > 750:
- return None
+ assert max(len(contribs), len(refs), len(abstracts)) <= 750
# release date parsing is amazingly complex
raw_date = obj['issued']['date-parts'][0]
@@ -354,41 +354,28 @@ class CrossrefImporter(FatcatImporter):
contribs=contribs,
refs=refs,
)
- return (re, ce)
+ return re
+
+ def try_update(self, re):
+
+ # lookup existing DOI (don't need to try other ext idents for crossref)
+ existing_release = None
+ try:
+ existing_release = self.api.lookup_release(doi=re.doi)
+ except fatcat_client.rest.ApiException as err:
+ if err.status != 404:
+ raise err
+ # doesn't exist, need to update
+ return True
+
+ # eventually we'll want to support "updates", but for now just skip if
+ # entity already exists
+ if existing_release:
+ self.counts['exists'] += 1
+ return False
+
+ return True
+
+ def insert_batch(self, batch):
+ self.api.create_release_batch(batch, autoaccept=True)
- def create_row(self, row, editgroup_id=None):
- if row is None:
- return
- obj = json.loads(row)
- entities = self.parse_crossref_dict(obj)
- # XXX:
- print(entities)
- if entities is not None:
- (re, ce) = entities
- if ce is not None:
- container = self.api.create_container(ce, editgroup_id=editgroup_id)
- re.container_id = container.ident
- self._issnl_id_map[ce.issnl] = container.ident
- self.api.create_release(re, editgroup_id=editgroup_id)
- self.counts['insert'] += 1
-
- def create_batch(self, batch):
- """Current work/release pairing disallows batch creation of releases.
- Could do batch work creation and then match against releases, but meh."""
- release_batch = []
- for row in batch:
- if row is None:
- continue
- obj = json.loads(row)
- entities = self.parse_crossref_dict(obj)
- if entities is not None:
- (re, ce) = entities
- if ce is not None:
- ce_eg = self.api.create_editgroup(fatcat_client.Editgroup())
- container = self.api.create_container(ce, editgroup_id=ce_eg.editgroup_id)
- self.api.accept_editgroup(ce_eg.editgroup_id)
- re.container_id = container.ident
- self._issnl_id_map[ce.issnl] = container.ident
- release_batch.append(re)
- self.api.create_release_batch(release_batch, autoaccept=True)
- self.counts['insert'] += len(release_batch)