From b03bfc8f3fd84141738f775b273a99850d78e1ff Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Mon, 12 Nov 2018 23:18:56 -0800 Subject: refactor python modules --- python/fatcat_tools/changelog_workers.py | 122 +++++++++++ python/fatcat_tools/crossref_importer.py | 272 ++++++++++++++++++++++++ python/fatcat_tools/elastic_workers.py | 47 ++++ python/fatcat_tools/entity_helpers.py | 100 +++++++++ python/fatcat_tools/fcid.py | 17 ++ python/fatcat_tools/grobid_metadata_importer.py | 168 +++++++++++++++ python/fatcat_tools/importer_common.py | 137 ++++++++++++ python/fatcat_tools/issn_importer.py | 72 +++++++ python/fatcat_tools/matched_importer.py | 144 +++++++++++++ python/fatcat_tools/orcid_importer.py | 73 +++++++ python/fatcat_tools/raw_api_client.py | 66 ++++++ python/fatcat_tools/worker_common.py | 25 +++ 12 files changed, 1243 insertions(+) create mode 100644 python/fatcat_tools/changelog_workers.py create mode 100644 python/fatcat_tools/crossref_importer.py create mode 100644 python/fatcat_tools/elastic_workers.py create mode 100644 python/fatcat_tools/entity_helpers.py create mode 100644 python/fatcat_tools/fcid.py create mode 100755 python/fatcat_tools/grobid_metadata_importer.py create mode 100644 python/fatcat_tools/importer_common.py create mode 100644 python/fatcat_tools/issn_importer.py create mode 100644 python/fatcat_tools/matched_importer.py create mode 100644 python/fatcat_tools/orcid_importer.py create mode 100644 python/fatcat_tools/raw_api_client.py create mode 100644 python/fatcat_tools/worker_common.py (limited to 'python/fatcat_tools') diff --git a/python/fatcat_tools/changelog_workers.py b/python/fatcat_tools/changelog_workers.py new file mode 100644 index 00000000..e341ea32 --- /dev/null +++ b/python/fatcat_tools/changelog_workers.py @@ -0,0 +1,122 @@ + +import json +import time +from itertools import islice +from fatcat.worker_common import FatcatWorker +from pykafka.common import OffsetType + + +class FatcatChangelogWorker(FatcatWorker): + """ + Periodically polls the fatcat API looking for new changelogs. When they are + found, fetch them and push (as JSON) into a Kafka topic. + """ + + def __init__(self, api_host_url, kafka_hosts, produce_topic, poll_interval=10.0, offset=None): + # TODO: should be offset=0 + super().__init__(kafka_hosts=kafka_hosts, + produce_topic=produce_topic, + api_host_url=api_host_url) + self.poll_interval = poll_interval + self.offset = offset # the fatcat changelog offset, not the kafka offset + + def most_recent_message(self, topic): + """ + Tries to fetch the most recent message from a given topic. + This only makes sense for single partition topics, though could be + extended with "last N" behavior. + + Following "Consuming the last N messages from a topic" + from https://pykafka.readthedocs.io/en/latest/usage.html#consumer-patterns + """ + consumer = topic.get_simple_consumer( + auto_offset_reset=OffsetType.LATEST, + reset_offset_on_start=True) + offsets = [(p, op.last_offset_consumed - 1) + for p, op in consumer._partitions.items()] + offsets = [(p, (o if o > -1 else -2)) for p, o in offsets] + if -2 in [o for p, o in offsets]: + return None + else: + consumer.reset_offsets(offsets) + msg = islice(consumer, 1) + if msg: + return list(msg)[0].value + else: + return None + + def run(self): + topic = self.kafka.topics[self.produce_topic] + # On start, try to consume the most recent from the topic, and using + # that as the starting offset. 
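# Illustrative sketch, not part of the patch: the offset bootstrap used by the
# changelog worker above. Changelog messages are JSON objects whose 'index'
# field is the fatcat changelog sequence number, so the newest message in the
# topic tells the worker where to resume; an empty topic means start at 1.
import json

def bootstrap_offset(most_recent_msg):
    """most_recent_msg is the newest message value (bytes), or None if the
    topic is empty."""
    if most_recent_msg is None:
        return 1
    return json.loads(most_recent_msg.decode('utf-8'))['index']

assert bootstrap_offset(None) == 1
assert bootstrap_offset(b'{"index": 41}') == 41  # hypothetical message body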
Note that this is a single-partition + # topic + if self.offset is None: + print("Checking for most recent changelog offset...") + msg = self.most_recent_message(topic) + if msg: + self.offset = json.loads(msg.decode('utf-8'))['index'] + else: + self.offset = 1 + + with topic.get_sync_producer() as producer: + while True: + latest = int(self.api.get_changelog(limit=1)[0].index) + if latest > self.offset: + print("Fetching changelogs from {} through {}".format( + self.offset+1, latest)) + for i in range(self.offset+1, latest+1): + cle = self.api.get_changelog_entry(i) + obj = self.api.api_client.sanitize_for_serialization(cle) + producer.produce( + message=json.dumps(obj).encode('utf-8'), + partition_key=None, + timestamp=None, + #XXX: timestamp=cle.timestamp, + ) + self.offset = i + print("Sleeping {} seconds...".format(self.poll_interval)) + time.sleep(self.poll_interval) + + +class FatcatEntityUpdatesWorker(FatcatWorker): + """ + Consumes from the changelog topic and publishes expanded entities (fetched + from API) to update topics. + + For now, only release updates are published. + """ + + def __init__(self, api_host_url, kafka_hosts, consume_topic, release_topic): + super().__init__(kafka_hosts=kafka_hosts, + consume_topic=consume_topic, + api_host_url=api_host_url) + self.release_topic = release_topic + self.consumer_group = "entity-updates" + + def run(self): + changelog_topic = self.kafka.topics[self.consume_topic] + release_topic = self.kafka.topics[self.release_topic] + + consumer = changelog_topic.get_balanced_consumer( + consumer_group=self.consumer_group, + managed=True, + auto_offset_reset=OffsetType.LATEST, + reset_offset_on_start=False, + ) + + with release_topic.get_sync_producer() as producer: + for msg in consumer: + cle = json.loads(msg.value.decode('utf-8')) + #print(cle) + release_edits = cle['editgroup']['edits']['releases'] + for re in release_edits: + ident = re['ident'] + release = self.api.get_release(ident, expand="files,container") + release_dict = self.api.api_client.sanitize_for_serialization(release) + producer.produce( + message=json.dumps(release_dict).encode('utf-8'), + partition_key=ident.encode('utf-8'), + timestamp=None, + ) + consumer.commit_offsets() + diff --git a/python/fatcat_tools/crossref_importer.py b/python/fatcat_tools/crossref_importer.py new file mode 100644 index 00000000..37005965 --- /dev/null +++ b/python/fatcat_tools/crossref_importer.py @@ -0,0 +1,272 @@ + +import sys +import json +import sqlite3 +import datetime +import itertools +import fatcat_client +from fatcat.importer_common import FatcatImporter + + +class FatcatCrossrefImporter(FatcatImporter): + + def __init__(self, host_url, issn_map_file, extid_map_file=None, create_containers=True): + super().__init__(host_url, issn_map_file) + self.extid_map_db = None + if extid_map_file: + db_uri = "file:{}?mode=ro".format(extid_map_file) + print("Using external ID map: {}".format(db_uri)) + self.extid_map_db = sqlite3.connect(db_uri, uri=True) + else: + print("Not using external ID map") + self.create_containers = create_containers + + def lookup_ext_ids(self, doi): + if self.extid_map_db is None: + return dict(core_id=None, pmid=None, pmcid=None, wikidata_qid=None) + row = self.extid_map_db.execute("SELECT core, pmid, pmcid, wikidata FROM ids WHERE doi=? 
LIMIT 1", + [doi.lower()]).fetchone() + if row is None: + return dict(core_id=None, pmid=None, pmcid=None, wikidata_qid=None) + row = [str(cell or '') or None for cell in row] + return dict( + core_id=row[0], + pmid=row[1], + pmcid=row[2], + wikidata_qid=row[3]) + + def parse_crossref_dict(self, obj): + """ + obj is a python dict (parsed from json). + returns a ReleaseEntity + """ + + # This work is out of scope if it doesn't have authors and a title + if (not 'author' in obj) or (not 'title' in obj): + return None + + # Other ways to be out of scope (provisionally) + if (not 'type' in obj): + return None + + # contribs + def do_contribs(obj_list, ctype): + contribs = [] + for i, am in enumerate(obj_list): + creator_id = None + if 'ORCID' in am.keys(): + creator_id = self.lookup_orcid(am['ORCID'].split('/')[-1]) + # Sorry humans :( + if am.get('given') and am.get('family'): + raw_name = "{} {}".format(am['given'], am['family']) + elif am.get('family'): + raw_name = am['family'] + else: + # TODO: defaults back to a pseudo-null value + raw_name = am.get('given', '') + extra = dict() + if ctype == "author": + index = i + else: + index = None + if am.get('affiliation'): + # note: affiliation => affiliations + extra['affiliations'] = am.get('affiliation') + if am.get('sequence') and am.get('sequence') != "additional": + extra['sequence'] = am.get('sequence') + if not extra: + extra = None + contribs.append(fatcat_client.ReleaseContrib( + creator_id=creator_id, + index=index, + raw_name=raw_name, + role=ctype, + extra=extra)) + return contribs + contribs = do_contribs(obj['author'], "author") + contribs.extend(do_contribs(obj.get('editor', []), "editor")) + contribs.extend(do_contribs(obj.get('translator', []), "translator")) + + # container + issn = obj.get('ISSN', [None])[0] + issnl = self.issn2issnl(issn) + container_id = None + if issnl: + container_id = self.lookup_issnl(issnl) + publisher = obj.get('publisher') + + ce = None + if (container_id is None and self.create_containers and issnl != None + and obj.get('container-title') and len(obj['container-title']) > 0): + ce = fatcat_client.ContainerEntity( + issnl=issnl, + publisher=publisher, + name=obj['container-title'][0]) + + # references + refs = [] + for i, rm in enumerate(obj.get('reference', [])): + try: + year = int(rm.get('year')) + # NOTE: will need to update/config in the future! + # NOTE: are there crossref works with year < 100? + if year > 2025 or year < 100: + year = None + except: + year = None + extra = rm.copy() + if rm.get('DOI'): + extra['doi'] = rm.get('DOI').lower() + key = rm.get('key') + if key and key.startswith(obj['DOI'].upper()): + key = key.replace(obj['DOI'].upper() + "-", '') + key = key.replace(obj['DOI'].upper(), '') + container_name = rm.get('volume-title') + if not container_name: + container_name = rm.get('journal-title') + extra.pop('DOI', None) + extra.pop('key', None) + extra.pop('year', None) + extra.pop('volume-name', None) + extra.pop('journal-title', None) + extra.pop('title', None) + extra.pop('first-page', None) + extra.pop('doi-asserted-by', None) + if extra: + extra = dict(crossref=extra) + else: + extra = None + refs.append(fatcat_client.ReleaseRef( + index=i, + # doing lookups would be a second import pass + target_release_id=None, + key=key, + year=year, + container_name=container_name, + title=rm.get('title'), + locator=rm.get('first-page'), + # TODO: just dump JSON somewhere here? 
+ extra=extra)) + + # abstracts + abstracts = [] + if obj.get('abstract') != None: + abstracts.append(fatcat_client.ReleaseEntityAbstracts( + mimetype="application/xml+jats", + content=obj.get('abstract'))) + + # extra fields + extra = dict() + for key in ('subject', 'type', 'license', 'alternative-id', + 'container-title', 'original-title', 'subtitle', 'archive', + 'funder', 'group-title'): + # TODO: unpack "container-title" array + val = obj.get(key) + if val: + extra[key] = val + if 'license' in extra and extra['license']: + for i in range(len(extra['license'])): + if 'start' in extra['license'][i]: + extra['license'][i]['start'] = extra['license'][i]['start']['date-time'] + if len(obj['title']) > 1: + extra['other-titles'] = obj['title'][1:] + # TODO: this should be top-level + extra['is_kept'] = len(obj.get('archive', [])) > 0 + + # ISBN + isbn13 = None + for raw in obj.get('ISBN', []): + # TODO: convert if not ISBN-13 format + if len(raw) == 17: + isbn13 = raw + break + + # release status + if obj['type'] in ('journal-article', 'conference-proceeding', 'book', + 'dissertation', 'book-chapter'): + release_status = "published" + else: + # unknown + release_status = None + + # external identifiers + extids = self.lookup_ext_ids(doi=obj['DOI'].lower()) + + # TODO: filter out huge releases; we'll get them later (and fix bug in + # fatcatd) + if max(len(contribs), len(refs), len(abstracts)) > 750: + return None + + # release date parsing is amazingly complex + release_date = obj['issued']['date-parts'][0] + if not release_date or not release_date[0]: + # got some NoneType, even though at least year is supposed to be set + release_date = None + elif len(release_date) == 3: + release_date = datetime.datetime(year=release_date[0], month=release_date[1], day=release_date[2]) + else: + # only the year is actually required; mangle to first day for date + # (TODO: something better?) + release_date = datetime.datetime(year=release_date[0], month=1, day=1) + # convert to string ISO datetime format (if not null) + if release_date: + release_date = release_date.isoformat() + "Z" + + re = fatcat_client.ReleaseEntity( + work_id=None, + title=obj['title'][0], + contribs=contribs, + refs=refs, + container_id=container_id, + publisher=publisher, + release_type=obj['type'], + release_status=release_status, + doi=obj['DOI'].lower(), + isbn13=isbn13, + core_id=extids['core_id'], + pmid=extids['pmid'], + pmcid=extids['pmcid'], + wikidata_qid=extids['wikidata_qid'], + release_date=release_date, + issue=obj.get('issue'), + volume=obj.get('volume'), + pages=obj.get('page'), + abstracts=abstracts, + extra=dict(crossref=extra)) + return (re, ce) + + def create_row(self, row, editgroup=None): + if row is None: + return + obj = json.loads(row) + entities = self.parse_crossref_dict(obj) + if entities is not None: + (re, ce) = entities + if ce is not None: + container = self.api.create_container(ce, editgroup=editgroup) + re.container_id = container.ident + self._issnl_id_map[ce.issnl] = container.ident + self.api.create_release(re, editgroup=editgroup) + self.insert_count = self.insert_count + 1 + + def create_batch(self, batch, editgroup=None): + """Current work/release pairing disallows batch creation of releases. 
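# Illustrative sketch, not part of the patch: the 'issued' date handling from
# the Crossref importer above. Crossref 'date-parts' may carry one, two, or
# three elements (or be null); only the year is reliable, and partial dates
# are mangled to January 1 before serializing as an ISO datetime string.
import datetime

def parse_crossref_issued(date_parts):
    parts = date_parts[0] if date_parts else None
    if not parts or not parts[0]:
        return None
    if len(parts) == 3:
        dt = datetime.datetime(year=parts[0], month=parts[1], day=parts[2])
    else:
        dt = datetime.datetime(year=parts[0], month=1, day=1)
    return dt.isoformat() + "Z"

assert parse_crossref_issued([[2018, 11, 12]]) == "2018-11-12T00:00:00Z"
assert parse_crossref_issued([[2018]]) == "2018-01-01T00:00:00Z"
assert parse_crossref_issued([[None]]) is None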
+ Could do batch work creation and then match against releases, but meh.""" + release_batch = [] + for row in batch: + if row is None: + continue + obj = json.loads(row) + entities = self.parse_crossref_dict(obj) + if entities is not None: + (re, ce) = entities + if ce is not None: + ce_eg = self.api.create_editgroup( + fatcat_client.Editgroup(editor_id='aaaaaaaaaaaabkvkaaaaaaaaae')) + container = self.api.create_container(ce, editgroup=ce_eg.id) + self.api.accept_editgroup(ce_eg.id) + re.container_id = container.ident + self._issnl_id_map[ce.issnl] = container.ident + release_batch.append(re) + self.api.create_release_batch(release_batch, autoaccept="true", editgroup=editgroup) + self.insert_count = self.insert_count + len(release_batch) diff --git a/python/fatcat_tools/elastic_workers.py b/python/fatcat_tools/elastic_workers.py new file mode 100644 index 00000000..3d2e9c39 --- /dev/null +++ b/python/fatcat_tools/elastic_workers.py @@ -0,0 +1,47 @@ + +import json +import time +import requests +from fatcat.worker_common import FatcatWorker +from fatcat_client.models import ReleaseEntity +from fatcat.entity_helpers import * +from pykafka.common import OffsetType + + +class FatcatElasticReleaseWorker(FatcatWorker): + """ + Consumes from release-updates topic and pushes into (presumably local) + elasticsearch. + + Uses a consumer group to manage offset. + """ + + def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None, + elastic_backend="http://localhost:9200", elastic_index="fatcat"): + super().__init__(kafka_hosts=kafka_hosts, + consume_topic=consume_topic, + api_host_url=None) + self.consumer_group = "elastic-updates" + self.elastic_backend = elastic_backend + self.elastic_index = elastic_index + + def run(self): + consume_topic = self.kafka.topics[self.consume_topic] + + consumer = consume_topic.get_balanced_consumer( + consumer_group=self.consumer_group, + managed=True, + ) + + for msg in consumer: + json_str = msg.value.decode('utf-8') + release = entity_from_json(json_str, ReleaseEntity) + #print(release) + elastic_endpoint = "{}/{}/release/{}".format( + self.elastic_backend, + self.elastic_index, + release.ident) + print("Updating document: {}".format(elastic_endpoint)) + resp = requests.post(elastic_endpoint, json=release.to_elastic_dict()) + assert resp.status_code in (200, 201) + consumer.commit_offsets() diff --git a/python/fatcat_tools/entity_helpers.py b/python/fatcat_tools/entity_helpers.py new file mode 100644 index 00000000..c454536b --- /dev/null +++ b/python/fatcat_tools/entity_helpers.py @@ -0,0 +1,100 @@ + +import collections +from fatcat_client.models import ReleaseEntity +from fatcat_client.api_client import ApiClient + +def entity_to_json(entity): + ac = ApiClient() + return ac.sanitize_for_serialization(entity) + +def entity_from_json(json_str, entity_type): + """ + Hack to take advantage of the code-generated deserialization code + """ + ac = ApiClient() + thing = collections.namedtuple('Thing', ['data']) + thing.data = json_str + return ac.deserialize(thing, entity_type) + +def release_elastic_dict(release): + """ + Converts from an entity model/schema to elasticsearch oriented schema. 
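# Illustrative sketch, not part of the patch: the per-document update that
# FatcatElasticReleaseWorker performs above -- one POST per release, addressed
# by fatcat ident. The backend URL and index name shown are the worker's
# defaults, not values this sketch verifies.
import requests

def push_release_doc(release_ident, elastic_doc,
                     elastic_backend="http://localhost:9200",
                     elastic_index="fatcat"):
    endpoint = "{}/{}/release/{}".format(
        elastic_backend, elastic_index, release_ident)
    resp = requests.post(endpoint, json=elastic_doc)
    resp.raise_for_status()  # the worker asserts 200/201; raise on anything else
    return resp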
+ + Returns: dict + """ + + if release.state != 'active': + raise ValueError("Entity is not 'active'") + + # First, the easy ones (direct copy) + t = dict( + ident = release.ident, + revision = release.revision, + title = release.title, + release_type = release.release_type, + release_status = release.release_status, + language = release.language, + doi = release.doi, + pmid = release.pmid, + pmcid = release.pmcid, + isbn13 = release.isbn13, + core_id = release.core_id, + wikidata_qid = release.wikidata_qid + ) + + if release.release_date: + # TODO: resolve why this can be either a string or datetime + if type(release.release_date) == str: + t['release_date'] = release.release_date + else: + t['release_date'] = release.release_date.strftime('%F') + + container = release.container + container_is_kept = False + if container: + t['publisher'] = container.publisher + t['container_name'] = container.name + t['container_issnl'] = container.issnl + container_extra = container.extra + if container_extra: + t['container_is_oa'] = container_extra.get('is_oa') + container_is_kept = container_extra.get('is_kept', False) + t['container_is_longtail_oa'] = container_extra.get('is_longtail_oa') + else: + t['publisher'] = release.publisher + + files = release.files or [] + t['file_count'] = len(files) + in_wa = False + in_ia = False + t['file_pdf_url'] = None + for f in files: + is_pdf = 'pdf' in f.get('mimetype', '') + for url in f.get('urls', []): + if url.get('rel', '') == 'webarchive': + in_wa = True + if '//web.archive.org/' in url['url'] or '//archive.org/' in url['url']: + in_ia = True + if is_pdf: + t['file_pdf_url'] = url['url'] + if not t['file_pdf_url'] and is_pdf: + t['file_pdf_url'] = url['url'] + t['file_in_webarchive'] = in_wa + t['file_in_ia'] = in_ia + + extra = release.extra or dict() + if extra: + t['in_shadow'] = extra.get('in_shadow') + if extra.get('grobid') and extra['grobid'].get('is_longtail_oa'): + t['container_is_longtail_oa'] = True + t['any_abstract'] = bool(release.abstracts) + t['is_kept'] = container_is_kept or extra.get('is_kept', False) + + t['ref_count'] = len(release.refs or []) + t['contrib_count'] = len(release.contribs or []) + contrib_names = [] + for c in (release.contribs or []): + if c.raw_name: + contrib_names.append(c.raw_name) + t['contrib_names'] = contrib_names + return t diff --git a/python/fatcat_tools/fcid.py b/python/fatcat_tools/fcid.py new file mode 100644 index 00000000..dd72b242 --- /dev/null +++ b/python/fatcat_tools/fcid.py @@ -0,0 +1,17 @@ + +import base64 +import uuid + +def fcid2uuid(s): + s = s.split('_')[-1].upper().encode('utf-8') + assert len(s) == 26 + raw = base64.b32decode(s + b"======") + return str(uuid.UUID(bytes=raw)).lower() + +def uuid2fcid(s): + raw = uuid.UUID(s).bytes + return base64.b32encode(raw)[:26].lower().decode('utf-8') + +def test_fcid(): + test_uuid = '00000000-0000-0000-3333-000000000001' + assert test_uuid == fcid2uuid(uuid2fcid(test_uuid)) diff --git a/python/fatcat_tools/grobid_metadata_importer.py b/python/fatcat_tools/grobid_metadata_importer.py new file mode 100755 index 00000000..95cc285e --- /dev/null +++ b/python/fatcat_tools/grobid_metadata_importer.py @@ -0,0 +1,168 @@ +#!/usr/bin/env python3 + +import sys +import json +import base64 +import datetime +import fatcat_client +from fatcat.importer_common import FatcatImporter + +MAX_ABSTRACT_BYTES=4096 + + +class FatcatGrobidMetadataImporter(FatcatImporter): + + def __init__(self, host_url, default_link_rel="web"): + super().__init__(host_url) + 
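# Illustrative sketch, not part of the patch: round-trip usage of the fcid.py
# helpers shown above. Fatcat identifiers are the first 26 characters of a
# lowercase, unpadded base32 encoding of the UUID bytes, and the two helpers
# are exact inverses of each other.
import base64
import uuid

def uuid2fcid(s):  # copied from fcid.py above
    raw = uuid.UUID(s).bytes
    return base64.b32encode(raw)[:26].lower().decode('utf-8')

def fcid2uuid(s):  # copied from fcid.py above
    s = s.split('_')[-1].upper().encode('utf-8')
    assert len(s) == 26
    raw = base64.b32decode(s + b"======")
    return str(uuid.UUID(bytes=raw)).lower()

example = '00000000-0000-0000-3333-000000000001'
ident = uuid2fcid(example)          # a 26-character lowercase string
assert len(ident) == 26
assert fcid2uuid(ident) == example  # round-trips cleanly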
self.default_link_rel = default_link_rel + + def parse_grobid_json(self, obj): + + if not obj.get('title'): + return None + + release = dict() + extra = dict() + + if obj.get('abstract') and len(obj.get('abstract')) < MAX_ABSTRACT_BYTES: + abobj = dict( + mimetype="text/plain", + language=None, + content=obj.get('abstract').strip()) + abstracts = [abobj] + else: + abstracts = None + + contribs = [] + for i, a in enumerate(obj.get('authors', [])): + c = dict(raw_name=a['name'], role="author") + contribs.append(fatcat_client.ReleaseContrib( + index=i, + raw_name=a['name'], + role="author", + extra=None)) + + refs = [] + for raw in obj.get('citations', []): + cite_extra = dict() + ref = dict() + ref['key'] = raw.get('id') + if raw.get('title'): + ref['title'] = raw['title'].strip() + if raw.get('date'): + try: + year = int(raw['date'].strip()[:4]) + ref['year'] = year + except: + pass + for key in ('volume', 'url', 'issue', 'publisher'): + if raw.get(key): + cite_extra[key] = raw[key].strip() + if raw.get('authors'): + cite_extra['authors'] = [a['name'] for a in raw['authors']] + if cite_extra: + cite_extra = dict(grobid=cite_extra) + else: + cite_extra = None + ref['extra'] = cite_extra + refs.append(ref) + + release_type = "journal-article" + release_date = None + if obj.get('date'): + # TODO: only returns year, ever? how to handle? + release_date = datetime.datetime(year=int(obj['date'][:4]), month=1, day=1) + + if obj.get('doi'): + extra['doi'] = obj['doi'] + if obj['journal'] and obj['journal'].get('name'): + extra['container_name'] = obj['journal']['name'] + + extra['is_longtail_oa'] = True + + # TODO: ISSN/eISSN handling? or just journal name lookup? + + if extra: + extra = dict(grobid=extra) + else: + extra = None + + re = fatcat_client.ReleaseEntity( + title=obj['title'].strip(), + contribs=contribs, + refs=refs, + publisher=obj['journal'].get('publisher'), + volume=obj['journal'].get('volume'), + issue=obj['journal'].get('issue'), + abstracts=abstracts, + extra=extra) + return re + + # TODO: make this a common function somewhere + def make_url(self, raw): + rel = self.default_link_rel + # TODO: this is where we could map specific domains to rel types, + # and also filter out bad domains, invalid URLs, etc + if "//archive.org/" in raw or "//arxiv.org/" in raw: + # TODO: special-case the arxiv.org bulk mirror? 
+ rel = "repository" + elif "//web.archive.org/" in raw or "//archive.is/" in raw: + rel = "webarchive" + return fatcat_client.FileEntityUrls(url=raw, rel=rel) + + def parse_file_metadata(self, sha1_key, cdx, mimetype, file_size): + + sha1 = base64.b16encode(base64.b32decode(sha1_key.replace('sha1:', ''))).decode('ascii').lower() + + # lookup existing SHA1, or create new entity + try: + existing_file = self.api.lookup_file(sha1=sha1) + except fatcat_client.rest.ApiException as err: + if err.status != 404: + raise err + existing_file = None + + if existing_file: + # if file is already in here, presumably not actually long-tail + return None + fe = fatcat_client.FileEntity( + sha1=sha1, + size=int(file_size), + mimetype=mimetype, + releases=[], + urls=[], + ) + + # parse URLs and CDX + original = cdx['url'] + wayback = "https://web.archive.org/web/{}/{}".format( + cdx['dt'], + original) + fe.urls.append( + fatcat_client.FileEntityUrls(url=wayback, rel="webarchive")) + original_url = self.make_url(original) + if original_url != None: + fe.urls.append(original_url) + + return fe + + def create_row(self, row, editgroup=None): + if not row: + return + fields = row.split('\t') + sha1_key = fields[0] + cdx = json.loads(fields[1]) + mimetype = fields[2] + file_size = int(fields[3]) + grobid_meta = json.loads(fields[4]) + fe = self.parse_file_metadata(sha1_key, cdx, mimetype, file_size) + re = self.parse_grobid_json(grobid_meta) + if fe and re: + release_entity = self.api.create_release(re, editgroup=editgroup) + # release ident can't already be in release list because we just + # created it + fe.releases.append(release_entity.ident) + file_entity = self.api.create_file(fe, editgroup=editgroup) + self.insert_count = self.insert_count + 1 + + # NB: batch mode not implemented diff --git a/python/fatcat_tools/importer_common.py b/python/fatcat_tools/importer_common.py new file mode 100644 index 00000000..8dfee875 --- /dev/null +++ b/python/fatcat_tools/importer_common.py @@ -0,0 +1,137 @@ + +import re +import sys +import csv +import json +import itertools +import fatcat_client +from fatcat_client.rest import ApiException + +# from: https://docs.python.org/3/library/itertools.html +def grouper(iterable, n, fillvalue=None): + "Collect data into fixed-length chunks or blocks" + args = [iter(iterable)] * n + return itertools.zip_longest(*args, fillvalue=fillvalue) + +class FatcatImporter: + + def __init__(self, host_url, issn_map_file=None): + conf = fatcat_client.Configuration() + conf.host = host_url + self.api = fatcat_client.DefaultApi(fatcat_client.ApiClient(conf)) + self._issnl_id_map = dict() + self._orcid_id_map = dict() + self._doi_id_map = dict() + self._issn_issnl_map = None + self._orcid_regex = re.compile("^\\d{4}-\\d{4}-\\d{4}-\\d{3}[\\dX]$") + if issn_map_file: + self.read_issn_map_file(issn_map_file) + self.processed_lines = 0 + self.insert_count = 0 + self.update_count = 0 + + def describe_run(self): + print("Processed {} lines, inserted {}, updated {}.".format( + self.processed_lines, self.insert_count, self.update_count)) + + def process_source(self, source, group_size=100): + """Creates and auto-accepts editgroup every group_size rows""" + eg = self.api.create_editgroup( + fatcat_client.Editgroup(editor_id='aaaaaaaaaaaabkvkaaaaaaaaae')) + for i, row in enumerate(source): + self.create_row(row, editgroup=eg.id) + if i > 0 and (i % group_size) == 0: + self.api.accept_editgroup(eg.id) + eg = self.api.create_editgroup( + fatcat_client.Editgroup(editor_id='aaaaaaaaaaaabkvkaaaaaaaaae')) + 
self.processed_lines = self.processed_lines + 1 + if i == 0 or (i % group_size) != 0: + self.api.accept_editgroup(eg.id) + + def process_batch(self, source, size=50): + """Reads and processes in batches (not API-call-per-)""" + for rows in grouper(source, size): + self.processed_lines = self.processed_lines + len(rows) + eg = self.api.create_editgroup( + fatcat_client.Editgroup(editor_id='aaaaaaaaaaaabkvkaaaaaaaaae')) + self.create_batch(rows, editgroup=eg.id) + + def process_csv_source(self, source, group_size=100, delimiter=','): + reader = csv.DictReader(source, delimiter=delimiter) + self.process_source(reader, group_size) + + def process_csv_batch(self, source, size=50, delimiter=','): + reader = csv.DictReader(source, delimiter=delimiter) + self.process_batch(reader, size) + + def is_issnl(self, issnl): + return len(issnl) == 9 and issnl[4] == '-' + + def lookup_issnl(self, issnl): + """Caches calls to the ISSN-L lookup API endpoint in a local dict""" + if issnl in self._issnl_id_map: + return self._issnl_id_map[issnl] + container_id = None + try: + rv = self.api.lookup_container(issnl=issnl) + container_id = rv.ident + except ApiException as ae: + # If anything other than a 404 (not found), something is wrong + assert ae.status == 404 + self._issnl_id_map[issnl] = container_id # might be None + return container_id + + def is_orcid(self, orcid): + return self._orcid_regex.match(orcid) != None + + def lookup_orcid(self, orcid): + """Caches calls to the Orcid lookup API endpoint in a local dict""" + if not self.is_orcid(orcid): + return None + if orcid in self._orcid_id_map: + return self._orcid_id_map[orcid] + creator_id = None + try: + rv = self.api.lookup_creator(orcid=orcid) + creator_id = rv.ident + except ApiException as ae: + # If anything other than a 404 (not found), something is wrong + assert ae.status == 404 + self._orcid_id_map[orcid] = creator_id # might be None + return creator_id + + def is_doi(self, doi): + return doi.startswith("10.") and doi.count("/") >= 1 + + def lookup_doi(self, doi): + """Caches calls to the doi lookup API endpoint in a local dict""" + assert self.is_doi(doi) + doi = doi.lower() + if doi in self._doi_id_map: + return self._doi_id_map[doi] + release_id = None + try: + rv = self.api.lookup_release(doi=doi) + release_id = rv.ident + except ApiException as ae: + # If anything other than a 404 (not found), something is wrong + assert ae.status == 404 + self._doi_id_map[doi] = release_id # might be None + return release_id + + def read_issn_map_file(self, issn_map_file): + print("Loading ISSN map file...") + self._issn_issnl_map = dict() + for line in issn_map_file: + if line.startswith("ISSN") or len(line) == 0: + continue + (issn, issnl) = line.split()[0:2] + self._issn_issnl_map[issn] = issnl + # double mapping makes lookups easy + self._issn_issnl_map[issnl] = issnl + print("Got {} ISSN-L mappings.".format(len(self._issn_issnl_map))) + + def issn2issnl(self, issn): + if issn is None: + return None + return self._issn_issnl_map.get(issn) diff --git a/python/fatcat_tools/issn_importer.py b/python/fatcat_tools/issn_importer.py new file mode 100644 index 00000000..c9ef50b5 --- /dev/null +++ b/python/fatcat_tools/issn_importer.py @@ -0,0 +1,72 @@ + +import sys +import json +import itertools +import fatcat_client +from fatcat.importer_common import FatcatImporter + +# CSV format (generated from git.archive.org/webgroup/oa-journal-analysis): +# 
ISSN-L,in_doaj,in_road,in_norwegian,in_crossref,title,publisher,url,lang,ISSN-print,ISSN-electronic,doi_count,has_doi,is_oa,is_kept,publisher_size,url_live,url_live_status,url_live_final_status,url_live_final_url,url_live_status_simple,url_live_final_status_simple,url_domain,gwb_pdf_count + +def or_none(s): + if s is None: + return None + if len(s) == 0: + return None + return s + +def truthy(s): + if s is None: + return None + s = s.lower() + if s in ('true', 't', 'yes', 'y', '1'): + return True + elif s in ('false', 'f', 'no', 'n', '0'): + return False + else: + return None + +class FatcatIssnImporter(FatcatImporter): + + def parse_issn_row(self, row): + """ + row is a python dict (parsed from CSV). + returns a ContainerEntity + """ + title = or_none(row['title']) + issnl = or_none(row['ISSN-L']) + if title is None or issnl is None: + return + extra = dict( + in_doaj=truthy(row['in_doaj']), + in_road=truthy(row['in_road']), + in_norwegian=truthy(row['in_norwegian']), + language=or_none(row['lang']), + url=or_none(row['url']), + ISSNp=or_none(row['ISSN-print']), + ISSNe=or_none(row['ISSN-electronic']), + is_oa=truthy(row['is_oa']), + is_kept=truthy(row['is_kept']), + ) + ce = fatcat_client.ContainerEntity( + issnl=issnl, + name=title, + publisher=or_none(row['publisher']), + abbrev=None, + coden=None, + extra=extra) + return ce + + def create_row(self, row, editgroup=None): + ce = self.parse_issn_row(row) + if ce is not None: + self.api.create_container(ce, editgroup=editgroup) + self.insert_count = self.insert_count + 1 + + def create_batch(self, batch, editgroup=None): + """Reads and processes in batches (not API-call-per-line)""" + objects = [self.parse_issn_row(l) + for l in batch if l != None] + objects = [o for o in objects if o != None] + self.api.create_container_batch(objects, autoaccept="true", editgroup=editgroup) + self.insert_count = self.insert_count + len(objects) diff --git a/python/fatcat_tools/matched_importer.py b/python/fatcat_tools/matched_importer.py new file mode 100644 index 00000000..7f55369b --- /dev/null +++ b/python/fatcat_tools/matched_importer.py @@ -0,0 +1,144 @@ + +import sys +import json +import sqlite3 +import itertools +import fatcat_client +from fatcat.importer_common import FatcatImporter + +#row = row.split('\t') +#assert len(row) == 2 +#sha1 = row[0].replace('sha1:') +#sha1 = base64.b16encode(base64.b32decode(sha1)).lower() +#print(sha1) +#dois = [d.lower() for d in json.loads(row[1])] + +class FatcatMatchedImporter(FatcatImporter): + """ + Input format is JSON with keys: + - dois (list) + - sha1 (hex) + - md5 (hex) + - sha256 (hex) + - size (int) + - cdx (list of objects) + - dt + - url + - mimetype + - urls (list of strings... or objects?) + + Future handlings/extensions: + - core_id, wikidata_id, pmcid, pmid: not as lists + """ + + def __init__(self, host_url, skip_file_update=False, default_mime=None, + default_link_rel="web"): + super().__init__(host_url) + self.default_mime = default_mime + self.default_link_rel = default_link_rel + self.skip_file_update = skip_file_update + + def make_url(self, raw): + rel = self.default_link_rel + # TODO: this is where we could map specific domains to rel types, + # and also filter out bad domains, invalid URLs, etc + if "//archive.org/" in raw or "//arxiv.org/" in raw: + # TODO: special-case the arxiv.org bulk mirror? 
+ rel = "repository" + elif "//web.archive.org/" in raw or "//archive.is/" in raw: + rel = "webarchive" + return fatcat_client.FileEntityUrls(url=raw, rel=rel) + + def parse_matched_dict(self, obj): + sha1 = obj['sha1'] + dois = [d.lower() for d in obj.get('dois', [])] + + # lookup sha1, or create new entity + fe = None + if not self.skip_file_update: + try: + fe = self.api.lookup_file(sha1=sha1) + except fatcat_client.rest.ApiException as err: + if err.status != 404: + raise err + if fe is None: + fe = fatcat_client.FileEntity( + sha1=sha1, + releases=[], + urls=[], + ) + + # lookup dois + re_list = set() + for doi in dois: + try: + re = self.api.lookup_release(doi=doi) + except fatcat_client.rest.ApiException as err: + if err.status != 404: + raise err + re = None + if re is None: + print("DOI not found: {}".format(doi)) + else: + re_list.add(re.ident) + if len(re_list) == 0: + return None + if fe.releases == set(re_list): + return None + re_list.update(fe.releases) + fe.releases = list(re_list) + + # parse URLs and CDX + existing_urls = [feu.url for feu in fe.urls] + for url in obj.get('url', []): + if url not in existing_urls: + url = self.make_url(url) + if url != None: + fe.urls.append(url) + for cdx in obj.get('cdx', []): + original = cdx['url'] + wayback = "https://web.archive.org/web/{}/{}".format( + cdx['dt'], + original) + if wayback not in existing_urls: + fe.urls.append( + fatcat_client.FileEntityUrls(url=wayback, rel="webarchive")) + if original not in existing_urls: + url = self.make_url(original) + if url != None: + fe.urls.append(url) + + if obj.get('size') != None: + fe.size = int(obj['size']) + fe.sha256 = obj.get('sha256', fe.sha256) + fe.md5 = obj.get('md5', fe.sha256) + if obj.get('mimetype') is None: + if fe.mimetype is None: + fe.mimetype = self.default_mime + else: + fe.mimetype = obj.get('mimetype') + return fe + + def create_row(self, row, editgroup=None): + obj = json.loads(row) + fe = self.parse_matched_dict(obj) + if fe is not None: + if fe.ident is None: + self.api.create_file(fe, editgroup=editgroup) + self.insert_count = self.insert_count + 1 + else: + self.api.update_file(fe.ident, fe, editgroup=editgroup) + self.update_count = self.update_count + 1 + + def create_batch(self, batch, editgroup=None): + """Reads and processes in batches (not API-call-per-line)""" + objects = [self.parse_matched_dict(json.loads(l)) + for l in batch if l != None] + new_objects = [o for o in objects if o != None and o.ident == None] + update_objects = [o for o in objects if o != None and o.ident != None] + for obj in update_objects: + self.api.update_file(obj.ident, obj, editgroup=editgroup) + if len(new_objects) > 0: + self.api.create_file_batch(new_objects, autoaccept="true", editgroup=editgroup) + self.update_count = self.update_count + len(update_objects) + self.insert_count = self.insert_count + len(new_objects) diff --git a/python/fatcat_tools/orcid_importer.py b/python/fatcat_tools/orcid_importer.py new file mode 100644 index 00000000..e1f5943c --- /dev/null +++ b/python/fatcat_tools/orcid_importer.py @@ -0,0 +1,73 @@ + +import sys +import json +import itertools +import fatcat_client +from fatcat.importer_common import FatcatImporter + +def value_or_none(e): + if type(e) == dict: + e = e.get('value') + if type(e) == str and len(e) == 0: + e = None + # TODO: this is probably bogus; patched in desperation; remove? + if e: + try: + e.encode() + except UnicodeEncodeError: + # Invalid JSON? 
+ print("BAD UNICODE") + return None + return e + +class FatcatOrcidImporter(FatcatImporter): + + def parse_orcid_dict(self, obj): + """ + obj is a python dict (parsed from json). + returns a CreatorEntity + """ + name = obj['person']['name'] + if name is None: + return None + extra = None + given = value_or_none(name.get('given-names')) + sur = value_or_none(name.get('family-name')) + display = value_or_none(name.get('credit-name')) + if display is None: + # TODO: sorry human beings + if given and sur: + display = "{} {}".format(given, sur) + elif sur: + display = sur + elif given: + display = given + else: + # must have *some* name + return None + orcid = obj['orcid-identifier']['path'] + if not self.is_orcid(orcid): + sys.stderr.write("Bad ORCID: {}\n".format(orcid)) + return None + ce = fatcat_client.CreatorEntity( + orcid=orcid, + given_name=given, + surname=sur, + display_name=display, + extra=extra) + return ce + + def create_row(self, row, editgroup=None): + obj = json.loads(row) + ce = self.parse_orcid_dict(obj) + if ce is not None: + self.api.create_creator(ce, editgroup=editgroup) + self.insert_count = self.insert_count + 1 + + def create_batch(self, batch, editgroup=None): + """Reads and processes in batches (not API-call-per-line)""" + objects = [self.parse_orcid_dict(json.loads(l)) + for l in batch if l != None] + objects = [o for o in objects if o != None] + self.api.create_creator_batch(objects, autoaccept="true", editgroup=editgroup) + self.insert_count = self.insert_count + len(objects) diff --git a/python/fatcat_tools/raw_api_client.py b/python/fatcat_tools/raw_api_client.py new file mode 100644 index 00000000..75151ebb --- /dev/null +++ b/python/fatcat_tools/raw_api_client.py @@ -0,0 +1,66 @@ + +import sys +import json +import requests + + +class RawFatcatApiClient: + + def __init__(self, host_url): + self.host_url = host_url + self.session = requests.Session() + self._issn_map = dict() + + def get(self, path, data=None): + headers = {"content-type": "application/json"} + return self.session.get(self.host_url + path, json=data, + headers=headers) + + def post(self, path, data=None): + headers = {"content-type": "application/json"} + return self.session.post(self.host_url + path, json=data, + headers=headers) + + def new_editgroup(self): + rv = self.post('/v0/editgroup', data=dict( + editor_id=1)) + print(rv) + print(rv.json()) + assert rv.status_code == 201 + editgroup_id = rv.json()['id'] + return editgroup_id + + def accept_editgroup(self, eg): + rv = self.post('/v0/editgroup/{}/accept'.format(eg)) + assert rv.status_code == 200 + return rv + + def import_issn_file(self, json_file, create_containers=False, batchsize=100): + eg = self.new_editgroup() + i = 0 + with open(json_file, 'r') as file: + for line in file: + if i % batchsize == 0: + sys.stdout.write('\n{}: '.format(i)) + if (i+1) % 20 == 0: + sys.stdout.write('.') + i = i + 1 + obj = json.loads(line) + if not ("author" in obj and "title" in obj): + continue + try: + self.import_crossref_dict(obj, editgroup=eg, + create_containers=create_containers) + except Exception as e: + print("ERROR: {}".format(e)) + if i % batchsize == 0: + self.accept_editgroup(eg) + eg = self.new_editgroup() + if i % batchsize != 0: + self.accept_editgroup(eg) + print("done!") + + def health(self): + rv = self.get("/health") + assert rv.status_code == 200 + return rv.json() diff --git a/python/fatcat_tools/worker_common.py b/python/fatcat_tools/worker_common.py new file mode 100644 index 00000000..77ea2c15 --- /dev/null +++ 
b/python/fatcat_tools/worker_common.py
@@ -0,0 +1,25 @@
+
+import re
+import sys
+import csv
+import json
+import itertools
+import fatcat_client
+from pykafka import KafkaClient
+from fatcat_client.rest import ApiException
+
+
+class FatcatWorker:
+    """
+    Common code for Kafka producers and consumers.
+    """
+
+    def __init__(self, kafka_hosts, produce_topic=None, consume_topic=None, api_host_url=None):
+        if api_host_url:
+            conf = fatcat_client.Configuration()
+            conf.host = api_host_url
+            self.api = fatcat_client.DefaultApi(fatcat_client.ApiClient(conf))
+        self.kafka = KafkaClient(hosts=kafka_hosts, broker_version="1.0.0")
+        self.produce_topic = produce_topic
+        self.consume_topic = consume_topic
+
--
cgit v1.2.3
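# Hypothetical wiring sketch, not part of the patch: how a worker defined above
# sits on top of the FatcatWorker base class. The module path, hostnames,
# ports, and topic name are placeholders, not values taken from the patch.
from fatcat_tools.changelog_workers import FatcatChangelogWorker  # path assumed

worker = FatcatChangelogWorker(
    api_host_url="http://localhost:9411/v0",  # placeholder fatcat API endpoint
    kafka_hosts="localhost:9092",             # placeholder Kafka broker list
    produce_topic="fatcat-changelog",         # placeholder topic name
    poll_interval=10.0,
)
worker.run()  # polls get_changelog() and produces one JSON message per new entry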