mirror of
https://gitlab.archlinux.org/archlinux/aurweb.git
synced 2025-02-03 10:43:03 +01:00
fix: remove sessions of suspended users
Fixes: #394 Signed-off-by: Leonidas Spyropoulos <artafinde@archlinux.org>
This commit is contained in:
parent
30e72d2db5
commit
0dddaeeb98
3 changed files with 80 additions and 8 deletions
|
@ -412,6 +412,7 @@ async def account_edit_post(
|
||||||
TZ: str = Form(aurweb.config.get("options", "default_timezone")),
|
TZ: str = Form(aurweb.config.get("options", "default_timezone")),
|
||||||
P: str = Form(default=str()), # New Password
|
P: str = Form(default=str()), # New Password
|
||||||
C: str = Form(default=None), # Password Confirm
|
C: str = Form(default=None), # Password Confirm
|
||||||
|
S: bool = Form(default=False), # Suspended
|
||||||
PK: str = Form(default=None), # PubKey
|
PK: str = Form(default=None), # PubKey
|
||||||
CN: bool = Form(default=False), # Comment Notify
|
CN: bool = Form(default=False), # Comment Notify
|
||||||
UN: bool = Form(default=False), # Update Notify
|
UN: bool = Form(default=False), # Update Notify
|
||||||
|
@ -455,6 +456,7 @@ async def account_edit_post(
|
||||||
update.ssh_pubkey,
|
update.ssh_pubkey,
|
||||||
update.account_type,
|
update.account_type,
|
||||||
update.password,
|
update.password,
|
||||||
|
update.suspend,
|
||||||
]
|
]
|
||||||
|
|
||||||
# These update functions are all guarded by retry_deadlock;
|
# These update functions are all guarded by retry_deadlock;
|
||||||
|
|
|
@ -134,3 +134,19 @@ def password(
|
||||||
# If the target user is the request user, login with
|
# If the target user is the request user, login with
|
||||||
# the updated password to update the Session record.
|
# the updated password to update the Session record.
|
||||||
user.login(request, P, cookies.timeout(remember_me))
|
user.login(request, P, cookies.timeout(remember_me))
|
||||||
|
|
||||||
|
|
||||||
|
@db.retry_deadlock
|
||||||
|
def suspend(
|
||||||
|
S: bool = False,
|
||||||
|
request: Request = None,
|
||||||
|
user: models.User = None,
|
||||||
|
context: dict[str, Any] = {},
|
||||||
|
**kwargs,
|
||||||
|
) -> None:
|
||||||
|
if S and user.session:
|
||||||
|
context["S"] = None
|
||||||
|
with db.begin():
|
||||||
|
db.delete_all(
|
||||||
|
db.query(models.Session).filter(models.Session.UsersID == user.ID)
|
||||||
|
)
|
||||||
|
|
|
@ -9,6 +9,7 @@ import lxml.html
|
||||||
import pytest
|
import pytest
|
||||||
from fastapi.testclient import TestClient
|
from fastapi.testclient import TestClient
|
||||||
|
|
||||||
|
import aurweb.config
|
||||||
import aurweb.models.account_type as at
|
import aurweb.models.account_type as at
|
||||||
from aurweb import captcha, db, logging, time
|
from aurweb import captcha, db, logging, time
|
||||||
from aurweb.asgi import app
|
from aurweb.asgi import app
|
||||||
|
@ -35,6 +36,9 @@ logger = logging.get_logger(__name__)
|
||||||
# Some test global constants.
|
# Some test global constants.
|
||||||
TEST_USERNAME = "test"
|
TEST_USERNAME = "test"
|
||||||
TEST_EMAIL = "test@example.org"
|
TEST_EMAIL = "test@example.org"
|
||||||
|
TEST_REFERER = {
|
||||||
|
"referer": aurweb.config.get("options", "aur_location") + "/login",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
def make_ssh_pubkey():
|
def make_ssh_pubkey():
|
||||||
|
@ -61,7 +65,12 @@ def setup(db_test):
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def client() -> TestClient:
|
def client() -> TestClient:
|
||||||
yield TestClient(app=app)
|
client = TestClient(app=app)
|
||||||
|
|
||||||
|
# Necessary for forged login CSRF protection on the login route. Set here
|
||||||
|
# instead of only on the necessary requests for convenience.
|
||||||
|
client.headers.update(TEST_REFERER)
|
||||||
|
yield client
|
||||||
|
|
||||||
|
|
||||||
def create_user(username: str) -> User:
|
def create_user(username: str) -> User:
|
||||||
|
@ -1003,13 +1012,8 @@ def test_post_account_edit_suspended(client: TestClient, user: User):
|
||||||
|
|
||||||
# Make sure the user record got updated correctly.
|
# Make sure the user record got updated correctly.
|
||||||
assert user.Suspended
|
assert user.Suspended
|
||||||
|
# Let's make sure the DB got updated properly.
|
||||||
post_data.update({"S": False})
|
assert user.session is None
|
||||||
with client as request:
|
|
||||||
resp = request.post(endpoint, data=post_data, cookies=cookies)
|
|
||||||
assert resp.status_code == int(HTTPStatus.OK)
|
|
||||||
|
|
||||||
assert not user.Suspended
|
|
||||||
|
|
||||||
|
|
||||||
def test_post_account_edit_error_unauthorized(client: TestClient, user: User):
|
def test_post_account_edit_error_unauthorized(client: TestClient, user: User):
|
||||||
|
@ -1262,6 +1266,56 @@ def test_post_account_edit_other_user_type_as_tu(
|
||||||
assert expected in caplog.text
|
assert expected in caplog.text
|
||||||
|
|
||||||
|
|
||||||
|
def test_post_account_edit_other_user_suspend_as_tu(client: TestClient, tu_user: User):
|
||||||
|
with db.begin():
|
||||||
|
user = create_user("test3")
|
||||||
|
# Create a session for user
|
||||||
|
sid = user.login(Request(), "testPassword")
|
||||||
|
assert sid is not None
|
||||||
|
|
||||||
|
# `user` needs its own TestClient, to keep its AURSID cookies
|
||||||
|
# apart from `tu_user`s during our testing.
|
||||||
|
user_client = TestClient(app=app)
|
||||||
|
user_client.headers.update(TEST_REFERER)
|
||||||
|
|
||||||
|
# Test that `user` can view their account edit page while logged in.
|
||||||
|
user_cookies = {"AURSID": sid}
|
||||||
|
with client as request:
|
||||||
|
endpoint = f"/account/{user.Username}/edit"
|
||||||
|
resp = request.get(endpoint, cookies=user_cookies, allow_redirects=False)
|
||||||
|
assert resp.status_code == HTTPStatus.OK
|
||||||
|
|
||||||
|
cookies = {"AURSID": tu_user.login(Request(), "testPassword")}
|
||||||
|
assert cookies is not None # This is useless, we create the dict here ^
|
||||||
|
# As a TU, we can see the Account for other users.
|
||||||
|
with client as request:
|
||||||
|
resp = request.get(endpoint, cookies=cookies, allow_redirects=False)
|
||||||
|
assert resp.status_code == int(HTTPStatus.OK)
|
||||||
|
# As a TU, we can modify other user's account types.
|
||||||
|
data = {
|
||||||
|
"U": user.Username,
|
||||||
|
"E": user.Email,
|
||||||
|
"S": True,
|
||||||
|
"passwd": "testPassword",
|
||||||
|
}
|
||||||
|
with client as request:
|
||||||
|
resp = request.post(endpoint, data=data, cookies=cookies)
|
||||||
|
assert resp.status_code == int(HTTPStatus.OK)
|
||||||
|
|
||||||
|
# Test that `user` no longer has a session.
|
||||||
|
with user_client as request:
|
||||||
|
resp = request.get(endpoint, cookies=user_cookies, allow_redirects=False)
|
||||||
|
assert resp.status_code == HTTPStatus.SEE_OTHER
|
||||||
|
|
||||||
|
# Since user is now suspended, they should not be able to login.
|
||||||
|
data = {"user": user.Username, "passwd": "testPassword", "next": "/"}
|
||||||
|
with user_client as request:
|
||||||
|
resp = request.post("/login", data=data)
|
||||||
|
assert resp.status_code == HTTPStatus.OK
|
||||||
|
errors = get_errors(resp.text)
|
||||||
|
assert errors[0].text.strip() == "Account Suspended"
|
||||||
|
|
||||||
|
|
||||||
def test_post_account_edit_other_user_type_as_tu_invalid_type(
|
def test_post_account_edit_other_user_type_as_tu_invalid_type(
|
||||||
client: TestClient, tu_user: User, caplog: pytest.LogCaptureFixture
|
client: TestClient, tu_user: User, caplog: pytest.LogCaptureFixture
|
||||||
):
|
):
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue