1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
|
#!/usr/bin/env python3
"""
Transform an Unpaywall dump (one JSON object per line) into ingest requests,
printed to stdout as one JSON object per line.
"""
import argparse
import json
import sys
import urlcanon
# URL substrings used by transform(): an oa_location whose "url_for_pdf"
# contains any of these strings is skipped and produces no ingest request.
# Matching is a plain substring test, so the "://" prefix on some entries
# pins the match to the start of the host name.
DOMAIN_BLOCKLIST = [
    # large OA publishers (we get via DOI)
    # (no entries are listed under this heading at present)
    # large repos and aggregators (we crawl directly)
    "://arxiv.org/",
    "://europepmc.org/",
    "ncbi.nlm.nih.gov/",
    "://doi.org/",
    "zenodo.org/",
    "figshare.com/",
]
# Maps an oa_location's "version" value to the "release_stage" value written
# into the ingest request. transform() looks values up with .get(), so any
# version not listed here (including null) yields a release_stage of None.
RELEASE_STAGE_MAP = {
    "draftVersion": "draft",
    "submittedVersion": "submitted",
    "acceptedVersion": "accepted",
    "publishedVersion": "published",
    "updatedVersion": "updated",
}
def canon(s):
    """
    Canonicalize the URL string `s` and return the result as a str.

    The URL is parsed with urlcanon.parse_url() and then passed through the
    urlcanon.whatwg canonicalizer.
    """
    return str(urlcanon.whatwg(urlcanon.parse_url(s)))
def transform(obj):
    """
    Transforms from a single unpaywall object to zero or more ingest requests.

    One request is emitted for every entry of ``obj["oa_locations"]`` that has
    a non-empty ``url_for_pdf``, does not match DOMAIN_BLOCKLIST, and can be
    canonicalized by canon().

    Args:
        obj: dict decoded from one line of the dump. Keys read are ``doi``,
            ``oa_locations`` and ``oa_status``; from each location,
            ``url_for_pdf``, ``version``, ``host_type``, ``evidence`` and
            ``pmh_id``. A missing key is treated the same as a null value.

    Returns a list of dicts. The list is empty when the DOI is missing, is not
    a string, or does not start with "10.", or when no location qualifies.
    """
    doi = obj.get("doi")
    # A missing/null/malformed DOI yields no requests instead of raising.
    if not isinstance(doi, str) or not doi.startswith("10."):
        return []
    # The lower-cased DOI is used for both link_source_id and ext_ids.
    doi = doi.lower()

    requests = []
    for location in obj.get("oa_locations") or []:
        pdf_url = location.get("url_for_pdf")
        if not pdf_url:
            continue
        # Plain substring match against every blocklisted domain fragment.
        if any(domain in pdf_url for domain in DOMAIN_BLOCKLIST):
            continue
        try:
            base_url = canon(pdf_url)
        except UnicodeEncodeError:
            # This URL could not be canonicalized; drop only this location.
            continue
        request = {
            "base_url": base_url,
            "ingest_type": "pdf",
            "link_source": "unpaywall",
            "link_source_id": doi,
            "ingest_request_source": "unpaywall",
            # Unknown or null versions map to None.
            "release_stage": RELEASE_STAGE_MAP.get(location.get("version")),
            "rel": location.get("host_type"),
            "ext_ids": {
                "doi": doi,
            },
            "edit_extra": {},
        }
        # Optional metadata is copied only when present and truthy.
        if obj.get("oa_status"):
            request["edit_extra"]["oa_status"] = obj["oa_status"]
        if location.get("evidence"):
            request["edit_extra"]["evidence"] = location["evidence"]
        if location.get("pmh_id"):
            request["ext_ids"]["pmh_id"] = location["pmh_id"]
        requests.append(request)
    return requests
def run(args):
    """
    Convert every record in ``args.json_file`` and print the ingest requests.

    ``args.json_file`` is iterated line by line; lines that are empty after
    stripping whitespace are ignored. Each remaining line is decoded as JSON,
    passed to transform(), and every resulting request is printed to stdout
    as a single JSON line with sorted keys.
    """
    non_blank_lines = (line for line in args.json_file if line.strip())
    for line in non_blank_lines:
        record = json.loads(line)
        # transform() may return a falsy value; treat that as "no requests".
        for request in transform(record) or []:
            print(json.dumps(request, sort_keys=True))
def main():
    """
    Command-line entry point: parse arguments and hand them to run().

    Accepts one positional argument, ``json_file``, which argparse opens for
    reading in text mode via ``argparse.FileType("r")``.
    """
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument(
        "json_file", help="unpaywall dump file to use", type=argparse.FileType("r")
    )
    args = parser.parse_args()
    run(args)
# Run the command-line interface only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|