Diffstat (limited to 'python/fatcat_tools/importers/common.py')
-rw-r--r-- | python/fatcat_tools/importers/common.py | 198 |
1 file changed, 106 insertions, 92 deletions
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py
index e33a2012..2639c85a 100644
--- a/python/fatcat_tools/importers/common.py
+++ b/python/fatcat_tools/importers/common.py
@@ -1,4 +1,3 @@
-
 import csv
 import datetime
 import json
@@ -34,7 +33,6 @@ SANE_MAX_URLS: int = 100
 DOMAIN_REL_MAP: Dict[str, str] = {
     "archive.org": "archive",
     # LOCKSS, Portico, DuraSpace, etc would also be "archive"
-
     "arxiv.org": "repository",
     "babel.hathitrust.org": "repository",
     "cds.cern.ch": "repository",
@@ -53,7 +51,6 @@ DOMAIN_REL_MAP: Dict[str, str] = {
     "zenodo.org": "repository",
     "www.biorxiv.org": "repository",
     "www.medrxiv.org": "repository",
-
     "citeseerx.ist.psu.edu": "aggregator",
     "publisher-connector.core.ac.uk": "aggregator",
     "core.ac.uk": "aggregator",
@@ -62,7 +59,6 @@ DOMAIN_REL_MAP: Dict[str, str] = {
     "pdfs.semanticscholar.org": "aggregator",
     "semanticscholar.org": "aggregator",
     "www.semanticscholar.org": "aggregator",
-
     "academic.oup.com": "publisher",
     "cdn.elifesciences.org": "publisher",
     "cell.com": "publisher",
@@ -86,15 +82,14 @@ DOMAIN_REL_MAP: Dict[str, str] = {
     "ehp.niehs.nih.gov": "publisher",
     "journals.tsu.ru": "publisher",
     "www.cogentoa.com": "publisher",
-
     "www.researchgate.net": "academicsocial",
     "academia.edu": "academicsocial",
-
     "wayback.archive-it.org": "webarchive",
     "web.archive.org": "webarchive",
     "archive.is": "webarchive",
 }
 
+
 def make_rel_url(raw_url: str, default_link_rel: str = "web"):
     # this is where we map specific domains to rel types, and also filter out
     # bad domains, invalid URLs, etc
@@ -105,12 +100,17 @@ def make_rel_url(raw_url: str, default_link_rel: str = "web"):
             break
     return (rel, raw_url)
 
+
 def test_make_rel_url():
     assert make_rel_url("http://example.com/thing.pdf")[0] == "web"
     assert make_rel_url("http://example.com/thing.pdf", default_link_rel="jeans")[0] == "jeans"
-    assert make_rel_url("https://web.archive.org/web/*/http://example.com/thing.pdf")[0] == "webarchive"
+    assert (
+        make_rel_url("https://web.archive.org/web/*/http://example.com/thing.pdf")[0]
+        == "webarchive"
+    )
     assert make_rel_url("http://cell.com/thing.pdf")[0] == "publisher"
 
+
 class EntityImporter:
     """
     Base class for fatcat entity importers.
@@ -147,23 +147,26 @@ class EntityImporter:
     def __init__(self, api, **kwargs):
 
-        eg_extra = kwargs.get('editgroup_extra', dict())
-        eg_extra['git_rev'] = eg_extra.get('git_rev',
-            subprocess.check_output(["git", "describe", "--always"]).strip()).decode('utf-8')
-        eg_extra['agent'] = eg_extra.get('agent', 'fatcat_tools.EntityImporter')
+        eg_extra = kwargs.get("editgroup_extra", dict())
+        eg_extra["git_rev"] = eg_extra.get(
+            "git_rev", subprocess.check_output(["git", "describe", "--always"]).strip()
+        ).decode("utf-8")
+        eg_extra["agent"] = eg_extra.get("agent", "fatcat_tools.EntityImporter")
 
         self.api = api
-        self.do_updates = bool(kwargs.get('do_updates', True))
-        self.do_fuzzy_match: bool = kwargs.get('do_fuzzy_match', True)
-        self.bezerk_mode: bool = kwargs.get('bezerk_mode', False)
-        self.submit_mode: bool = kwargs.get('submit_mode', False)
-        self.edit_batch_size: int = kwargs.get('edit_batch_size', 100)
-        self.editgroup_description: Optional[str] = kwargs.get('editgroup_description')
+        self.do_updates = bool(kwargs.get("do_updates", True))
+        self.do_fuzzy_match: bool = kwargs.get("do_fuzzy_match", True)
+        self.bezerk_mode: bool = kwargs.get("bezerk_mode", False)
+        self.submit_mode: bool = kwargs.get("submit_mode", False)
+        self.edit_batch_size: int = kwargs.get("edit_batch_size", 100)
+        self.editgroup_description: Optional[str] = kwargs.get("editgroup_description")
         self.editgroup_extra: Optional[Any] = eg_extra
-        self.es_client = kwargs.get('es_client')
+        self.es_client = kwargs.get("es_client")
         if not self.es_client:
-            self.es_client = elasticsearch.Elasticsearch("https://search.fatcat.wiki", timeout=120)
+            self.es_client = elasticsearch.Elasticsearch(
+                "https://search.fatcat.wiki", timeout=120
+            )
 
         self._issnl_id_map: Dict[str, Any] = dict()
         self._orcid_id_map: Dict[str, Any] = dict()
@@ -174,7 +177,7 @@ class EntityImporter:
         self.reset()
 
     def reset(self) -> None:
-        self.counts = Counter({'total': 0, 'skip': 0, 'insert': 0, 'update': 0, 'exists': 0})
+        self.counts = Counter({"total": 0, "skip": 0, "insert": 0, "update": 0, "exists": 0})
         self._edit_count: int = 0
         self._editgroup_id: Optional[str] = None
         self._entity_queue: List[Any] = []
@@ -184,13 +187,13 @@ class EntityImporter:
         """
         Returns nothing.
""" - self.counts['total'] += 1 + self.counts["total"] += 1 if (not raw_record) or (not self.want(raw_record)): - self.counts['skip'] += 1 + self.counts["skip"] += 1 return entity = self.parse_record(raw_record) if not entity: - self.counts['skip'] += 1 + self.counts["skip"] += 1 return if self.bezerk_mode: self.push_entity(entity) @@ -230,7 +233,7 @@ class EntityImporter: if self._entity_queue: self.insert_batch(self._entity_queue) - self.counts['insert'] += len(self._entity_queue) + self.counts["insert"] += len(self._entity_queue) self._entity_queue = [] return self.counts @@ -248,8 +251,9 @@ class EntityImporter: if not self._editgroup_id: eg = self.api.create_editgroup( fatcat_openapi_client.Editgroup( - description=self.editgroup_description, - extra=self.editgroup_extra)) + description=self.editgroup_description, extra=self.editgroup_extra + ) + ) self._editgroup_id = eg.editgroup_id self._edit_count += edits @@ -257,30 +261,30 @@ class EntityImporter: def create_container(self, entity): eg_id = self.get_editgroup_id() - self.counts['inserted.container'] += 1 + self.counts["inserted.container"] += 1 return self.api.create_container(eg_id, entity) def create_release(self, entity): eg_id = self.get_editgroup_id() - self.counts['inserted.release'] += 1 + self.counts["inserted.release"] += 1 return self.api.create_release(eg_id, entity) def create_file(self, entity): eg_id = self.get_editgroup_id() - self.counts['inserted.file'] += 1 + self.counts["inserted.file"] += 1 return self.api.create_file(eg_id, entity) def updated(self): """ Implementations should call this from try_update() if the update was successful """ - self.counts['update'] += 1 + self.counts["update"] += 1 def push_entity(self, entity): self._entity_queue.append(entity) if len(self._entity_queue) >= self.edit_batch_size: self.insert_batch(self._entity_queue) - self.counts['insert'] += len(self._entity_queue) + self.counts["insert"] += len(self._entity_queue) self._entity_queue = [] def want(self, raw_record: Any) -> bool: @@ -324,7 +328,7 @@ class EntityImporter: # If anything other than a 404 (not found), something is wrong if ae.status != 404: raise ae - self._orcid_id_map[orcid] = creator_id # might be None + self._orcid_id_map[orcid] = creator_id # might be None return creator_id def is_doi(self, doi: str) -> bool: @@ -347,7 +351,7 @@ class EntityImporter: # If anything other than a 404 (not found), something is wrong if ae.status != 404: raise ae - self._doi_id_map[doi] = release_id # might be None + self._doi_id_map[doi] = release_id # might be None return release_id def lookup_pmid(self, pmid: str): @@ -364,11 +368,11 @@ class EntityImporter: # If anything other than a 404 (not found), something is wrong if ae.status != 404: raise ae - self._pmid_id_map[pmid] = release_id # might be None + self._pmid_id_map[pmid] = release_id # might be None return release_id def is_issnl(self, issnl: str) -> bool: - return len(issnl) == 9 and issnl[4] == '-' + return len(issnl) == 9 and issnl[4] == "-" def lookup_issnl(self, issnl: str): """Caches calls to the ISSN-L lookup API endpoint in a local dict""" @@ -382,7 +386,7 @@ class EntityImporter: # If anything other than a 404 (not found), something is wrong if ae.status != 404: raise ae - self._issnl_id_map[issnl] = container_id # might be None + self._issnl_id_map[issnl] = container_id # might be None return container_id def read_issn_map_file(self, issn_map_file): @@ -417,26 +421,26 @@ class EntityImporter: # update old/deprecated 'rel' on URLs for i in 
range(len(existing.urls)): u = existing.urls[i] - if u.rel == 'repository' and '://archive.org/download/' in u.url: - existing.urls[i].rel = 'archive' - if u.rel == 'social': - u.rel = 'academicsocial' + if u.rel == "repository" and "://archive.org/download/" in u.url: + existing.urls[i].rel = "archive" + if u.rel == "social": + u.rel = "academicsocial" # remove URLs which are near-duplicates redundant_urls = [] all_urls = [u.url for u in existing.urls] - all_wayback_urls = [u.url for u in existing.urls if '://web.archive.org/web/' in u.url] + all_wayback_urls = [u.url for u in existing.urls if "://web.archive.org/web/" in u.url] for url in all_urls: # https/http redundancy - if url.startswith('http://') and url.replace('http://', 'https://', 1) in all_urls: + if url.startswith("http://") and url.replace("http://", "https://", 1) in all_urls: redundant_urls.append(url) continue # default HTTP port included and not included - if ':80/' in url and url.replace(':80', '', 1) in all_urls: + if ":80/" in url and url.replace(":80", "", 1) in all_urls: redundant_urls.append(url) continue # partial and complete wayback timestamps - if '://web.archive.org/web/2017/' in url: + if "://web.archive.org/web/2017/" in url: original_url = "/".join(url.split("/")[5:]) assert len(original_url) > 5 for wb_url in all_wayback_urls: @@ -452,7 +456,9 @@ class EntityImporter: def generic_fileset_cleanups(existing): return existing - def match_existing_release_fuzzy(self, release: ReleaseEntity) -> Optional[Tuple[str, str, ReleaseEntity]]: + def match_existing_release_fuzzy( + self, release: ReleaseEntity + ) -> Optional[Tuple[str, str, ReleaseEntity]]: """ This helper function uses fuzzycat (and elasticsearch) to look for existing release entities with similar metadata. @@ -488,7 +494,15 @@ class EntityImporter: return None release_dict = entity_to_dict(release, api_client=self.api.api_client) - verified = [(fuzzycat.verify.verify(release_dict, entity_to_dict(c, api_client=self.api.api_client)), c) for c in candidates] + verified = [ + ( + fuzzycat.verify.verify( + release_dict, entity_to_dict(c, api_client=self.api.api_client) + ), + c, + ) + for c in candidates + ] # chose the "closest" match closest = sorted(verified, key=lambda v: STATUS_SORT[v[0].status])[0] @@ -522,7 +536,6 @@ class RecordPusher: class JsonLinePusher(RecordPusher): - def __init__(self, importer, json_file, **kwargs): self.importer = importer self.json_file = json_file @@ -539,10 +552,9 @@ class JsonLinePusher(RecordPusher): class CsvPusher(RecordPusher): - def __init__(self, importer, csv_file, **kwargs): self.importer = importer - self.reader = csv.DictReader(csv_file, delimiter=kwargs.get('delimiter', ',')) + self.reader = csv.DictReader(csv_file, delimiter=kwargs.get("delimiter", ",")) def run(self): for line in self.reader: @@ -555,7 +567,6 @@ class CsvPusher(RecordPusher): class LinePusher(RecordPusher): - def __init__(self, importer, text_file, **kwargs): self.importer = importer self.text_file = text_file @@ -571,17 +582,15 @@ class LinePusher(RecordPusher): class SqlitePusher(RecordPusher): - def __init__(self, importer, db_file, table_name, where_clause="", **kwargs): self.importer = importer - self.db = sqlite3.connect(db_file, isolation_level='EXCLUSIVE') + self.db = sqlite3.connect(db_file, isolation_level="EXCLUSIVE") self.db.row_factory = sqlite3.Row self.table_name = table_name self.where_clause = where_clause def run(self): - cur = self.db.execute("SELECT * FROM {} {};".format( - self.table_name, self.where_clause)) + cur = 
self.db.execute("SELECT * FROM {} {};".format(self.table_name, self.where_clause)) for row in cur: self.importer.push_record(row) counts = self.importer.finish() @@ -590,7 +599,6 @@ class SqlitePusher(RecordPusher): class Bs4XmlLinesPusher(RecordPusher): - def __init__(self, importer, xml_file, prefix_filter=None, **kwargs): self.importer = importer self.xml_file = xml_file @@ -611,7 +619,6 @@ class Bs4XmlLinesPusher(RecordPusher): class Bs4XmlFilePusher(RecordPusher): - def __init__(self, importer, xml_file, record_tag, **kwargs): self.importer = importer self.xml_file = xml_file @@ -684,7 +691,6 @@ class Bs4XmlLargeFilePusher(RecordPusher): class Bs4XmlFileListPusher(RecordPusher): - def __init__(self, importer, list_file, record_tag, **kwargs): self.importer = importer self.list_file = list_file @@ -695,7 +701,7 @@ class Bs4XmlFileListPusher(RecordPusher): xml_path = xml_path.strip() if not xml_path or xml_path.startswith("#"): continue - with open(xml_path, 'r') as xml_file: + with open(xml_path, "r") as xml_file: soup = BeautifulSoup(xml_file, "xml") for record in soup.find_all(self.record_tag): self.importer.push_record(record) @@ -705,10 +711,12 @@ class Bs4XmlFileListPusher(RecordPusher): print(counts) return counts + class KafkaBs4XmlPusher(RecordPusher): """ Fetch XML for an article from Kafka, parse via Bs4. """ + def __init__(self, importer, kafka_hosts, kafka_env, topic_suffix, group, **kwargs): self.importer = importer self.consumer = make_kafka_consumer( @@ -716,10 +724,10 @@ class KafkaBs4XmlPusher(RecordPusher): kafka_env, topic_suffix, group, - kafka_namespace=kwargs.get('kafka_namespace', 'fatcat') + kafka_namespace=kwargs.get("kafka_namespace", "fatcat"), ) - self.poll_interval = kwargs.get('poll_interval', 5.0) - self.consume_batch_size = kwargs.get('consume_batch_size', 25) + self.poll_interval = kwargs.get("poll_interval", 5.0) + self.consume_batch_size = kwargs.get("consume_batch_size", 25) def run(self): count = 0 @@ -735,16 +743,19 @@ class KafkaBs4XmlPusher(RecordPusher): # outstanding editgroups every 5 minutes, but there is still that # window when editgroups might be hanging (unsubmitted). batch = self.consumer.consume( - num_messages=self.consume_batch_size, - timeout=self.poll_interval) - print("... got {} kafka messages ({}sec poll interval) {}".format( - len(batch), self.poll_interval, self.importer.counts)) + num_messages=self.consume_batch_size, timeout=self.poll_interval + ) + print( + "... got {} kafka messages ({}sec poll interval) {}".format( + len(batch), self.poll_interval, self.importer.counts + ) + ) if not batch: if datetime.datetime.now() - last_push > datetime.timedelta(minutes=5): # it has been some time, so flush any current editgroup self.importer.finish() last_push = datetime.datetime.now() - #print("Flushed any partial import batch: {}".format(self.importer.counts)) + # print("Flushed any partial import batch: {}".format(self.importer.counts)) continue # first check errors on entire batch... for msg in batch: @@ -752,7 +763,7 @@ class KafkaBs4XmlPusher(RecordPusher): raise KafkaException(msg.error()) # ... 
then process for msg in batch: - soup = BeautifulSoup(msg.value().decode('utf-8'), "xml") + soup = BeautifulSoup(msg.value().decode("utf-8"), "xml") self.importer.push_record(soup) soup.decompose() count += 1 @@ -771,8 +782,8 @@ class KafkaBs4XmlPusher(RecordPusher): self.consumer.close() return counts -class KafkaJsonPusher(RecordPusher): +class KafkaJsonPusher(RecordPusher): def __init__(self, importer, kafka_hosts, kafka_env, topic_suffix, group, **kwargs): self.importer = importer self.consumer = make_kafka_consumer( @@ -780,11 +791,11 @@ class KafkaJsonPusher(RecordPusher): kafka_env, topic_suffix, group, - kafka_namespace=kwargs.get('kafka_namespace', 'fatcat') + kafka_namespace=kwargs.get("kafka_namespace", "fatcat"), ) - self.poll_interval = kwargs.get('poll_interval', 5.0) - self.consume_batch_size = kwargs.get('consume_batch_size', 100) - self.force_flush = kwargs.get('force_flush', False) + self.poll_interval = kwargs.get("poll_interval", 5.0) + self.consume_batch_size = kwargs.get("consume_batch_size", 100) + self.force_flush = kwargs.get("force_flush", False) def run(self): count = 0 @@ -801,10 +812,13 @@ class KafkaJsonPusher(RecordPusher): # outstanding editgroups every 5 minutes, but there is still that # window when editgroups might be hanging (unsubmitted). batch = self.consumer.consume( - num_messages=self.consume_batch_size, - timeout=self.poll_interval) - print("... got {} kafka messages ({}sec poll interval) {}".format( - len(batch), self.poll_interval, self.importer.counts)) + num_messages=self.consume_batch_size, timeout=self.poll_interval + ) + print( + "... got {} kafka messages ({}sec poll interval) {}".format( + len(batch), self.poll_interval, self.importer.counts + ) + ) if self.force_flush: # this flushing happens even if there have been 'push' events # more recently. it is intended for, eg, importers off the @@ -821,7 +835,7 @@ class KafkaJsonPusher(RecordPusher): self.importer.finish() last_push = datetime.datetime.now() last_force_flush = datetime.datetime.now() - #print("Flushed any partial import batch: {}".format(self.importer.counts)) + # print("Flushed any partial import batch: {}".format(self.importer.counts)) continue # first check errors on entire batch... for msg in batch: @@ -829,7 +843,7 @@ class KafkaJsonPusher(RecordPusher): raise KafkaException(msg.error()) # ... then process for msg in batch: - record = json.loads(msg.value().decode('utf-8')) + record = json.loads(msg.value().decode("utf-8")) self.importer.push_record(record) count += 1 if count % 500 == 0: @@ -864,25 +878,25 @@ def make_kafka_consumer(hosts, env, topic_suffix, group, kafka_namespace="fatcat print("Bailing out...") # TODO: should it be sys.exit(-1)? 
                 raise KafkaException(p.error)
-            #print("Kafka consumer commit successful")
+            # print("Kafka consumer commit successful")
             pass
 
     # previously, using pykafka
-    #auto_commit_enable=True,
-    #auto_commit_interval_ms=30000, # 30 seconds
+    # auto_commit_enable=True,
+    # auto_commit_interval_ms=30000, # 30 seconds
     conf = {
-        'bootstrap.servers': hosts,
-        'group.id': group,
-        'on_commit': fail_fast,
+        "bootstrap.servers": hosts,
+        "group.id": group,
+        "on_commit": fail_fast,
         # messages don't have offset marked as stored until pushed to
         # elastic, but we do auto-commit stored offsets to broker
-        'enable.auto.offset.store': False,
-        'enable.auto.commit': True,
+        "enable.auto.offset.store": False,
+        "enable.auto.commit": True,
         # user code timeout; if no poll after this long, assume user code
         # hung and rebalance (default: 5min)
-        'max.poll.interval.ms': 120000,
-        'default.topic.config': {
-            'auto.offset.reset': 'latest',
+        "max.poll.interval.ms": 120000,
+        "default.topic.config": {
+            "auto.offset.reset": "latest",
         },
     }
 
@@ -890,13 +904,13 @@ def make_kafka_consumer(hosts, env, topic_suffix, group, kafka_namespace="fatcat
         for p in partitions:
             if p.error:
                 raise KafkaException(p.error)
-        print("Kafka partitions rebalanced: {} / {}".format(
-            consumer, partitions))
+        print("Kafka partitions rebalanced: {} / {}".format(consumer, partitions))
 
     consumer = Consumer(conf)
     # NOTE: it's actually important that topic_name *not* be bytes (UTF-8
     # encoded)
-    consumer.subscribe([topic_name],
+    consumer.subscribe(
+        [topic_name],
         on_assign=on_rebalance,
         on_revoke=on_rebalance,
    )
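
A note on the reformatted make_rel_url() helper touched in the first hunks: this commit only changes quoting and line-wrapping, not behavior. The sketch below reuses the URLs already asserted in test_make_rel_url to show how the domain-to-rel mapping is used; the import path is assumed from the file location, and the example values are illustrative only.

from fatcat_tools.importers.common import make_rel_url

# known domains resolve to a specific rel type; the URL itself passes through unchanged
rel, url = make_rel_url("https://web.archive.org/web/*/http://example.com/thing.pdf")
assert rel == "webarchive"

# unrecognized domains fall back to the default_link_rel argument ("web" unless overridden)
rel, url = make_rel_url("http://example.com/thing.pdf", default_link_rel="jeans")
assert rel == "jeans"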
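For orientation, since most hunks above touch the EntityImporter base class and the RecordPusher subclasses: a pusher reads records from some source (JSON lines, CSV, XML, SQLite, Kafka) and feeds them to an importer via push_record(), then calls finish() to flush the final partial batch and return the counts Counter. Below is a minimal sketch of that wiring, not anything added by this commit: HypotheticalImporter, the record schema, the input filename, and the public_api() helper are assumptions, and the try_update() contract (return True to queue the entity for insert) follows the existing importers.

import fatcat_openapi_client
from fatcat_tools import public_api
from fatcat_tools.importers.common import EntityImporter, JsonLinePusher

class HypotheticalImporter(EntityImporter):
    # subclasses provide want/parse_record/try_update/insert_batch; the base
    # class handles editgroup creation, batching, and the counts Counter
    def want(self, raw_record):
        return bool(raw_record.get("title"))

    def parse_record(self, raw_record):
        return fatcat_openapi_client.ReleaseEntity(
            title=raw_record["title"],
            ext_ids=fatcat_openapi_client.ReleaseExtIds(),
        )

    def try_update(self, entity):
        # assumed: no matching entity exists, so fall through to insert
        return True

    def insert_batch(self, batch):
        # simplest strategy: one create_release() call per entity, reusing the
        # editgroup helpers defined on the base class
        for entity in batch:
            self.create_release(entity)

api = public_api("https://api.fatcat.wiki/v0")  # assumed helper; any configured client works
importer = HypotheticalImporter(api, editgroup_description="example import")
with open("records.json") as json_file:
    counts = JsonLinePusher(importer, json_file).run()
print(counts)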