Diffstat (limited to 'python/fatcat_tools/workers/changelog.py')
-rw-r--r--  python/fatcat_tools/workers/changelog.py  256
1 file changed, 138 insertions, 118 deletions
diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py
index a61e364c..1e4cb41d 100644
--- a/python/fatcat_tools/workers/changelog.py
+++ b/python/fatcat_tools/workers/changelog.py
@@ -1,4 +1,3 @@
-
 import json
 import time
 
@@ -16,11 +15,9 @@ class ChangelogWorker(FatcatWorker):
     """
 
     def __init__(self, api, kafka_hosts, produce_topic, poll_interval=10.0, offset=None):
-        super().__init__(kafka_hosts=kafka_hosts,
-                         produce_topic=produce_topic,
-                         api=api)
+        super().__init__(kafka_hosts=kafka_hosts, produce_topic=produce_topic, api=api)
         self.poll_interval = poll_interval
-        self.offset = offset # the fatcat changelog offset, not the kafka offset
+        self.offset = offset  # the fatcat changelog offset, not the kafka offset
 
     def run(self):
@@ -31,7 +28,7 @@ class ChangelogWorker(FatcatWorker):
             print("Checking for most recent changelog offset...")
             msg = most_recent_message(self.produce_topic, self.kafka_config)
             if msg:
-                self.offset = json.loads(msg.decode('utf-8'))['index']
+                self.offset = json.loads(msg.decode("utf-8"))["index"]
             else:
                 self.offset = 0
             print("Most recent changelog index in Kafka seems to be {}".format(self.offset))
@@ -44,28 +41,29 @@ class ChangelogWorker(FatcatWorker):
                 raise KafkaException(err)
 
         producer_conf = self.kafka_config.copy()
-        producer_conf.update({
-            'delivery.report.only.error': True,
-            'default.topic.config': {
-                'request.required.acks': -1, # all brokers must confirm
-            },
-        })
+        producer_conf.update(
+            {
+                "delivery.report.only.error": True,
+                "default.topic.config": {
+                    "request.required.acks": -1,  # all brokers must confirm
+                },
+            }
+        )
         producer = Producer(producer_conf)
 
         while True:
             latest = int(self.api.get_changelog(limit=1)[0].index)
             if latest > self.offset:
-                print("Fetching changelogs from {} through {}".format(
-                    self.offset+1, latest))
-                for i in range(self.offset+1, latest+1):
+                print("Fetching changelogs from {} through {}".format(self.offset + 1, latest))
+                for i in range(self.offset + 1, latest + 1):
                     cle = self.api.get_changelog_entry(i)
                     obj = self.api.api_client.sanitize_for_serialization(cle)
                     producer.produce(
                         self.produce_topic,
-                        json.dumps(obj).encode('utf-8'),
+                        json.dumps(obj).encode("utf-8"),
                         key=str(i),
                         on_delivery=fail_fast,
-                        #NOTE timestamp could be timestamp=cle.timestamp (?)
+                        # NOTE timestamp could be timestamp=cle.timestamp (?)
                     )
                     self.offset = i
             producer.flush()
@@ -79,12 +77,19 @@ class EntityUpdatesWorker(FatcatWorker):
     from API) to update topics.
     """
 
-    def __init__(self, api, kafka_hosts, consume_topic, release_topic,
-            file_topic, container_topic, ingest_file_request_topic,
-            work_ident_topic, poll_interval=5.0):
-        super().__init__(kafka_hosts=kafka_hosts,
-                         consume_topic=consume_topic,
-                         api=api)
+    def __init__(
+        self,
+        api,
+        kafka_hosts,
+        consume_topic,
+        release_topic,
+        file_topic,
+        container_topic,
+        ingest_file_request_topic,
+        work_ident_topic,
+        poll_interval=5.0,
+    ):
+        super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic, api=api)
         self.release_topic = release_topic
         self.file_topic = file_topic
         self.container_topic = container_topic
@@ -150,7 +155,7 @@ class EntityUpdatesWorker(FatcatWorker):
             # Transactions of the Japan Society of Mechanical Engineers
             "10.1299/kikai",
             # protocols.io
-            "10.17504/"
+            "10.17504/",
         ]
 
     def want_live_ingest(self, release, ingest_request):
@@ -163,40 +168,40 @@ class EntityUpdatesWorker(FatcatWorker):
         ingest crawling (via wayback SPN).
         """
 
-        link_source = ingest_request.get('ingest_request')
-        ingest_type = ingest_request.get('ingest_type')
-        doi = ingest_request.get('ext_ids', {}).get('doi')
+        link_source = ingest_request.get("ingest_request")
+        ingest_type = ingest_request.get("ingest_type")
+        doi = ingest_request.get("ext_ids", {}).get("doi")
 
         es = release_to_elasticsearch(release)
         is_document = release.release_type in (
-            'article',
-            'article-journal',
-            'article-newspaper',
-            'book',
-            'chapter',
-            'editorial',
-            'interview',
-            'legal_case',
-            'legislation',
-            'letter',
-            'manuscript',
-            'paper-conference',
-            'patent',
-            'peer_review',
-            'post',
-            'report',
-            'retraction',
-            'review',
-            'review-book',
-            'thesis',
+            "article",
+            "article-journal",
+            "article-newspaper",
+            "book",
+            "chapter",
+            "editorial",
+            "interview",
+            "legal_case",
+            "legislation",
+            "letter",
+            "manuscript",
+            "paper-conference",
+            "patent",
+            "peer_review",
+            "post",
+            "report",
+            "retraction",
+            "review",
+            "review-book",
+            "thesis",
         )
         is_not_pdf = release.release_type in (
-            'component',
-            'dataset',
-            'figure',
-            'graphic',
-            'software',
-            'stub',
+            "component",
+            "dataset",
+            "figure",
+            "graphic",
+            "software",
+            "stub",
        )
 
         # accept list sets a default "crawl it" despite OA metadata for
@@ -207,19 +212,23 @@ class EntityUpdatesWorker(FatcatWorker):
                 if doi.startswith(prefix):
                     in_acceptlist = True
 
-        if self.ingest_oa_only and link_source not in ('arxiv', 'pmc'):
+        if self.ingest_oa_only and link_source not in ("arxiv", "pmc"):
             # most datacite documents are in IRs and should be crawled
             is_datacite_doc = False
-            if release.extra and ('datacite' in release.extra) and is_document:
+            if release.extra and ("datacite" in release.extra) and is_document:
                 is_datacite_doc = True
-            if not (es['is_oa'] or in_acceptlist or is_datacite_doc):
+            if not (es["is_oa"] or in_acceptlist or is_datacite_doc):
                 return False
 
         # big publishers *generally* have accurate OA metadata, use
         # preservation networks, and block our crawlers. So unless OA, or
         # explicitly on accept list, or not preserved, skip crawling
-        if es.get('publisher_type') == 'big5' and es.get('is_preserved') and not (es['is_oa'] or in_acceptlist):
+        if (
+            es.get("publisher_type") == "big5"
+            and es.get("is_preserved")
+            and not (es["is_oa"] or in_acceptlist)
+        ):
             return False
 
         # if ingest_type is pdf but release_type is almost certainly not a PDF,
@@ -233,23 +242,24 @@ class EntityUpdatesWorker(FatcatWorker):
             return False
 
         # figshare
-        if doi and (doi.startswith('10.6084/') or doi.startswith('10.25384/')):
+        if doi and (doi.startswith("10.6084/") or doi.startswith("10.25384/")):
             # don't crawl "most recent version" (aka "group") DOIs
             if not release.version:
                 return False
 
         # zenodo
-        if doi and doi.startswith('10.5281/'):
+        if doi and doi.startswith("10.5281/"):
             # if this is a "grouping" DOI of multiple "version" DOIs, do not crawl (will crawl the versioned DOIs)
-            if release.extra and release.extra.get('relations'):
-                for rel in release.extra['relations']:
-                    if (rel.get('relationType') == 'HasVersion' and rel.get('relatedIdentifier', '').startswith('10.5281/')):
+            if release.extra and release.extra.get("relations"):
+                for rel in release.extra["relations"]:
+                    if rel.get("relationType") == "HasVersion" and rel.get(
+                        "relatedIdentifier", ""
+                    ).startswith("10.5281/"):
                         return False
 
         return True
 
     def run(self):
-
         def fail_fast(err, msg):
             if err is not None:
                 print("Kafka producer delivery error: {}".format(err))
@@ -278,36 +288,40 @@ class EntityUpdatesWorker(FatcatWorker):
             for p in partitions:
                 if p.error:
                     raise KafkaException(p.error)
-            print("Kafka partitions rebalanced: {} / {}".format(
-                consumer, partitions))
+            print("Kafka partitions rebalanced: {} / {}".format(consumer, partitions))
 
         consumer_conf = self.kafka_config.copy()
-        consumer_conf.update({
-            'group.id': self.consumer_group,
-            'on_commit': fail_fast,
-            # messages don't have offset marked as stored until pushed to
-            # elastic, but we do auto-commit stored offsets to broker
-            'enable.auto.commit': True,
-            'enable.auto.offset.store': False,
-            # user code timeout; if no poll after this long, assume user code
-            # hung and rebalance (default: 5min)
-            'max.poll.interval.ms': 180000,
-            'default.topic.config': {
-                'auto.offset.reset': 'latest',
-            },
-        })
+        consumer_conf.update(
+            {
+                "group.id": self.consumer_group,
+                "on_commit": fail_fast,
+                # messages don't have offset marked as stored until pushed to
+                # elastic, but we do auto-commit stored offsets to broker
+                "enable.auto.commit": True,
+                "enable.auto.offset.store": False,
+                # user code timeout; if no poll after this long, assume user code
+                # hung and rebalance (default: 5min)
+                "max.poll.interval.ms": 180000,
+                "default.topic.config": {
+                    "auto.offset.reset": "latest",
+                },
+            }
+        )
         consumer = Consumer(consumer_conf)
 
         producer_conf = self.kafka_config.copy()
-        producer_conf.update({
-            'delivery.report.only.error': True,
-            'default.topic.config': {
-                'request.required.acks': -1, # all brokers must confirm
-            },
-        })
+        producer_conf.update(
+            {
+                "delivery.report.only.error": True,
+                "default.topic.config": {
+                    "request.required.acks": -1,  # all brokers must confirm
+                },
+            }
+        )
         producer = Producer(producer_conf)
 
-        consumer.subscribe([self.consume_topic],
+        consumer.subscribe(
+            [self.consume_topic],
             on_assign=on_rebalance,
             on_revoke=on_rebalance,
         )
@@ -316,14 +330,16 @@ class EntityUpdatesWorker(FatcatWorker):
         while True:
             msg = consumer.poll(self.poll_interval)
             if not msg:
-                print("nothing new from kafka (poll_interval: {} sec)".format(self.poll_interval))
+                print(
+                    "nothing new from kafka (poll_interval: {} sec)".format(self.poll_interval)
+                )
                 continue
            if msg.error():
                 raise KafkaException(msg.error())
 
-            cle = json.loads(msg.value().decode('utf-8'))
-            #print(cle)
-            print("processing changelog index {}".format(cle['index']))
+            cle = json.loads(msg.value().decode("utf-8"))
+            # print(cle)
+            print("processing changelog index {}".format(cle["index"]))
             release_ids = []
             new_release_ids = []
             file_ids = []
@@ -331,27 +347,27 @@ class EntityUpdatesWorker(FatcatWorker):
             webcapture_ids = []
             container_ids = []
             work_ids = []
-            release_edits = cle['editgroup']['edits']['releases']
+            release_edits = cle["editgroup"]["edits"]["releases"]
             for re in release_edits:
-                release_ids.append(re['ident'])
+                release_ids.append(re["ident"])
                 # filter to direct release edits which are not updates
-                if not re.get('prev_revision') and not re.get('redirect_ident'):
-                    new_release_ids.append(re['ident'])
-            file_edits = cle['editgroup']['edits']['files']
+                if not re.get("prev_revision") and not re.get("redirect_ident"):
+                    new_release_ids.append(re["ident"])
+            file_edits = cle["editgroup"]["edits"]["files"]
             for e in file_edits:
-                file_ids.append(e['ident'])
-            fileset_edits = cle['editgroup']['edits']['filesets']
+                file_ids.append(e["ident"])
+            fileset_edits = cle["editgroup"]["edits"]["filesets"]
             for e in fileset_edits:
-                fileset_ids.append(e['ident'])
-            webcapture_edits = cle['editgroup']['edits']['webcaptures']
+                fileset_ids.append(e["ident"])
+            webcapture_edits = cle["editgroup"]["edits"]["webcaptures"]
             for e in webcapture_edits:
-                webcapture_ids.append(e['ident'])
-            container_edits = cle['editgroup']['edits']['containers']
+                webcapture_ids.append(e["ident"])
+            container_edits = cle["editgroup"]["edits"]["containers"]
             for e in container_edits:
-                container_ids.append(e['ident'])
-            work_edits = cle['editgroup']['edits']['works']
+                container_ids.append(e["ident"])
+            work_edits = cle["editgroup"]["edits"]["works"]
             for e in work_edits:
-                work_ids.append(e['ident'])
+                work_ids.append(e["ident"])
 
             # TODO: do these fetches in parallel using a thread pool?
             for ident in set(file_ids):
@@ -363,8 +379,8 @@ class EntityUpdatesWorker(FatcatWorker):
                 file_dict = self.api.api_client.sanitize_for_serialization(file_entity)
                 producer.produce(
                     self.file_topic,
-                    json.dumps(file_dict).encode('utf-8'),
-                    key=ident.encode('utf-8'),
+                    json.dumps(file_dict).encode("utf-8"),
+                    key=ident.encode("utf-8"),
                     on_delivery=fail_fast,
                 )
 
@@ -385,30 +401,34 @@ class EntityUpdatesWorker(FatcatWorker):
                 container_dict = self.api.api_client.sanitize_for_serialization(container)
                 producer.produce(
                     self.container_topic,
-                    json.dumps(container_dict).encode('utf-8'),
-                    key=ident.encode('utf-8'),
+                    json.dumps(container_dict).encode("utf-8"),
+                    key=ident.encode("utf-8"),
                     on_delivery=fail_fast,
                 )
 
             for ident in set(release_ids):
-                release = self.api.get_release(ident, expand="files,filesets,webcaptures,container")
+                release = self.api.get_release(
+                    ident, expand="files,filesets,webcaptures,container"
+                )
                 if release.work_id:
                     work_ids.append(release.work_id)
                 release_dict = self.api.api_client.sanitize_for_serialization(release)
                 producer.produce(
                     self.release_topic,
-                    json.dumps(release_dict).encode('utf-8'),
-                    key=ident.encode('utf-8'),
+                    json.dumps(release_dict).encode("utf-8"),
+                    key=ident.encode("utf-8"),
                     on_delivery=fail_fast,
                 )
                 # for ingest requests, filter to "new" active releases with no matched files
                 if release.ident in new_release_ids:
-                    ir = release_ingest_request(release, ingest_request_source='fatcat-changelog')
+                    ir = release_ingest_request(
+                        release, ingest_request_source="fatcat-changelog"
+                    )
                    if ir and not release.files and self.want_live_ingest(release, ir):
                         producer.produce(
                             self.ingest_file_request_topic,
-                            json.dumps(ir).encode('utf-8'),
-                            #key=None,
+                            json.dumps(ir).encode("utf-8"),
+                            # key=None,
                             on_delivery=fail_fast,
                         )
 
@@ -420,13 +440,13 @@ class EntityUpdatesWorker(FatcatWorker):
                     key=key,
                     type="fatcat_work",
                     work_ident=ident,
-                    updated=cle['timestamp'],
-                    fatcat_changelog_index=cle['index'],
+                    updated=cle["timestamp"],
+                    fatcat_changelog_index=cle["index"],
                 )
                 producer.produce(
                     self.work_ident_topic,
-                    json.dumps(work_ident_dict).encode('utf-8'),
-                    key=key.encode('utf-8'),
+                    json.dumps(work_ident_dict).encode("utf-8"),
+                    key=key.encode("utf-8"),
                     on_delivery=fail_fast,
                 )
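
Usage sketch (not part of this commit): how the two workers touched by this diff might be wired up. The constructor arguments follow the signatures shown above; the broker address, topic names, and API-client setup are illustrative assumptions, not values from the codebase.

    import fatcat_openapi_client

    from fatcat_tools.workers.changelog import ChangelogWorker, EntityUpdatesWorker

    # assumed: an OpenAPI-generated fatcat client pointed at a hypothetical endpoint
    config = fatcat_openapi_client.Configuration()
    config.host = "https://api.fatcat.wiki/v0"
    api = fatcat_openapi_client.DefaultApi(fatcat_openapi_client.ApiClient(config))

    # polls the fatcat changelog and produces each new entry to a Kafka topic
    changelog_worker = ChangelogWorker(
        api,
        kafka_hosts="localhost:9092",      # assumed Kafka bootstrap server
        produce_topic="fatcat-changelog",  # hypothetical topic name
        poll_interval=10.0,
    )

    # consumes changelog entries and fans out per-entity update and ingest-request
    # messages to the downstream topics (all topic names hypothetical)
    entity_updates_worker = EntityUpdatesWorker(
        api,
        kafka_hosts="localhost:9092",
        consume_topic="fatcat-changelog",
        release_topic="fatcat-release-updates",
        file_topic="fatcat-file-updates",
        container_topic="fatcat-container-updates",
        ingest_file_request_topic="fatcat-ingest-file-requests",
        work_ident_topic="fatcat-work-ident-updates",
    )

    # run() blocks in a polling loop, so each worker would run in its own process
    changelog_worker.run()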