Diffstat (limited to 'python/fatcat_tools/workers')
-rw-r--r--  python/fatcat_tools/workers/changelog.py      | 256
-rw-r--r--  python/fatcat_tools/workers/elasticsearch.py   | 171
-rw-r--r--  python/fatcat_tools/workers/worker_common.py   |  32
3 files changed, 263 insertions(+), 196 deletions(-)
diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py
index a61e364c..1e4cb41d 100644
--- a/python/fatcat_tools/workers/changelog.py
+++ b/python/fatcat_tools/workers/changelog.py
@@ -1,4 +1,3 @@
-
import json
import time
@@ -16,11 +15,9 @@ class ChangelogWorker(FatcatWorker):
"""
def __init__(self, api, kafka_hosts, produce_topic, poll_interval=10.0, offset=None):
- super().__init__(kafka_hosts=kafka_hosts,
- produce_topic=produce_topic,
- api=api)
+ super().__init__(kafka_hosts=kafka_hosts, produce_topic=produce_topic, api=api)
self.poll_interval = poll_interval
- self.offset = offset # the fatcat changelog offset, not the kafka offset
+ self.offset = offset # the fatcat changelog offset, not the kafka offset
def run(self):
@@ -31,7 +28,7 @@ class ChangelogWorker(FatcatWorker):
print("Checking for most recent changelog offset...")
msg = most_recent_message(self.produce_topic, self.kafka_config)
if msg:
- self.offset = json.loads(msg.decode('utf-8'))['index']
+ self.offset = json.loads(msg.decode("utf-8"))["index"]
else:
self.offset = 0
print("Most recent changelog index in Kafka seems to be {}".format(self.offset))
@@ -44,28 +41,29 @@ class ChangelogWorker(FatcatWorker):
raise KafkaException(err)
producer_conf = self.kafka_config.copy()
- producer_conf.update({
- 'delivery.report.only.error': True,
- 'default.topic.config': {
- 'request.required.acks': -1, # all brokers must confirm
- },
- })
+ producer_conf.update(
+ {
+ "delivery.report.only.error": True,
+ "default.topic.config": {
+ "request.required.acks": -1, # all brokers must confirm
+ },
+ }
+ )
producer = Producer(producer_conf)
while True:
latest = int(self.api.get_changelog(limit=1)[0].index)
if latest > self.offset:
- print("Fetching changelogs from {} through {}".format(
- self.offset+1, latest))
- for i in range(self.offset+1, latest+1):
+ print("Fetching changelogs from {} through {}".format(self.offset + 1, latest))
+ for i in range(self.offset + 1, latest + 1):
cle = self.api.get_changelog_entry(i)
obj = self.api.api_client.sanitize_for_serialization(cle)
producer.produce(
self.produce_topic,
- json.dumps(obj).encode('utf-8'),
+ json.dumps(obj).encode("utf-8"),
key=str(i),
on_delivery=fail_fast,
- #NOTE timestamp could be timestamp=cle.timestamp (?)
+ # NOTE timestamp could be timestamp=cle.timestamp (?)
)
self.offset = i
producer.flush()
@@ -79,12 +77,19 @@ class EntityUpdatesWorker(FatcatWorker):
from API) to update topics.
"""
- def __init__(self, api, kafka_hosts, consume_topic, release_topic,
- file_topic, container_topic, ingest_file_request_topic,
- work_ident_topic, poll_interval=5.0):
- super().__init__(kafka_hosts=kafka_hosts,
- consume_topic=consume_topic,
- api=api)
+ def __init__(
+ self,
+ api,
+ kafka_hosts,
+ consume_topic,
+ release_topic,
+ file_topic,
+ container_topic,
+ ingest_file_request_topic,
+ work_ident_topic,
+ poll_interval=5.0,
+ ):
+ super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic, api=api)
self.release_topic = release_topic
self.file_topic = file_topic
self.container_topic = container_topic
@@ -150,7 +155,7 @@ class EntityUpdatesWorker(FatcatWorker):
# Transactions of the Japan Society of Mechanical Engineers
"10.1299/kikai",
# protocols.io
- "10.17504/"
+ "10.17504/",
]
def want_live_ingest(self, release, ingest_request):
@@ -163,40 +168,40 @@ class EntityUpdatesWorker(FatcatWorker):
ingest crawling (via wayback SPN).
"""
- link_source = ingest_request.get('ingest_request')
- ingest_type = ingest_request.get('ingest_type')
- doi = ingest_request.get('ext_ids', {}).get('doi')
+ link_source = ingest_request.get("ingest_request")
+ ingest_type = ingest_request.get("ingest_type")
+ doi = ingest_request.get("ext_ids", {}).get("doi")
es = release_to_elasticsearch(release)
is_document = release.release_type in (
- 'article',
- 'article-journal',
- 'article-newspaper',
- 'book',
- 'chapter',
- 'editorial',
- 'interview',
- 'legal_case',
- 'legislation',
- 'letter',
- 'manuscript',
- 'paper-conference',
- 'patent',
- 'peer_review',
- 'post',
- 'report',
- 'retraction',
- 'review',
- 'review-book',
- 'thesis',
+ "article",
+ "article-journal",
+ "article-newspaper",
+ "book",
+ "chapter",
+ "editorial",
+ "interview",
+ "legal_case",
+ "legislation",
+ "letter",
+ "manuscript",
+ "paper-conference",
+ "patent",
+ "peer_review",
+ "post",
+ "report",
+ "retraction",
+ "review",
+ "review-book",
+ "thesis",
)
is_not_pdf = release.release_type in (
- 'component',
- 'dataset',
- 'figure',
- 'graphic',
- 'software',
- 'stub',
+ "component",
+ "dataset",
+ "figure",
+ "graphic",
+ "software",
+ "stub",
)
# accept list sets a default "crawl it" despite OA metadata for
@@ -207,19 +212,23 @@ class EntityUpdatesWorker(FatcatWorker):
if doi.startswith(prefix):
in_acceptlist = True
- if self.ingest_oa_only and link_source not in ('arxiv', 'pmc'):
+ if self.ingest_oa_only and link_source not in ("arxiv", "pmc"):
# most datacite documents are in IRs and should be crawled
is_datacite_doc = False
- if release.extra and ('datacite' in release.extra) and is_document:
+ if release.extra and ("datacite" in release.extra) and is_document:
is_datacite_doc = True
- if not (es['is_oa'] or in_acceptlist or is_datacite_doc):
+ if not (es["is_oa"] or in_acceptlist or is_datacite_doc):
return False
# big publishers *generally* have accurate OA metadata, use
# preservation networks, and block our crawlers. So unless OA, or
# explicitly on accept list, or not preserved, skip crawling
- if es.get('publisher_type') == 'big5' and es.get('is_preserved') and not (es['is_oa'] or in_acceptlist):
+ if (
+ es.get("publisher_type") == "big5"
+ and es.get("is_preserved")
+ and not (es["is_oa"] or in_acceptlist)
+ ):
return False
# if ingest_type is pdf but release_type is almost certainly not a PDF,
@@ -233,23 +242,24 @@ class EntityUpdatesWorker(FatcatWorker):
return False
# figshare
- if doi and (doi.startswith('10.6084/') or doi.startswith('10.25384/')):
+ if doi and (doi.startswith("10.6084/") or doi.startswith("10.25384/")):
# don't crawl "most recent version" (aka "group") DOIs
if not release.version:
return False
# zenodo
- if doi and doi.startswith('10.5281/'):
+ if doi and doi.startswith("10.5281/"):
# if this is a "grouping" DOI of multiple "version" DOIs, do not crawl (will crawl the versioned DOIs)
- if release.extra and release.extra.get('relations'):
- for rel in release.extra['relations']:
- if (rel.get('relationType') == 'HasVersion' and rel.get('relatedIdentifier', '').startswith('10.5281/')):
+ if release.extra and release.extra.get("relations"):
+ for rel in release.extra["relations"]:
+ if rel.get("relationType") == "HasVersion" and rel.get(
+ "relatedIdentifier", ""
+ ).startswith("10.5281/"):
return False
return True
def run(self):
-
def fail_fast(err, msg):
if err is not None:
print("Kafka producer delivery error: {}".format(err))
@@ -278,36 +288,40 @@ class EntityUpdatesWorker(FatcatWorker):
for p in partitions:
if p.error:
raise KafkaException(p.error)
- print("Kafka partitions rebalanced: {} / {}".format(
- consumer, partitions))
+ print("Kafka partitions rebalanced: {} / {}".format(consumer, partitions))
consumer_conf = self.kafka_config.copy()
- consumer_conf.update({
- 'group.id': self.consumer_group,
- 'on_commit': fail_fast,
- # messages don't have offset marked as stored until pushed to
- # elastic, but we do auto-commit stored offsets to broker
- 'enable.auto.commit': True,
- 'enable.auto.offset.store': False,
- # user code timeout; if no poll after this long, assume user code
- # hung and rebalance (default: 5min)
- 'max.poll.interval.ms': 180000,
- 'default.topic.config': {
- 'auto.offset.reset': 'latest',
- },
- })
+ consumer_conf.update(
+ {
+ "group.id": self.consumer_group,
+ "on_commit": fail_fast,
+ # messages don't have offset marked as stored until pushed to
+ # elastic, but we do auto-commit stored offsets to broker
+ "enable.auto.commit": True,
+ "enable.auto.offset.store": False,
+ # user code timeout; if no poll after this long, assume user code
+ # hung and rebalance (default: 5min)
+ "max.poll.interval.ms": 180000,
+ "default.topic.config": {
+ "auto.offset.reset": "latest",
+ },
+ }
+ )
consumer = Consumer(consumer_conf)
producer_conf = self.kafka_config.copy()
- producer_conf.update({
- 'delivery.report.only.error': True,
- 'default.topic.config': {
- 'request.required.acks': -1, # all brokers must confirm
- },
- })
+ producer_conf.update(
+ {
+ "delivery.report.only.error": True,
+ "default.topic.config": {
+ "request.required.acks": -1, # all brokers must confirm
+ },
+ }
+ )
producer = Producer(producer_conf)
- consumer.subscribe([self.consume_topic],
+ consumer.subscribe(
+ [self.consume_topic],
on_assign=on_rebalance,
on_revoke=on_rebalance,
)
@@ -316,14 +330,16 @@ class EntityUpdatesWorker(FatcatWorker):
while True:
msg = consumer.poll(self.poll_interval)
if not msg:
- print("nothing new from kafka (poll_interval: {} sec)".format(self.poll_interval))
+ print(
+ "nothing new from kafka (poll_interval: {} sec)".format(self.poll_interval)
+ )
continue
if msg.error():
raise KafkaException(msg.error())
- cle = json.loads(msg.value().decode('utf-8'))
- #print(cle)
- print("processing changelog index {}".format(cle['index']))
+ cle = json.loads(msg.value().decode("utf-8"))
+ # print(cle)
+ print("processing changelog index {}".format(cle["index"]))
release_ids = []
new_release_ids = []
file_ids = []
@@ -331,27 +347,27 @@ class EntityUpdatesWorker(FatcatWorker):
webcapture_ids = []
container_ids = []
work_ids = []
- release_edits = cle['editgroup']['edits']['releases']
+ release_edits = cle["editgroup"]["edits"]["releases"]
for re in release_edits:
- release_ids.append(re['ident'])
+ release_ids.append(re["ident"])
# filter to direct release edits which are not updates
- if not re.get('prev_revision') and not re.get('redirect_ident'):
- new_release_ids.append(re['ident'])
- file_edits = cle['editgroup']['edits']['files']
+ if not re.get("prev_revision") and not re.get("redirect_ident"):
+ new_release_ids.append(re["ident"])
+ file_edits = cle["editgroup"]["edits"]["files"]
for e in file_edits:
- file_ids.append(e['ident'])
- fileset_edits = cle['editgroup']['edits']['filesets']
+ file_ids.append(e["ident"])
+ fileset_edits = cle["editgroup"]["edits"]["filesets"]
for e in fileset_edits:
- fileset_ids.append(e['ident'])
- webcapture_edits = cle['editgroup']['edits']['webcaptures']
+ fileset_ids.append(e["ident"])
+ webcapture_edits = cle["editgroup"]["edits"]["webcaptures"]
for e in webcapture_edits:
- webcapture_ids.append(e['ident'])
- container_edits = cle['editgroup']['edits']['containers']
+ webcapture_ids.append(e["ident"])
+ container_edits = cle["editgroup"]["edits"]["containers"]
for e in container_edits:
- container_ids.append(e['ident'])
- work_edits = cle['editgroup']['edits']['works']
+ container_ids.append(e["ident"])
+ work_edits = cle["editgroup"]["edits"]["works"]
for e in work_edits:
- work_ids.append(e['ident'])
+ work_ids.append(e["ident"])
# TODO: do these fetches in parallel using a thread pool?
for ident in set(file_ids):
@@ -363,8 +379,8 @@ class EntityUpdatesWorker(FatcatWorker):
file_dict = self.api.api_client.sanitize_for_serialization(file_entity)
producer.produce(
self.file_topic,
- json.dumps(file_dict).encode('utf-8'),
- key=ident.encode('utf-8'),
+ json.dumps(file_dict).encode("utf-8"),
+ key=ident.encode("utf-8"),
on_delivery=fail_fast,
)
@@ -385,30 +401,34 @@ class EntityUpdatesWorker(FatcatWorker):
container_dict = self.api.api_client.sanitize_for_serialization(container)
producer.produce(
self.container_topic,
- json.dumps(container_dict).encode('utf-8'),
- key=ident.encode('utf-8'),
+ json.dumps(container_dict).encode("utf-8"),
+ key=ident.encode("utf-8"),
on_delivery=fail_fast,
)
for ident in set(release_ids):
- release = self.api.get_release(ident, expand="files,filesets,webcaptures,container")
+ release = self.api.get_release(
+ ident, expand="files,filesets,webcaptures,container"
+ )
if release.work_id:
work_ids.append(release.work_id)
release_dict = self.api.api_client.sanitize_for_serialization(release)
producer.produce(
self.release_topic,
- json.dumps(release_dict).encode('utf-8'),
- key=ident.encode('utf-8'),
+ json.dumps(release_dict).encode("utf-8"),
+ key=ident.encode("utf-8"),
on_delivery=fail_fast,
)
# for ingest requests, filter to "new" active releases with no matched files
if release.ident in new_release_ids:
- ir = release_ingest_request(release, ingest_request_source='fatcat-changelog')
+ ir = release_ingest_request(
+ release, ingest_request_source="fatcat-changelog"
+ )
if ir and not release.files and self.want_live_ingest(release, ir):
producer.produce(
self.ingest_file_request_topic,
- json.dumps(ir).encode('utf-8'),
- #key=None,
+ json.dumps(ir).encode("utf-8"),
+ # key=None,
on_delivery=fail_fast,
)
@@ -420,13 +440,13 @@ class EntityUpdatesWorker(FatcatWorker):
key=key,
type="fatcat_work",
work_ident=ident,
- updated=cle['timestamp'],
- fatcat_changelog_index=cle['index'],
+ updated=cle["timestamp"],
+ fatcat_changelog_index=cle["index"],
)
producer.produce(
self.work_ident_topic,
- json.dumps(work_ident_dict).encode('utf-8'),
- key=key.encode('utf-8'),
+ json.dumps(work_ident_dict).encode("utf-8"),
+ key=key.encode("utf-8"),
on_delivery=fail_fast,
)
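
For reference, the producer pattern ChangelogWorker follows is unchanged by this reformat. A condensed, self-contained sketch; the broker address, topic name, and sample changelog entry are placeholders:

    import json

    from confluent_kafka import Producer

    def fail_fast(err, msg):
        # with delivery.report.only.error=True this callback only fires on failures
        if err is not None:
            raise Exception(err)

    producer = Producer({
        "bootstrap.servers": "localhost:9092",  # placeholder broker
        "delivery.report.only.error": True,
        "default.topic.config": {
            "request.required.acks": -1,  # all brokers must confirm
        },
    })
    entry = {"index": 12345}  # hypothetical changelog entry
    producer.produce(
        "fatcat-prod.changelog",  # placeholder topic
        json.dumps(entry).encode("utf-8"),
        key=str(entry["index"]),
        on_delivery=fail_fast,
    )
    producer.flush()
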
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
index f411073d..0d75f964 100644
--- a/python/fatcat_tools/workers/elasticsearch.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -1,4 +1,3 @@
-
import json
import sys
@@ -26,12 +25,20 @@ class ElasticsearchReleaseWorker(FatcatWorker):
Uses a consumer group to manage offset.
"""
- def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
- elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat",
- elasticsearch_release_index="fatcat_releases",
- batch_size=200, api_host="https://api.fatcat.wiki/v0", query_stats=False):
- super().__init__(kafka_hosts=kafka_hosts,
- consume_topic=consume_topic)
+ def __init__(
+ self,
+ kafka_hosts,
+ consume_topic,
+ poll_interval=10.0,
+ offset=None,
+ elasticsearch_backend="http://localhost:9200",
+ elasticsearch_index="fatcat",
+ elasticsearch_release_index="fatcat_releases",
+ batch_size=200,
+ api_host="https://api.fatcat.wiki/v0",
+ query_stats=False,
+ ):
+ super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic)
self.consumer_group = "elasticsearch-updates3"
self.batch_size = batch_size
self.poll_interval = poll_interval
@@ -63,45 +70,53 @@ class ElasticsearchReleaseWorker(FatcatWorker):
print("Bailing out...", file=sys.stderr)
# TODO: should it be sys.exit(-1)?
raise KafkaException(p.error)
- #print("Kafka consumer commit successful")
+ # print("Kafka consumer commit successful")
pass
def on_rebalance(consumer, partitions):
for p in partitions:
if p.error:
raise KafkaException(p.error)
- print("Kafka partitions rebalanced: {} / {}".format(
- consumer, partitions), file=sys.stderr)
+ print(
+ "Kafka partitions rebalanced: {} / {}".format(consumer, partitions),
+ file=sys.stderr,
+ )
consumer_conf = self.kafka_config.copy()
- consumer_conf.update({
- 'group.id': self.consumer_group,
- 'on_commit': fail_fast,
- # messages don't have offset marked as stored until pushed to
- # elastic, but we do auto-commit stored offsets to broker
- 'enable.auto.commit': True,
- 'enable.auto.offset.store': False,
- # user code timeout; if no poll after this long, assume user code
- # hung and rebalance (default: 5min)
- 'max.poll.interval.ms': 60000,
- 'default.topic.config': {
- 'auto.offset.reset': 'latest',
- },
- })
+ consumer_conf.update(
+ {
+ "group.id": self.consumer_group,
+ "on_commit": fail_fast,
+ # messages don't have offset marked as stored until pushed to
+ # elastic, but we do auto-commit stored offsets to broker
+ "enable.auto.commit": True,
+ "enable.auto.offset.store": False,
+ # user code timeout; if no poll after this long, assume user code
+ # hung and rebalance (default: 5min)
+ "max.poll.interval.ms": 60000,
+ "default.topic.config": {
+ "auto.offset.reset": "latest",
+ },
+ }
+ )
consumer = Consumer(consumer_conf)
- consumer.subscribe([self.consume_topic],
+ consumer.subscribe(
+ [self.consume_topic],
on_assign=on_rebalance,
on_revoke=on_rebalance,
)
while True:
- batch = consumer.consume(
- num_messages=self.batch_size,
- timeout=self.poll_interval)
+ batch = consumer.consume(num_messages=self.batch_size, timeout=self.poll_interval)
if not batch:
if not consumer.assignment():
print("... no Kafka consumer partitions assigned yet", file=sys.stderr)
- print("... nothing new from kafka, try again (interval: {}".format(self.poll_interval), file=sys.stderr)
+ print(
+ "... nothing new from kafka, try again (interval: {}".format(
+ self.poll_interval
+ ),
+ file=sys.stderr,
+ )
continue
print("... got {} kafka messages".format(len(batch)), file=sys.stderr)
# first check errors on entire batch...
@@ -111,19 +126,24 @@ class ElasticsearchReleaseWorker(FatcatWorker):
# ... then process
bulk_actions = []
for msg in batch:
- json_str = msg.value().decode('utf-8')
+ json_str = msg.value().decode("utf-8")
entity = entity_from_json(json_str, self.entity_type, api_client=ac)
assert isinstance(entity, self.entity_type)
if self.entity_type == ChangelogEntry:
key = entity.index
# might need to fetch from API
- if not (entity.editgroup and entity.editgroup.editor): # pylint: disable=no-member # (TODO)
+ if not (
+ entity.editgroup and entity.editgroup.editor
+ ): # pylint: disable=no-member # (TODO)
entity = api.get_changelog_entry(entity.index)
else:
key = entity.ident # pylint: disable=no-member # (TODO)
- if self.entity_type != ChangelogEntry and entity.state == 'wip':
- print(f"WARNING: skipping state=wip entity: {self.entity_type.__name__} {entity.ident}", file=sys.stderr)
+ if self.entity_type != ChangelogEntry and entity.state == "wip":
+ print(
+ f"WARNING: skipping state=wip entity: {self.entity_type.__name__} {entity.ident}",
+ file=sys.stderr,
+ )
continue
if self.entity_type == ContainerEntity and self.query_stats:
@@ -138,9 +158,15 @@ class ElasticsearchReleaseWorker(FatcatWorker):
doc_dict = self.transform_func(entity)
# TODO: handle deletions from index
- bulk_actions.append(json.dumps({
- "index": { "_id": key, },
- }))
+ bulk_actions.append(
+ json.dumps(
+ {
+ "index": {
+ "_id": key,
+ },
+ }
+ )
+ )
bulk_actions.append(json.dumps(doc_dict))
# if only WIP entities, then skip
@@ -149,15 +175,22 @@ class ElasticsearchReleaseWorker(FatcatWorker):
consumer.store_offsets(message=msg)
continue
- print("Upserting, eg, {} (of {} {} in elasticsearch)".format(key, len(batch), self.entity_type.__name__), file=sys.stderr)
+ print(
+ "Upserting, eg, {} (of {} {} in elasticsearch)".format(
+ key, len(batch), self.entity_type.__name__
+ ),
+ file=sys.stderr,
+ )
elasticsearch_endpoint = "{}/{}/_bulk".format(
- self.elasticsearch_backend,
- self.elasticsearch_index)
- resp = requests.post(elasticsearch_endpoint,
+ self.elasticsearch_backend, self.elasticsearch_index
+ )
+ resp = requests.post(
+ elasticsearch_endpoint,
headers={"Content-Type": "application/x-ndjson"},
- data="\n".join(bulk_actions) + "\n")
+ data="\n".join(bulk_actions) + "\n",
+ )
resp.raise_for_status()
- if resp.json()['errors']:
+ if resp.json()["errors"]:
desc = "Elasticsearch errors from post to {}:".format(elasticsearch_endpoint)
print(desc, file=sys.stderr)
print(resp.content, file=sys.stderr)
@@ -169,20 +202,29 @@ class ElasticsearchReleaseWorker(FatcatWorker):
class ElasticsearchContainerWorker(ElasticsearchReleaseWorker):
-
- def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
- query_stats=False, elasticsearch_release_index="fatcat_release",
- elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat",
- batch_size=200):
- super().__init__(kafka_hosts=kafka_hosts,
- consume_topic=consume_topic,
- poll_interval=poll_interval,
- offset=offset,
- elasticsearch_backend=elasticsearch_backend,
- elasticsearch_index=elasticsearch_index,
- elasticsearch_release_index=elasticsearch_release_index,
- query_stats=query_stats,
- batch_size=batch_size)
+ def __init__(
+ self,
+ kafka_hosts,
+ consume_topic,
+ poll_interval=10.0,
+ offset=None,
+ query_stats=False,
+ elasticsearch_release_index="fatcat_release",
+ elasticsearch_backend="http://localhost:9200",
+ elasticsearch_index="fatcat",
+ batch_size=200,
+ ):
+ super().__init__(
+ kafka_hosts=kafka_hosts,
+ consume_topic=consume_topic,
+ poll_interval=poll_interval,
+ offset=offset,
+ elasticsearch_backend=elasticsearch_backend,
+ elasticsearch_index=elasticsearch_index,
+ elasticsearch_release_index=elasticsearch_release_index,
+ query_stats=query_stats,
+ batch_size=batch_size,
+ )
# previous group got corrupted (by pykafka library?)
self.consumer_group = "elasticsearch-updates3"
self.entity_type = ContainerEntity
@@ -196,11 +238,18 @@ class ElasticsearchChangelogWorker(ElasticsearchReleaseWorker):
Note: Very early versions of changelog entries did not contain details
about the editor or extra fields.
"""
- def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
- elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat_changelog",
- batch_size=200):
- super().__init__(kafka_hosts=kafka_hosts,
- consume_topic=consume_topic)
+
+ def __init__(
+ self,
+ kafka_hosts,
+ consume_topic,
+ poll_interval=10.0,
+ offset=None,
+ elasticsearch_backend="http://localhost:9200",
+ elasticsearch_index="fatcat_changelog",
+ batch_size=200,
+ ):
+ super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic)
self.consumer_group = "elasticsearch-updates3"
self.batch_size = batch_size
self.poll_interval = poll_interval
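
For reference, the Elasticsearch write these workers perform boils down to an NDJSON POST against the _bulk endpoint. A minimal sketch, assuming a local cluster; the backend URL, index name, and document are placeholders:

    import json

    import requests

    docs = {"hypothetical-ident": {"title": "example release", "state": "active"}}

    bulk_actions = []
    for key, doc in docs.items():
        bulk_actions.append(json.dumps({"index": {"_id": key}}))
        bulk_actions.append(json.dumps(doc))

    resp = requests.post(
        "http://localhost:9200/fatcat_release/_bulk",  # placeholder backend/index
        headers={"Content-Type": "application/x-ndjson"},
        data="\n".join(bulk_actions) + "\n",  # NDJSON body must end with a newline
    )
    resp.raise_for_status()
    if resp.json()["errors"]:
        raise Exception("partial failure in elasticsearch bulk POST")
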
diff --git a/python/fatcat_tools/workers/worker_common.py b/python/fatcat_tools/workers/worker_common.py
index 8c2936be..baec44f4 100644
--- a/python/fatcat_tools/workers/worker_common.py
+++ b/python/fatcat_tools/workers/worker_common.py
@@ -1,4 +1,3 @@
-
from confluent_kafka import Consumer, KafkaException, TopicPartition
@@ -13,22 +12,21 @@ def most_recent_message(topic, kafka_config):
print("Fetching most Kafka message from {}".format(topic))
conf = kafka_config.copy()
- conf.update({
- 'group.id': 'worker-init-last-msg', # should never commit
- 'delivery.report.only.error': True,
- 'enable.auto.commit': False,
- 'default.topic.config': {
- 'request.required.acks': -1,
- 'auto.offset.reset': 'latest',
- },
- })
+ conf.update(
+ {
+ "group.id": "worker-init-last-msg", # should never commit
+ "delivery.report.only.error": True,
+ "enable.auto.commit": False,
+ "default.topic.config": {
+ "request.required.acks": -1,
+ "auto.offset.reset": "latest",
+ },
+ }
+ )
consumer = Consumer(conf)
- hwm = consumer.get_watermark_offsets(
- TopicPartition(topic, 0),
- timeout=5.0,
- cached=False)
+ hwm = consumer.get_watermark_offsets(TopicPartition(topic, 0), timeout=5.0, cached=False)
if not hwm:
raise Exception("Kafka consumer timeout, or topic {} doesn't exist".format(topic))
print("High watermarks: {}".format(hwm))
@@ -37,7 +35,7 @@ def most_recent_message(topic, kafka_config):
print("topic is new; not 'most recent message'")
return None
- consumer.assign([TopicPartition(topic, 0, hwm[1]-1)])
+ consumer.assign([TopicPartition(topic, 0, hwm[1] - 1)])
msg = consumer.poll(2.0)
consumer.close()
if not msg:
@@ -56,8 +54,8 @@ class FatcatWorker:
if api:
self.api = api
self.kafka_config = {
- 'bootstrap.servers': kafka_hosts,
- 'message.max.bytes': 20000000, # ~20 MBytes; broker-side max is ~50 MBytes
+ "bootstrap.servers": kafka_hosts,
+ "message.max.bytes": 20000000, # ~20 MBytes; broker-side max is ~50 MBytes
}
self.produce_topic = produce_topic
self.consume_topic = consume_topic
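
For reference, the high-watermark trick most_recent_message() relies on, as a condensed standalone sketch; the broker address and topic name are placeholders:

    from confluent_kafka import Consumer, TopicPartition

    consumer = Consumer({
        "bootstrap.servers": "localhost:9092",  # placeholder broker
        "group.id": "worker-init-last-msg",  # should never commit offsets
        "enable.auto.commit": False,
    })
    topic = "fatcat-prod.changelog"  # placeholder topic
    low, high = consumer.get_watermark_offsets(
        TopicPartition(topic, 0), timeout=5.0, cached=False
    )
    if high == 0:
        print("topic is new; no 'most recent message'")
    else:
        # assign directly to the last offset on partition 0 and read one message
        consumer.assign([TopicPartition(topic, 0, high - 1)])
        msg = consumer.poll(2.0)
        if msg and not msg.error():
            print("most recent message: {}".format(msg.value()))
    consumer.close()
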