Diffstat (limited to 'python/fatcat_tools')
-rw-r--r--   python/fatcat_tools/importers/common.py   25   ++++++++++++++++++++-----
1 file changed, 20 insertions(+), 5 deletions(-)
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py
index 2971660f..4a3cd648 100644
--- a/python/fatcat_tools/importers/common.py
+++ b/python/fatcat_tools/importers/common.py
@@ -330,6 +330,15 @@ class EntityImporter:
         raise NotImplementedError

     def finish(self):
+        """
+        Gets called as cleanup at the end of imports, but can also be called
+        at any time to "snip off" current editgroup progress. In other words,
+        it is safe to call this and then continue pushing records.
+
+        For example, a persistent worker could call this if no new entities
+        have been fed in for more than some time period, to ensure that
+        entities actually get created within a reasonable time frame.
+        """
         if self._edit_count > 0:
             if self.submit_mode:
                 self.api.submit_editgroup(self._editgroup_id)
@@ -730,22 +739,27 @@ class KafkaJsonPusher(RecordPusher):
     def run(self):
         count = 0
+        last_push = datetime.datetime.now()
         while True:
-            # TODO: this is batch-oriented, because underlying importer is
+            # Note: this is batch-oriented, because underlying importer is
             # often batch-oriented, but this doesn't confirm that entire batch
             # has been pushed to fatcat before committing offset. Eg, consider
             # case where there is one update and thousands of creates;
             # update would be lingering in importer, and if importer crashed,
-            # never created. Not great.
+            # never created.
+            # This is partially mitigated for the worker case by flushing any
+            # outstanding editgroups every 5 minutes, but there is still a
+            # window when editgroups might be hanging (unsubmitted).
             batch = self.consumer.consume(
                 num_messages=self.consume_batch_size,
                 timeout=self.poll_interval)
             print("... got {} kafka messages ({}sec poll interval)".format(
                 len(batch), self.poll_interval))
             if not batch:
-                # TODO: could have some larger timeout here and
-                # self.importer.finish() if it's been more than, eg, a couple
-                # minutes
+                if datetime.datetime.now() - last_push > datetime.timedelta(minutes=5):
+                    # it has been some time, so flush any current editgroup
+                    self.importer.finish()
+                    last_push = datetime.datetime.now()
                 continue

             # first check errors on entire batch...
             for msg in batch:
@@ -758,6 +772,7 @@ class KafkaJsonPusher(RecordPusher):
                 count += 1
                 if count % 500 == 0:
                     print("Import counts: {}".format(self.importer.counts))
+            last_push = datetime.datetime.now()
             for msg in batch:
                 # locally store offsets of processed messages; will be
                 # auto-committed by librdkafka from this "stored" value
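For reference, the idle-flush pattern this commit introduces can be sketched on its own. The following is a minimal simplification, assuming a confluent-kafka-style consumer (consume() taking num_messages and timeout) and an importer object exposing finish() as in the diff above; push_record() is a hypothetical stand-in for the importer's per-record entrypoint. This is illustrative, not the actual fatcat worker code:

import datetime
import json

FLUSH_INTERVAL = datetime.timedelta(minutes=5)

def run_pusher(consumer, importer, consume_batch_size=100, poll_interval=5.0):
    # track the last time records were pushed into the importer
    last_push = datetime.datetime.now()
    while True:
        batch = consumer.consume(
            num_messages=consume_batch_size,
            timeout=poll_interval)
        if not batch:
            # idle: if nothing has been pushed for FLUSH_INTERVAL, "snip off"
            # the current editgroup so pending edits don't hang unsubmitted
            if datetime.datetime.now() - last_push > FLUSH_INTERVAL:
                importer.finish()
                last_push = datetime.datetime.now()
            continue
        for msg in batch:
            # hypothetical wiring, not the exact fatcat API
            importer.push_record(json.loads(msg.value().decode('utf-8')))
        last_push = datetime.datetime.now()

Note that the flush only bounds how long an editgroup can sit unsubmitted while the worker is idle (roughly the 5-minute interval plus one poll); as the comment in the diff says, a crash mid-batch can still lose edits that were pushed but not yet flushed.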
