aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/importers
diff options
context:
space:
mode:
Diffstat (limited to 'python/fatcat_tools/importers')
-rw-r--r--python/fatcat_tools/importers/common.py25
1 files changed, 20 insertions, 5 deletions
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py
index 2971660f..4a3cd648 100644
--- a/python/fatcat_tools/importers/common.py
+++ b/python/fatcat_tools/importers/common.py
@@ -330,6 +330,15 @@ class EntityImporter:
raise NotImplementedError
def finish(self):
+ """
+ Gets called as cleanup at the end of imports, but can also be called at
+ any time to "snip off" current editgroup progress. In other words, safe
+ to call this but then continue pushing records.
+
+ For example, in a persistent worker could call this if there have been
+ no new entities fed in for more than some time period, to ensure that
+ entities actually get created within a reasonable time frame.
+ """
if self._edit_count > 0:
if self.submit_mode:
self.api.submit_editgroup(self._editgroup_id)
@@ -730,22 +739,27 @@ class KafkaJsonPusher(RecordPusher):
def run(self):
count = 0
+ last_push = datetime.datetime.now()
while True:
- # TODO: this is batch-oriented, because underlying importer is
+ # Note: this is batch-oriented, because underlying importer is
# often batch-oriented, but this doesn't confirm that entire batch
# has been pushed to fatcat before commiting offset. Eg, consider
# case where there there is one update and thousands of creates;
# update would be lingering in importer, and if importer crashed
- # never created. Not great.
+ # never created.
+ # This is partially mitigated for the worker case by flushing any
+ # outstanding editgroups every 5 minutes, but there is still that
+ # window when editgroups might be hanging (unsubmitted).
batch = self.consumer.consume(
num_messages=self.consume_batch_size,
timeout=self.poll_interval)
print("... got {} kafka messages ({}sec poll interval)".format(
len(batch), self.poll_interval))
if not batch:
- # TODO: could have some larger timeout here and
- # self.importer.finish() if it's been more than, eg, a couple
- # minutes
+ if datetime.datetime.now() - last_push > datetime.timedelta(minutes=5):
+ # it has been some time, so flush any current editgroup
+ self.importer.finish()
+ last_push = datetime.datetime.now()
continue
# first check errors on entire batch...
for msg in batch:
@@ -758,6 +772,7 @@ class KafkaJsonPusher(RecordPusher):
count += 1
if count % 500 == 0:
print("Import counts: {}".format(self.importer.counts))
+ last_push = datetime.datetime.now()
for msg in batch:
# locally store offsets of processed messages; will be
# auto-commited by librdkafka from this "stored" value