diff options
Diffstat (limited to 'python/fatcat_tools/importers')
-rw-r--r-- | python/fatcat_tools/importers/__init__.py | 2 | ||||
-rw-r--r-- | python/fatcat_tools/importers/common.py | 65 |
2 files changed, 66 insertions, 1 deletions
diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py index 10557ef8..c26446fd 100644 --- a/python/fatcat_tools/importers/__init__.py +++ b/python/fatcat_tools/importers/__init__.py @@ -12,7 +12,7 @@ To run an import you combine two classes; one each of: """ -from .common import EntityImporter, JsonLinePusher, LinePusher, CsvPusher, SqlitePusher, Bs4XmlFilePusher, Bs4XmlLargeFilePusher, Bs4XmlLinesPusher, Bs4XmlFileListPusher, KafkaJsonPusher, make_kafka_consumer, clean, is_cjk, LANG_MAP_MARC +from .common import EntityImporter, JsonLinePusher, LinePusher, CsvPusher, SqlitePusher, Bs4XmlFilePusher, Bs4XmlLargeFilePusher, Bs4XmlLinesPusher, Bs4XmlFileListPusher, KafkaJsonPusher, KafkaBs4XmlPusher, make_kafka_consumer, clean, is_cjk, LANG_MAP_MARC from .crossref import CrossrefImporter, CROSSREF_TYPE_MAP, lookup_license_slug from .datacite import DataciteImporter from .jalc import JalcImporter diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py index a84ce90f..c000ad62 100644 --- a/python/fatcat_tools/importers/common.py +++ b/python/fatcat_tools/importers/common.py @@ -730,6 +730,71 @@ class Bs4XmlFileListPusher(RecordPusher): print(counts) return counts +class KafkaBs4XmlPusher(RecordPusher): + """ + Fetch XML for an article from Kafka, parse via Bs4. + """ + def __init__(self, importer, kafka_hosts, kafka_env, topic_suffix, group, **kwargs): + self.importer = importer + self.consumer = make_kafka_consumer( + kafka_hosts, + kafka_env, + topic_suffix, + group, + kafka_namespace=kwargs.get('kafka_namespace', 'fatcat') + ) + self.poll_interval = kwargs.get('poll_interval', 5.0) + self.consume_batch_size = kwargs.get('consume_batch_size', 25) + + def run(self): + count = 0 + last_push = datetime.datetime.now() + while True: + # Note: this is batch-oriented, because underlying importer is + # often batch-oriented, but this doesn't confirm that entire batch + # has been pushed to fatcat before commiting offset. Eg, consider + # case where there there is one update and thousands of creates; + # update would be lingering in importer, and if importer crashed + # never created. + # This is partially mitigated for the worker case by flushing any + # outstanding editgroups every 5 minutes, but there is still that + # window when editgroups might be hanging (unsubmitted). + batch = self.consumer.consume( + num_messages=self.consume_batch_size, + timeout=self.poll_interval) + print("... got {} kafka messages ({}sec poll interval)".format( + len(batch), self.poll_interval)) + if not batch: + if datetime.datetime.now() - last_push > datetime.timedelta(minutes=5): + # it has been some time, so flush any current editgroup + self.importer.finish() + last_push = datetime.datetime.now() + #print("Flushed any partial import batch: {}".format(self.importer.counts)) + continue + # first check errors on entire batch... + for msg in batch: + if msg.error(): + raise KafkaException(msg.error()) + # ... then process + for msg in batch: + soup = BeautifulSoup(msg.value().decode('utf-8'), "xml") + self.importer.push_record(soup) + soup.decompose() + count += 1 + if count % 500 == 0: + print("Import counts: {}".format(self.importer.counts)) + last_push = datetime.datetime.now() + for msg in batch: + # locally store offsets of processed messages; will be + # auto-commited by librdkafka from this "stored" value + self.consumer.store_offsets(message=msg) + + # TODO: should catch UNIX signals (HUP?) to shutdown cleanly, and/or + # commit the current batch if it has been lingering + counts = self.importer.finish() + print(counts) + self.consumer.close() + return counts class KafkaJsonPusher(RecordPusher): |