aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools
diff options
context:
space:
mode:
Diffstat (limited to 'python/fatcat_tools')
-rw-r--r--python/fatcat_tools/importers/common.py90
1 files changed, 71 insertions, 19 deletions
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py
index 093569e1..11ea6880 100644
--- a/python/fatcat_tools/importers/common.py
+++ b/python/fatcat_tools/importers/common.py
@@ -8,9 +8,9 @@ import sqlite3
import subprocess
import unicodedata
from collections import Counter
+from confluent_kafka import Consumer, KafkaException
import xml.etree.ElementTree as ET
-import pykafka
from bs4 import BeautifulSoup
import fatcat_openapi_client
@@ -706,32 +706,84 @@ class KafkaJsonPusher(RecordPusher):
def run(self):
count = 0
- for msg in self.consumer:
- if not msg:
+ while True:
+ batch = self.consumer.consume(
+ num_messages=self.edit_batch_size,
+ timeout=3.0)
+ print("... got {} kafka messages".format(len(batch)))
+ if not batch:
+ # TODO: could have some larger timeout here and
+ # self.importer.finish() if it's been more than, eg, a couple
+ # minutes
continue
- record = json.loads(msg.value.decode('utf-8'))
- self.importer.push_record(record)
- count += 1
- if count % 500 == 0:
- print("Import counts: {}".format(self.importer.counts))
+ # first check errors on entire batch...
+ for msg in batch:
+ if msg.error():
+ raise KafkaException(msg.error())
+ # ... then process
+ for msg in batch:
+ record = json.loads(msg.value().decode('utf-8'))
+ self.importer.push_record(record)
+ count += 1
+ if count % 500 == 0:
+ print("Import counts: {}".format(self.importer.counts))
+ # locally store the last processed message; will be auto-commited
+ # from this "stored" value
+ assert msg
+ self.consumer.store_offsets(msg)
# TODO: should catch UNIX signals (HUP?) to shutdown cleanly, and/or
# commit the current batch if it has been lingering
counts = self.importer.finish()
print(counts)
+ self.consumer.close()
return counts
def make_kafka_consumer(hosts, env, topic_suffix, group):
- topic_name = "fatcat-{}.{}".format(env, topic_suffix).encode('utf-8')
- client = pykafka.KafkaClient(hosts=hosts, broker_version="1.0.0")
- consume_topic = client.topics[topic_name]
- print("Consuming from kafka topic {}, group {}".format(topic_name, group))
-
- consumer = consume_topic.get_balanced_consumer(
- consumer_group=group.encode('utf-8'),
- managed=True,
- auto_commit_enable=True,
- auto_commit_interval_ms=30000, # 30 seconds
- compacted_topic=True,
+ topic_name = "fatcat-{}.{}".format(env, topic_suffix)
+
+ def fail_fast(err, partitions):
+ if err is not None:
+ print("Kafka consumer commit error: {}".format(err))
+ print("Bailing out...")
+ # TODO: should it be sys.exit(-1)?
+ raise KafkaException(err)
+ for p in partitions:
+ # check for partition-specific commit errors
+ print(p)
+ if p.error:
+ print("Kafka consumer commit error: {}".format(p.error))
+ print("Bailing out...")
+ # TODO: should it be sys.exit(-1)?
+ raise KafkaException(err)
+ #print("Kafka consumer commit successful")
+ pass
+
+ # previously, using pykafka
+ #auto_commit_enable=True,
+ #auto_commit_interval_ms=30000, # 30 seconds
+ conf = {
+ 'bootstrap.servers': hosts,
+ 'group.id': group.encode('utf-8'),
+ 'on_commit': fail_fast,
+ 'delivery.report.only.error': True,
+ 'enable.auto.offset.store': False,
+ 'default.topic.config': {
+ 'request.required.acks': -1,
+ 'auto.offset.reset': 'latest',
+ },
+ }
+
+ def on_rebalance(consumer, partitions):
+ print("Kafka partitions rebalanced: {} / {}".format(
+ consumer, partitions))
+
+ consumer = Consumer(conf)
+ # NOTE: it's actually important that topic_name *not* be bytes (UTF-8
+ # encoded)
+ consumer.subscribe([topic_name],
+ on_assign=on_rebalance,
+ on_revoke=on_rebalance,
)
+ print("Consuming from kafka topic {}, group {}".format(topic_name, group))
return consumer