aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/harvest/doi_registrars.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/fatcat_tools/harvest/doi_registrars.py')
-rw-r--r--python/fatcat_tools/harvest/doi_registrars.py13
1 files changed, 8 insertions, 5 deletions
diff --git a/python/fatcat_tools/harvest/doi_registrars.py b/python/fatcat_tools/harvest/doi_registrars.py
index d5e4b7ec..10492c17 100644
--- a/python/fatcat_tools/harvest/doi_registrars.py
+++ b/python/fatcat_tools/harvest/doi_registrars.py
@@ -10,15 +10,13 @@ import datetime
from pykafka import KafkaClient
from fatcat_tools.workers import most_recent_message
-from .harvest_common import HarvestState
+from .harvest_common import HarvestState, DATE_FMT
# Skip pylint due to:
# AttributeError: 'NoneType' object has no attribute 'scope'
# in 'astroid/node_classes.py'
# pylint: skip-file
-DATE_FMT = "%Y-%m-%d"
-
class HarvestCrossrefWorker:
"""
@@ -68,7 +66,6 @@ class HarvestCrossrefWorker:
self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks
self.api_batch_size = 50
- # for crossref, it's "from-index-date"
self.name = "Crossref"
def params(self, date_str):
@@ -86,6 +83,9 @@ class HarvestCrossrefWorker:
params['cursor'] = resp['message']['next-cursor']
return params
+ def extract_key(self, obj):
+ return obj['DOI'].encode('utf-8')
+
def fetch_date(self, date):
produce_topic = self.kafka.topics[self.produce_topic]
@@ -112,7 +112,7 @@ class HarvestCrossrefWorker:
self.extract_total(resp), http_resp.elapsed))
#print(json.dumps(resp))
for work in items:
- producer.produce(json.dumps(work).encode('utf-8'))
+ producer.produce(json.dumps(work).encode('utf-8'), partition_key=self.extract_key(work))
if len(items) < self.api_batch_size:
break
params = self.update_params(params, resp)
@@ -181,6 +181,9 @@ class HarvestDataciteWorker(HarvestCrossrefWorker):
def extract_total(self, resp):
return resp['meta']['total']
+ def extract_key(self, obj):
+ return obj['doi'].encode('utf-8')
+
def update_params(self, params, resp):
params['page[number]'] = resp['meta']['page'] + 1
return params