#!/usr/bin/env python3
"""
Transform an unpaywall dump (JSON) into ingest requests.
"""
import argparse
import json

import urlcanon

DOMAIN_BLOCKLIST = [
    # large OA publishers (we get via DOI)
    # large repos and aggregators (we crawl directly)
    "://arxiv.org/",
    "://europepmc.org/",
    "ncbi.nlm.nih.gov/",
    "semanticscholar.org/",
    "://doi.org/",
    "zenodo.org/",
    "figshare.com/",
    "://archive.org/",
    ".archive.org/",
]

# map Unpaywall location 'version' values to ingest request 'release_stage'
# values; unknown or missing versions map to None
RELEASE_STAGE_MAP = {
    'draftVersion': 'draft',
    'submittedVersion': 'submitted',
    'acceptedVersion': 'accepted',
    'publishedVersion': 'published',
    'updatedVersion': 'updated',
}

def canon(s):
    """Canonicalize a URL using WHATWG rules (via the urlcanon library)."""
    parsed = urlcanon.parse_url(s)
    return str(urlcanon.whatwg(parsed))
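
# Illustrative sketch only: WHATWG canonicalization lower-cases the scheme
# and host, among other normalizations (exact output can vary slightly
# between urlcanon versions):
#
#   canon("HTTP://Example.COM/article.pdf") -> "http://example.com/article.pdf"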

def transform(obj):
    """
    Transform a single unpaywall record into zero or more ingest requests.

    Returns a list of dicts.
    """
    requests = []
    # skip records without a real DOI or without any OA locations
    if not obj['doi'].startswith('10.'):
        return requests
    if not obj['oa_locations']:
        return requests

    for location in obj['oa_locations']:
        if not location['url_for_pdf']:
            continue
        # skip locations on domains we harvest through other channels
        if any(domain in location['url_for_pdf'] for domain in DOMAIN_BLOCKLIST):
            continue
        try:
            base_url = canon(location['url_for_pdf'])
        except UnicodeEncodeError:
            # some URLs in the wild are not encodable; drop them
            continue

        request = {
            'base_url': base_url,
            'ingest_type': 'pdf',
            'link_source': 'unpaywall',
            'link_source_id': obj['doi'].lower(),
            'ingest_request_source': 'unpaywall',
            'release_stage': RELEASE_STAGE_MAP.get(location['version']),
            'rel': location['host_type'],
            'ext_ids': {
                'doi': obj['doi'].lower(),
            },
            'edit_extra': {},
        }
        if obj.get('oa_status'):
            request['edit_extra']['oa_status'] = obj['oa_status']
        if location.get('evidence'):
            request['edit_extra']['evidence'] = location['evidence']
        if location.get('pmh_id'):
            request['ext_ids']['pmh_id'] = location['pmh_id']
        requests.append(request)

    return requests
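
# A sketch of the mapping, with illustrative (not real) field values: an
# unpaywall record like
#
#   {"doi": "10.123/abc", "oa_status": "gold",
#    "oa_locations": [{"url_for_pdf": "https://example.com/paper.pdf",
#                      "version": "publishedVersion", "host_type": "publisher",
#                      "evidence": "oa journal", "pmh_id": None}]}
#
# yields one ingest request roughly like
#
#   {"base_url": "https://example.com/paper.pdf", "ingest_type": "pdf",
#    "link_source": "unpaywall", "link_source_id": "10.123/abc",
#    "ingest_request_source": "unpaywall", "release_stage": "published",
#    "rel": "publisher", "ext_ids": {"doi": "10.123/abc"},
#    "edit_extra": {"oa_status": "gold", "evidence": "oa journal"}}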

def run(args):
    for line in args.json_file:
        if not line.strip():
            continue
        row = json.loads(line)
        for request in transform(row):
            print(json.dumps(request, sort_keys=True))

def main():
    parser = argparse.ArgumentParser(
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('json_file',
                        help="unpaywall dump file to use",
                        type=argparse.FileType('r'))
    args = parser.parse_args()
    run(args)

if __name__ == '__main__':
main()
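
# Example invocations (script and snapshot filenames are assumptions for
# illustration, not from the source). argparse.FileType('r') also accepts
# '-' for stdin:
#
#   python3 unpaywall2ingestrequest.py unpaywall_snapshot.jsonl > requests.json
#   zcat unpaywall_snapshot.jsonl.gz | python3 unpaywall2ingestrequest.py -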