aboutsummaryrefslogtreecommitdiffstats
path: root/python
diff options
context:
space:
mode:
Diffstat (limited to 'python')
-rwxr-xr-xpython/grobid_tool.py8
-rw-r--r--python/sandcrawler/grobid.py4
2 files changed, 10 insertions, 2 deletions
diff --git a/python/grobid_tool.py b/python/grobid_tool.py
index c9bcdc8..e7a7e5c 100755
--- a/python/grobid_tool.py
+++ b/python/grobid_tool.py
@@ -17,7 +17,7 @@ def run_extract_json(args):
wayback_client = WaybackClient()
worker = GrobidWorker(grobid_client, wayback_client, sink=None)
multi_worker = MultiprocessWrapper(worker, args.sink)
- pusher = JsonBatchPusher(worker, args.json_file, batch_size=30)
+ pusher = JsonLinePusher(multi_worker, args.json_file, batch_size=args.jobs)
pusher.run()
def run_extract_cdx(args):
@@ -27,7 +27,7 @@ def run_extract_cdx(args):
multi_worker = MultiprocessWrapper(worker, args.sink)
pusher = CdxLinePusher(multi_worker, args.cdx_file,
filter_http_statuses=[200], filter_mimetypes=['application/pdf'],
- batch_size=30)
+ batch_size=args.jobs)
pusher.run()
def run_extract_zipfile(args):
@@ -47,6 +47,9 @@ def main():
parser.add_argument('--kafka-env',
default="dev",
help="Kafka topic namespace to use (eg, prod, qa, dev)")
+ parser.add_argument('-j', '--jobs',
+ default=8,
+ help="parallelism for batch CPU jobs")
parser.add_argument('--grobid-host',
default="http://grobid.qa.fatcat.wiki",
help="GROBID API host/port")
@@ -78,6 +81,7 @@ def main():
args.sink = None
if args.kafka_mode:
produce_topic = "sandcrawler-{}.grobid-output-json".format(args.kafka_env)
+ print("Running in kafka output mode, publishing to {}\n".format(produce_topic))
args.sink = KafkaGrobidSink(kafka_hosts=args.kafka_hosts,
produce_topic=produce_topic)
diff --git a/python/sandcrawler/grobid.py b/python/sandcrawler/grobid.py
index a610404..32addca 100644
--- a/python/sandcrawler/grobid.py
+++ b/python/sandcrawler/grobid.py
@@ -57,6 +57,7 @@ class GrobidWorker(SandcrawlerWorker):
self.consolidate_mode = 1
def process(self, record):
+ self.counts['total'] += 1
if record.get('warc_path') and record.get('warc_offset'):
# it's a full CDX dict. fetch using WaybackClient
if not self.wayback_client:
@@ -81,6 +82,7 @@ class GrobidWorker(SandcrawlerWorker):
result['file_meta'] = gen_file_metadata(blob)
result['source'] = record
result['key'] = result['file_meta']['sha1hex']
+ self.counts[result['status']] += 1
return result
class GrobidBlobWorker(SandcrawlerWorker):
@@ -96,9 +98,11 @@ class GrobidBlobWorker(SandcrawlerWorker):
self.consolidate_mode = 1
def process(self, blob):
+ self.counts['total'] += 1
assert blob
result = self.grobid_client.process_fulltext(blob, consolidate_mode=self.consolidate_mode)
result['file_meta'] = gen_file_metadata(blob)
result['key'] = result['file_meta']['sha1hex']
+ self.counts[result['status']] += 1
return result