mirror of
https://gitlab.archlinux.org/archlinux/aurweb.git
synced 2025-02-03 10:43:03 +01:00
fix(python): use standard dict/list type annotation
Since Python 3.9 list/dict can be used as type hint.
This commit is contained in:
parent
28970ccc91
commit
a509e40474
31 changed files with 175 additions and 195 deletions
|
@ -2,7 +2,7 @@ import copy
|
|||
import math
|
||||
|
||||
from datetime import datetime
|
||||
from typing import Any, Dict, Union
|
||||
from typing import Any, Union
|
||||
from urllib.parse import quote_plus, urlencode
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
|
@ -19,7 +19,7 @@ from aurweb.templates import register_filter, register_function
|
|||
|
||||
@register_filter("pager_nav")
|
||||
@pass_context
|
||||
def pager_nav(context: Dict[str, Any],
|
||||
def pager_nav(context: dict[str, Any],
|
||||
page: int, total: int, prefix: str) -> str:
|
||||
page = int(page) # Make sure this is an int.
|
||||
|
||||
|
@ -71,7 +71,7 @@ def do_round(f: float) -> int:
|
|||
|
||||
@register_filter("tr")
|
||||
@pass_context
|
||||
def tr(context: Dict[str, Any], value: str):
|
||||
def tr(context: dict[str, Any], value: str):
|
||||
""" A translation filter; example: {{ "Hello" | tr("de") }}. """
|
||||
_ = l10n.get_translator_for_request(context.get("request"))
|
||||
return _(value)
|
||||
|
@ -79,7 +79,7 @@ def tr(context: Dict[str, Any], value: str):
|
|||
|
||||
@register_filter("tn")
|
||||
@pass_context
|
||||
def tn(context: Dict[str, Any], count: int,
|
||||
def tn(context: dict[str, Any], count: int,
|
||||
singular: str, plural: str) -> str:
|
||||
""" A singular and plural translation filter.
|
||||
|
||||
|
@ -107,7 +107,7 @@ def as_timezone(dt: datetime, timezone: str):
|
|||
|
||||
|
||||
@register_filter("extend_query")
|
||||
def extend_query(query: Dict[str, Any], *additions) -> Dict[str, Any]:
|
||||
def extend_query(query: dict[str, Any], *additions) -> dict[str, Any]:
|
||||
""" Add additional key value pairs to query. """
|
||||
q = copy.copy(query)
|
||||
for k, v in list(additions):
|
||||
|
@ -116,7 +116,7 @@ def extend_query(query: Dict[str, Any], *additions) -> Dict[str, Any]:
|
|||
|
||||
|
||||
@register_filter("urlencode")
|
||||
def to_qs(query: Dict[str, Any]) -> str:
|
||||
def to_qs(query: dict[str, Any]) -> str:
|
||||
return urlencode(query, doseq=True)
|
||||
|
||||
|
||||
|
@ -134,7 +134,7 @@ def number_format(value: float, places: int):
|
|||
|
||||
@register_filter("account_url")
|
||||
@pass_context
|
||||
def account_url(context: Dict[str, Any],
|
||||
def account_url(context: dict[str, Any],
|
||||
user: "aurweb.models.user.User") -> str:
|
||||
base = aurweb.config.get("options", "aur_location")
|
||||
return f"{base}/account/{user.Username}"
|
||||
|
@ -152,7 +152,7 @@ def ceil(*args, **kwargs) -> int:
|
|||
|
||||
@register_function("date_strftime")
|
||||
@pass_context
|
||||
def date_strftime(context: Dict[str, Any], dt: Union[int, datetime], fmt: str) \
|
||||
def date_strftime(context: dict[str, Any], dt: Union[int, datetime], fmt: str) \
|
||||
-> str:
|
||||
if isinstance(dt, int):
|
||||
dt = timestamp_to_datetime(dt)
|
||||
|
@ -162,11 +162,11 @@ def date_strftime(context: Dict[str, Any], dt: Union[int, datetime], fmt: str) \
|
|||
|
||||
@register_function("date_display")
|
||||
@pass_context
|
||||
def date_display(context: Dict[str, Any], dt: Union[int, datetime]) -> str:
|
||||
def date_display(context: dict[str, Any], dt: Union[int, datetime]) -> str:
|
||||
return date_strftime(context, dt, "%Y-%m-%d (%Z)")
|
||||
|
||||
|
||||
@register_function("datetime_display")
|
||||
@pass_context
|
||||
def datetime_display(context: Dict[str, Any], dt: Union[int, datetime]) -> str:
|
||||
def datetime_display(context: dict[str, Any], dt: Union[int, datetime]) -> str:
|
||||
return date_strftime(context, dt, "%Y-%m-%d %H:%M (%Z)")
|
||||
|
|
|
@ -1,5 +1,3 @@
|
|||
from typing import List
|
||||
|
||||
from sqlalchemy import and_, literal
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
from sqlalchemy.orm import backref, relationship
|
||||
|
@ -60,7 +58,7 @@ class PackageDependency(Base):
|
|||
_OfficialProvider.Name == self.DepName).exists()
|
||||
return db.query(pkg).scalar() or db.query(official).scalar()
|
||||
|
||||
def provides(self) -> List[PackageRelation]:
|
||||
def provides(self) -> list[PackageRelation]:
|
||||
from aurweb.models.relation_type import PROVIDES_ID
|
||||
|
||||
rels = db.query(PackageRelation).join(_Package).filter(
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
import hashlib
|
||||
|
||||
from typing import List, Set
|
||||
from typing import Set
|
||||
|
||||
import bcrypt
|
||||
|
||||
|
@ -149,7 +149,7 @@ class User(Base):
|
|||
return self.session.SessionID
|
||||
|
||||
def has_credential(self, credential: Set[int],
|
||||
approved: List["User"] = list()):
|
||||
approved: list["User"] = list()):
|
||||
from aurweb.auth.creds import has_credential
|
||||
return has_credential(self, credential, approved)
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
from typing import List, Optional, Set
|
||||
from typing import Optional, Set
|
||||
|
||||
from fastapi import Request
|
||||
from sqlalchemy import and_, orm
|
||||
|
@ -139,7 +139,7 @@ def close_pkgreq(pkgreq: PackageRequest, closer: User,
|
|||
|
||||
def handle_request(request: Request, reqtype_id: int,
|
||||
pkgbase: PackageBase,
|
||||
target: PackageBase = None) -> List[notify.Notification]:
|
||||
target: PackageBase = None) -> list[notify.Notification]:
|
||||
"""
|
||||
Handle package requests before performing an action.
|
||||
|
||||
|
@ -158,7 +158,7 @@ def handle_request(request: Request, reqtype_id: int,
|
|||
:param pkgbase: PackageBase which the request is about
|
||||
:param target: Optional target to merge into
|
||||
"""
|
||||
notifs: List[notify.Notification] = []
|
||||
notifs: list[notify.Notification] = []
|
||||
|
||||
# If it's an orphan request, perform further verification
|
||||
# regarding existing requests.
|
||||
|
@ -187,13 +187,13 @@ def handle_request(request: Request, reqtype_id: int,
|
|||
PackageRequest.MergeBaseName == target.Name)
|
||||
|
||||
# Build an accept list out of `accept_query`.
|
||||
to_accept: List[PackageRequest] = accept_query.all()
|
||||
to_accept: list[PackageRequest] = accept_query.all()
|
||||
accepted_ids: Set[int] = set(p.ID for p in to_accept)
|
||||
|
||||
# Build a reject list out of `query` filtered by IDs not found
|
||||
# in `to_accept`. That is, unmatched records of the same base
|
||||
# query properties.
|
||||
to_reject: List[PackageRequest] = query.filter(
|
||||
to_reject: list[PackageRequest] = query.filter(
|
||||
~PackageRequest.ID.in_(accepted_ids)
|
||||
).all()
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
from collections import defaultdict
|
||||
from http import HTTPStatus
|
||||
from typing import Dict, List, Tuple, Union
|
||||
from typing import Tuple, Union
|
||||
|
||||
import orjson
|
||||
|
||||
|
@ -15,7 +15,7 @@ from aurweb.models.package_relation import PackageRelation
|
|||
from aurweb.redis import redis_connection
|
||||
from aurweb.templates import register_filter
|
||||
|
||||
Providers = List[Union[PackageRelation, OfficialProvider]]
|
||||
Providers = list[Union[PackageRelation, OfficialProvider]]
|
||||
|
||||
|
||||
def dep_extra_with_arch(dep: models.PackageDependency, annotation: str) -> str:
|
||||
|
@ -123,7 +123,7 @@ def out_of_date(packages: orm.Query) -> orm.Query:
|
|||
|
||||
|
||||
def updated_packages(limit: int = 0,
|
||||
cache_ttl: int = 600) -> List[models.Package]:
|
||||
cache_ttl: int = 600) -> list[models.Package]:
|
||||
""" Return a list of valid Package objects ordered by their
|
||||
ModifiedTS column in descending order from cache, after setting
|
||||
the cache when no key yet exists.
|
||||
|
@ -168,8 +168,8 @@ def updated_packages(limit: int = 0,
|
|||
return packages
|
||||
|
||||
|
||||
def query_voted(query: List[models.Package],
|
||||
user: models.User) -> Dict[int, bool]:
|
||||
def query_voted(query: list[models.Package],
|
||||
user: models.User) -> dict[int, bool]:
|
||||
""" Produce a dictionary of package base ID keys to boolean values,
|
||||
which indicate whether or not the package base has a vote record
|
||||
related to user.
|
||||
|
@ -191,8 +191,8 @@ def query_voted(query: List[models.Package],
|
|||
return output
|
||||
|
||||
|
||||
def query_notified(query: List[models.Package],
|
||||
user: models.User) -> Dict[int, bool]:
|
||||
def query_notified(query: list[models.Package],
|
||||
user: models.User) -> dict[int, bool]:
|
||||
""" Produce a dictionary of package base ID keys to boolean values,
|
||||
which indicate whether or not the package base has a notification
|
||||
record related to user.
|
||||
|
@ -214,8 +214,8 @@ def query_notified(query: List[models.Package],
|
|||
return output
|
||||
|
||||
|
||||
def pkg_required(pkgname: str, provides: List[str]) \
|
||||
-> List[PackageDependency]:
|
||||
def pkg_required(pkgname: str, provides: list[str]) \
|
||||
-> list[PackageDependency]:
|
||||
"""
|
||||
Get dependencies that match a string in `[pkgname] + provides`.
|
||||
|
||||
|
|
|
@ -1,5 +1,3 @@
|
|||
from typing import List
|
||||
|
||||
from fastapi import Request
|
||||
|
||||
from aurweb import db, logging, util
|
||||
|
@ -86,7 +84,7 @@ def pkgbase_adopt_instance(request: Request, pkgbase: PackageBase) -> None:
|
|||
|
||||
def pkgbase_delete_instance(request: Request, pkgbase: PackageBase,
|
||||
comments: str = str()) \
|
||||
-> List[notify.Notification]:
|
||||
-> list[notify.Notification]:
|
||||
notifs = handle_request(request, DELETION_ID, pkgbase) + [
|
||||
notify.DeleteNotification(request.user.ID, pkgbase.ID)
|
||||
]
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
from typing import Any, Dict, List
|
||||
from typing import Any
|
||||
|
||||
from fastapi import Request
|
||||
from sqlalchemy import and_
|
||||
|
@ -15,13 +15,13 @@ from aurweb.templates import make_variable_context as _make_variable_context
|
|||
|
||||
|
||||
async def make_variable_context(request: Request, pkgbase: PackageBase) \
|
||||
-> Dict[str, Any]:
|
||||
-> dict[str, Any]:
|
||||
ctx = await _make_variable_context(request, pkgbase.Name)
|
||||
return make_context(request, pkgbase, ctx)
|
||||
|
||||
|
||||
def make_context(request: Request, pkgbase: PackageBase,
|
||||
context: Dict[str, Any] = None) -> Dict[str, Any]:
|
||||
context: dict[str, Any] = None) -> dict[str, Any]:
|
||||
""" Make a basic context for package or pkgbase.
|
||||
|
||||
:param request: FastAPI request
|
||||
|
@ -89,7 +89,7 @@ def remove_comaintainer(comaint: PackageComaintainer) \
|
|||
return notif
|
||||
|
||||
|
||||
def remove_comaintainers(pkgbase: PackageBase, usernames: List[str]) -> None:
|
||||
def remove_comaintainers(pkgbase: PackageBase, usernames: list[str]) -> None:
|
||||
"""
|
||||
Remove comaintainers from `pkgbase`.
|
||||
|
||||
|
@ -163,7 +163,7 @@ def add_comaintainer(pkgbase: PackageBase, comaintainer: User) \
|
|||
|
||||
|
||||
def add_comaintainers(request: Request, pkgbase: PackageBase,
|
||||
usernames: List[str]) -> None:
|
||||
usernames: list[str]) -> None:
|
||||
"""
|
||||
Add comaintainers to `pkgbase`.
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
from typing import Any, Dict
|
||||
from typing import Any
|
||||
|
||||
from aurweb import db
|
||||
from aurweb.exceptions import ValidationError
|
||||
|
@ -7,7 +7,7 @@ from aurweb.models import PackageBase
|
|||
|
||||
def request(pkgbase: PackageBase,
|
||||
type: str, comments: str, merge_into: str,
|
||||
context: Dict[str, Any]) -> None:
|
||||
context: dict[str, Any]) -> None:
|
||||
if not comments:
|
||||
raise ValidationError(["The comment field must not be empty."])
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
from typing import Any, Callable, Dict, List, Optional
|
||||
from typing import Any, Callable, Optional
|
||||
|
||||
from prometheus_client import Counter
|
||||
from prometheus_fastapi_instrumentator import Instrumentator
|
||||
|
@ -19,7 +19,7 @@ def instrumentator():
|
|||
# Their license is included in LICENSES/starlette_exporter.
|
||||
# The code has been modified to remove child route checks
|
||||
# (since we don't have any) and to stay within an 80-width limit.
|
||||
def get_matching_route_path(scope: Dict[Any, Any], routes: List[Route],
|
||||
def get_matching_route_path(scope: dict[Any, Any], routes: list[Route],
|
||||
route_name: Optional[str] = None) -> str:
|
||||
"""
|
||||
Find a matching route and return its original path string
|
||||
|
|
|
@ -2,7 +2,7 @@ import copy
|
|||
import typing
|
||||
|
||||
from http import HTTPStatus
|
||||
from typing import Any, Dict
|
||||
from typing import Any
|
||||
|
||||
from fastapi import APIRouter, Form, Request
|
||||
from fastapi.responses import HTMLResponse, RedirectResponse
|
||||
|
@ -108,7 +108,7 @@ async def passreset_post(request: Request,
|
|||
|
||||
|
||||
def process_account_form(request: Request, user: models.User,
|
||||
args: Dict[str, Any]):
|
||||
args: dict[str, Any]):
|
||||
""" Process an account form. All fields are optional and only checks
|
||||
requirements in the case they are present.
|
||||
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
from collections import defaultdict
|
||||
from http import HTTPStatus
|
||||
from typing import Any, Dict, List
|
||||
from typing import Any
|
||||
|
||||
from fastapi import APIRouter, Form, Query, Request, Response
|
||||
|
||||
|
@ -21,7 +21,7 @@ logger = logging.get_logger(__name__)
|
|||
router = APIRouter()
|
||||
|
||||
|
||||
async def packages_get(request: Request, context: Dict[str, Any],
|
||||
async def packages_get(request: Request, context: dict[str, Any],
|
||||
status_code: HTTPStatus = HTTPStatus.OK):
|
||||
# Query parameters used in this request.
|
||||
context["q"] = dict(request.query_params)
|
||||
|
@ -210,7 +210,7 @@ async def package(request: Request, name: str,
|
|||
return render_template(request, "packages/show.html", context)
|
||||
|
||||
|
||||
async def packages_unflag(request: Request, package_ids: List[int] = [],
|
||||
async def packages_unflag(request: Request, package_ids: list[int] = [],
|
||||
**kwargs):
|
||||
if not package_ids:
|
||||
return (False, ["You did not select any packages to unflag."])
|
||||
|
@ -236,7 +236,7 @@ async def packages_unflag(request: Request, package_ids: List[int] = [],
|
|||
return (True, ["The selected packages have been unflagged."])
|
||||
|
||||
|
||||
async def packages_notify(request: Request, package_ids: List[int] = [],
|
||||
async def packages_notify(request: Request, package_ids: list[int] = [],
|
||||
**kwargs):
|
||||
# In cases where we encounter errors with the request, we'll
|
||||
# use this error tuple as a return value.
|
||||
|
@ -275,7 +275,7 @@ async def packages_notify(request: Request, package_ids: List[int] = [],
|
|||
return (True, ["The selected packages' notifications have been enabled."])
|
||||
|
||||
|
||||
async def packages_unnotify(request: Request, package_ids: List[int] = [],
|
||||
async def packages_unnotify(request: Request, package_ids: list[int] = [],
|
||||
**kwargs):
|
||||
if not package_ids:
|
||||
# TODO: This error does not yet have a translation.
|
||||
|
@ -312,7 +312,7 @@ async def packages_unnotify(request: Request, package_ids: List[int] = [],
|
|||
return (True, ["The selected packages' notifications have been removed."])
|
||||
|
||||
|
||||
async def packages_adopt(request: Request, package_ids: List[int] = [],
|
||||
async def packages_adopt(request: Request, package_ids: list[int] = [],
|
||||
confirm: bool = False, **kwargs):
|
||||
if not package_ids:
|
||||
return (False, ["You did not select any packages to adopt."])
|
||||
|
@ -345,8 +345,8 @@ async def packages_adopt(request: Request, package_ids: List[int] = [],
|
|||
return (True, ["The selected packages have been adopted."])
|
||||
|
||||
|
||||
def disown_all(request: Request, pkgbases: List[models.PackageBase]) \
|
||||
-> List[str]:
|
||||
def disown_all(request: Request, pkgbases: list[models.PackageBase]) \
|
||||
-> list[str]:
|
||||
errors = []
|
||||
for pkgbase in pkgbases:
|
||||
try:
|
||||
|
@ -356,7 +356,7 @@ def disown_all(request: Request, pkgbases: List[models.PackageBase]) \
|
|||
return errors
|
||||
|
||||
|
||||
async def packages_disown(request: Request, package_ids: List[int] = [],
|
||||
async def packages_disown(request: Request, package_ids: list[int] = [],
|
||||
confirm: bool = False, **kwargs):
|
||||
if not package_ids:
|
||||
return (False, ["You did not select any packages to disown."])
|
||||
|
@ -390,7 +390,7 @@ async def packages_disown(request: Request, package_ids: List[int] = [],
|
|||
return (True, ["The selected packages have been disowned."])
|
||||
|
||||
|
||||
async def packages_delete(request: Request, package_ids: List[int] = [],
|
||||
async def packages_delete(request: Request, package_ids: list[int] = [],
|
||||
confirm: bool = False, merge_into: str = str(),
|
||||
**kwargs):
|
||||
if not package_ids:
|
||||
|
@ -430,7 +430,7 @@ async def packages_delete(request: Request, package_ids: List[int] = [],
|
|||
|
||||
# A mapping of action string -> callback functions used within the
|
||||
# `packages_post` route below. We expect any action callback to
|
||||
# return a tuple in the format: (succeeded: bool, message: List[str]).
|
||||
# return a tuple in the format: (succeeded: bool, message: list[str]).
|
||||
PACKAGE_ACTIONS = {
|
||||
"unflag": packages_unflag,
|
||||
"notify": packages_notify,
|
||||
|
@ -445,7 +445,7 @@ PACKAGE_ACTIONS = {
|
|||
@handle_form_exceptions
|
||||
@requires_auth
|
||||
async def packages_post(request: Request,
|
||||
IDs: List[int] = Form(default=[]),
|
||||
IDs: list[int] = Form(default=[]),
|
||||
action: str = Form(default=str()),
|
||||
confirm: bool = Form(default=False)):
|
||||
|
||||
|
|
|
@ -2,7 +2,7 @@ import hashlib
|
|||
import re
|
||||
|
||||
from http import HTTPStatus
|
||||
from typing import List, Optional
|
||||
from typing import Optional
|
||||
from urllib.parse import unquote
|
||||
|
||||
import orjson
|
||||
|
@ -71,7 +71,7 @@ async def rpc_request(request: Request,
|
|||
type: Optional[str] = None,
|
||||
by: Optional[str] = defaults.RPC_SEARCH_BY,
|
||||
arg: Optional[str] = None,
|
||||
args: Optional[List[str]] = [],
|
||||
args: Optional[list[str]] = [],
|
||||
callback: Optional[str] = None):
|
||||
|
||||
# Create a handle to our RPC class.
|
||||
|
@ -140,7 +140,7 @@ async def rpc(request: Request,
|
|||
type: Optional[str] = Query(default=None),
|
||||
by: Optional[str] = Query(default=defaults.RPC_SEARCH_BY),
|
||||
arg: Optional[str] = Query(default=None),
|
||||
args: Optional[List[str]] = Query(default=[], alias="arg[]"),
|
||||
args: Optional[list[str]] = Query(default=[], alias="arg[]"),
|
||||
callback: Optional[str] = Query(default=None)):
|
||||
if not request.url.query:
|
||||
return documentation()
|
||||
|
@ -157,6 +157,6 @@ async def rpc_post(request: Request,
|
|||
type: Optional[str] = Form(default=None),
|
||||
by: Optional[str] = Form(default=defaults.RPC_SEARCH_BY),
|
||||
arg: Optional[str] = Form(default=None),
|
||||
args: Optional[List[str]] = Form(default=[], alias="arg[]"),
|
||||
args: Optional[list[str]] = Form(default=[], alias="arg[]"),
|
||||
callback: Optional[str] = Form(default=None)):
|
||||
return await rpc_request(request, v, type, by, arg, args, callback)
|
||||
|
|
|
@ -2,7 +2,7 @@ import html
|
|||
import typing
|
||||
|
||||
from http import HTTPStatus
|
||||
from typing import Any, Dict
|
||||
from typing import Any
|
||||
|
||||
from fastapi import APIRouter, Form, HTTPException, Request
|
||||
from fastapi.responses import RedirectResponse, Response
|
||||
|
@ -34,7 +34,7 @@ ADDVOTE_SPECIFICS = {
|
|||
}
|
||||
|
||||
|
||||
def populate_trusted_user_counts(context: Dict[str, Any]) -> None:
|
||||
def populate_trusted_user_counts(context: dict[str, Any]) -> None:
|
||||
tu_query = db.query(User).filter(
|
||||
or_(User.AccountTypeID == TRUSTED_USER_ID,
|
||||
User.AccountTypeID == TRUSTED_USER_AND_DEV_ID)
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
import os
|
||||
|
||||
from collections import defaultdict
|
||||
from typing import Any, Callable, Dict, List, NewType, Union
|
||||
from typing import Any, Callable, NewType, Union
|
||||
|
||||
from fastapi.responses import HTMLResponse
|
||||
from sqlalchemy import and_, literal, orm
|
||||
|
@ -24,7 +24,7 @@ TYPE_MAPPING = {
|
|||
}
|
||||
|
||||
DataGenerator = NewType("DataGenerator",
|
||||
Callable[[models.Package], Dict[str, Any]])
|
||||
Callable[[models.Package], dict[str, Any]])
|
||||
|
||||
|
||||
def documentation():
|
||||
|
@ -86,7 +86,7 @@ class RPC:
|
|||
self.version = version
|
||||
self.type = RPC.TYPE_ALIASES.get(type, type)
|
||||
|
||||
def error(self, message: str) -> Dict[str, Any]:
|
||||
def error(self, message: str) -> dict[str, Any]:
|
||||
return {
|
||||
"version": self.version,
|
||||
"results": [],
|
||||
|
@ -95,7 +95,7 @@ class RPC:
|
|||
"error": message
|
||||
}
|
||||
|
||||
def _verify_inputs(self, by: str = [], args: List[str] = []) -> None:
|
||||
def _verify_inputs(self, by: str = [], args: list[str] = []) -> None:
|
||||
if self.version is None:
|
||||
raise RPCError("Please specify an API version.")
|
||||
|
||||
|
@ -111,11 +111,11 @@ class RPC:
|
|||
if self.type not in RPC.EXPOSED_TYPES:
|
||||
raise RPCError("Incorrect request type specified.")
|
||||
|
||||
def _enforce_args(self, args: List[str]) -> None:
|
||||
def _enforce_args(self, args: list[str]) -> None:
|
||||
if not args:
|
||||
raise RPCError("No request type/data specified.")
|
||||
|
||||
def _get_json_data(self, package: models.Package) -> Dict[str, Any]:
|
||||
def _get_json_data(self, package: models.Package) -> dict[str, Any]:
|
||||
""" Produce dictionary data of one Package that can be JSON-serialized.
|
||||
|
||||
:param package: Package instance
|
||||
|
@ -146,7 +146,7 @@ class RPC:
|
|||
"LastModified": package.ModifiedTS
|
||||
}
|
||||
|
||||
def _get_info_json_data(self, package: models.Package) -> Dict[str, Any]:
|
||||
def _get_info_json_data(self, package: models.Package) -> dict[str, Any]:
|
||||
data = self._get_json_data(package)
|
||||
|
||||
# All info results have _at least_ an empty list of
|
||||
|
@ -163,9 +163,9 @@ class RPC:
|
|||
|
||||
return data
|
||||
|
||||
def _assemble_json_data(self, packages: List[models.Package],
|
||||
def _assemble_json_data(self, packages: list[models.Package],
|
||||
data_generator: DataGenerator) \
|
||||
-> List[Dict[str, Any]]:
|
||||
-> list[dict[str, Any]]:
|
||||
"""
|
||||
Assemble JSON data out of a list of packages.
|
||||
|
||||
|
@ -192,8 +192,8 @@ class RPC:
|
|||
models.User.Username.label("Maintainer"),
|
||||
).group_by(models.Package.ID)
|
||||
|
||||
def _handle_multiinfo_type(self, args: List[str] = [], **kwargs) \
|
||||
-> List[Dict[str, Any]]:
|
||||
def _handle_multiinfo_type(self, args: list[str] = [], **kwargs) \
|
||||
-> list[dict[str, Any]]:
|
||||
self._enforce_args(args)
|
||||
args = set(args)
|
||||
|
||||
|
@ -296,7 +296,7 @@ class RPC:
|
|||
return self._assemble_json_data(packages, self._get_info_json_data)
|
||||
|
||||
def _handle_search_type(self, by: str = defaults.RPC_SEARCH_BY,
|
||||
args: List[str] = []) -> List[Dict[str, Any]]:
|
||||
args: list[str] = []) -> list[dict[str, Any]]:
|
||||
# If `by` isn't maintainer and we don't have any args, raise an error.
|
||||
# In maintainer's case, return all orphans if there are no args,
|
||||
# so we need args to pass through to the handler without errors.
|
||||
|
@ -318,12 +318,12 @@ class RPC:
|
|||
|
||||
return self._assemble_json_data(results, self._get_json_data)
|
||||
|
||||
def _handle_msearch_type(self, args: List[str] = [], **kwargs)\
|
||||
-> List[Dict[str, Any]]:
|
||||
def _handle_msearch_type(self, args: list[str] = [], **kwargs)\
|
||||
-> list[dict[str, Any]]:
|
||||
return self._handle_search_type(by="m", args=args)
|
||||
|
||||
def _handle_suggest_type(self, args: List[str] = [], **kwargs)\
|
||||
-> List[str]:
|
||||
def _handle_suggest_type(self, args: list[str] = [], **kwargs)\
|
||||
-> list[str]:
|
||||
if not args:
|
||||
return []
|
||||
|
||||
|
@ -336,8 +336,8 @@ class RPC:
|
|||
).order_by(models.Package.Name.asc()).limit(20)
|
||||
return [pkg.Name for pkg in packages]
|
||||
|
||||
def _handle_suggest_pkgbase_type(self, args: List[str] = [], **kwargs)\
|
||||
-> List[str]:
|
||||
def _handle_suggest_pkgbase_type(self, args: list[str] = [], **kwargs)\
|
||||
-> list[str]:
|
||||
if not args:
|
||||
return []
|
||||
|
||||
|
@ -351,16 +351,16 @@ class RPC:
|
|||
def _is_suggestion(self) -> bool:
|
||||
return self.type.startswith("suggest")
|
||||
|
||||
def _handle_callback(self, by: str, args: List[str])\
|
||||
-> Union[List[Dict[str, Any]], List[str]]:
|
||||
def _handle_callback(self, by: str, args: list[str])\
|
||||
-> Union[list[dict[str, Any]], list[str]]:
|
||||
# Get a handle to our callback and trap an RPCError with
|
||||
# an empty list of results based on callback's execution.
|
||||
callback = getattr(self, f"_handle_{self.type.replace('-', '_')}_type")
|
||||
results = callback(by=by, args=args)
|
||||
return results
|
||||
|
||||
def handle(self, by: str = defaults.RPC_SEARCH_BY, args: List[str] = [])\
|
||||
-> Union[List[Dict[str, Any]], Dict[str, Any]]:
|
||||
def handle(self, by: str = defaults.RPC_SEARCH_BY, args: list[str] = [])\
|
||||
-> Union[list[dict[str, Any]], dict[str, Any]]:
|
||||
""" Request entrypoint. A router should pass v, type and args
|
||||
to this function and expect an output dictionary to be returned.
|
||||
|
||||
|
|
|
@ -27,7 +27,7 @@ import sys
|
|||
import tempfile
|
||||
|
||||
from collections import defaultdict
|
||||
from typing import Any, Dict
|
||||
from typing import Any
|
||||
|
||||
import orjson
|
||||
|
||||
|
@ -151,7 +151,7 @@ EXTENDED_FIELD_HANDLERS = {
|
|||
}
|
||||
|
||||
|
||||
def as_dict(package: Package) -> Dict[str, Any]:
|
||||
def as_dict(package: Package) -> dict[str, Any]:
|
||||
return {
|
||||
"ID": package.ID,
|
||||
"Name": package.Name,
|
||||
|
|
|
@ -1,7 +1,5 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
from typing import List
|
||||
|
||||
from sqlalchemy import and_, func
|
||||
from sqlalchemy.sql.functions import coalesce
|
||||
from sqlalchemy.sql.functions import sum as _sum
|
||||
|
@ -10,7 +8,7 @@ from aurweb import db, time
|
|||
from aurweb.models import PackageBase, PackageVote
|
||||
|
||||
|
||||
def run_variable(pkgbases: List[PackageBase] = []) -> None:
|
||||
def run_variable(pkgbases: list[PackageBase] = []) -> None:
|
||||
"""
|
||||
Update popularity on a list of PackageBases.
|
||||
|
||||
|
|
|
@ -17,7 +17,7 @@ import sys
|
|||
import tempfile
|
||||
import time
|
||||
|
||||
from typing import Iterable, List
|
||||
from typing import Iterable
|
||||
|
||||
import aurweb.config
|
||||
import aurweb.schema
|
||||
|
@ -204,8 +204,8 @@ def start():
|
|||
""")
|
||||
|
||||
|
||||
def _kill_children(children: Iterable, exceptions: List[Exception] = []) \
|
||||
-> List[Exception]:
|
||||
def _kill_children(children: Iterable, exceptions: list[Exception] = []) \
|
||||
-> list[Exception]:
|
||||
"""
|
||||
Kill each process found in `children`.
|
||||
|
||||
|
@ -223,8 +223,8 @@ def _kill_children(children: Iterable, exceptions: List[Exception] = []) \
|
|||
return exceptions
|
||||
|
||||
|
||||
def _wait_for_children(children: Iterable, exceptions: List[Exception] = []) \
|
||||
-> List[Exception]:
|
||||
def _wait_for_children(children: Iterable, exceptions: list[Exception] = []) \
|
||||
-> list[Exception]:
|
||||
"""
|
||||
Wait for each process to end found in `children`.
|
||||
|
||||
|
|
|
@ -4,8 +4,6 @@ import re
|
|||
import shutil
|
||||
import subprocess
|
||||
|
||||
from typing import List
|
||||
|
||||
from aurweb import logging, util
|
||||
from aurweb.templates import base_template
|
||||
|
||||
|
@ -38,7 +36,7 @@ class AlpmDatabase:
|
|||
return pkgdir
|
||||
|
||||
def add(self, pkgname: str, pkgver: str, arch: str,
|
||||
provides: List[str] = []) -> None:
|
||||
provides: list[str] = []) -> None:
|
||||
context = {
|
||||
"pkgname": pkgname,
|
||||
"pkgver": pkgver,
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
from io import StringIO
|
||||
from typing import List
|
||||
|
||||
from lxml import etree
|
||||
|
||||
|
@ -15,11 +14,11 @@ def parse_root(html: str) -> etree.Element:
|
|||
return etree.parse(StringIO(html), parser)
|
||||
|
||||
|
||||
def get_errors(content: str) -> List[etree._Element]:
|
||||
def get_errors(content: str) -> list[etree._Element]:
|
||||
root = parse_root(content)
|
||||
return root.xpath('//ul[@class="errorlist"]/li')
|
||||
|
||||
|
||||
def get_successes(content: str) -> List[etree._Element]:
|
||||
def get_successes(content: str) -> list[etree._Element]:
|
||||
root = parse_root(content)
|
||||
return root.xpath('//ul[@class="success"]/li')
|
||||
|
|
|
@ -1,5 +1,3 @@
|
|||
from typing import Dict
|
||||
|
||||
import aurweb.config
|
||||
|
||||
|
||||
|
@ -35,8 +33,8 @@ class Request:
|
|||
user: User = User(),
|
||||
authenticated: bool = False,
|
||||
method: str = "GET",
|
||||
headers: Dict[str, str] = dict(),
|
||||
cookies: Dict[str, str] = dict()) -> "Request":
|
||||
headers: dict[str, str] = dict(),
|
||||
cookies: dict[str, str] = dict()) -> "Request":
|
||||
self.user = user
|
||||
self.user.authenticated = authenticated
|
||||
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
from typing import Any, Dict
|
||||
from typing import Any
|
||||
|
||||
from fastapi import Request
|
||||
|
||||
|
@ -34,7 +34,7 @@ def simple(U: str = str(), E: str = str(), H: bool = False,
|
|||
def language(L: str = str(),
|
||||
request: Request = None,
|
||||
user: models.User = None,
|
||||
context: Dict[str, Any] = {},
|
||||
context: dict[str, Any] = {},
|
||||
**kwargs) -> None:
|
||||
if L and L != user.LangPreference:
|
||||
with db.begin():
|
||||
|
@ -45,7 +45,7 @@ def language(L: str = str(),
|
|||
def timezone(TZ: str = str(),
|
||||
request: Request = None,
|
||||
user: models.User = None,
|
||||
context: Dict[str, Any] = {},
|
||||
context: dict[str, Any] = {},
|
||||
**kwargs) -> None:
|
||||
if TZ and TZ != user.Timezone:
|
||||
with db.begin():
|
||||
|
@ -95,7 +95,7 @@ def account_type(T: int = None,
|
|||
def password(P: str = str(),
|
||||
request: Request = None,
|
||||
user: models.User = None,
|
||||
context: Dict[str, Any] = {},
|
||||
context: dict[str, Any] = {},
|
||||
**kwargs) -> None:
|
||||
if P and not user.valid_password(P):
|
||||
# Remove the fields we consumed for passwords.
|
||||
|
|
|
@ -6,7 +6,7 @@ import string
|
|||
from datetime import datetime
|
||||
from http import HTTPStatus
|
||||
from subprocess import PIPE, Popen
|
||||
from typing import Callable, Iterable, List, Tuple, Union
|
||||
from typing import Callable, Iterable, Tuple, Union
|
||||
from urllib.parse import urlparse
|
||||
|
||||
import fastapi
|
||||
|
@ -194,6 +194,6 @@ def parse_ssh_key(string: str) -> Tuple[str, str]:
|
|||
return (prefix, key)
|
||||
|
||||
|
||||
def parse_ssh_keys(string: str) -> List[Tuple[str, str]]:
|
||||
def parse_ssh_keys(string: str) -> list[Tuple[str, str]]:
|
||||
""" Parse a list of SSH public keys. """
|
||||
return [parse_ssh_key(e) for e in string.splitlines()]
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue