From 279b22e30d9b590838268f5f5acdaa1110ee593a Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Tue, 13 Nov 2018 11:32:41 -0800 Subject: shuffle around fatcat_tools layout --- python/fatcat_tools/changelog_workers.py | 122 ---------- python/fatcat_tools/crossref_importer.py | 272 ----------------------- python/fatcat_tools/elastic_workers.py | 47 ---- python/fatcat_tools/grobid_metadata_importer.py | 168 -------------- python/fatcat_tools/importer_common.py | 137 ------------ python/fatcat_tools/importers/common.py | 137 ++++++++++++ python/fatcat_tools/importers/crossref.py | 272 +++++++++++++++++++++++ python/fatcat_tools/importers/grobid_metadata.py | 168 ++++++++++++++ python/fatcat_tools/importers/issn.py | 72 ++++++ python/fatcat_tools/importers/matched.py | 144 ++++++++++++ python/fatcat_tools/importers/orcid.py | 73 ++++++ python/fatcat_tools/issn_importer.py | 72 ------ python/fatcat_tools/matched_importer.py | 144 ------------ python/fatcat_tools/orcid_importer.py | 73 ------ python/fatcat_tools/raw_api_client.py | 66 ------ python/fatcat_tools/worker_common.py | 25 --- python/fatcat_tools/workers/changelog.py | 122 ++++++++++ python/fatcat_tools/workers/elastic.py | 47 ++++ python/fatcat_tools/workers/worker_common.py | 25 +++ 19 files changed, 1060 insertions(+), 1126 deletions(-) delete mode 100644 python/fatcat_tools/changelog_workers.py delete mode 100644 python/fatcat_tools/crossref_importer.py delete mode 100644 python/fatcat_tools/elastic_workers.py delete mode 100755 python/fatcat_tools/grobid_metadata_importer.py delete mode 100644 python/fatcat_tools/importer_common.py create mode 100644 python/fatcat_tools/importers/common.py create mode 100644 python/fatcat_tools/importers/crossref.py create mode 100644 python/fatcat_tools/importers/grobid_metadata.py create mode 100644 python/fatcat_tools/importers/issn.py create mode 100644 python/fatcat_tools/importers/matched.py create mode 100644 python/fatcat_tools/importers/orcid.py delete mode 100644 python/fatcat_tools/issn_importer.py delete mode 100644 python/fatcat_tools/matched_importer.py delete mode 100644 python/fatcat_tools/orcid_importer.py delete mode 100644 python/fatcat_tools/raw_api_client.py delete mode 100644 python/fatcat_tools/worker_common.py create mode 100644 python/fatcat_tools/workers/changelog.py create mode 100644 python/fatcat_tools/workers/elastic.py create mode 100644 python/fatcat_tools/workers/worker_common.py (limited to 'python/fatcat_tools') diff --git a/python/fatcat_tools/changelog_workers.py b/python/fatcat_tools/changelog_workers.py deleted file mode 100644 index 223d4478..00000000 --- a/python/fatcat_tools/changelog_workers.py +++ /dev/null @@ -1,122 +0,0 @@ - -import json -import time -from itertools import islice -from fatcat_tools.worker_common import FatcatWorker -from pykafka.common import OffsetType - - -class FatcatChangelogWorker(FatcatWorker): - """ - Periodically polls the fatcat API looking for new changelogs. When they are - found, fetch them and push (as JSON) into a Kafka topic. - """ - - def __init__(self, api_host_url, kafka_hosts, produce_topic, poll_interval=10.0, offset=None): - # TODO: should be offset=0 - super().__init__(kafka_hosts=kafka_hosts, - produce_topic=produce_topic, - api_host_url=api_host_url) - self.poll_interval = poll_interval - self.offset = offset # the fatcat changelog offset, not the kafka offset - - def most_recent_message(self, topic): - """ - Tries to fetch the most recent message from a given topic. - This only makes sense for single partition topics, though could be - extended with "last N" behavior. - - Following "Consuming the last N messages from a topic" - from https://pykafka.readthedocs.io/en/latest/usage.html#consumer-patterns - """ - consumer = topic.get_simple_consumer( - auto_offset_reset=OffsetType.LATEST, - reset_offset_on_start=True) - offsets = [(p, op.last_offset_consumed - 1) - for p, op in consumer._partitions.items()] - offsets = [(p, (o if o > -1 else -2)) for p, o in offsets] - if -2 in [o for p, o in offsets]: - return None - else: - consumer.reset_offsets(offsets) - msg = islice(consumer, 1) - if msg: - return list(msg)[0].value - else: - return None - - def run(self): - topic = self.kafka.topics[self.produce_topic] - # On start, try to consume the most recent from the topic, and using - # that as the starting offset. Note that this is a single-partition - # topic - if self.offset is None: - print("Checking for most recent changelog offset...") - msg = self.most_recent_message(topic) - if msg: - self.offset = json.loads(msg.decode('utf-8'))['index'] - else: - self.offset = 1 - - with topic.get_sync_producer() as producer: - while True: - latest = int(self.api.get_changelog(limit=1)[0].index) - if latest > self.offset: - print("Fetching changelogs from {} through {}".format( - self.offset+1, latest)) - for i in range(self.offset+1, latest+1): - cle = self.api.get_changelog_entry(i) - obj = self.api.api_client.sanitize_for_serialization(cle) - producer.produce( - message=json.dumps(obj).encode('utf-8'), - partition_key=None, - timestamp=None, - #XXX: timestamp=cle.timestamp, - ) - self.offset = i - print("Sleeping {} seconds...".format(self.poll_interval)) - time.sleep(self.poll_interval) - - -class FatcatEntityUpdatesWorker(FatcatWorker): - """ - Consumes from the changelog topic and publishes expanded entities (fetched - from API) to update topics. - - For now, only release updates are published. - """ - - def __init__(self, api_host_url, kafka_hosts, consume_topic, release_topic): - super().__init__(kafka_hosts=kafka_hosts, - consume_topic=consume_topic, - api_host_url=api_host_url) - self.release_topic = release_topic - self.consumer_group = "entity-updates" - - def run(self): - changelog_topic = self.kafka.topics[self.consume_topic] - release_topic = self.kafka.topics[self.release_topic] - - consumer = changelog_topic.get_balanced_consumer( - consumer_group=self.consumer_group, - managed=True, - auto_offset_reset=OffsetType.LATEST, - reset_offset_on_start=False, - ) - - with release_topic.get_sync_producer() as producer: - for msg in consumer: - cle = json.loads(msg.value.decode('utf-8')) - #print(cle) - release_edits = cle['editgroup']['edits']['releases'] - for re in release_edits: - ident = re['ident'] - release = self.api.get_release(ident, expand="files,container") - release_dict = self.api.api_client.sanitize_for_serialization(release) - producer.produce( - message=json.dumps(release_dict).encode('utf-8'), - partition_key=ident.encode('utf-8'), - timestamp=None, - ) - consumer.commit_offsets() - diff --git a/python/fatcat_tools/crossref_importer.py b/python/fatcat_tools/crossref_importer.py deleted file mode 100644 index 6a5ad824..00000000 --- a/python/fatcat_tools/crossref_importer.py +++ /dev/null @@ -1,272 +0,0 @@ - -import sys -import json -import sqlite3 -import datetime -import itertools -import fatcat_client -from fatcat_tools.importer_common import FatcatImporter - - -class FatcatCrossrefImporter(FatcatImporter): - - def __init__(self, host_url, issn_map_file, extid_map_file=None, create_containers=True): - super().__init__(host_url, issn_map_file) - self.extid_map_db = None - if extid_map_file: - db_uri = "file:{}?mode=ro".format(extid_map_file) - print("Using external ID map: {}".format(db_uri)) - self.extid_map_db = sqlite3.connect(db_uri, uri=True) - else: - print("Not using external ID map") - self.create_containers = create_containers - - def lookup_ext_ids(self, doi): - if self.extid_map_db is None: - return dict(core_id=None, pmid=None, pmcid=None, wikidata_qid=None) - row = self.extid_map_db.execute("SELECT core, pmid, pmcid, wikidata FROM ids WHERE doi=? LIMIT 1", - [doi.lower()]).fetchone() - if row is None: - return dict(core_id=None, pmid=None, pmcid=None, wikidata_qid=None) - row = [str(cell or '') or None for cell in row] - return dict( - core_id=row[0], - pmid=row[1], - pmcid=row[2], - wikidata_qid=row[3]) - - def parse_crossref_dict(self, obj): - """ - obj is a python dict (parsed from json). - returns a ReleaseEntity - """ - - # This work is out of scope if it doesn't have authors and a title - if (not 'author' in obj) or (not 'title' in obj): - return None - - # Other ways to be out of scope (provisionally) - if (not 'type' in obj): - return None - - # contribs - def do_contribs(obj_list, ctype): - contribs = [] - for i, am in enumerate(obj_list): - creator_id = None - if 'ORCID' in am.keys(): - creator_id = self.lookup_orcid(am['ORCID'].split('/')[-1]) - # Sorry humans :( - if am.get('given') and am.get('family'): - raw_name = "{} {}".format(am['given'], am['family']) - elif am.get('family'): - raw_name = am['family'] - else: - # TODO: defaults back to a pseudo-null value - raw_name = am.get('given', '') - extra = dict() - if ctype == "author": - index = i - else: - index = None - if am.get('affiliation'): - # note: affiliation => affiliations - extra['affiliations'] = am.get('affiliation') - if am.get('sequence') and am.get('sequence') != "additional": - extra['sequence'] = am.get('sequence') - if not extra: - extra = None - contribs.append(fatcat_client.ReleaseContrib( - creator_id=creator_id, - index=index, - raw_name=raw_name, - role=ctype, - extra=extra)) - return contribs - contribs = do_contribs(obj['author'], "author") - contribs.extend(do_contribs(obj.get('editor', []), "editor")) - contribs.extend(do_contribs(obj.get('translator', []), "translator")) - - # container - issn = obj.get('ISSN', [None])[0] - issnl = self.issn2issnl(issn) - container_id = None - if issnl: - container_id = self.lookup_issnl(issnl) - publisher = obj.get('publisher') - - ce = None - if (container_id is None and self.create_containers and issnl != None - and obj.get('container-title') and len(obj['container-title']) > 0): - ce = fatcat_client.ContainerEntity( - issnl=issnl, - publisher=publisher, - name=obj['container-title'][0]) - - # references - refs = [] - for i, rm in enumerate(obj.get('reference', [])): - try: - year = int(rm.get('year')) - # NOTE: will need to update/config in the future! - # NOTE: are there crossref works with year < 100? - if year > 2025 or year < 100: - year = None - except: - year = None - extra = rm.copy() - if rm.get('DOI'): - extra['doi'] = rm.get('DOI').lower() - key = rm.get('key') - if key and key.startswith(obj['DOI'].upper()): - key = key.replace(obj['DOI'].upper() + "-", '') - key = key.replace(obj['DOI'].upper(), '') - container_name = rm.get('volume-title') - if not container_name: - container_name = rm.get('journal-title') - extra.pop('DOI', None) - extra.pop('key', None) - extra.pop('year', None) - extra.pop('volume-name', None) - extra.pop('journal-title', None) - extra.pop('title', None) - extra.pop('first-page', None) - extra.pop('doi-asserted-by', None) - if extra: - extra = dict(crossref=extra) - else: - extra = None - refs.append(fatcat_client.ReleaseRef( - index=i, - # doing lookups would be a second import pass - target_release_id=None, - key=key, - year=year, - container_name=container_name, - title=rm.get('title'), - locator=rm.get('first-page'), - # TODO: just dump JSON somewhere here? - extra=extra)) - - # abstracts - abstracts = [] - if obj.get('abstract') != None: - abstracts.append(fatcat_client.ReleaseEntityAbstracts( - mimetype="application/xml+jats", - content=obj.get('abstract'))) - - # extra fields - extra = dict() - for key in ('subject', 'type', 'license', 'alternative-id', - 'container-title', 'original-title', 'subtitle', 'archive', - 'funder', 'group-title'): - # TODO: unpack "container-title" array - val = obj.get(key) - if val: - extra[key] = val - if 'license' in extra and extra['license']: - for i in range(len(extra['license'])): - if 'start' in extra['license'][i]: - extra['license'][i]['start'] = extra['license'][i]['start']['date-time'] - if len(obj['title']) > 1: - extra['other-titles'] = obj['title'][1:] - # TODO: this should be top-level - extra['is_kept'] = len(obj.get('archive', [])) > 0 - - # ISBN - isbn13 = None - for raw in obj.get('ISBN', []): - # TODO: convert if not ISBN-13 format - if len(raw) == 17: - isbn13 = raw - break - - # release status - if obj['type'] in ('journal-article', 'conference-proceeding', 'book', - 'dissertation', 'book-chapter'): - release_status = "published" - else: - # unknown - release_status = None - - # external identifiers - extids = self.lookup_ext_ids(doi=obj['DOI'].lower()) - - # TODO: filter out huge releases; we'll get them later (and fix bug in - # fatcatd) - if max(len(contribs), len(refs), len(abstracts)) > 750: - return None - - # release date parsing is amazingly complex - release_date = obj['issued']['date-parts'][0] - if not release_date or not release_date[0]: - # got some NoneType, even though at least year is supposed to be set - release_date = None - elif len(release_date) == 3: - release_date = datetime.datetime(year=release_date[0], month=release_date[1], day=release_date[2]) - else: - # only the year is actually required; mangle to first day for date - # (TODO: something better?) - release_date = datetime.datetime(year=release_date[0], month=1, day=1) - # convert to string ISO datetime format (if not null) - if release_date: - release_date = release_date.isoformat() + "Z" - - re = fatcat_client.ReleaseEntity( - work_id=None, - title=obj['title'][0], - contribs=contribs, - refs=refs, - container_id=container_id, - publisher=publisher, - release_type=obj['type'], - release_status=release_status, - doi=obj['DOI'].lower(), - isbn13=isbn13, - core_id=extids['core_id'], - pmid=extids['pmid'], - pmcid=extids['pmcid'], - wikidata_qid=extids['wikidata_qid'], - release_date=release_date, - issue=obj.get('issue'), - volume=obj.get('volume'), - pages=obj.get('page'), - abstracts=abstracts, - extra=dict(crossref=extra)) - return (re, ce) - - def create_row(self, row, editgroup=None): - if row is None: - return - obj = json.loads(row) - entities = self.parse_crossref_dict(obj) - if entities is not None: - (re, ce) = entities - if ce is not None: - container = self.api.create_container(ce, editgroup=editgroup) - re.container_id = container.ident - self._issnl_id_map[ce.issnl] = container.ident - self.api.create_release(re, editgroup=editgroup) - self.insert_count = self.insert_count + 1 - - def create_batch(self, batch, editgroup=None): - """Current work/release pairing disallows batch creation of releases. - Could do batch work creation and then match against releases, but meh.""" - release_batch = [] - for row in batch: - if row is None: - continue - obj = json.loads(row) - entities = self.parse_crossref_dict(obj) - if entities is not None: - (re, ce) = entities - if ce is not None: - ce_eg = self.api.create_editgroup( - fatcat_client.Editgroup(editor_id='aaaaaaaaaaaabkvkaaaaaaaaae')) - container = self.api.create_container(ce, editgroup=ce_eg.id) - self.api.accept_editgroup(ce_eg.id) - re.container_id = container.ident - self._issnl_id_map[ce.issnl] = container.ident - release_batch.append(re) - self.api.create_release_batch(release_batch, autoaccept="true", editgroup=editgroup) - self.insert_count = self.insert_count + len(release_batch) diff --git a/python/fatcat_tools/elastic_workers.py b/python/fatcat_tools/elastic_workers.py deleted file mode 100644 index eac8d6b0..00000000 --- a/python/fatcat_tools/elastic_workers.py +++ /dev/null @@ -1,47 +0,0 @@ - -import json -import time -import requests -from fatcat_tools.worker_common import FatcatWorker -from fatcat_client.models import ReleaseEntity -from fatcat_tools.transforms import * -from pykafka.common import OffsetType - - -class FatcatElasticReleaseWorker(FatcatWorker): - """ - Consumes from release-updates topic and pushes into (presumably local) - elasticsearch. - - Uses a consumer group to manage offset. - """ - - def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None, - elastic_backend="http://localhost:9200", elastic_index="fatcat"): - super().__init__(kafka_hosts=kafka_hosts, - consume_topic=consume_topic, - api_host_url=None) - self.consumer_group = "elastic-updates" - self.elastic_backend = elastic_backend - self.elastic_index = elastic_index - - def run(self): - consume_topic = self.kafka.topics[self.consume_topic] - - consumer = consume_topic.get_balanced_consumer( - consumer_group=self.consumer_group, - managed=True, - ) - - for msg in consumer: - json_str = msg.value.decode('utf-8') - release = entity_from_json(json_str, ReleaseEntity) - #print(release) - elastic_endpoint = "{}/{}/release/{}".format( - self.elastic_backend, - self.elastic_index, - release.ident) - print("Updating document: {}".format(elastic_endpoint)) - resp = requests.post(elastic_endpoint, json=release.to_elastic_dict()) - assert resp.status_code in (200, 201) - consumer.commit_offsets() diff --git a/python/fatcat_tools/grobid_metadata_importer.py b/python/fatcat_tools/grobid_metadata_importer.py deleted file mode 100755 index effa0d94..00000000 --- a/python/fatcat_tools/grobid_metadata_importer.py +++ /dev/null @@ -1,168 +0,0 @@ -#!/usr/bin/env python3 - -import sys -import json -import base64 -import datetime -import fatcat_client -from fatcat_tools.importer_common import FatcatImporter - -MAX_ABSTRACT_BYTES=4096 - - -class FatcatGrobidMetadataImporter(FatcatImporter): - - def __init__(self, host_url, default_link_rel="web"): - super().__init__(host_url) - self.default_link_rel = default_link_rel - - def parse_grobid_json(self, obj): - - if not obj.get('title'): - return None - - release = dict() - extra = dict() - - if obj.get('abstract') and len(obj.get('abstract')) < MAX_ABSTRACT_BYTES: - abobj = dict( - mimetype="text/plain", - language=None, - content=obj.get('abstract').strip()) - abstracts = [abobj] - else: - abstracts = None - - contribs = [] - for i, a in enumerate(obj.get('authors', [])): - c = dict(raw_name=a['name'], role="author") - contribs.append(fatcat_client.ReleaseContrib( - index=i, - raw_name=a['name'], - role="author", - extra=None)) - - refs = [] - for raw in obj.get('citations', []): - cite_extra = dict() - ref = dict() - ref['key'] = raw.get('id') - if raw.get('title'): - ref['title'] = raw['title'].strip() - if raw.get('date'): - try: - year = int(raw['date'].strip()[:4]) - ref['year'] = year - except: - pass - for key in ('volume', 'url', 'issue', 'publisher'): - if raw.get(key): - cite_extra[key] = raw[key].strip() - if raw.get('authors'): - cite_extra['authors'] = [a['name'] for a in raw['authors']] - if cite_extra: - cite_extra = dict(grobid=cite_extra) - else: - cite_extra = None - ref['extra'] = cite_extra - refs.append(ref) - - release_type = "journal-article" - release_date = None - if obj.get('date'): - # TODO: only returns year, ever? how to handle? - release_date = datetime.datetime(year=int(obj['date'][:4]), month=1, day=1) - - if obj.get('doi'): - extra['doi'] = obj['doi'] - if obj['journal'] and obj['journal'].get('name'): - extra['container_name'] = obj['journal']['name'] - - extra['is_longtail_oa'] = True - - # TODO: ISSN/eISSN handling? or just journal name lookup? - - if extra: - extra = dict(grobid=extra) - else: - extra = None - - re = fatcat_client.ReleaseEntity( - title=obj['title'].strip(), - contribs=contribs, - refs=refs, - publisher=obj['journal'].get('publisher'), - volume=obj['journal'].get('volume'), - issue=obj['journal'].get('issue'), - abstracts=abstracts, - extra=extra) - return re - - # TODO: make this a common function somewhere - def make_url(self, raw): - rel = self.default_link_rel - # TODO: this is where we could map specific domains to rel types, - # and also filter out bad domains, invalid URLs, etc - if "//archive.org/" in raw or "//arxiv.org/" in raw: - # TODO: special-case the arxiv.org bulk mirror? - rel = "repository" - elif "//web.archive.org/" in raw or "//archive.is/" in raw: - rel = "webarchive" - return fatcat_client.FileEntityUrls(url=raw, rel=rel) - - def parse_file_metadata(self, sha1_key, cdx, mimetype, file_size): - - sha1 = base64.b16encode(base64.b32decode(sha1_key.replace('sha1:', ''))).decode('ascii').lower() - - # lookup existing SHA1, or create new entity - try: - existing_file = self.api.lookup_file(sha1=sha1) - except fatcat_client.rest.ApiException as err: - if err.status != 404: - raise err - existing_file = None - - if existing_file: - # if file is already in here, presumably not actually long-tail - return None - fe = fatcat_client.FileEntity( - sha1=sha1, - size=int(file_size), - mimetype=mimetype, - releases=[], - urls=[], - ) - - # parse URLs and CDX - original = cdx['url'] - wayback = "https://web.archive.org/web/{}/{}".format( - cdx['dt'], - original) - fe.urls.append( - fatcat_client.FileEntityUrls(url=wayback, rel="webarchive")) - original_url = self.make_url(original) - if original_url != None: - fe.urls.append(original_url) - - return fe - - def create_row(self, row, editgroup=None): - if not row: - return - fields = row.split('\t') - sha1_key = fields[0] - cdx = json.loads(fields[1]) - mimetype = fields[2] - file_size = int(fields[3]) - grobid_meta = json.loads(fields[4]) - fe = self.parse_file_metadata(sha1_key, cdx, mimetype, file_size) - re = self.parse_grobid_json(grobid_meta) - if fe and re: - release_entity = self.api.create_release(re, editgroup=editgroup) - # release ident can't already be in release list because we just - # created it - fe.releases.append(release_entity.ident) - file_entity = self.api.create_file(fe, editgroup=editgroup) - self.insert_count = self.insert_count + 1 - - # NB: batch mode not implemented diff --git a/python/fatcat_tools/importer_common.py b/python/fatcat_tools/importer_common.py deleted file mode 100644 index 8dfee875..00000000 --- a/python/fatcat_tools/importer_common.py +++ /dev/null @@ -1,137 +0,0 @@ - -import re -import sys -import csv -import json -import itertools -import fatcat_client -from fatcat_client.rest import ApiException - -# from: https://docs.python.org/3/library/itertools.html -def grouper(iterable, n, fillvalue=None): - "Collect data into fixed-length chunks or blocks" - args = [iter(iterable)] * n - return itertools.zip_longest(*args, fillvalue=fillvalue) - -class FatcatImporter: - - def __init__(self, host_url, issn_map_file=None): - conf = fatcat_client.Configuration() - conf.host = host_url - self.api = fatcat_client.DefaultApi(fatcat_client.ApiClient(conf)) - self._issnl_id_map = dict() - self._orcid_id_map = dict() - self._doi_id_map = dict() - self._issn_issnl_map = None - self._orcid_regex = re.compile("^\\d{4}-\\d{4}-\\d{4}-\\d{3}[\\dX]$") - if issn_map_file: - self.read_issn_map_file(issn_map_file) - self.processed_lines = 0 - self.insert_count = 0 - self.update_count = 0 - - def describe_run(self): - print("Processed {} lines, inserted {}, updated {}.".format( - self.processed_lines, self.insert_count, self.update_count)) - - def process_source(self, source, group_size=100): - """Creates and auto-accepts editgroup every group_size rows""" - eg = self.api.create_editgroup( - fatcat_client.Editgroup(editor_id='aaaaaaaaaaaabkvkaaaaaaaaae')) - for i, row in enumerate(source): - self.create_row(row, editgroup=eg.id) - if i > 0 and (i % group_size) == 0: - self.api.accept_editgroup(eg.id) - eg = self.api.create_editgroup( - fatcat_client.Editgroup(editor_id='aaaaaaaaaaaabkvkaaaaaaaaae')) - self.processed_lines = self.processed_lines + 1 - if i == 0 or (i % group_size) != 0: - self.api.accept_editgroup(eg.id) - - def process_batch(self, source, size=50): - """Reads and processes in batches (not API-call-per-)""" - for rows in grouper(source, size): - self.processed_lines = self.processed_lines + len(rows) - eg = self.api.create_editgroup( - fatcat_client.Editgroup(editor_id='aaaaaaaaaaaabkvkaaaaaaaaae')) - self.create_batch(rows, editgroup=eg.id) - - def process_csv_source(self, source, group_size=100, delimiter=','): - reader = csv.DictReader(source, delimiter=delimiter) - self.process_source(reader, group_size) - - def process_csv_batch(self, source, size=50, delimiter=','): - reader = csv.DictReader(source, delimiter=delimiter) - self.process_batch(reader, size) - - def is_issnl(self, issnl): - return len(issnl) == 9 and issnl[4] == '-' - - def lookup_issnl(self, issnl): - """Caches calls to the ISSN-L lookup API endpoint in a local dict""" - if issnl in self._issnl_id_map: - return self._issnl_id_map[issnl] - container_id = None - try: - rv = self.api.lookup_container(issnl=issnl) - container_id = rv.ident - except ApiException as ae: - # If anything other than a 404 (not found), something is wrong - assert ae.status == 404 - self._issnl_id_map[issnl] = container_id # might be None - return container_id - - def is_orcid(self, orcid): - return self._orcid_regex.match(orcid) != None - - def lookup_orcid(self, orcid): - """Caches calls to the Orcid lookup API endpoint in a local dict""" - if not self.is_orcid(orcid): - return None - if orcid in self._orcid_id_map: - return self._orcid_id_map[orcid] - creator_id = None - try: - rv = self.api.lookup_creator(orcid=orcid) - creator_id = rv.ident - except ApiException as ae: - # If anything other than a 404 (not found), something is wrong - assert ae.status == 404 - self._orcid_id_map[orcid] = creator_id # might be None - return creator_id - - def is_doi(self, doi): - return doi.startswith("10.") and doi.count("/") >= 1 - - def lookup_doi(self, doi): - """Caches calls to the doi lookup API endpoint in a local dict""" - assert self.is_doi(doi) - doi = doi.lower() - if doi in self._doi_id_map: - return self._doi_id_map[doi] - release_id = None - try: - rv = self.api.lookup_release(doi=doi) - release_id = rv.ident - except ApiException as ae: - # If anything other than a 404 (not found), something is wrong - assert ae.status == 404 - self._doi_id_map[doi] = release_id # might be None - return release_id - - def read_issn_map_file(self, issn_map_file): - print("Loading ISSN map file...") - self._issn_issnl_map = dict() - for line in issn_map_file: - if line.startswith("ISSN") or len(line) == 0: - continue - (issn, issnl) = line.split()[0:2] - self._issn_issnl_map[issn] = issnl - # double mapping makes lookups easy - self._issn_issnl_map[issnl] = issnl - print("Got {} ISSN-L mappings.".format(len(self._issn_issnl_map))) - - def issn2issnl(self, issn): - if issn is None: - return None - return self._issn_issnl_map.get(issn) diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py new file mode 100644 index 00000000..8dfee875 --- /dev/null +++ b/python/fatcat_tools/importers/common.py @@ -0,0 +1,137 @@ + +import re +import sys +import csv +import json +import itertools +import fatcat_client +from fatcat_client.rest import ApiException + +# from: https://docs.python.org/3/library/itertools.html +def grouper(iterable, n, fillvalue=None): + "Collect data into fixed-length chunks or blocks" + args = [iter(iterable)] * n + return itertools.zip_longest(*args, fillvalue=fillvalue) + +class FatcatImporter: + + def __init__(self, host_url, issn_map_file=None): + conf = fatcat_client.Configuration() + conf.host = host_url + self.api = fatcat_client.DefaultApi(fatcat_client.ApiClient(conf)) + self._issnl_id_map = dict() + self._orcid_id_map = dict() + self._doi_id_map = dict() + self._issn_issnl_map = None + self._orcid_regex = re.compile("^\\d{4}-\\d{4}-\\d{4}-\\d{3}[\\dX]$") + if issn_map_file: + self.read_issn_map_file(issn_map_file) + self.processed_lines = 0 + self.insert_count = 0 + self.update_count = 0 + + def describe_run(self): + print("Processed {} lines, inserted {}, updated {}.".format( + self.processed_lines, self.insert_count, self.update_count)) + + def process_source(self, source, group_size=100): + """Creates and auto-accepts editgroup every group_size rows""" + eg = self.api.create_editgroup( + fatcat_client.Editgroup(editor_id='aaaaaaaaaaaabkvkaaaaaaaaae')) + for i, row in enumerate(source): + self.create_row(row, editgroup=eg.id) + if i > 0 and (i % group_size) == 0: + self.api.accept_editgroup(eg.id) + eg = self.api.create_editgroup( + fatcat_client.Editgroup(editor_id='aaaaaaaaaaaabkvkaaaaaaaaae')) + self.processed_lines = self.processed_lines + 1 + if i == 0 or (i % group_size) != 0: + self.api.accept_editgroup(eg.id) + + def process_batch(self, source, size=50): + """Reads and processes in batches (not API-call-per-)""" + for rows in grouper(source, size): + self.processed_lines = self.processed_lines + len(rows) + eg = self.api.create_editgroup( + fatcat_client.Editgroup(editor_id='aaaaaaaaaaaabkvkaaaaaaaaae')) + self.create_batch(rows, editgroup=eg.id) + + def process_csv_source(self, source, group_size=100, delimiter=','): + reader = csv.DictReader(source, delimiter=delimiter) + self.process_source(reader, group_size) + + def process_csv_batch(self, source, size=50, delimiter=','): + reader = csv.DictReader(source, delimiter=delimiter) + self.process_batch(reader, size) + + def is_issnl(self, issnl): + return len(issnl) == 9 and issnl[4] == '-' + + def lookup_issnl(self, issnl): + """Caches calls to the ISSN-L lookup API endpoint in a local dict""" + if issnl in self._issnl_id_map: + return self._issnl_id_map[issnl] + container_id = None + try: + rv = self.api.lookup_container(issnl=issnl) + container_id = rv.ident + except ApiException as ae: + # If anything other than a 404 (not found), something is wrong + assert ae.status == 404 + self._issnl_id_map[issnl] = container_id # might be None + return container_id + + def is_orcid(self, orcid): + return self._orcid_regex.match(orcid) != None + + def lookup_orcid(self, orcid): + """Caches calls to the Orcid lookup API endpoint in a local dict""" + if not self.is_orcid(orcid): + return None + if orcid in self._orcid_id_map: + return self._orcid_id_map[orcid] + creator_id = None + try: + rv = self.api.lookup_creator(orcid=orcid) + creator_id = rv.ident + except ApiException as ae: + # If anything other than a 404 (not found), something is wrong + assert ae.status == 404 + self._orcid_id_map[orcid] = creator_id # might be None + return creator_id + + def is_doi(self, doi): + return doi.startswith("10.") and doi.count("/") >= 1 + + def lookup_doi(self, doi): + """Caches calls to the doi lookup API endpoint in a local dict""" + assert self.is_doi(doi) + doi = doi.lower() + if doi in self._doi_id_map: + return self._doi_id_map[doi] + release_id = None + try: + rv = self.api.lookup_release(doi=doi) + release_id = rv.ident + except ApiException as ae: + # If anything other than a 404 (not found), something is wrong + assert ae.status == 404 + self._doi_id_map[doi] = release_id # might be None + return release_id + + def read_issn_map_file(self, issn_map_file): + print("Loading ISSN map file...") + self._issn_issnl_map = dict() + for line in issn_map_file: + if line.startswith("ISSN") or len(line) == 0: + continue + (issn, issnl) = line.split()[0:2] + self._issn_issnl_map[issn] = issnl + # double mapping makes lookups easy + self._issn_issnl_map[issnl] = issnl + print("Got {} ISSN-L mappings.".format(len(self._issn_issnl_map))) + + def issn2issnl(self, issn): + if issn is None: + return None + return self._issn_issnl_map.get(issn) diff --git a/python/fatcat_tools/importers/crossref.py b/python/fatcat_tools/importers/crossref.py new file mode 100644 index 00000000..dddb58d1 --- /dev/null +++ b/python/fatcat_tools/importers/crossref.py @@ -0,0 +1,272 @@ + +import sys +import json +import sqlite3 +import datetime +import itertools +import fatcat_client +from fatcat_tools.importers.common import FatcatImporter + + +class FatcatCrossrefImporter(FatcatImporter): + + def __init__(self, host_url, issn_map_file, extid_map_file=None, create_containers=True): + super().__init__(host_url, issn_map_file) + self.extid_map_db = None + if extid_map_file: + db_uri = "file:{}?mode=ro".format(extid_map_file) + print("Using external ID map: {}".format(db_uri)) + self.extid_map_db = sqlite3.connect(db_uri, uri=True) + else: + print("Not using external ID map") + self.create_containers = create_containers + + def lookup_ext_ids(self, doi): + if self.extid_map_db is None: + return dict(core_id=None, pmid=None, pmcid=None, wikidata_qid=None) + row = self.extid_map_db.execute("SELECT core, pmid, pmcid, wikidata FROM ids WHERE doi=? LIMIT 1", + [doi.lower()]).fetchone() + if row is None: + return dict(core_id=None, pmid=None, pmcid=None, wikidata_qid=None) + row = [str(cell or '') or None for cell in row] + return dict( + core_id=row[0], + pmid=row[1], + pmcid=row[2], + wikidata_qid=row[3]) + + def parse_crossref_dict(self, obj): + """ + obj is a python dict (parsed from json). + returns a ReleaseEntity + """ + + # This work is out of scope if it doesn't have authors and a title + if (not 'author' in obj) or (not 'title' in obj): + return None + + # Other ways to be out of scope (provisionally) + if (not 'type' in obj): + return None + + # contribs + def do_contribs(obj_list, ctype): + contribs = [] + for i, am in enumerate(obj_list): + creator_id = None + if 'ORCID' in am.keys(): + creator_id = self.lookup_orcid(am['ORCID'].split('/')[-1]) + # Sorry humans :( + if am.get('given') and am.get('family'): + raw_name = "{} {}".format(am['given'], am['family']) + elif am.get('family'): + raw_name = am['family'] + else: + # TODO: defaults back to a pseudo-null value + raw_name = am.get('given', '') + extra = dict() + if ctype == "author": + index = i + else: + index = None + if am.get('affiliation'): + # note: affiliation => affiliations + extra['affiliations'] = am.get('affiliation') + if am.get('sequence') and am.get('sequence') != "additional": + extra['sequence'] = am.get('sequence') + if not extra: + extra = None + contribs.append(fatcat_client.ReleaseContrib( + creator_id=creator_id, + index=index, + raw_name=raw_name, + role=ctype, + extra=extra)) + return contribs + contribs = do_contribs(obj['author'], "author") + contribs.extend(do_contribs(obj.get('editor', []), "editor")) + contribs.extend(do_contribs(obj.get('translator', []), "translator")) + + # container + issn = obj.get('ISSN', [None])[0] + issnl = self.issn2issnl(issn) + container_id = None + if issnl: + container_id = self.lookup_issnl(issnl) + publisher = obj.get('publisher') + + ce = None + if (container_id is None and self.create_containers and issnl != None + and obj.get('container-title') and len(obj['container-title']) > 0): + ce = fatcat_client.ContainerEntity( + issnl=issnl, + publisher=publisher, + name=obj['container-title'][0]) + + # references + refs = [] + for i, rm in enumerate(obj.get('reference', [])): + try: + year = int(rm.get('year')) + # NOTE: will need to update/config in the future! + # NOTE: are there crossref works with year < 100? + if year > 2025 or year < 100: + year = None + except: + year = None + extra = rm.copy() + if rm.get('DOI'): + extra['doi'] = rm.get('DOI').lower() + key = rm.get('key') + if key and key.startswith(obj['DOI'].upper()): + key = key.replace(obj['DOI'].upper() + "-", '') + key = key.replace(obj['DOI'].upper(), '') + container_name = rm.get('volume-title') + if not container_name: + container_name = rm.get('journal-title') + extra.pop('DOI', None) + extra.pop('key', None) + extra.pop('year', None) + extra.pop('volume-name', None) + extra.pop('journal-title', None) + extra.pop('title', None) + extra.pop('first-page', None) + extra.pop('doi-asserted-by', None) + if extra: + extra = dict(crossref=extra) + else: + extra = None + refs.append(fatcat_client.ReleaseRef( + index=i, + # doing lookups would be a second import pass + target_release_id=None, + key=key, + year=year, + container_name=container_name, + title=rm.get('title'), + locator=rm.get('first-page'), + # TODO: just dump JSON somewhere here? + extra=extra)) + + # abstracts + abstracts = [] + if obj.get('abstract') != None: + abstracts.append(fatcat_client.ReleaseEntityAbstracts( + mimetype="application/xml+jats", + content=obj.get('abstract'))) + + # extra fields + extra = dict() + for key in ('subject', 'type', 'license', 'alternative-id', + 'container-title', 'original-title', 'subtitle', 'archive', + 'funder', 'group-title'): + # TODO: unpack "container-title" array + val = obj.get(key) + if val: + extra[key] = val + if 'license' in extra and extra['license']: + for i in range(len(extra['license'])): + if 'start' in extra['license'][i]: + extra['license'][i]['start'] = extra['license'][i]['start']['date-time'] + if len(obj['title']) > 1: + extra['other-titles'] = obj['title'][1:] + # TODO: this should be top-level + extra['is_kept'] = len(obj.get('archive', [])) > 0 + + # ISBN + isbn13 = None + for raw in obj.get('ISBN', []): + # TODO: convert if not ISBN-13 format + if len(raw) == 17: + isbn13 = raw + break + + # release status + if obj['type'] in ('journal-article', 'conference-proceeding', 'book', + 'dissertation', 'book-chapter'): + release_status = "published" + else: + # unknown + release_status = None + + # external identifiers + extids = self.lookup_ext_ids(doi=obj['DOI'].lower()) + + # TODO: filter out huge releases; we'll get them later (and fix bug in + # fatcatd) + if max(len(contribs), len(refs), len(abstracts)) > 750: + return None + + # release date parsing is amazingly complex + release_date = obj['issued']['date-parts'][0] + if not release_date or not release_date[0]: + # got some NoneType, even though at least year is supposed to be set + release_date = None + elif len(release_date) == 3: + release_date = datetime.datetime(year=release_date[0], month=release_date[1], day=release_date[2]) + else: + # only the year is actually required; mangle to first day for date + # (TODO: something better?) + release_date = datetime.datetime(year=release_date[0], month=1, day=1) + # convert to string ISO datetime format (if not null) + if release_date: + release_date = release_date.isoformat() + "Z" + + re = fatcat_client.ReleaseEntity( + work_id=None, + title=obj['title'][0], + contribs=contribs, + refs=refs, + container_id=container_id, + publisher=publisher, + release_type=obj['type'], + release_status=release_status, + doi=obj['DOI'].lower(), + isbn13=isbn13, + core_id=extids['core_id'], + pmid=extids['pmid'], + pmcid=extids['pmcid'], + wikidata_qid=extids['wikidata_qid'], + release_date=release_date, + issue=obj.get('issue'), + volume=obj.get('volume'), + pages=obj.get('page'), + abstracts=abstracts, + extra=dict(crossref=extra)) + return (re, ce) + + def create_row(self, row, editgroup=None): + if row is None: + return + obj = json.loads(row) + entities = self.parse_crossref_dict(obj) + if entities is not None: + (re, ce) = entities + if ce is not None: + container = self.api.create_container(ce, editgroup=editgroup) + re.container_id = container.ident + self._issnl_id_map[ce.issnl] = container.ident + self.api.create_release(re, editgroup=editgroup) + self.insert_count = self.insert_count + 1 + + def create_batch(self, batch, editgroup=None): + """Current work/release pairing disallows batch creation of releases. + Could do batch work creation and then match against releases, but meh.""" + release_batch = [] + for row in batch: + if row is None: + continue + obj = json.loads(row) + entities = self.parse_crossref_dict(obj) + if entities is not None: + (re, ce) = entities + if ce is not None: + ce_eg = self.api.create_editgroup( + fatcat_client.Editgroup(editor_id='aaaaaaaaaaaabkvkaaaaaaaaae')) + container = self.api.create_container(ce, editgroup=ce_eg.id) + self.api.accept_editgroup(ce_eg.id) + re.container_id = container.ident + self._issnl_id_map[ce.issnl] = container.ident + release_batch.append(re) + self.api.create_release_batch(release_batch, autoaccept="true", editgroup=editgroup) + self.insert_count = self.insert_count + len(release_batch) diff --git a/python/fatcat_tools/importers/grobid_metadata.py b/python/fatcat_tools/importers/grobid_metadata.py new file mode 100644 index 00000000..56b2ee02 --- /dev/null +++ b/python/fatcat_tools/importers/grobid_metadata.py @@ -0,0 +1,168 @@ +#!/usr/bin/env python3 + +import sys +import json +import base64 +import datetime +import fatcat_client +from fatcat_tools.importers.common import FatcatImporter + +MAX_ABSTRACT_BYTES=4096 + + +class FatcatGrobidMetadataImporter(FatcatImporter): + + def __init__(self, host_url, default_link_rel="web"): + super().__init__(host_url) + self.default_link_rel = default_link_rel + + def parse_grobid_json(self, obj): + + if not obj.get('title'): + return None + + release = dict() + extra = dict() + + if obj.get('abstract') and len(obj.get('abstract')) < MAX_ABSTRACT_BYTES: + abobj = dict( + mimetype="text/plain", + language=None, + content=obj.get('abstract').strip()) + abstracts = [abobj] + else: + abstracts = None + + contribs = [] + for i, a in enumerate(obj.get('authors', [])): + c = dict(raw_name=a['name'], role="author") + contribs.append(fatcat_client.ReleaseContrib( + index=i, + raw_name=a['name'], + role="author", + extra=None)) + + refs = [] + for raw in obj.get('citations', []): + cite_extra = dict() + ref = dict() + ref['key'] = raw.get('id') + if raw.get('title'): + ref['title'] = raw['title'].strip() + if raw.get('date'): + try: + year = int(raw['date'].strip()[:4]) + ref['year'] = year + except: + pass + for key in ('volume', 'url', 'issue', 'publisher'): + if raw.get(key): + cite_extra[key] = raw[key].strip() + if raw.get('authors'): + cite_extra['authors'] = [a['name'] for a in raw['authors']] + if cite_extra: + cite_extra = dict(grobid=cite_extra) + else: + cite_extra = None + ref['extra'] = cite_extra + refs.append(ref) + + release_type = "journal-article" + release_date = None + if obj.get('date'): + # TODO: only returns year, ever? how to handle? + release_date = datetime.datetime(year=int(obj['date'][:4]), month=1, day=1) + + if obj.get('doi'): + extra['doi'] = obj['doi'] + if obj['journal'] and obj['journal'].get('name'): + extra['container_name'] = obj['journal']['name'] + + extra['is_longtail_oa'] = True + + # TODO: ISSN/eISSN handling? or just journal name lookup? + + if extra: + extra = dict(grobid=extra) + else: + extra = None + + re = fatcat_client.ReleaseEntity( + title=obj['title'].strip(), + contribs=contribs, + refs=refs, + publisher=obj['journal'].get('publisher'), + volume=obj['journal'].get('volume'), + issue=obj['journal'].get('issue'), + abstracts=abstracts, + extra=extra) + return re + + # TODO: make this a common function somewhere + def make_url(self, raw): + rel = self.default_link_rel + # TODO: this is where we could map specific domains to rel types, + # and also filter out bad domains, invalid URLs, etc + if "//archive.org/" in raw or "//arxiv.org/" in raw: + # TODO: special-case the arxiv.org bulk mirror? + rel = "repository" + elif "//web.archive.org/" in raw or "//archive.is/" in raw: + rel = "webarchive" + return fatcat_client.FileEntityUrls(url=raw, rel=rel) + + def parse_file_metadata(self, sha1_key, cdx, mimetype, file_size): + + sha1 = base64.b16encode(base64.b32decode(sha1_key.replace('sha1:', ''))).decode('ascii').lower() + + # lookup existing SHA1, or create new entity + try: + existing_file = self.api.lookup_file(sha1=sha1) + except fatcat_client.rest.ApiException as err: + if err.status != 404: + raise err + existing_file = None + + if existing_file: + # if file is already in here, presumably not actually long-tail + return None + fe = fatcat_client.FileEntity( + sha1=sha1, + size=int(file_size), + mimetype=mimetype, + releases=[], + urls=[], + ) + + # parse URLs and CDX + original = cdx['url'] + wayback = "https://web.archive.org/web/{}/{}".format( + cdx['dt'], + original) + fe.urls.append( + fatcat_client.FileEntityUrls(url=wayback, rel="webarchive")) + original_url = self.make_url(original) + if original_url != None: + fe.urls.append(original_url) + + return fe + + def create_row(self, row, editgroup=None): + if not row: + return + fields = row.split('\t') + sha1_key = fields[0] + cdx = json.loads(fields[1]) + mimetype = fields[2] + file_size = int(fields[3]) + grobid_meta = json.loads(fields[4]) + fe = self.parse_file_metadata(sha1_key, cdx, mimetype, file_size) + re = self.parse_grobid_json(grobid_meta) + if fe and re: + release_entity = self.api.create_release(re, editgroup=editgroup) + # release ident can't already be in release list because we just + # created it + fe.releases.append(release_entity.ident) + file_entity = self.api.create_file(fe, editgroup=editgroup) + self.insert_count = self.insert_count + 1 + + # NB: batch mode not implemented diff --git a/python/fatcat_tools/importers/issn.py b/python/fatcat_tools/importers/issn.py new file mode 100644 index 00000000..d7fb9082 --- /dev/null +++ b/python/fatcat_tools/importers/issn.py @@ -0,0 +1,72 @@ + +import sys +import json +import itertools +import fatcat_client +from fatcat_tools.importers.common import FatcatImporter + +# CSV format (generated from git.archive.org/webgroup/oa-journal-analysis): +# ISSN-L,in_doaj,in_road,in_norwegian,in_crossref,title,publisher,url,lang,ISSN-print,ISSN-electronic,doi_count,has_doi,is_oa,is_kept,publisher_size,url_live,url_live_status,url_live_final_status,url_live_final_url,url_live_status_simple,url_live_final_status_simple,url_domain,gwb_pdf_count + +def or_none(s): + if s is None: + return None + if len(s) == 0: + return None + return s + +def truthy(s): + if s is None: + return None + s = s.lower() + if s in ('true', 't', 'yes', 'y', '1'): + return True + elif s in ('false', 'f', 'no', 'n', '0'): + return False + else: + return None + +class FatcatIssnImporter(FatcatImporter): + + def parse_issn_row(self, row): + """ + row is a python dict (parsed from CSV). + returns a ContainerEntity + """ + title = or_none(row['title']) + issnl = or_none(row['ISSN-L']) + if title is None or issnl is None: + return + extra = dict( + in_doaj=truthy(row['in_doaj']), + in_road=truthy(row['in_road']), + in_norwegian=truthy(row['in_norwegian']), + language=or_none(row['lang']), + url=or_none(row['url']), + ISSNp=or_none(row['ISSN-print']), + ISSNe=or_none(row['ISSN-electronic']), + is_oa=truthy(row['is_oa']), + is_kept=truthy(row['is_kept']), + ) + ce = fatcat_client.ContainerEntity( + issnl=issnl, + name=title, + publisher=or_none(row['publisher']), + abbrev=None, + coden=None, + extra=extra) + return ce + + def create_row(self, row, editgroup=None): + ce = self.parse_issn_row(row) + if ce is not None: + self.api.create_container(ce, editgroup=editgroup) + self.insert_count = self.insert_count + 1 + + def create_batch(self, batch, editgroup=None): + """Reads and processes in batches (not API-call-per-line)""" + objects = [self.parse_issn_row(l) + for l in batch if l != None] + objects = [o for o in objects if o != None] + self.api.create_container_batch(objects, autoaccept="true", editgroup=editgroup) + self.insert_count = self.insert_count + len(objects) diff --git a/python/fatcat_tools/importers/matched.py b/python/fatcat_tools/importers/matched.py new file mode 100644 index 00000000..6270fe88 --- /dev/null +++ b/python/fatcat_tools/importers/matched.py @@ -0,0 +1,144 @@ + +import sys +import json +import sqlite3 +import itertools +import fatcat_client +from fatcat_tools.importers.common import FatcatImporter + +#row = row.split('\t') +#assert len(row) == 2 +#sha1 = row[0].replace('sha1:') +#sha1 = base64.b16encode(base64.b32decode(sha1)).lower() +#print(sha1) +#dois = [d.lower() for d in json.loads(row[1])] + +class FatcatMatchedImporter(FatcatImporter): + """ + Input format is JSON with keys: + - dois (list) + - sha1 (hex) + - md5 (hex) + - sha256 (hex) + - size (int) + - cdx (list of objects) + - dt + - url + - mimetype + - urls (list of strings... or objects?) + + Future handlings/extensions: + - core_id, wikidata_id, pmcid, pmid: not as lists + """ + + def __init__(self, host_url, skip_file_update=False, default_mime=None, + default_link_rel="web"): + super().__init__(host_url) + self.default_mime = default_mime + self.default_link_rel = default_link_rel + self.skip_file_update = skip_file_update + + def make_url(self, raw): + rel = self.default_link_rel + # TODO: this is where we could map specific domains to rel types, + # and also filter out bad domains, invalid URLs, etc + if "//archive.org/" in raw or "//arxiv.org/" in raw: + # TODO: special-case the arxiv.org bulk mirror? + rel = "repository" + elif "//web.archive.org/" in raw or "//archive.is/" in raw: + rel = "webarchive" + return fatcat_client.FileEntityUrls(url=raw, rel=rel) + + def parse_matched_dict(self, obj): + sha1 = obj['sha1'] + dois = [d.lower() for d in obj.get('dois', [])] + + # lookup sha1, or create new entity + fe = None + if not self.skip_file_update: + try: + fe = self.api.lookup_file(sha1=sha1) + except fatcat_client.rest.ApiException as err: + if err.status != 404: + raise err + if fe is None: + fe = fatcat_client.FileEntity( + sha1=sha1, + releases=[], + urls=[], + ) + + # lookup dois + re_list = set() + for doi in dois: + try: + re = self.api.lookup_release(doi=doi) + except fatcat_client.rest.ApiException as err: + if err.status != 404: + raise err + re = None + if re is None: + print("DOI not found: {}".format(doi)) + else: + re_list.add(re.ident) + if len(re_list) == 0: + return None + if fe.releases == set(re_list): + return None + re_list.update(fe.releases) + fe.releases = list(re_list) + + # parse URLs and CDX + existing_urls = [feu.url for feu in fe.urls] + for url in obj.get('url', []): + if url not in existing_urls: + url = self.make_url(url) + if url != None: + fe.urls.append(url) + for cdx in obj.get('cdx', []): + original = cdx['url'] + wayback = "https://web.archive.org/web/{}/{}".format( + cdx['dt'], + original) + if wayback not in existing_urls: + fe.urls.append( + fatcat_client.FileEntityUrls(url=wayback, rel="webarchive")) + if original not in existing_urls: + url = self.make_url(original) + if url != None: + fe.urls.append(url) + + if obj.get('size') != None: + fe.size = int(obj['size']) + fe.sha256 = obj.get('sha256', fe.sha256) + fe.md5 = obj.get('md5', fe.sha256) + if obj.get('mimetype') is None: + if fe.mimetype is None: + fe.mimetype = self.default_mime + else: + fe.mimetype = obj.get('mimetype') + return fe + + def create_row(self, row, editgroup=None): + obj = json.loads(row) + fe = self.parse_matched_dict(obj) + if fe is not None: + if fe.ident is None: + self.api.create_file(fe, editgroup=editgroup) + self.insert_count = self.insert_count + 1 + else: + self.api.update_file(fe.ident, fe, editgroup=editgroup) + self.update_count = self.update_count + 1 + + def create_batch(self, batch, editgroup=None): + """Reads and processes in batches (not API-call-per-line)""" + objects = [self.parse_matched_dict(json.loads(l)) + for l in batch if l != None] + new_objects = [o for o in objects if o != None and o.ident == None] + update_objects = [o for o in objects if o != None and o.ident != None] + for obj in update_objects: + self.api.update_file(obj.ident, obj, editgroup=editgroup) + if len(new_objects) > 0: + self.api.create_file_batch(new_objects, autoaccept="true", editgroup=editgroup) + self.update_count = self.update_count + len(update_objects) + self.insert_count = self.insert_count + len(new_objects) diff --git a/python/fatcat_tools/importers/orcid.py b/python/fatcat_tools/importers/orcid.py new file mode 100644 index 00000000..350c4c57 --- /dev/null +++ b/python/fatcat_tools/importers/orcid.py @@ -0,0 +1,73 @@ + +import sys +import json +import itertools +import fatcat_client +from fatcat_tools.importers.common import FatcatImporter + +def value_or_none(e): + if type(e) == dict: + e = e.get('value') + if type(e) == str and len(e) == 0: + e = None + # TODO: this is probably bogus; patched in desperation; remove? + if e: + try: + e.encode() + except UnicodeEncodeError: + # Invalid JSON? + print("BAD UNICODE") + return None + return e + +class FatcatOrcidImporter(FatcatImporter): + + def parse_orcid_dict(self, obj): + """ + obj is a python dict (parsed from json). + returns a CreatorEntity + """ + name = obj['person']['name'] + if name is None: + return None + extra = None + given = value_or_none(name.get('given-names')) + sur = value_or_none(name.get('family-name')) + display = value_or_none(name.get('credit-name')) + if display is None: + # TODO: sorry human beings + if given and sur: + display = "{} {}".format(given, sur) + elif sur: + display = sur + elif given: + display = given + else: + # must have *some* name + return None + orcid = obj['orcid-identifier']['path'] + if not self.is_orcid(orcid): + sys.stderr.write("Bad ORCID: {}\n".format(orcid)) + return None + ce = fatcat_client.CreatorEntity( + orcid=orcid, + given_name=given, + surname=sur, + display_name=display, + extra=extra) + return ce + + def create_row(self, row, editgroup=None): + obj = json.loads(row) + ce = self.parse_orcid_dict(obj) + if ce is not None: + self.api.create_creator(ce, editgroup=editgroup) + self.insert_count = self.insert_count + 1 + + def create_batch(self, batch, editgroup=None): + """Reads and processes in batches (not API-call-per-line)""" + objects = [self.parse_orcid_dict(json.loads(l)) + for l in batch if l != None] + objects = [o for o in objects if o != None] + self.api.create_creator_batch(objects, autoaccept="true", editgroup=editgroup) + self.insert_count = self.insert_count + len(objects) diff --git a/python/fatcat_tools/issn_importer.py b/python/fatcat_tools/issn_importer.py deleted file mode 100644 index e3ed7382..00000000 --- a/python/fatcat_tools/issn_importer.py +++ /dev/null @@ -1,72 +0,0 @@ - -import sys -import json -import itertools -import fatcat_client -from fatcat_tools.importer_common import FatcatImporter - -# CSV format (generated from git.archive.org/webgroup/oa-journal-analysis): -# ISSN-L,in_doaj,in_road,in_norwegian,in_crossref,title,publisher,url,lang,ISSN-print,ISSN-electronic,doi_count,has_doi,is_oa,is_kept,publisher_size,url_live,url_live_status,url_live_final_status,url_live_final_url,url_live_status_simple,url_live_final_status_simple,url_domain,gwb_pdf_count - -def or_none(s): - if s is None: - return None - if len(s) == 0: - return None - return s - -def truthy(s): - if s is None: - return None - s = s.lower() - if s in ('true', 't', 'yes', 'y', '1'): - return True - elif s in ('false', 'f', 'no', 'n', '0'): - return False - else: - return None - -class FatcatIssnImporter(FatcatImporter): - - def parse_issn_row(self, row): - """ - row is a python dict (parsed from CSV). - returns a ContainerEntity - """ - title = or_none(row['title']) - issnl = or_none(row['ISSN-L']) - if title is None or issnl is None: - return - extra = dict( - in_doaj=truthy(row['in_doaj']), - in_road=truthy(row['in_road']), - in_norwegian=truthy(row['in_norwegian']), - language=or_none(row['lang']), - url=or_none(row['url']), - ISSNp=or_none(row['ISSN-print']), - ISSNe=or_none(row['ISSN-electronic']), - is_oa=truthy(row['is_oa']), - is_kept=truthy(row['is_kept']), - ) - ce = fatcat_client.ContainerEntity( - issnl=issnl, - name=title, - publisher=or_none(row['publisher']), - abbrev=None, - coden=None, - extra=extra) - return ce - - def create_row(self, row, editgroup=None): - ce = self.parse_issn_row(row) - if ce is not None: - self.api.create_container(ce, editgroup=editgroup) - self.insert_count = self.insert_count + 1 - - def create_batch(self, batch, editgroup=None): - """Reads and processes in batches (not API-call-per-line)""" - objects = [self.parse_issn_row(l) - for l in batch if l != None] - objects = [o for o in objects if o != None] - self.api.create_container_batch(objects, autoaccept="true", editgroup=editgroup) - self.insert_count = self.insert_count + len(objects) diff --git a/python/fatcat_tools/matched_importer.py b/python/fatcat_tools/matched_importer.py deleted file mode 100644 index 627ab6f1..00000000 --- a/python/fatcat_tools/matched_importer.py +++ /dev/null @@ -1,144 +0,0 @@ - -import sys -import json -import sqlite3 -import itertools -import fatcat_client -from fatcat_tools.importer_common import FatcatImporter - -#row = row.split('\t') -#assert len(row) == 2 -#sha1 = row[0].replace('sha1:') -#sha1 = base64.b16encode(base64.b32decode(sha1)).lower() -#print(sha1) -#dois = [d.lower() for d in json.loads(row[1])] - -class FatcatMatchedImporter(FatcatImporter): - """ - Input format is JSON with keys: - - dois (list) - - sha1 (hex) - - md5 (hex) - - sha256 (hex) - - size (int) - - cdx (list of objects) - - dt - - url - - mimetype - - urls (list of strings... or objects?) - - Future handlings/extensions: - - core_id, wikidata_id, pmcid, pmid: not as lists - """ - - def __init__(self, host_url, skip_file_update=False, default_mime=None, - default_link_rel="web"): - super().__init__(host_url) - self.default_mime = default_mime - self.default_link_rel = default_link_rel - self.skip_file_update = skip_file_update - - def make_url(self, raw): - rel = self.default_link_rel - # TODO: this is where we could map specific domains to rel types, - # and also filter out bad domains, invalid URLs, etc - if "//archive.org/" in raw or "//arxiv.org/" in raw: - # TODO: special-case the arxiv.org bulk mirror? - rel = "repository" - elif "//web.archive.org/" in raw or "//archive.is/" in raw: - rel = "webarchive" - return fatcat_client.FileEntityUrls(url=raw, rel=rel) - - def parse_matched_dict(self, obj): - sha1 = obj['sha1'] - dois = [d.lower() for d in obj.get('dois', [])] - - # lookup sha1, or create new entity - fe = None - if not self.skip_file_update: - try: - fe = self.api.lookup_file(sha1=sha1) - except fatcat_client.rest.ApiException as err: - if err.status != 404: - raise err - if fe is None: - fe = fatcat_client.FileEntity( - sha1=sha1, - releases=[], - urls=[], - ) - - # lookup dois - re_list = set() - for doi in dois: - try: - re = self.api.lookup_release(doi=doi) - except fatcat_client.rest.ApiException as err: - if err.status != 404: - raise err - re = None - if re is None: - print("DOI not found: {}".format(doi)) - else: - re_list.add(re.ident) - if len(re_list) == 0: - return None - if fe.releases == set(re_list): - return None - re_list.update(fe.releases) - fe.releases = list(re_list) - - # parse URLs and CDX - existing_urls = [feu.url for feu in fe.urls] - for url in obj.get('url', []): - if url not in existing_urls: - url = self.make_url(url) - if url != None: - fe.urls.append(url) - for cdx in obj.get('cdx', []): - original = cdx['url'] - wayback = "https://web.archive.org/web/{}/{}".format( - cdx['dt'], - original) - if wayback not in existing_urls: - fe.urls.append( - fatcat_client.FileEntityUrls(url=wayback, rel="webarchive")) - if original not in existing_urls: - url = self.make_url(original) - if url != None: - fe.urls.append(url) - - if obj.get('size') != None: - fe.size = int(obj['size']) - fe.sha256 = obj.get('sha256', fe.sha256) - fe.md5 = obj.get('md5', fe.sha256) - if obj.get('mimetype') is None: - if fe.mimetype is None: - fe.mimetype = self.default_mime - else: - fe.mimetype = obj.get('mimetype') - return fe - - def create_row(self, row, editgroup=None): - obj = json.loads(row) - fe = self.parse_matched_dict(obj) - if fe is not None: - if fe.ident is None: - self.api.create_file(fe, editgroup=editgroup) - self.insert_count = self.insert_count + 1 - else: - self.api.update_file(fe.ident, fe, editgroup=editgroup) - self.update_count = self.update_count + 1 - - def create_batch(self, batch, editgroup=None): - """Reads and processes in batches (not API-call-per-line)""" - objects = [self.parse_matched_dict(json.loads(l)) - for l in batch if l != None] - new_objects = [o for o in objects if o != None and o.ident == None] - update_objects = [o for o in objects if o != None and o.ident != None] - for obj in update_objects: - self.api.update_file(obj.ident, obj, editgroup=editgroup) - if len(new_objects) > 0: - self.api.create_file_batch(new_objects, autoaccept="true", editgroup=editgroup) - self.update_count = self.update_count + len(update_objects) - self.insert_count = self.insert_count + len(new_objects) diff --git a/python/fatcat_tools/orcid_importer.py b/python/fatcat_tools/orcid_importer.py deleted file mode 100644 index f2366c66..00000000 --- a/python/fatcat_tools/orcid_importer.py +++ /dev/null @@ -1,73 +0,0 @@ - -import sys -import json -import itertools -import fatcat_client -from fatcat_tools.importer_common import FatcatImporter - -def value_or_none(e): - if type(e) == dict: - e = e.get('value') - if type(e) == str and len(e) == 0: - e = None - # TODO: this is probably bogus; patched in desperation; remove? - if e: - try: - e.encode() - except UnicodeEncodeError: - # Invalid JSON? - print("BAD UNICODE") - return None - return e - -class FatcatOrcidImporter(FatcatImporter): - - def parse_orcid_dict(self, obj): - """ - obj is a python dict (parsed from json). - returns a CreatorEntity - """ - name = obj['person']['name'] - if name is None: - return None - extra = None - given = value_or_none(name.get('given-names')) - sur = value_or_none(name.get('family-name')) - display = value_or_none(name.get('credit-name')) - if display is None: - # TODO: sorry human beings - if given and sur: - display = "{} {}".format(given, sur) - elif sur: - display = sur - elif given: - display = given - else: - # must have *some* name - return None - orcid = obj['orcid-identifier']['path'] - if not self.is_orcid(orcid): - sys.stderr.write("Bad ORCID: {}\n".format(orcid)) - return None - ce = fatcat_client.CreatorEntity( - orcid=orcid, - given_name=given, - surname=sur, - display_name=display, - extra=extra) - return ce - - def create_row(self, row, editgroup=None): - obj = json.loads(row) - ce = self.parse_orcid_dict(obj) - if ce is not None: - self.api.create_creator(ce, editgroup=editgroup) - self.insert_count = self.insert_count + 1 - - def create_batch(self, batch, editgroup=None): - """Reads and processes in batches (not API-call-per-line)""" - objects = [self.parse_orcid_dict(json.loads(l)) - for l in batch if l != None] - objects = [o for o in objects if o != None] - self.api.create_creator_batch(objects, autoaccept="true", editgroup=editgroup) - self.insert_count = self.insert_count + len(objects) diff --git a/python/fatcat_tools/raw_api_client.py b/python/fatcat_tools/raw_api_client.py deleted file mode 100644 index 75151ebb..00000000 --- a/python/fatcat_tools/raw_api_client.py +++ /dev/null @@ -1,66 +0,0 @@ - -import sys -import json -import requests - - -class RawFatcatApiClient: - - def __init__(self, host_url): - self.host_url = host_url - self.session = requests.Session() - self._issn_map = dict() - - def get(self, path, data=None): - headers = {"content-type": "application/json"} - return self.session.get(self.host_url + path, json=data, - headers=headers) - - def post(self, path, data=None): - headers = {"content-type": "application/json"} - return self.session.post(self.host_url + path, json=data, - headers=headers) - - def new_editgroup(self): - rv = self.post('/v0/editgroup', data=dict( - editor_id=1)) - print(rv) - print(rv.json()) - assert rv.status_code == 201 - editgroup_id = rv.json()['id'] - return editgroup_id - - def accept_editgroup(self, eg): - rv = self.post('/v0/editgroup/{}/accept'.format(eg)) - assert rv.status_code == 200 - return rv - - def import_issn_file(self, json_file, create_containers=False, batchsize=100): - eg = self.new_editgroup() - i = 0 - with open(json_file, 'r') as file: - for line in file: - if i % batchsize == 0: - sys.stdout.write('\n{}: '.format(i)) - if (i+1) % 20 == 0: - sys.stdout.write('.') - i = i + 1 - obj = json.loads(line) - if not ("author" in obj and "title" in obj): - continue - try: - self.import_crossref_dict(obj, editgroup=eg, - create_containers=create_containers) - except Exception as e: - print("ERROR: {}".format(e)) - if i % batchsize == 0: - self.accept_editgroup(eg) - eg = self.new_editgroup() - if i % batchsize != 0: - self.accept_editgroup(eg) - print("done!") - - def health(self): - rv = self.get("/health") - assert rv.status_code == 200 - return rv.json() diff --git a/python/fatcat_tools/worker_common.py b/python/fatcat_tools/worker_common.py deleted file mode 100644 index 77ea2c15..00000000 --- a/python/fatcat_tools/worker_common.py +++ /dev/null @@ -1,25 +0,0 @@ - -import re -import sys -import csv -import json -import itertools -import fatcat_client -from pykafka import KafkaClient -from fatcat_client.rest import ApiException - - -class FatcatWorker: - """ - Common code for for Kafka producers and consumers. - """ - - def __init__(self, kafka_hosts, produce_topic=None, consume_topic=None, api_host_url=None): - if api_host_url: - conf = fatcat_client.Configuration() - conf.host = api_host_url - self.api = fatcat_client.DefaultApi(fatcat_client.ApiClient(conf)) - self.kafka = KafkaClient(hosts=kafka_hosts, broker_version="1.0.0") - self.produce_topic = produce_topic - self.consume_topic = consume_topic - diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py new file mode 100644 index 00000000..92bb8bdd --- /dev/null +++ b/python/fatcat_tools/workers/changelog.py @@ -0,0 +1,122 @@ + +import json +import time +from itertools import islice +from fatcat_tools.workers.worker_common import FatcatWorker +from pykafka.common import OffsetType + + +class FatcatChangelogWorker(FatcatWorker): + """ + Periodically polls the fatcat API looking for new changelogs. When they are + found, fetch them and push (as JSON) into a Kafka topic. + """ + + def __init__(self, api_host_url, kafka_hosts, produce_topic, poll_interval=10.0, offset=None): + # TODO: should be offset=0 + super().__init__(kafka_hosts=kafka_hosts, + produce_topic=produce_topic, + api_host_url=api_host_url) + self.poll_interval = poll_interval + self.offset = offset # the fatcat changelog offset, not the kafka offset + + def most_recent_message(self, topic): + """ + Tries to fetch the most recent message from a given topic. + This only makes sense for single partition topics, though could be + extended with "last N" behavior. + + Following "Consuming the last N messages from a topic" + from https://pykafka.readthedocs.io/en/latest/usage.html#consumer-patterns + """ + consumer = topic.get_simple_consumer( + auto_offset_reset=OffsetType.LATEST, + reset_offset_on_start=True) + offsets = [(p, op.last_offset_consumed - 1) + for p, op in consumer._partitions.items()] + offsets = [(p, (o if o > -1 else -2)) for p, o in offsets] + if -2 in [o for p, o in offsets]: + return None + else: + consumer.reset_offsets(offsets) + msg = islice(consumer, 1) + if msg: + return list(msg)[0].value + else: + return None + + def run(self): + topic = self.kafka.topics[self.produce_topic] + # On start, try to consume the most recent from the topic, and using + # that as the starting offset. Note that this is a single-partition + # topic + if self.offset is None: + print("Checking for most recent changelog offset...") + msg = self.most_recent_message(topic) + if msg: + self.offset = json.loads(msg.decode('utf-8'))['index'] + else: + self.offset = 1 + + with topic.get_sync_producer() as producer: + while True: + latest = int(self.api.get_changelog(limit=1)[0].index) + if latest > self.offset: + print("Fetching changelogs from {} through {}".format( + self.offset+1, latest)) + for i in range(self.offset+1, latest+1): + cle = self.api.get_changelog_entry(i) + obj = self.api.api_client.sanitize_for_serialization(cle) + producer.produce( + message=json.dumps(obj).encode('utf-8'), + partition_key=None, + timestamp=None, + #XXX: timestamp=cle.timestamp, + ) + self.offset = i + print("Sleeping {} seconds...".format(self.poll_interval)) + time.sleep(self.poll_interval) + + +class FatcatEntityUpdatesWorker(FatcatWorker): + """ + Consumes from the changelog topic and publishes expanded entities (fetched + from API) to update topics. + + For now, only release updates are published. + """ + + def __init__(self, api_host_url, kafka_hosts, consume_topic, release_topic): + super().__init__(kafka_hosts=kafka_hosts, + consume_topic=consume_topic, + api_host_url=api_host_url) + self.release_topic = release_topic + self.consumer_group = "entity-updates" + + def run(self): + changelog_topic = self.kafka.topics[self.consume_topic] + release_topic = self.kafka.topics[self.release_topic] + + consumer = changelog_topic.get_balanced_consumer( + consumer_group=self.consumer_group, + managed=True, + auto_offset_reset=OffsetType.LATEST, + reset_offset_on_start=False, + ) + + with release_topic.get_sync_producer() as producer: + for msg in consumer: + cle = json.loads(msg.value.decode('utf-8')) + #print(cle) + release_edits = cle['editgroup']['edits']['releases'] + for re in release_edits: + ident = re['ident'] + release = self.api.get_release(ident, expand="files,container") + release_dict = self.api.api_client.sanitize_for_serialization(release) + producer.produce( + message=json.dumps(release_dict).encode('utf-8'), + partition_key=ident.encode('utf-8'), + timestamp=None, + ) + consumer.commit_offsets() + diff --git a/python/fatcat_tools/workers/elastic.py b/python/fatcat_tools/workers/elastic.py new file mode 100644 index 00000000..46632792 --- /dev/null +++ b/python/fatcat_tools/workers/elastic.py @@ -0,0 +1,47 @@ + +import json +import time +import requests +from fatcat_tools.workers.worker_common import FatcatWorker +from fatcat_client.models import ReleaseEntity +from fatcat_tools.transforms import * +from pykafka.common import OffsetType + + +class FatcatElasticReleaseWorker(FatcatWorker): + """ + Consumes from release-updates topic and pushes into (presumably local) + elasticsearch. + + Uses a consumer group to manage offset. + """ + + def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None, + elastic_backend="http://localhost:9200", elastic_index="fatcat"): + super().__init__(kafka_hosts=kafka_hosts, + consume_topic=consume_topic, + api_host_url=None) + self.consumer_group = "elastic-updates" + self.elastic_backend = elastic_backend + self.elastic_index = elastic_index + + def run(self): + consume_topic = self.kafka.topics[self.consume_topic] + + consumer = consume_topic.get_balanced_consumer( + consumer_group=self.consumer_group, + managed=True, + ) + + for msg in consumer: + json_str = msg.value.decode('utf-8') + release = entity_from_json(json_str, ReleaseEntity) + #print(release) + elastic_endpoint = "{}/{}/release/{}".format( + self.elastic_backend, + self.elastic_index, + release.ident) + print("Updating document: {}".format(elastic_endpoint)) + resp = requests.post(elastic_endpoint, json=release.to_elastic_dict()) + assert resp.status_code in (200, 201) + consumer.commit_offsets() diff --git a/python/fatcat_tools/workers/worker_common.py b/python/fatcat_tools/workers/worker_common.py new file mode 100644 index 00000000..77ea2c15 --- /dev/null +++ b/python/fatcat_tools/workers/worker_common.py @@ -0,0 +1,25 @@ + +import re +import sys +import csv +import json +import itertools +import fatcat_client +from pykafka import KafkaClient +from fatcat_client.rest import ApiException + + +class FatcatWorker: + """ + Common code for for Kafka producers and consumers. + """ + + def __init__(self, kafka_hosts, produce_topic=None, consume_topic=None, api_host_url=None): + if api_host_url: + conf = fatcat_client.Configuration() + conf.host = api_host_url + self.api = fatcat_client.DefaultApi(fatcat_client.ApiClient(conf)) + self.kafka = KafkaClient(hosts=kafka_hosts, broker_version="1.0.0") + self.produce_topic = produce_topic + self.consume_topic = consume_topic + -- cgit v1.2.3