diff options
Diffstat (limited to 'python/fatcat_tools/harvest/doi_registrars.py')
-rw-r--r-- | python/fatcat_tools/harvest/doi_registrars.py | 13 |
1 files changed, 8 insertions, 5 deletions
diff --git a/python/fatcat_tools/harvest/doi_registrars.py b/python/fatcat_tools/harvest/doi_registrars.py index d5e4b7ec..10492c17 100644 --- a/python/fatcat_tools/harvest/doi_registrars.py +++ b/python/fatcat_tools/harvest/doi_registrars.py @@ -10,15 +10,13 @@ import datetime from pykafka import KafkaClient from fatcat_tools.workers import most_recent_message -from .harvest_common import HarvestState +from .harvest_common import HarvestState, DATE_FMT # Skip pylint due to: # AttributeError: 'NoneType' object has no attribute 'scope' # in 'astroid/node_classes.py' # pylint: skip-file -DATE_FMT = "%Y-%m-%d" - class HarvestCrossrefWorker: """ @@ -68,7 +66,6 @@ class HarvestCrossrefWorker: self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks self.api_batch_size = 50 - # for crossref, it's "from-index-date" self.name = "Crossref" def params(self, date_str): @@ -86,6 +83,9 @@ class HarvestCrossrefWorker: params['cursor'] = resp['message']['next-cursor'] return params + def extract_key(self, obj): + return obj['DOI'].encode('utf-8') + def fetch_date(self, date): produce_topic = self.kafka.topics[self.produce_topic] @@ -112,7 +112,7 @@ class HarvestCrossrefWorker: self.extract_total(resp), http_resp.elapsed)) #print(json.dumps(resp)) for work in items: - producer.produce(json.dumps(work).encode('utf-8')) + producer.produce(json.dumps(work).encode('utf-8'), partition_key=self.extract_key(work)) if len(items) < self.api_batch_size: break params = self.update_params(params, resp) @@ -181,6 +181,9 @@ class HarvestDataciteWorker(HarvestCrossrefWorker): def extract_total(self, resp): return resp['meta']['total'] + def extract_key(self, obj): + return obj['doi'].encode('utf-8') + def update_params(self, params, resp): params['page[number]'] = resp['meta']['page'] + 1 return params |