Files
Portable-Computing-Toolkit-…/portable_computing_toolkit_installer/ui/installer_wizard.py
2019-05-09 16:51:38 -04:00

377 lines
20 KiB
Python
Executable File

import platform
import subprocess
import tarfile
import tempfile
import webbrowser
import zipfile
from pathlib import Path
from queue import Queue
import json
from collections import deque
from itertools import product
from functools import partial
from threading import Lock
import psutil
from PyQt5 import QtCore, QtGui, QtNetwork, QtWebEngineWidgets, QtWidgets
from PyQt5.uic import loadUi
try:
import win32file
except ImportError:
if platform.system()=='Windows':
raise #require win32file on windows
from validators.location_validator import Location_Validator
from .menu_iterator import MenuIterator
from .downloader import DownloadWorker, Downloader
class InstallerWizard(QtWidgets.QWizard):
def __init__(self, parent=None):
super().__init__(parent)
loadUi(Path(__file__).parent / "installer_wizard.ui", baseinstance=self)
self.setWindowIcon(QtWidgets.QApplication.instance().windowIcon())
self.intro_page.validatePage = self._requirements_check
self.selection_page.initializePage = self._load_tools
self.intro.anchorClicked.connect(self._link_clicked)
self.license.anchorClicked.connect(self._link_clicked)
self.selection_menu.itemClicked.connect(self._enforce_dependencies)
self.install_location.setValidator(Location_Validator(self))
self.location_page.registerField("Location*", self.install_location)
self.browse_button.clicked.connect(self._select_location)
self.selection_menu.itemClicked.connect(self._open_homepage)
self.selection_menu.itemCollapsed.connect(lambda x: self.selection_menu.resizeColumnToContents(0))
self.selection_menu.itemExpanded.connect(lambda x: self.selection_menu.resizeColumnToContents(0))
self.selection_page.setCommitPage(True)
self.progress_page.setFinalPage(True)
#Change button text on license page
self.license_page.setButtonText(QtWidgets.QWizard.NextButton, "Agree")
self.license_page.setButtonText(QtWidgets.QWizard.CancelButton, "Decline")
self._network_manager = QtNetwork.QNetworkAccessManager(parent=QtWidgets.QApplication.instance())
self._thread_pool = QtCore.QThreadPool.globalInstance()
self._project_page=Path("https://github.com/norweeg/portable-computing-toolkit-installer/")
self._progress_update_lock = Lock()
@QtCore.pyqtSlot("QUrl")
def _link_clicked(self, address):
"""Receives anchorClicked signal from the intro or license text browsers and opens the address in an external web browser.
Opens an error message window if it is not able to do so.
Arguments:
address {QtCore.QUrl} -- The URL from the received signal
"""
try:
webbrowser.open(address.toString())
except webbrowser.Error as e:
self._display_error("Unable to launch external browser",e)
except Exception as e:
self._display_error("An error has occurred",e)
@QtCore.pyqtSlot()
def _select_location(self):
"""Opens a QFileDialog in either the root of the most-recently connected non-network, non-cd drive or the user's home directory,
if no suitable external device is found
"""
if platform.system() == 'Windows':
#get all drives read-writable drives except C:\
drives = [drive for drive in psutil.disk_partitions(all = True) if drive.device not in ("","C:\\") and "rw" in drive.opts]
if drives: #drives other than C:\ exist, are likely installation locations
dialog_location = Path(drives[0].mountpoint)
else: #C:\ was only drive, so start in user home
dialog_location = Path.home()
else:
dialog_location = Path.home()
selected_location = QtWidgets.QFileDialog.getExistingDirectory(parent=self,caption="Select Location",
directory=str(dialog_location),
options=(QtWidgets.QFileDialog.ShowDirsOnly))
self.install_location.setText(selected_location)
def _requirements_check(self):
"""Checks that the minimum system requirements are met. Opens error message and resets the wizard if they are not.
Minimum Requirements: Windows 7 or greater (64-bit)
"""
process_dialog = QtWidgets.QProgressDialog("Checking System Requirements...", str(), 0, 100, self, QtCore.Qt.Dialog)
process_dialog.setAutoClose(True)
process_dialog.setAutoReset(True)
process_dialog.setCancelButton(None)
if self._network_manager.networkAccessible():
request = QtNetwork.QNetworkRequest(QtCore.QUrl("https://www.gnu.org/licenses/gpl-3.0-standalone.html"))
request.setAttribute(QtNetwork.QNetworkRequest.CacheLoadControlAttribute, QtNetwork.QNetworkRequest.AlwaysNetwork)
request.setAttribute(QtNetwork.QNetworkRequest.FollowRedirectsAttribute, True)
reply = self._network_manager.get(request)
reply.downloadProgress.connect(lambda received, total: process_dialog.setMaximum(total))
reply.downloadProgress.connect(lambda received, total: process_dialog.setValue(received))
process_dialog.show()
else:
self._display_error("This installer requires an active internet connection, but you do not appear to be online")
return False
if not platform.system() == 'Windows' or int(platform.release()) < 7:
if QtWidgets.QMessageBox.question(self, "", "This toolkit requires Microsoft Windows 7 or newer to be used. Continue anyway?") in (QtWidgets.QMessageBox.No, 0):
reply.abort()
process_dialog.cancel()
return False
if not platform.machine().lower() in ('amd64', 'x86_64'):
if QtWidgets.QMessageBox.question(self, "", "Parts of this toolkit require a 64-bit processor. Continue anyway?") in (QtWidgets.QMessageBox.No, 0):
reply.abort()
process_dialog.cancel()
return False
while reply.isRunning():
QtWidgets.QApplication.instance().processEvents()
QtCore.QThread.currentThread().msleep(100)
if not reply.error():
self.license.setHtml(bytearray(reply.readAll()).decode("utf-8"))
return True
else:
self._display_error(f"Encountered an error while testing Internet connectivity:\n\n{reply.errorString()}")
return False
def _display_error(self, message, details = None):
"""Displays an error message.
Arguments:
message {str} -- The contents of the error message to be displayed
Keyword Arguments:
details {Any} -- Any string-printable object that will be set in the more details, typically an Exception object (default: {None})
"""
message=QtWidgets.QMessageBox(icon=QtWidgets.QMessageBox.Critical,parent=self,text=message)
if details:
message.setDetailedText(str(details))
message.show()
return message
def _load_tools(self):
"""Loads tool data from supported_tools.json hosted on the project Github page
"""
process_dialog = QtWidgets.QProgressDialog("Loading tool information...", str(), 0, 100, self, QtCore.Qt.Dialog)
process_dialog.setAutoClose(True)
process_dialog.setAutoReset(True)
process_dialog.setCancelButton(None)
try: #clear data from this page before reloading
self.__tools__ = None
self._icon_cache = {item.text(0):item.icon(0) for item in list(MenuIterator(self.selection_menu, QtWidgets.QTreeWidgetItemIterator.NoChildren))}
self.selection_menu.clear()
except NameError:
pass
request = QtNetwork.QNetworkRequest(QtCore.QUrl("https://raw.githubusercontent.com/norweeg/portable-computing-toolkit-installer/initial_dev/portable_computing_toolkit_installer/resources/supported_tools.json"))
request.setAttribute(QtNetwork.QNetworkRequest.CacheLoadControlAttribute, QtNetwork.QNetworkRequest.AlwaysNetwork)
request.setAttribute(QtNetwork.QNetworkRequest.FollowRedirectsAttribute, True)
reply = self._network_manager.get(request)
reply.downloadProgress.connect(lambda received, total: process_dialog.setMaximum(total))
reply.downloadProgress.connect(lambda received, total: process_dialog.setValue(received))
process_dialog.show()
while reply.isRunning():
QtWidgets.QApplication.instance().processEvents()
QtCore.QThread.currentThread().msleep(100)
if not reply.error():
try:
self.__tools__ = json.loads(bytearray(reply.readAll()))
except json.JSONDecodeError as e:
self._display_error(f"Unable to decode {request.url.toString()}", e).accepted.connect(self.back)
else:
self._populate_menu()
else:
self._display_error(f"Encountered an error while loading supported tools from {request.url().toString()}", reply.errorString()).finished.connect(lambda x: self.back())
def _populate_menu(self):
"""Populates the selection menu with items loaded from supported_tools.json
"""
process_dialog = QtWidgets.QProgressDialog("Loading tool information...", str(), 0, len(self.__tools__)-1, self, QtCore.Qt.Dialog)
process_dialog.setAutoClose(True)
process_dialog.setAutoReset(True)
process_dialog.setCancelButton(None)
process_dialog.show()
for tool in sorted(self.__tools__, key = lambda x: x["category"] + f".{x['name']}"):
placement = deque([level.capitalize().strip() for level in tool["category"].split(".") if level] + [tool["name"]])
root = self.selection_menu.invisibleRootItem()
self._add_menu_item(placement, root, tool)
process_dialog.setValue(process_dialog.value() + 1)
for item in [self.selection_menu.topLevelItem(i) for i in range(self.selection_menu.topLevelItemCount())]:
item.setExpanded(True)
def _add_menu_item(self, placement, root, tool):
current_level = placement.popleft()
children = [root.child(i) for i in range(root.childCount())]
if any([item is child for item, child in product(self.selection_menu.findItems(current_level,
QtCore.Qt.MatchFixedString|QtCore.Qt.MatchCaseSensitive|QtCore.Qt.MatchRecursive), children)]):
root = list(filter(lambda child: child.text(0) == current_level, children)).pop()
self._add_menu_item(placement, root, tool)
else:
new_item = QtWidgets.QTreeWidgetItem(root, QtWidgets.QTreeWidgetItem.Type)
new_item.setText(0, current_level)
new_item.setFlags(QtCore.Qt.ItemIsSelectable|QtCore.Qt.ItemIsUserCheckable|QtCore.Qt.ItemIsEnabled)
new_item.setCheckState(0, QtCore.Qt.Unchecked)
if not placement: #all items popped from placement
new_item.setFlags(new_item.flags()|QtCore.Qt.ItemNeverHasChildren)
new_item.setData(1, QtCore.Qt.DecorationRole, QtGui.QIcon(str(Path(__file__).parent.parent/"resources/icons/internet-web-browser-4.png")))
new_item.setData(1, QtCore.Qt.ToolTipRole, tool["homepage"])
try:
new_item.setIcon(0, self._icon_cache[new_item.text(0)])
except (AttributeError, KeyError):
try:
self._get_icon(new_item, tool["icon url"])
except KeyError:
self._get_icon(new_item)
try:
if tool["depends on"] == current_level: #tool depends on itself makes it mandatory
new_item.setFlags(new_item.flags()^QtCore.Qt.ItemIsUserCheckable)
new_item.setCheckState(0, QtCore.Qt.Checked)
except KeyError:
pass
else:
new_item.setFlags(new_item.flags()|QtCore.Qt.ItemIsAutoTristate)
new_item.setExpanded(False)
new_item.addChild(self._add_menu_item(placement, new_item, tool))
return new_item
@QtCore.pyqtSlot("QTreeWidgetItem*", "int")
def _open_homepage(self, item, column):
"""Receives the itemClicked signal from the selection_menu QTreeWidget and if the user has clicked a homepage, opens it in a new browser tab.
Arguments:
item {QTreeWidgetItem} -- The item that was clicked
column {int} -- The index of the column of the item that was clicked
"""
try:
#if the item has a valid URL, open it, otherwise do nothing
if column and QtCore.QUrl(item.data(column, QtCore.Qt.ToolTipRole), QtCore.QUrl.StrictMode).scheme().startswith("http"):
webbrowser.open_new_tab(item.data(column, QtCore.Qt.ToolTipRole))
except:
pass
def _get_icon(self, tree_item, icon_url = None):
"""Given a QTreeWidgetItem, this will initiate a request to download an icon for it. If icon_url is specified, that will be fetched, otherwise
the favicon of the tree item's homepage.
Arguments:
tree_item {QTreeWidgetItem} -- a tree item with a specified homepage to set an icon on
Keyword Arguments:
icon_url {str} -- A string representing the url of the icon to fetch instead of the favicon of the homepage (default: {None})
"""
if not icon_url:
icon_url = QtCore.QUrl(f"http://www.google.com/s2/favicons?domain={QtCore.QUrl(tree_item.data(1, QtCore.Qt.ToolTipRole)).toString()}")
icon_request = QtNetwork.QNetworkRequest(QtCore.QUrl(icon_url))
icon_request.setAttribute(QtNetwork.QNetworkRequest.CacheLoadControlAttribute, QtNetwork.QNetworkRequest.AlwaysNetwork)
icon_request.setAttribute(QtNetwork.QNetworkRequest.FollowRedirectsAttribute, True)
icon_reply = self._network_manager.get(icon_request)
#when download is complete, validates request returned without error and sets icon
icon_reply.finished.connect(partial(self._set_icon, tree_item, icon_reply))
def _set_icon(self, tree_item, reply):
"""Validates a network request and sets its result as the icon of a QTreeWidgetItem.
Arguments:
tree_item {QTreeWidgetItem} -- A tree widget item to set the icon of
request {QNetworkReply} -- The results of the HTTP GET request whose data will be used as the icon for tree_item
"""
if not reply.error():
pixmap = QtGui.QPixmap()
pixmap.loadFromData(reply.readAll())
tree_item.setIcon(0, QtGui.QIcon(pixmap))
def _get_selections(self):
"""Returns a subset of the supported tools whose corresponding item was selected by the user.
"""
return list(MenuIterator(self.selection_menu, QtWidgets.QTreeWidgetItemIterator.Checked|QtWidgets.QTreeWidgetItemIterator.NoChildren))
@QtCore.pyqtSlot("QTreeWidgetItem*","int")
def _enforce_dependencies(self, item, column):
if column: #ignore everything not in column 0 (the names)
return
elif item.childCount(): #recurse through child items til no children
for child in [item.child(i) for i in range(item.childCount())]:
self._enforce_dependencies(child, column)
else:
#if item is checked, look for dependent items and check them too
tools_by_name = {tool["name"]:tool for tool in self.__tools__}
if item.checkState(0) == QtCore.Qt.Checked:
try:
dependencies = tools_by_name[item.text(0)]["depends on"].split(",")
except KeyError:
pass
else:
for dependency in dependencies:
for dependant_item in self.selection_menu.findItems(dependency, QtCore.Qt.MatchFixedString|QtCore.Qt.MatchCaseSensitive|QtCore.Qt.MatchRecursive):
dependant_item.setCheckState(0, QtCore.Qt.Checked)
else: #item was unchecked
#find all checked items and build a set of their dependencies
selected = self._get_selections()
dependencies_of_selected = set()
names_of_dependant_items = []
for selected_item in selected:
try:
dependencies_of_selected |= set((tools_by_name[selected_item.text(0)]["depends on"]).split(","))
except KeyError:
pass
else:
if item.text(0) in (tools_by_name[selected_item.text(0)]["depends on"]).split(","):
names_of_dependant_items.append(selected_item.text(0))
#if the item that was just unchecked is a dependency of another selected item
if item.text(0) in dependencies_of_selected:
#build a gramatically correct error (minus oxford comma (sorry!!))
if len(names_of_dependant_items) > 1:
for name in names_of_dependant_items[1:-1]:
name = f", {name}"
names_of_dependant_items[-1] = f" and {names_of_dependant_items[-1]}"
depend_text = "depend"
else:
depend_text = "depends"
#display the error and disallow unchecking item while something that depends on it is selected
self._display_error(f"{str().join(names_of_dependant_items)} {depend_text} on {item.text(0)}")
item.setCheckState(0, QtCore.Qt.Checked)
def _begin_downloads(self, download_queue = None, install_ready = None, download_directory = None):
if download_queue:
pass
else:
self._download_queue = Queue()
if install_ready:
pass
else:
self._install_ready = Queue()
self._download_error = Queue()
if download_directory:
pass
else:
try:
self._download_directory = QtCore.QTemporaryDir()
except Exception as e:
self._display_error(str(e), self._download_directory.errorString()).accepted.connect(lambda: self.back())
if self._download_queue.empty():
tools_by_name = {tool["name"]:tool for tool in self.__tools__}
self._download_progress = {}
for selected_tool in self._get_selections():
self._download_queue.put_nowait(tools_by_name[selected_tool.text(0)])
self.progress_bar.setRange(0, self._download_queue.qsize())
while self._thread_pool.activeThreadCount() < self._thread_pool.maxThreadCount() and not self._download_queue.empty():
self._thread_pool.tryStart(DownloadWorker(self._download_queue, self._install_ready, self._download_error, self._download_directory, self))
QtCore.QThread.currentThread().msleep(250)
assert self._download_error.empty()
@QtCore.pyqtSlot(str, int, int)
def track_progress(self, tool_name, bytes_received, bytes_total):
if tool_name not in [tool["name"] for tool in self.__tools__]:
return
else:
try:
self._download_progress[tool_name] = bytes_received/bytes_total
except ZeroDivisionError:
self._download_progress[tool_name] = 0
self._progress_update_lock.acquire()
self.progress_bar.setValue(sum(self._download_progress.values()))
self._progress_update_lock.release()