aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xpython/fatcat_import.py5
-rw-r--r--python/fatcat_tools/importers/common.py91
2 files changed, 75 insertions, 21 deletions
diff --git a/python/fatcat_import.py b/python/fatcat_import.py
index aea8c757..c612d605 100755
--- a/python/fatcat_import.py
+++ b/python/fatcat_import.py
@@ -12,7 +12,8 @@ def run_crossref(args):
edit_batch_size=args.batch_size,
bezerk_mode=args.bezerk_mode)
if args.kafka_mode:
- KafkaJsonPusher(fci, args.kafka_hosts, args.kafka_env, "api-crossref", "fatcat-import").run()
+ KafkaJsonPusher(fci, args.kafka_hosts, args.kafka_env, "api-crossref",
+ "fatcat-import", edit_batch_size=args.batch_size).run()
else:
JsonLinePusher(fci, args.json_file).run()
@@ -93,7 +94,7 @@ def main():
default="localhost:9092",
help="list of Kafka brokers (host/port) to use")
parser.add_argument('--kafka-env',
- default="qa",
+ default="dev",
help="Kafka topic namespace to use (eg, prod, qa)")
parser.add_argument('--batch-size',
help="size of batch to send",
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py
index b89c3828..4648f6f3 100644
--- a/python/fatcat_tools/importers/common.py
+++ b/python/fatcat_tools/importers/common.py
@@ -7,7 +7,7 @@ import ftfy
import itertools
import subprocess
from collections import Counter
-import pykafka
+from confluent_kafka import Consumer, KafkaException
import fatcat_client
from fatcat_client.rest import ApiException
@@ -429,35 +429,88 @@ class KafkaJsonPusher(RecordPusher):
topic_suffix,
group,
)
+ self.edit_batch_size = kwargs.get('edit_batch_size', 100)
def run(self):
count = 0
- for msg in self.consumer:
- if not msg:
+ while True:
+ batch = self.consumer.consume(
+ num_messages=self.edit_batch_size,
+ timeout=3.0)
+ print("... got {} kafka messages".format(len(batch)))
+ if not batch:
+ # TODO: could have some larger timeout here and
+ # self.importer.finish() if it's been more than, eg, a couple
+ # minutes
continue
- record = json.loads(msg.value.decode('utf-8'))
- self.importer.push_record(record)
- count += 1
- if count % 500 == 0:
- print("Import counts: {}".format(self.importer.counts))
+ # first check errors on entire batch...
+ for msg in batch:
+ if msg.error():
+ raise KafkaException(msg.error())
+ # ... then process
+ for msg in batch:
+ record = json.loads(msg.value().decode('utf-8'))
+ self.importer.push_record(record)
+ count += 1
+ if count % 500 == 0:
+ print("Import counts: {}".format(self.importer.counts))
+ # locally store the last processed message; will be auto-commited
+ # from this "stored" value
+ assert msg
+ self.consumer.store_offsets(msg)
# TODO: should catch UNIX signals (HUP?) to shutdown cleanly, and/or
# commit the current batch if it has been lingering
counts = self.importer.finish()
print(counts)
+ self.consumer.close()
return counts
def make_kafka_consumer(hosts, env, topic_suffix, group):
- topic_name = "fatcat-{}.{}".format(env, topic_suffix).encode('utf-8')
- client = pykafka.KafkaClient(hosts=hosts, broker_version="1.0.0")
- consume_topic = client.topics[topic_name]
- print("Consuming from kafka topic {}, group {}".format(topic_name, group))
-
- consumer = consume_topic.get_balanced_consumer(
- consumer_group=group.encode('utf-8'),
- managed=True,
- auto_commit_enable=True,
- auto_commit_interval_ms=30000, # 30 seconds
- compacted_topic=True,
+ topic_name = "fatcat-{}.{}".format(env, topic_suffix)
+
+ def fail_fast(err, partitions):
+ if err is not None:
+ print("Kafka consumer commit error: {}".format(err))
+ print("Bailing out...")
+ # TODO: should it be sys.exit(-1)?
+ raise KafkaException(err)
+ for p in partitions:
+ # check for partition-specific commit errors
+ print(p)
+ if p.error:
+ print("Kafka consumer commit error: {}".format(p.error))
+ print("Bailing out...")
+ # TODO: should it be sys.exit(-1)?
+ raise KafkaException(err)
+ #print("Kafka consumer commit successful")
+ pass
+
+ # previously, using pykafka
+ #auto_commit_enable=True,
+ #auto_commit_interval_ms=30000, # 30 seconds
+ conf = {
+ 'bootstrap.servers': hosts,
+ 'group.id': group.encode('utf-8'),
+ 'on_commit': fail_fast,
+ 'delivery.report.only.error': True,
+ 'enable.auto.offset.store': False,
+ 'default.topic.config': {
+ 'request.required.acks': -1,
+ 'auto.offset.reset': 'latest',
+ },
+ }
+
+ def on_rebalance(consumer, partitions):
+ print("Kafka partitions rebalanced: {} / {}".format(
+ consumer, partitions))
+
+ consumer = Consumer(conf)
+ # NOTE: it's actually important that topic_name *not* be bytes (UTF-8
+ # encoded)
+ consumer.subscribe([topic_name],
+ on_assign=on_rebalance,
+ on_revoke=on_rebalance,
)
+ print("Consuming from kafka topic {}, group {}".format(topic_name, group))
return consumer