diff options
Diffstat (limited to 'python/fatcat_tools/importers/common.py')
-rw-r--r-- | python/fatcat_tools/importers/common.py | 12 |
1 files changed, 7 insertions, 5 deletions
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py index e1efde80..18594884 100644 --- a/python/fatcat_tools/importers/common.py +++ b/python/fatcat_tools/importers/common.py @@ -37,11 +37,11 @@ class FatcatImporter: print("Processed {} lines, inserted {}, updated {}.".format( self.counts['processed_lines'], self.counts['insert'], self.counts['update'])) - def create_row(self, row, editgroup_id=None): + def create_row(self, row, editgroup=None): # sub-classes expected to implement this raise NotImplementedError - def create_batch(self, rows, editgroup_id=None): + def create_batch(self, rows, editgroup=None): # sub-classes expected to implement this raise NotImplementedError @@ -51,7 +51,7 @@ class FatcatImporter: fatcat_client.Editgroup(editor_id='aaaaaaaaaaaabkvkaaaaaaaaae')) i = 0 for i, row in enumerate(source): - self.create_row(row, editgroup_id=eg.id) + self.create_row(row, editgroup=eg.id) if i > 0 and (i % group_size) == 0: self.api.accept_editgroup(eg.id) eg = self.api.create_editgroup( @@ -60,13 +60,15 @@ class FatcatImporter: if i == 0 or (i % group_size) != 0: self.api.accept_editgroup(eg.id) - def process_batch(self, source, size=50): + def process_batch(self, source, size=50, decode_kafka=False): """Reads and processes in batches (not API-call-per-)""" for rows in grouper(source, size): + if decode_kafka: + rows = [msg.value.decode('utf-8') for msg in rows] self.counts['processed_lines'] += len(rows) eg = self.api.create_editgroup( fatcat_client.Editgroup(editor_id='aaaaaaaaaaaabkvkaaaaaaaaae')) - self.create_batch(rows, editgroup_id=eg.id) + self.create_batch(rows, editgroup=eg.id) def process_csv_source(self, source, group_size=100, delimiter=','): reader = csv.DictReader(source, delimiter=delimiter) |