summaryrefslogtreecommitdiffstats
path: root/python/fatcat_tools/harvest
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2021-11-02 18:14:59 -0700
committerBryan Newbold <bnewbold@robocracy.org>2021-11-02 18:14:59 -0700
commit31d1a6a713d177990609767d508209ced19ca396 (patch)
treea628a57bdb373669394a6b520102b1b4b5ffe7da /python/fatcat_tools/harvest
parent9dc891b8098542bb089c8c47098b60a8beb76a53 (diff)
downloadfatcat-31d1a6a713d177990609767d508209ced19ca396.tar.gz
fatcat-31d1a6a713d177990609767d508209ced19ca396.zip
fmt (black): fatcat_tools/
Diffstat (limited to 'python/fatcat_tools/harvest')
-rw-r--r--python/fatcat_tools/harvest/doi_registrars.py145
-rw-r--r--python/fatcat_tools/harvest/harvest_common.py83
-rw-r--r--python/fatcat_tools/harvest/oaipmh.py57
-rw-r--r--python/fatcat_tools/harvest/pubmed.py159
4 files changed, 271 insertions, 173 deletions
diff --git a/python/fatcat_tools/harvest/doi_registrars.py b/python/fatcat_tools/harvest/doi_registrars.py
index d441d495..dd48e256 100644
--- a/python/fatcat_tools/harvest/doi_registrars.py
+++ b/python/fatcat_tools/harvest/doi_registrars.py
@@ -1,4 +1,3 @@
-
import json
import sys
import time
@@ -59,29 +58,35 @@ class HarvestCrossrefWorker:
to be careful how state is serialized back into kafka.
"""
- def __init__(self, kafka_hosts, produce_topic, state_topic, contact_email,
- api_host_url="https://api.crossref.org/works", start_date=None,
- end_date=None):
+ def __init__(
+ self,
+ kafka_hosts,
+ produce_topic,
+ state_topic,
+ contact_email,
+ api_host_url="https://api.crossref.org/works",
+ start_date=None,
+ end_date=None,
+ ):
self.api_host_url = api_host_url
self.produce_topic = produce_topic
self.state_topic = state_topic
self.contact_email = contact_email
self.kafka_config = {
- 'bootstrap.servers': kafka_hosts,
- 'message.max.bytes': 20000000, # ~20 MBytes; broker is ~50 MBytes
+ "bootstrap.servers": kafka_hosts,
+ "message.max.bytes": 20000000, # ~20 MBytes; broker is ~50 MBytes
}
self.state = HarvestState(start_date, end_date)
self.state.initialize_from_kafka(self.state_topic, self.kafka_config)
- self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks
+ self.loop_sleep = 60 * 60 # how long to wait, in seconds, between date checks
self.api_batch_size = 50
self.name = "Crossref"
self.producer = self._kafka_producer()
def _kafka_producer(self):
-
def fail_fast(err, msg):
if err is not None:
print("Kafka producer delivery error: {}".format(err), file=sys.stderr)
@@ -92,46 +97,53 @@ class HarvestCrossrefWorker:
self._kafka_fail_fast = fail_fast
producer_conf = self.kafka_config.copy()
- producer_conf.update({
- 'delivery.report.only.error': True,
- 'default.topic.config': {
- 'request.required.acks': -1, # all brokers must confirm
- },
- })
+ producer_conf.update(
+ {
+ "delivery.report.only.error": True,
+ "default.topic.config": {
+ "request.required.acks": -1, # all brokers must confirm
+ },
+ }
+ )
return Producer(producer_conf)
def params(self, date_str):
- filter_param = 'from-update-date:{},until-update-date:{}'.format(
- date_str, date_str)
+ filter_param = "from-update-date:{},until-update-date:{}".format(date_str, date_str)
return {
- 'filter': filter_param,
- 'rows': self.api_batch_size,
- 'cursor': '*',
+ "filter": filter_param,
+ "rows": self.api_batch_size,
+ "cursor": "*",
}
def update_params(self, params, resp):
- params['cursor'] = resp['message']['next-cursor']
+ params["cursor"] = resp["message"]["next-cursor"]
return params
def extract_key(self, obj):
- return obj['DOI'].encode('utf-8')
+ return obj["DOI"].encode("utf-8")
def fetch_date(self, date):
date_str = date.isoformat()
params = self.params(date_str)
http_session = requests_retry_session()
- http_session.headers.update({
- 'User-Agent': 'fatcat_tools/0.1.0 (https://fatcat.wiki; mailto:{}) python-requests'.format(
- self.contact_email),
- })
+ http_session.headers.update(
+ {
+ "User-Agent": "fatcat_tools/0.1.0 (https://fatcat.wiki; mailto:{}) python-requests".format(
+ self.contact_email
+ ),
+ }
+ )
count = 0
while True:
http_resp = http_session.get(self.api_host_url, params=params)
if http_resp.status_code == 503:
# crude backoff; now redundant with session exponential
# backoff, but allows for longer backoff/downtime on remote end
- print("got HTTP {}, pausing for 30 seconds".format(http_resp.status_code), file=sys.stderr)
+ print(
+ "got HTTP {}, pausing for 30 seconds".format(http_resp.status_code),
+ file=sys.stderr,
+ )
# keep kafka producer connection alive
self.producer.poll(0)
time.sleep(30.0)
@@ -143,19 +155,27 @@ class HarvestCrossrefWorker:
except json.JSONDecodeError as exc:
# Datacite API returned HTTP 200, but JSON seemed unparseable.
# It might be a glitch, so we retry.
- print("failed to decode body from {}: {}".format(http_resp.url, resp_body), file=sys.stderr)
+ print(
+ "failed to decode body from {}: {}".format(http_resp.url, resp_body),
+ file=sys.stderr,
+ )
raise exc
items = self.extract_items(resp)
count += len(items)
- print("... got {} ({} of {}), HTTP fetch took {}".format(len(items), count,
- self.extract_total(resp), http_resp.elapsed), file=sys.stderr)
- #print(json.dumps(resp))
+ print(
+ "... got {} ({} of {}), HTTP fetch took {}".format(
+ len(items), count, self.extract_total(resp), http_resp.elapsed
+ ),
+ file=sys.stderr,
+ )
+ # print(json.dumps(resp))
for work in items:
self.producer.produce(
self.produce_topic,
- json.dumps(work).encode('utf-8'),
+ json.dumps(work).encode("utf-8"),
key=self.extract_key(work),
- on_delivery=self._kafka_fail_fast)
+ on_delivery=self._kafka_fail_fast,
+ )
self.producer.poll(0)
if len(items) < self.api_batch_size:
break
@@ -163,10 +183,10 @@ class HarvestCrossrefWorker:
self.producer.flush()
def extract_items(self, resp):
- return resp['message']['items']
+ return resp["message"]["items"]
def extract_total(self, resp):
- return resp['message']['total-results']
+ return resp["message"]["total-results"]
def run(self, continuous=False):
@@ -175,9 +195,9 @@ class HarvestCrossrefWorker:
if current:
print("Fetching DOIs updated on {} (UTC)".format(current), file=sys.stderr)
self.fetch_date(current)
- self.state.complete(current,
- kafka_topic=self.state_topic,
- kafka_config=self.kafka_config)
+ self.state.complete(
+ current, kafka_topic=self.state_topic, kafka_config=self.kafka_config
+ )
continue
if continuous:
@@ -200,16 +220,25 @@ class HarvestDataciteWorker(HarvestCrossrefWorker):
could/should use this script for that, and dump to JSON?
"""
- def __init__(self, kafka_hosts, produce_topic, state_topic, contact_email,
- api_host_url="https://api.datacite.org/dois",
- start_date=None, end_date=None):
- super().__init__(kafka_hosts=kafka_hosts,
- produce_topic=produce_topic,
- state_topic=state_topic,
- api_host_url=api_host_url,
- contact_email=contact_email,
- start_date=start_date,
- end_date=end_date)
+ def __init__(
+ self,
+ kafka_hosts,
+ produce_topic,
+ state_topic,
+ contact_email,
+ api_host_url="https://api.datacite.org/dois",
+ start_date=None,
+ end_date=None,
+ ):
+ super().__init__(
+ kafka_hosts=kafka_hosts,
+ produce_topic=produce_topic,
+ state_topic=state_topic,
+ api_host_url=api_host_url,
+ contact_email=contact_email,
+ start_date=start_date,
+ end_date=end_date,
+ )
# for datecite, it's "from-update-date"
self.name = "Datacite"
@@ -219,19 +248,21 @@ class HarvestDataciteWorker(HarvestCrossrefWorker):
Dates have to be supplied in 2018-10-27T22:36:30.000Z format.
"""
return {
- 'query': 'updated:[{}T00:00:00.000Z TO {}T23:59:59.999Z]'.format(date_str, date_str),
- 'page[size]': self.api_batch_size,
- 'page[cursor]': 1,
+ "query": "updated:[{}T00:00:00.000Z TO {}T23:59:59.999Z]".format(
+ date_str, date_str
+ ),
+ "page[size]": self.api_batch_size,
+ "page[cursor]": 1,
}
def extract_items(self, resp):
- return resp['data']
+ return resp["data"]
def extract_total(self, resp):
- return resp['meta']['total']
+ return resp["meta"]["total"]
def extract_key(self, obj):
- return obj['attributes']['doi'].encode('utf-8')
+ return obj["attributes"]["doi"].encode("utf-8")
def update_params(self, params, resp):
"""
@@ -245,9 +276,9 @@ class HarvestDataciteWorker(HarvestCrossrefWorker):
https://github.com/datacite/datacite/issues/897 (HTTP 400)
https://github.com/datacite/datacite/issues/898 (HTTP 500)
"""
- parsed = urlparse(resp['links']['next'])
- page_cursor = parse_qs(parsed.query).get('page[cursor]')
+ parsed = urlparse(resp["links"]["next"])
+ page_cursor = parse_qs(parsed.query).get("page[cursor]")
if not page_cursor:
- raise ValueError('no page[cursor] in .links.next')
- params['page[cursor]'] = page_cursor[0]
+ raise ValueError("no page[cursor] in .links.next")
+ params["page[cursor]"] = page_cursor[0]
return params
diff --git a/python/fatcat_tools/harvest/harvest_common.py b/python/fatcat_tools/harvest/harvest_common.py
index 45c2b8ea..fda0dc62 100644
--- a/python/fatcat_tools/harvest/harvest_common.py
+++ b/python/fatcat_tools/harvest/harvest_common.py
@@ -1,4 +1,3 @@
-
import datetime
import json
import sys
@@ -14,8 +13,10 @@ from requests.packages.urllib3.util.retry import Retry # pylint: disable=import
# Used for parsing ISO date format (YYYY-MM-DD)
DATE_FMT = "%Y-%m-%d"
-def requests_retry_session(retries=10, backoff_factor=3,
- status_forcelist=(500, 502, 504), session=None):
+
+def requests_retry_session(
+ retries=10, backoff_factor=3, status_forcelist=(500, 502, 504), session=None
+):
"""
From: https://www.peterbe.com/plog/best-practice-with-retries-with-requests
"""
@@ -28,10 +29,11 @@ def requests_retry_session(retries=10, backoff_factor=3,
status_forcelist=status_forcelist,
)
adapter = HTTPAdapter(max_retries=retry)
- session.mount('http://', adapter)
- session.mount('https://', adapter)
+ session.mount("http://", adapter)
+ session.mount("https://", adapter)
return session
+
class HarvestState:
"""
First version of this works with full days (dates)
@@ -57,8 +59,9 @@ class HarvestState:
self.enqueue_period(start_date, end_date, catchup_days)
def __str__(self):
- return '<HarvestState to_process={}, completed={}>'.format(
- len(self.to_process), len(self.completed))
+ return "<HarvestState to_process={}, completed={}>".format(
+ len(self.to_process), len(self.completed)
+ )
def enqueue_period(self, start_date=None, end_date=None, catchup_days=14):
"""
@@ -92,7 +95,9 @@ class HarvestState:
"""
if continuous:
# enqueue yesterday
- self.enqueue_period(start_date=datetime.datetime.utcnow().date() - datetime.timedelta(days=1))
+ self.enqueue_period(
+ start_date=datetime.datetime.utcnow().date() - datetime.timedelta(days=1)
+ )
if not self.to_process:
return None
return sorted(list(self.to_process))[0]
@@ -105,8 +110,8 @@ class HarvestState:
state stored on disk or in Kafka.
"""
state = json.loads(state_json)
- if 'completed-date' in state:
- date = datetime.datetime.strptime(state['completed-date'], DATE_FMT).date()
+ if "completed-date" in state:
+ date = datetime.datetime.strptime(state["completed-date"], DATE_FMT).date()
self.complete(date)
def complete(self, date, kafka_topic=None, kafka_config=None):
@@ -123,12 +128,14 @@ class HarvestState:
except KeyError:
pass
self.completed.add(date)
- state_json = json.dumps({
- 'in-progress-dates': [str(d) for d in self.to_process],
- 'completed-date': str(date),
- }).encode('utf-8')
+ state_json = json.dumps(
+ {
+ "in-progress-dates": [str(d) for d in self.to_process],
+ "completed-date": str(date),
+ }
+ ).encode("utf-8")
if kafka_topic:
- assert(kafka_config)
+ assert kafka_config
def fail_fast(err, msg):
if err:
@@ -136,17 +143,16 @@ class HarvestState:
print("Committing status to Kafka: {}".format(kafka_topic), file=sys.stderr)
producer_conf = kafka_config.copy()
- producer_conf.update({
- 'delivery.report.only.error': True,
- 'default.topic.config': {
- 'request.required.acks': -1, # all brokers must confirm
- },
- })
+ producer_conf.update(
+ {
+ "delivery.report.only.error": True,
+ "default.topic.config": {
+ "request.required.acks": -1, # all brokers must confirm
+ },
+ }
+ )
producer = Producer(producer_conf)
- producer.produce(
- kafka_topic,
- state_json,
- on_delivery=fail_fast)
+ producer.produce(kafka_topic, state_json, on_delivery=fail_fast)
producer.flush()
return state_json
@@ -166,22 +172,25 @@ class HarvestState:
raise KafkaException(err)
conf = kafka_config.copy()
- conf.update({
- 'group.id': 'dummy_init_group', # should never be committed
- 'enable.auto.commit': False,
- 'auto.offset.reset': 'earliest',
- 'session.timeout.ms': 10000,
- })
+ conf.update(
+ {
+ "group.id": "dummy_init_group", # should never be committed
+ "enable.auto.commit": False,
+ "auto.offset.reset": "earliest",
+ "session.timeout.ms": 10000,
+ }
+ )
consumer = Consumer(conf)
# this watermark fetch is mostly to ensure we are connected to broker and
# fail fast if not, but we also confirm that we read to end below.
hwm = consumer.get_watermark_offsets(
- TopicPartition(kafka_topic, 0),
- timeout=5.0,
- cached=False)
+ TopicPartition(kafka_topic, 0), timeout=5.0, cached=False
+ )
if not hwm:
- raise Exception("Kafka consumer timeout, or topic {} doesn't exist".format(kafka_topic))
+ raise Exception(
+ "Kafka consumer timeout, or topic {} doesn't exist".format(kafka_topic)
+ )
consumer.assign([TopicPartition(kafka_topic, 0, 0)])
c = 0
@@ -191,8 +200,8 @@ class HarvestState:
break
if msg.error():
raise KafkaException(msg.error())
- #sys.stdout.write('.')
- self.update(msg.value().decode('utf-8'))
+ # sys.stdout.write('.')
+ self.update(msg.value().decode("utf-8"))
c += 1
consumer.close()
diff --git a/python/fatcat_tools/harvest/oaipmh.py b/python/fatcat_tools/harvest/oaipmh.py
index 0eb0343d..40d1c853 100644
--- a/python/fatcat_tools/harvest/oaipmh.py
+++ b/python/fatcat_tools/harvest/oaipmh.py
@@ -1,4 +1,3 @@
-
import sys
import time
@@ -25,19 +24,18 @@ class HarvestOaiPmhWorker:
would want something similar operationally. Oh well!
"""
- def __init__(self, kafka_hosts, produce_topic, state_topic,
- start_date=None, end_date=None):
+ def __init__(self, kafka_hosts, produce_topic, state_topic, start_date=None, end_date=None):
self.produce_topic = produce_topic
self.state_topic = state_topic
self.kafka_config = {
- 'bootstrap.servers': kafka_hosts,
- 'message.max.bytes': 20000000, # ~20 MBytes; broker is ~50 MBytes
+ "bootstrap.servers": kafka_hosts,
+ "message.max.bytes": 20000000, # ~20 MBytes; broker is ~50 MBytes
}
- self.loop_sleep = 60*60 # how long to wait, in seconds, between date checks
+ self.loop_sleep = 60 * 60 # how long to wait, in seconds, between date checks
- self.endpoint_url = None # needs override
+ self.endpoint_url = None # needs override
self.metadata_prefix = None # needs override
self.name = "unnamed"
self.state = HarvestState(start_date, end_date)
@@ -45,7 +43,6 @@ class HarvestOaiPmhWorker:
print(self.state, file=sys.stderr)
def fetch_date(self, date):
-
def fail_fast(err, msg):
if err is not None:
print("Kafka producer delivery error: {}".format(err), file=sys.stderr)
@@ -54,12 +51,14 @@ class HarvestOaiPmhWorker:
raise KafkaException(err)
producer_conf = self.kafka_config.copy()
- producer_conf.update({
- 'delivery.report.only.error': True,
- 'default.topic.config': {
- 'request.required.acks': -1, # all brokers must confirm
- },
- })
+ producer_conf.update(
+ {
+ "delivery.report.only.error": True,
+ "default.topic.config": {
+ "request.required.acks": -1, # all brokers must confirm
+ },
+ }
+ )
producer = Producer(producer_conf)
api = sickle.Sickle(self.endpoint_url, max_retries=5, retry_status_codes=[503])
@@ -67,13 +66,18 @@ class HarvestOaiPmhWorker:
# this dict kwargs hack is to work around 'from' as a reserved python keyword
# recommended by sickle docs
try:
- records = api.ListRecords(**{
- 'metadataPrefix': self.metadata_prefix,
- 'from': date_str,
- 'until': date_str,
- })
+ records = api.ListRecords(
+ **{
+ "metadataPrefix": self.metadata_prefix,
+ "from": date_str,
+ "until": date_str,
+ }
+ )
except sickle.oaiexceptions.NoRecordsMatch:
- print("WARN: no OAI-PMH records for this date: {} (UTC)".format(date_str), file=sys.stderr)
+ print(
+ "WARN: no OAI-PMH records for this date: {} (UTC)".format(date_str),
+ file=sys.stderr,
+ )
return
count = 0
@@ -83,9 +87,10 @@ class HarvestOaiPmhWorker:
print("... up to {}".format(count), file=sys.stderr)
producer.produce(
self.produce_topic,
- item.raw.encode('utf-8'),
- key=item.header.identifier.encode('utf-8'),
- on_delivery=fail_fast)
+ item.raw.encode("utf-8"),
+ key=item.header.identifier.encode("utf-8"),
+ on_delivery=fail_fast,
+ )
producer.flush()
def run(self, continuous=False):
@@ -95,9 +100,9 @@ class HarvestOaiPmhWorker:
if current:
print("Fetching DOIs updated on {} (UTC)".format(current), file=sys.stderr)
self.fetch_date(current)
- self.state.complete(current,
- kafka_topic=self.state_topic,
- kafka_config=self.kafka_config)
+ self.state.complete(
+ current, kafka_topic=self.state_topic, kafka_config=self.kafka_config
+ )
continue
if continuous:
diff --git a/python/fatcat_tools/harvest/pubmed.py b/python/fatcat_tools/harvest/pubmed.py
index ee55f4eb..0f33f334 100644
--- a/python/fatcat_tools/harvest/pubmed.py
+++ b/python/fatcat_tools/harvest/pubmed.py
@@ -60,14 +60,15 @@ class PubmedFTPWorker:
<tr>
"""
+
def __init__(self, kafka_hosts, produce_topic, state_topic, start_date=None, end_date=None):
- self.name = 'Pubmed'
- self.host = 'ftp.ncbi.nlm.nih.gov'
+ self.name = "Pubmed"
+ self.host = "ftp.ncbi.nlm.nih.gov"
self.produce_topic = produce_topic
self.state_topic = state_topic
self.kafka_config = {
- 'bootstrap.servers': kafka_hosts,
- 'message.max.bytes': 20000000, # ~20 MBytes; broker is ~50 MBytes
+ "bootstrap.servers": kafka_hosts,
+ "message.max.bytes": 20000000, # ~20 MBytes; broker is ~50 MBytes
}
self.loop_sleep = 60 * 60 # how long to wait, in seconds, between date checks
self.state = HarvestState(start_date, end_date)
@@ -86,12 +87,14 @@ class PubmedFTPWorker:
self._kafka_fail_fast = fail_fast
producer_conf = self.kafka_config.copy()
- producer_conf.update({
- 'delivery.report.only.error': True,
- 'default.topic.config': {
- 'request.required.acks': -1, # all brokers must confirm
- },
- })
+ producer_conf.update(
+ {
+ "delivery.report.only.error": True,
+ "default.topic.config": {
+ "request.required.acks": -1, # all brokers must confirm
+ },
+ }
+ )
return Producer(producer_conf)
def fetch_date(self, date):
@@ -105,24 +108,35 @@ class PubmedFTPWorker:
if self.date_file_map is None:
raise ValueError("cannot fetch date without date file mapping")
- date_str = date.strftime('%Y-%m-%d')
+ date_str = date.strftime("%Y-%m-%d")
paths = self.date_file_map.get(date_str)
if paths is None:
- print("WARN: no pubmed update for this date: {} (UTC), available dates were: {}".format(date_str, self.date_file_map), file=sys.stderr)
+ print(
+ "WARN: no pubmed update for this date: {} (UTC), available dates were: {}".format(
+ date_str, self.date_file_map
+ ),
+ file=sys.stderr,
+ )
return False
count = 0
for path in paths:
# Fetch and decompress file.
url = "ftp://{}{}".format(self.host, path)
- filename = ftpretr(url, proxy_hostport="159.69.240.245:15201") # TODO: proxy obsolete, when networking issue is resolved
- with tempfile.NamedTemporaryFile(prefix='fatcat-ftp-tmp-', delete=False) as decomp:
+ filename = ftpretr(
+ url, proxy_hostport="159.69.240.245:15201"
+ ) # TODO: proxy obsolete, when networking issue is resolved
+ with tempfile.NamedTemporaryFile(prefix="fatcat-ftp-tmp-", delete=False) as decomp:
try:
gzf = gzip.open(filename)
shutil.copyfileobj(gzf, decomp)
except zlib.error as exc:
- print('[skip] retrieving {} failed with {} (maybe empty, missing or broken gzip)'.format(
- url, exc), file=sys.stderr)
+ print(
+ "[skip] retrieving {} failed with {} (maybe empty, missing or broken gzip)".format(
+ url, exc
+ ),
+ file=sys.stderr,
+ )
continue
# Here, blob is the unparsed XML; we peek into it to use PMID as
@@ -131,15 +145,17 @@ class PubmedFTPWorker:
# WARNING: Parsing foreign XML exposes us at some
# https://docs.python.org/3/library/xml.html#xml-vulnerabilities
# here.
- for blob in xmlstream(decomp.name, 'PubmedArticle', encoding='utf-8'):
- soup = BeautifulSoup(blob, 'xml')
- pmid = soup.find('PMID')
+ for blob in xmlstream(decomp.name, "PubmedArticle", encoding="utf-8"):
+ soup = BeautifulSoup(blob, "xml")
+ pmid = soup.find("PMID")
if pmid is None:
raise ValueError("no PMID found, please adjust identifier extraction")
count += 1
if count % 50 == 0:
print("... up to {}".format(count), file=sys.stderr)
- self.producer.produce(self.produce_topic, blob, key=pmid.text, on_delivery=self._kafka_fail_fast)
+ self.producer.produce(
+ self.produce_topic, blob, key=pmid.text, on_delivery=self._kafka_fail_fast
+ )
self.producer.flush()
os.remove(filename)
@@ -151,13 +167,17 @@ class PubmedFTPWorker:
while True:
self.date_file_map = generate_date_file_map(host=self.host)
if len(self.date_file_map) == 0:
- raise ValueError("map from dates to files should not be empty, maybe the HTML changed?")
+ raise ValueError(
+ "map from dates to files should not be empty, maybe the HTML changed?"
+ )
current = self.state.next_span(continuous)
if current:
print("Fetching citations updated on {} (UTC)".format(current), file=sys.stderr)
self.fetch_date(current)
- self.state.complete(current, kafka_topic=self.state_topic, kafka_config=self.kafka_config)
+ self.state.complete(
+ current, kafka_topic=self.state_topic, kafka_config=self.kafka_config
+ )
continue
if continuous:
@@ -168,7 +188,7 @@ class PubmedFTPWorker:
print("{} FTP ingest caught up".format(self.name))
-def generate_date_file_map(host='ftp.ncbi.nlm.nih.gov'):
+def generate_date_file_map(host="ftp.ncbi.nlm.nih.gov"):
"""
Generate a DefaultDict[string, set] mapping dates to absolute filepaths on
the server (mostly we have one file, but sometimes more).
@@ -176,14 +196,14 @@ def generate_date_file_map(host='ftp.ncbi.nlm.nih.gov'):
Example: {"2020-01-02": set(["/pubmed/updatefiles/pubmed20n1016.xml.gz"]), ...}
"""
mapping = collections.defaultdict(set)
- pattern = re.compile(r'Filename: ([^ ]*.xml) -- Created: ([^<]*)')
+ pattern = re.compile(r"Filename: ([^ ]*.xml) -- Created: ([^<]*)")
ftp = ftplib.FTP(host)
ftp.login()
- filenames = ftp.nlst('/pubmed/updatefiles')
+ filenames = ftp.nlst("/pubmed/updatefiles")
retries, retry_delay = 10, 60
for name in filenames:
- if not name.endswith('.html'):
+ if not name.endswith(".html"):
continue
sio = io.StringIO()
for i in range(retries):
@@ -201,10 +221,14 @@ def generate_date_file_map(host='ftp.ncbi.nlm.nih.gov'):
ftp = ftplib.FTP(host)
ftp.login()
sio.truncate(0)
- ftp.retrlines('RETR {}'.format(name), sio.write)
+ ftp.retrlines("RETR {}".format(name), sio.write)
except (EOFError, ftplib.error_temp, socket.gaierror, BrokenPipeError) as exc:
- print("ftp retr on {} failed with {} ({}) ({} retries left)".format(
- name, exc, type(exc), retries - (i + 1)), file=sys.stderr)
+ print(
+ "ftp retr on {} failed with {} ({}) ({} retries left)".format(
+ name, exc, type(exc), retries - (i + 1)
+ ),
+ file=sys.stderr,
+ )
if i + 1 == retries:
raise
else:
@@ -214,16 +238,24 @@ def generate_date_file_map(host='ftp.ncbi.nlm.nih.gov'):
contents = sio.getvalue()
match = pattern.search(contents)
if match is None:
- print('pattern miss in {} on: {}, may need to adjust pattern: {}'.format(name, contents, pattern), file=sys.stderr)
+ print(
+ "pattern miss in {} on: {}, may need to adjust pattern: {}".format(
+ name, contents, pattern
+ ),
+ file=sys.stderr,
+ )
continue
- filename, filedate = match.groups() # ('pubmed20n1017.xml', 'Tue Dec 17 15:23:32 EST 2019')
+ (
+ filename,
+ filedate,
+ ) = match.groups() # ('pubmed20n1017.xml', 'Tue Dec 17 15:23:32 EST 2019')
date = dateparser.parse(filedate)
- fullpath = '/pubmed/updatefiles/{}.gz'.format(filename)
- date_str = date.strftime('%Y-%m-%d')
+ fullpath = "/pubmed/updatefiles/{}.gz".format(filename)
+ date_str = date.strftime("%Y-%m-%d")
mapping[date_str].add(fullpath)
- print('added entry for {}: {}'.format(date_str, fullpath), file=sys.stderr)
+ print("added entry for {}: {}".format(date_str, fullpath), file=sys.stderr)
- print('generated date-file mapping for {} dates'.format(len(mapping)), file=sys.stderr)
+ print("generated date-file mapping for {} dates".format(len(mapping)), file=sys.stderr)
return mapping
@@ -241,20 +273,29 @@ def ftpretr(url, max_retries=10, retry_delay=1, proxy_hostport=None):
when we encountered EOFError while talking to the FTP server. Retry delay in seconds.
"""
if proxy_hostport is not None:
- return ftpretr_via_http_proxy(url, proxy_hostport, max_retries=max_retries, retry_delay=retry_delay)
+ return ftpretr_via_http_proxy(
+ url, proxy_hostport, max_retries=max_retries, retry_delay=retry_delay
+ )
parsed = urlparse(url)
server, path = parsed.netloc, parsed.path
for i in range(max_retries):
try:
ftp = ftplib.FTP(server)
ftp.login()
- with tempfile.NamedTemporaryFile(prefix='fatcat-ftp-tmp-', delete=False) as f:
- print('retrieving {} from {} to {} ...'.format(path, server, f.name), file=sys.stderr)
- ftp.retrbinary('RETR %s' % path, f.write)
+ with tempfile.NamedTemporaryFile(prefix="fatcat-ftp-tmp-", delete=False) as f:
+ print(
+ "retrieving {} from {} to {} ...".format(path, server, f.name),
+ file=sys.stderr,
+ )
+ ftp.retrbinary("RETR %s" % path, f.write)
ftp.close()
except EOFError as exc:
- print("ftp retrbinary on {} failed with {} ({}) ({} retries left)".format(
- path, exc, type(exc), max_retries - (i + 1)), file=sys.stderr)
+ print(
+ "ftp retrbinary on {} failed with {} ({}) ({} retries left)".format(
+ path, exc, type(exc), max_retries - (i + 1)
+ ),
+ file=sys.stderr,
+ )
if i + 1 == max_retries:
raise
else:
@@ -263,7 +304,9 @@ def ftpretr(url, max_retries=10, retry_delay=1, proxy_hostport=None):
return f.name
-def ftpretr_via_http_proxy(url, proxy_hostport="ftp.ncbi.nlm.nih.gov", max_retries=10, retry_delay=1):
+def ftpretr_via_http_proxy(
+ url, proxy_hostport="ftp.ncbi.nlm.nih.gov", max_retries=10, retry_delay=1
+):
"""
Fetch file from FTP via external HTTP proxy, e.g. ftp.host.com:/a/b/c would
be retrievable via proxy.com/a/b/c; (in 09/2021 we used
@@ -276,19 +319,23 @@ def ftpretr_via_http_proxy(url, proxy_hostport="ftp.ncbi.nlm.nih.gov", max_retri
try:
url = "http://{}{}".format(proxy_hostport, path)
print("retrieving file via proxy (ftpup) from {}".format(url), file=sys.stderr)
- with tempfile.NamedTemporaryFile(prefix='fatcat-ftp-tmp-', delete=False) as f:
+ with tempfile.NamedTemporaryFile(prefix="fatcat-ftp-tmp-", delete=False) as f:
cmd = ["wget", "-c", url, "-O", f.name]
result = subprocess.run(cmd)
return f.name
except (subprocess.CalledProcessError, OSError, ValueError) as exc:
- print("ftp fetch {} failed with {} ({}) ({} retries left)".format(
- url, exc, type(exc), max_retries - (i + 1)), file=sys.stderr)
+ print(
+ "ftp fetch {} failed with {} ({}) ({} retries left)".format(
+ url, exc, type(exc), max_retries - (i + 1)
+ ),
+ file=sys.stderr,
+ )
if i + 1 == max_retries:
raise
time.sleep(retry_delay)
-def xmlstream(filename, tag, encoding='utf-8'):
+def xmlstream(filename, tag, encoding="utf-8"):
"""
Note: This might move into a generic place in the future.
@@ -300,23 +347,29 @@ def xmlstream(filename, tag, encoding='utf-8'):
Known vulnerabilities: https://docs.python.org/3/library/xml.html#xml-vulnerabilities
"""
+
def strip_ns(tag):
- if '}' not in tag:
+ if "}" not in tag:
return tag
- return tag.split('}')[1]
+ return tag.split("}")[1]
# https://stackoverflow.com/a/13261805, http://effbot.org/elementtree/iterparse.htm
- context = iter(ET.iterparse(filename, events=(
- 'start',
- 'end',
- )))
+ context = iter(
+ ET.iterparse(
+ filename,
+ events=(
+ "start",
+ "end",
+ ),
+ )
+ )
try:
_, root = next(context)
except StopIteration:
return
for event, elem in context:
- if not strip_ns(elem.tag) == tag or event == 'start':
+ if not strip_ns(elem.tag) == tag or event == "start":
continue
yield ET.tostring(elem, encoding=encoding)