mirror of
https://gitlab.archlinux.org/archlinux/aurweb.git
synced 2025-02-03 10:43:03 +01:00
Allows us to set different cache eviction timespans for search queries and statistics. Stats, and especially "last package updates", should probably be refreshed more often, whereas we might want to cache search results for a bit longer. So this gives us a bit more flexibility to play around with different settings and tweak things. Signed-off-by: moson <moson@archlinux.org>
504 lines
17 KiB
Python
504 lines
17 KiB
Python
from collections import defaultdict
from http import HTTPStatus
from typing import Any, Optional

from fastapi import APIRouter, Form, Query, Request, Response

import aurweb.filters  # noqa: F401
from aurweb import aur_logging, config, db, defaults, models, util
from aurweb.auth import creds, requires_auth
from aurweb.cache import db_count_cache, db_query_cache
from aurweb.exceptions import InvariantError, handle_form_exceptions
from aurweb.models.relation_type import CONFLICTS_ID, PROVIDES_ID, REPLACES_ID
from aurweb.packages import util as pkgutil
from aurweb.packages.search import PackageSearch
from aurweb.packages.util import get_pkg_or_base
from aurweb.pkgbase import actions as pkgbase_actions, util as pkgbaseutil
from aurweb.templates import make_context, make_variable_context, render_template
from aurweb.util import hash_query
|
|
|
|
# Router that collects the package routes declared in this module.
router = APIRouter()

# Module-level logger, named after this module.
logger = aur_logging.get_logger(__name__)
|
|
|
|
|
|
async def packages_get(
    request: Request, context: dict[str, Any], status_code: HTTPStatus = HTTPStatus.OK
):
    """
    Run a package search from the request's query string and render it.

    The following query parameters are read: ``O`` (offset), ``PP`` (per
    page), ``SeB`` (search by), ``SB`` (sort by), ``SO`` (sort order),
    ``K`` (keywords), ``outdated`` and ``submit``.

    :param request: FastAPI Request
    :param context: Template context; search state and results are added to it
    :param status_code: HTTP status used for the rendered response
    :return: The rendered ``packages/index.html`` response
    """
    params = request.query_params

    # Hand the raw query parameters to the template.
    context["q"] = dict(params)

    # Pagination: sanitize offset and page size.
    offset, per_page = util.sanitize_params(
        params.get("O", defaults.O), params.get("PP", defaults.PP)
    )
    context["O"] = offset

    # The page size may never exceed options.max_search_results.
    per_page = min(per_page, config.getint("options", "max_search_results"))
    context["PP"] = per_page

    # Search field selector; defaults to "nd".
    search_by = params.get("SeB", "nd")
    context["SeB"] = search_by

    # Sort column and direction, if any were requested.
    sort_by = params.get("SB", None)
    sort_order = params.get("SO", None)

    search = PackageSearch(request.user)

    # K is split on single spaces. Outside of keyword mode, every term adds
    # its own search_by filter, so the terms behave as if they were ANDed.
    raw_keywords = params.get("K", str())
    context["K"] = raw_keywords
    terms = raw_keywords.split(" ")
    if search_by == "k":
        # Keyword mode takes all terms at once, as a set.
        search.search_by(search_by, set(terms))
    else:
        for term in terms:
            search.search_by(search_by, term)

    outdated = params.get("outdated", None)
    if outdated:
        context["outdated"] = outdated

        # "on" keeps only rows whose OutOfDateTS is set; any other non-empty
        # value keeps only rows whose OutOfDateTS is NULL.
        out_of_date_ts = models.PackageBase.OutOfDateTS
        if outdated == "on":
            flag_filter = out_of_date_ts.isnot(None)
        else:
            flag_filter = out_of_date_ts.is_(None)
        search.query = search.query.filter(flag_filter)

    # The "Orphans" submit button restricts results to unmaintained bases.
    if params.get("submit", "Go") == "Orphans":
        search.query = search.query.filter(models.PackageBase.MaintainerUID.is_(None))

    # Count now, while only the filters are applied: the original comments
    # note that ordering and other later operations would make the count
    # slower. The count goes through the db_count_cache helper.
    cache_ttl = config.getint("cache", "expiry_time_search", 600)
    total = db_count_cache(hash_query(search.query), search.query, cache_ttl)

    # User-selected ordering.
    search.sort_by(sort_by, sort_order)

    # Only the columns listed here are fetched for the result page.
    columns = (
        models.Package.ID,
        models.Package.Name,
        models.Package.PackageBaseID,
        models.Package.Version,
        models.Package.Description,
        models.PackageBase.Popularity,
        models.PackageBase.NumVotes,
        models.PackageBase.OutOfDateTS,
        models.PackageBase.ModifiedTS,
        models.User.Username.label("Maintainer"),
        models.PackageVote.PackageBaseID.label("Voted"),
        models.PackageNotification.PackageBaseID.label("Notify"),
    )
    page_query = search.results().with_entities(*columns)

    # Paging.
    page_query = page_query.limit(per_page).offset(offset)

    # The result page is cached with the same expiry as the count.
    context["packages"] = db_query_cache(hash_query(page_query), page_query, cache_ttl)
    context["packages_count"] = total

    return render_template(
        request, "packages/index.html", context, status_code=status_code
    )
|
|
|
|
|
|
@router.get("/packages")
async def packages(request: Request) -> Response:
    """
    Handle ``GET /packages`` by rendering the package search listing.

    :param request: FastAPI Request
    :return: The response produced by `packages_get`
    """
    search_context = await make_variable_context(request, "Packages")
    response = await packages_get(request, search_context)
    return response
|
|
|
|
|
|
@router.get("/packages/{name}")
async def package(
    request: Request,
    name: str,
    all_deps: bool = Query(default=False),
    all_reqs: bool = Query(default=False),
) -> Response:
    """
    Render the detail page of a single package.

    Dependency and required-by listings are capped at 20 entries unless
    the corresponding `all_deps` / `all_reqs` flag is set; the total counts
    are always put into the context.

    :param name: Package.Name
    :param all_deps: When true, list every dependency instead of the first 20
    :param all_reqs: When true, list every requirement instead of the first 20
    :return: The rendered ``packages/show.html`` response
    """
    # Look up the package and its base.
    pkg = get_pkg_or_base(name, models.Package)
    pkgbase = pkg.PackageBase

    # Group the package's relations by type, ordered by relation name.
    # Relations of any other type are ignored.
    group_key_by_type = {CONFLICTS_ID: "c", PROVIDES_ID: "p", REPLACES_ID: "r"}
    grouped_rels = defaultdict(list)
    for relation in pkg.package_relations.order_by(
        models.PackageRelation.RelName.asc()
    ):
        group_key = group_key_by_type.get(relation.RelTypeID)
        if group_key is not None:
            grouped_rels[group_key].append(relation)

    # Base context shared with the package base views.
    context = await pkgbaseutil.make_variable_context(request, pkgbase)
    context["all_deps"] = all_deps
    context["all_reqs"] = all_reqs
    context["package"] = pkg

    # Sources, ordered by source string.
    source_query = pkg.package_sources.order_by(models.PackageSource.Source.asc())
    context["sources"] = source_query.all()

    # Maximum number of entries shown in a truncated listing.
    max_listing = 20
    context["max_listing"] = max_listing

    # Dependencies: the count is taken before the listing is truncated.
    dependency = models.PackageDependency
    dep_query = pkg.package_dependencies.order_by(
        dependency.DepTypeID.asc(), dependency.DepName.asc()
    )
    context["depends_count"] = dep_query.count()
    if not all_deps:
        dep_query = dep_query.limit(max_listing)
    context["dependencies"] = dep_query.all()

    # Required-by: looked up by this package's name and by every name it
    # provides.
    provided_names = [rel.RelName for rel in grouped_rels.get("p", [])]
    req_query = pkgutil.pkg_required(pkg.Name, provided_names)
    context["reqs_count"] = req_query.count()
    if not all_reqs:
        req_query = req_query.limit(max_listing)
    context["required_by"] = req_query.all()

    context["licenses"] = pkg.package_licenses

    # conflicts / provides / replaces are stored as name-ordered queries,
    # not as lists.
    relation_sections = (
        ("conflicts", CONFLICTS_ID),
        ("provides", PROVIDES_ID),
        ("replaces", REPLACES_ID),
    )
    for context_key, rel_type_id in relation_sections:
        context[context_key] = pkg.package_relations.filter(
            models.PackageRelation.RelTypeID == rel_type_id
        ).order_by(models.PackageRelation.RelName.asc())

    return render_template(request, "packages/show.html", context)
|
|
|
|
|
|
async def packages_unflag(
    request: Request, package_ids: Optional[list[int]] = None, **kwargs
):
    """
    Unflag the package bases of all selected packages.

    :param request: FastAPI Request; `request.user` is checked for credentials
    :param package_ids: IDs of the selected packages; None or empty is an error
    :param kwargs: Extra action arguments (e.g. `confirm`); ignored here
    :return: Tuple of (succeeded: bool, messages: list[str])
    """
    if not package_ids:
        return False, ["You did not select any packages to unflag."]

    unique_ids = set(package_ids)
    packages = db.query(models.Package).filter(models.Package.ID.in_(unique_ids)).all()

    # A set collapses packages that share a base, so every base is checked
    # and unflagged only once.
    bases = {pkg.PackageBase for pkg in packages}

    # Verify credentials for every base before changing anything.
    for pkgbase in bases:
        has_cred = request.user.has_credential(
            creds.PKGBASE_UNFLAG, approved=[pkgbase.Flagger]
        )
        if not has_cred:
            # NOTE(review): this reuses the empty-selection message; it is
            # kept unchanged because it is user-visible output.
            return False, ["You did not select any packages to unflag."]

    for pkgbase in bases:
        pkgbase_actions.pkgbase_unflag_instance(request, pkgbase)
    return True, ["The selected packages have been unflagged."]
|
|
|
|
|
|
async def packages_notify(
    request: Request, package_ids: Optional[list[int]] = None, **kwargs
):
    """
    Enable notifications for the package bases of all selected packages.

    The same error is returned for an empty selection, for a missing
    PKGBASE_NOTIFY credential and for a base the request user already has
    a notification on.

    :param request: FastAPI Request; `request.user` is the subscribing user
    :param package_ids: IDs of the selected packages; None or empty is an error
    :param kwargs: Extra action arguments (e.g. `confirm`); ignored here
    :return: Tuple of (succeeded: bool, messages: list[str])
    """
    # TODO: This error does not yet have a translation.
    error_tuple = (False, ["You did not select any packages to be notified about."])
    if not package_ids:
        return error_tuple

    unique_ids = set(package_ids)
    packages = db.query(models.Package).filter(models.Package.ID.in_(unique_ids)).all()

    # A set collapses packages that share a base.
    bases = {pkg.PackageBase for pkg in packages}

    # The credential does not depend on the package base, so it is looked
    # up once instead of once per base.
    has_cred = request.user.has_credential(creds.PKGBASE_NOTIFY)

    # Validate the whole selection before changing anything.
    for pkgbase in bases:
        if not has_cred:
            return error_tuple

        already_notified = db.query(
            pkgbase.notifications.filter(
                models.PackageNotification.UserID == request.user.ID
            ).exists()
        ).scalar()
        if already_notified:
            return error_tuple

    # If we get here, user input is good.
    for pkgbase in bases:
        pkgbase_actions.pkgbase_notify_instance(request, pkgbase)

    # TODO: This message does not yet have a translation.
    return True, ["The selected packages' notifications have been enabled."]
|
|
|
|
|
|
async def packages_unnotify(
    request: Request, package_ids: Optional[list[int]] = None, **kwargs
):
    """
    Remove notifications for the package bases of all selected packages.

    Nothing is changed unless the request user has a notification on every
    selected base.

    :param request: FastAPI Request; `request.user` is the unsubscribing user
    :param package_ids: IDs of the selected packages; None or empty is an error
    :param kwargs: Extra action arguments (e.g. `confirm`); ignored here
    :return: Tuple of (succeeded: bool, messages: list[str])
    """
    if not package_ids:
        # TODO: This error does not yet have a translation.
        return False, ["You did not select any packages for notification removal."]

    unique_ids = set(package_ids)
    packages = db.query(models.Package).filter(models.Package.ID.in_(unique_ids)).all()

    # A set collapses packages that share a base.
    bases = {pkg.PackageBase for pkg in packages}

    # Validate the whole selection before changing anything.
    for pkgbase in bases:
        has_notification = db.query(
            pkgbase.notifications.filter(
                models.PackageNotification.UserID == request.user.ID
            ).exists()
        ).scalar()
        if not has_notification:
            # TODO: This error does not yet have a translation.
            return (
                False,
                ["A package you selected does not have notifications enabled."],
            )

    for pkgbase in bases:
        pkgbase_actions.pkgbase_unnotify_instance(request, pkgbase)

    # TODO: This message does not yet have a translation.
    return True, ["The selected packages' notifications have been removed."]
|
|
|
|
|
|
async def packages_adopt(
    request: Request,
    package_ids: Optional[list[int]] = None,
    confirm: bool = False,
    **kwargs,
):
    """
    Adopt the package bases of all selected packages.

    A base may be adopted when the request user holds PKGBASE_ADOPT or
    when the base currently has no maintainer.

    :param request: FastAPI Request; `request.user` is the adopting user
    :param package_ids: IDs of the selected packages; None or empty is an error
    :param confirm: Must be true, otherwise nothing is adopted
    :param kwargs: Extra action arguments; ignored here
    :return: Tuple of (succeeded: bool, messages: list[str])
    """
    if not package_ids:
        return False, ["You did not select any packages to adopt."]

    if not confirm:
        return (
            False,
            [
                "The selected packages have not been adopted, "
                "check the confirmation checkbox."
            ],
        )

    unique_ids = set(package_ids)
    packages = db.query(models.Package).filter(models.Package.ID.in_(unique_ids)).all()

    # A set collapses packages that share a base.
    bases = {pkg.PackageBase for pkg in packages}

    # The credential does not depend on the package base, so it is looked
    # up once instead of once per base.
    has_cred = request.user.has_credential(creds.PKGBASE_ADOPT)

    # Check every selected base before adopting any of them.
    for pkgbase in bases:
        if not has_cred and pkgbase.Maintainer:
            # TODO: This error needs to be translated.
            return (
                False,
                ["You are not allowed to adopt one of the packages you selected."],
            )

    # Now, really adopt the bases.
    for pkgbase in bases:
        pkgbase_actions.pkgbase_adopt_instance(request, pkgbase)

    return True, ["The selected packages have been adopted."]
|
|
|
|
|
|
def disown_all(request: Request, pkgbases: list[models.PackageBase]) -> list[str]:
    """
    Try to disown every given package base, collecting failures.

    An InvariantError raised for one base does not stop the remaining
    bases from being processed.

    :param request: FastAPI Request
    :param pkgbases: Package bases to disown
    :return: The messages of all InvariantErrors raised; empty on success
    """
    failures = []
    for base in pkgbases:
        try:
            pkgbase_actions.pkgbase_disown_instance(request, base)
        except InvariantError as error:
            failures.append(str(error))
    return failures
|
|
|
|
|
|
async def packages_disown(
    request: Request,
    package_ids: Optional[list[int]] = None,
    confirm: bool = False,
    **kwargs,
):
    """
    Disown the package bases of all selected packages.

    :param request: FastAPI Request; `request.user` is checked for credentials
    :param package_ids: IDs of the selected packages; None or empty is an error
    :param confirm: Must be true, otherwise nothing is disowned
    :param kwargs: Extra action arguments; ignored here
    :return: Tuple of (succeeded: bool, messages: list[str])
    """
    if not package_ids:
        return False, ["You did not select any packages to disown."]

    if not confirm:
        return (
            False,
            [
                "The selected packages have not been disowned, "
                "check the confirmation checkbox."
            ],
        )

    unique_ids = set(package_ids)
    packages = db.query(models.Package).filter(models.Package.ID.in_(unique_ids)).all()

    # A set collapses packages that share a base.
    bases = {pkg.PackageBase for pkg in packages}

    # Check that the user has credentials for every base they selected
    # before disowning any of them.
    for pkgbase in bases:
        has_cred = request.user.has_credential(
            creds.PKGBASE_DISOWN, approved=[pkgbase.Maintainer]
        )
        if not has_cred:
            # TODO: This error needs to be translated.
            return (
                False,
                ["You are not allowed to disown one of the packages you selected."],
            )

    # Now, disown all the bases if we can; invariant violations are
    # reported back as error messages.
    if errors := disown_all(request, bases):
        return False, errors

    return True, ["The selected packages have been disowned."]
|
|
|
|
|
|
async def packages_delete(
    request: Request,
    package_ids: Optional[list[int]] = None,
    confirm: bool = False,
    merge_into: str = str(),
    **kwargs,
):
    """
    Delete the package bases of all selected packages.

    :param request: FastAPI Request; `request.user` needs PKGBASE_DELETE
    :param package_ids: IDs of the selected packages; None or empty is an error
    :param confirm: Must be true, otherwise nothing is deleted
    :param merge_into: Accepted for interface compatibility; not used here
    :param kwargs: Extra action arguments; ignored here
    :return: Tuple of (succeeded: bool, messages: list[str])
    """
    if not package_ids:
        return False, ["You did not select any packages to delete."]

    if not confirm:
        return (
            False,
            [
                "The selected packages have not been deleted, "
                "check the confirmation checkbox."
            ],
        )

    if not request.user.has_credential(creds.PKGBASE_DELETE):
        return False, ["You do not have permission to delete packages."]

    # De-duplicate the IDs and query the database for related records.
    unique_ids = set(package_ids)
    packages = db.query(models.Package).filter(models.Package.ID.in_(unique_ids)).all()

    if len(packages) != len(unique_ids):
        # Let the user know there was an issue with their input: they have
        # provided at least one package ID which does not exist in the DB.
        # TODO: This error has not yet been translated.
        return False, ["One of the packages you selected does not exist."]

    # Make a set out of all package bases related to `packages`.
    bases = {pkg.PackageBase for pkg in packages}
    deleted_bases, notifs = [], []
    for pkgbase in bases:
        # Record the name first; the instance is deleted by the next call.
        deleted_bases.append(pkgbase.Name)
        notifs += pkgbase_actions.pkgbase_delete_instance(request, pkgbase)

    # Log out the fact that this happened for accountability.
    logger.info(
        f"Privileged user '{request.user.Username}' deleted the "
        f"following package bases: {deleted_bases}."
    )

    # Send the collected notifications once all deletions are done.
    util.apply_all(notifs, lambda n: n.send())
    return True, ["The selected packages have been deleted."]
|
|
|
|
|
|
# Action name -> callback, as dispatched by the `packages_post` route
# below. Every callback is awaited with (request, package_ids=..., confirm=...)
# and is expected to return a tuple (succeeded: bool, messages: list[str]).
PACKAGE_ACTIONS = dict(
    unflag=packages_unflag,
    notify=packages_notify,
    unnotify=packages_unnotify,
    adopt=packages_adopt,
    disown=packages_disown,
    delete=packages_delete,
)
|
|
|
|
|
|
@router.post("/packages")
@handle_form_exceptions
@requires_auth
async def packages_post(
    request: Request,
    IDs: list[int] = Form(default=[]),
    action: str = Form(default=str()),
    confirm: bool = Form(default=False),
):
    """
    Handle ``POST /packages``: run a bulk action on the selected packages.

    The search listing is rendered in every case: with BAD_REQUEST when the
    action is unknown or fails, and with the default status otherwise.

    :param request: FastAPI Request
    :param IDs: IDs of the selected packages
    :param action: A key of PACKAGE_ACTIONS
    :param confirm: Confirmation checkbox state, forwarded to the callback
    :return: The response produced by `packages_get`
    """
    context = make_context(request, "Packages")

    # An unknown action re-renders GET /packages with a BAD_REQUEST status.
    if action not in PACKAGE_ACTIONS:
        return await packages_get(request, context, HTTPStatus.BAD_REQUEST)

    # Only `package_ids` and `confirm` are forwarded; each callback
    # validates them itself.
    handler = PACKAGE_ACTIONS[action]
    outcome = await handler(request, package_ids=IDs, confirm=confirm)
    if outcome:  # If *anything* was returned:
        succeeded, messages = outcome
        if succeeded:
            context["success"] = messages
        else:
            # The callback reported failure: show its messages as errors.
            context["errors"] = messages
            return await packages_get(request, context, HTTPStatus.BAD_REQUEST)

    return await packages_get(request, context)
|