diff options
| author | Bryan Newbold <bnewbold@robocracy.org> | 2018-11-19 23:43:46 -0800 | 
|---|---|---|
| committer | Bryan Newbold <bnewbold@robocracy.org> | 2018-11-19 23:43:46 -0800 | 
| commit | dbcf33944dca294472e7ab42f632d8f64ef1c006 (patch) | |
| tree | 4601612f91bd0d9a2c04e66f571d4cbbbdef5677 | |
| parent | 456a4deab80d027fb5015269058cd6518680ea8e (diff) | |
| download | fatcat-dbcf33944dca294472e7ab42f632d8f64ef1c006.tar.gz fatcat-dbcf33944dca294472e7ab42f632d8f64ef1c006.zip | |
fix some broken importer args
| -rw-r--r-- | python/fatcat_tools/importers/common.py | 12 | 
1 files changed, 7 insertions, 5 deletions
| diff --git a/python/fatcat_tools/importers/common.py b/python/fatcat_tools/importers/common.py index e1efde80..18594884 100644 --- a/python/fatcat_tools/importers/common.py +++ b/python/fatcat_tools/importers/common.py @@ -37,11 +37,11 @@ class FatcatImporter:          print("Processed {} lines, inserted {}, updated {}.".format(              self.counts['processed_lines'], self.counts['insert'], self.counts['update'])) -    def create_row(self, row, editgroup_id=None): +    def create_row(self, row, editgroup=None):          # sub-classes expected to implement this          raise NotImplementedError -    def create_batch(self, rows, editgroup_id=None): +    def create_batch(self, rows, editgroup=None):          # sub-classes expected to implement this          raise NotImplementedError @@ -51,7 +51,7 @@ class FatcatImporter:              fatcat_client.Editgroup(editor_id='aaaaaaaaaaaabkvkaaaaaaaaae'))          i = 0          for i, row in enumerate(source): -            self.create_row(row, editgroup_id=eg.id) +            self.create_row(row, editgroup=eg.id)              if i > 0 and (i % group_size) == 0:                  self.api.accept_editgroup(eg.id)                  eg = self.api.create_editgroup( @@ -60,13 +60,15 @@ class FatcatImporter:          if i == 0 or (i % group_size) != 0:              self.api.accept_editgroup(eg.id) -    def process_batch(self, source, size=50): +    def process_batch(self, source, size=50, decode_kafka=False):          """Reads and processes in batches (not API-call-per-)"""          for rows in grouper(source, size): +            if decode_kafka: +                rows = [msg.value.decode('utf-8') for msg in rows]              self.counts['processed_lines'] += len(rows)              eg = self.api.create_editgroup(                  fatcat_client.Editgroup(editor_id='aaaaaaaaaaaabkvkaaaaaaaaae')) -            self.create_batch(rows, editgroup_id=eg.id) +            self.create_batch(rows, editgroup=eg.id)      def process_csv_source(self, source, group_size=100, delimiter=','):          reader = csv.DictReader(source, delimiter=delimiter) | 
