diff options
Diffstat (limited to 'python/fatcat_tools/importers')
-rw-r--r-- | python/fatcat_tools/importers/common.py | 25 |
1 files changed, 20 insertions, 5 deletions
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py index 2971660f..4a3cd648 100644 --- a/python/fatcat_tools/importers/common.py +++ b/python/fatcat_tools/importers/common.py @@ -330,6 +330,15 @@ class EntityImporter: raise NotImplementedError def finish(self): + """ + Gets called as cleanup at the end of imports, but can also be called at + any time to "snip off" current editgroup progress. In other words, safe + to call this but then continue pushing records. + + For example, in a persistent worker could call this if there have been + no new entities fed in for more than some time period, to ensure that + entities actually get created within a reasonable time frame. + """ if self._edit_count > 0: if self.submit_mode: self.api.submit_editgroup(self._editgroup_id) @@ -730,22 +739,27 @@ class KafkaJsonPusher(RecordPusher): def run(self): count = 0 + last_push = datetime.datetime.now() while True: - # TODO: this is batch-oriented, because underlying importer is + # Note: this is batch-oriented, because underlying importer is # often batch-oriented, but this doesn't confirm that entire batch # has been pushed to fatcat before commiting offset. Eg, consider # case where there there is one update and thousands of creates; # update would be lingering in importer, and if importer crashed - # never created. Not great. + # never created. + # This is partially mitigated for the worker case by flushing any + # outstanding editgroups every 5 minutes, but there is still that + # window when editgroups might be hanging (unsubmitted). batch = self.consumer.consume( num_messages=self.consume_batch_size, timeout=self.poll_interval) print("... got {} kafka messages ({}sec poll interval)".format( len(batch), self.poll_interval)) if not batch: - # TODO: could have some larger timeout here and - # self.importer.finish() if it's been more than, eg, a couple - # minutes + if datetime.datetime.now() - last_push > datetime.timedelta(minutes=5): + # it has been some time, so flush any current editgroup + self.importer.finish() + last_push = datetime.datetime.now() continue # first check errors on entire batch... for msg in batch: @@ -758,6 +772,7 @@ class KafkaJsonPusher(RecordPusher): count += 1 if count % 500 == 0: print("Import counts: {}".format(self.importer.counts)) + last_push = datetime.datetime.now() for msg in batch: # locally store offsets of processed messages; will be # auto-commited by librdkafka from this "stored" value |