1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
|
import json
import time
import random
import hashlib
from sqlalchemy.orm.session import make_transient
from fatcat import db
import fatcat.api
from fatcat.models import *
def populate_db():
    """Seed the database with the initial administrator account.

    Adds a single ``Editor`` row (id 1, username "admin", admin flag set)
    to the shared session and commits it.
    """
    session = db.session
    session.add(Editor(id=1, username="admin", is_admin=True))
    session.commit()
def add_crossref_via_model(meta):
    """Insert one Crossref work record into the database via the ORM models.

    Builds creator, container, work and release revisions (each paired with
    a new ident row) plus one ``ReleaseRef`` per entry in ``reference``,
    then adds everything to the session and commits.

    Args:
        meta: dict of Crossref work metadata. The keys ``title``,
            ``author`` (each entry needing ``given`` and ``family``),
            ``ISSN``, ``container-title``, ``publisher``,
            ``short-container-title``, ``type``, ``DOI`` and ``created``
            are required. ``license``, ``issue``, ``volume``, ``page``,
            ``link``, ``subject``, ``alternative-id`` and ``reference``
            are optional.

    Raises:
        KeyError: if a required key is absent.
        IndexError: if a required list-valued field is empty.
    """
    title = meta['title'][0]

    # authors: one revision plus one ident per listed author
    author_revs = []
    author_ids = []
    for am in meta['author']:
        ar = CreatorRev(
            name="{} {}".format(am['given'], am['family']),
            sortname="{}, {}".format(am['family'], am['given']),
            orcid=None)
        author_revs.append(ar)
        author_ids.append(CreatorIdent(rev=ar))

    # container
    container = ContainerRev(
        issn=meta['ISSN'][0],
        name=meta['container-title'][0],
        publisher=meta['publisher'],
        sortname=meta['short-container-title'][0])
    container_id = ContainerIdent(rev=container)

    # License is optional; tolerate a missing key, an empty list, and a
    # first entry without a URL instead of raising.
    licenses = meta.get('license') or [{}]
    license_url = licenses[0].get('URL') or None

    # work and release
    work = WorkRev(title=title)
    work_id = WorkIdent(rev=work)
    release = ReleaseRev(
        title=title,
        creators=[ReleaseContrib(creator=a) for a in author_ids],
        # XXX: work=work,
        container=container_id,
        release_type=meta['type'],
        doi=meta['DOI'],
        date=meta['created']['date-time'],
        license=license_url,
        issue=meta.get('issue', None),
        volume=meta.get('volume', None),
        pages=meta.get('page', None))
    release_id = ReleaseIdent(rev=release)
    work.primary_release = release_id

    # Keep the raw Crossref fields that have no dedicated column. 'subject'
    # is optional, so it defaults to an empty list like its siblings rather
    # than raising KeyError.
    release.extra_json = json.dumps({
        'crossref': {
            'links': meta.get('link', []),
            'subject': meta.get('subject', []),
            'type': meta['type'],
            'alternative-id': meta.get('alternative-id', []),
        }
    }, indent=None).encode('utf-8')

    # references, numbered from 1 in the order given
    for index, rm in enumerate(meta.get('reference', []), start=1):
        ref = ReleaseRef(
            release_rev=release,
            doi=rm.get("DOI", None),
            index=index,
            # TODO: how to generate a proper stub here from k/v metadata?
            # NOTE(review): join() requires every value to be a string.
            stub="| ".join(rm.values()))
        release.refs.append(ref)

    db.session.add_all([work, work_id, release, release_id, container,
                        container_id])
    db.session.add_all(author_revs)
    db.session.add_all(author_ids)
    db.session.commit()
def accept_editgroup(eg):
    """Accept an edit group: apply its edits and record a changelog entry.

    For every edit of each entity type that belongs to ``eg``, the edit's
    ident is marked live and pointed at the edit's revision and redirect
    target. A ``ChangelogEntry`` is then added, the editor's active edit
    group is cleared, and the session is committed.

    Args:
        eg: the edit group to accept; it must not have a changelog entry yet.

    Raises:
        AssertionError: if a changelog entry already exists for ``eg``.
    """
    # refuse to accept the same edit group twice
    # XXX: add a test for this
    prior_entries = ChangelogEntry.query.filter(
        ChangelogEntry.editgroup_id == eg.id).count()
    assert prior_entries == 0

    # start transaction (TODO: explicitly?)

    # walk every entity type that can carry edits
    edit_models = (WorkEdit, ReleaseEdit, CreatorEdit, ContainerEdit, FileEdit)
    for edit_model in edit_models:
        pending = edit_model.query.filter(
            edit_model.editgroup_id == eg.id).all()
        for entity_edit in pending:
            # bring the ident in line with the edit (activate, redirect,
            # delete)
            ident = entity_edit.ident
            ident.is_live = True
            ident.rev_id = entity_edit.rev_id
            ident.redirect_id = entity_edit.redirect_id
            db.session.add(ident)

    # record the acceptance in the changelog
    # TODO: is this UTC?
    accepted_at = int(time.time())
    changelog_row = ChangelogEntry(editgroup_id=eg.id, timestamp=accepted_at)
    db.session.add(changelog_row)

    # update edit group state
    db.session.add(eg)

    # the editor no longer has an "active" edit group
    editor = eg.editor
    editor.active_editgroup = None
    db.session.add(editor)

    db.session.commit()
def merge_works(left_id, right_id, editgroup=None):
    """Merge the work ``right_id`` into the work ``left_id``.

    Every live release whose revision points at the right work gets a new
    revision (a transient copy of the current one) pointing at the left
    work, recorded as a ``ReleaseEdit``. A ``WorkEdit`` redirecting the
    right ident to the left one is added as well. Everything is attached
    to ``editgroup``; this function does not commit.

    Args:
        left_id: id of the work ident that survives the merge.
        right_id: id of the work ident that becomes a redirect.
        editgroup: edit group to attach the edits to; when ``None`` one is
            obtained from ``fatcat.api.get_or_create_editgroup()``.

    Raises:
        AssertionError: if either ident is not live, lacks a revision, or
            is already a redirect.
    """
    left = WorkIdent.query.get_or_404(left_id)
    right = WorkIdent.query.get_or_404(right_id)

    # both sides must be live, have a revision, and not be redirects
    assert left.is_live
    assert right.is_live
    assert left.rev
    assert right.rev
    assert left.redirect_id is None
    assert right.redirect_id is None

    if editgroup is None:
        editgroup = fatcat.api.get_or_create_editgroup()

    live_right_releases = (
        ReleaseIdent.query
        .join(ReleaseIdent.rev)
        .filter(ReleaseRev.work_ident_id == right_id)
        .filter(ReleaseIdent.is_live == True)
        .all()
    )

    # re-point each release of the right work at the left work
    for release_ident in live_right_releases:
        new_rev = release_ident.rev
        previous_rev_id = new_rev.id
        # detach the loaded revision and clear its id so it is saved as a
        # new row instead of updating the existing one
        db.session.expunge(new_rev)
        make_transient(new_rev)
        new_rev.id = None
        new_rev.parent = previous_rev_id
        new_rev.work_ident_id = left.id
        release_edit = ReleaseEdit(
            editgroup=editgroup, ident=release_ident, rev=new_rev)
        db.session.add(new_rev)
        db.session.add(release_edit)

    # redirect right id to left (via editgroup)
    redirect_edit = WorkEdit(
        editgroup=editgroup, ident=right, rev=left.rev, redirect_id=left.id)
    db.session.add(redirect_edit)
|