aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@archive.org>2018-12-03 18:15:29 -0800
committerBryan Newbold <bnewbold@archive.org>2018-12-03 18:15:29 -0800
commitd3901d21037ee740c0273cadf67d9783b420b029 (patch)
treea7017246b84f90964c1c9e38fe996f9c251ce618 /python
parent255f76dcbe15eaa9f032f26c19a6f28b4690d204 (diff)
downloadsandcrawler-d3901d21037ee740c0273cadf67d9783b420b029.tar.gz
sandcrawler-d3901d21037ee740c0273cadf67d9783b420b029.zip
tweak kafka config significantly
Diffstat (limited to 'python')
-rwxr-xr-xpython/kafka_grobid.py19
-rwxr-xr-xpython/kafka_grobid_hbase.py2
2 files changed, 18 insertions, 3 deletions
diff --git a/python/kafka_grobid.py b/python/kafka_grobid.py
index f3aaedf..fc4e461 100755
--- a/python/kafka_grobid.py
+++ b/python/kafka_grobid.py
@@ -69,6 +69,7 @@ class KafkaGrobidWorker:
self.warc_uri_prefix = kwargs.get('warc_uri_prefix')
self.mime_filter = ['application/pdf']
self.rstore = None
+ self.produce_max_request_size = 20000000 # Kafka producer batch size tuning; also limit on size of single extracted document
def grobid_process_fulltext(self, content):
r = requests.post(self.grobid_uri + "/api/processFulltextDocument",
@@ -243,14 +244,26 @@ class KafkaGrobidWorker:
consume_topic = kafka.topics[self.consume_topic]
sequential_failures = 0
- with produce_topic.get_producer(compression=pykafka.common.CompressionType.GZIP, sync=False) as producer:
+ # Configure producer to basically *immediately* publish messages,
+ # one-at-a-time, but asynchronously (don't block). Fetch and GROBID
+ # process takes a while, and we want to send as soon as processing is
+ # done.
+ with produce_topic.get_producer(sync=False,
+ compression=pykafka.common.CompressionType.GZIP,
+ retry_backoff_ms=250,
+ max_queued_messages=20,
+ min_queued_messages=1,
+ linger_ms=0,
+ max_request_size=self.produce_max_request_size) as producer:
print("Producing to: {}".format(self.produce_topic))
consumer = consume_topic.get_balanced_consumer(
consumer_group=self.consumer_group,
managed=True,
- #fetch_message_max_bytes=100000, # only ~100kbytes at a time
auto_commit_enable=True,
- auto_commit_interval_ms=60000, # 60 seconds
+ auto_commit_interval_ms=30000, # 30 seconds
+ # LATEST because best to miss processing than waste time re-process
+ auto_offset_reset=pykafka.common.OffsetType.LATEST,
+ queued_max_messages=20,
compacted_topic=True)
print("Consuming from: {} as {}".format(self.consume_topic, self.consumer_group))
sys.stdout.flush()
diff --git a/python/kafka_grobid_hbase.py b/python/kafka_grobid_hbase.py
index 7c5c4a2..466ccb6 100755
--- a/python/kafka_grobid_hbase.py
+++ b/python/kafka_grobid_hbase.py
@@ -145,6 +145,8 @@ class KafkaGrobidHbaseWorker:
consumer_group=self.consumer_group,
managed=True,
auto_commit_enable=True,
+ # LATEST because best to miss processing than waste time re-process
+ auto_offset_reset=pykafka.common.OffsetType.LATEST,
compacted_topic=True)
print("Kafka consuming {} in group {}".format(
self.consume_topic,