aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2018-11-12 23:18:56 -0800
committerBryan Newbold <bnewbold@robocracy.org>2018-11-12 23:18:56 -0800
commitb03bfc8f3fd84141738f775b273a99850d78e1ff (patch)
tree64858e474fa38aa015f06f5e15b851dcc85da421 /python/fatcat_tools
parent055c464deea8cdaccf3ed384995d4409b0f51409 (diff)
downloadfatcat-b03bfc8f3fd84141738f775b273a99850d78e1ff.tar.gz
fatcat-b03bfc8f3fd84141738f775b273a99850d78e1ff.zip
refactor python modules
Diffstat (limited to 'python/fatcat_tools')
-rw-r--r--python/fatcat_tools/changelog_workers.py122
-rw-r--r--python/fatcat_tools/crossref_importer.py272
-rw-r--r--python/fatcat_tools/elastic_workers.py47
-rw-r--r--python/fatcat_tools/entity_helpers.py100
-rw-r--r--python/fatcat_tools/fcid.py17
-rwxr-xr-xpython/fatcat_tools/grobid_metadata_importer.py168
-rw-r--r--python/fatcat_tools/importer_common.py137
-rw-r--r--python/fatcat_tools/issn_importer.py72
-rw-r--r--python/fatcat_tools/matched_importer.py144
-rw-r--r--python/fatcat_tools/orcid_importer.py73
-rw-r--r--python/fatcat_tools/raw_api_client.py66
-rw-r--r--python/fatcat_tools/worker_common.py25
12 files changed, 1243 insertions, 0 deletions
diff --git a/python/fatcat_tools/changelog_workers.py b/python/fatcat_tools/changelog_workers.py
new file mode 100644
index 00000000..e341ea32
--- /dev/null
+++ b/python/fatcat_tools/changelog_workers.py
@@ -0,0 +1,122 @@
+
+import json
+import time
+from itertools import islice
+from fatcat.worker_common import FatcatWorker
+from pykafka.common import OffsetType
+
+
+class FatcatChangelogWorker(FatcatWorker):
+ """
+ Periodically polls the fatcat API looking for new changelogs. When they are
+ found, fetch them and push (as JSON) into a Kafka topic.
+ """
+
+ def __init__(self, api_host_url, kafka_hosts, produce_topic, poll_interval=10.0, offset=None):
+ # TODO: should be offset=0
+ super().__init__(kafka_hosts=kafka_hosts,
+ produce_topic=produce_topic,
+ api_host_url=api_host_url)
+ self.poll_interval = poll_interval
+ self.offset = offset # the fatcat changelog offset, not the kafka offset
+
+ def most_recent_message(self, topic):
+ """
+ Tries to fetch the most recent message from a given topic.
+ This only makes sense for single partition topics, though could be
+ extended with "last N" behavior.
+
+ Following "Consuming the last N messages from a topic"
+ from https://pykafka.readthedocs.io/en/latest/usage.html#consumer-patterns
+ """
+ consumer = topic.get_simple_consumer(
+ auto_offset_reset=OffsetType.LATEST,
+ reset_offset_on_start=True)
+ offsets = [(p, op.last_offset_consumed - 1)
+ for p, op in consumer._partitions.items()]
+ offsets = [(p, (o if o > -1 else -2)) for p, o in offsets]
+ if -2 in [o for p, o in offsets]:
+ return None
+ else:
+ consumer.reset_offsets(offsets)
+ msg = islice(consumer, 1)
+ if msg:
+ return list(msg)[0].value
+ else:
+ return None
+
+ def run(self):
+ topic = self.kafka.topics[self.produce_topic]
+ # On start, try to consume the most recent from the topic, and using
+ # that as the starting offset. Note that this is a single-partition
+ # topic
+ if self.offset is None:
+ print("Checking for most recent changelog offset...")
+ msg = self.most_recent_message(topic)
+ if msg:
+ self.offset = json.loads(msg.decode('utf-8'))['index']
+ else:
+ self.offset = 1
+
+ with topic.get_sync_producer() as producer:
+ while True:
+ latest = int(self.api.get_changelog(limit=1)[0].index)
+ if latest > self.offset:
+ print("Fetching changelogs from {} through {}".format(
+ self.offset+1, latest))
+ for i in range(self.offset+1, latest+1):
+ cle = self.api.get_changelog_entry(i)
+ obj = self.api.api_client.sanitize_for_serialization(cle)
+ producer.produce(
+ message=json.dumps(obj).encode('utf-8'),
+ partition_key=None,
+ timestamp=None,
+ #XXX: timestamp=cle.timestamp,
+ )
+ self.offset = i
+ print("Sleeping {} seconds...".format(self.poll_interval))
+ time.sleep(self.poll_interval)
+
+
+class FatcatEntityUpdatesWorker(FatcatWorker):
+ """
+ Consumes from the changelog topic and publishes expanded entities (fetched
+ from API) to update topics.
+
+ For now, only release updates are published.
+ """
+
+ def __init__(self, api_host_url, kafka_hosts, consume_topic, release_topic):
+ super().__init__(kafka_hosts=kafka_hosts,
+ consume_topic=consume_topic,
+ api_host_url=api_host_url)
+ self.release_topic = release_topic
+ self.consumer_group = "entity-updates"
+
+ def run(self):
+ changelog_topic = self.kafka.topics[self.consume_topic]
+ release_topic = self.kafka.topics[self.release_topic]
+
+ consumer = changelog_topic.get_balanced_consumer(
+ consumer_group=self.consumer_group,
+ managed=True,
+ auto_offset_reset=OffsetType.LATEST,
+ reset_offset_on_start=False,
+ )
+
+ with release_topic.get_sync_producer() as producer:
+ for msg in consumer:
+ cle = json.loads(msg.value.decode('utf-8'))
+ #print(cle)
+ release_edits = cle['editgroup']['edits']['releases']
+ for re in release_edits:
+ ident = re['ident']
+ release = self.api.get_release(ident, expand="files,container")
+ release_dict = self.api.api_client.sanitize_for_serialization(release)
+ producer.produce(
+ message=json.dumps(release_dict).encode('utf-8'),
+ partition_key=ident.encode('utf-8'),
+ timestamp=None,
+ )
+ consumer.commit_offsets()
+
diff --git a/python/fatcat_tools/crossref_importer.py b/python/fatcat_tools/crossref_importer.py
new file mode 100644
index 00000000..37005965
--- /dev/null
+++ b/python/fatcat_tools/crossref_importer.py
@@ -0,0 +1,272 @@
+
+import sys
+import json
+import sqlite3
+import datetime
+import itertools
+import fatcat_client
+from fatcat.importer_common import FatcatImporter
+
+
+class FatcatCrossrefImporter(FatcatImporter):
+
+ def __init__(self, host_url, issn_map_file, extid_map_file=None, create_containers=True):
+ super().__init__(host_url, issn_map_file)
+ self.extid_map_db = None
+ if extid_map_file:
+ db_uri = "file:{}?mode=ro".format(extid_map_file)
+ print("Using external ID map: {}".format(db_uri))
+ self.extid_map_db = sqlite3.connect(db_uri, uri=True)
+ else:
+ print("Not using external ID map")
+ self.create_containers = create_containers
+
+ def lookup_ext_ids(self, doi):
+ if self.extid_map_db is None:
+ return dict(core_id=None, pmid=None, pmcid=None, wikidata_qid=None)
+ row = self.extid_map_db.execute("SELECT core, pmid, pmcid, wikidata FROM ids WHERE doi=? LIMIT 1",
+ [doi.lower()]).fetchone()
+ if row is None:
+ return dict(core_id=None, pmid=None, pmcid=None, wikidata_qid=None)
+ row = [str(cell or '') or None for cell in row]
+ return dict(
+ core_id=row[0],
+ pmid=row[1],
+ pmcid=row[2],
+ wikidata_qid=row[3])
+
+ def parse_crossref_dict(self, obj):
+ """
+ obj is a python dict (parsed from json).
+ returns a ReleaseEntity
+ """
+
+ # This work is out of scope if it doesn't have authors and a title
+ if (not 'author' in obj) or (not 'title' in obj):
+ return None
+
+ # Other ways to be out of scope (provisionally)
+ if (not 'type' in obj):
+ return None
+
+ # contribs
+ def do_contribs(obj_list, ctype):
+ contribs = []
+ for i, am in enumerate(obj_list):
+ creator_id = None
+ if 'ORCID' in am.keys():
+ creator_id = self.lookup_orcid(am['ORCID'].split('/')[-1])
+ # Sorry humans :(
+ if am.get('given') and am.get('family'):
+ raw_name = "{} {}".format(am['given'], am['family'])
+ elif am.get('family'):
+ raw_name = am['family']
+ else:
+ # TODO: defaults back to a pseudo-null value
+ raw_name = am.get('given', '<blank>')
+ extra = dict()
+ if ctype == "author":
+ index = i
+ else:
+ index = None
+ if am.get('affiliation'):
+ # note: affiliation => affiliations
+ extra['affiliations'] = am.get('affiliation')
+ if am.get('sequence') and am.get('sequence') != "additional":
+ extra['sequence'] = am.get('sequence')
+ if not extra:
+ extra = None
+ contribs.append(fatcat_client.ReleaseContrib(
+ creator_id=creator_id,
+ index=index,
+ raw_name=raw_name,
+ role=ctype,
+ extra=extra))
+ return contribs
+ contribs = do_contribs(obj['author'], "author")
+ contribs.extend(do_contribs(obj.get('editor', []), "editor"))
+ contribs.extend(do_contribs(obj.get('translator', []), "translator"))
+
+ # container
+ issn = obj.get('ISSN', [None])[0]
+ issnl = self.issn2issnl(issn)
+ container_id = None
+ if issnl:
+ container_id = self.lookup_issnl(issnl)
+ publisher = obj.get('publisher')
+
+ ce = None
+ if (container_id is None and self.create_containers and issnl != None
+ and obj.get('container-title') and len(obj['container-title']) > 0):
+ ce = fatcat_client.ContainerEntity(
+ issnl=issnl,
+ publisher=publisher,
+ name=obj['container-title'][0])
+
+ # references
+ refs = []
+ for i, rm in enumerate(obj.get('reference', [])):
+ try:
+ year = int(rm.get('year'))
+ # NOTE: will need to update/config in the future!
+ # NOTE: are there crossref works with year < 100?
+ if year > 2025 or year < 100:
+ year = None
+ except:
+ year = None
+ extra = rm.copy()
+ if rm.get('DOI'):
+ extra['doi'] = rm.get('DOI').lower()
+ key = rm.get('key')
+ if key and key.startswith(obj['DOI'].upper()):
+ key = key.replace(obj['DOI'].upper() + "-", '')
+ key = key.replace(obj['DOI'].upper(), '')
+ container_name = rm.get('volume-title')
+ if not container_name:
+ container_name = rm.get('journal-title')
+ extra.pop('DOI', None)
+ extra.pop('key', None)
+ extra.pop('year', None)
+ extra.pop('volume-name', None)
+ extra.pop('journal-title', None)
+ extra.pop('title', None)
+ extra.pop('first-page', None)
+ extra.pop('doi-asserted-by', None)
+ if extra:
+ extra = dict(crossref=extra)
+ else:
+ extra = None
+ refs.append(fatcat_client.ReleaseRef(
+ index=i,
+ # doing lookups would be a second import pass
+ target_release_id=None,
+ key=key,
+ year=year,
+ container_name=container_name,
+ title=rm.get('title'),
+ locator=rm.get('first-page'),
+ # TODO: just dump JSON somewhere here?
+ extra=extra))
+
+ # abstracts
+ abstracts = []
+ if obj.get('abstract') != None:
+ abstracts.append(fatcat_client.ReleaseEntityAbstracts(
+ mimetype="application/xml+jats",
+ content=obj.get('abstract')))
+
+ # extra fields
+ extra = dict()
+ for key in ('subject', 'type', 'license', 'alternative-id',
+ 'container-title', 'original-title', 'subtitle', 'archive',
+ 'funder', 'group-title'):
+ # TODO: unpack "container-title" array
+ val = obj.get(key)
+ if val:
+ extra[key] = val
+ if 'license' in extra and extra['license']:
+ for i in range(len(extra['license'])):
+ if 'start' in extra['license'][i]:
+ extra['license'][i]['start'] = extra['license'][i]['start']['date-time']
+ if len(obj['title']) > 1:
+ extra['other-titles'] = obj['title'][1:]
+ # TODO: this should be top-level
+ extra['is_kept'] = len(obj.get('archive', [])) > 0
+
+ # ISBN
+ isbn13 = None
+ for raw in obj.get('ISBN', []):
+ # TODO: convert if not ISBN-13 format
+ if len(raw) == 17:
+ isbn13 = raw
+ break
+
+ # release status
+ if obj['type'] in ('journal-article', 'conference-proceeding', 'book',
+ 'dissertation', 'book-chapter'):
+ release_status = "published"
+ else:
+ # unknown
+ release_status = None
+
+ # external identifiers
+ extids = self.lookup_ext_ids(doi=obj['DOI'].lower())
+
+ # TODO: filter out huge releases; we'll get them later (and fix bug in
+ # fatcatd)
+ if max(len(contribs), len(refs), len(abstracts)) > 750:
+ return None
+
+ # release date parsing is amazingly complex
+ release_date = obj['issued']['date-parts'][0]
+ if not release_date or not release_date[0]:
+ # got some NoneType, even though at least year is supposed to be set
+ release_date = None
+ elif len(release_date) == 3:
+ release_date = datetime.datetime(year=release_date[0], month=release_date[1], day=release_date[2])
+ else:
+ # only the year is actually required; mangle to first day for date
+ # (TODO: something better?)
+ release_date = datetime.datetime(year=release_date[0], month=1, day=1)
+ # convert to string ISO datetime format (if not null)
+ if release_date:
+ release_date = release_date.isoformat() + "Z"
+
+ re = fatcat_client.ReleaseEntity(
+ work_id=None,
+ title=obj['title'][0],
+ contribs=contribs,
+ refs=refs,
+ container_id=container_id,
+ publisher=publisher,
+ release_type=obj['type'],
+ release_status=release_status,
+ doi=obj['DOI'].lower(),
+ isbn13=isbn13,
+ core_id=extids['core_id'],
+ pmid=extids['pmid'],
+ pmcid=extids['pmcid'],
+ wikidata_qid=extids['wikidata_qid'],
+ release_date=release_date,
+ issue=obj.get('issue'),
+ volume=obj.get('volume'),
+ pages=obj.get('page'),
+ abstracts=abstracts,
+ extra=dict(crossref=extra))
+ return (re, ce)
+
+ def create_row(self, row, editgroup=None):
+ if row is None:
+ return
+ obj = json.loads(row)
+ entities = self.parse_crossref_dict(obj)
+ if entities is not None:
+ (re, ce) = entities
+ if ce is not None:
+ container = self.api.create_container(ce, editgroup=editgroup)
+ re.container_id = container.ident
+ self._issnl_id_map[ce.issnl] = container.ident
+ self.api.create_release(re, editgroup=editgroup)
+ self.insert_count = self.insert_count + 1
+
+ def create_batch(self, batch, editgroup=None):
+ """Current work/release pairing disallows batch creation of releases.
+ Could do batch work creation and then match against releases, but meh."""
+ release_batch = []
+ for row in batch:
+ if row is None:
+ continue
+ obj = json.loads(row)
+ entities = self.parse_crossref_dict(obj)
+ if entities is not None:
+ (re, ce) = entities
+ if ce is not None:
+ ce_eg = self.api.create_editgroup(
+ fatcat_client.Editgroup(editor_id='aaaaaaaaaaaabkvkaaaaaaaaae'))
+ container = self.api.create_container(ce, editgroup=ce_eg.id)
+ self.api.accept_editgroup(ce_eg.id)
+ re.container_id = container.ident
+ self._issnl_id_map[ce.issnl] = container.ident
+ release_batch.append(re)
+ self.api.create_release_batch(release_batch, autoaccept="true", editgroup=editgroup)
+ self.insert_count = self.insert_count + len(release_batch)
diff --git a/python/fatcat_tools/elastic_workers.py b/python/fatcat_tools/elastic_workers.py
new file mode 100644
index 00000000..3d2e9c39
--- /dev/null
+++ b/python/fatcat_tools/elastic_workers.py
@@ -0,0 +1,47 @@
+
+import json
+import time
+import requests
+from fatcat.worker_common import FatcatWorker
+from fatcat_client.models import ReleaseEntity
+from fatcat.entity_helpers import *
+from pykafka.common import OffsetType
+
+
+class FatcatElasticReleaseWorker(FatcatWorker):
+ """
+ Consumes from release-updates topic and pushes into (presumably local)
+ elasticsearch.
+
+ Uses a consumer group to manage offset.
+ """
+
+ def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
+ elastic_backend="http://localhost:9200", elastic_index="fatcat"):
+ super().__init__(kafka_hosts=kafka_hosts,
+ consume_topic=consume_topic,
+ api_host_url=None)
+ self.consumer_group = "elastic-updates"
+ self.elastic_backend = elastic_backend
+ self.elastic_index = elastic_index
+
+ def run(self):
+ consume_topic = self.kafka.topics[self.consume_topic]
+
+ consumer = consume_topic.get_balanced_consumer(
+ consumer_group=self.consumer_group,
+ managed=True,
+ )
+
+ for msg in consumer:
+ json_str = msg.value.decode('utf-8')
+ release = entity_from_json(json_str, ReleaseEntity)
+ #print(release)
+ elastic_endpoint = "{}/{}/release/{}".format(
+ self.elastic_backend,
+ self.elastic_index,
+ release.ident)
+ print("Updating document: {}".format(elastic_endpoint))
+ resp = requests.post(elastic_endpoint, json=release.to_elastic_dict())
+ assert resp.status_code in (200, 201)
+ consumer.commit_offsets()
diff --git a/python/fatcat_tools/entity_helpers.py b/python/fatcat_tools/entity_helpers.py
new file mode 100644
index 00000000..c454536b
--- /dev/null
+++ b/python/fatcat_tools/entity_helpers.py
@@ -0,0 +1,100 @@
+
+import collections
+from fatcat_client.models import ReleaseEntity
+from fatcat_client.api_client import ApiClient
+
+def entity_to_json(entity):
+ ac = ApiClient()
+ return ac.sanitize_for_serialization(entity)
+
+def entity_from_json(json_str, entity_type):
+ """
+ Hack to take advantage of the code-generated deserialization code
+ """
+ ac = ApiClient()
+ thing = collections.namedtuple('Thing', ['data'])
+ thing.data = json_str
+ return ac.deserialize(thing, entity_type)
+
+def release_elastic_dict(release):
+ """
+ Converts from an entity model/schema to elasticsearch oriented schema.
+
+ Returns: dict
+ """
+
+ if release.state != 'active':
+ raise ValueError("Entity is not 'active'")
+
+ # First, the easy ones (direct copy)
+ t = dict(
+ ident = release.ident,
+ revision = release.revision,
+ title = release.title,
+ release_type = release.release_type,
+ release_status = release.release_status,
+ language = release.language,
+ doi = release.doi,
+ pmid = release.pmid,
+ pmcid = release.pmcid,
+ isbn13 = release.isbn13,
+ core_id = release.core_id,
+ wikidata_qid = release.wikidata_qid
+ )
+
+ if release.release_date:
+ # TODO: resolve why this can be either a string or datetime
+ if type(release.release_date) == str:
+ t['release_date'] = release.release_date
+ else:
+ t['release_date'] = release.release_date.strftime('%F')
+
+ container = release.container
+ container_is_kept = False
+ if container:
+ t['publisher'] = container.publisher
+ t['container_name'] = container.name
+ t['container_issnl'] = container.issnl
+ container_extra = container.extra
+ if container_extra:
+ t['container_is_oa'] = container_extra.get('is_oa')
+ container_is_kept = container_extra.get('is_kept', False)
+ t['container_is_longtail_oa'] = container_extra.get('is_longtail_oa')
+ else:
+ t['publisher'] = release.publisher
+
+ files = release.files or []
+ t['file_count'] = len(files)
+ in_wa = False
+ in_ia = False
+ t['file_pdf_url'] = None
+ for f in files:
+ is_pdf = 'pdf' in f.get('mimetype', '')
+ for url in f.get('urls', []):
+ if url.get('rel', '') == 'webarchive':
+ in_wa = True
+ if '//web.archive.org/' in url['url'] or '//archive.org/' in url['url']:
+ in_ia = True
+ if is_pdf:
+ t['file_pdf_url'] = url['url']
+ if not t['file_pdf_url'] and is_pdf:
+ t['file_pdf_url'] = url['url']
+ t['file_in_webarchive'] = in_wa
+ t['file_in_ia'] = in_ia
+
+ extra = release.extra or dict()
+ if extra:
+ t['in_shadow'] = extra.get('in_shadow')
+ if extra.get('grobid') and extra['grobid'].get('is_longtail_oa'):
+ t['container_is_longtail_oa'] = True
+ t['any_abstract'] = bool(release.abstracts)
+ t['is_kept'] = container_is_kept or extra.get('is_kept', False)
+
+ t['ref_count'] = len(release.refs or [])
+ t['contrib_count'] = len(release.contribs or [])
+ contrib_names = []
+ for c in (release.contribs or []):
+ if c.raw_name:
+ contrib_names.append(c.raw_name)
+ t['contrib_names'] = contrib_names
+ return t
diff --git a/python/fatcat_tools/fcid.py b/python/fatcat_tools/fcid.py
new file mode 100644
index 00000000..dd72b242
--- /dev/null
+++ b/python/fatcat_tools/fcid.py
@@ -0,0 +1,17 @@
+
+import base64
+import uuid
+
+def fcid2uuid(s):
+ s = s.split('_')[-1].upper().encode('utf-8')
+ assert len(s) == 26
+ raw = base64.b32decode(s + b"======")
+ return str(uuid.UUID(bytes=raw)).lower()
+
+def uuid2fcid(s):
+ raw = uuid.UUID(s).bytes
+ return base64.b32encode(raw)[:26].lower().decode('utf-8')
+
+def test_fcid():
+ test_uuid = '00000000-0000-0000-3333-000000000001'
+ assert test_uuid == fcid2uuid(uuid2fcid(test_uuid))
diff --git a/python/fatcat_tools/grobid_metadata_importer.py b/python/fatcat_tools/grobid_metadata_importer.py
new file mode 100755
index 00000000..95cc285e
--- /dev/null
+++ b/python/fatcat_tools/grobid_metadata_importer.py
@@ -0,0 +1,168 @@
+#!/usr/bin/env python3
+
+import sys
+import json
+import base64
+import datetime
+import fatcat_client
+from fatcat.importer_common import FatcatImporter
+
+MAX_ABSTRACT_BYTES=4096
+
+
+class FatcatGrobidMetadataImporter(FatcatImporter):
+
+ def __init__(self, host_url, default_link_rel="web"):
+ super().__init__(host_url)
+ self.default_link_rel = default_link_rel
+
+ def parse_grobid_json(self, obj):
+
+ if not obj.get('title'):
+ return None
+
+ release = dict()
+ extra = dict()
+
+ if obj.get('abstract') and len(obj.get('abstract')) < MAX_ABSTRACT_BYTES:
+ abobj = dict(
+ mimetype="text/plain",
+ language=None,
+ content=obj.get('abstract').strip())
+ abstracts = [abobj]
+ else:
+ abstracts = None
+
+ contribs = []
+ for i, a in enumerate(obj.get('authors', [])):
+ c = dict(raw_name=a['name'], role="author")
+ contribs.append(fatcat_client.ReleaseContrib(
+ index=i,
+ raw_name=a['name'],
+ role="author",
+ extra=None))
+
+ refs = []
+ for raw in obj.get('citations', []):
+ cite_extra = dict()
+ ref = dict()
+ ref['key'] = raw.get('id')
+ if raw.get('title'):
+ ref['title'] = raw['title'].strip()
+ if raw.get('date'):
+ try:
+ year = int(raw['date'].strip()[:4])
+ ref['year'] = year
+ except:
+ pass
+ for key in ('volume', 'url', 'issue', 'publisher'):
+ if raw.get(key):
+ cite_extra[key] = raw[key].strip()
+ if raw.get('authors'):
+ cite_extra['authors'] = [a['name'] for a in raw['authors']]
+ if cite_extra:
+ cite_extra = dict(grobid=cite_extra)
+ else:
+ cite_extra = None
+ ref['extra'] = cite_extra
+ refs.append(ref)
+
+ release_type = "journal-article"
+ release_date = None
+ if obj.get('date'):
+ # TODO: only returns year, ever? how to handle?
+ release_date = datetime.datetime(year=int(obj['date'][:4]), month=1, day=1)
+
+ if obj.get('doi'):
+ extra['doi'] = obj['doi']
+ if obj['journal'] and obj['journal'].get('name'):
+ extra['container_name'] = obj['journal']['name']
+
+ extra['is_longtail_oa'] = True
+
+ # TODO: ISSN/eISSN handling? or just journal name lookup?
+
+ if extra:
+ extra = dict(grobid=extra)
+ else:
+ extra = None
+
+ re = fatcat_client.ReleaseEntity(
+ title=obj['title'].strip(),
+ contribs=contribs,
+ refs=refs,
+ publisher=obj['journal'].get('publisher'),
+ volume=obj['journal'].get('volume'),
+ issue=obj['journal'].get('issue'),
+ abstracts=abstracts,
+ extra=extra)
+ return re
+
+ # TODO: make this a common function somewhere
+ def make_url(self, raw):
+ rel = self.default_link_rel
+ # TODO: this is where we could map specific domains to rel types,
+ # and also filter out bad domains, invalid URLs, etc
+ if "//archive.org/" in raw or "//arxiv.org/" in raw:
+ # TODO: special-case the arxiv.org bulk mirror?
+ rel = "repository"
+ elif "//web.archive.org/" in raw or "//archive.is/" in raw:
+ rel = "webarchive"
+ return fatcat_client.FileEntityUrls(url=raw, rel=rel)
+
+ def parse_file_metadata(self, sha1_key, cdx, mimetype, file_size):
+
+ sha1 = base64.b16encode(base64.b32decode(sha1_key.replace('sha1:', ''))).decode('ascii').lower()
+
+ # lookup existing SHA1, or create new entity
+ try:
+ existing_file = self.api.lookup_file(sha1=sha1)
+ except fatcat_client.rest.ApiException as err:
+ if err.status != 404:
+ raise err
+ existing_file = None
+
+ if existing_file:
+ # if file is already in here, presumably not actually long-tail
+ return None
+ fe = fatcat_client.FileEntity(
+ sha1=sha1,
+ size=int(file_size),
+ mimetype=mimetype,
+ releases=[],
+ urls=[],
+ )
+
+ # parse URLs and CDX
+ original = cdx['url']
+ wayback = "https://web.archive.org/web/{}/{}".format(
+ cdx['dt'],
+ original)
+ fe.urls.append(
+ fatcat_client.FileEntityUrls(url=wayback, rel="webarchive"))
+ original_url = self.make_url(original)
+ if original_url != None:
+ fe.urls.append(original_url)
+
+ return fe
+
+ def create_row(self, row, editgroup=None):
+ if not row:
+ return
+ fields = row.split('\t')
+ sha1_key = fields[0]
+ cdx = json.loads(fields[1])
+ mimetype = fields[2]
+ file_size = int(fields[3])
+ grobid_meta = json.loads(fields[4])
+ fe = self.parse_file_metadata(sha1_key, cdx, mimetype, file_size)
+ re = self.parse_grobid_json(grobid_meta)
+ if fe and re:
+ release_entity = self.api.create_release(re, editgroup=editgroup)
+ # release ident can't already be in release list because we just
+ # created it
+ fe.releases.append(release_entity.ident)
+ file_entity = self.api.create_file(fe, editgroup=editgroup)
+ self.insert_count = self.insert_count + 1
+
+ # NB: batch mode not implemented
diff --git a/python/fatcat_tools/importer_common.py b/python/fatcat_tools/importer_common.py
new file mode 100644
index 00000000..8dfee875
--- /dev/null
+++ b/python/fatcat_tools/importer_common.py
@@ -0,0 +1,137 @@
+
+import re
+import sys
+import csv
+import json
+import itertools
+import fatcat_client
+from fatcat_client.rest import ApiException
+
+# from: https://docs.python.org/3/library/itertools.html
+def grouper(iterable, n, fillvalue=None):
+ "Collect data into fixed-length chunks or blocks"
+ args = [iter(iterable)] * n
+ return itertools.zip_longest(*args, fillvalue=fillvalue)
+
+class FatcatImporter:
+
+ def __init__(self, host_url, issn_map_file=None):
+ conf = fatcat_client.Configuration()
+ conf.host = host_url
+ self.api = fatcat_client.DefaultApi(fatcat_client.ApiClient(conf))
+ self._issnl_id_map = dict()
+ self._orcid_id_map = dict()
+ self._doi_id_map = dict()
+ self._issn_issnl_map = None
+ self._orcid_regex = re.compile("^\\d{4}-\\d{4}-\\d{4}-\\d{3}[\\dX]$")
+ if issn_map_file:
+ self.read_issn_map_file(issn_map_file)
+ self.processed_lines = 0
+ self.insert_count = 0
+ self.update_count = 0
+
+ def describe_run(self):
+ print("Processed {} lines, inserted {}, updated {}.".format(
+ self.processed_lines, self.insert_count, self.update_count))
+
+ def process_source(self, source, group_size=100):
+ """Creates and auto-accepts editgroup every group_size rows"""
+ eg = self.api.create_editgroup(
+ fatcat_client.Editgroup(editor_id='aaaaaaaaaaaabkvkaaaaaaaaae'))
+ for i, row in enumerate(source):
+ self.create_row(row, editgroup=eg.id)
+ if i > 0 and (i % group_size) == 0:
+ self.api.accept_editgroup(eg.id)
+ eg = self.api.create_editgroup(
+ fatcat_client.Editgroup(editor_id='aaaaaaaaaaaabkvkaaaaaaaaae'))
+ self.processed_lines = self.processed_lines + 1
+ if i == 0 or (i % group_size) != 0:
+ self.api.accept_editgroup(eg.id)
+
+ def process_batch(self, source, size=50):
+ """Reads and processes in batches (not API-call-per-)"""
+ for rows in grouper(source, size):
+ self.processed_lines = self.processed_lines + len(rows)
+ eg = self.api.create_editgroup(
+ fatcat_client.Editgroup(editor_id='aaaaaaaaaaaabkvkaaaaaaaaae'))
+ self.create_batch(rows, editgroup=eg.id)
+
+ def process_csv_source(self, source, group_size=100, delimiter=','):
+ reader = csv.DictReader(source, delimiter=delimiter)
+ self.process_source(reader, group_size)
+
+ def process_csv_batch(self, source, size=50, delimiter=','):
+ reader = csv.DictReader(source, delimiter=delimiter)
+ self.process_batch(reader, size)
+
+ def is_issnl(self, issnl):
+ return len(issnl) == 9 and issnl[4] == '-'
+
+ def lookup_issnl(self, issnl):
+ """Caches calls to the ISSN-L lookup API endpoint in a local dict"""
+ if issnl in self._issnl_id_map:
+ return self._issnl_id_map[issnl]
+ container_id = None
+ try:
+ rv = self.api.lookup_container(issnl=issnl)
+ container_id = rv.ident
+ except ApiException as ae:
+ # If anything other than a 404 (not found), something is wrong
+ assert ae.status == 404
+ self._issnl_id_map[issnl] = container_id # might be None
+ return container_id
+
+ def is_orcid(self, orcid):
+ return self._orcid_regex.match(orcid) != None
+
+ def lookup_orcid(self, orcid):
+ """Caches calls to the Orcid lookup API endpoint in a local dict"""
+ if not self.is_orcid(orcid):
+ return None
+ if orcid in self._orcid_id_map:
+ return self._orcid_id_map[orcid]
+ creator_id = None
+ try:
+ rv = self.api.lookup_creator(orcid=orcid)
+ creator_id = rv.ident
+ except ApiException as ae:
+ # If anything other than a 404 (not found), something is wrong
+ assert ae.status == 404
+ self._orcid_id_map[orcid] = creator_id # might be None
+ return creator_id
+
+ def is_doi(self, doi):
+ return doi.startswith("10.") and doi.count("/") >= 1
+
+ def lookup_doi(self, doi):
+ """Caches calls to the doi lookup API endpoint in a local dict"""
+ assert self.is_doi(doi)
+ doi = doi.lower()
+ if doi in self._doi_id_map:
+ return self._doi_id_map[doi]
+ release_id = None
+ try:
+ rv = self.api.lookup_release(doi=doi)
+ release_id = rv.ident
+ except ApiException as ae:
+ # If anything other than a 404 (not found), something is wrong
+ assert ae.status == 404
+ self._doi_id_map[doi] = release_id # might be None
+ return release_id
+
+ def read_issn_map_file(self, issn_map_file):
+ print("Loading ISSN map file...")
+ self._issn_issnl_map = dict()
+ for line in issn_map_file:
+ if line.startswith("ISSN") or len(line) == 0:
+ continue
+ (issn, issnl) = line.split()[0:2]
+ self._issn_issnl_map[issn] = issnl
+ # double mapping makes lookups easy
+ self._issn_issnl_map[issnl] = issnl
+ print("Got {} ISSN-L mappings.".format(len(self._issn_issnl_map)))
+
+ def issn2issnl(self, issn):
+ if issn is None:
+ return None
+ return self._issn_issnl_map.get(issn)
diff --git a/python/fatcat_tools/issn_importer.py b/python/fatcat_tools/issn_importer.py
new file mode 100644
index 00000000..c9ef50b5
--- /dev/null
+++ b/python/fatcat_tools/issn_importer.py
@@ -0,0 +1,72 @@
+
+import sys
+import json
+import itertools
+import fatcat_client
+from fatcat.importer_common import FatcatImporter
+
+# CSV format (generated from git.archive.org/webgroup/oa-journal-analysis):
+# ISSN-L,in_doaj,in_road,in_norwegian,in_crossref,title,publisher,url,lang,ISSN-print,ISSN-electronic,doi_count,has_doi,is_oa,is_kept,publisher_size,url_live,url_live_status,url_live_final_status,url_live_final_url,url_live_status_simple,url_live_final_status_simple,url_domain,gwb_pdf_count
+
+def or_none(s):
+ if s is None:
+ return None
+ if len(s) == 0:
+ return None
+ return s
+
+def truthy(s):
+ if s is None:
+ return None
+ s = s.lower()
+ if s in ('true', 't', 'yes', 'y', '1'):
+ return True
+ elif s in ('false', 'f', 'no', 'n', '0'):
+ return False
+ else:
+ return None
+
+class FatcatIssnImporter(FatcatImporter):
+
+ def parse_issn_row(self, row):
+ """
+ row is a python dict (parsed from CSV).
+ returns a ContainerEntity
+ """
+ title = or_none(row['title'])
+ issnl = or_none(row['ISSN-L'])
+ if title is None or issnl is None:
+ return
+ extra = dict(
+ in_doaj=truthy(row['in_doaj']),
+ in_road=truthy(row['in_road']),
+ in_norwegian=truthy(row['in_norwegian']),
+ language=or_none(row['lang']),
+ url=or_none(row['url']),
+ ISSNp=or_none(row['ISSN-print']),
+ ISSNe=or_none(row['ISSN-electronic']),
+ is_oa=truthy(row['is_oa']),
+ is_kept=truthy(row['is_kept']),
+ )
+ ce = fatcat_client.ContainerEntity(
+ issnl=issnl,
+ name=title,
+ publisher=or_none(row['publisher']),
+ abbrev=None,
+ coden=None,
+ extra=extra)
+ return ce
+
+ def create_row(self, row, editgroup=None):
+ ce = self.parse_issn_row(row)
+ if ce is not None:
+ self.api.create_container(ce, editgroup=editgroup)
+ self.insert_count = self.insert_count + 1
+
+ def create_batch(self, batch, editgroup=None):
+ """Reads and processes in batches (not API-call-per-line)"""
+ objects = [self.parse_issn_row(l)
+ for l in batch if l != None]
+ objects = [o for o in objects if o != None]
+ self.api.create_container_batch(objects, autoaccept="true", editgroup=editgroup)
+ self.insert_count = self.insert_count + len(objects)
diff --git a/python/fatcat_tools/matched_importer.py b/python/fatcat_tools/matched_importer.py
new file mode 100644
index 00000000..7f55369b
--- /dev/null
+++ b/python/fatcat_tools/matched_importer.py
@@ -0,0 +1,144 @@
+
+import sys
+import json
+import sqlite3
+import itertools
+import fatcat_client
+from fatcat.importer_common import FatcatImporter
+
+#row = row.split('\t')
+#assert len(row) == 2
+#sha1 = row[0].replace('sha1:')
+#sha1 = base64.b16encode(base64.b32decode(sha1)).lower()
+#print(sha1)
+#dois = [d.lower() for d in json.loads(row[1])]
+
+class FatcatMatchedImporter(FatcatImporter):
+ """
+ Input format is JSON with keys:
+ - dois (list)
+ - sha1 (hex)
+ - md5 (hex)
+ - sha256 (hex)
+ - size (int)
+ - cdx (list of objects)
+ - dt
+ - url
+ - mimetype
+ - urls (list of strings... or objects?)
+
+ Future handlings/extensions:
+ - core_id, wikidata_id, pmcid, pmid: not as lists
+ """
+
+ def __init__(self, host_url, skip_file_update=False, default_mime=None,
+ default_link_rel="web"):
+ super().__init__(host_url)
+ self.default_mime = default_mime
+ self.default_link_rel = default_link_rel
+ self.skip_file_update = skip_file_update
+
+ def make_url(self, raw):
+ rel = self.default_link_rel
+ # TODO: this is where we could map specific domains to rel types,
+ # and also filter out bad domains, invalid URLs, etc
+ if "//archive.org/" in raw or "//arxiv.org/" in raw:
+ # TODO: special-case the arxiv.org bulk mirror?
+ rel = "repository"
+ elif "//web.archive.org/" in raw or "//archive.is/" in raw:
+ rel = "webarchive"
+ return fatcat_client.FileEntityUrls(url=raw, rel=rel)
+
+ def parse_matched_dict(self, obj):
+ sha1 = obj['sha1']
+ dois = [d.lower() for d in obj.get('dois', [])]
+
+ # lookup sha1, or create new entity
+ fe = None
+ if not self.skip_file_update:
+ try:
+ fe = self.api.lookup_file(sha1=sha1)
+ except fatcat_client.rest.ApiException as err:
+ if err.status != 404:
+ raise err
+ if fe is None:
+ fe = fatcat_client.FileEntity(
+ sha1=sha1,
+ releases=[],
+ urls=[],
+ )
+
+ # lookup dois
+ re_list = set()
+ for doi in dois:
+ try:
+ re = self.api.lookup_release(doi=doi)
+ except fatcat_client.rest.ApiException as err:
+ if err.status != 404:
+ raise err
+ re = None
+ if re is None:
+ print("DOI not found: {}".format(doi))
+ else:
+ re_list.add(re.ident)
+ if len(re_list) == 0:
+ return None
+ if fe.releases == set(re_list):
+ return None
+ re_list.update(fe.releases)
+ fe.releases = list(re_list)
+
+ # parse URLs and CDX
+ existing_urls = [feu.url for feu in fe.urls]
+ for url in obj.get('url', []):
+ if url not in existing_urls:
+ url = self.make_url(url)
+ if url != None:
+ fe.urls.append(url)
+ for cdx in obj.get('cdx', []):
+ original = cdx['url']
+ wayback = "https://web.archive.org/web/{}/{}".format(
+ cdx['dt'],
+ original)
+ if wayback not in existing_urls:
+ fe.urls.append(
+ fatcat_client.FileEntityUrls(url=wayback, rel="webarchive"))
+ if original not in existing_urls:
+ url = self.make_url(original)
+ if url != None:
+ fe.urls.append(url)
+
+ if obj.get('size') != None:
+ fe.size = int(obj['size'])
+ fe.sha256 = obj.get('sha256', fe.sha256)
+ fe.md5 = obj.get('md5', fe.sha256)
+ if obj.get('mimetype') is None:
+ if fe.mimetype is None:
+ fe.mimetype = self.default_mime
+ else:
+ fe.mimetype = obj.get('mimetype')
+ return fe
+
+ def create_row(self, row, editgroup=None):
+ obj = json.loads(row)
+ fe = self.parse_matched_dict(obj)
+ if fe is not None:
+ if fe.ident is None:
+ self.api.create_file(fe, editgroup=editgroup)
+ self.insert_count = self.insert_count + 1
+ else:
+ self.api.update_file(fe.ident, fe, editgroup=editgroup)
+ self.update_count = self.update_count + 1
+
+ def create_batch(self, batch, editgroup=None):
+ """Reads and processes in batches (not API-call-per-line)"""
+ objects = [self.parse_matched_dict(json.loads(l))
+ for l in batch if l != None]
+ new_objects = [o for o in objects if o != None and o.ident == None]
+ update_objects = [o for o in objects if o != None and o.ident != None]
+ for obj in update_objects:
+ self.api.update_file(obj.ident, obj, editgroup=editgroup)
+ if len(new_objects) > 0:
+ self.api.create_file_batch(new_objects, autoaccept="true", editgroup=editgroup)
+ self.update_count = self.update_count + len(update_objects)
+ self.insert_count = self.insert_count + len(new_objects)
diff --git a/python/fatcat_tools/orcid_importer.py b/python/fatcat_tools/orcid_importer.py
new file mode 100644
index 00000000..e1f5943c
--- /dev/null
+++ b/python/fatcat_tools/orcid_importer.py
@@ -0,0 +1,73 @@
+
+import sys
+import json
+import itertools
+import fatcat_client
+from fatcat.importer_common import FatcatImporter
+
+def value_or_none(e):
+ if type(e) == dict:
+ e = e.get('value')
+ if type(e) == str and len(e) == 0:
+ e = None
+ # TODO: this is probably bogus; patched in desperation; remove?
+ if e:
+ try:
+ e.encode()
+ except UnicodeEncodeError:
+ # Invalid JSON?
+ print("BAD UNICODE")
+ return None
+ return e
+
+class FatcatOrcidImporter(FatcatImporter):
+
+ def parse_orcid_dict(self, obj):
+ """
+ obj is a python dict (parsed from json).
+ returns a CreatorEntity
+ """
+ name = obj['person']['name']
+ if name is None:
+ return None
+ extra = None
+ given = value_or_none(name.get('given-names'))
+ sur = value_or_none(name.get('family-name'))
+ display = value_or_none(name.get('credit-name'))
+ if display is None:
+ # TODO: sorry human beings
+ if given and sur:
+ display = "{} {}".format(given, sur)
+ elif sur:
+ display = sur
+ elif given:
+ display = given
+ else:
+ # must have *some* name
+ return None
+ orcid = obj['orcid-identifier']['path']
+ if not self.is_orcid(orcid):
+ sys.stderr.write("Bad ORCID: {}\n".format(orcid))
+ return None
+ ce = fatcat_client.CreatorEntity(
+ orcid=orcid,
+ given_name=given,
+ surname=sur,
+ display_name=display,
+ extra=extra)
+ return ce
+
+ def create_row(self, row, editgroup=None):
+ obj = json.loads(row)
+ ce = self.parse_orcid_dict(obj)
+ if ce is not None:
+ self.api.create_creator(ce, editgroup=editgroup)
+ self.insert_count = self.insert_count + 1
+
+ def create_batch(self, batch, editgroup=None):
+ """Reads and processes in batches (not API-call-per-line)"""
+ objects = [self.parse_orcid_dict(json.loads(l))
+ for l in batch if l != None]
+ objects = [o for o in objects if o != None]
+ self.api.create_creator_batch(objects, autoaccept="true", editgroup=editgroup)
+ self.insert_count = self.insert_count + len(objects)
diff --git a/python/fatcat_tools/raw_api_client.py b/python/fatcat_tools/raw_api_client.py
new file mode 100644
index 00000000..75151ebb
--- /dev/null
+++ b/python/fatcat_tools/raw_api_client.py
@@ -0,0 +1,66 @@
+
+import sys
+import json
+import requests
+
+
+class RawFatcatApiClient:
+
+ def __init__(self, host_url):
+ self.host_url = host_url
+ self.session = requests.Session()
+ self._issn_map = dict()
+
+ def get(self, path, data=None):
+ headers = {"content-type": "application/json"}
+ return self.session.get(self.host_url + path, json=data,
+ headers=headers)
+
+ def post(self, path, data=None):
+ headers = {"content-type": "application/json"}
+ return self.session.post(self.host_url + path, json=data,
+ headers=headers)
+
+ def new_editgroup(self):
+ rv = self.post('/v0/editgroup', data=dict(
+ editor_id=1))
+ print(rv)
+ print(rv.json())
+ assert rv.status_code == 201
+ editgroup_id = rv.json()['id']
+ return editgroup_id
+
+ def accept_editgroup(self, eg):
+ rv = self.post('/v0/editgroup/{}/accept'.format(eg))
+ assert rv.status_code == 200
+ return rv
+
+ def import_issn_file(self, json_file, create_containers=False, batchsize=100):
+ eg = self.new_editgroup()
+ i = 0
+ with open(json_file, 'r') as file:
+ for line in file:
+ if i % batchsize == 0:
+ sys.stdout.write('\n{}: '.format(i))
+ if (i+1) % 20 == 0:
+ sys.stdout.write('.')
+ i = i + 1
+ obj = json.loads(line)
+ if not ("author" in obj and "title" in obj):
+ continue
+ try:
+ self.import_crossref_dict(obj, editgroup=eg,
+ create_containers=create_containers)
+ except Exception as e:
+ print("ERROR: {}".format(e))
+ if i % batchsize == 0:
+ self.accept_editgroup(eg)
+ eg = self.new_editgroup()
+ if i % batchsize != 0:
+ self.accept_editgroup(eg)
+ print("done!")
+
+ def health(self):
+ rv = self.get("/health")
+ assert rv.status_code == 200
+ return rv.json()
diff --git a/python/fatcat_tools/worker_common.py b/python/fatcat_tools/worker_common.py
new file mode 100644
index 00000000..77ea2c15
--- /dev/null
+++ b/python/fatcat_tools/worker_common.py
@@ -0,0 +1,25 @@
+
+import re
+import sys
+import csv
+import json
+import itertools
+import fatcat_client
+from pykafka import KafkaClient
+from fatcat_client.rest import ApiException
+
+
+class FatcatWorker:
+ """
+ Common code for for Kafka producers and consumers.
+ """
+
+ def __init__(self, kafka_hosts, produce_topic=None, consume_topic=None, api_host_url=None):
+ if api_host_url:
+ conf = fatcat_client.Configuration()
+ conf.host = api_host_url
+ self.api = fatcat_client.DefaultApi(fatcat_client.ApiClient(conf))
+ self.kafka = KafkaClient(hosts=kafka_hosts, broker_version="1.0.0")
+ self.produce_topic = produce_topic
+ self.consume_topic = consume_topic
+