mirror of
https://gitlab.archlinux.org/archlinux/aurweb.git
synced 2025-02-03 10:43:03 +01:00
fix(requests): rework handling of requests
This commit changes several things about how we were handling package requests. Modifications (requests): ------------- - `/requests/{id}/close` no longer provides an Accepted selection. All manual request closures will cause a rejection. - Relevent `pkgbase` actions now trigger request closures: `/pkgbase/{name}/delete` (deletion), `/pkgbase/{name}/merge` (merge) and `/pkgbase/{name}/disown` (orphan). - Comment fields have been added to `/pkgbase/{name}/{delete,merge,disown}`, which is used to set the `PackageRequest.ClosureComment` on pending requests. If the comment field is left blank, a closure comment is autogenerated. - Autogenerated request notifications are only sent out once as a closure notification. - Some markup has been fixed. Modifications (disown/orphan): ----------------------------- - Orphan requests are now handled through the same path as deletion/merge. - We now check for due date when disowning as non-maintainer; previously, this was only done for display and not functionally. This check applies to Trusted Users' disowning of a package. This style of notification flow does reduce our visibility, but accounting can still be done via the close request; it includes the action, pkgbase name and the user who accepted it. Closes #204 Signed-off-by: Kevin Morris <kevr@0cost.org>
This commit is contained in:
parent
bad57ba502
commit
26b1674c9e
10 changed files with 1044 additions and 354 deletions
|
@ -10,7 +10,7 @@ import pytest
|
|||
from fastapi.testclient import TestClient
|
||||
from sqlalchemy import and_
|
||||
|
||||
from aurweb import asgi, config, db, defaults
|
||||
from aurweb import asgi, db, defaults
|
||||
from aurweb.models import License, PackageLicense
|
||||
from aurweb.models.account_type import USER_ID, AccountType
|
||||
from aurweb.models.dependency_type import DependencyType
|
||||
|
@ -28,6 +28,7 @@ from aurweb.models.package_vote import PackageVote
|
|||
from aurweb.models.relation_type import CONFLICTS_ID, PROVIDES_ID, REPLACES_ID, RelationType
|
||||
from aurweb.models.request_type import DELETION_ID, MERGE_ID, RequestType
|
||||
from aurweb.models.user import User
|
||||
from aurweb.testing.email import Email
|
||||
from aurweb.testing.html import get_errors, get_successes, parse_root
|
||||
from aurweb.testing.requests import Request
|
||||
|
||||
|
@ -126,6 +127,39 @@ def package(maintainer: User) -> Package:
|
|||
yield package
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def pkgbase(package: Package) -> PackageBase:
|
||||
yield package.PackageBase
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def target(maintainer: User) -> PackageBase:
|
||||
""" Merge target. """
|
||||
now = int(datetime.utcnow().timestamp())
|
||||
with db.begin():
|
||||
pkgbase = db.create(PackageBase, Name="target-package",
|
||||
Maintainer=maintainer,
|
||||
Packager=maintainer,
|
||||
Submitter=maintainer,
|
||||
ModifiedTS=now)
|
||||
db.create(Package, PackageBase=pkgbase, Name=pkgbase.Name)
|
||||
yield pkgbase
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def pkgreq(user: User, pkgbase: PackageBase) -> PackageRequest:
|
||||
""" Yield a PackageRequest related to `pkgbase`. """
|
||||
with db.begin():
|
||||
pkgreq = db.create(PackageRequest,
|
||||
ReqTypeID=DELETION_ID,
|
||||
User=user,
|
||||
PackageBase=pkgbase,
|
||||
PackageBaseName=pkgbase.Name,
|
||||
Comments=f"Deletion request for {pkgbase.Name}",
|
||||
ClosureComment=str())
|
||||
yield pkgreq
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def comment(user: User, package: Package) -> PackageComment:
|
||||
pkgbase = package.PackageBase
|
||||
|
@ -296,15 +330,7 @@ def test_package_comments(client: TestClient, user: User, package: Package):
|
|||
|
||||
|
||||
def test_package_requests_display(client: TestClient, user: User,
|
||||
package: Package):
|
||||
type_ = db.query(RequestType, RequestType.ID == DELETION_ID).first()
|
||||
with db.begin():
|
||||
db.create(PackageRequest, PackageBase=package.PackageBase,
|
||||
PackageBaseName=package.PackageBase.Name,
|
||||
User=user, RequestType=type_,
|
||||
Comments="Test comment.",
|
||||
ClosureComment=str())
|
||||
|
||||
package: Package, pkgreq: PackageRequest):
|
||||
# Test that a single request displays "1 pending request".
|
||||
with client as request:
|
||||
resp = request.get(package_endpoint(package))
|
||||
|
@ -1516,115 +1542,6 @@ def test_pkgbase_request(client: TestClient, user: User, package: Package):
|
|||
assert resp.status_code == int(HTTPStatus.OK)
|
||||
|
||||
|
||||
def test_pkgbase_request_post_deletion(client: TestClient, user: User,
|
||||
package: Package):
|
||||
endpoint = f"/pkgbase/{package.PackageBase.Name}/request"
|
||||
cookies = {"AURSID": user.login(Request(), "testPassword")}
|
||||
with client as request:
|
||||
resp = request.post(endpoint, data={
|
||||
"type": "deletion",
|
||||
"comments": "We want to delete this."
|
||||
}, cookies=cookies, allow_redirects=False)
|
||||
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
|
||||
|
||||
pkgreq = db.query(PackageRequest).filter(
|
||||
PackageRequest.PackageBaseID == package.PackageBase.ID
|
||||
).first()
|
||||
assert pkgreq is not None
|
||||
assert pkgreq.RequestType.Name == "deletion"
|
||||
assert pkgreq.PackageBaseName == package.PackageBase.Name
|
||||
assert pkgreq.Comments == "We want to delete this."
|
||||
|
||||
|
||||
def test_pkgbase_request_post_maintainer_deletion(
|
||||
client: TestClient, maintainer: User, package: Package):
|
||||
pkgbasename = package.PackageBase.Name
|
||||
endpoint = f"/pkgbase/{package.PackageBase.Name}/request"
|
||||
cookies = {"AURSID": maintainer.login(Request(), "testPassword")}
|
||||
with client as request:
|
||||
resp = request.post(endpoint, data={
|
||||
"type": "deletion",
|
||||
"comments": "We want to delete this."
|
||||
}, cookies=cookies, allow_redirects=False)
|
||||
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
|
||||
|
||||
pkgreq = db.query(PackageRequest).filter(
|
||||
PackageRequest.PackageBaseName == pkgbasename
|
||||
).first()
|
||||
assert pkgreq.Status == ACCEPTED_ID
|
||||
|
||||
|
||||
def test_pkgbase_request_post_orphan(client: TestClient, user: User,
|
||||
package: Package):
|
||||
endpoint = f"/pkgbase/{package.PackageBase.Name}/request"
|
||||
cookies = {"AURSID": user.login(Request(), "testPassword")}
|
||||
with client as request:
|
||||
resp = request.post(endpoint, data={
|
||||
"type": "orphan",
|
||||
"comments": "We want to disown this."
|
||||
}, cookies=cookies, allow_redirects=False)
|
||||
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
|
||||
|
||||
pkgreq = db.query(PackageRequest).filter(
|
||||
PackageRequest.PackageBaseID == package.PackageBase.ID
|
||||
).first()
|
||||
assert pkgreq is not None
|
||||
assert pkgreq.RequestType.Name == "orphan"
|
||||
assert pkgreq.PackageBaseName == package.PackageBase.Name
|
||||
assert pkgreq.Comments == "We want to disown this."
|
||||
|
||||
|
||||
def test_pkgbase_request_post_auto_orphan(client: TestClient, user: User,
|
||||
package: Package):
|
||||
now = int(datetime.utcnow().timestamp())
|
||||
auto_orphan_age = config.getint("options", "auto_orphan_age")
|
||||
with db.begin():
|
||||
package.PackageBase.OutOfDateTS = now - auto_orphan_age - 1
|
||||
|
||||
endpoint = f"/pkgbase/{package.PackageBase.Name}/request"
|
||||
cookies = {"AURSID": user.login(Request(), "testPassword")}
|
||||
with client as request:
|
||||
resp = request.post(endpoint, data={
|
||||
"type": "orphan",
|
||||
"comments": "We want to disown this."
|
||||
}, cookies=cookies, allow_redirects=False)
|
||||
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
|
||||
|
||||
pkgreq = db.query(PackageRequest).filter(
|
||||
PackageRequest.PackageBaseID == package.PackageBase.ID
|
||||
).first()
|
||||
assert pkgreq is not None
|
||||
assert pkgreq.Status == ACCEPTED_ID
|
||||
|
||||
|
||||
def test_pkgbase_request_post_merge(client: TestClient, user: User,
|
||||
package: Package):
|
||||
with db.begin():
|
||||
pkgbase2 = db.create(PackageBase, Name="new-pkgbase",
|
||||
Submitter=user, Maintainer=user, Packager=user)
|
||||
target = db.create(Package, PackageBase=pkgbase2,
|
||||
Name=pkgbase2.Name, Version="1.0.0")
|
||||
|
||||
endpoint = f"/pkgbase/{package.PackageBase.Name}/request"
|
||||
cookies = {"AURSID": user.login(Request(), "testPassword")}
|
||||
with client as request:
|
||||
resp = request.post(endpoint, data={
|
||||
"type": "merge",
|
||||
"merge_into": target.PackageBase.Name,
|
||||
"comments": "We want to merge this."
|
||||
}, cookies=cookies, allow_redirects=False)
|
||||
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
|
||||
|
||||
pkgreq = db.query(PackageRequest).filter(
|
||||
PackageRequest.PackageBaseID == package.PackageBase.ID
|
||||
).first()
|
||||
assert pkgreq is not None
|
||||
assert pkgreq.RequestType.Name == "merge"
|
||||
assert pkgreq.PackageBaseName == package.PackageBase.Name
|
||||
assert pkgreq.MergeBaseName == target.PackageBase.Name
|
||||
assert pkgreq.Comments == "We want to merge this."
|
||||
|
||||
|
||||
def test_pkgbase_request_post_not_found(client: TestClient, user: User):
|
||||
cookies = {"AURSID": user.login(Request(), "testPassword")}
|
||||
with client as request:
|
||||
|
@ -1718,22 +1635,6 @@ def test_pkgbase_request_post_merge_self_error(client: TestClient, user: User,
|
|||
assert error.text.strip() == expected
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def pkgreq(user: User, package: Package) -> PackageRequest:
|
||||
reqtype = db.query(RequestType).filter(
|
||||
RequestType.ID == DELETION_ID
|
||||
).first()
|
||||
with db.begin():
|
||||
pkgreq = db.create(PackageRequest,
|
||||
RequestType=reqtype,
|
||||
User=user,
|
||||
PackageBase=package.PackageBase,
|
||||
PackageBaseName=package.PackageBase.Name,
|
||||
Comments=str(),
|
||||
ClosureComment=str())
|
||||
yield pkgreq
|
||||
|
||||
|
||||
def test_requests_close(client: TestClient, user: User,
|
||||
pkgreq: PackageRequest):
|
||||
cookies = {"AURSID": user.login(Request(), "testPassword")}
|
||||
|
@ -1753,16 +1654,6 @@ def test_requests_close_unauthorized(client: TestClient, maintainer: User,
|
|||
assert resp.headers.get("location") == "/"
|
||||
|
||||
|
||||
def test_requests_close_post_invalid_reason(client: TestClient, user: User,
|
||||
pkgreq: PackageRequest):
|
||||
cookies = {"AURSID": user.login(Request(), "testPassword")}
|
||||
with client as request:
|
||||
resp = request.post(f"/requests/{pkgreq.ID}/close", data={
|
||||
"reason": 0
|
||||
}, cookies=cookies, allow_redirects=False)
|
||||
assert resp.status_code == int(HTTPStatus.BAD_REQUEST)
|
||||
|
||||
|
||||
def test_requests_close_post_unauthorized(client: TestClient, maintainer: User,
|
||||
pkgreq: PackageRequest):
|
||||
cookies = {"AURSID": maintainer.login(Request(), "testPassword")}
|
||||
|
@ -1778,9 +1669,8 @@ def test_requests_close_post(client: TestClient, user: User,
|
|||
pkgreq: PackageRequest):
|
||||
cookies = {"AURSID": user.login(Request(), "testPassword")}
|
||||
with client as request:
|
||||
resp = request.post(f"/requests/{pkgreq.ID}/close", data={
|
||||
"reason": REJECTED_ID
|
||||
}, cookies=cookies, allow_redirects=False)
|
||||
resp = request.post(f"/requests/{pkgreq.ID}/close",
|
||||
cookies=cookies, allow_redirects=False)
|
||||
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
|
||||
|
||||
assert pkgreq.Status == REJECTED_ID
|
||||
|
@ -1792,9 +1682,8 @@ def test_requests_close_post_rejected(client: TestClient, user: User,
|
|||
pkgreq: PackageRequest):
|
||||
cookies = {"AURSID": user.login(Request(), "testPassword")}
|
||||
with client as request:
|
||||
resp = request.post(f"/requests/{pkgreq.ID}/close", data={
|
||||
"reason": REJECTED_ID
|
||||
}, cookies=cookies, allow_redirects=False)
|
||||
resp = request.post(f"/requests/{pkgreq.ID}/close",
|
||||
cookies=cookies, allow_redirects=False)
|
||||
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
|
||||
|
||||
assert pkgreq.Status == REJECTED_ID
|
||||
|
@ -1971,18 +1860,6 @@ def test_pkgbase_vote(client: TestClient, user: User, package: Package):
|
|||
assert pkgbase.NumVotes == 0
|
||||
|
||||
|
||||
def test_pkgbase_disown_as_tu(client: TestClient, tu_user: User,
|
||||
package: Package):
|
||||
cookies = {"AURSID": tu_user.login(Request(), "testPassword")}
|
||||
pkgbase = package.PackageBase
|
||||
endpoint = f"/pkgbase/{pkgbase.Name}/disown"
|
||||
|
||||
# But we do here.
|
||||
with client as request:
|
||||
resp = request.post(endpoint, data={"confirm": True}, cookies=cookies)
|
||||
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
|
||||
|
||||
|
||||
def test_pkgbase_disown_as_sole_maintainer(client: TestClient,
|
||||
maintainer: User,
|
||||
package: Package):
|
||||
|
@ -2114,6 +1991,38 @@ def test_pkgbase_delete(client: TestClient, tu_user: User, package: Package):
|
|||
).first()
|
||||
assert record is None
|
||||
|
||||
# Two emails should've been sent out; an autogenerated
|
||||
# request's accepted notification and a deletion notification.
|
||||
assert Email.count() == 1
|
||||
|
||||
req_close = Email(1).parse()
|
||||
expr = r"^\[PRQ#\d+\] Deletion Request for [^ ]+ Accepted$"
|
||||
subject = req_close.headers.get("Subject")
|
||||
assert re.match(expr, subject)
|
||||
|
||||
|
||||
def test_pkgbase_delete_with_request(client: TestClient, tu_user: User,
|
||||
pkgbase: PackageBase,
|
||||
pkgreq: PackageRequest):
|
||||
# TODO: Test that a previously existing request gets Accepted when
|
||||
# a TU deleted the package.
|
||||
|
||||
# Delete the package as `tu_user` via POST request.
|
||||
cookies = {"AURSID": tu_user.login(Request(), "testPassword")}
|
||||
endpoint = f"/pkgbase/{pkgbase.Name}/delete"
|
||||
with client as request:
|
||||
resp = request.post(endpoint, data={"confirm": True}, cookies=cookies)
|
||||
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
|
||||
assert resp.headers.get("location") == "/packages"
|
||||
|
||||
# We should've just sent one closure email since `pkgreq` exists.
|
||||
assert Email.count() == 1
|
||||
|
||||
# Make sure it was a closure for the deletion request.
|
||||
email = Email(1).parse()
|
||||
expr = r"^\[PRQ#\d+\] Deletion Request for [^ ]+ Accepted$"
|
||||
assert re.match(expr, email.headers.get("Subject"))
|
||||
|
||||
|
||||
def test_packages_post_unknown_action(client: TestClient, user: User,
|
||||
package: Package):
|
||||
|
@ -2371,9 +2280,11 @@ def test_packages_post_adopt(client: TestClient, user: User,
|
|||
assert successes[0].text.strip() == expected
|
||||
|
||||
|
||||
def test_packages_post_disown(client: TestClient, user: User,
|
||||
maintainer: User, package: Package):
|
||||
# Initially prove that we have a maintainer: `maintainer`.
|
||||
def test_packages_post_disown_as_maintainer(client: TestClient, user: User,
|
||||
maintainer: User,
|
||||
package: Package):
|
||||
""" Disown packages as a maintainer. """
|
||||
# Initially prove that we have a maintainer.
|
||||
assert package.PackageBase.Maintainer is not None
|
||||
assert package.PackageBase.Maintainer == maintainer
|
||||
|
||||
|
@ -2430,9 +2341,24 @@ def test_packages_post_disown(client: TestClient, user: User,
|
|||
assert successes[0].text.strip() == expected
|
||||
|
||||
|
||||
def test_packages_post_disown(client: TestClient, tu_user: User,
|
||||
maintainer: User, package: Package):
|
||||
""" Disown packages as a Trusted User, which cannot bypass idle time. """
|
||||
cookies = {"AURSID": tu_user.login(Request(), "testPassword")}
|
||||
with client as request:
|
||||
resp = request.post("/packages", data={
|
||||
"action": "disown",
|
||||
"IDs": [package.ID],
|
||||
"confirm": True
|
||||
}, cookies=cookies)
|
||||
|
||||
errors = get_errors(resp.text)
|
||||
expected = r"^No due existing orphan requests to accept for .+\.$"
|
||||
assert re.match(expected, errors[0].text.strip())
|
||||
|
||||
|
||||
def test_packages_post_delete(caplog: pytest.fixture, client: TestClient,
|
||||
user: User, tu_user: User, package: Package):
|
||||
|
||||
# First, let's try to use the delete action with no packages IDs.
|
||||
user_cookies = {"AURSID": user.login(Request(), "testPassword")}
|
||||
with client as request:
|
||||
|
@ -2556,23 +2482,19 @@ def test_pkgbase_merge_post_self_invalid(client: TestClient, tu_user: User,
|
|||
|
||||
|
||||
def test_pkgbase_merge_post(client: TestClient, tu_user: User,
|
||||
packages: List[Package]):
|
||||
package, target = packages[:2]
|
||||
package: Package,
|
||||
pkgbase: PackageBase,
|
||||
target: PackageBase,
|
||||
pkgreq: PackageRequest):
|
||||
pkgname = package.Name
|
||||
pkgbasename = package.PackageBase.Name
|
||||
pkgbasename = pkgbase.Name
|
||||
|
||||
# Create a merge request destined for another target.
|
||||
# This will allow our test code to exercise closing
|
||||
# such a request after merging the pkgbase in question.
|
||||
with db.begin():
|
||||
pkgreq = db.create(PackageRequest,
|
||||
User=tu_user,
|
||||
ReqTypeID=MERGE_ID,
|
||||
PackageBase=package.PackageBase,
|
||||
PackageBaseName=pkgbasename,
|
||||
MergeBaseName="test",
|
||||
Comments="Test comment.",
|
||||
ClosureComment="Test closure.")
|
||||
pkgreq.ReqTypeID = MERGE_ID
|
||||
pkgreq.MergeBaseName = target.Name
|
||||
|
||||
# Vote for the package.
|
||||
cookies = {"AURSID": tu_user.login(Request(), "testPassword")}
|
||||
|
@ -2604,32 +2526,37 @@ def test_pkgbase_merge_post(client: TestClient, tu_user: User,
|
|||
endpoint = f"/pkgbase/{package.PackageBase.Name}/merge"
|
||||
with client as request:
|
||||
resp = request.post(endpoint, data={
|
||||
"into": target.PackageBase.Name,
|
||||
"into": target.Name,
|
||||
"confirm": True
|
||||
}, cookies=cookies)
|
||||
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
|
||||
loc = resp.headers.get("location")
|
||||
assert loc == f"/pkgbase/{target.PackageBase.Name}"
|
||||
assert loc == f"/pkgbase/{target.Name}"
|
||||
|
||||
# Two emails should've been sent out.
|
||||
assert Email.count() == 1
|
||||
email_body = Email(1).parse().glue()
|
||||
assert f"Merge Request for {pkgbasename} Accepted" in email_body
|
||||
|
||||
# Assert that the original comments, notifs and votes we setup
|
||||
# got migrated to target as intended.
|
||||
assert comments == target.PackageBase.comments.all()
|
||||
assert notifs == target.PackageBase.notifications.all()
|
||||
assert votes == target.PackageBase.package_votes.all()
|
||||
assert comments == target.comments.all()
|
||||
assert notifs == target.notifications.all()
|
||||
assert votes == target.package_votes.all()
|
||||
|
||||
# ...and that the package got deleted.
|
||||
package = db.query(Package).filter(Package.Name == pkgname).first()
|
||||
assert package is None
|
||||
|
||||
# Our fake target request should have gotten rejected.
|
||||
assert pkgreq.Status == REJECTED_ID
|
||||
# Our previously-made request should have gotten accepted.
|
||||
assert pkgreq.Status == ACCEPTED_ID
|
||||
assert pkgreq.Closer is not None
|
||||
|
||||
# A PackageRequest is always created when merging this way.
|
||||
pkgreq = db.query(PackageRequest).filter(
|
||||
and_(PackageRequest.ReqTypeID == MERGE_ID,
|
||||
PackageRequest.PackageBaseName == pkgbasename,
|
||||
PackageRequest.MergeBaseName == target.PackageBase.Name)
|
||||
PackageRequest.MergeBaseName == target.Name)
|
||||
).first()
|
||||
assert pkgreq is not None
|
||||
|
||||
|
|
558
test/test_requests.py
Normal file
558
test/test_requests.py
Normal file
|
@ -0,0 +1,558 @@
|
|||
import re
|
||||
|
||||
from datetime import datetime
|
||||
from http import HTTPStatus
|
||||
from logging import DEBUG
|
||||
|
||||
import pytest
|
||||
|
||||
from fastapi.testclient import TestClient
|
||||
|
||||
from aurweb import asgi, config, db
|
||||
from aurweb.models import Package, PackageBase, PackageRequest, User
|
||||
from aurweb.models.account_type import TRUSTED_USER_ID, USER_ID
|
||||
from aurweb.models.package_notification import PackageNotification
|
||||
from aurweb.models.package_request import ACCEPTED_ID, PENDING_ID, REJECTED_ID
|
||||
from aurweb.models.request_type import DELETION_ID, MERGE_ID, ORPHAN_ID
|
||||
from aurweb.packages.requests import ClosureFactory
|
||||
from aurweb.testing.email import Email
|
||||
from aurweb.testing.html import get_errors
|
||||
from aurweb.testing.requests import Request
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def setup(db_test) -> None:
|
||||
""" Setup the database. """
|
||||
return
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def client() -> TestClient:
|
||||
""" Yield a TestClient. """
|
||||
yield TestClient(app=asgi.app)
|
||||
|
||||
|
||||
def create_user(username: str, email: str) -> User:
|
||||
"""
|
||||
Create a user based on `username` and `email`.
|
||||
|
||||
:param username: User.Username
|
||||
:param email: User.Email
|
||||
:return: User instance
|
||||
"""
|
||||
with db.begin():
|
||||
user = db.create(User, Username=username, Email=email,
|
||||
Passwd="testPassword", AccountTypeID=USER_ID)
|
||||
return user
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def user() -> User:
|
||||
""" Yield a User instance. """
|
||||
user = create_user("test", "test@example.org")
|
||||
yield user
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def auser(user: User) -> User:
|
||||
""" Yield an authenticated User instance. """
|
||||
cookies = {"AURSID": user.login(Request(), "testPassword")}
|
||||
user.cookies = cookies
|
||||
yield user
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def user2() -> User:
|
||||
""" Yield a secondary non-maintainer User instance. """
|
||||
user = create_user("test2", "test2@example.org")
|
||||
yield user
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def auser2(user2: User) -> User:
|
||||
""" Yield an authenticated secondary non-maintainer User instance. """
|
||||
cookies = {"AURSID": user2.login(Request(), "testPassword")}
|
||||
user2.cookies = cookies
|
||||
yield user2
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def tu_user() -> User:
|
||||
""" Yield an authenticated Trusted User instance. """
|
||||
user = create_user("test_tu", "test_tu@example.org")
|
||||
with db.begin():
|
||||
user.AccountTypeID = TRUSTED_USER_ID
|
||||
cookies = {"AURSID": user.login(Request(), "testPassword")}
|
||||
user.cookies = cookies
|
||||
yield user
|
||||
|
||||
|
||||
def create_pkgbase(user: User, name: str) -> PackageBase:
|
||||
"""
|
||||
Create a package base based on `user` and `name`.
|
||||
|
||||
This function also creates a matching Package record.
|
||||
|
||||
:param user: User instance
|
||||
:param name: PackageBase.Name
|
||||
:return: PackageBase instance
|
||||
"""
|
||||
now = int(datetime.utcnow().timestamp())
|
||||
with db.begin():
|
||||
pkgbase = db.create(PackageBase, Name=name,
|
||||
Maintainer=user, Packager=user,
|
||||
SubmittedTS=now, ModifiedTS=now)
|
||||
db.create(Package, Name=pkgbase.Name, PackageBase=pkgbase)
|
||||
return pkgbase
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def pkgbase(user: User) -> PackageBase:
|
||||
""" Yield a package base. """
|
||||
pkgbase = create_pkgbase(user, "test-package")
|
||||
yield pkgbase
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def target(user: User) -> PackageBase:
|
||||
""" Yield a merge target (package base). """
|
||||
with db.begin():
|
||||
target = db.create(PackageBase, Name="target-package",
|
||||
Maintainer=user, Packager=user)
|
||||
yield target
|
||||
|
||||
|
||||
def create_request(reqtype_id: int, user: User, pkgbase: PackageBase,
|
||||
comments: str) -> PackageRequest:
|
||||
"""
|
||||
Create a package request based on `reqtype_id`, `user`,
|
||||
`pkgbase` and `comments`.
|
||||
|
||||
:param reqtype_id: RequestType.ID
|
||||
:param user: User instance
|
||||
:param pkgbase: PackageBase instance
|
||||
:param comments: PackageRequest.Comments
|
||||
:return: PackageRequest instance
|
||||
"""
|
||||
now = int(datetime.utcnow().timestamp())
|
||||
with db.begin():
|
||||
pkgreq = db.create(PackageRequest, ReqTypeID=reqtype_id,
|
||||
User=user, PackageBase=pkgbase,
|
||||
PackageBaseName=pkgbase.Name,
|
||||
RequestTS=now,
|
||||
Comments=comments,
|
||||
ClosureComment=str())
|
||||
return pkgreq
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def pkgreq(user: User, pkgbase: PackageBase):
|
||||
""" Yield a package request. """
|
||||
pkgreq = create_request(DELETION_ID, user, pkgbase, "Test request.")
|
||||
yield pkgreq
|
||||
|
||||
|
||||
def create_notification(user: User, pkgbase: PackageBase):
|
||||
""" Create a notification for a `user` on `pkgbase`. """
|
||||
with db.begin():
|
||||
notif = db.create(PackageNotification, User=user, PackageBase=pkgbase)
|
||||
return notif
|
||||
|
||||
|
||||
def test_request(client: TestClient, auser: User, pkgbase: PackageBase):
|
||||
""" Test the standard pkgbase request route GET method. """
|
||||
endpoint = f"/pkgbase/{pkgbase.Name}/request"
|
||||
with client as request:
|
||||
resp = request.get(endpoint, cookies=auser.cookies)
|
||||
assert resp.status_code == int(HTTPStatus.OK)
|
||||
|
||||
|
||||
def test_request_post_deletion(client: TestClient, auser2: User,
|
||||
pkgbase: PackageBase):
|
||||
""" Test the POST route for creating a deletion request works. """
|
||||
endpoint = f"/pkgbase/{pkgbase.Name}/request"
|
||||
data = {"comments": "Test request.", "type": "deletion"}
|
||||
with client as request:
|
||||
resp = request.post(endpoint, data=data, cookies=auser2.cookies)
|
||||
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
|
||||
|
||||
pkgreq = pkgbase.requests.first()
|
||||
assert pkgreq is not None
|
||||
assert pkgreq.ReqTypeID == DELETION_ID
|
||||
assert pkgreq.Status == PENDING_ID
|
||||
|
||||
# A RequestOpenNotification should've been sent out.
|
||||
assert Email.count() == 1
|
||||
email = Email(1)
|
||||
expr = r"^\[PRQ#%d\] Deletion Request for [^ ]+$" % pkgreq.ID
|
||||
assert re.match(expr, email.headers.get("Subject"))
|
||||
|
||||
|
||||
def test_request_post_deletion_as_maintainer(client: TestClient, auser: User,
|
||||
pkgbase: PackageBase):
|
||||
""" Test the POST route for creating a deletion request as maint works. """
|
||||
endpoint = f"/pkgbase/{pkgbase.Name}/request"
|
||||
data = {"comments": "Test request.", "type": "deletion"}
|
||||
with client as request:
|
||||
resp = request.post(endpoint, data=data, cookies=auser.cookies)
|
||||
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
|
||||
|
||||
# Check the pkgreq record got created and accepted.
|
||||
pkgreq = db.query(PackageRequest).first()
|
||||
assert pkgreq is not None
|
||||
assert pkgreq.ReqTypeID == DELETION_ID
|
||||
assert pkgreq.Status == ACCEPTED_ID
|
||||
|
||||
# Should've gotten two emails.
|
||||
assert Email.count() == 2
|
||||
|
||||
# A RequestOpenNotification should've been sent out.
|
||||
email = Email(1)
|
||||
expr = r"^\[PRQ#%d\] Deletion Request for [^ ]+$" % pkgreq.ID
|
||||
assert re.match(expr, email.headers.get("Subject"))
|
||||
|
||||
# Check the content of the close notification.
|
||||
email = Email(2)
|
||||
expr = r"^\[PRQ#%d\] Deletion Request for [^ ]+ Accepted$" % pkgreq.ID
|
||||
assert re.match(expr, email.headers.get("Subject"))
|
||||
|
||||
|
||||
def test_request_post_deletion_autoaccept(client: TestClient, auser: User,
|
||||
pkgbase: PackageBase,
|
||||
caplog: pytest.LogCaptureFixture):
|
||||
""" Test the request route for deletion as maintainer. """
|
||||
caplog.set_level(DEBUG)
|
||||
|
||||
now = int(datetime.utcnow().timestamp())
|
||||
auto_delete_age = config.getint("options", "auto_delete_age")
|
||||
with db.begin():
|
||||
pkgbase.ModifiedTS = now - auto_delete_age + 100
|
||||
|
||||
endpoint = f"/pkgbase/{pkgbase.Name}/request"
|
||||
data = {"comments": "Test request.", "type": "deletion"}
|
||||
with client as request:
|
||||
resp = request.post(endpoint, data=data, cookies=auser.cookies)
|
||||
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
|
||||
|
||||
pkgreq = db.query(PackageRequest).filter(
|
||||
PackageRequest.PackageBaseName == pkgbase.Name
|
||||
).first()
|
||||
assert pkgreq is not None
|
||||
assert pkgreq.ReqTypeID == DELETION_ID
|
||||
assert pkgreq.Status == ACCEPTED_ID
|
||||
|
||||
# A RequestOpenNotification should've been sent out.
|
||||
assert Email.count() == 2
|
||||
Email.dump()
|
||||
|
||||
# Check the content of the open notification.
|
||||
email = Email(1)
|
||||
expr = r"^\[PRQ#%d\] Deletion Request for [^ ]+$" % pkgreq.ID
|
||||
assert re.match(expr, email.headers.get("Subject"))
|
||||
|
||||
# Check the content of the close notification.
|
||||
email = Email(2)
|
||||
expr = r"^\[PRQ#%d\] Deletion Request for [^ ]+ Accepted$" % pkgreq.ID
|
||||
assert re.match(expr, email.headers.get("Subject"))
|
||||
|
||||
# Check logs.
|
||||
expr = r"New request #\d+ is marked for auto-deletion."
|
||||
assert re.search(expr, caplog.text)
|
||||
|
||||
|
||||
def test_request_post_merge(client: TestClient, auser: User,
|
||||
pkgbase: PackageBase, target: PackageBase):
|
||||
""" Test the request route for merge as maintainer. """
|
||||
endpoint = f"/pkgbase/{pkgbase.Name}/request"
|
||||
data = {
|
||||
"type": "merge",
|
||||
"merge_into": target.Name,
|
||||
"comments": "Test request.",
|
||||
}
|
||||
with client as request:
|
||||
resp = request.post(endpoint, data=data, cookies=auser.cookies)
|
||||
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
|
||||
|
||||
pkgreq = pkgbase.requests.first()
|
||||
assert pkgreq is not None
|
||||
assert pkgreq.ReqTypeID == MERGE_ID
|
||||
assert pkgreq.Status == PENDING_ID
|
||||
assert pkgreq.MergeBaseName == target.Name
|
||||
|
||||
# A RequestOpenNotification should've been sent out.
|
||||
assert Email.count() == 1
|
||||
email = Email(1)
|
||||
expr = r"^\[PRQ#%d\] Merge Request for [^ ]+$" % pkgreq.ID
|
||||
assert re.match(expr, email.headers.get("Subject"))
|
||||
|
||||
|
||||
def test_request_post_orphan(client: TestClient, auser: User,
|
||||
pkgbase: PackageBase):
|
||||
""" Test the POST route for creating an orphan request works. """
|
||||
endpoint = f"/pkgbase/{pkgbase.Name}/request"
|
||||
data = {
|
||||
"type": "orphan",
|
||||
"comments": "Test request.",
|
||||
}
|
||||
with client as request:
|
||||
resp = request.post(endpoint, data=data, cookies=auser.cookies)
|
||||
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
|
||||
|
||||
pkgreq = pkgbase.requests.first()
|
||||
assert pkgreq is not None
|
||||
assert pkgreq.ReqTypeID == ORPHAN_ID
|
||||
assert pkgreq.Status == PENDING_ID
|
||||
|
||||
# A RequestOpenNotification should've been sent out.
|
||||
assert Email.count() == 1
|
||||
|
||||
email = Email(1)
|
||||
expr = r"^\[PRQ#%d\] Orphan Request for [^ ]+$" % pkgreq.ID
|
||||
assert re.match(expr, email.headers.get("Subject"))
|
||||
|
||||
|
||||
def test_deletion_request(client: TestClient, user: User, tu_user: User,
|
||||
pkgbase: PackageBase, pkgreq: PackageRequest):
|
||||
""" Test deleting a package with a preexisting request. """
|
||||
# `pkgreq`.ReqTypeID is already DELETION_ID.
|
||||
create_request(DELETION_ID, user, pkgbase, "Other request.")
|
||||
|
||||
# Create a notification record for another user. They should then
|
||||
# also receive a DeleteNotification.
|
||||
user2 = create_user("test2", "test2@example.org")
|
||||
create_notification(user2, pkgbase)
|
||||
|
||||
endpoint = f"/pkgbase/{pkgbase.Name}/delete"
|
||||
comments = "Test closure."
|
||||
data = {"comments": comments, "confirm": True}
|
||||
with client as request:
|
||||
resp = request.post(endpoint, data=data, cookies=tu_user.cookies)
|
||||
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
|
||||
assert resp.headers.get("location") == "/packages"
|
||||
|
||||
# Ensure that `pkgreq`.ClosureComment was left alone when specified.
|
||||
assert pkgreq.ClosureComment == comments
|
||||
|
||||
# We should've gotten three emails. Two accepted requests and
|
||||
# a DeleteNotification.
|
||||
assert Email.count() == 3
|
||||
|
||||
# Both requests should have gotten accepted and had a notification
|
||||
# sent out for them.
|
||||
for i in range(Email.count() - 1):
|
||||
email = Email(i + 1).parse()
|
||||
expr = r"^\[PRQ#\d+\] Deletion Request for [^ ]+ Accepted$"
|
||||
assert re.match(expr, email.headers.get("Subject"))
|
||||
|
||||
# We should've also had a DeleteNotification sent out.
|
||||
email = Email(3).parse()
|
||||
subject = r"^AUR Package deleted: [^ ]+$"
|
||||
assert re.match(subject, email.headers.get("Subject"))
|
||||
body = r"%s [1] deleted %s [2]." % (tu_user.Username, pkgbase.Name)
|
||||
assert body in email.body
|
||||
|
||||
|
||||
def test_deletion_autorequest(client: TestClient, tu_user: User,
|
||||
pkgbase: PackageBase):
|
||||
""" Test deleting a package without a request. """
|
||||
# `pkgreq`.ReqTypeID is already DELETION_ID.
|
||||
endpoint = f"/pkgbase/{pkgbase.Name}/delete"
|
||||
data = {"confirm": True}
|
||||
with client as request:
|
||||
resp = request.post(endpoint, data=data, cookies=tu_user.cookies)
|
||||
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
|
||||
|
||||
assert resp.headers.get("location") == "/packages"
|
||||
assert Email.count() == 1
|
||||
|
||||
email = Email(1).parse()
|
||||
subject = r"^\[PRQ#\d+\] Deletion Request for [^ ]+ Accepted$"
|
||||
assert re.match(subject, email.headers.get("Subject"))
|
||||
assert "[Autogenerated]" in email.body
|
||||
|
||||
|
||||
def test_merge_request(client: TestClient, user: User, tu_user: User,
|
||||
pkgbase: PackageBase, target: PackageBase,
|
||||
pkgreq: PackageRequest):
|
||||
""" Test merging a package with a pre - existing request. """
|
||||
with db.begin():
|
||||
pkgreq.ReqTypeID = MERGE_ID
|
||||
pkgreq.MergeBaseName = target.Name
|
||||
|
||||
other_target = create_pkgbase(user, "other-target")
|
||||
other_request = create_request(MERGE_ID, user, pkgbase, "Other request.")
|
||||
other_target2 = create_pkgbase(user, "other-target2")
|
||||
other_request2 = create_request(MERGE_ID, user, pkgbase, "Other request2.")
|
||||
with db.begin():
|
||||
other_request.MergeBaseName = other_target.Name
|
||||
other_request2.MergeBaseName = other_target2.Name
|
||||
|
||||
# `pkgreq`.ReqTypeID is already DELETION_ID.
|
||||
endpoint = f"/pkgbase/{pkgbase.Name}/merge"
|
||||
comments = "Test merge closure."
|
||||
data = {"into": target.Name, "comments": comments, "confirm": True}
|
||||
with client as request:
|
||||
resp = request.post(endpoint, data=data, cookies=tu_user.cookies)
|
||||
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
|
||||
assert resp.headers.get("location") == f"/pkgbase/{target.Name}"
|
||||
|
||||
# Ensure that `pkgreq`.ClosureComment was left alone when specified.
|
||||
assert pkgreq.ClosureComment == comments
|
||||
|
||||
# We should've gotten 3 emails: an accepting and two rejections.
|
||||
assert Email.count() == 3
|
||||
|
||||
# Assert specific IDs match up in the subjects.
|
||||
accepted = Email(1).parse()
|
||||
subj = r"^\[PRQ#%d\] Merge Request for [^ ]+ Accepted$" % pkgreq.ID
|
||||
assert re.match(subj, accepted.headers.get("Subject"))
|
||||
|
||||
# In the accepted case, we already supplied a closure comment,
|
||||
# which stops one from being autogenerated by the algorithm.
|
||||
assert "[Autogenerated]" not in accepted.body
|
||||
|
||||
# Test rejection emails, which do have autogenerated closures.
|
||||
rejected = Email(2).parse()
|
||||
subj = r"^\[PRQ#%d\] Merge Request for [^ ]+ Rejected$" % other_request.ID
|
||||
assert re.match(subj, rejected.headers.get("Subject"))
|
||||
assert "[Autogenerated]" in rejected.body
|
||||
|
||||
rejected = Email(3).parse()
|
||||
subj = r"^\[PRQ#%d\] Merge Request for [^ ]+ Rejected$" % other_request2.ID
|
||||
assert re.match(subj, rejected.headers.get("Subject"))
|
||||
assert "[Autogenerated]" in rejected.body
|
||||
|
||||
|
||||
def test_merge_autorequest(client: TestClient, user: User, tu_user: User,
|
||||
pkgbase: PackageBase, target: PackageBase):
|
||||
""" Test merging a package without a request. """
|
||||
with db.begin():
|
||||
pkgreq.ReqTypeID = MERGE_ID
|
||||
pkgreq.MergeBaseName = target.Name
|
||||
|
||||
# `pkgreq`.ReqTypeID is already DELETION_ID.
|
||||
endpoint = f"/pkgbase/{pkgbase.Name}/merge"
|
||||
data = {"into": target.Name, "confirm": True}
|
||||
with client as request:
|
||||
resp = request.post(endpoint, data=data, cookies=tu_user.cookies)
|
||||
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
|
||||
assert resp.headers.get("location") == f"/pkgbase/{target.Name}"
|
||||
|
||||
# Should've gotten one email; an [Autogenerated] one.
|
||||
assert Email.count() == 1
|
||||
|
||||
# Test accepted merge request notification.
|
||||
email = Email(1).parse()
|
||||
subj = r"^\[PRQ#\d+\] Merge Request for [^ ]+ Accepted$"
|
||||
assert re.match(subj, email.headers.get("Subject"))
|
||||
assert "[Autogenerated]" in email.body
|
||||
|
||||
|
||||
def test_orphan_request(client: TestClient, user: User, tu_user: User,
|
||||
pkgbase: PackageBase, pkgreq: PackageRequest):
|
||||
""" Test the standard orphan request route. """
|
||||
idle_time = config.getint("options", "request_idle_time")
|
||||
now = int(datetime.utcnow().timestamp())
|
||||
with db.begin():
|
||||
pkgreq.ReqTypeID = ORPHAN_ID
|
||||
# Set the request time so it's seen as due (idle_time has passed).
|
||||
pkgreq.RequestTS = now - idle_time - 10
|
||||
|
||||
endpoint = f"/pkgbase/{pkgbase.Name}/disown"
|
||||
comments = "Test orphan closure."
|
||||
data = {"comments": comments, "confirm": True}
|
||||
with client as request:
|
||||
resp = request.post(endpoint, data=data, cookies=tu_user.cookies)
|
||||
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
|
||||
assert resp.headers.get("location") == f"/pkgbase/{pkgbase.Name}"
|
||||
|
||||
# Ensure that `pkgreq`.ClosureComment was left alone when specified.
|
||||
assert pkgreq.ClosureComment == comments
|
||||
|
||||
# Check the email we expect.
|
||||
assert Email.count() == 1
|
||||
email = Email(1).parse()
|
||||
subj = r"^\[PRQ#%d\] Orphan Request for [^ ]+ Accepted$" % pkgreq.ID
|
||||
assert re.match(subj, email.headers.get("Subject"))
|
||||
|
||||
|
||||
def test_request_post_orphan_autoaccept(client: TestClient, auser: User,
|
||||
pkgbase: PackageBase,
|
||||
caplog: pytest.LogCaptureFixture):
|
||||
""" Test the standard pkgbase request route GET method. """
|
||||
caplog.set_level(DEBUG)
|
||||
now = int(datetime.utcnow().timestamp())
|
||||
auto_orphan_age = config.getint("options", "auto_orphan_age")
|
||||
with db.begin():
|
||||
pkgbase.OutOfDateTS = now - auto_orphan_age - 100
|
||||
|
||||
endpoint = f"/pkgbase/{pkgbase.Name}/request"
|
||||
data = {
|
||||
"type": "orphan",
|
||||
"comments": "Test request.",
|
||||
}
|
||||
with client as request:
|
||||
resp = request.post(endpoint, data=data, cookies=auser.cookies)
|
||||
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
|
||||
|
||||
pkgreq = pkgbase.requests.first()
|
||||
assert pkgreq is not None
|
||||
assert pkgreq.ReqTypeID == ORPHAN_ID
|
||||
|
||||
# A Request(Open|Close)Notification should've been sent out.
|
||||
assert Email.count() == 2
|
||||
|
||||
# Check the first email; should be our open request.
|
||||
email = Email(1)
|
||||
expr = r"^\[PRQ#%d\] Orphan Request for [^ ]+$" % pkgreq.ID
|
||||
assert re.match(expr, email.headers.get("Subject"))
|
||||
|
||||
# And the second should be the automated closure.
|
||||
email = Email(2)
|
||||
expr = r"^\[PRQ#%d\] Orphan Request for [^ ]+ Accepted$" % pkgreq.ID
|
||||
assert re.match(expr, email.headers.get("Subject"))
|
||||
|
||||
# Check logs.
|
||||
expr = r"New request #\d+ is marked for auto-orphan."
|
||||
assert re.search(expr, caplog.text)
|
||||
|
||||
|
||||
def test_orphan_as_maintainer(client: TestClient, auser: User,
|
||||
pkgbase: PackageBase):
|
||||
endpoint = f"/pkgbase/{pkgbase.Name}/disown"
|
||||
data = {"confirm": True}
|
||||
with client as request:
|
||||
resp = request.post(endpoint, data=data, cookies=auser.cookies)
|
||||
assert resp.status_code == int(HTTPStatus.SEE_OTHER)
|
||||
assert resp.headers.get("location") == f"/pkgbase/{pkgbase.Name}"
|
||||
|
||||
assert Email.count() == 1
|
||||
email = Email(1)
|
||||
expr = r"^\[PRQ#\d+\] Orphan Request for [^ ]+ Accepted$"
|
||||
assert re.match(expr, email.headers.get("Subject"))
|
||||
|
||||
|
||||
def test_orphan_without_requests(client: TestClient, tu_user: User,
|
||||
pkgbase: PackageBase):
|
||||
""" Test orphans are automatically accepted past a certain date. """
|
||||
endpoint = f"/pkgbase/{pkgbase.Name}/disown"
|
||||
data = {"confirm": True}
|
||||
with client as request:
|
||||
resp = request.post(endpoint, data=data, cookies=tu_user.cookies)
|
||||
assert resp.status_code == int(HTTPStatus.BAD_REQUEST)
|
||||
|
||||
errors = get_errors(resp.text)
|
||||
expected = r"^No due existing orphan requests to accept for .+\.$"
|
||||
assert re.match(expected, errors[0].text.strip())
|
||||
|
||||
assert Email.count() == 0
|
||||
|
||||
|
||||
def test_closure_factory_invalid_reqtype_id():
|
||||
""" Test providing an invalid reqtype_id raises NotImplementedError. """
|
||||
automated = ClosureFactory()
|
||||
match = r"^Unsupported '.+' value\.$"
|
||||
with pytest.raises(NotImplementedError, match=match):
|
||||
automated.get_closure(666, None, None, None, ACCEPTED_ID)
|
||||
with pytest.raises(NotImplementedError, match=match):
|
||||
automated.get_closure(666, None, None, None, REJECTED_ID)
|
Loading…
Add table
Add a link
Reference in a new issue