change(fastapi): simplify model imports across code-base

Closes: #133

Signed-off-by: Kevin Morris <kevr@0cost.org>
This commit is contained in:
Kevin Morris 2021-10-16 19:25:25 -07:00
parent bfdc85d7d6
commit 28c4e9697b
No known key found for this signature in database
GPG key ID: F7E46DED420788F3
12 changed files with 341 additions and 343 deletions

View file

@ -7,27 +7,13 @@ from fastapi.responses import JSONResponse, RedirectResponse
from sqlalchemy import and_, case
import aurweb.filters
import aurweb.models.package_comment
import aurweb.models.package_keyword
import aurweb.packages.util
from aurweb import db, defaults, l10n, util
from aurweb import db, defaults, l10n, models, util
from aurweb.auth import auth_required
from aurweb.models.license import License
from aurweb.models.package import Package
from aurweb.models.package_base import PackageBase
from aurweb.models.package_comaintainer import PackageComaintainer
from aurweb.models.package_comment import PackageComment
from aurweb.models.package_dependency import PackageDependency
from aurweb.models.package_license import PackageLicense
from aurweb.models.package_notification import PackageNotification
from aurweb.models.package_relation import PackageRelation
from aurweb.models.package_request import ACCEPTED_ID, PENDING_ID, REJECTED_ID, PackageRequest
from aurweb.models.package_source import PackageSource
from aurweb.models.package_vote import PackageVote
from aurweb.models.package_request import ACCEPTED_ID, PENDING_ID, REJECTED_ID
from aurweb.models.relation_type import CONFLICTS_ID
from aurweb.models.request_type import DELETION_ID, RequestType
from aurweb.models.user import User
from aurweb.models.request_type import DELETION_ID
from aurweb.packages.search import PackageSearch
from aurweb.packages.util import get_pkg_or_base, get_pkgbase_comment, query_notified, query_voted
from aurweb.scripts import notify, popupdate
@ -75,9 +61,9 @@ async def packages_get(request: Request, context: Dict[str, Any]):
# do **not** have OutOfDateTS.
criteria = None
if flagged == "on":
criteria = PackageBase.OutOfDateTS.isnot
criteria = models.PackageBase.OutOfDateTS.isnot
else:
criteria = PackageBase.OutOfDateTS.is_
criteria = models.PackageBase.OutOfDateTS.is_
# Apply the flag criteria to our PackageSearch.query.
search.query = search.query.filter(criteria(None))
@ -86,7 +72,8 @@ async def packages_get(request: Request, context: Dict[str, Any]):
if submit == "Orphans":
# If the user clicked the "Orphans" button, we only want
# orphaned packages.
search.query = search.query.filter(PackageBase.MaintainerUID.is_(None))
search.query = search.query.filter(
models.PackageBase.MaintainerUID.is_(None))
# Apply user-specified specified sort column and ordering.
search.sort_by(sort_by, sort_order)
@ -116,19 +103,19 @@ async def packages(request: Request) -> Response:
return await packages_get(request, context)
def create_request_if_missing(requests: List[PackageRequest],
reqtype: RequestType,
user: User,
package: Package):
def create_request_if_missing(requests: List[models.PackageRequest],
reqtype: models.RequestType,
user: models.User,
package: models.Package):
now = int(datetime.utcnow().timestamp())
pkgreq = db.query(PackageRequest).filter(
PackageRequest.PackageBaseName == package.PackageBase.Name
pkgreq = db.query(models.PackageRequest).filter(
models.PackageRequest.PackageBaseName == package.PackageBase.Name
).first()
if not pkgreq:
# No PackageRequest existed. Create one.
comments = "Automatically generated by aurweb."
closure_comment = "Deleted by aurweb."
pkgreq = db.create(PackageRequest,
pkgreq = db.create(models.PackageRequest,
RequestType=reqtype,
PackageBase=package.PackageBase,
PackageBaseName=package.PackageBase.Name,
@ -141,8 +128,7 @@ def create_request_if_missing(requests: List[PackageRequest],
requests.append(pkgreq)
def delete_package(deleter: User,
package: Package):
def delete_package(deleter: models.User, package: models.Package):
notifications = []
requests = []
bases_to_delete = []
@ -150,8 +136,8 @@ def delete_package(deleter: User,
conn = db.ConnectionExecutor(db.get_engine().raw_connection())
# In all cases, though, just delete the Package in question.
if package.PackageBase.packages.count() == 1:
reqtype = db.query(RequestType).filter(
RequestType.ID == DELETION_ID
reqtype = db.query(models.RequestType).filter(
models.RequestType.ID == DELETION_ID
).first()
with db.begin():
@ -187,7 +173,7 @@ def delete_package(deleter: User,
async def make_single_context(request: Request,
pkgbase: PackageBase) -> Dict[str, Any]:
pkgbase: models.PackageBase) -> Dict[str, Any]:
""" Make a basic context for package or pkgbase.
:param request: FastAPI request
@ -203,11 +189,11 @@ async def make_single_context(request: Request,
context["packages_count"] = pkgbase.packages.count()
context["keywords"] = pkgbase.keywords
context["comments"] = pkgbase.comments.order_by(
PackageComment.CommentTS.desc()
models.PackageComment.CommentTS.desc()
)
context["pinned_comments"] = pkgbase.comments.filter(
PackageComment.PinnedTS != 0
).order_by(PackageComment.CommentTS.desc())
models.PackageComment.PinnedTS != 0
).order_by(models.PackageComment.CommentTS.desc())
context["is_maintainer"] = (request.user.is_authenticated()
and request.user.ID == pkgbase.MaintainerUID)
@ -216,10 +202,10 @@ async def make_single_context(request: Request,
context["out_of_date"] = bool(pkgbase.OutOfDateTS)
context["voted"] = request.user.package_votes.filter(
PackageVote.PackageBaseID == pkgbase.ID).scalar()
models.PackageVote.PackageBaseID == pkgbase.ID).scalar()
context["requests"] = pkgbase.requests.filter(
PackageRequest.ClosedTS.is_(None)
models.PackageRequest.ClosedTS.is_(None)
).count()
return context
@ -228,8 +214,8 @@ async def make_single_context(request: Request,
@router.get("/packages/{name}")
async def package(request: Request, name: str) -> Response:
# Get the Package.
pkg = get_pkg_or_base(name, Package)
pkgbase = (get_pkg_or_base(name, PackageBase)
pkg = get_pkg_or_base(name, models.Package)
pkgbase = (get_pkg_or_base(name, models.PackageBase)
if not pkg else pkg.PackageBase)
# Add our base information.
@ -237,28 +223,32 @@ async def package(request: Request, name: str) -> Response:
context["package"] = pkg
# Package sources.
context["sources"] = db.query(PackageSource).join(Package).join(
PackageBase).filter(PackageBase.ID == pkgbase.ID)
context["sources"] = db.query(models.PackageSource).join(
models.Package).join(models.PackageBase).filter(
models.PackageBase.ID == pkgbase.ID)
# Package dependencies.
dependencies = db.query(PackageDependency).join(Package).join(
PackageBase).filter(PackageBase.ID == pkgbase.ID)
dependencies = db.query(models.PackageDependency).join(
models.Package).join(models.PackageBase).filter(
models.PackageBase.ID == pkgbase.ID)
context["dependencies"] = dependencies
# Package requirements (other packages depend on this one).
required_by = db.query(PackageDependency).join(Package).filter(
PackageDependency.DepName == pkgbase.Name).order_by(
Package.Name.asc())
required_by = db.query(models.PackageDependency).join(
models.Package).filter(
models.PackageDependency.DepName == pkgbase.Name).order_by(
models.Package.Name.asc())
context["required_by"] = required_by
licenses = db.query(License).join(PackageLicense).join(Package).join(
PackageBase).filter(PackageBase.ID == pkgbase.ID)
licenses = db.query(models.License).join(models.PackageLicense).join(
models.Package).join(models.PackageBase).filter(
models.PackageBase.ID == pkgbase.ID)
context["licenses"] = licenses
conflicts = db.query(PackageRelation).join(Package).join(
PackageBase).filter(
and_(PackageRelation.RelTypeID == CONFLICTS_ID,
PackageBase.ID == pkgbase.ID)
conflicts = db.query(models.PackageRelation).join(models.Package).join(
models.PackageBase).filter(
and_(models.PackageRelation.RelTypeID == CONFLICTS_ID,
models.PackageBase.ID == pkgbase.ID)
)
context["conflicts"] = conflicts
@ -268,7 +258,7 @@ async def package(request: Request, name: str) -> Response:
@router.get("/pkgbase/{name}")
async def package_base(request: Request, name: str) -> Response:
# Get the PackageBase.
pkgbase = get_pkg_or_base(name, PackageBase)
pkgbase = get_pkg_or_base(name, models.PackageBase)
# If this is not a split package, redirect to /packages/{name}.
if pkgbase.packages.count() == 1:
@ -285,7 +275,7 @@ async def package_base(request: Request, name: str) -> Response:
@router.get("/pkgbase/{name}/voters")
async def package_base_voters(request: Request, name: str) -> Response:
# Get the PackageBase.
pkgbase = get_pkg_or_base(name, PackageBase)
pkgbase = get_pkg_or_base(name, models.PackageBase)
context = make_context(request, "Voters")
context["pkgbase"] = pkgbase
return render_template(request, "pkgbase/voters.html", context)
@ -298,7 +288,7 @@ async def pkgbase_comments_post(
comment: str = Form(default=str()),
enable_notifications: bool = Form(default=False)):
""" Add a new comment. """
pkgbase = get_pkg_or_base(name, PackageBase)
pkgbase = get_pkg_or_base(name, models.PackageBase)
if not comment:
raise HTTPException(status_code=HTTPStatus.BAD_REQUEST)
@ -307,13 +297,13 @@ async def pkgbase_comments_post(
# update the db record.
now = int(datetime.utcnow().timestamp())
with db.begin():
comment = db.create(PackageComment, User=request.user,
comment = db.create(models.PackageComment, User=request.user,
PackageBase=pkgbase,
Comments=comment, RenderedComment=str(),
CommentTS=now)
if enable_notifications and not request.user.notified(pkgbase):
db.create(PackageNotification,
db.create(models.PackageNotification,
User=request.user,
PackageBase=pkgbase)
update_comment_render(comment.ID)
@ -327,8 +317,8 @@ async def pkgbase_comments_post(
@auth_required(True, login=False)
async def pkgbase_comment_form(request: Request, name: str, id: int):
""" Produce a comment form for comment {id}. """
pkgbase = get_pkg_or_base(name, PackageBase)
comment = pkgbase.comments.filter(PackageComment.ID == id).first()
pkgbase = get_pkg_or_base(name, models.PackageBase)
comment = pkgbase.comments.filter(models.PackageComment.ID == id).first()
if not comment:
return JSONResponse({}, status_code=HTTPStatus.NOT_FOUND)
@ -349,7 +339,7 @@ async def pkgbase_comment_post(
request: Request, name: str, id: int,
comment: str = Form(default=str()),
enable_notifications: bool = Form(default=False)):
pkgbase = get_pkg_or_base(name, PackageBase)
pkgbase = get_pkg_or_base(name, models.PackageBase)
db_comment = get_pkgbase_comment(pkgbase, id)
if not comment:
@ -365,10 +355,10 @@ async def pkgbase_comment_post(
db_comment.EditedTS = now
db_notif = request.user.notifications.filter(
PackageNotification.PackageBaseID == pkgbase.ID
models.PackageNotification.PackageBaseID == pkgbase.ID
).first()
if enable_notifications and not db_notif:
db.create(PackageNotification,
db.create(models.PackageNotification,
User=request.user,
PackageBase=pkgbase)
update_comment_render(db_comment.ID)
@ -381,7 +371,7 @@ async def pkgbase_comment_post(
@router.post("/pkgbase/{name}/comments/{id}/delete")
@auth_required(True, redirect="/pkgbase/{name}/comments/{id}/delete")
async def pkgbase_comment_delete(request: Request, name: str, id: int):
pkgbase = get_pkg_or_base(name, PackageBase)
pkgbase = get_pkg_or_base(name, models.PackageBase)
comment = get_pkgbase_comment(pkgbase, id)
authorized = request.user.has_credential("CRED_COMMENT_DELETE",
@ -404,7 +394,7 @@ async def pkgbase_comment_delete(request: Request, name: str, id: int):
@router.post("/pkgbase/{name}/comments/{id}/undelete")
@auth_required(True, redirect="/pkgbase/{name}/comments/{id}/undelete")
async def pkgbase_comment_undelete(request: Request, name: str, id: int):
pkgbase = get_pkg_or_base(name, PackageBase)
pkgbase = get_pkg_or_base(name, models.PackageBase)
comment = get_pkgbase_comment(pkgbase, id)
has_cred = request.user.has_credential("CRED_COMMENT_UNDELETE",
@ -426,7 +416,7 @@ async def pkgbase_comment_undelete(request: Request, name: str, id: int):
@router.post("/pkgbase/{name}/comments/{id}/pin")
@auth_required(True, redirect="/pkgbase/{name}/comments/{id}/pin")
async def pkgbase_comment_pin(request: Request, name: str, id: int):
pkgbase = get_pkg_or_base(name, PackageBase)
pkgbase = get_pkg_or_base(name, models.PackageBase)
comment = get_pkgbase_comment(pkgbase, id)
has_cred = request.user.has_credential("CRED_COMMENT_PIN",
@ -448,7 +438,7 @@ async def pkgbase_comment_pin(request: Request, name: str, id: int):
@router.post("/pkgbase/{name}/comments/{id}/unpin")
@auth_required(True, redirect="/pkgbase/{name}/comments/{id}/unpin")
async def pkgbase_comment_unpin(request: Request, name: str, id: int):
pkgbase = get_pkg_or_base(name, PackageBase)
pkgbase = get_pkg_or_base(name, models.PackageBase)
comment = get_pkgbase_comment(pkgbase, id)
has_cred = request.user.has_credential("CRED_COMMENT_PIN",
@ -470,7 +460,7 @@ async def pkgbase_comment_unpin(request: Request, name: str, id: int):
@auth_required(True, redirect="/pkgbase/{name}/comaintainers")
async def package_base_comaintainers(request: Request, name: str) -> Response:
# Get the PackageBase.
pkgbase = get_pkg_or_base(name, PackageBase)
pkgbase = get_pkg_or_base(name, models.PackageBase)
# Unauthorized users (Non-TU/Dev and not the pkgbase maintainer)
# get redirected to the package base's page.
@ -498,8 +488,8 @@ def remove_users(pkgbase, usernames):
for username in usernames:
# We know that the users we passed here are in the DB.
# No need to check for their existence.
comaintainer = pkgbase.comaintainers.join(User).filter(
User.Username == username
comaintainer = pkgbase.comaintainers.join(models.User).filter(
models.User.Username == username
).first()
notifications.append(
notify.ComaintainerRemoveNotification(
@ -519,7 +509,7 @@ async def package_base_comaintainers_post(
request: Request, name: str,
users: str = Form(default=str())) -> Response:
# Get the PackageBase.
pkgbase = get_pkg_or_base(name, PackageBase)
pkgbase = get_pkg_or_base(name, models.PackageBase)
# Unauthorized users (Non-TU/Dev and not the pkgbase maintainer)
# get redirected to the package base's page.
@ -540,7 +530,7 @@ async def package_base_comaintainers_post(
# Get the highest priority in the comaintainer set.
last_priority = pkgbase.comaintainers.order_by(
PackageComaintainer.Priority.desc()
models.PackageComaintainer.Priority.desc()
).limit(1).first()
# If that record exists, we use a priority which is 1 higher.
@ -562,7 +552,8 @@ async def package_base_comaintainers_post(
_ = l10n.get_translator_for_request(request)
memo = {}
for username in usernames:
user = db.query(User).filter(User.Username == username).first()
user = db.query(models.User).filter(
models.User.Username == username).first()
if not user:
return _("Invalid user name: %s") % username
memo[username] = user
@ -579,7 +570,7 @@ async def package_base_comaintainers_post(
# If we get here, our user model object is in the memo.
comaintainer = db.create(
PackageComaintainer,
models.PackageComaintainer,
PackageBase=pkgbase,
User=user,
Priority=priority)
@ -620,21 +611,21 @@ async def requests(request: Request,
context["PP"] = PP
# A PackageRequest query, with left inner joined User and RequestType.
query = db.query(PackageRequest).join(
User, PackageRequest.UsersID == User.ID
).join(RequestType)
query = db.query(models.PackageRequest).join(
models.User, models.PackageRequest.UsersID == models.User.ID
).join(models.RequestType)
# If the request user is not elevated (TU or Dev), then
# filter PackageRequests which are owned by the request user.
if not request.user.is_elevated():
query = query.filter(PackageRequest.UsersID == request.user.ID)
query = query.filter(models.PackageRequest.UsersID == request.user.ID)
context["total"] = query.count()
context["results"] = query.order_by(
# Order primarily by the Status column being PENDING_ID,
# and secondarily by RequestTS; both in descending order.
case([(PackageRequest.Status == PENDING_ID, 1)], else_=0).desc(),
PackageRequest.RequestTS.desc()
case([(models.PackageRequest.Status == PENDING_ID, 1)], else_=0).desc(),
models.PackageRequest.RequestTS.desc()
).limit(PP).offset(O).all()
return render_template(request, "requests.html", context)
@ -645,7 +636,8 @@ async def requests(request: Request,
async def package_request(request: Request, name: str):
context = make_context(request, "Submit Request")
pkgbase = db.query(PackageBase).filter(PackageBase.Name == name).first()
pkgbase = db.query(models.PackageBase).filter(
models.PackageBase.Name == name).first()
if not pkgbase:
raise HTTPException(status_code=HTTPStatus.NOT_FOUND)
@ -660,7 +652,7 @@ async def pkgbase_request_post(request: Request, name: str,
type: str = Form(...),
merge_into: str = Form(default=None),
comments: str = Form(default=str())):
pkgbase = get_pkg_or_base(name, PackageBase)
pkgbase = get_pkg_or_base(name, models.PackageBase)
# Create our render context.
context = make_context(request, "Submit Request")
@ -682,8 +674,8 @@ async def pkgbase_request_post(request: Request, name: str,
context["errors"] = ['The "Merge into" field must not be empty.']
return render_template(request, "pkgbase/request.html", context)
target = db.query(PackageBase).filter(
PackageBase.Name == merge_into
target = db.query(models.PackageBase).filter(
models.PackageBase.Name == merge_into
).first()
if not target:
# TODO: This error needs to be translated.
@ -701,12 +693,14 @@ async def pkgbase_request_post(request: Request, name: str,
# All good. Create a new PackageRequest based on the given type.
now = int(datetime.utcnow().timestamp())
reqtype = db.query(RequestType, RequestType.Name == type).first()
reqtype = db.query(models.RequestType).filter(
models.RequestType.Name == type).first()
conn = db.ConnectionExecutor(db.get_engine().raw_connection())
notify_ = None
with db.begin():
pkgreq = db.create(PackageRequest, RequestType=reqtype, RequestTS=now,
PackageBase=pkgbase, PackageBaseName=pkgbase.Name,
pkgreq = db.create(models.PackageRequest, RequestType=reqtype,
RequestTS=now, PackageBase=pkgbase,
PackageBaseName=pkgbase.Name,
MergeBaseName=merge_into, User=request.user,
Comments=comments, ClosureComment=str())
@ -726,7 +720,8 @@ async def pkgbase_request_post(request: Request, name: str,
@router.get("/requests/{id}/close")
@auth_required(True, redirect="/requests/{id}/close")
async def requests_close(request: Request, id: int):
pkgreq = db.query(PackageRequest).filter(PackageRequest.ID == id).first()
pkgreq = db.query(models.PackageRequest).filter(
models.PackageRequest.ID == id).first()
if not request.user.is_elevated() and request.user != pkgreq.User:
# Request user doesn't have permission here: redirect to '/'.
return RedirectResponse("/", status_code=HTTPStatus.SEE_OTHER)
@ -741,7 +736,8 @@ async def requests_close(request: Request, id: int):
async def requests_close_post(request: Request, id: int,
reason: int = Form(default=0),
comments: str = Form(default=str())):
pkgreq = db.query(PackageRequest).filter(PackageRequest.ID == id).first()
pkgreq = db.query(models.PackageRequest).filter(
models.PackageRequest.ID == id).first()
if not request.user.is_elevated() and request.user != pkgreq.User:
# Request user doesn't have permission here: redirect to '/'.
return RedirectResponse("/", status_code=HTTPStatus.SEE_OTHER)
@ -776,7 +772,7 @@ async def requests_close_post(request: Request, id: int,
@router.get("/pkgbase/{name}/flag")
@auth_required(True, redirect="/pkgbase/{name}")
async def pkgbase_flag_get(request: Request, name: str):
pkgbase = get_pkg_or_base(name, PackageBase)
pkgbase = get_pkg_or_base(name, models.PackageBase)
has_cred = request.user.has_credential("CRED_PKGBASE_FLAG")
if not has_cred or pkgbase.Flagger is not None:
@ -792,7 +788,7 @@ async def pkgbase_flag_get(request: Request, name: str):
@auth_required(True, redirect="/pkgbase/{name}")
async def pkgbase_flag_post(request: Request, name: str,
comments: str = Form(default=str())):
pkgbase = get_pkg_or_base(name, PackageBase)
pkgbase = get_pkg_or_base(name, models.PackageBase)
if not comments:
context = make_context(request, "Flag Package Out-Of-Date")
@ -817,7 +813,7 @@ async def pkgbase_flag_post(request: Request, name: str,
@router.post("/pkgbase/{name}/unflag")
@auth_required(True, redirect="/pkgbase/{name}")
async def pkgbase_unflag(request: Request, name: str):
pkgbase = get_pkg_or_base(name, PackageBase)
pkgbase = get_pkg_or_base(name, models.PackageBase)
has_cred = request.user.has_credential(
"CRED_PKGBASE_UNFLAG", approved=[pkgbase.Flagger, pkgbase.Maintainer])
@ -834,15 +830,15 @@ async def pkgbase_unflag(request: Request, name: str):
@router.post("/pkgbase/{name}/notify")
@auth_required(True, redirect="/pkgbase/{name}")
async def pkgbase_notify(request: Request, name: str):
pkgbase = get_pkg_or_base(name, PackageBase)
pkgbase = get_pkg_or_base(name, models.PackageBase)
notif = db.query(pkgbase.notifications.filter(
PackageNotification.UserID == request.user.ID
models.PackageNotification.UserID == request.user.ID
).exists()).scalar()
has_cred = request.user.has_credential("CRED_PKGBASE_NOTIFY")
if has_cred and not notif:
with db.begin():
db.create(PackageNotification,
db.create(models.PackageNotification,
PackageBase=pkgbase,
User=request.user)
@ -853,10 +849,10 @@ async def pkgbase_notify(request: Request, name: str):
@router.post("/pkgbase/{name}/unnotify")
@auth_required(True, redirect="/pkgbase/{name}")
async def pkgbase_unnotify(request: Request, name: str):
pkgbase = get_pkg_or_base(name, PackageBase)
pkgbase = get_pkg_or_base(name, models.PackageBase)
notif = pkgbase.notifications.filter(
PackageNotification.UserID == request.user.ID
models.PackageNotification.UserID == request.user.ID
).first()
has_cred = request.user.has_credential("CRED_PKGBASE_NOTIFY")
if has_cred and notif:
@ -870,16 +866,16 @@ async def pkgbase_unnotify(request: Request, name: str):
@router.post("/pkgbase/{name}/vote")
@auth_required(True, redirect="/pkgbase/{name}")
async def pkgbase_vote(request: Request, name: str):
pkgbase = get_pkg_or_base(name, PackageBase)
pkgbase = get_pkg_or_base(name, models.PackageBase)
vote = pkgbase.package_votes.filter(
PackageVote.UsersID == request.user.ID
models.PackageVote.UsersID == request.user.ID
).first()
has_cred = request.user.has_credential("CRED_PKGBASE_VOTE")
if has_cred and not vote:
now = int(datetime.utcnow().timestamp())
with db.begin():
db.create(PackageVote,
db.create(models.PackageVote,
User=request.user,
PackageBase=pkgbase,
VoteTS=now)
@ -895,10 +891,10 @@ async def pkgbase_vote(request: Request, name: str):
@router.post("/pkgbase/{name}/unvote")
@auth_required(True, redirect="/pkgbase/{name}")
async def pkgbase_unvote(request: Request, name: str):
pkgbase = get_pkg_or_base(name, PackageBase)
pkgbase = get_pkg_or_base(name, models.PackageBase)
vote = pkgbase.package_votes.filter(
PackageVote.UsersID == request.user.ID
models.PackageVote.UsersID == request.user.ID
).first()
has_cred = request.user.has_credential("CRED_PKGBASE_VOTE")
if has_cred and vote:
@ -913,7 +909,7 @@ async def pkgbase_unvote(request: Request, name: str):
status_code=HTTPStatus.SEE_OTHER)
def disown_pkgbase(pkgbase: PackageBase, disowner: User):
def disown_pkgbase(pkgbase: models.PackageBase, disowner: models.User):
conn = db.ConnectionExecutor(db.get_engine().raw_connection())
notif = notify.DisownNotification(conn, disowner.ID, pkgbase.ID)
@ -922,7 +918,7 @@ def disown_pkgbase(pkgbase: PackageBase, disowner: User):
pkgbase.Maintainer = None
else:
co = pkgbase.comaintainers.order_by(
PackageComaintainer.Priority.asc()
models.PackageComaintainer.Priority.asc()
).limit(1).first()
if co:
@ -938,7 +934,7 @@ def disown_pkgbase(pkgbase: PackageBase, disowner: User):
@router.get("/pkgbase/{name}/disown")
@auth_required(True, redirect="/pkgbase/{name}")
async def pkgbase_disown_get(request: Request, name: str):
pkgbase = get_pkg_or_base(name, PackageBase)
pkgbase = get_pkg_or_base(name, models.PackageBase)
has_cred = request.user.has_credential("CRED_PKGBASE_DISOWN",
approved=[pkgbase.Maintainer])
@ -955,7 +951,7 @@ async def pkgbase_disown_get(request: Request, name: str):
@auth_required(True, redirect="/pkgbase/{name}")
async def pkgbase_disown_post(request: Request, name: str,
confirm: bool = Form(default=False)):
pkgbase = get_pkg_or_base(name, PackageBase)
pkgbase = get_pkg_or_base(name, models.PackageBase)
has_cred = request.user.has_credential("CRED_PKGBASE_DISOWN",
approved=[pkgbase.Maintainer])
@ -979,7 +975,7 @@ async def pkgbase_disown_post(request: Request, name: str,
@router.post("/pkgbase/{name}/adopt")
@auth_required(True)
async def pkgbase_adopt_post(request: Request, name: str):
pkgbase = get_pkg_or_base(name, PackageBase)
pkgbase = get_pkg_or_base(name, models.PackageBase)
has_cred = request.user.has_credential("CRED_PKGBASE_ADOPT")
if has_cred or not pkgbase.Maintainer:
@ -1001,7 +997,7 @@ async def pkgbase_delete_get(request: Request, name: str):
status_code=HTTPStatus.SEE_OTHER)
context = make_context(request, "Package Deletion")
context["pkgbase"] = get_pkg_or_base(name, PackageBase)
context["pkgbase"] = get_pkg_or_base(name, models.PackageBase)
return render_template(request, "packages/delete.html", context)
@ -1009,7 +1005,7 @@ async def pkgbase_delete_get(request: Request, name: str):
@auth_required(True)
async def pkgbase_delete_post(request: Request, name: str,
confirm: bool = Form(default=False)):
pkgbase = get_pkg_or_base(name, PackageBase)
pkgbase = get_pkg_or_base(name, models.PackageBase)
if not request.user.has_credential("CRED_PKGBASE_DELETE"):
return RedirectResponse(f"/pkgbase/{name}",