1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
|
from collections import namedtuple
from typing import Any, Dict, Optional
import fatcat_openapi_client
import pymacaroons
import requests
from flask import abort, flash, redirect, render_template, session
from flask_login import UserMixin, login_user, logout_user
from fatcat_web import AnyResponse, Config, api, app, login_manager, priv_api
def handle_logout() -> None:
logout_user()
for k in ("editor", "api_token"):
if k in session:
session.pop(k)
session.clear()
def handle_token_login(token: str) -> AnyResponse:
try:
m = pymacaroons.Macaroon.deserialize(token)
except pymacaroons.exceptions.MacaroonDeserializationException:
# TODO: what kind of Exceptions?
app.log.warning("auth fail: MacaroonDeserializationException")
return abort(400)
except pymacaroons.exceptions.MacaroonInitException:
# TODO: what kind of Exceptions?
app.log.warning("auth fail: must supply a valid token")
return abort(400)
# extract editor_id
editor_id = None
for caveat in m.first_party_caveats():
caveat = caveat.caveat_id
if caveat.startswith(b"editor_id = "):
editor_id = caveat[12:].decode("utf-8")
if not editor_id:
app.log.warning("auth fail: editor_id missing in macaroon")
abort(400)
# fetch editor info
editor = api.get_editor(editor_id)
session.permanent = True # pylint: disable=assigning-non-slot
session["api_token"] = token
session["editor"] = editor.to_dict()
login_user(load_user(editor.editor_id))
rp = "/auth/account"
if session.get("next"):
rp = session["next"]
session.pop("next")
return redirect(rp)
# This will need to login/signup via fatcatd API, then set token in session
def handle_oauth(remote: Any, token: Optional[str], user_info: Dict[str, Any]) -> AnyResponse:
if user_info:
# fetch api login/signup using user_info
# ISS is basically the API url (though more formal in OIDC)
# SUB is the stable internal identifier for the user (not usually the username itself)
# TODO: should have the real sub here
# TODO: would be nicer to pass preferred_username for account creation
iss = remote.OAUTH_CONFIG["api_base_url"]
# we reuse 'preferred_username' for account name auto-creation (but
# don't store it otherwise in the backend, at least currently). But i'm
# not sure all loginpass backends will set it
if user_info.get("preferred_username"):
preferred_username = user_info["preferred_username"]
elif "orcid.org" in iss:
# as a special case, prefix ORCiD identifier so it can be used as a
# username. If we instead used the human name, we could have
# collisions. Not a great user experience either way.
preferred_username = "i" + user_info["sub"].replace("-", "")
else:
preferred_username = user_info["sub"]
params = fatcat_openapi_client.AuthOidc(
remote.name, user_info["sub"], iss, preferred_username
)
# this call requires admin privs
(resp, http_status, http_headers) = priv_api.auth_oidc_with_http_info(params)
editor = resp.editor
api_token = resp.token
# write token and username to session
session.permanent = True # pylint: disable=assigning-non-slot
session["api_token"] = api_token
session["editor"] = editor.to_dict()
# call login_user(load_user(editor_id))
login_user(load_user(editor.editor_id))
rp = "/auth/account"
if session.get("next"):
rp = session["next"]
session.pop("next")
return redirect(rp)
# XXX: what should this actually be?
raise Exception("didn't receive OAuth user_info")
def handle_ia_xauth(email: str, password: str) -> AnyResponse:
resp = requests.post(
Config.IA_XAUTH_URI,
params={"op": "authenticate"},
json={
"version": "1",
"email": email,
"password": password,
"access": Config.IA_XAUTH_CLIENT_ID,
"secret": Config.IA_XAUTH_CLIENT_SECRET,
},
)
if resp.status_code == 401 or (not resp.json().get("success")):
try:
flash(
"Internet Archive email/password didn't match: {}".format(
resp.json()["values"]["reason"]
)
)
except Exception:
app.log.warning("IA XAuth fail: {}".format(resp.content))
return render_template("auth_ia_login.html", email=email), resp.status_code
elif resp.status_code != 200:
flash("Internet Archive login failed (internal error?)")
app.log.warning("IA XAuth fail: {}".format(resp.content))
return render_template("auth_ia_login.html", email=email), resp.status_code
# Successful login; now fetch info...
resp = requests.post(
Config.IA_XAUTH_URI,
params={"op": "info"},
json={
"version": "1",
"email": email,
"access": Config.IA_XAUTH_CLIENT_ID,
"secret": Config.IA_XAUTH_CLIENT_SECRET,
},
)
if resp.status_code != 200:
flash("Internet Archive login failed (internal error?)")
app.log.warning("IA XAuth fail: {}".format(resp.content))
return render_template("auth_ia_login.html", email=email), resp.status_code
ia_info = resp.json()["values"]
# and pass off "as if" we did OAuth successfully
FakeOAuthRemote = namedtuple("FakeOAuthRemote", ["name", "OAUTH_CONFIG"])
remote = FakeOAuthRemote(name="archive", OAUTH_CONFIG={"api_base_url": Config.IA_XAUTH_URI})
oauth_info = {
"preferred_username": ia_info["itemname"].replace("@", ""),
"iss": Config.IA_XAUTH_URI,
"sub": ia_info["itemname"],
}
return handle_oauth(remote, None, oauth_info)
def handle_wmoauth(username: str) -> AnyResponse:
# pass off "as if" we did OAuth successfully
FakeOAuthRemote = namedtuple("FakeOAuthRemote", ["name", "OAUTH_CONFIG"])
remote = FakeOAuthRemote(
name="wikipedia", OAUTH_CONFIG={"api_base_url": "https://www.mediawiki.org/w"}
)
conservative_username = "".join(filter(str.isalnum, username))
oauth_info = {
"preferred_username": conservative_username,
"iss": "https://www.mediawiki.org/w",
"sub": username,
}
return handle_oauth(remote, None, oauth_info)
@login_manager.user_loader
def load_user(editor_id: str) -> UserMixin:
# looks for extra info in session, and updates the user object with that.
# If session isn't loaded/valid, should return None
if (not session.get("editor")) or (not session.get("api_token")):
return None
editor = session["editor"]
token = session["api_token"]
user = UserMixin()
user.id = editor_id
user.editor_id = editor_id
user.username = editor["username"]
user.is_admin = editor["is_admin"]
user.token = token
return user
|