102 lines
3.6 KiB
Python
102 lines
3.6 KiB
Python
from operator import attrgetter, itemgetter, methodcaller
|
|
from pathlib import Path
|
|
from shutil import move
|
|
|
|
from aptdaemon.client import AptClient
|
|
from aptsources.distro import get_distro
|
|
from aptsources.sourceslist import SourcesList
|
|
from toolz import complement, compose, count, curry, excepts, groupby
|
|
from toolz.curried.operator import eq
|
|
|
|
# Get info about the currently installed OS
distro = get_distro()
RELEASE = distro.release
CODENAME = distro.codename

# Gather all apt repos and remove the OS base repos from the in-memory list
sources = SourcesList()
sources_by_file = groupby(attrgetter("file"), sources)

# Use .get() instead of indexing: groupby only creates a key for a file that
# contributed at least one entry, so a missing or empty /etc/apt/sources.list
# would otherwise abort the script with KeyError.
for source in sources_by_file.get("/etc/apt/sources.list", ()):
    sources.remove(source)
|
|
|
|
# Helpers used for filtering sources

# True for an entry whose ``invalid`` attribute is falsy
is_valid = complement(attrgetter("invalid"))

# Lazy, single-use stream of the comment of every valid entry
comments = (entry.comment for entry in sources if is_valid(entry))

# Last two space-separated words of a comment as a tuple; a comment with
# fewer than two words yields ("", "") instead of raising IndexError
_trailing_pair = compose(
    excepts(IndexError, itemgetter(-2, -1), lambda _: ("", "")),
    methodcaller("split", " "),
)

# True when a comment ends with the current "<release> <codename>" pair
is_current = compose(eq((RELEASE, CODENAME)), _trailing_pair)

# Path to where additional repos are stored
sources_list_d = Path("/etc/apt") / "sources.list.d"
|
|
|
|
|
|
@curry
def restore_backup(file, suffix, e):
    """Move a backed-up sources file back into place and reload the sources.

    The function is curried, so ``restore_backup(file, suffix)`` returns a
    one-argument callable; that partial application is what gets passed as
    the ``error_handler`` of ``update_cache``.

    Args:
        file: Path, as a string, of the sources file to restore.
        suffix: Suffix that was appended to ``file`` to name its backup.
        e: The error handed over by the caller; it is not inspected.
    """
    backup = file + suffix
    # There is one backup per file, but a file can hold several entries. Once
    # the first failure has moved the backup back, a later failure for the
    # same file must not raise FileNotFoundError from inside the handler.
    if Path(backup).exists():
        move(backup, file)
    sources.refresh()
|
|
|
|
|
|
# if there isn't a backup file for every additional source
if count(sources_list_d.glob("*.original.save")) != count(
    sources_list_d.glob("*.list")
):
    # for each valid line in source add a comment with the current OS release
    # and codename, then write the files and keep a ".original.save" copy
    for source in filter(is_valid, sources):
        source.comment = (
            f"{source.comment} installed on Ubuntu {RELEASE} {CODENAME}".strip()
        )
    sources.save()
    sources.backup(".original.save")
elif not all(map(is_current, comments)):
    # the comment on at least one valid source line does not match the
    # current OS

    # create an apt DBus client
    client = AptClient()
    # create some temp backup files of the repo .list files
    temp_suffix = sources.backup()
    # restore the original versions of the .list files and refresh data
    sources.restore_backup(".original.save")
    sources.refresh()

    # NOTE(review): per python-apt, SourcesList.refresh() rebuilds the list
    # from disk. That brings the base-file entries (removed at start-up) back
    # and replaces every entry object, so:
    #   * base-file entries are filtered out again here, and
    #   * all edits are made on one snapshot and saved once, with no refresh
    #     in between, so no edit lands on a stale object that save() would
    #     never write.
    base_file = "/etc/apt/sources.list"
    candidates = [
        source
        for source in sources
        if is_valid(source) and source.file != base_file
    ]

    # files that received at least one edit, in first-seen order
    changed_files = []

    for source in candidates:
        # the release and codename are the last 2 words in the comment; skip
        # entries without them (an empty word would make the ``in`` checks
        # below always true and corrupt the URI via replace)
        words = source.comment.split(" ")
        if len(words) < 2 or not (words[-2] and words[-1]):
            continue
        orig_release, orig_codename = words[-2:]

        # nothing to do when the entry was recorded on the current OS
        if (RELEASE == orig_release) and (CODENAME == orig_codename):
            continue

        # the distribution field may carry either the release number or,
        # more commonly, the codename (possibly with a suffix)
        if source.dist == orig_release:
            source.dist = RELEASE
        elif orig_codename in source.dist:
            source.dist = source.dist.replace(orig_codename, CODENAME)

        if orig_release in source.uri:
            source.uri = source.uri.replace(orig_release, RELEASE)

        if orig_codename in source.uri:
            source.uri = source.uri.replace(orig_codename, CODENAME)

        source.comment = (
            f"{source.comment} installed on Ubuntu {RELEASE} {CODENAME}".strip()
        )

        if source.file not in changed_files:
            changed_files.append(source.file)

    if changed_files:
        sources.save()
        sources.refresh()

    # test that each repo file, updated for the current OS, is still valid by
    # refreshing the cache. An error indicates that it is not valid. If an
    # error is detected, the handler restores the temporary backup of that
    # file and refreshes; otherwise the updated .list file is left as-is.
    # One call per file, because the temp backup is per file as well.
    for path in changed_files:
        client.update_cache(
            path,
            True,
            error_handler=restore_backup(path, temp_suffix),
        )

    # cleanup any remaining temp backup .list files
    for file in sources_list_d.glob(f"*{temp_suffix}"):
        file.unlink()
|