From 4a861749ee23a14974ca9222baa2f9b7d47c38d9 Mon Sep 17 00:00:00 2001 From: Bryan Newbold Date: Sat, 6 Apr 2019 21:15:48 -0700 Subject: first draft harvesters using confluent-kafka --- python/fatcat_tools/harvest/doi_registrars.py | 71 +++++++++++++++++---------- python/fatcat_tools/harvest/harvest_common.py | 40 ++++++++++----- python/fatcat_tools/harvest/oaipmh.py | 41 +++++++++++----- 3 files changed, 104 insertions(+), 48 deletions(-) (limited to 'python/fatcat_tools') diff --git a/python/fatcat_tools/harvest/doi_registrars.py b/python/fatcat_tools/harvest/doi_registrars.py index 802e0e22..1483266c 100644 --- a/python/fatcat_tools/harvest/doi_registrars.py +++ b/python/fatcat_tools/harvest/doi_registrars.py @@ -7,7 +7,7 @@ import time import itertools import datetime import requests -from pykafka import KafkaClient +from confluent_kafka import Producer from fatcat_tools.workers import most_recent_message from .harvest_common import HarvestState, requests_retry_session @@ -53,11 +53,16 @@ class HarvestCrossrefWorker: self.produce_topic = produce_topic self.state_topic = state_topic self.contact_email = contact_email - self.kafka = KafkaClient(hosts=kafka_hosts, broker_version="1.0.0") self.is_update_filter = is_update_filter + self.kafka_config = { + 'bootstrap.servers': kafka_hosts, + 'delivery.report.only.error': True, + 'default.topic.config': + {'request.required.acks': 'all'}, + } self.state = HarvestState(start_date, end_date) - self.state.initialize_from_kafka(self.kafka.topics[self.state_topic]) + self.state.initialize_from_kafka(self.state_topic, self.kafka_config) self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks self.api_batch_size = 50 @@ -81,9 +86,16 @@ class HarvestCrossrefWorker: def extract_key(self, obj): return obj['DOI'].encode('utf-8') + def kafka_produce_delivery_callback(err, msg): + if err is not None: + print("Kafka producer delivery error: {}".format(err)) + print("Bailing out...") + # TODO: should it be sys.exit(-1)? + raise KafkaException(err) + def fetch_date(self, date): - produce_topic = self.kafka.topics[self.produce_topic] + producer = Producer(self.kafka_config) date_str = date.isoformat() params = self.params(date_str) @@ -93,27 +105,32 @@ class HarvestCrossrefWorker: self.contact_email), }) count = 0 - with produce_topic.get_producer() as producer: - while True: - http_resp = http_session.get(self.api_host_url, params=params) - if http_resp.status_code == 503: - # crude backoff; now redundant with session exponential - # backoff, but allows for longer backoff/downtime on remote end - print("got HTTP {}, pausing for 30 seconds".format(http_resp.status_code)) - time.sleep(30.0) - continue - http_resp.raise_for_status() - resp = http_resp.json() - items = self.extract_items(resp) - count += len(items) - print("... got {} ({} of {}), HTTP fetch took {}".format(len(items), count, - self.extract_total(resp), http_resp.elapsed)) - #print(json.dumps(resp)) - for work in items: - producer.produce(json.dumps(work).encode('utf-8'), partition_key=self.extract_key(work)) - if len(items) < self.api_batch_size: - break - params = self.update_params(params, resp) + while True: + http_resp = http_session.get(self.api_host_url, params=params) + if http_resp.status_code == 503: + # crude backoff; now redundant with session exponential + # backoff, but allows for longer backoff/downtime on remote end + print("got HTTP {}, pausing for 30 seconds".format(http_resp.status_code)) + time.sleep(30.0) + continue + http_resp.raise_for_status() + resp = http_resp.json() + items = self.extract_items(resp) + count += len(items) + print("... got {} ({} of {}), HTTP fetch took {}".format(len(items), count, + self.extract_total(resp), http_resp.elapsed)) + #print(json.dumps(resp)) + for work in items: + producer.produce( + self.produce_topic, + json.dumps(work).encode('utf-8'), + key=self.extract_key(work), + on_delivery=self.kafka_produce_delivery_callback) + producer.poll(0) + if len(items) < self.api_batch_size: + break + params = self.update_params(params, resp) + producer.flush() def extract_items(self, resp): return resp['message']['items'] @@ -128,7 +145,9 @@ class HarvestCrossrefWorker: if current: print("Fetching DOIs updated on {} (UTC)".format(current)) self.fetch_date(current) - self.state.complete(current, kafka_topic=self.kafka.topics[self.state_topic]) + self.state.complete(current, + kafka_topic=self.state_topic, + kafka_config=self.kafka_config) continue if continuous: diff --git a/python/fatcat_tools/harvest/harvest_common.py b/python/fatcat_tools/harvest/harvest_common.py index da06275f..90f499da 100644 --- a/python/fatcat_tools/harvest/harvest_common.py +++ b/python/fatcat_tools/harvest/harvest_common.py @@ -8,6 +8,8 @@ from requests.adapters import HTTPAdapter # unclear why pylint chokes on this import. Recent 'requests' and 'urllib3' are # in Pipenv.lock, and there are no errors in QA from requests.packages.urllib3.util.retry import Retry # pylint: disable=import-error +from confluent_kafka import Producer, Consumer, TopicPartition, KafkaException, \ + OFFSET_BEGINNING # Used for parsing ISO date format (YYYY-MM-DD) @@ -104,14 +106,14 @@ class HarvestState: date = datetime.datetime.strptime(state['completed-date'], DATE_FMT).date() self.complete(date) - def complete(self, date, kafka_topic=None): + def complete(self, date, kafka_topic=None, kafka_config=None): """ Records that a date has been processed successfully. Updates internal state and returns a JSON representation to be serialized. Will publish to a kafka topic if passed as an argument. - kafka_topic should have type pykafka.Topic (not str) + kafka_topic should be a string. A producer will be created and destroyed. """ try: self.to_process.remove(date) @@ -123,25 +125,41 @@ class HarvestState: 'completed-date': str(date), }).encode('utf-8') if kafka_topic: - with kafka_topic.get_sync_producer() as producer: - producer.produce(state_json) + assert(kafka_config) + def fail_fast(err, msg): + if err: + raise KafkaException(err) + print("Commiting status to Kafka: {}".format(kafka_topic)) + producer = Producer(kafka_config) + producer.produce(kafka_topic, state_json, on_delivery=fail_fast) + producer.flush() return state_json - def initialize_from_kafka(self, kafka_topic): + def initialize_from_kafka(self, kafka_topic, kafka_config): """ - kafka_topic should have type pykafka.Topic (not str) + kafka_topic should have type str """ if not kafka_topic: return - print("Fetching state from kafka topic: {}".format(kafka_topic.name)) - consumer = kafka_topic.get_simple_consumer(consumer_timeout_ms=1000) + print("Fetching state from kafka topic: {}".format(kafka_topic)) + conf = kafka_config.copy() + conf.update({ + 'auto.offset.reset': 'earliest', + 'session.timeout.ms': 10000, + 'group.id': kafka_topic + "-init", + }) + consumer = Consumer(conf) + consumer.assign([TopicPartition(kafka_topic, 0, OFFSET_BEGINNING)]) c = 0 while True: - msg = consumer.consume(block=True) + msg = consumer.poll(timeout=1.0) if not msg: break - #sys.stdout.write('.') - self.update(msg.value.decode('utf-8')) + if msg.error(): + raise KafkaException(msg.error()) + sys.stdout.write('.') # XXX: + self.update(msg.value().decode('utf-8')) c += 1 + consumer.close() print("... got {} state update messages, done".format(c)) diff --git a/python/fatcat_tools/harvest/oaipmh.py b/python/fatcat_tools/harvest/oaipmh.py index 0b482924..ab424482 100644 --- a/python/fatcat_tools/harvest/oaipmh.py +++ b/python/fatcat_tools/harvest/oaipmh.py @@ -7,8 +7,8 @@ import time import itertools import datetime import requests -from pykafka import KafkaClient import sickle +from confluent_kafka import Producer from fatcat_tools.workers import most_recent_message from .harvest_common import HarvestState @@ -37,7 +37,12 @@ class HarvestOaiPmhWorker: self.produce_topic = produce_topic self.state_topic = state_topic - self.kafka = KafkaClient(hosts=kafka_hosts, broker_version="1.0.0") + self.kafka_config = { + 'bootstrap.servers': kafka_hosts, + 'delivery.report.only.error': True, + 'default.topic.config': + {'request.required.acks': 'all'}, + } self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks @@ -45,14 +50,21 @@ class HarvestOaiPmhWorker: self.metadata_prefix = None # needs override self.name = "unnamed" self.state = HarvestState(start_date, end_date) - self.state.initialize_from_kafka(self.kafka.topics[self.state_topic]) + self.state.initialize_from_kafka(self.state_topic, self.kafka_config) + def kafka_produce_delivery_callback(err, msg): + if err is not None: + print("Kafka producer delivery error: {}".format(err)) + print("Bailing out...") + # TODO: should it be sys.exit(-1)? + raise KafkaException(err) def fetch_date(self, date): + producer = Producer(self.kafka_config) + api = sickle.Sickle(self.endpoint_url) date_str = date.isoformat() - produce_topic = self.kafka.topics[self.produce_topic] # this dict kwargs hack is to work around 'from' as a reserved python keyword # recommended by sickle docs try: @@ -66,12 +78,17 @@ class HarvestOaiPmhWorker: return count = 0 - with produce_topic.get_producer() as producer: - for item in records: - count += 1 - if count % 50 == 0: - print("... up to {}".format(count)) - producer.produce(item.raw.encode('utf-8'), partition_key=item.header.identifier.encode('utf-8')) + for item in records: + count += 1 + if count % 50 == 0: + print("... up to {}".format(count)) + producer.produce( + self.produce_topic, + item.raw.encode('utf-8'), + key=item.header.identifier.encode('utf-8'), + on_delivery=self.kafka_produce_delivery_callback) + producer.poll(0) + producer.flush() def run(self, continuous=False): @@ -80,7 +97,9 @@ class HarvestOaiPmhWorker: if current: print("Fetching DOIs updated on {} (UTC)".format(current)) self.fetch_date(current) - self.state.complete(current, kafka_topic=self.kafka.topics[self.state_topic]) + self.state.complete(current, + kafka_topic=self.state_topic, + kafka_config=self.kafka_config) continue if continuous: -- cgit v1.2.3