Diffstat (limited to 'python/fatcat_tools/workers/changelog.py')
-rw-r--r--  python/fatcat_tools/workers/changelog.py  256
1 file changed, 138 insertions(+), 118 deletions(-)
diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py
index a61e364c..1e4cb41d 100644
--- a/python/fatcat_tools/workers/changelog.py
+++ b/python/fatcat_tools/workers/changelog.py
@@ -1,4 +1,3 @@
-
import json
import time
@@ -16,11 +15,9 @@ class ChangelogWorker(FatcatWorker):
"""
def __init__(self, api, kafka_hosts, produce_topic, poll_interval=10.0, offset=None):
- super().__init__(kafka_hosts=kafka_hosts,
- produce_topic=produce_topic,
- api=api)
+ super().__init__(kafka_hosts=kafka_hosts, produce_topic=produce_topic, api=api)
self.poll_interval = poll_interval
- self.offset = offset # the fatcat changelog offset, not the kafka offset
+ self.offset = offset # the fatcat changelog offset, not the kafka offset
def run(self):
@@ -31,7 +28,7 @@ class ChangelogWorker(FatcatWorker):
print("Checking for most recent changelog offset...")
msg = most_recent_message(self.produce_topic, self.kafka_config)
if msg:
- self.offset = json.loads(msg.decode('utf-8'))['index']
+ self.offset = json.loads(msg.decode("utf-8"))["index"]
else:
self.offset = 0
print("Most recent changelog index in Kafka seems to be {}".format(self.offset))
@@ -44,28 +41,29 @@ class ChangelogWorker(FatcatWorker):
raise KafkaException(err)
producer_conf = self.kafka_config.copy()
- producer_conf.update({
- 'delivery.report.only.error': True,
- 'default.topic.config': {
- 'request.required.acks': -1, # all brokers must confirm
- },
- })
+ producer_conf.update(
+ {
+ "delivery.report.only.error": True,
+ "default.topic.config": {
+ "request.required.acks": -1, # all brokers must confirm
+ },
+ }
+ )
producer = Producer(producer_conf)
while True:
latest = int(self.api.get_changelog(limit=1)[0].index)
if latest > self.offset:
- print("Fetching changelogs from {} through {}".format(
- self.offset+1, latest))
- for i in range(self.offset+1, latest+1):
+ print("Fetching changelogs from {} through {}".format(self.offset + 1, latest))
+ for i in range(self.offset + 1, latest + 1):
cle = self.api.get_changelog_entry(i)
obj = self.api.api_client.sanitize_for_serialization(cle)
producer.produce(
self.produce_topic,
- json.dumps(obj).encode('utf-8'),
+ json.dumps(obj).encode("utf-8"),
key=str(i),
on_delivery=fail_fast,
- #NOTE timestamp could be timestamp=cle.timestamp (?)
+ # NOTE timestamp could be timestamp=cle.timestamp (?)
)
self.offset = i
producer.flush()
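# A compact sketch of the producer pattern in this hunk, assuming
# confluent_kafka; "acks": "all" is the flat equivalent of the legacy
# default.topic.config / request.required.acks = -1 used above, and the
# delivery callback turns any failed delivery into a hard crash.
import json
from confluent_kafka import Producer

def _fail_fast(err, msg):
    if err is not None:
        raise Exception("Kafka producer delivery error: {}".format(err))

producer = Producer({
    "bootstrap.servers": "localhost:9092",  # hypothetical broker list
    "delivery.report.only.error": True,
    "acks": "all",  # all brokers must confirm
})
producer.produce(
    "fatcat-changelog",  # hypothetical topic name
    json.dumps({"index": 1}).encode("utf-8"),
    key=b"1",
    on_delivery=_fail_fast,
)
producer.flush()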
@@ -79,12 +77,19 @@ class EntityUpdatesWorker(FatcatWorker):
from API) to update topics.
"""
- def __init__(self, api, kafka_hosts, consume_topic, release_topic,
- file_topic, container_topic, ingest_file_request_topic,
- work_ident_topic, poll_interval=5.0):
- super().__init__(kafka_hosts=kafka_hosts,
- consume_topic=consume_topic,
- api=api)
+ def __init__(
+ self,
+ api,
+ kafka_hosts,
+ consume_topic,
+ release_topic,
+ file_topic,
+ container_topic,
+ ingest_file_request_topic,
+ work_ident_topic,
+ poll_interval=5.0,
+ ):
+ super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic, api=api)
self.release_topic = release_topic
self.file_topic = file_topic
self.container_topic = container_topic
@@ -150,7 +155,7 @@ class EntityUpdatesWorker(FatcatWorker):
# Transactions of the Japan Society of Mechanical Engineers
"10.1299/kikai",
# protocols.io
- "10.17504/"
+ "10.17504/",
]
def want_live_ingest(self, release, ingest_request):
@@ -163,40 +168,40 @@ class EntityUpdatesWorker(FatcatWorker):
ingest crawling (via wayback SPN).
"""
- link_source = ingest_request.get('ingest_request')
- ingest_type = ingest_request.get('ingest_type')
- doi = ingest_request.get('ext_ids', {}).get('doi')
+ link_source = ingest_request.get("ingest_request")
+ ingest_type = ingest_request.get("ingest_type")
+ doi = ingest_request.get("ext_ids", {}).get("doi")
es = release_to_elasticsearch(release)
is_document = release.release_type in (
- 'article',
- 'article-journal',
- 'article-newspaper',
- 'book',
- 'chapter',
- 'editorial',
- 'interview',
- 'legal_case',
- 'legislation',
- 'letter',
- 'manuscript',
- 'paper-conference',
- 'patent',
- 'peer_review',
- 'post',
- 'report',
- 'retraction',
- 'review',
- 'review-book',
- 'thesis',
+ "article",
+ "article-journal",
+ "article-newspaper",
+ "book",
+ "chapter",
+ "editorial",
+ "interview",
+ "legal_case",
+ "legislation",
+ "letter",
+ "manuscript",
+ "paper-conference",
+ "patent",
+ "peer_review",
+ "post",
+ "report",
+ "retraction",
+ "review",
+ "review-book",
+ "thesis",
)
is_not_pdf = release.release_type in (
- 'component',
- 'dataset',
- 'figure',
- 'graphic',
- 'software',
- 'stub',
+ "component",
+ "dataset",
+ "figure",
+ "graphic",
+ "software",
+ "stub",
)
# accept list sets a default "crawl it" despite OA metadata for
@@ -207,19 +212,23 @@ class EntityUpdatesWorker(FatcatWorker):
if doi.startswith(prefix):
in_acceptlist = True
- if self.ingest_oa_only and link_source not in ('arxiv', 'pmc'):
+ if self.ingest_oa_only and link_source not in ("arxiv", "pmc"):
# most datacite documents are in IRs and should be crawled
is_datacite_doc = False
- if release.extra and ('datacite' in release.extra) and is_document:
+ if release.extra and ("datacite" in release.extra) and is_document:
is_datacite_doc = True
- if not (es['is_oa'] or in_acceptlist or is_datacite_doc):
+ if not (es["is_oa"] or in_acceptlist or is_datacite_doc):
return False
# big publishers *generally* have accurate OA metadata, use
# preservation networks, and block our crawlers. So unless OA, or
# explicitly on accept list, or not preserved, skip crawling
- if es.get('publisher_type') == 'big5' and es.get('is_preserved') and not (es['is_oa'] or in_acceptlist):
+ if (
+ es.get("publisher_type") == "big5"
+ and es.get("is_preserved")
+ and not (es["is_oa"] or in_acceptlist)
+ ):
return False
# if ingest_type is pdf but release_type is almost certainly not a PDF,
@@ -233,23 +242,24 @@ class EntityUpdatesWorker(FatcatWorker):
return False
# figshare
- if doi and (doi.startswith('10.6084/') or doi.startswith('10.25384/')):
+ if doi and (doi.startswith("10.6084/") or doi.startswith("10.25384/")):
# don't crawl "most recent version" (aka "group") DOIs
if not release.version:
return False
# zenodo
- if doi and doi.startswith('10.5281/'):
+ if doi and doi.startswith("10.5281/"):
# if this is a "grouping" DOI of multiple "version" DOIs, do not crawl (will crawl the versioned DOIs)
- if release.extra and release.extra.get('relations'):
- for rel in release.extra['relations']:
- if (rel.get('relationType') == 'HasVersion' and rel.get('relatedIdentifier', '').startswith('10.5281/')):
+ if release.extra and release.extra.get("relations"):
+ for rel in release.extra["relations"]:
+ if rel.get("relationType") == "HasVersion" and rel.get(
+ "relatedIdentifier", ""
+ ).startswith("10.5281/"):
return False
return True
def run(self):
-
def fail_fast(err, msg):
if err is not None:
print("Kafka producer delivery error: {}".format(err))
@@ -278,36 +288,40 @@ class EntityUpdatesWorker(FatcatWorker):
for p in partitions:
if p.error:
raise KafkaException(p.error)
- print("Kafka partitions rebalanced: {} / {}".format(
- consumer, partitions))
+ print("Kafka partitions rebalanced: {} / {}".format(consumer, partitions))
consumer_conf = self.kafka_config.copy()
- consumer_conf.update({
- 'group.id': self.consumer_group,
- 'on_commit': fail_fast,
- # messages don't have offset marked as stored until pushed to
- # elastic, but we do auto-commit stored offsets to broker
- 'enable.auto.commit': True,
- 'enable.auto.offset.store': False,
- # user code timeout; if no poll after this long, assume user code
- # hung and rebalance (default: 5min)
- 'max.poll.interval.ms': 180000,
- 'default.topic.config': {
- 'auto.offset.reset': 'latest',
- },
- })
+ consumer_conf.update(
+ {
+ "group.id": self.consumer_group,
+ "on_commit": fail_fast,
+ # messages don't have offset marked as stored until pushed to
+ # elastic, but we do auto-commit stored offsets to broker
+ "enable.auto.commit": True,
+ "enable.auto.offset.store": False,
+ # user code timeout; if no poll after this long, assume user code
+ # hung and rebalance (default: 5min)
+ "max.poll.interval.ms": 180000,
+ "default.topic.config": {
+ "auto.offset.reset": "latest",
+ },
+ }
+ )
consumer = Consumer(consumer_conf)
producer_conf = self.kafka_config.copy()
- producer_conf.update({
- 'delivery.report.only.error': True,
- 'default.topic.config': {
- 'request.required.acks': -1, # all brokers must confirm
- },
- })
+ producer_conf.update(
+ {
+ "delivery.report.only.error": True,
+ "default.topic.config": {
+ "request.required.acks": -1, # all brokers must confirm
+ },
+ }
+ )
producer = Producer(producer_conf)
- consumer.subscribe([self.consume_topic],
+ consumer.subscribe(
+ [self.consume_topic],
on_assign=on_rebalance,
on_revoke=on_rebalance,
)
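# A minimal sketch of the commit discipline the consumer config above sets up,
# assuming confluent_kafka: enable.auto.commit=True plus
# enable.auto.offset.store=False means offsets are committed in the background,
# but only once they have been explicitly stored after the message was fully
# processed, so a crash mid-processing cannot silently skip messages.
from confluent_kafka import Consumer, KafkaException

def consume_loop(consumer: Consumer, handle, poll_interval=5.0):
    while True:
        msg = consumer.poll(poll_interval)
        if msg is None:
            continue
        if msg.error():
            raise KafkaException(msg.error())
        handle(msg.value())                  # process the update first...
        consumer.store_offsets(message=msg)  # ...then mark its offset committable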
@@ -316,14 +330,16 @@ class EntityUpdatesWorker(FatcatWorker):
while True:
msg = consumer.poll(self.poll_interval)
if not msg:
- print("nothing new from kafka (poll_interval: {} sec)".format(self.poll_interval))
+ print(
+ "nothing new from kafka (poll_interval: {} sec)".format(self.poll_interval)
+ )
continue
if msg.error():
raise KafkaException(msg.error())
- cle = json.loads(msg.value().decode('utf-8'))
- #print(cle)
- print("processing changelog index {}".format(cle['index']))
+ cle = json.loads(msg.value().decode("utf-8"))
+ # print(cle)
+ print("processing changelog index {}".format(cle["index"]))
release_ids = []
new_release_ids = []
file_ids = []
@@ -331,27 +347,27 @@ class EntityUpdatesWorker(FatcatWorker):
webcapture_ids = []
container_ids = []
work_ids = []
- release_edits = cle['editgroup']['edits']['releases']
+ release_edits = cle["editgroup"]["edits"]["releases"]
for re in release_edits:
- release_ids.append(re['ident'])
+ release_ids.append(re["ident"])
# filter to direct release edits which are not updates
- if not re.get('prev_revision') and not re.get('redirect_ident'):
- new_release_ids.append(re['ident'])
- file_edits = cle['editgroup']['edits']['files']
+ if not re.get("prev_revision") and not re.get("redirect_ident"):
+ new_release_ids.append(re["ident"])
+ file_edits = cle["editgroup"]["edits"]["files"]
for e in file_edits:
- file_ids.append(e['ident'])
- fileset_edits = cle['editgroup']['edits']['filesets']
+ file_ids.append(e["ident"])
+ fileset_edits = cle["editgroup"]["edits"]["filesets"]
for e in fileset_edits:
- fileset_ids.append(e['ident'])
- webcapture_edits = cle['editgroup']['edits']['webcaptures']
+ fileset_ids.append(e["ident"])
+ webcapture_edits = cle["editgroup"]["edits"]["webcaptures"]
for e in webcapture_edits:
- webcapture_ids.append(e['ident'])
- container_edits = cle['editgroup']['edits']['containers']
+ webcapture_ids.append(e["ident"])
+ container_edits = cle["editgroup"]["edits"]["containers"]
for e in container_edits:
- container_ids.append(e['ident'])
- work_edits = cle['editgroup']['edits']['works']
+ container_ids.append(e["ident"])
+ work_edits = cle["editgroup"]["edits"]["works"]
for e in work_edits:
- work_ids.append(e['ident'])
+ work_ids.append(e["ident"])
# TODO: do these fetches in parallel using a thread pool?
for ident in set(file_ids):
@@ -363,8 +379,8 @@ class EntityUpdatesWorker(FatcatWorker):
file_dict = self.api.api_client.sanitize_for_serialization(file_entity)
producer.produce(
self.file_topic,
- json.dumps(file_dict).encode('utf-8'),
- key=ident.encode('utf-8'),
+ json.dumps(file_dict).encode("utf-8"),
+ key=ident.encode("utf-8"),
on_delivery=fail_fast,
)
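# One way the "thread pool" TODO above could be approached: fetch entities
# concurrently, then produce to Kafka serially so delivery ordering and error
# handling stay simple. A sketch only; fetch_one stands in for the per-entity
# API calls (api.get_file and friends) made in these loops.
from concurrent.futures import ThreadPoolExecutor

def fetch_entities(fetch_one, idents, max_workers=8):
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(fetch_one, set(idents)))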
@@ -385,30 +401,34 @@ class EntityUpdatesWorker(FatcatWorker):
container_dict = self.api.api_client.sanitize_for_serialization(container)
producer.produce(
self.container_topic,
- json.dumps(container_dict).encode('utf-8'),
- key=ident.encode('utf-8'),
+ json.dumps(container_dict).encode("utf-8"),
+ key=ident.encode("utf-8"),
on_delivery=fail_fast,
)
for ident in set(release_ids):
- release = self.api.get_release(ident, expand="files,filesets,webcaptures,container")
+ release = self.api.get_release(
+ ident, expand="files,filesets,webcaptures,container"
+ )
if release.work_id:
work_ids.append(release.work_id)
release_dict = self.api.api_client.sanitize_for_serialization(release)
producer.produce(
self.release_topic,
- json.dumps(release_dict).encode('utf-8'),
- key=ident.encode('utf-8'),
+ json.dumps(release_dict).encode("utf-8"),
+ key=ident.encode("utf-8"),
on_delivery=fail_fast,
)
# for ingest requests, filter to "new" active releases with no matched files
if release.ident in new_release_ids:
- ir = release_ingest_request(release, ingest_request_source='fatcat-changelog')
+ ir = release_ingest_request(
+ release, ingest_request_source="fatcat-changelog"
+ )
if ir and not release.files and self.want_live_ingest(release, ir):
producer.produce(
self.ingest_file_request_topic,
- json.dumps(ir).encode('utf-8'),
- #key=None,
+ json.dumps(ir).encode("utf-8"),
+ # key=None,
on_delivery=fail_fast,
)
@@ -420,13 +440,13 @@ class EntityUpdatesWorker(FatcatWorker):
key=key,
type="fatcat_work",
work_ident=ident,
- updated=cle['timestamp'],
- fatcat_changelog_index=cle['index'],
+ updated=cle["timestamp"],
+ fatcat_changelog_index=cle["index"],
)
producer.produce(
self.work_ident_topic,
- json.dumps(work_ident_dict).encode('utf-8'),
- key=key.encode('utf-8'),
+ json.dumps(work_ident_dict).encode("utf-8"),
+ key=key.encode("utf-8"),
on_delivery=fail_fast,
)
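# For reference, a downstream consumer of work_ident_topic would see messages
# shaped like the work_ident_dict built above; the values here are
# hypothetical, but the field names match the dict construction in this hunk.
example_work_ident_update = {
    "key": "work_aaaaaaaaaaaaaaaaaaaaaaaaaa",
    "type": "fatcat_work",
    "work_ident": "aaaaaaaaaaaaaaaaaaaaaaaaaa",
    "updated": "2021-01-01T00:00:00Z",
    "fatcat_changelog_index": 1234,
}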