aboutsummaryrefslogtreecommitdiffstats
path: root/python/fatcat_web/auth.py
diff options
context:
space:
mode:
authorBryan Newbold <bnewbold@robocracy.org>2021-11-02 18:14:43 -0700
committerBryan Newbold <bnewbold@robocracy.org>2021-11-02 18:14:43 -0700
commit9dc891b8098542bb089c8c47098b60a8beb76a53 (patch)
tree2c9b1c569b6b9bb4041ef51076d024b6980089c5 /python/fatcat_web/auth.py
parent6464631dbe5c4afeb76f2f3c9d63b89f917c9a3b (diff)
downloadfatcat-9dc891b8098542bb089c8c47098b60a8beb76a53.tar.gz
fatcat-9dc891b8098542bb089c8c47098b60a8beb76a53.zip
fmt (black): fatcat_web/
Diffstat (limited to 'python/fatcat_web/auth.py')
-rw-r--r--python/fatcat_web/auth.py130
1 files changed, 73 insertions, 57 deletions
diff --git a/python/fatcat_web/auth.py b/python/fatcat_web/auth.py
index 137bc2bb..4fe85770 100644
--- a/python/fatcat_web/auth.py
+++ b/python/fatcat_web/auth.py
@@ -1,4 +1,3 @@
-
from collections import namedtuple
import fatcat_openapi_client
@@ -12,11 +11,12 @@ from fatcat_web import Config, api, app, login_manager, priv_api
def handle_logout():
logout_user()
- for k in ('editor', 'api_token'):
+ for k in ("editor", "api_token"):
if k in session:
session.pop(k)
session.clear()
+
def handle_token_login(token):
try:
m = pymacaroons.Macaroon.deserialize(token)
@@ -33,22 +33,23 @@ def handle_token_login(token):
for caveat in m.first_party_caveats():
caveat = caveat.caveat_id
if caveat.startswith(b"editor_id = "):
- editor_id = caveat[12:].decode('utf-8')
+ editor_id = caveat[12:].decode("utf-8")
if not editor_id:
app.log.warning("auth fail: editor_id missing in macaroon")
abort(400)
# fetch editor info
editor = api.get_editor(editor_id)
session.permanent = True # pylint: disable=assigning-non-slot
- session['api_token'] = token
- session['editor'] = editor.to_dict()
+ session["api_token"] = token
+ session["editor"] = editor.to_dict()
login_user(load_user(editor.editor_id))
rp = "/auth/account"
- if session.get('next'):
- rp = session['next']
- session.pop('next')
+ if session.get("next"):
+ rp = session["next"]
+ session.pop("next")
return redirect(rp)
+
# This will need to login/signup via fatcatd API, then set token in session
def handle_oauth(remote, token, user_info):
if user_info:
@@ -57,22 +58,24 @@ def handle_oauth(remote, token, user_info):
# SUB is the stable internal identifier for the user (not usually the username itself)
# TODO: should have the real sub here
# TODO: would be nicer to pass preferred_username for account creation
- iss = remote.OAUTH_CONFIG['api_base_url']
+ iss = remote.OAUTH_CONFIG["api_base_url"]
# we reuse 'preferred_username' for account name auto-creation (but
# don't store it otherwise in the backend, at least currently). But i'm
# not sure all loginpass backends will set it
- if user_info.get('preferred_username'):
- preferred_username = user_info['preferred_username']
- elif 'orcid.org' in iss:
+ if user_info.get("preferred_username"):
+ preferred_username = user_info["preferred_username"]
+ elif "orcid.org" in iss:
# as a special case, prefix ORCiD identifier so it can be used as a
# username. If we instead used the human name, we could have
# collisions. Not a great user experience either way.
- preferred_username = 'i' + user_info['sub'].replace('-', '')
+ preferred_username = "i" + user_info["sub"].replace("-", "")
else:
- preferred_username = user_info['sub']
+ preferred_username = user_info["sub"]
- params = fatcat_openapi_client.AuthOidc(remote.name, user_info['sub'], iss, preferred_username)
+ params = fatcat_openapi_client.AuthOidc(
+ remote.name, user_info["sub"], iss, preferred_username
+ )
# this call requires admin privs
(resp, http_status, http_headers) = priv_api.auth_oidc_with_http_info(params)
editor = resp.editor
@@ -80,90 +83,103 @@ def handle_oauth(remote, token, user_info):
# write token and username to session
session.permanent = True # pylint: disable=assigning-non-slot
- session['api_token'] = api_token
- session['editor'] = editor.to_dict()
+ session["api_token"] = api_token
+ session["editor"] = editor.to_dict()
# call login_user(load_user(editor_id))
login_user(load_user(editor.editor_id))
rp = "/auth/account"
- if session.get('next'):
- rp = session['next']
- session.pop('next')
+ if session.get("next"):
+ rp = session["next"]
+ session.pop("next")
return redirect(rp)
# XXX: what should this actually be?
raise Exception("didn't receive OAuth user_info")
+
def handle_ia_xauth(email, password):
- resp = requests.post(Config.IA_XAUTH_URI,
- params={'op': 'authenticate'},
+ resp = requests.post(
+ Config.IA_XAUTH_URI,
+ params={"op": "authenticate"},
json={
- 'version': '1',
- 'email': email,
- 'password': password,
- 'access': Config.IA_XAUTH_CLIENT_ID,
- 'secret': Config.IA_XAUTH_CLIENT_SECRET,
- })
- if resp.status_code == 401 or (not resp.json().get('success')):
+ "version": "1",
+ "email": email,
+ "password": password,
+ "access": Config.IA_XAUTH_CLIENT_ID,
+ "secret": Config.IA_XAUTH_CLIENT_SECRET,
+ },
+ )
+ if resp.status_code == 401 or (not resp.json().get("success")):
try:
- flash("Internet Archive email/password didn't match: {}".format(resp.json()['values']['reason']))
+ flash(
+ "Internet Archive email/password didn't match: {}".format(
+ resp.json()["values"]["reason"]
+ )
+ )
except Exception:
app.log.warning("IA XAuth fail: {}".format(resp.content))
- return render_template('auth_ia_login.html', email=email), resp.status_code
+ return render_template("auth_ia_login.html", email=email), resp.status_code
elif resp.status_code != 200:
flash("Internet Archive login failed (internal error?)")
app.log.warning("IA XAuth fail: {}".format(resp.content))
- return render_template('auth_ia_login.html', email=email), resp.status_code
+ return render_template("auth_ia_login.html", email=email), resp.status_code
# Successful login; now fetch info...
- resp = requests.post(Config.IA_XAUTH_URI,
- params={'op': 'info'},
+ resp = requests.post(
+ Config.IA_XAUTH_URI,
+ params={"op": "info"},
json={
- 'version': '1',
- 'email': email,
- 'access': Config.IA_XAUTH_CLIENT_ID,
- 'secret': Config.IA_XAUTH_CLIENT_SECRET,
- })
+ "version": "1",
+ "email": email,
+ "access": Config.IA_XAUTH_CLIENT_ID,
+ "secret": Config.IA_XAUTH_CLIENT_SECRET,
+ },
+ )
if resp.status_code != 200:
flash("Internet Archive login failed (internal error?)")
app.log.warning("IA XAuth fail: {}".format(resp.content))
- return render_template('auth_ia_login.html', email=email), resp.status_code
- ia_info = resp.json()['values']
+ return render_template("auth_ia_login.html", email=email), resp.status_code
+ ia_info = resp.json()["values"]
# and pass off "as if" we did OAuth successfully
- FakeOAuthRemote = namedtuple('FakeOAuthRemote', ['name', 'OAUTH_CONFIG'])
- remote = FakeOAuthRemote(name='archive', OAUTH_CONFIG={'api_base_url': Config.IA_XAUTH_URI})
+ FakeOAuthRemote = namedtuple("FakeOAuthRemote", ["name", "OAUTH_CONFIG"])
+ remote = FakeOAuthRemote(name="archive", OAUTH_CONFIG={"api_base_url": Config.IA_XAUTH_URI})
oauth_info = {
- 'preferred_username': ia_info['itemname'].replace('@', ''),
- 'iss': Config.IA_XAUTH_URI,
- 'sub': ia_info['itemname'],
+ "preferred_username": ia_info["itemname"].replace("@", ""),
+ "iss": Config.IA_XAUTH_URI,
+ "sub": ia_info["itemname"],
}
return handle_oauth(remote, None, oauth_info)
+
def handle_wmoauth(username):
# pass off "as if" we did OAuth successfully
- FakeOAuthRemote = namedtuple('FakeOAuthRemote', ['name', 'OAUTH_CONFIG'])
- remote = FakeOAuthRemote(name='wikipedia', OAUTH_CONFIG={'api_base_url': "https://www.mediawiki.org/w"})
- conservative_username = ''.join(filter(str.isalnum, username))
+ FakeOAuthRemote = namedtuple("FakeOAuthRemote", ["name", "OAUTH_CONFIG"])
+ remote = FakeOAuthRemote(
+ name="wikipedia", OAUTH_CONFIG={"api_base_url": "https://www.mediawiki.org/w"}
+ )
+ conservative_username = "".join(filter(str.isalnum, username))
oauth_info = {
- 'preferred_username': conservative_username,
- 'iss': "https://www.mediawiki.org/w",
- 'sub': username,
+ "preferred_username": conservative_username,
+ "iss": "https://www.mediawiki.org/w",
+ "sub": username,
}
return handle_oauth(remote, None, oauth_info)
+
@login_manager.user_loader
def load_user(editor_id):
# looks for extra info in session, and updates the user object with that.
# If session isn't loaded/valid, should return None
- if (not session.get('editor')) or (not session.get('api_token')):
+ if (not session.get("editor")) or (not session.get("api_token")):
return None
- editor = session['editor']
- token = session['api_token']
+ editor = session["editor"]
+ token = session["api_token"]
user = UserMixin()
user.id = editor_id
user.editor_id = editor_id
- user.username = editor['username']
- user.is_admin = editor['is_admin']
+ user.username = editor["username"]
+ user.is_admin = editor["is_admin"]
user.token = token
return user