Diffstat (limited to 'python/fatcat_tools/workers')
-rw-r--r--  python/fatcat_tools/workers/changelog.py      | 256
-rw-r--r--  python/fatcat_tools/workers/elasticsearch.py  | 171
-rw-r--r--  python/fatcat_tools/workers/worker_common.py  |  32
3 files changed, 263 insertions, 196 deletions
diff --git a/python/fatcat_tools/workers/changelog.py b/python/fatcat_tools/workers/changelog.py
index a61e364c..1e4cb41d 100644
--- a/python/fatcat_tools/workers/changelog.py
+++ b/python/fatcat_tools/workers/changelog.py
@@ -1,4 +1,3 @@
-
 import json
 import time
 
@@ -16,11 +15,9 @@ class ChangelogWorker(FatcatWorker):
     """
 
     def __init__(self, api, kafka_hosts, produce_topic, poll_interval=10.0, offset=None):
-        super().__init__(kafka_hosts=kafka_hosts,
-                         produce_topic=produce_topic,
-                         api=api)
+        super().__init__(kafka_hosts=kafka_hosts, produce_topic=produce_topic, api=api)
         self.poll_interval = poll_interval
-        self.offset = offset # the fatcat changelog offset, not the kafka offset
+        self.offset = offset  # the fatcat changelog offset, not the kafka offset
 
     def run(self):
 
@@ -31,7 +28,7 @@ class ChangelogWorker(FatcatWorker):
             print("Checking for most recent changelog offset...")
             msg = most_recent_message(self.produce_topic, self.kafka_config)
             if msg:
-                self.offset = json.loads(msg.decode('utf-8'))['index']
+                self.offset = json.loads(msg.decode("utf-8"))["index"]
             else:
                 self.offset = 0
             print("Most recent changelog index in Kafka seems to be {}".format(self.offset))
@@ -44,28 +41,29 @@ class ChangelogWorker(FatcatWorker):
                 raise KafkaException(err)
 
         producer_conf = self.kafka_config.copy()
-        producer_conf.update({
-            'delivery.report.only.error': True,
-            'default.topic.config': {
-                'request.required.acks': -1, # all brokers must confirm
-            },
-        })
+        producer_conf.update(
+            {
+                "delivery.report.only.error": True,
+                "default.topic.config": {
+                    "request.required.acks": -1,  # all brokers must confirm
+                },
+            }
+        )
         producer = Producer(producer_conf)
 
         while True:
             latest = int(self.api.get_changelog(limit=1)[0].index)
             if latest > self.offset:
-                print("Fetching changelogs from {} through {}".format(
-                    self.offset+1, latest))
-                for i in range(self.offset+1, latest+1):
+                print("Fetching changelogs from {} through {}".format(self.offset + 1, latest))
+                for i in range(self.offset + 1, latest + 1):
                     cle = self.api.get_changelog_entry(i)
                     obj = self.api.api_client.sanitize_for_serialization(cle)
                     producer.produce(
                         self.produce_topic,
-                        json.dumps(obj).encode('utf-8'),
+                        json.dumps(obj).encode("utf-8"),
                         key=str(i),
                         on_delivery=fail_fast,
-                        #NOTE timestamp could be timestamp=cle.timestamp (?)
+                        # NOTE timestamp could be timestamp=cle.timestamp (?)
                     )
                     self.offset = i
                 producer.flush()
@@ -79,12 +77,19 @@ class EntityUpdatesWorker(FatcatWorker):
     from API) to update topics.
     """
 
-    def __init__(self, api, kafka_hosts, consume_topic, release_topic,
-            file_topic, container_topic, ingest_file_request_topic,
-            work_ident_topic, poll_interval=5.0):
-        super().__init__(kafka_hosts=kafka_hosts,
-                         consume_topic=consume_topic,
-                         api=api)
+    def __init__(
+        self,
+        api,
+        kafka_hosts,
+        consume_topic,
+        release_topic,
+        file_topic,
+        container_topic,
+        ingest_file_request_topic,
+        work_ident_topic,
+        poll_interval=5.0,
+    ):
+        super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic, api=api)
         self.release_topic = release_topic
         self.file_topic = file_topic
         self.container_topic = container_topic
@@ -150,7 +155,7 @@ class EntityUpdatesWorker(FatcatWorker):
             # Transactions of the Japan Society of Mechanical Engineers
             "10.1299/kikai",
             # protocols.io
-            "10.17504/"
+            "10.17504/",
         ]
 
     def want_live_ingest(self, release, ingest_request):
@@ -163,40 +168,40 @@ class EntityUpdatesWorker(FatcatWorker):
         ingest crawling (via wayback SPN).
         """
 
-        link_source = ingest_request.get('ingest_request')
-        ingest_type = ingest_request.get('ingest_type')
-        doi = ingest_request.get('ext_ids', {}).get('doi')
+        link_source = ingest_request.get("ingest_request")
+        ingest_type = ingest_request.get("ingest_type")
+        doi = ingest_request.get("ext_ids", {}).get("doi")
 
         es = release_to_elasticsearch(release)
         is_document = release.release_type in (
-            'article',
-            'article-journal',
-            'article-newspaper',
-            'book',
-            'chapter',
-            'editorial',
-            'interview',
-            'legal_case',
-            'legislation',
-            'letter',
-            'manuscript',
-            'paper-conference',
-            'patent',
-            'peer_review',
-            'post',
-            'report',
-            'retraction',
-            'review',
-            'review-book',
-            'thesis',
+            "article",
+            "article-journal",
+            "article-newspaper",
+            "book",
+            "chapter",
+            "editorial",
+            "interview",
+            "legal_case",
+            "legislation",
+            "letter",
+            "manuscript",
+            "paper-conference",
+            "patent",
+            "peer_review",
+            "post",
+            "report",
+            "retraction",
+            "review",
+            "review-book",
+            "thesis",
         )
         is_not_pdf = release.release_type in (
-            'component',
-            'dataset',
-            'figure',
-            'graphic',
-            'software',
-            'stub',
+            "component",
+            "dataset",
+            "figure",
+            "graphic",
+            "software",
+            "stub",
         )
 
         # accept list sets a default "crawl it" despite OA metadata for
@@ -207,19 +212,23 @@ class EntityUpdatesWorker(FatcatWorker):
                 if doi.startswith(prefix):
                     in_acceptlist = True
 
-        if self.ingest_oa_only and link_source not in ('arxiv', 'pmc'):
+        if self.ingest_oa_only and link_source not in ("arxiv", "pmc"):
             # most datacite documents are in IRs and should be crawled
             is_datacite_doc = False
-            if release.extra and ('datacite' in release.extra) and is_document:
+            if release.extra and ("datacite" in release.extra) and is_document:
                 is_datacite_doc = True
 
-            if not (es['is_oa'] or in_acceptlist or is_datacite_doc):
+            if not (es["is_oa"] or in_acceptlist or is_datacite_doc):
                 return False
 
         # big publishers *generally* have accurate OA metadata, use
        # preservation networks, and block our crawlers. So unless OA, or
        # explicitly on accept list, or not preserved, skip crawling
-        if es.get('publisher_type') == 'big5' and es.get('is_preserved') and not (es['is_oa'] or in_acceptlist):
+        if (
+            es.get("publisher_type") == "big5"
+            and es.get("is_preserved")
+            and not (es["is_oa"] or in_acceptlist)
+        ):
             return False
 
         # if ingest_type is pdf but release_type is almost certainly not a PDF,
@@ -233,23 +242,24 @@ class EntityUpdatesWorker(FatcatWorker):
             return False
 
         # figshare
-        if doi and (doi.startswith('10.6084/') or doi.startswith('10.25384/')):
+        if doi and (doi.startswith("10.6084/") or doi.startswith("10.25384/")):
             # don't crawl "most recent version" (aka "group") DOIs
             if not release.version:
                 return False
 
         # zenodo
-        if doi and doi.startswith('10.5281/'):
+        if doi and doi.startswith("10.5281/"):
             # if this is a "grouping" DOI of multiple "version" DOIs, do not crawl (will crawl the versioned DOIs)
-            if release.extra and release.extra.get('relations'):
-                for rel in release.extra['relations']:
-                    if (rel.get('relationType') == 'HasVersion' and rel.get('relatedIdentifier', '').startswith('10.5281/')):
+            if release.extra and release.extra.get("relations"):
+                for rel in release.extra["relations"]:
+                    if rel.get("relationType") == "HasVersion" and rel.get(
+                        "relatedIdentifier", ""
+                    ).startswith("10.5281/"):
                         return False
 
         return True
 
     def run(self):
-
         def fail_fast(err, msg):
             if err is not None:
                 print("Kafka producer delivery error: {}".format(err))
@@ -278,36 +288,40 @@ class EntityUpdatesWorker(FatcatWorker):
             for p in partitions:
                 if p.error:
                     raise KafkaException(p.error)
-            print("Kafka partitions rebalanced: {} / {}".format(
-                consumer, partitions))
+            print("Kafka partitions rebalanced: {} / {}".format(consumer, partitions))
 
         consumer_conf = self.kafka_config.copy()
-        consumer_conf.update({
-            'group.id': self.consumer_group,
-            'on_commit': fail_fast,
-            # messages don't have offset marked as stored until pushed to
-            # elastic, but we do auto-commit stored offsets to broker
-            'enable.auto.commit': True,
-            'enable.auto.offset.store': False,
-            # user code timeout; if no poll after this long, assume user code
-            # hung and rebalance (default: 5min)
-            'max.poll.interval.ms': 180000,
-            'default.topic.config': {
-                'auto.offset.reset': 'latest',
-            },
-        })
+        consumer_conf.update(
+            {
+                "group.id": self.consumer_group,
+                "on_commit": fail_fast,
+                # messages don't have offset marked as stored until pushed to
+                # elastic, but we do auto-commit stored offsets to broker
+                "enable.auto.commit": True,
+                "enable.auto.offset.store": False,
+                # user code timeout; if no poll after this long, assume user code
+                # hung and rebalance (default: 5min)
+                "max.poll.interval.ms": 180000,
+                "default.topic.config": {
+                    "auto.offset.reset": "latest",
+                },
+            }
+        )
         consumer = Consumer(consumer_conf)
 
         producer_conf = self.kafka_config.copy()
-        producer_conf.update({
-            'delivery.report.only.error': True,
-            'default.topic.config': {
-                'request.required.acks': -1, # all brokers must confirm
-            },
-        })
+        producer_conf.update(
+            {
+                "delivery.report.only.error": True,
+                "default.topic.config": {
+                    "request.required.acks": -1,  # all brokers must confirm
+                },
+            }
+        )
         producer = Producer(producer_conf)
 
-        consumer.subscribe([self.consume_topic],
+        consumer.subscribe(
+            [self.consume_topic],
             on_assign=on_rebalance,
             on_revoke=on_rebalance,
         )
@@ -316,14 +330,16 @@ class EntityUpdatesWorker(FatcatWorker):
         while True:
             msg = consumer.poll(self.poll_interval)
             if not msg:
-                print("nothing new from kafka (poll_interval: {} sec)".format(self.poll_interval))
+                print(
+                    "nothing new from kafka (poll_interval: {} sec)".format(self.poll_interval)
+                )
                 continue
             if msg.error():
                 raise KafkaException(msg.error())
 
-            cle = json.loads(msg.value().decode('utf-8'))
-            #print(cle)
-            print("processing changelog index {}".format(cle['index']))
+            cle = json.loads(msg.value().decode("utf-8"))
+            # print(cle)
+            print("processing changelog index {}".format(cle["index"]))
             release_ids = []
             new_release_ids = []
             file_ids = []
@@ -331,27 +347,27 @@ class EntityUpdatesWorker(FatcatWorker):
             webcapture_ids = []
            container_ids = []
            work_ids = []
-            release_edits = cle['editgroup']['edits']['releases']
+            release_edits = cle["editgroup"]["edits"]["releases"]
             for re in release_edits:
-                release_ids.append(re['ident'])
+                release_ids.append(re["ident"])
                 # filter to direct release edits which are not updates
-                if not re.get('prev_revision') and not re.get('redirect_ident'):
-                    new_release_ids.append(re['ident'])
-            file_edits = cle['editgroup']['edits']['files']
+                if not re.get("prev_revision") and not re.get("redirect_ident"):
+                    new_release_ids.append(re["ident"])
+            file_edits = cle["editgroup"]["edits"]["files"]
             for e in file_edits:
-                file_ids.append(e['ident'])
-            fileset_edits = cle['editgroup']['edits']['filesets']
+                file_ids.append(e["ident"])
+            fileset_edits = cle["editgroup"]["edits"]["filesets"]
             for e in fileset_edits:
-                fileset_ids.append(e['ident'])
-            webcapture_edits = cle['editgroup']['edits']['webcaptures']
+                fileset_ids.append(e["ident"])
+            webcapture_edits = cle["editgroup"]["edits"]["webcaptures"]
             for e in webcapture_edits:
-                webcapture_ids.append(e['ident'])
-            container_edits = cle['editgroup']['edits']['containers']
+                webcapture_ids.append(e["ident"])
+            container_edits = cle["editgroup"]["edits"]["containers"]
             for e in container_edits:
-                container_ids.append(e['ident'])
-            work_edits = cle['editgroup']['edits']['works']
+                container_ids.append(e["ident"])
+            work_edits = cle["editgroup"]["edits"]["works"]
             for e in work_edits:
-                work_ids.append(e['ident'])
+                work_ids.append(e["ident"])
 
             # TODO: do these fetches in parallel using a thread pool?
             for ident in set(file_ids):
@@ -363,8 +379,8 @@ class EntityUpdatesWorker(FatcatWorker):
                 file_dict = self.api.api_client.sanitize_for_serialization(file_entity)
                 producer.produce(
                     self.file_topic,
-                    json.dumps(file_dict).encode('utf-8'),
-                    key=ident.encode('utf-8'),
+                    json.dumps(file_dict).encode("utf-8"),
+                    key=ident.encode("utf-8"),
                     on_delivery=fail_fast,
                 )
 
@@ -385,30 +401,34 @@ class EntityUpdatesWorker(FatcatWorker):
                 container_dict = self.api.api_client.sanitize_for_serialization(container)
                 producer.produce(
                     self.container_topic,
-                    json.dumps(container_dict).encode('utf-8'),
-                    key=ident.encode('utf-8'),
+                    json.dumps(container_dict).encode("utf-8"),
+                    key=ident.encode("utf-8"),
                     on_delivery=fail_fast,
                 )
 
             for ident in set(release_ids):
-                release = self.api.get_release(ident, expand="files,filesets,webcaptures,container")
+                release = self.api.get_release(
+                    ident, expand="files,filesets,webcaptures,container"
+                )
                 if release.work_id:
                     work_ids.append(release.work_id)
                 release_dict = self.api.api_client.sanitize_for_serialization(release)
                 producer.produce(
                     self.release_topic,
-                    json.dumps(release_dict).encode('utf-8'),
-                    key=ident.encode('utf-8'),
+                    json.dumps(release_dict).encode("utf-8"),
+                    key=ident.encode("utf-8"),
                     on_delivery=fail_fast,
                 )
                 # for ingest requests, filter to "new" active releases with no matched files
                 if release.ident in new_release_ids:
-                    ir = release_ingest_request(release, ingest_request_source='fatcat-changelog')
+                    ir = release_ingest_request(
+                        release, ingest_request_source="fatcat-changelog"
+                    )
                     if ir and not release.files and self.want_live_ingest(release, ir):
                         producer.produce(
                             self.ingest_file_request_topic,
-                            json.dumps(ir).encode('utf-8'),
-                            #key=None,
+                            json.dumps(ir).encode("utf-8"),
+                            # key=None,
                             on_delivery=fail_fast,
                         )
 
@@ -420,13 +440,13 @@ class EntityUpdatesWorker(FatcatWorker):
                     key=key,
                     type="fatcat_work",
                     work_ident=ident,
-                    updated=cle['timestamp'],
-                    fatcat_changelog_index=cle['index'],
+                    updated=cle["timestamp"],
+                    fatcat_changelog_index=cle["index"],
                 )
                 producer.produce(
                     self.work_ident_topic,
-                    json.dumps(work_ident_dict).encode('utf-8'),
-                    key=key.encode('utf-8'),
+                    json.dumps(work_ident_dict).encode("utf-8"),
+                    key=key.encode("utf-8"),
                     on_delivery=fail_fast,
                 )
 
diff --git a/python/fatcat_tools/workers/elasticsearch.py b/python/fatcat_tools/workers/elasticsearch.py
index f411073d..0d75f964 100644
--- a/python/fatcat_tools/workers/elasticsearch.py
+++ b/python/fatcat_tools/workers/elasticsearch.py
@@ -1,4 +1,3 @@
-
 import json
 import sys
 
@@ -26,12 +25,20 @@ class ElasticsearchReleaseWorker(FatcatWorker):
     Uses a consumer group to manage offset.
     """
 
-    def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
-            elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat",
-            elasticsearch_release_index="fatcat_releases",
-            batch_size=200, api_host="https://api.fatcat.wiki/v0", query_stats=False):
-        super().__init__(kafka_hosts=kafka_hosts,
-                         consume_topic=consume_topic)
+    def __init__(
+        self,
+        kafka_hosts,
+        consume_topic,
+        poll_interval=10.0,
+        offset=None,
+        elasticsearch_backend="http://localhost:9200",
+        elasticsearch_index="fatcat",
+        elasticsearch_release_index="fatcat_releases",
+        batch_size=200,
+        api_host="https://api.fatcat.wiki/v0",
+        query_stats=False,
+    ):
+        super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic)
         self.consumer_group = "elasticsearch-updates3"
         self.batch_size = batch_size
         self.poll_interval = poll_interval
@@ -63,45 +70,53 @@ class ElasticsearchReleaseWorker(FatcatWorker):
                     print("Bailing out...", file=sys.stderr)
                     # TODO: should it be sys.exit(-1)?
                     raise KafkaException(p.error)
-            #print("Kafka consumer commit successful")
+            # print("Kafka consumer commit successful")
             pass
 
         def on_rebalance(consumer, partitions):
             for p in partitions:
                 if p.error:
                     raise KafkaException(p.error)
-            print("Kafka partitions rebalanced: {} / {}".format(
-                consumer, partitions), file=sys.stderr)
+            print(
+                "Kafka partitions rebalanced: {} / {}".format(consumer, partitions),
+                file=sys.stderr,
+            )
 
         consumer_conf = self.kafka_config.copy()
-        consumer_conf.update({
-            'group.id': self.consumer_group,
-            'on_commit': fail_fast,
-            # messages don't have offset marked as stored until pushed to
-            # elastic, but we do auto-commit stored offsets to broker
-            'enable.auto.commit': True,
-            'enable.auto.offset.store': False,
-            # user code timeout; if no poll after this long, assume user code
-            # hung and rebalance (default: 5min)
-            'max.poll.interval.ms': 60000,
-            'default.topic.config': {
-                'auto.offset.reset': 'latest',
-            },
-        })
+        consumer_conf.update(
+            {
+                "group.id": self.consumer_group,
+                "on_commit": fail_fast,
+                # messages don't have offset marked as stored until pushed to
+                # elastic, but we do auto-commit stored offsets to broker
+                "enable.auto.commit": True,
+                "enable.auto.offset.store": False,
+                # user code timeout; if no poll after this long, assume user code
+                # hung and rebalance (default: 5min)
+                "max.poll.interval.ms": 60000,
+                "default.topic.config": {
+                    "auto.offset.reset": "latest",
+                },
+            }
+        )
         consumer = Consumer(consumer_conf)
 
-        consumer.subscribe([self.consume_topic],
+        consumer.subscribe(
+            [self.consume_topic],
             on_assign=on_rebalance,
             on_revoke=on_rebalance,
        )
 
         while True:
-            batch = consumer.consume(
-                num_messages=self.batch_size,
-                timeout=self.poll_interval)
+            batch = consumer.consume(num_messages=self.batch_size, timeout=self.poll_interval)
             if not batch:
                 if not consumer.assignment():
                     print("... no Kafka consumer partitions assigned yet", file=sys.stderr)
-                print("... nothing new from kafka, try again (interval: {}".format(self.poll_interval), file=sys.stderr)
+                print(
+                    "... nothing new from kafka, try again (interval: {}".format(
+                        self.poll_interval
+                    ),
+                    file=sys.stderr,
+                )
                 continue
             print("... got {} kafka messages".format(len(batch)), file=sys.stderr)
@@ -111,19 +126,24 @@ class ElasticsearchReleaseWorker(FatcatWorker):
             # ... then process
             bulk_actions = []
             for msg in batch:
-                json_str = msg.value().decode('utf-8')
+                json_str = msg.value().decode("utf-8")
                 entity = entity_from_json(json_str, self.entity_type, api_client=ac)
                 assert isinstance(entity, self.entity_type)
                 if self.entity_type == ChangelogEntry:
                     key = entity.index
                     # might need to fetch from API
-                    if not (entity.editgroup and entity.editgroup.editor):  # pylint: disable=no-member # (TODO)
+                    if not (
+                        entity.editgroup and entity.editgroup.editor
+                    ):  # pylint: disable=no-member # (TODO)
                         entity = api.get_changelog_entry(entity.index)
                 else:
                     key = entity.ident  # pylint: disable=no-member # (TODO)
 
-                if self.entity_type != ChangelogEntry and entity.state == 'wip':
-                    print(f"WARNING: skipping state=wip entity: {self.entity_type.__name__} {entity.ident}", file=sys.stderr)
+                if self.entity_type != ChangelogEntry and entity.state == "wip":
+                    print(
+                        f"WARNING: skipping state=wip entity: {self.entity_type.__name__} {entity.ident}",
+                        file=sys.stderr,
+                    )
                     continue
 
                 if self.entity_type == ContainerEntity and self.query_stats:
@@ -138,9 +158,15 @@ class ElasticsearchReleaseWorker(FatcatWorker):
                 doc_dict = self.transform_func(entity)
 
                 # TODO: handle deletions from index
-                bulk_actions.append(json.dumps({
-                    "index": { "_id": key, },
-                }))
+                bulk_actions.append(
+                    json.dumps(
+                        {
+                            "index": {
+                                "_id": key,
+                            },
+                        }
+                    )
+                )
                 bulk_actions.append(json.dumps(doc_dict))
 
             # if only WIP entities, then skip
@@ -149,15 +175,22 @@ class ElasticsearchReleaseWorker(FatcatWorker):
                     consumer.store_offsets(message=msg)
                 continue
 
-            print("Upserting, eg, {} (of {} {} in elasticsearch)".format(key, len(batch), self.entity_type.__name__), file=sys.stderr)
+            print(
+                "Upserting, eg, {} (of {} {} in elasticsearch)".format(
+                    key, len(batch), self.entity_type.__name__
+                ),
+                file=sys.stderr,
+            )
             elasticsearch_endpoint = "{}/{}/_bulk".format(
-                self.elasticsearch_backend,
-                self.elasticsearch_index)
-            resp = requests.post(elasticsearch_endpoint,
+                self.elasticsearch_backend, self.elasticsearch_index
+            )
+            resp = requests.post(
+                elasticsearch_endpoint,
                 headers={"Content-Type": "application/x-ndjson"},
-                data="\n".join(bulk_actions) + "\n")
+                data="\n".join(bulk_actions) + "\n",
+            )
             resp.raise_for_status()
-            if resp.json()['errors']:
+            if resp.json()["errors"]:
                 desc = "Elasticsearch errors from post to {}:".format(elasticsearch_endpoint)
                 print(desc, file=sys.stderr)
                 print(resp.content, file=sys.stderr)
@@ -169,20 +202,29 @@
 
 
 class ElasticsearchContainerWorker(ElasticsearchReleaseWorker):
-
-    def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
-            query_stats=False, elasticsearch_release_index="fatcat_release",
-            elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat",
-            batch_size=200):
-        super().__init__(kafka_hosts=kafka_hosts,
-                         consume_topic=consume_topic,
-                         poll_interval=poll_interval,
-                         offset=offset,
-                         elasticsearch_backend=elasticsearch_backend,
-                         elasticsearch_index=elasticsearch_index,
-                         elasticsearch_release_index=elasticsearch_release_index,
-                         query_stats=query_stats,
-                         batch_size=batch_size)
+    def __init__(
+        self,
+        kafka_hosts,
+        consume_topic,
+        poll_interval=10.0,
+        offset=None,
+        query_stats=False,
+        elasticsearch_release_index="fatcat_release",
+        elasticsearch_backend="http://localhost:9200",
+        elasticsearch_index="fatcat",
+        batch_size=200,
+    ):
+        super().__init__(
+            kafka_hosts=kafka_hosts,
+            consume_topic=consume_topic,
+            poll_interval=poll_interval,
+            offset=offset,
+            elasticsearch_backend=elasticsearch_backend,
+            elasticsearch_index=elasticsearch_index,
+            elasticsearch_release_index=elasticsearch_release_index,
+            query_stats=query_stats,
+            batch_size=batch_size,
+        )
         # previous group got corrupted (by pykafka library?)
         self.consumer_group = "elasticsearch-updates3"
         self.entity_type = ContainerEntity
@@ -196,11 +238,18 @@ class ElasticsearchChangelogWorker(ElasticsearchReleaseWorker):
     Note: Very early versions of changelog entries did not contain details
     about the editor or extra fields.
     """
-    def __init__(self, kafka_hosts, consume_topic, poll_interval=10.0, offset=None,
-            elasticsearch_backend="http://localhost:9200", elasticsearch_index="fatcat_changelog",
-            batch_size=200):
-        super().__init__(kafka_hosts=kafka_hosts,
-                         consume_topic=consume_topic)
+
+    def __init__(
+        self,
+        kafka_hosts,
+        consume_topic,
+        poll_interval=10.0,
+        offset=None,
+        elasticsearch_backend="http://localhost:9200",
+        elasticsearch_index="fatcat_changelog",
+        batch_size=200,
+    ):
+        super().__init__(kafka_hosts=kafka_hosts, consume_topic=consume_topic)
         self.consumer_group = "elasticsearch-updates3"
         self.batch_size = batch_size
         self.poll_interval = poll_interval
diff --git a/python/fatcat_tools/workers/worker_common.py b/python/fatcat_tools/workers/worker_common.py
index 8c2936be..baec44f4 100644
--- a/python/fatcat_tools/workers/worker_common.py
+++ b/python/fatcat_tools/workers/worker_common.py
@@ -1,4 +1,3 @@
-
 from confluent_kafka import Consumer, KafkaException, TopicPartition
 
 
@@ -13,22 +12,21 @@ def most_recent_message(topic, kafka_config):
     print("Fetching most Kafka message from {}".format(topic))
 
     conf = kafka_config.copy()
-    conf.update({
-        'group.id': 'worker-init-last-msg', # should never commit
-        'delivery.report.only.error': True,
-        'enable.auto.commit': False,
-        'default.topic.config': {
-            'request.required.acks': -1,
-            'auto.offset.reset': 'latest',
-        },
-    })
+    conf.update(
+        {
+            "group.id": "worker-init-last-msg",  # should never commit
+            "delivery.report.only.error": True,
+            "enable.auto.commit": False,
+            "default.topic.config": {
+                "request.required.acks": -1,
+                "auto.offset.reset": "latest",
+            },
+        }
+    )
 
     consumer = Consumer(conf)
 
-    hwm = consumer.get_watermark_offsets(
-        TopicPartition(topic, 0),
-        timeout=5.0,
-        cached=False)
+    hwm = consumer.get_watermark_offsets(TopicPartition(topic, 0), timeout=5.0, cached=False)
     if not hwm:
         raise Exception("Kafka consumer timeout, or topic {} doesn't exist".format(topic))
     print("High watermarks: {}".format(hwm))
@@ -37,7 +35,7 @@ def most_recent_message(topic, kafka_config):
         print("topic is new; not 'most recent message'")
         return None
 
-    consumer.assign([TopicPartition(topic, 0, hwm[1]-1)])
+    consumer.assign([TopicPartition(topic, 0, hwm[1] - 1)])
     msg = consumer.poll(2.0)
     consumer.close()
     if not msg:
@@ -56,8 +54,8 @@ class FatcatWorker:
         if api:
             self.api = api
         self.kafka_config = {
-            'bootstrap.servers': kafka_hosts,
-            'message.max.bytes': 20000000, # ~20 MBytes; broker-side max is ~50 MBytes
+            "bootstrap.servers": kafka_hosts,
+            "message.max.bytes": 20000000,  # ~20 MBytes; broker-side max is ~50 MBytes
         }
         self.produce_topic = produce_topic
         self.consume_topic = consume_topic
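
For orientation, here is a minimal, self-contained sketch (not part of the diff) of the confluent-kafka consume/produce pattern these workers share: a base config copied into consumer and producer configs, auto-commit of explicitly stored offsets, and a fail-fast delivery/commit callback. The broker address, topic names, and consumer group below are placeholder assumptions, not values from the repository.

# sketch only: placeholder broker/topic/group names
import json

from confluent_kafka import Consumer, KafkaException, Producer

KAFKA_HOSTS = "localhost:9092"  # placeholder
CONSUME_TOPIC = "example.changelog"  # placeholder
PRODUCE_TOPIC = "example.work-ident-updates"  # placeholder

base_conf = {
    "bootstrap.servers": KAFKA_HOSTS,
    "message.max.bytes": 20000000,  # ~20 MBytes
}


def fail_fast(err, msg):
    # delivery/commit callback: raise on any broker-reported error
    if err is not None:
        raise KafkaException(err)


consumer_conf = base_conf.copy()
consumer_conf.update(
    {
        "group.id": "example-worker",  # placeholder
        "on_commit": fail_fast,
        # auto-commit to the broker, but only offsets we explicitly stored
        "enable.auto.commit": True,
        "enable.auto.offset.store": False,
        "default.topic.config": {"auto.offset.reset": "latest"},
    }
)
producer_conf = base_conf.copy()
producer_conf.update(
    {
        "delivery.report.only.error": True,
        "default.topic.config": {"request.required.acks": -1},
    }
)

consumer = Consumer(consumer_conf)
producer = Producer(producer_conf)
consumer.subscribe([CONSUME_TOPIC])

while True:
    msg = consumer.poll(10.0)
    if msg is None:
        continue
    if msg.error():
        raise KafkaException(msg.error())
    obj = json.loads(msg.value().decode("utf-8"))
    producer.produce(
        PRODUCE_TOPIC,
        json.dumps(obj).encode("utf-8"),
        key=str(obj.get("index", "")).encode("utf-8"),
        on_delivery=fail_fast,
    )
    producer.flush()
    # mark the message as processed; the broker commit happens via auto-commit
    consumer.store_offsets(message=msg)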