diff options
Diffstat (limited to 'python/sandcrawler_worker.py')
-rwxr-xr-x | python/sandcrawler_worker.py | 33 |
1 files changed, 31 insertions, 2 deletions
diff --git a/python/sandcrawler_worker.py b/python/sandcrawler_worker.py index d42cd8c..73bd444 100755 --- a/python/sandcrawler_worker.py +++ b/python/sandcrawler_worker.py @@ -12,7 +12,11 @@ import sys import raven from sandcrawler import * -from sandcrawler.persist import PersistHtmlTeiXmlWorker, PersistXmlDocWorker +from sandcrawler.persist import ( + PersistCrossrefWorker, + PersistHtmlTeiXmlWorker, + PersistXmlDocWorker, +) # Yep, a global. Gets DSN from `SENTRY_DSN` environment variable try: @@ -291,6 +295,22 @@ def run_persist_ingest_file(args): pusher.run() +def run_persist_crossref(args): + grobid_client = GrobidClient(host_url=args.grobid_host) + consume_topic = "fatcat-{}.api-crossref".format(args.env) + worker = PersistCrossrefWorker(db_url=args.db_url, grobid_client=grobid_client) + pusher = KafkaJsonPusher( + worker=worker, + kafka_hosts=args.kafka_hosts, + consume_topic=consume_topic, + group="persist-ingest", + push_batches=True, + # small batch size because doing GROBID processing + batch_size=20, + ) + pusher.run() + + def main(): parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter) parser.add_argument( @@ -302,7 +322,7 @@ def main(): "--env", default="dev", help="Kafka topic namespace to use (eg, prod, qa, dev)" ) parser.add_argument( - "--grobid-host", default="http://grobid.qa.fatcat.wiki", help="GROBID API host/port" + "--grobid-host", default="https://grobid.qa.fatcat.wiki", help="GROBID API host/port" ) parser.add_argument( "--db-url", @@ -417,6 +437,15 @@ def main(): ) sub_persist_ingest_file.set_defaults(func=run_persist_ingest_file) + sub_persist_crossref = subparsers.add_parser( + "persist-crossref", + help="daemon that persists crossref to postgres; also does GROBID ref transform", + ) + sub_persist_crossref.add_argument( + "--grobid-host", default="https://grobid.qa.fatcat.wiki", help="GROBID API host/port" + ) + sub_persist_crossref.set_defaults(func=run_persist_crossref) + args = parser.parse_args() if not args.__dict__.get("func"): parser.print_help(file=sys.stderr) |