diff options
Diffstat (limited to 'python/fatcat_worker.py')
-rwxr-xr-x | python/fatcat_worker.py | 49 |
1 files changed, 43 insertions, 6 deletions
diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py
index 611c8e1b..f68b0606 100755
--- a/python/fatcat_worker.py
+++ b/python/fatcat_worker.py
@@ -2,29 +2,51 @@
 
 import sys
 import argparse
+import datetime
 from fatcat_tools.workers.changelog import FatcatChangelogWorker, FatcatEntityUpdatesWorker
 from fatcat_tools.workers.elastic import FatcatElasticReleaseWorker
+from fatcat_tools.harvest import HarvestCrossrefWorker
 
-def run_changelog_worker(args):
+def run_changelog(args):
     topic = "fatcat-{}.changelog".format(args.env)
     worker = FatcatChangelogWorker(args.api_host_url, args.kafka_hosts, topic,
         args.poll_interval)
     worker.run()
 
-def run_entity_updates_worker(args):
+def run_entity_updates(args):
     changelog_topic = "fatcat-{}.changelog".format(args.env)
     release_topic = "fatcat-{}.release-updates".format(args.env)
     worker = FatcatEntityUpdatesWorker(args.api_host_url, args.kafka_hosts,
         changelog_topic, release_topic)
     worker.run()
 
-def run_elastic_release_worker(args):
+def run_elastic_release(args):
     consume_topic = "fatcat-{}.release-updates".format(args.env)
     worker = FatcatElasticReleaseWorker(args.kafka_hosts, consume_topic,
         elastic_backend=args.elastic_backend,
         elastic_index=args.elastic_index)
     worker.run()
 
+def run_harvest_crossref(args):
+    worker = HarvestCrossrefWorker(
+        args.kafka_hosts,
+        produce_topic="fatcat-{}.crossref".format(args.env),
+        state_topic="fatcat-{}.crossref-state".format(args.env),
+        contact_email=args.contact_email,
+        start_date=args.start_date,
+        end_date=args.end_date)
+    worker.run_once()
+
+def run_harvest_datacite(args):
+    worker = HarvestDataciteWorker(
+        args.kafka_hosts,
+        produce_topic="fatcat-{}.datacite".format(args.env),
+        state_topic="fatcat-{}.datacite-state".format(args.env),
+        contact_email=args.contact_email,
+        start_date=args.start_date,
+        end_date=args.end_date)
+    worker.run_once()
+
 def main():
     parser = argparse.ArgumentParser()
     parser.add_argument('--debug',
@@ -42,16 +64,16 @@ def main():
     subparsers = parser.add_subparsers()
 
     sub_changelog = subparsers.add_parser('changelog')
-    sub_changelog.set_defaults(func=run_changelog_worker)
+    sub_changelog.set_defaults(func=run_changelog)
     sub_changelog.add_argument('--poll-interval',
         help="how long to wait between polling (seconds)",
         default=10.0, type=float)
 
     sub_entity_updates = subparsers.add_parser('entity-updates')
-    sub_entity_updates.set_defaults(func=run_entity_updates_worker)
+    sub_entity_updates.set_defaults(func=run_entity_updates)
 
     sub_elastic_release = subparsers.add_parser('elastic-release')
-    sub_elastic_release.set_defaults(func=run_elastic_release_worker)
+    sub_elastic_release.set_defaults(func=run_elastic_release)
     sub_elastic_release.add_argument('--elastic-backend',
         help="elasticsearch backend to connect to",
         default="http://localhost:9200")
@@ -59,6 +81,21 @@ def main():
         help="elasticsearch index to push into",
         default="fatcat")
 
+    def mkdate(raw):
+        return datetime.datetime.strptime(raw, "%Y-%m-%d").date()
+
+    sub_harvest_crossref = subparsers.add_parser('harvest-crossref')
+    sub_harvest_crossref.set_defaults(func=run_harvest_crossref)
+    sub_harvest_crossref.add_argument('--contact-email',
+        default="undefined", # better?
+        help="contact email to use in API header")
+    sub_harvest_crossref.add_argument('--start-date',
+        default=None, type=mkdate,
+        help="begining of harvest period")
+    sub_harvest_crossref.add_argument('--end-date',
+        default=None, type=mkdate,
+        help="end of harvest period")
+
     args = parser.parse_args()
     if not args.__dict__.get("func"):
         print("tell me what to do!")