diff options
Diffstat (limited to 'python')
-rwxr-xr-x | python/ia_pdf_match.py | 10 | ||||
-rwxr-xr-x | python/kafka_grobid.py | 4 | ||||
-rw-r--r-- | python/sandcrawler/html.py | 2 | ||||
-rw-r--r-- | python/sandcrawler/ingest.py | 8 | ||||
-rw-r--r-- | python/sandcrawler/workers.py | 42 |
5 files changed, 34 insertions, 32 deletions
diff --git a/python/ia_pdf_match.py b/python/ia_pdf_match.py index c5a5e11..20c65bb 100755 --- a/python/ia_pdf_match.py +++ b/python/ia_pdf_match.py @@ -27,7 +27,7 @@ import json def parse(obj): if obj['metadata']['identifier'].endswith('-test') or obj['metadata'].get('test'): - sys.stderr.write('skip: test item\n') + print('skip: test item', file=sys.stderr) return None extid_type = None @@ -36,14 +36,14 @@ def parse(obj): extid_type = 'arxiv' extid = obj['metadata'].get('source') if not extid: - sys.stderr.write('skip: no source\n') + print('skip: no source', file=sys.stderr) return None assert extid.startswith('http://arxiv.org/abs/') extid = extid.replace('http://arxiv.org/abs/', '') #print(extid) assert '/' in extid or '.' in extid if not 'v' in extid or not extid[-1].isdigit(): - sys.stderr.write('skip: non-versioned arxiv_id\n') + print('skip: non-versioned arxiv_id', file=sys.stderr) return None elif obj['metadata']['identifier'].startswith('paper-doi-10_'): extid_type = 'doi' @@ -67,9 +67,9 @@ def parse(obj): pdf_file = f break if not pdf_file: - sys.stderr.write('skip: no PDF found: {}\n'.format(obj['metadata']['identifier'])) + print('skip: no PDF found: {}'.format(obj['metadata']['identifier']), file=sys.stderr) #for f in obj['files']: - # sys.stderr.write(f['format'] + "\n") + # print(f['format'], file=sys.stderr) return None assert pdf_file['name'].endswith('.pdf') diff --git a/python/kafka_grobid.py b/python/kafka_grobid.py index 05e48bd..b0920bc 100755 --- a/python/kafka_grobid.py +++ b/python/kafka_grobid.py @@ -285,10 +285,10 @@ class KafkaGrobidWorker: producer.produce(json.dumps(grobid_output, sort_keys=True).encode('utf-8')) sequential_failures = 0 else: - sys.stderr.write("failed to extract: {}\n".format(status)) + print("failed to extract: {}".format(status), file=sys.stderr) sequential_failures += 1 if sequential_failures > 20: - sys.stderr.write("too many failures in a row, bailing out\n") + print("too many failures in a row, bailing out", file=sys.stderr) sys.exit(-1) diff --git a/python/sandcrawler/html.py b/python/sandcrawler/html.py index 2117eb0..780dcb2 100644 --- a/python/sandcrawler/html.py +++ b/python/sandcrawler/html.py @@ -30,7 +30,7 @@ def extract_fulltext_url(html_url, html_body): elif url.startswith('http'): return dict(pdf_url=url, technique='citation_pdf_url') else: - sys.stderr.write("malformed citation_pdf_url? {}\n".format(url)) + print("malformed citation_pdf_url? {}".format(url), file=sys.stderr) # ACS (and probably others) like: # https://pubs.acs.org/doi/10.1021/acs.estlett.9b00379 diff --git a/python/sandcrawler/ingest.py b/python/sandcrawler/ingest.py index d3f7043..077469a 100644 --- a/python/sandcrawler/ingest.py +++ b/python/sandcrawler/ingest.py @@ -70,7 +70,7 @@ class IngestFileWorker(SandcrawlerWorker): # extraction didn't work as expected; fetch whatever SPN2 got cdx = self.cdx_client.lookup_latest(url, follow_redirects=True) if not cdx: - sys.stderr.write("{}\n".format(cdx_list)) + print("{}".format(cdx_list), file=sys.stderr) raise SavePageNowError("Failed to find terminal capture from SPNv2") else: return self.spn_client.save_url_now_v1(url) @@ -123,7 +123,7 @@ class IngestFileWorker(SandcrawlerWorker): response['status'] = 'wayback-error' response['error_message'] = str(e) return response - sys.stderr.write("CDX hit: {}\n".format(cdx_dict)) + print("CDX hit: {}".format(cdx_dict), file=sys.stderr) response['cdx'] = cdx_dict # TODO: populate terminal @@ -172,7 +172,7 @@ class IngestFileWorker(SandcrawlerWorker): # do GROBID response['grobid'] = self.grobid_client.process_fulltext(body) - #sys.stderr.write("GROBID status: {}\n".format(response['grobid']['status'])) + #print("GROBID status: {}".format(response['grobid']['status']), file=sys.stderr) # TODO: optionally publish to Kafka here, but continue on failure (but # send a sentry exception?) @@ -185,7 +185,7 @@ class IngestFileWorker(SandcrawlerWorker): response['grobid'].pop('tei_xml') # Ok, now what? - #sys.stderr.write("GOT TO END\n") + #print("GOT TO END", file=sys.stderr) response['status'] = "success" response['hit'] = True return response diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py index 25d567f..a8b03c7 100644 --- a/python/sandcrawler/workers.py +++ b/python/sandcrawler/workers.py @@ -47,7 +47,7 @@ class SandcrawlerWorker(object): def finish(self): if self.sink: self.sink.finish() - sys.stderr.write("Worker: {}\n".format(self.counts)) + print("Worker: {}".format(self.counts), file=sys.stderr) return self.counts class MultiprocessWrapper(SandcrawlerWorker): @@ -60,7 +60,7 @@ class MultiprocessWrapper(SandcrawlerWorker): def push_batch(self, tasks): self.counts['total'] += len(tasks) - sys.stderr.write("... processing batch of: {}\n".format(len(tasks))) + print("... processing batch of: {}".format(len(tasks)), file=sys.stderr) results = self.pool.map(self.worker.process, tasks) for result in results: if not result: @@ -81,7 +81,7 @@ class MultiprocessWrapper(SandcrawlerWorker): if self.sink: self.sink.finish() worker_counts = self.worker.finish() - sys.stderr.write("Multiprocessing: {}\n".format(self.counts)) + print("Multiprocessing: {}".format(self.counts), file=sys.stderr) return worker_counts class BlackholeSink(SandcrawlerWorker): @@ -117,8 +117,8 @@ class KafkaSink(SandcrawlerWorker): @staticmethod def _fail_fast(err, msg): if err is not None: - sys.stderr.write("Kafka producer delivery error: {}\n".format(err)) - sys.stderr.write("Bailing out...\n") + print("Kafka producer delivery error: {}".format(err), file=sys.stderr) + print("Bailing out...", file=sys.stderr) # TODO: should it be sys.exit(-1)? raise KafkaException(err) @@ -234,7 +234,7 @@ class JsonLinePusher(RecordPusher): self.counts['pushed'] += len(batch) batch = [] worker_counts = self.worker.finish() - sys.stderr.write("JSON lines pushed: {}\n".format(self.counts)) + print("JSON lines pushed: {}".format(self.counts), file=sys.stderr) return self.counts @@ -281,7 +281,7 @@ class CdxLinePusher(RecordPusher): self.counts['pushed'] += len(batch) batch = [] worker_counts = self.worker.finish() - sys.stderr.write("CDX lines pushed: {}\n".format(self.counts)) + print("CDX lines pushed: {}".format(self.counts), file=sys.stderr) return self.counts @@ -306,7 +306,7 @@ class ZipfilePusher(RecordPusher): self.worker.push_record(data) self.counts['pushed'] += 1 worker_counts = self.worker.finish() - sys.stderr.write("ZIP PDFs pushed: {}\n".format(self.counts)) + print("ZIP PDFs pushed: {}".format(self.counts), file=sys.stderr) return self.counts @@ -338,8 +338,9 @@ class KafkaJsonPusher(RecordPusher): batch = self.consumer.consume( num_messages=self.batch_size, timeout=self.poll_interval) - sys.stderr.write("... got {} kafka messages ({}sec poll interval)\n".format( - len(batch), self.poll_interval)) + print("... got {} kafka messages ({}sec poll interval)".format( + len(batch), self.poll_interval), + file=sys.stderr) if not batch: # TODO: could have some larger timeout here and # self.worker.finish() if it's been more than, eg, a couple @@ -355,7 +356,7 @@ class KafkaJsonPusher(RecordPusher): records = [json.loads(msg.value().decode('utf-8')) for msg in batch] self.worker.push_batch(records) self.counts['pushed'] += len(batch) - sys.stderr.write("Import counts: {}\n".format(self.worker.counts)) + print("Import counts: {}".format(self.worker.counts), file=sys.stderr) else: for msg in batch: self.counts['total'] += 1 @@ -363,7 +364,7 @@ class KafkaJsonPusher(RecordPusher): self.worker.push_record(record) self.counts['pushed'] += 1 if self.counts['total'] % 500 == 0: - sys.stderr.write("Import counts: {}\n".format(self.worker.counts)) + print("Import counts: {}".format(self.worker.counts), file=sys.stderr) for msg in batch: # locally store offsets of processed messages; will be # auto-commited by librdkafka from this "stored" value @@ -372,7 +373,7 @@ class KafkaJsonPusher(RecordPusher): # TODO: should catch UNIX signals (HUP?) to shutdown cleanly, and/or # commit the current batch if it has been lingering worker_counts = self.worker.finish() - sys.stderr.write("KafkaJson lines pushed: {}\n".format(self.counts)) + print("KafkaJson lines pushed: {}".format(self.counts), file=sys.stderr) self.consumer.close() return self.counts @@ -382,15 +383,15 @@ def make_kafka_consumer(hosts, consume_topic, group): def fail_fast(err, partitions): if err is not None: - sys.stderr.write("Kafka consumer commit error: {}\n".format(err)) - sys.stderr.write("Bailing out...\n") + print("Kafka consumer commit error: {}".format(err), file=sys.stderr) + print("Bailing out...", file=sys.stderr) # TODO: should it be sys.exit(-1)? raise KafkaException(err) for p in partitions: # check for partition-specific commit errors if p.error: - sys.stderr.write("Kafka consumer commit error: {}\n".format(p.error)) - sys.stderr.write("Bailing out...\n") + print("Kafka consumer commit error: {}".format(p.error), file=sys.stderr) + print("Bailing out...", file=sys.stderr) # TODO: should it be sys.exit(-1)? raise KafkaException(err) #print("Kafka consumer commit successful") @@ -419,8 +420,9 @@ def make_kafka_consumer(hosts, consume_topic, group): for p in partitions: if p.error: raise KafkaException(p.error) - sys.stderr.write("Kafka partitions rebalanced: {} / {}\n".format( - consumer, partitions)) + print("Kafka partitions rebalanced: {} / {}".format( + consumer, partitions), + file=sys.stderr) consumer = Consumer(conf) # NOTE: it's actually important that topic_name *not* be bytes (UTF-8 @@ -429,5 +431,5 @@ def make_kafka_consumer(hosts, consume_topic, group): on_assign=on_rebalance, on_revoke=on_rebalance, ) - sys.stderr.write("Consuming from kafka topic {}, group {}\n".format(topic_name, group)) + print("Consuming from kafka topic {}, group {}".format(topic_name, group), file=sys.stderr) return consumer |