From 519b90d7f539b667e919c220a53626e7a4ac48bf Mon Sep 17 00:00:00 2001
From: Martin Czygan
Date: Fri, 14 Feb 2020 14:32:57 +0100
Subject: pubmed ftp harvest and KafkaBs4XmlPusher

* add PubmedFTPWorker
* utils are currently stored alongside pubmed (e.g. ftpretr, xmlstream) but
  may live elsewhere, as they are more generic
* add KafkaBs4XmlPusher
---
 python/fatcat_tools/importers/common.py | 65 +++++++++++++++++++++++++++++++++
 1 file changed, 65 insertions(+)

(limited to 'python/fatcat_tools/importers/common.py')

diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py
index 1ffbd6e7..1cb5529e 100644
--- a/python/fatcat_tools/importers/common.py
+++ b/python/fatcat_tools/importers/common.py
@@ -721,6 +721,71 @@ class Bs4XmlFileListPusher(RecordPusher):
         print(counts)
         return counts
 
+class KafkaBs4XmlPusher(RecordPusher):
+    """
+    Fetch XML for an article from Kafka, parse via Bs4.
+    """
+    def __init__(self, importer, kafka_hosts, kafka_env, topic_suffix, group, **kwargs):
+        self.importer = importer
+        self.consumer = make_kafka_consumer(
+            kafka_hosts,
+            kafka_env,
+            topic_suffix,
+            group,
+            kafka_namespace=kwargs.get('kafka_namespace', 'fatcat')
+        )
+        self.poll_interval = kwargs.get('poll_interval', 5.0)
+        self.consume_batch_size = kwargs.get('consume_batch_size', 100)
+
+    def run(self):
+        count = 0
+        last_push = datetime.datetime.now()
+        while True:
+            # Note: this is batch-oriented, because the underlying importer is
+            # often batch-oriented, but this doesn't confirm that the entire
+            # batch has been pushed to fatcat before committing the offset.
+            # Eg, consider the case where there is one update and thousands of
+            # creates; the update would be lingering in the importer, and if
+            # the importer crashed it would never be created.
+            # This is partially mitigated for the worker case by flushing any
+            # outstanding editgroups every 5 minutes, but there is still that
+            # window when editgroups might be hanging (unsubmitted).
+            batch = self.consumer.consume(
+                num_messages=self.consume_batch_size,
+                timeout=self.poll_interval)
+            print("... got {} kafka messages ({}sec poll interval)".format(
+                len(batch), self.poll_interval))
+            if not batch:
+                if datetime.datetime.now() - last_push > datetime.timedelta(minutes=5):
+                    # it has been some time, so flush any current editgroup
+                    self.importer.finish()
+                    last_push = datetime.datetime.now()
+                    #print("Flushed any partial import batch: {}".format(self.importer.counts))
+                continue
+            # first check errors on entire batch...
+            for msg in batch:
+                if msg.error():
+                    raise KafkaException(msg.error())
+            # ... then process
+            for msg in batch:
+                soup = BeautifulSoup(msg.value().decode('utf-8'), "xml")
+                self.importer.push_record(soup)
+                soup.decompose()
+                count += 1
+                if count % 500 == 0:
+                    print("Import counts: {}".format(self.importer.counts))
+            last_push = datetime.datetime.now()
+            for msg in batch:
+                # locally store offsets of processed messages; will be
+                # auto-committed by librdkafka from this "stored" value
+                self.consumer.store_offsets(message=msg)
+
+        # TODO: should catch UNIX signals (HUP?) to shut down cleanly, and/or
+        # commit the current batch if it has been lingering
+        counts = self.importer.finish()
+        print(counts)
+        self.consumer.close()
+        return counts
 
 class KafkaJsonPusher(RecordPusher):
-- 
cgit v1.2.3
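
For context, a minimal sketch of how a caller might wire this pusher up, assuming an already-configured importer instance; the broker string, environment, topic suffix, and consumer group names below are illustrative assumptions, not taken from this patch:

    from fatcat_tools.importers.common import KafkaBs4XmlPusher

    # 'importer' stands in for whatever batch-oriented importer is being fed
    # (e.g. a Pubmed importer instance); it just needs push_record()/finish().
    importer = make_some_importer()  # hypothetical helper, not part of this patch

    pusher = KafkaBs4XmlPusher(
        importer,
        "localhost:9092",    # kafka_hosts (example broker)
        "dev",               # kafka_env (example environment)
        "pubmed-updates",    # topic_suffix (example)
        "pubmed-import",     # group (example consumer group)
        poll_interval=5.0,
        consume_batch_size=100,
    )
    pusher.run()  # blocks, consuming Kafka batches and pushing parsed XML records to the importer

Note that run() loops indefinitely (the TODO in the patch mentions adding signal handling), so in practice this call only returns if the loop is interrupted.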