Diffstat (limited to 'python/fatcat_tools/importers/common.py')
-rw-r--r-- | python/fatcat_tools/importers/common.py | 45
1 file changed, 37 insertions, 8 deletions
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py
index 073725ad..d51a5ff4 100644
--- a/python/fatcat_tools/importers/common.py
+++ b/python/fatcat_tools/importers/common.py
@@ -6,6 +6,7 @@ import json
 import ftfy
 import base64
 import sqlite3
+import datetime
 import subprocess
 import unicodedata
 from collections import Counter
@@ -112,7 +113,7 @@ def clean(thing, force_xml=False):
     This function is appropriate to be called on any random, non-markup string,
     such as author names, titles, etc.
 
-    It will try to clean up commong unicode mangles, HTML characters, etc.
+    It will try to clean up common unicode mangles, HTML characters, etc.
 
     This will detect XML/HTML and "do the right thing" (aka, not remove
     entities like '&' if there are tags in the string), unless you pass the
@@ -271,6 +272,11 @@ class EntityImporter:
             if didn't update or insert because of existing)
         self.counts['update'] += 1
             if updated an entity
+
+    Parameters:
+
+        submit_mode: instead of accepting editgroups, only submits them.
+            implementors must write insert_batch appropriately
     """
 
     def __init__(self, api, **kwargs):
@@ -282,6 +288,7 @@ class EntityImporter:
 
         self.api = api
         self.bezerk_mode = kwargs.get('bezerk_mode', False)
+        self.submit_mode = kwargs.get('submit_mode', False)
         self.edit_batch_size = kwargs.get('edit_batch_size', 100)
         self.editgroup_description = kwargs.get('editgroup_description')
         self.editgroup_extra = eg_extra
@@ -330,8 +337,20 @@ class EntityImporter:
         raise NotImplementedError
 
     def finish(self):
+        """
+        Gets called as cleanup at the end of imports, but can also be called at
+        any time to "snip off" current editgroup progress. In other words, safe
+        to call this but then continue pushing records.
+
+        For example, in a persistent worker could call this if there have been
+        no new entities fed in for more than some time period, to ensure that
+        entities actually get created within a reasonable time frame.
+        """
         if self._edit_count > 0:
-            self.api.accept_editgroup(self._editgroup_id)
+            if self.submit_mode:
+                self.api.submit_editgroup(self._editgroup_id)
+            else:
+                self.api.accept_editgroup(self._editgroup_id)
             self._editgroup_id = None
             self._edit_count = 0
             self._edits_inflight = []
@@ -345,7 +364,10 @@ class EntityImporter:
 
     def get_editgroup_id(self, edits=1):
         if self._edit_count >= self.edit_batch_size:
-            self.api.accept_editgroup(self._editgroup_id)
+            if self.submit_mode:
+                self.api.submit_editgroup(self._editgroup_id)
+            else:
+                self.api.accept_editgroup(self._editgroup_id)
             self._editgroup_id = None
             self._edit_count = 0
             self._edits_inflight = []
@@ -715,22 +737,28 @@ class KafkaJsonPusher(RecordPusher):
 
     def run(self):
         count = 0
+        last_push = datetime.datetime.now()
         while True:
-            # TODO: this is batch-oriented, because underlying importer is
+            # Note: this is batch-oriented, because underlying importer is
             # often batch-oriented, but this doesn't confirm that entire batch
             # has been pushed to fatcat before commiting offset. Eg, consider
             # case where there there is one update and thousands of creates;
             # update would be lingering in importer, and if importer crashed
-            # never created. Not great.
+            # never created.
+            # This is partially mitigated for the worker case by flushing any
+            # outstanding editgroups every 5 minutes, but there is still that
+            # window when editgroups might be hanging (unsubmitted).
             batch = self.consumer.consume(
                 num_messages=self.consume_batch_size,
                 timeout=self.poll_interval)
             print("... got {} kafka messages ({}sec poll interval)".format(
                 len(batch), self.poll_interval))
             if not batch:
-                # TODO: could have some larger timeout here and
-                # self.importer.finish() if it's been more than, eg, a couple
-                # minutes
+                if datetime.datetime.now() - last_push > datetime.timedelta(minutes=5):
+                    # it has been some time, so flush any current editgroup
+                    self.importer.finish()
+                    last_push = datetime.datetime.now()
+                    #print("Flushed any partial import batch: {}".format(self.importer.counts))
                 continue
             # first check errors on entire batch...
             for msg in batch:
@@ -743,6 +771,7 @@ class KafkaJsonPusher(RecordPusher):
                 count += 1
                 if count % 500 == 0:
                     print("Import counts: {}".format(self.importer.counts))
+            last_push = datetime.datetime.now()
             for msg in batch:
                 # locally store offsets of processed messages; will be
                 # auto-commited by librdkafka from this "stored" value
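For illustration, the new submit_mode flag only changes which API call closes out an editgroup: submit_editgroup() leaves it pending for review, while the default accept_editgroup() merges it immediately. A minimal usage sketch, where MyImporter, api, and records are hypothetical stand-ins (not names from this commit), assuming EntityImporter's push_record()/finish() interface:

# hypothetical EntityImporter subclass; per the new docstring above, its
# insert_batch() must be written with submit mode in mind
importer = MyImporter(api, submit_mode=True, edit_batch_size=50)

for record in records:
    importer.push_record(record)

# with submit_mode=True this calls api.submit_editgroup(...) instead of
# api.accept_editgroup(...), leaving the editgroup for a reviewer
importer.finish()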
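The periodic flush added to KafkaJsonPusher.run() boils down to: remember when records were last pushed, and when the consumer returns an empty batch, call finish() once the idle window is exceeded. A sketch of that pattern in isolation, where poll_records() is a hypothetical stand-in for self.consumer.consume() and the 5-minute window matches the diff:

import datetime

FLUSH_WINDOW = datetime.timedelta(minutes=5)
last_push = datetime.datetime.now()

while True:
    batch = poll_records(timeout=5.0)  # stand-in for the Kafka consume call
    if not batch:
        if datetime.datetime.now() - last_push > FLUSH_WINDOW:
            # stream has gone quiet; "snip off" the current editgroup so
            # buffered edits do not hang around unsubmitted indefinitely
            importer.finish()
            last_push = datetime.datetime.now()
        continue
    for record in batch:
        importer.push_record(record)
    last_push = datetime.datetime.now()

Since finish() is a no-op when there are no outstanding edits (the _edit_count > 0 guard), it is safe for the idle branch to fire repeatedly.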