summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--python/fatcat_tools/importers/common.py12
1 files changed, 7 insertions, 5 deletions
diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py
index e1efde80..18594884 100644
--- a/python/fatcat_tools/importers/common.py
+++ b/python/fatcat_tools/importers/common.py
@@ -37,11 +37,11 @@ class FatcatImporter:
print("Processed {} lines, inserted {}, updated {}.".format(
self.counts['processed_lines'], self.counts['insert'], self.counts['update']))
- def create_row(self, row, editgroup_id=None):
+ def create_row(self, row, editgroup=None):
# sub-classes expected to implement this
raise NotImplementedError
- def create_batch(self, rows, editgroup_id=None):
+ def create_batch(self, rows, editgroup=None):
# sub-classes expected to implement this
raise NotImplementedError
@@ -51,7 +51,7 @@ class FatcatImporter:
fatcat_client.Editgroup(editor_id='aaaaaaaaaaaabkvkaaaaaaaaae'))
i = 0
for i, row in enumerate(source):
- self.create_row(row, editgroup_id=eg.id)
+ self.create_row(row, editgroup=eg.id)
if i > 0 and (i % group_size) == 0:
self.api.accept_editgroup(eg.id)
eg = self.api.create_editgroup(
@@ -60,13 +60,15 @@ class FatcatImporter:
if i == 0 or (i % group_size) != 0:
self.api.accept_editgroup(eg.id)
- def process_batch(self, source, size=50):
+ def process_batch(self, source, size=50, decode_kafka=False):
"""Reads and processes in batches (not API-call-per-)"""
for rows in grouper(source, size):
+ if decode_kafka:
+ rows = [msg.value.decode('utf-8') for msg in rows]
self.counts['processed_lines'] += len(rows)
eg = self.api.create_editgroup(
fatcat_client.Editgroup(editor_id='aaaaaaaaaaaabkvkaaaaaaaaae'))
- self.create_batch(rows, editgroup_id=eg.id)
+ self.create_batch(rows, editgroup=eg.id)
def process_csv_source(self, source, group_size=100, delimiter=','):
reader = csv.DictReader(source, delimiter=delimiter)