mirror of
https://gitlab.archlinux.org/archlinux/aurweb.git
synced 2025-02-03 10:43:03 +01:00
add passreset routes
Introduced `get|post` `/passreset` routes. These routes mimic the behavior of the existing PHP implementation, with the exception of HTTP status code returns. Routes added: GET /passreset POST /passreset Routers added: aurweb.routers.accounts * On an unknown user or mismatched resetkey (where resetkey must == user.resetkey), return HTTP status NOT_FOUND (404). * On another error in the request, return HTTP status BAD_REQUEST (400). Both `get|post` routes requires that the current user is **not** authenticated, hence `@auth_required(False, redirect="/")`. + Added auth_required decorator to aurweb.auth. + Added some more utility to aurweb.models.user.User. + Added `partials/error.html` template. + Added `passreset.html` template. + Added aurweb.db.ConnectionExecutor functor for paramstyle logic. Decoupling the executor logic from the database connection logic is needed for us to easily use the same logic with a fastapi database session, when we need to use aurweb.scripts modules. At this point, notification configuration is now required to complete tests involved with notifications properly, like passreset. `conf/config.dev` has been modified to include [notifications] sendmail, sender and reply-to overrides. Dockerfile and .gitlab-ci.yml have been updated to setup /etc/hosts and start postfix before running tests. * setup.cfg: ignore E741, C901 in aurweb.routers.accounts These two warnings (shown in the commit) are not dangerous and a bi-product of maintaining compatibility with our current code flow. Signed-off-by: Kevin Morris <kevr@0cost.org>
This commit is contained in:
parent
4423326cec
commit
a33d076d8b
15 changed files with 552 additions and 41 deletions
|
@ -27,6 +27,7 @@ For all the test to run, the following Arch packages should be installed:
|
|||
- python-pytest
|
||||
- python-pytest-cov
|
||||
- python-pytest-asyncio
|
||||
- postfix
|
||||
|
||||
Running tests
|
||||
-------------
|
||||
|
@ -37,6 +38,10 @@ First, setup the test configuration:
|
|||
|
||||
$ sed -r 's;YOUR_AUR_ROOT;$(pwd);g' conf/config.dev > conf/config
|
||||
|
||||
You'll need to make sure that emails can be sent out by aurweb.scripts.notify.
|
||||
If you don't have anything setup, just install postfix and start it before
|
||||
running tests.
|
||||
|
||||
With those installed, one can run Python tests manually with any AUR config
|
||||
specified by `AUR_CONFIG`:
|
||||
|
||||
|
|
218
test/test_accounts_routes.py
Normal file
218
test/test_accounts_routes.py
Normal file
|
@ -0,0 +1,218 @@
|
|||
from http import HTTPStatus
|
||||
|
||||
import pytest
|
||||
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
from aurweb.asgi import app
|
||||
from aurweb.db import query
|
||||
from aurweb.models.account_type import AccountType
|
||||
from aurweb.models.session import Session
|
||||
from aurweb.models.user import User
|
||||
from aurweb.testing import setup_test_db
|
||||
from aurweb.testing.models import make_user
|
||||
from aurweb.testing.requests import Request
|
||||
|
||||
# Some test global constants.
|
||||
TEST_USERNAME = "test"
|
||||
TEST_EMAIL = "test@example.org"
|
||||
|
||||
# Global mutables.
|
||||
client = TestClient(app)
|
||||
user = None
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup():
|
||||
global user
|
||||
|
||||
setup_test_db("Users", "Sessions", "Bans")
|
||||
|
||||
account_type = query(AccountType,
|
||||
AccountType.AccountType == "User").first()
|
||||
user = make_user(Username=TEST_USERNAME, Email=TEST_EMAIL,
|
||||
RealName="Test User", Passwd="testPassword",
|
||||
AccountType=account_type)
|
||||
|
||||
|
||||
def test_get_passreset_authed_redirects():
|
||||
sid = user.login(Request(), "testPassword")
|
||||
assert sid is not None
|
||||
|
||||
with client as request:
|
||||
response = request.get("/passreset", cookies={"AURSID": sid},
|
||||
allow_redirects=False)
|
||||
|
||||
assert response.status_code == int(HTTPStatus.SEE_OTHER)
|
||||
assert response.headers.get("location") == "/"
|
||||
|
||||
|
||||
def test_get_passreset():
|
||||
with client as request:
|
||||
response = request.get("/passreset")
|
||||
assert response.status_code == int(HTTPStatus.OK)
|
||||
|
||||
|
||||
def test_get_passreset_translation():
|
||||
# Test that translation works.
|
||||
with client as request:
|
||||
response = request.get("/passreset", cookies={"AURLANG": "de"})
|
||||
|
||||
# The header title should be translated.
|
||||
assert "Passwort zurücksetzen".encode("utf-8") in response.content
|
||||
|
||||
# The form input label should be translated.
|
||||
assert "Benutzername oder primäre E-Mail-Adresse eingeben:".encode(
|
||||
"utf-8") in response.content
|
||||
|
||||
# And the button.
|
||||
assert "Weiter".encode("utf-8") in response.content
|
||||
|
||||
|
||||
def test_get_passreset_with_resetkey():
|
||||
with client as request:
|
||||
response = request.get("/passreset", data={"resetkey": "abcd"})
|
||||
assert response.status_code == int(HTTPStatus.OK)
|
||||
|
||||
|
||||
def test_post_passreset_authed_redirects():
|
||||
sid = user.login(Request(), "testPassword")
|
||||
assert sid is not None
|
||||
|
||||
with client as request:
|
||||
response = request.post("/passreset",
|
||||
cookies={"AURSID": sid},
|
||||
data={"user": "blah"},
|
||||
allow_redirects=False)
|
||||
|
||||
assert response.status_code == int(HTTPStatus.SEE_OTHER)
|
||||
assert response.headers.get("location") == "/"
|
||||
|
||||
|
||||
def test_post_passreset_user():
|
||||
# With username.
|
||||
with client as request:
|
||||
response = request.post("/passreset", data={"user": TEST_USERNAME})
|
||||
assert response.status_code == int(HTTPStatus.SEE_OTHER)
|
||||
assert response.headers.get("location") == "/passreset?step=confirm"
|
||||
|
||||
# With e-mail.
|
||||
with client as request:
|
||||
response = request.post("/passreset", data={"user": TEST_EMAIL})
|
||||
assert response.status_code == int(HTTPStatus.SEE_OTHER)
|
||||
assert response.headers.get("location") == "/passreset?step=confirm"
|
||||
|
||||
|
||||
def test_post_passreset_resetkey():
|
||||
from aurweb.db import session
|
||||
|
||||
user.session = Session(UsersID=user.ID, SessionID="blah",
|
||||
LastUpdateTS=datetime.utcnow().timestamp())
|
||||
session.commit()
|
||||
|
||||
# Prepare a password reset.
|
||||
with client as request:
|
||||
response = request.post("/passreset", data={"user": TEST_USERNAME})
|
||||
assert response.status_code == int(HTTPStatus.SEE_OTHER)
|
||||
assert response.headers.get("location") == "/passreset?step=confirm"
|
||||
|
||||
# Now that we've prepared the password reset, prepare a POST
|
||||
# request with the user's ResetKey.
|
||||
resetkey = user.ResetKey
|
||||
post_data = {
|
||||
"user": TEST_USERNAME,
|
||||
"resetkey": resetkey,
|
||||
"password": "abcd1234",
|
||||
"confirm": "abcd1234"
|
||||
}
|
||||
|
||||
with client as request:
|
||||
response = request.post("/passreset", data=post_data)
|
||||
assert response.status_code == int(HTTPStatus.SEE_OTHER)
|
||||
assert response.headers.get("location") == "/passreset?step=complete"
|
||||
|
||||
|
||||
def test_post_passreset_error_invalid_email():
|
||||
# First, test with a user that doesn't even exist.
|
||||
with client as request:
|
||||
response = request.post("/passreset", data={"user": "invalid"})
|
||||
assert response.status_code == int(HTTPStatus.NOT_FOUND)
|
||||
|
||||
error = "Invalid e-mail."
|
||||
assert error in response.content.decode("utf-8")
|
||||
|
||||
# Then, test with an invalid resetkey for a real user.
|
||||
_ = make_resetkey()
|
||||
post_data = make_passreset_data("fake")
|
||||
post_data["password"] = "abcd1234"
|
||||
post_data["confirm"] = "abcd1234"
|
||||
|
||||
with client as request:
|
||||
response = request.post("/passreset", data=post_data)
|
||||
assert response.status_code == int(HTTPStatus.NOT_FOUND)
|
||||
assert error in response.content.decode("utf-8")
|
||||
|
||||
|
||||
def make_resetkey():
|
||||
with client as request:
|
||||
response = request.post("/passreset", data={"user": TEST_USERNAME})
|
||||
assert response.status_code == int(HTTPStatus.SEE_OTHER)
|
||||
assert response.headers.get("location") == "/passreset?step=confirm"
|
||||
return user.ResetKey
|
||||
|
||||
|
||||
def make_passreset_data(resetkey):
|
||||
return {
|
||||
"user": user.Username,
|
||||
"resetkey": resetkey
|
||||
}
|
||||
|
||||
|
||||
def test_post_passreset_error_missing_field():
|
||||
# Now that we've prepared the password reset, prepare a POST
|
||||
# request with the user's ResetKey.
|
||||
resetkey = make_resetkey()
|
||||
post_data = make_passreset_data(resetkey)
|
||||
|
||||
with client as request:
|
||||
response = request.post("/passreset", data=post_data)
|
||||
|
||||
assert response.status_code == int(HTTPStatus.BAD_REQUEST)
|
||||
|
||||
error = "Missing a required field."
|
||||
assert error in response.content.decode("utf-8")
|
||||
|
||||
|
||||
def test_post_passreset_error_password_mismatch():
|
||||
resetkey = make_resetkey()
|
||||
post_data = make_passreset_data(resetkey)
|
||||
|
||||
post_data["password"] = "abcd1234"
|
||||
post_data["confirm"] = "mismatched"
|
||||
|
||||
with client as request:
|
||||
response = request.post("/passreset", data=post_data)
|
||||
|
||||
assert response.status_code == int(HTTPStatus.BAD_REQUEST)
|
||||
|
||||
error = "Password fields do not match."
|
||||
assert error in response.content.decode("utf-8")
|
||||
|
||||
|
||||
def test_post_passreset_error_password_requirements():
|
||||
resetkey = make_resetkey()
|
||||
post_data = make_passreset_data(resetkey)
|
||||
|
||||
passwd_min_len = User.minimum_passwd_length()
|
||||
assert passwd_min_len >= 4
|
||||
|
||||
post_data["password"] = "x"
|
||||
post_data["confirm"] = "x"
|
||||
|
||||
with client as request:
|
||||
response = request.post("/passreset", data=post_data)
|
||||
|
||||
assert response.status_code == int(HTTPStatus.BAD_REQUEST)
|
||||
|
||||
error = f"Your password must be at least {passwd_min_len} characters."
|
||||
assert error in response.content.decode("utf-8")
|
|
@ -14,8 +14,12 @@ from aurweb.models.session import Session
|
|||
from aurweb.testing import setup_test_db
|
||||
from aurweb.testing.models import make_user
|
||||
|
||||
client = TestClient(app)
|
||||
# Some test global constants.
|
||||
TEST_USERNAME = "test"
|
||||
TEST_EMAIL = "test@example.org"
|
||||
|
||||
# Global mutables.
|
||||
client = TestClient(app)
|
||||
user = None
|
||||
|
||||
|
||||
|
@ -27,7 +31,8 @@ def setup():
|
|||
|
||||
account_type = query(AccountType,
|
||||
AccountType.AccountType == "User").first()
|
||||
user = make_user(Username="test", Email="test@example.org",
|
||||
|
||||
user = make_user(Username=TEST_USERNAME, Email=TEST_EMAIL,
|
||||
RealName="Test User", Passwd="testPassword",
|
||||
AccountType=account_type)
|
||||
|
||||
|
@ -40,16 +45,16 @@ def test_login_logout():
|
|||
}
|
||||
|
||||
with client as request:
|
||||
response = client.get("/login")
|
||||
# First, let's test get /login.
|
||||
response = request.get("/login")
|
||||
assert response.status_code == int(HTTPStatus.OK)
|
||||
|
||||
response = request.post("/login", data=post_data,
|
||||
allow_redirects=False)
|
||||
assert response.status_code == int(HTTPStatus.SEE_OTHER)
|
||||
|
||||
response = request.get(response.headers.get("location"), cookies={
|
||||
"AURSID": response.cookies.get("AURSID")
|
||||
})
|
||||
# Simulate following the redirect location from above's response.
|
||||
response = request.get(response.headers.get("location"))
|
||||
assert response.status_code == int(HTTPStatus.OK)
|
||||
|
||||
response = request.post("/logout", data=post_data,
|
||||
|
@ -60,9 +65,37 @@ def test_login_logout():
|
|||
"AURSID": response.cookies.get("AURSID")
|
||||
}, allow_redirects=False)
|
||||
assert response.status_code == int(HTTPStatus.SEE_OTHER)
|
||||
|
||||
assert "AURSID" not in response.cookies
|
||||
|
||||
|
||||
def test_authenticated_login_forbidden():
|
||||
post_data = {
|
||||
"user": "test",
|
||||
"passwd": "testPassword",
|
||||
"next": "/"
|
||||
}
|
||||
|
||||
with client as request:
|
||||
# Login.
|
||||
response = request.post("/login", data=post_data,
|
||||
allow_redirects=False)
|
||||
assert response.status_code == int(HTTPStatus.SEE_OTHER)
|
||||
|
||||
# Now, let's verify that we receive 403 Forbidden when we
|
||||
# try to get /login as an authenticated user.
|
||||
response = request.get("/login", allow_redirects=False)
|
||||
assert response.status_code == int(HTTPStatus.SEE_OTHER)
|
||||
|
||||
|
||||
def test_unauthenticated_logout_unauthorized():
|
||||
with client as request:
|
||||
# Alright, let's verify that attempting to /logout when not
|
||||
# authenticated returns 401 Unauthorized.
|
||||
response = request.get("/logout", allow_redirects=False)
|
||||
assert response.status_code == int(HTTPStatus.SEE_OTHER)
|
||||
|
||||
|
||||
def test_login_missing_username():
|
||||
post_data = {
|
||||
"passwd": "testPassword",
|
||||
|
@ -75,8 +108,6 @@ def test_login_missing_username():
|
|||
|
||||
|
||||
def test_login_remember_me():
|
||||
from aurweb.db import session
|
||||
|
||||
post_data = {
|
||||
"user": "test",
|
||||
"passwd": "testPassword",
|
||||
|
@ -94,8 +125,8 @@ def test_login_remember_me():
|
|||
"options", "persistent_cookie_timeout")
|
||||
expected_ts = datetime.utcnow().timestamp() + cookie_timeout
|
||||
|
||||
_session = session.query(Session).filter(
|
||||
Session.UsersID == user.ID).first()
|
||||
_session = query(Session,
|
||||
Session.UsersID == user.ID).first()
|
||||
|
||||
# Expect that LastUpdateTS was within 5 seconds of the expected_ts,
|
||||
# which is equal to the current timestamp + persistent_cookie_timeout.
|
||||
|
|
|
@ -3,7 +3,6 @@ import re
|
|||
import sqlite3
|
||||
import tempfile
|
||||
|
||||
from datetime import datetime
|
||||
from unittest import mock
|
||||
|
||||
import mysql.connector
|
||||
|
@ -185,3 +184,15 @@ def test_create_delete():
|
|||
db.delete(AccountType, AccountType.AccountType == "test")
|
||||
record = db.query(AccountType, AccountType.AccountType == "test").first()
|
||||
assert record is None
|
||||
|
||||
|
||||
@mock.patch("mysql.connector.paramstyle", "qmark")
|
||||
def test_connection_executor_mysql_paramstyle():
|
||||
executor = db.ConnectionExecutor(None, backend="mysql")
|
||||
assert executor.paramstyle() == "qmark"
|
||||
|
||||
|
||||
@mock.patch("sqlite3.paramstyle", "pyformat")
|
||||
def test_connection_executor_sqlite_paramstyle():
|
||||
executor = db.ConnectionExecutor(None, backend="sqlite")
|
||||
assert executor.paramstyle() == "pyformat"
|
||||
|
|
|
@ -28,8 +28,8 @@ def setup():
|
|||
|
||||
setup_test_db("Users", "Sessions", "Bans")
|
||||
|
||||
account_type = session.query(AccountType).filter(
|
||||
AccountType.AccountType == "User").first()
|
||||
account_type = query(AccountType,
|
||||
AccountType.AccountType == "User").first()
|
||||
|
||||
user = make_user(Username="test", Email="test@example.org",
|
||||
RealName="Test User", Passwd="testPassword",
|
||||
|
@ -67,7 +67,7 @@ def test_user_login_logout():
|
|||
assert user.session.User == user
|
||||
|
||||
# Search for the user via query API.
|
||||
result = session.query(User).filter(User.ID == user.ID).first()
|
||||
result = query(User, User.ID == user.ID).first()
|
||||
|
||||
# Compare the result and our original user.
|
||||
assert result == user
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue