Diffstat (limited to 'python/fatcat_tools/importers')

 python/fatcat_tools/importers/__init__.py |  2 +-
 python/fatcat_tools/importers/common.py   | 65 ++++++++++++++++++++++++++++++
 2 files changed, 66 insertions(+), 1 deletion(-)
diff --git a/python/fatcat_tools/importers/__init__.py b/python/fatcat_tools/importers/__init__.py
index d936605f..03c7cbcc 100644
--- a/python/fatcat_tools/importers/__init__.py
+++ b/python/fatcat_tools/importers/__init__.py
@@ -12,7 +12,7 @@ To run an import you combine two classes; one each of:

 """

-from .common import EntityImporter, JsonLinePusher, LinePusher, CsvPusher, SqlitePusher, Bs4XmlFilePusher, Bs4XmlLargeFilePusher, Bs4XmlLinesPusher, Bs4XmlFileListPusher, KafkaJsonPusher, make_kafka_consumer, clean, is_cjk, LANG_MAP_MARC
+from .common import EntityImporter, JsonLinePusher, LinePusher, CsvPusher, SqlitePusher, Bs4XmlFilePusher, Bs4XmlLargeFilePusher, Bs4XmlLinesPusher, Bs4XmlFileListPusher, KafkaJsonPusher, KafkaBs4XmlPusher, make_kafka_consumer, clean, is_cjk, LANG_MAP_MARC
 from .crossref import CrossrefImporter, CROSSREF_TYPE_MAP, lookup_license_slug
 from .datacite import DataciteImporter
 from .jalc import JalcImporter
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py
index 1ffbd6e7..1cb5529e 100644
--- a/python/fatcat_tools/importers/common.py
+++ b/python/fatcat_tools/importers/common.py
@@ -721,6 +721,71 @@ class Bs4XmlFileListPusher(RecordPusher):
         print(counts)
         return counts

+class KafkaBs4XmlPusher(RecordPusher):
+    """
+    Fetch XML for an article from Kafka, parse via Bs4.
+    """
+    def __init__(self, importer, kafka_hosts, kafka_env, topic_suffix, group, **kwargs):
+        self.importer = importer
+        self.consumer = make_kafka_consumer(
+            kafka_hosts,
+            kafka_env,
+            topic_suffix,
+            group,
+            kafka_namespace=kwargs.get('kafka_namespace', 'fatcat')
+        )
+        self.poll_interval = kwargs.get('poll_interval', 5.0)
+        self.consume_batch_size = kwargs.get('consume_batch_size', 100)
+
+    def run(self):
+        count = 0
+        last_push = datetime.datetime.now()
+        while True:
+            # Note: this is batch-oriented, because the underlying importer is
+            # often batch-oriented, but this doesn't confirm that the entire
+            # batch has been pushed to fatcat before committing the offset.
+            # Eg, consider the case where there is one update and thousands
+            # of creates; the update would be lingering in the importer, and
+            # if the importer crashed it would never be created.
+            # This is partially mitigated for the worker case by flushing any
+            # outstanding editgroups every 5 minutes, but there is still a
+            # window when editgroups might be hanging (unsubmitted).
+            batch = self.consumer.consume(
+                num_messages=self.consume_batch_size,
+                timeout=self.poll_interval)
+            print("... got {} kafka messages ({}sec poll interval)".format(
+                len(batch), self.poll_interval))
+            if not batch:
+                if datetime.datetime.now() - last_push > datetime.timedelta(minutes=5):
+                    # it has been some time, so flush any current editgroup
+                    self.importer.finish()
+                    last_push = datetime.datetime.now()
+                    #print("Flushed any partial import batch: {}".format(self.importer.counts))
+                continue
+            # first check errors on entire batch...
+            for msg in batch:
+                if msg.error():
+                    raise KafkaException(msg.error())
+            # ... then process
+            for msg in batch:
+                soup = BeautifulSoup(msg.value().decode('utf-8'), "xml")
+                self.importer.push_record(soup)
+                soup.decompose()
+                count += 1
+                if count % 500 == 0:
+                    print("Import counts: {}".format(self.importer.counts))
+            last_push = datetime.datetime.now()
+            for msg in batch:
+                # locally store offsets of processed messages; will be
+                # auto-committed by librdkafka from this "stored" value
+                self.consumer.store_offsets(message=msg)
+
+        # TODO: should catch UNIX signals (HUP?) to shutdown cleanly, and/or
+        # commit the current batch if it has been lingering
+        counts = self.importer.finish()
+        print(counts)
+        self.consumer.close()
+        return counts

 class KafkaJsonPusher(RecordPusher):
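For context, a minimal sketch of how this new pusher might be wired up in an import script. This is illustrative only, not taken from the commit: the importer choice (PubmedImporter), broker address, topic suffix, and consumer group name below are all assumptions.

    # Hypothetical usage sketch for KafkaBs4XmlPusher. All concrete values
    # (brokers, topic suffix, group name) are examples, not from this commit.
    from fatcat_tools.importers import KafkaBs4XmlPusher, PubmedImporter

    def run_kafka_import(api, issn_map_file):
        # PubmedImporter and its constructor arguments are assumed here for
        # illustration; any Bs4-record-consuming EntityImporter would fit.
        importer = PubmedImporter(api, issn_map_file, edit_batch_size=100)
        KafkaBs4XmlPusher(
            importer,
            "localhost:9092",            # kafka_hosts (example broker)
            "dev",                       # kafka_env, combined with topic_suffix
            "ftp-pubmed",                # topic_suffix (example value)
            "fatcat-dev-import-pubmed",  # consumer group (example value)
        ).run()

Note that run() calls consumer.store_offsets() per processed message rather than relying on auto-commit of consumed offsets. For that to behave as the inline comment describes, make_kafka_consumer presumably sets enable.auto.offset.store to False, so librdkafka's periodic auto-commit only commits offsets explicitly stored after a successful push_record().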
