Merge branch 'pu_accounts' into pu

This commit is contained in:
Kevin Morris 2021-07-13 21:57:06 -07:00
commit ae953ce19b
7 changed files with 760 additions and 7 deletions

View file

@ -1,3 +1,4 @@
import logging
import re
import tempfile
@ -14,7 +15,7 @@ from aurweb import captcha
from aurweb.asgi import app
from aurweb.db import commit, create, query
from aurweb.models.accepted_term import AcceptedTerm
from aurweb.models.account_type import AccountType
from aurweb.models.account_type import DEVELOPER_ID, TRUSTED_USER_AND_DEV_ID, TRUSTED_USER_ID, AccountType
from aurweb.models.ban import Ban
from aurweb.models.session import Session
from aurweb.models.ssh_pub_key import SSHPubKey, get_fingerprint
@ -31,6 +32,8 @@ TEST_EMAIL = "test@example.org"
client = TestClient(app)
user = None
logger = logging.getLogger(__name__)
def make_ssh_pubkey():
# Create a public key with ssh-keygen (this adds ssh-keygen as a
@ -55,8 +58,8 @@ def setup():
account_type = query(AccountType,
AccountType.AccountType == "User").first()
user = create(User, Username=TEST_USERNAME, Email=TEST_EMAIL,
RealName="Test User", Passwd="testPassword",
AccountType=account_type)
RealName="Test UserZ", Passwd="testPassword",
IRCNick="testZ", AccountType=account_type)
yield user
@ -65,6 +68,14 @@ def setup():
setup_test_db("Terms", "AcceptedTerms")
@pytest.fixture
def tu_user():
user.AccountType = query(AccountType,
AccountType.ID == TRUSTED_USER_AND_DEV_ID).first()
commit()
yield user
def test_get_passreset_authed_redirects():
sid = user.login(Request(), "testPassword")
assert sid is not None
@ -929,6 +940,479 @@ def test_get_account_unauthenticated():
assert "You must log in to view user information." in content
def test_get_accounts(tu_user):
""" Test that we can GET request /accounts/ and receive
a form which can be used to POST /accounts/. """
sid = user.login(Request(), "testPassword")
cookies = {"AURSID": sid}
with client as request:
response = request.get("/accounts/", cookies=cookies)
assert response.status_code == int(HTTPStatus.OK)
parser = lxml.etree.HTMLParser()
root = lxml.etree.fromstring(response.text, parser=parser)
# Get the form.
form = root.xpath('//form[contains(@class, "account-search-form")]')
# Make sure there's only one form and it goes where it should.
assert len(form) == 1
form = next(iter(form))
assert form.attrib.get("method") == "post"
assert form.attrib.get("action") == "/accounts/"
def field(element):
""" Return the given element string as a valid
selector in the form. """
return f"./fieldset/p/{element}"
username = form.xpath(field('input[@id="id_username"]'))
assert bool(username)
account_type = form.xpath(field('select[@id="id_type"]'))
assert bool(account_type)
suspended = form.xpath(field('input[@id="id_suspended"]'))
assert bool(suspended)
email = form.xpath(field('input[@id="id_email"]'))
assert bool(email)
realname = form.xpath(field('input[@id="id_realname"]'))
assert bool(realname)
irc = form.xpath(field('input[@id="id_irc"]'))
assert bool(irc)
sortby = form.xpath(field('select[@id="id_sortby"]'))
assert bool(sortby)
def parse_root(html):
parser = lxml.etree.HTMLParser()
return lxml.etree.fromstring(html, parser=parser)
def get_rows(html):
root = parse_root(html)
return root.xpath('//table[contains(@class, "users")]/tbody/tr')
def test_post_accounts(tu_user):
# Set a PGPKey.
user.PGPKey = "5F18B20346188419750745D7335F2CB41F253D30"
# Create a few more users.
users = [user]
for i in range(10):
_user = create(User, Username=f"test_{i}",
Email=f"test_{i}@example.org",
RealName=f"Test #{i}",
Passwd="testPassword",
IRCNick=f"test_#{i}",
autocommit=False)
users.append(_user)
# Commit everything to the database.
commit()
sid = user.login(Request(), "testPassword")
cookies = {"AURSID": sid}
with client as request:
response = request.post("/accounts/", cookies=cookies)
assert response.status_code == int(HTTPStatus.OK)
rows = get_rows(response.text)
assert len(rows) == 11
# Simulate default ascending ORDER_BY.
sorted_users = sorted(users, key=lambda u: u.Username)
for i, _user in enumerate(sorted_users):
columns = rows[i].xpath("./td")
assert len(columns) == 7
username, atype, suspended, real_name, \
irc_nick, pgp_key, edit = columns
username = next(iter(username.xpath("./a")))
assert username.text.strip() == _user.Username
assert atype.text.strip() == str(_user.AccountType)
assert suspended.text.strip() == "Active"
assert real_name.text.strip() == _user.RealName
assert irc_nick.text == _user.IRCNick
assert pgp_key.text == (_user.PGPKey or None)
edit = edit.xpath("./a")
if user.can_edit_user(_user):
edit = next(iter(edit))
assert edit.text.strip() == "Edit"
else:
assert not edit
logger.debug('Checked user row {"id": %s, "username": "%s"}.'
% (_user.ID, _user.Username))
def test_post_accounts_username(tu_user):
# Test the U parameter path.
sid = user.login(Request(), "testPassword")
cookies = {"AURSID": sid}
with client as request:
response = request.post("/accounts/", cookies=cookies,
data={"U": user.Username})
assert response.status_code == int(HTTPStatus.OK)
rows = get_rows(response.text)
assert len(rows) == 1
row = next(iter(rows))
username, type, status, realname, irc, pgp_key, edit = row
username = next(iter(username.xpath("./a")))
assert username.text.strip() == user.Username
def test_post_accounts_account_type(tu_user):
# Check the different account type options.
sid = user.login(Request(), "testPassword")
cookies = {"AURSID": sid}
# Make a user with the "User" role here so we can
# test the `u` parameter.
account_type = query(AccountType,
AccountType.AccountType == "User").first()
create(User, Username="test_2",
Email="test_2@example.org",
RealName="Test User 2",
Passwd="testPassword",
AccountType=account_type)
# Expect no entries; we marked our only user as a User type.
with client as request:
response = request.post("/accounts/", cookies=cookies,
data={"T": "t"})
assert response.status_code == int(HTTPStatus.OK)
assert len(get_rows(response.text)) == 0
# So, let's also ensure that specifying "u" returns our user.
with client as request:
response = request.post("/accounts/", cookies=cookies,
data={"T": "u"})
assert response.status_code == int(HTTPStatus.OK)
rows = get_rows(response.text)
assert len(rows) == 1
row = next(iter(rows))
username, type, status, realname, irc, pgp_key, edit = row
assert type.text.strip() == "User"
# Set our only user to a Trusted User.
user.AccountType = query(AccountType,
AccountType.ID == TRUSTED_USER_ID).first()
commit()
with client as request:
response = request.post("/accounts/", cookies=cookies,
data={"T": "t"})
assert response.status_code == int(HTTPStatus.OK)
rows = get_rows(response.text)
assert len(rows) == 1
row = next(iter(rows))
username, type, status, realname, irc, pgp_key, edit = row
assert type.text.strip() == "Trusted User"
user.AccountType = query(AccountType,
AccountType.ID == DEVELOPER_ID).first()
commit()
with client as request:
response = request.post("/accounts/", cookies=cookies,
data={"T": "d"})
assert response.status_code == int(HTTPStatus.OK)
rows = get_rows(response.text)
assert len(rows) == 1
row = next(iter(rows))
username, type, status, realname, irc, pgp_key, edit = row
assert type.text.strip() == "Developer"
user.AccountType = query(AccountType,
AccountType.ID == TRUSTED_USER_AND_DEV_ID
).first()
commit()
with client as request:
response = request.post("/accounts/", cookies=cookies,
data={"T": "td"})
assert response.status_code == int(HTTPStatus.OK)
rows = get_rows(response.text)
assert len(rows) == 1
row = next(iter(rows))
username, type, status, realname, irc, pgp_key, edit = row
assert type.text.strip() == "Trusted User & Developer"
def test_post_accounts_status(tu_user):
# Test the functionality of Suspended.
sid = user.login(Request(), "testPassword")
cookies = {"AURSID": sid}
with client as request:
response = request.post("/accounts/", cookies=cookies)
assert response.status_code == int(HTTPStatus.OK)
rows = get_rows(response.text)
assert len(rows) == 1
row = next(iter(rows))
username, type, status, realname, irc, pgp_key, edit = row
assert status.text.strip() == "Active"
user.Suspended = True
commit()
with client as request:
response = request.post("/accounts/", cookies=cookies,
data={"S": True})
assert response.status_code == int(HTTPStatus.OK)
rows = get_rows(response.text)
assert len(rows) == 1
row = next(iter(rows))
username, type, status, realname, irc, pgp_key, edit = row
assert status.text.strip() == "Suspended"
def test_post_accounts_email(tu_user):
sid = user.login(Request(), "testPassword")
cookies = {"AURSID": sid}
# Search via email.
with client as request:
response = request.post("/accounts/", cookies=cookies,
data={"E": user.Email})
assert response.status_code == int(HTTPStatus.OK)
rows = get_rows(response.text)
assert len(rows) == 1
def test_post_accounts_realname(tu_user):
# Test the R parameter path.
sid = user.login(Request(), "testPassword")
cookies = {"AURSID": sid}
with client as request:
response = request.post("/accounts/", cookies=cookies,
data={"R": user.RealName})
assert response.status_code == int(HTTPStatus.OK)
rows = get_rows(response.text)
assert len(rows) == 1
def test_post_accounts_irc(tu_user):
# Test the I parameter path.
sid = user.login(Request(), "testPassword")
cookies = {"AURSID": sid}
with client as request:
response = request.post("/accounts/", cookies=cookies,
data={"I": user.IRCNick})
assert response.status_code == int(HTTPStatus.OK)
rows = get_rows(response.text)
assert len(rows) == 1
def test_post_accounts_sortby(tu_user):
# Create a second user so we can compare sorts.
account_type = query(AccountType,
AccountType.ID == DEVELOPER_ID).first()
create(User, Username="test2",
Email="test2@example.org",
RealName="Test User 2",
Passwd="testPassword",
IRCNick="test2",
AccountType=account_type)
sid = user.login(Request(), "testPassword")
cookies = {"AURSID": sid}
# Show that "u" is the default search order, by username.
with client as request:
response = request.post("/accounts/", cookies=cookies)
assert response.status_code == int(HTTPStatus.OK)
rows = get_rows(response.text)
assert len(rows) == 2
first_rows = rows
with client as request:
response = request.post("/accounts/", cookies=cookies,
data={"SB": "u"})
assert response.status_code == int(HTTPStatus.OK)
rows = get_rows(response.text)
assert len(rows) == 2
def compare_text_values(column, lhs, rhs):
return [row[column].text.strip() for row in lhs] \
== [row[column].text.strip() for row in rhs]
# Test the username rows are ordered the same.
assert compare_text_values(0, first_rows, rows) is True
with client as request:
response = request.post("/accounts/", cookies=cookies,
data={"SB": "i"})
assert response.status_code == int(HTTPStatus.OK)
rows = get_rows(response.text)
assert len(rows) == 2
# Test the rows are reversed when ordering by IRCNick.
assert compare_text_values(4, first_rows, reversed(rows)) is True
# Sort by "i" -> RealName.
with client as request:
response = request.post("/accounts/", cookies=cookies,
data={"SB": "r"})
assert response.status_code == int(HTTPStatus.OK)
rows = get_rows(response.text)
assert len(rows) == 2
# Test the rows are reversed when ordering by RealName.
assert compare_text_values(4, first_rows, reversed(rows)) is True
user.AccountType = query(AccountType,
AccountType.ID == TRUSTED_USER_AND_DEV_ID).first()
commit()
# Fetch first_rows again with our new AccountType ordering.
with client as request:
response = request.post("/accounts/", cookies=cookies)
assert response.status_code == int(HTTPStatus.OK)
rows = get_rows(response.text)
assert len(rows) == 2
first_rows = rows
# Sort by "t" -> AccountType.
with client as request:
response = request.post("/accounts/", cookies=cookies,
data={"SB": "t"})
assert response.status_code == int(HTTPStatus.OK)
rows = get_rows(response.text)
assert len(rows) == 2
# Test that rows again got reversed.
assert compare_text_values(1, first_rows, reversed(rows))
def test_post_accounts_pgp_key(tu_user):
user.PGPKey = "5F18B20346188419750745D7335F2CB41F253D30"
commit()
sid = user.login(Request(), "testPassword")
cookies = {"AURSID": sid}
# Search via PGPKey.
with client as request:
response = request.post("/accounts/", cookies=cookies,
data={"K": user.PGPKey})
assert response.status_code == int(HTTPStatus.OK)
rows = get_rows(response.text)
assert len(rows) == 1
def test_post_accounts_paged(tu_user):
# Create 150 users.
users = [user]
account_type = query(AccountType,
AccountType.AccountType == "User").first()
for i in range(150):
_user = create(User, Username=f"test_#{i}",
Email=f"test_#{i}@example.org",
RealName=f"Test User #{i}",
Passwd="testPassword",
AccountType=account_type,
autocommit=False)
users.append(_user)
commit()
sid = user.login(Request(), "testPassword")
cookies = {"AURSID": sid}
with client as request:
response = request.post("/accounts/", cookies=cookies)
assert response.status_code == int(HTTPStatus.OK)
rows = get_rows(response.text)
assert len(rows) == 50 # `pp`, or hits per page is defined at 50.
# Sort users in ascending default sort by order.
sorted_users = sorted(users, key=lambda u: u.Username)
# Get the first fifty sorted users and assert that's what
# we got in the first search result page.
first_fifty = sorted_users[:50]
for i, _user in enumerate(first_fifty):
row = rows[i]
username = row[0].xpath("./a")[0] # First column
assert username.text.strip() == _user.Username
root = parse_root(response.text)
page_prev = root.xpath('//button[contains(@class, "page-prev")]')[0]
page_next = root.xpath('//button[contains(@class, "page-next")]')[0]
assert page_prev.attrib["disabled"] == "disabled"
assert "disabled" not in page_next.attrib
with client as request:
response = request.post("/accounts/", cookies=cookies,
data={"O": 50}) # +50 offset.
assert response.status_code == int(HTTPStatus.OK)
rows = get_rows(response.text)
assert len(rows) == 50
second_fifty = sorted_users[50:100]
for i, _user in enumerate(second_fifty):
row = rows[i]
username = row[0].xpath("./a")[0] # First column
assert username.text.strip() == _user.Username
with client as request:
response = request.post("/accounts/", cookies=cookies,
data={"O": 101}) # Last page.
assert response.status_code == int(HTTPStatus.OK)
rows = get_rows(response.text)
assert len(rows) == 50
root = parse_root(response.text)
page_prev = root.xpath('//button[contains(@class, "page-prev")]')[0]
page_next = root.xpath('//button[contains(@class, "page-next")]')[0]
assert "disabled" not in page_prev.attrib
assert page_next.attrib["disabled"] == "disabled"
def test_get_terms_of_service():
term = create(Term, Description="Test term.",
URL="http://localhost", Revision=1)