about summary refs log tree commit diff stats
path: root/python/fatcat_worker.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/fatcat_worker.py')
-rwxr-xr-x python/fatcat_worker.py 49
1 files changed, 43 insertions, 6 deletions
diff --git a/python/fatcat_worker.py b/python/fatcat_worker.py
index 611c8e1b..f68b0606 100755
--- a/python/fatcat_worker.py
+++ b/python/fatcat_worker.py
@@ -2,29 +2,51 @@
import sys
import argparse
+import datetime
from fatcat_tools.workers.changelog import FatcatChangelogWorker, FatcatEntityUpdatesWorker
from fatcat_tools.workers.elastic import FatcatElasticReleaseWorker
+from fatcat_tools.harvest import HarvestCrossrefWorker
-def run_changelog_worker(args):
+def run_changelog(args):
+ # Subcommand handler: builds the "fatcat-<env>.changelog" topic name from
+ # args.env, constructs a FatcatChangelogWorker from args.api_host_url,
+ # args.kafka_hosts, that topic and args.poll_interval, then calls run().
topic = "fatcat-{}.changelog".format(args.env)
worker = FatcatChangelogWorker(args.api_host_url, args.kafka_hosts, topic,
args.poll_interval)
worker.run()
-def run_entity_updates_worker(args):
+def run_entity_updates(args):
+ # Subcommand handler: derives the "fatcat-<env>.changelog" and
+ # "fatcat-<env>.release-updates" topic names from args.env, hands both
+ # (plus args.api_host_url and args.kafka_hosts) to a
+ # FatcatEntityUpdatesWorker, then calls run().
changelog_topic = "fatcat-{}.changelog".format(args.env)
release_topic = "fatcat-{}.release-updates".format(args.env)
worker = FatcatEntityUpdatesWorker(args.api_host_url, args.kafka_hosts,
changelog_topic, release_topic)
worker.run()
-def run_elastic_release_worker(args):
+def run_elastic_release(args):
+ # Subcommand handler: constructs a FatcatElasticReleaseWorker that is given
+ # the "fatcat-<env>.release-updates" topic as its consume topic, together
+ # with args.kafka_hosts, args.elastic_backend and args.elastic_index, then
+ # calls run().
consume_topic = "fatcat-{}.release-updates".format(args.env)
worker = FatcatElasticReleaseWorker(args.kafka_hosts,
consume_topic, elastic_backend=args.elastic_backend,
elastic_index=args.elastic_index)
worker.run()
+def run_harvest_crossref(args):
+ # Subcommand handler for 'harvest-crossref': constructs a
+ # HarvestCrossrefWorker with the env-specific "fatcat-<env>.crossref"
+ # produce topic and "fatcat-<env>.crossref-state" state topic, passing
+ # through args.kafka_hosts, args.contact_email, args.start_date and
+ # args.end_date, then calls run_once() (a single invocation, not run()).
+ worker = HarvestCrossrefWorker(
+ args.kafka_hosts,
+ produce_topic="fatcat-{}.crossref".format(args.env),
+ state_topic="fatcat-{}.crossref-state".format(args.env),
+ contact_email=args.contact_email,
+ start_date=args.start_date,
+ end_date=args.end_date)
+ worker.run_once()
+
+def run_harvest_datacite(args):
+    # Datacite counterpart of run_harvest_crossref: constructs a
+    # HarvestDataciteWorker with the "fatcat-<env>.datacite" produce topic and
+    # "fatcat-<env>.datacite-state" state topic, passing through
+    # args.kafka_hosts, args.contact_email, args.start_date and args.end_date,
+    # then calls run_once().
+    #
+    # The module-level import only brings in HarvestCrossrefWorker, so without
+    # this local import the constructor call below raises NameError.
+    # NOTE(review): assumes fatcat_tools.harvest exports HarvestDataciteWorker
+    # alongside HarvestCrossrefWorker -- confirm.
+    # NOTE(review): no 'harvest-datacite' subparser is visible in this view;
+    # verify that something wires this handler up.
+    from fatcat_tools.harvest import HarvestDataciteWorker
+    worker = HarvestDataciteWorker(
+        args.kafka_hosts,
+        produce_topic="fatcat-{}.datacite".format(args.env),
+        state_topic="fatcat-{}.datacite-state".format(args.env),
+        contact_email=args.contact_email,
+        start_date=args.start_date,
+        end_date=args.end_date)
+    worker.run_once()
+
def main():
parser = argparse.ArgumentParser()
parser.add_argument('--debug',
@@ -42,16 +64,16 @@ def main():
subparsers = parser.add_subparsers()
sub_changelog = subparsers.add_parser('changelog')
- sub_changelog.set_defaults(func=run_changelog_worker)
+ sub_changelog.set_defaults(func=run_changelog)
sub_changelog.add_argument('--poll-interval',
help="how long to wait between polling (seconds)",
default=10.0, type=float)
sub_entity_updates = subparsers.add_parser('entity-updates')
- sub_entity_updates.set_defaults(func=run_entity_updates_worker)
+ sub_entity_updates.set_defaults(func=run_entity_updates)
sub_elastic_release = subparsers.add_parser('elastic-release')
- sub_elastic_release.set_defaults(func=run_elastic_release_worker)
+ sub_elastic_release.set_defaults(func=run_elastic_release)
sub_elastic_release.add_argument('--elastic-backend',
help="elasticsearch backend to connect to",
default="http://localhost:9200")
@@ -59,6 +81,21 @@ def main():
help="elasticsearch index to push into",
default="fatcat")
+ def mkdate(raw):
+ # Converts a "YYYY-MM-DD" string into a datetime.date; used as the
+ # argparse type= callable for --start-date and --end-date. strptime
+ # raises ValueError when the string does not match the format.
+ return datetime.datetime.strptime(raw, "%Y-%m-%d").date()
+
+ sub_harvest_crossref = subparsers.add_parser('harvest-crossref')
+ sub_harvest_crossref.set_defaults(func=run_harvest_crossref)
+ sub_harvest_crossref.add_argument('--contact-email',
+ default="undefined", # better?
+ help="contact email to use in API header")
+ sub_harvest_crossref.add_argument('--start-date',
+ default=None, type=mkdate,
+ help="begining of harvest period")
+ sub_harvest_crossref.add_argument('--end-date',
+ default=None, type=mkdate,
+ help="end of harvest period")
+
args = parser.parse_args()
if not args.__dict__.get("func"):
print("tell me what to do!")