about summary refs log tree commit diff stats
diff options
context:
space:
mode:
-rwxr-xr-xpython/ia_pdf_match.py10
-rwxr-xr-xpython/kafka_grobid.py4
-rw-r--r--python/sandcrawler/html.py2
-rw-r--r--python/sandcrawler/ingest.py8
-rw-r--r--python/sandcrawler/workers.py42
5 files changed, 34 insertions, 32 deletions
diff --git a/python/ia_pdf_match.py b/python/ia_pdf_match.py
index c5a5e11..20c65bb 100755
--- a/python/ia_pdf_match.py
+++ b/python/ia_pdf_match.py
@@ -27,7 +27,7 @@ import json
def parse(obj):
if obj['metadata']['identifier'].endswith('-test') or obj['metadata'].get('test'):
- sys.stderr.write('skip: test item\n')
+ print('skip: test item', file=sys.stderr)
return None
extid_type = None
@@ -36,14 +36,14 @@ def parse(obj):
extid_type = 'arxiv'
extid = obj['metadata'].get('source')
if not extid:
- sys.stderr.write('skip: no source\n')
+ print('skip: no source', file=sys.stderr)
return None
assert extid.startswith('http://arxiv.org/abs/')
extid = extid.replace('http://arxiv.org/abs/', '')
#print(extid)
assert '/' in extid or '.' in extid
if not 'v' in extid or not extid[-1].isdigit():
- sys.stderr.write('skip: non-versioned arxiv_id\n')
+ print('skip: non-versioned arxiv_id', file=sys.stderr)
return None
elif obj['metadata']['identifier'].startswith('paper-doi-10_'):
extid_type = 'doi'
@@ -67,9 +67,9 @@ def parse(obj):
pdf_file = f
break
if not pdf_file:
- sys.stderr.write('skip: no PDF found: {}\n'.format(obj['metadata']['identifier']))
+ print('skip: no PDF found: {}'.format(obj['metadata']['identifier']), file=sys.stderr)
#for f in obj['files']:
- # sys.stderr.write(f['format'] + "\n")
+ # print(f['format'], file=sys.stderr)
return None
assert pdf_file['name'].endswith('.pdf')
diff --git a/python/kafka_grobid.py b/python/kafka_grobid.py
index 05e48bd..b0920bc 100755
--- a/python/kafka_grobid.py
+++ b/python/kafka_grobid.py
@@ -285,10 +285,10 @@ class KafkaGrobidWorker:
producer.produce(json.dumps(grobid_output, sort_keys=True).encode('utf-8'))
sequential_failures = 0
else:
- sys.stderr.write("failed to extract: {}\n".format(status))
+ print("failed to extract: {}".format(status), file=sys.stderr)
sequential_failures += 1
if sequential_failures > 20:
- sys.stderr.write("too many failures in a row, bailing out\n")
+ print("too many failures in a row, bailing out", file=sys.stderr)
sys.exit(-1)
diff --git a/python/sandcrawler/html.py b/python/sandcrawler/html.py
index 2117eb0..780dcb2 100644
--- a/python/sandcrawler/html.py
+++ b/python/sandcrawler/html.py
@@ -30,7 +30,7 @@ def extract_fulltext_url(html_url, html_body):
elif url.startswith('http'):
return dict(pdf_url=url, technique='citation_pdf_url')
else:
- sys.stderr.write("malformed citation_pdf_url? {}\n".format(url))
+ print("malformed citation_pdf_url? {}".format(url), file=sys.stderr)
# ACS (and probably others) like:
# https://pubs.acs.org/doi/10.1021/acs.estlett.9b00379
diff --git a/python/sandcrawler/ingest.py b/python/sandcrawler/ingest.py
index d3f7043..077469a 100644
--- a/python/sandcrawler/ingest.py
+++ b/python/sandcrawler/ingest.py
@@ -70,7 +70,7 @@ class IngestFileWorker(SandcrawlerWorker):
# extraction didn't work as expected; fetch whatever SPN2 got
cdx = self.cdx_client.lookup_latest(url, follow_redirects=True)
if not cdx:
- sys.stderr.write("{}\n".format(cdx_list))
+ print("{}".format(cdx_list), file=sys.stderr)
raise SavePageNowError("Failed to find terminal capture from SPNv2")
else:
return self.spn_client.save_url_now_v1(url)
@@ -123,7 +123,7 @@ class IngestFileWorker(SandcrawlerWorker):
response['status'] = 'wayback-error'
response['error_message'] = str(e)
return response
- sys.stderr.write("CDX hit: {}\n".format(cdx_dict))
+ print("CDX hit: {}".format(cdx_dict), file=sys.stderr)
response['cdx'] = cdx_dict
# TODO: populate terminal
@@ -172,7 +172,7 @@ class IngestFileWorker(SandcrawlerWorker):
# do GROBID
response['grobid'] = self.grobid_client.process_fulltext(body)
- #sys.stderr.write("GROBID status: {}\n".format(response['grobid']['status']))
+ #print("GROBID status: {}".format(response['grobid']['status']), file=sys.stderr)
# TODO: optionally publish to Kafka here, but continue on failure (but
# send a sentry exception?)
@@ -185,7 +185,7 @@ class IngestFileWorker(SandcrawlerWorker):
response['grobid'].pop('tei_xml')
# Ok, now what?
- #sys.stderr.write("GOT TO END\n")
+ #print("GOT TO END", file=sys.stderr)
response['status'] = "success"
response['hit'] = True
return response
diff --git a/python/sandcrawler/workers.py b/python/sandcrawler/workers.py
index 25d567f..a8b03c7 100644
--- a/python/sandcrawler/workers.py
+++ b/python/sandcrawler/workers.py
@@ -47,7 +47,7 @@ class SandcrawlerWorker(object):
def finish(self):
if self.sink:
self.sink.finish()
- sys.stderr.write("Worker: {}\n".format(self.counts))
+ print("Worker: {}".format(self.counts), file=sys.stderr)
return self.counts
class MultiprocessWrapper(SandcrawlerWorker):
@@ -60,7 +60,7 @@ class MultiprocessWrapper(SandcrawlerWorker):
def push_batch(self, tasks):
self.counts['total'] += len(tasks)
- sys.stderr.write("... processing batch of: {}\n".format(len(tasks)))
+ print("... processing batch of: {}".format(len(tasks)), file=sys.stderr)
results = self.pool.map(self.worker.process, tasks)
for result in results:
if not result:
@@ -81,7 +81,7 @@ class MultiprocessWrapper(SandcrawlerWorker):
if self.sink:
self.sink.finish()
worker_counts = self.worker.finish()
- sys.stderr.write("Multiprocessing: {}\n".format(self.counts))
+ print("Multiprocessing: {}".format(self.counts), file=sys.stderr)
return worker_counts
class BlackholeSink(SandcrawlerWorker):
@@ -117,8 +117,8 @@ class KafkaSink(SandcrawlerWorker):
@staticmethod
def _fail_fast(err, msg):
if err is not None:
- sys.stderr.write("Kafka producer delivery error: {}\n".format(err))
- sys.stderr.write("Bailing out...\n")
+ print("Kafka producer delivery error: {}".format(err), file=sys.stderr)
+ print("Bailing out...", file=sys.stderr)
# TODO: should it be sys.exit(-1)?
raise KafkaException(err)
@@ -234,7 +234,7 @@ class JsonLinePusher(RecordPusher):
self.counts['pushed'] += len(batch)
batch = []
worker_counts = self.worker.finish()
- sys.stderr.write("JSON lines pushed: {}\n".format(self.counts))
+ print("JSON lines pushed: {}".format(self.counts), file=sys.stderr)
return self.counts
@@ -281,7 +281,7 @@ class CdxLinePusher(RecordPusher):
self.counts['pushed'] += len(batch)
batch = []
worker_counts = self.worker.finish()
- sys.stderr.write("CDX lines pushed: {}\n".format(self.counts))
+ print("CDX lines pushed: {}".format(self.counts), file=sys.stderr)
return self.counts
@@ -306,7 +306,7 @@ class ZipfilePusher(RecordPusher):
self.worker.push_record(data)
self.counts['pushed'] += 1
worker_counts = self.worker.finish()
- sys.stderr.write("ZIP PDFs pushed: {}\n".format(self.counts))
+ print("ZIP PDFs pushed: {}".format(self.counts), file=sys.stderr)
return self.counts
@@ -338,8 +338,9 @@ class KafkaJsonPusher(RecordPusher):
batch = self.consumer.consume(
num_messages=self.batch_size,
timeout=self.poll_interval)
- sys.stderr.write("... got {} kafka messages ({}sec poll interval)\n".format(
- len(batch), self.poll_interval))
+ print("... got {} kafka messages ({}sec poll interval)".format(
+ len(batch), self.poll_interval),
+ file=sys.stderr)
if not batch:
# TODO: could have some larger timeout here and
# self.worker.finish() if it's been more than, eg, a couple
@@ -355,7 +356,7 @@ class KafkaJsonPusher(RecordPusher):
records = [json.loads(msg.value().decode('utf-8')) for msg in batch]
self.worker.push_batch(records)
self.counts['pushed'] += len(batch)
- sys.stderr.write("Import counts: {}\n".format(self.worker.counts))
+ print("Import counts: {}".format(self.worker.counts), file=sys.stderr)
else:
for msg in batch:
self.counts['total'] += 1
@@ -363,7 +364,7 @@ class KafkaJsonPusher(RecordPusher):
self.worker.push_record(record)
self.counts['pushed'] += 1
if self.counts['total'] % 500 == 0:
- sys.stderr.write("Import counts: {}\n".format(self.worker.counts))
+ print("Import counts: {}".format(self.worker.counts), file=sys.stderr)
for msg in batch:
# locally store offsets of processed messages; will be
# auto-commited by librdkafka from this "stored" value
@@ -372,7 +373,7 @@ class KafkaJsonPusher(RecordPusher):
# TODO: should catch UNIX signals (HUP?) to shutdown cleanly, and/or
# commit the current batch if it has been lingering
worker_counts = self.worker.finish()
- sys.stderr.write("KafkaJson lines pushed: {}\n".format(self.counts))
+ print("KafkaJson lines pushed: {}".format(self.counts), file=sys.stderr)
self.consumer.close()
return self.counts
@@ -382,15 +383,15 @@ def make_kafka_consumer(hosts, consume_topic, group):
def fail_fast(err, partitions):
if err is not None:
- sys.stderr.write("Kafka consumer commit error: {}\n".format(err))
- sys.stderr.write("Bailing out...\n")
+ print("Kafka consumer commit error: {}".format(err), file=sys.stderr)
+ print("Bailing out...", file=sys.stderr)
# TODO: should it be sys.exit(-1)?
raise KafkaException(err)
for p in partitions:
# check for partition-specific commit errors
if p.error:
- sys.stderr.write("Kafka consumer commit error: {}\n".format(p.error))
- sys.stderr.write("Bailing out...\n")
+ print("Kafka consumer commit error: {}".format(p.error), file=sys.stderr)
+ print("Bailing out...", file=sys.stderr)
# TODO: should it be sys.exit(-1)?
raise KafkaException(err)
#print("Kafka consumer commit successful")
@@ -419,8 +420,9 @@ def make_kafka_consumer(hosts, consume_topic, group):
for p in partitions:
if p.error:
raise KafkaException(p.error)
- sys.stderr.write("Kafka partitions rebalanced: {} / {}\n".format(
- consumer, partitions))
+ print("Kafka partitions rebalanced: {} / {}".format(
+ consumer, partitions),
+ file=sys.stderr)
consumer = Consumer(conf)
# NOTE: it's actually important that topic_name *not* be bytes (UTF-8
@@ -429,5 +431,5 @@ def make_kafka_consumer(hosts, consume_topic, group):
on_assign=on_rebalance,
on_revoke=on_rebalance,
)
- sys.stderr.write("Consuming from kafka topic {}, group {}\n".format(topic_name, group))
+ print("Consuming from kafka topic {}, group {}".format(topic_name, group), file=sys.stderr)
return consumer