Diffstat (limited to 'python/fatcat_tools/importers/common.py')
-rw-r--r--   python/fatcat_tools/importers/common.py   45
1 file changed, 37 insertions(+), 8 deletions(-)
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py
index 073725ad..d51a5ff4 100644
--- a/python/fatcat_tools/importers/common.py
+++ b/python/fatcat_tools/importers/common.py
@@ -6,6 +6,7 @@ import json
 import ftfy
 import base64
 import sqlite3
+import datetime
 import subprocess
 import unicodedata
 from collections import Counter
@@ -112,7 +113,7 @@ def clean(thing, force_xml=False):
     This function is appropriate to be called on any random, non-markup string,
     such as author names, titles, etc.

-    It will try to clean up commong unicode mangles, HTML characters, etc.
+    It will try to clean up common unicode mangles, HTML characters, etc.

     This will detect XML/HTML and "do the right thing" (aka, not remove
     entities like '&' if there are tags in the string), unless you pass the
@@ -271,6 +272,11 @@ class EntityImporter:
             if didn't update or insert because of existing)
         self.counts['update'] += 1
             if updated an entity
+
+    Parameters:
+
+        submit_mode: instead of accepting editgroups, only submits them.
+            implementors must write insert_batch appropriately
     """

     def __init__(self, api, **kwargs):
@@ -282,6 +288,7 @@ class EntityImporter:

         self.api = api
         self.bezerk_mode = kwargs.get('bezerk_mode', False)
+        self.submit_mode = kwargs.get('submit_mode', False)
         self.edit_batch_size = kwargs.get('edit_batch_size', 100)
         self.editgroup_description = kwargs.get('editgroup_description')
         self.editgroup_extra = eg_extra
@@ -330,8 +337,20 @@ class EntityImporter:
         raise NotImplementedError

     def finish(self):
+        """
+        Gets called as cleanup at the end of imports, but can also be called
+        at any time to "snip off" current editgroup progress. In other words,
+        it is safe to call this and then continue pushing records.
+
+        For example, a persistent worker could call this if no new entities
+        have been fed in for more than some time period, to ensure that
+        entities actually get created within a reasonable time frame.
+        """
         if self._edit_count > 0:
-            self.api.accept_editgroup(self._editgroup_id)
+            if self.submit_mode:
+                self.api.submit_editgroup(self._editgroup_id)
+            else:
+                self.api.accept_editgroup(self._editgroup_id)
             self._editgroup_id = None
             self._edit_count = 0
             self._edits_inflight = []
@@ -345,7 +364,10 @@ class EntityImporter:

     def get_editgroup_id(self, edits=1):
         if self._edit_count >= self.edit_batch_size:
-            self.api.accept_editgroup(self._editgroup_id)
+            if self.submit_mode:
+                self.api.submit_editgroup(self._editgroup_id)
+            else:
+                self.api.accept_editgroup(self._editgroup_id)
             self._editgroup_id = None
             self._edit_count = 0
             self._edits_inflight = []
@@ -715,22 +737,28 @@ class KafkaJsonPusher(RecordPusher):

     def run(self):
         count = 0
+        last_push = datetime.datetime.now()
         while True:
-            # TODO: this is batch-oriented, because underlying importer is
+            # Note: this is batch-oriented, because underlying importer is
             # often batch-oriented, but this doesn't confirm that entire batch
             # has been pushed to fatcat before commiting offset. Eg, consider
             # case where there there is one update and thousands of creates;
             # update would be lingering in importer, and if importer crashed
-            # never created. Not great.
+            # never created.
+            # This is partially mitigated for the worker case by flushing any
+            # outstanding editgroups every 5 minutes, but there is still that
+            # window when editgroups might be hanging (unsubmitted).
             batch = self.consumer.consume(
                 num_messages=self.consume_batch_size,
                 timeout=self.poll_interval)
             print("... got {} kafka messages ({}sec poll interval)".format(
                 len(batch), self.poll_interval))
             if not batch:
-                # TODO: could have some larger timeout here and
-                # self.importer.finish() if it's been more than, eg, a couple
-                # minutes
+                if datetime.datetime.now() - last_push > datetime.timedelta(minutes=5):
+                    # it has been some time, so flush any current editgroup
+                    self.importer.finish()
+                    last_push = datetime.datetime.now()
+                    #print("Flushed any partial import batch: {}".format(self.importer.counts))
                 continue
             # first check errors on entire batch...
             for msg in batch:
@@ -743,6 +771,7 @@ class KafkaJsonPusher(RecordPusher):
                 count += 1
                 if count % 500 == 0:
                     print("Import counts: {}".format(self.importer.counts))
+            last_push = datetime.datetime.now()
             for msg in batch:
                 # locally store offsets of processed messages; will be
                 # auto-commited by librdkafka from this "stored" value
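Both finish() and get_editgroup_id() now duplicate the same accept-vs-submit branch. As a minimal sketch (not part of the patch), that branch could be factored into one helper; the method name _close_editgroup is hypothetical, while submit_editgroup()/accept_editgroup() and the reset fields are exactly those in the diff above:

import fatcat_openapi_client  # assumed client package; the diff only shows `api`

class EntityImporter:
    # ... __init__, push_record, etc. as in the diff above ...

    def _close_editgroup(self):
        # Hypothetical helper: accept merges the editgroup immediately,
        # submit only queues it for human review (submit_mode behavior).
        if self.submit_mode:
            self.api.submit_editgroup(self._editgroup_id)
        else:
            self.api.accept_editgroup(self._editgroup_id)
        # Reset batch-tracking state, exactly as both call sites do.
        self._editgroup_id = None
        self._edit_count = 0
        self._edits_inflight = []

finish() and get_editgroup_id() would then each reduce to their guard condition plus one _close_editgroup() call, keeping the two flush paths from drifting apart.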

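The flush-on-idle pattern added to KafkaJsonPusher.run() generalizes beyond Kafka. Below is a condensed, standalone sketch under stated assumptions: consumer mimics the consume(num_messages=..., timeout=...) call used above, the importer needs only the finish() method from the diff plus a per-record push_record() entry point (assumed here), and FLUSH_INTERVAL names the five minutes the patch hardcodes inline:

import json
import datetime

FLUSH_INTERVAL = datetime.timedelta(minutes=5)  # the patch hardcodes this inline

def run_pusher(consumer, importer, consume_batch_size=100, poll_interval=5.0):
    last_push = datetime.datetime.now()
    while True:
        batch = consumer.consume(num_messages=consume_batch_size,
                                 timeout=poll_interval)
        if not batch:
            # Idle poll: if nothing has been pushed for a while, "snip off"
            # the partially filled editgroup so buffered edits don't linger.
            if datetime.datetime.now() - last_push > FLUSH_INTERVAL:
                importer.finish()
                last_push = datetime.datetime.now()
            continue
        for msg in batch:
            # push_record() is assumed as the importer's per-record entry point.
            importer.push_record(json.loads(msg.value().decode('utf-8')))
        # Only reset the idle clock once records actually reached the importer.
        last_push = datetime.datetime.now()

Note the trade-off the diff's comment calls out: a crash between consume() and the next flush can still drop edits that were buffered but never submitted; the five-minute flush only bounds how long they hang around.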