249 lines
12 KiB
Python
249 lines
12 KiB
Python
import platform
|
|
import re
|
|
import webbrowser
|
|
from collections import deque, namedtuple
|
|
from pathlib import Path
|
|
from queue import Empty, Queue
|
|
from urllib.parse import urljoin
|
|
|
|
from parsel import Selector
|
|
from PyQt5 import QtCore, QtGui, QtNetwork, QtWebEngineWidgets, QtWidgets
|
|
from PyQt5.uic import loadUi
|
|
|
|
#win32api is only needed to strip the Zone.Identifier stream from downloads on Windows
try:
    import win32api
except ImportError:
    if platform.system() == 'Windows':
        raise  #require win32api on windows
    #on every other platform the module is optional: bind the name anyway so that
    #later references see a defined value instead of raising NameError
    win32api = None
|
|
|
|
|
|
|
|
class Downloader(QtWidgets.QMainWindow):
    """Browser window that finds and downloads the installer for one tool.

    A tool info dictionary is taken from ``download_queue``. The download URL is first searched for
    automatically by scraping rendered pages; if that fails the window is shown so the user can browse
    to the download manually. The tool info is finally put on ``install_ready`` (download completed) or
    on ``download_errors`` (cancelled or interrupted).
    """

    #emitted as (tool name, bytes received, bytes total) while a download is in progress
    download_progress = QtCore.pyqtSignal(str, int, int)
    #outcome of one page load: the loadFinished status and the rendered HTML
    LoadedPage = namedtuple("LoadedPage", ("success", "html"))

    def __init__(self, download_queue, install_ready, download_errors, download_directory, parent = None, flags = None):
        """Builds the (initially hidden) browser window from downloader.ui

        Args:
            download_queue (Queue): Queue that tool info dictionaries are taken from
            install_ready (Queue): Queue that receives the tool info once its download has completed
            download_errors (Queue): Queue that receives the tool info when the download is cancelled or fails
            download_directory (str or Path): Directory the downloaded file is saved in
            parent (QWidget): Passed through to QMainWindow
            flags (Qt.WindowFlags): Passed through to QMainWindow. None means "no special flags"
        """
        #QMainWindow does not accept None for flags, so translate it into an empty flag set
        if flags is None:
            flags = QtCore.Qt.WindowFlags()
        super().__init__(parent, flags)
        loadUi(Path(__file__).parent / "downloader.ui", baseinstance = self)
        self.hide()
        #create an off-the-record profile, isolated from other Downloader objects
        #the page is given an explicit parent so the profile is not mistaken for the parent argument
        profile = QtWebEngineWidgets.QWebEngineProfile(self.web_view)
        new_page = QtWebEngineWidgets.QWebEnginePage(profile, self.web_view)
        self.web_view.setPage(new_page)
        self._load_result = Queue(maxsize=1)
        #set application style and decorate pushbuttons
        self._style = QtWidgets.QApplication.instance().style()
        self.setWindowIcon(QtWidgets.QApplication.instance().windowIcon())
        self.back_button.setIcon(self._style.standardIcon(self._style.SP_ArrowBack))
        self.forward_button.setIcon(self._style.standardIcon(self._style.SP_ArrowForward))
        self.reload_stop_button.setIcon(self._style.standardIcon(self._style.SP_BrowserReload))
        self.home_button.setIcon(self._style.standardIcon(self._style.SP_ArrowUp))
        #connect signals to make useable as a browser
        self.web_view.urlChanged.connect(lambda url: self.address_bar.setText(url.toString()))
        self.web_view.loadStarted.connect(self._reload_becomes_stop)
        self.web_view.loadFinished.connect(self._stop_becomes_reload)
        self.address_bar.returnPressed.connect(lambda: self.web_view.load(QtCore.QUrl(self.address_bar.text())))
        #connect cancel actions to handlers
        self.actionCancel.triggered.connect(self._cancel_manual_search)
        self.actionCancel_and_Open_Issue_on_Github.triggered.connect(self._cancel_manual_search_open_issue)
        #set attributes
        self._download_queue = download_queue
        self._install_ready = install_ready
        self._download_errors = download_errors
        self._download_directory = download_directory
        #per-tool state, filled in by begin_auto_search
        self._tool_info = None
        self._tool_name = None
        self._home_page = None
        self._download_page_selector = deque()
        #guards against connecting _process_download to the profile more than once
        self._download_handler_connected = False

    def _set_reload_stop_button(self, slot, tooltip, icon):
        """Re-targets the shared reload/stop button

        Args:
            slot (callable): What a click on the button should call from now on
            tooltip (str): Tooltip text for the button
            icon (QStyle.StandardPixmap): Standard icon to show on the button
        """
        try:
            self.reload_stop_button.clicked.disconnect()
        except TypeError:
            #PyQt raises TypeError when nothing is connected yet, e.g. on the very first load
            pass
        self.reload_stop_button.clicked.connect(slot)
        self.reload_stop_button.setToolTip(tooltip)
        self.reload_stop_button.setIcon(self._style.standardIcon(icon))
        self._check_history()

    def _reload_becomes_stop(self):
        """Turns the reload button into a stop button.
        """
        self._set_reload_stop_button(self.web_view.stop, "Stop", self._style.SP_BrowserStop)

    def _stop_becomes_reload(self, ok):
        """Turns a temporary stop button back into a reload button

        Args:
            ok (bool): Received from signal indicating load was stopped without error (True) or because of error (False). Ignored.
        """
        self._set_reload_stop_button(self.web_view.reload, "Reload", self._style.SP_BrowserReload)

    def _check_history(self):
        """Checks if the current web view has history to navigate forward or backward to. Disables or enables the forward/backward buttons accordingly
        """
        history = self.web_view.history()
        self.back_button.setEnabled(history.canGoBack())
        self.forward_button.setEnabled(history.canGoForward())

    def _shut_down(self, return_code):
        """Closes and schedules deletion of this window, then asks the current thread's event loop to exit

        Args:
            return_code (int): Exit code for the event loop. 0 is what QThread.quit() uses; 1 is used here for failures
        """
        self.close()
        self.deleteLater()
        QtCore.QThread.currentThread().exit(return_code)

    def _cancel_manual_search(self):
        """Action handler for the cancel action. Puts the received tool info into errors queue and immediately exits
        """
        self._download_errors.put(self._tool_info)
        self._shut_down(1)

    def _cancel_manual_search_open_issue(self):
        """Action handler for the cancel and open a bug report. Same as cancel, except it will open the github issues page in the default browser first
        """
        self._download_errors.put(self._tool_info)
        webbrowser.open_new_tab("https://github.com/norweeg/portable-computing-toolkit-installer/issues/new")
        self._shut_down(1)

    def _capture_loaded_page(self, success):
        """Receives the web view's loadFinished signal during automatic search and queues the rendered page

        Args:
            success (bool): The loadFinished status of the page
        """
        #toHtml is asynchronous in QtWebEngine: the HTML is only available inside the callback
        self.web_view.page().toHtml(lambda html: self._load_result.put(Downloader.LoadedPage(success, html)))

    def _stop_page_capture(self):
        """Stops queueing loaded pages. Only the capture slot is removed, the browser button handlers stay connected
        """
        try:
            self.web_view.loadFinished.disconnect(self._capture_loaded_page)
        except TypeError:
            #the capture slot was not connected (any more)
            pass

    def _accept_downloads(self):
        """Routes download requests of this window's profile to _process_download (at most once)
        """
        if not self._download_handler_connected:
            self.web_view.page().profile().downloadRequested.connect(self._process_download)
            self._download_handler_connected = True

    def begin_manual_search(self):
        """Shows the window at the tool's homepage so the user can browse to the download themselves
        """
        #nothing consumes captured pages while the user browses; with a one-slot queue the second
        #captured page would block this thread forever, so stop capturing first
        self._stop_page_capture()
        #without this a download started by the user would never reach _process_download
        self._accept_downloads()
        self.web_view.history().clear()
        self._check_history()
        self.show()
        self.web_view.load(QtCore.QUrl(self._home_page))

    def begin_auto_search(self):
        """Begins the search process by getting tool info from a download queue and finalizes some GUI elements (in case manual search is necessary)
        """
        try:
            tool_info = self._download_queue.get(timeout = 1)
        except Empty:
            #nothing left to download
            QtCore.QThread.currentThread().quit()
            return
        self._tool_info = tool_info
        self._tool_name = tool_info["name"]
        self._download_page_selector = deque(tool_info["search"]["selector"])
        self._home_page = tool_info["homepage"]
        #name window from attribute
        self.setWindowTitle(f"Find {self._tool_name}")
        #get homepage from attributes and set home button to load it
        self.home_button.clicked.connect(lambda: self.web_view.load(QtCore.QUrl(self._home_page)))
        self.web_view.loadFinished.connect(self._capture_loaded_page)
        try:
            self._download_file(self._find_installer_url(self._tool_info["page"]))
        except Exception:
            #any failure of the automated search hands over to the user
            self.begin_manual_search()

    def _wait_for_loaded_page(self):
        """Waits until _capture_loaded_page has queued a page and returns it

        Returns:
            LoadedPage: The status and rendered HTML of the page that finished loading
        """
        #a plain blocking get() would also block delivery of the loadFinished signal that fills the
        #queue, so keep processing this thread's events while waiting
        while True:
            try:
                return self._load_result.get_nowait()
            except Empty:
                QtCore.QCoreApplication.processEvents(QtCore.QEventLoop.WaitForMoreEvents)

    def _find_installer_url(self, url):
        """Finds the URL of the next page to navigate to or the item to download using the search selectors from a tool info dictionary. The fully-rendered
        HTML will be scraped, in case the desired URL is rendered by JavaScript

        Args:
            url (str): The URL of the page to be scraped

        Returns:
            str: The URL of the installer, or None when the page failed to load or no link matched
        """
        #load the url with a browser that will render page, including urls generated by JavaScript and get result
        self.web_view.load(QtCore.QUrl(url))
        load_result = self._wait_for_loaded_page()
        self._load_result.task_done()
        #if page loaded successfully, parse the results, otherwise return nothing
        if not load_result.success:
            return None
        page = Selector(text = load_result.html)
        #if tool has a download page selector, we need to follow one or more links from the "page" url to find the download page to search
        #portableapps.com loves to make you do this
        if self._download_page_selector:
            next_page_query = self._download_page_selector.popleft()
            return self._find_installer_url(urljoin(url, page.css(next_page_query).attrib["href"]))
        #get all links in the current page and search for the tool's installer file using a regular expression
        filename_pattern = self._tool_info["search"]["filename regex"]
        #anchors without an href (named anchors) cannot be a download link, so leave them out
        for anchor in page.xpath("//a[@href]"):
            link = urljoin(url, anchor.attrib["href"])
            if re.search(filename_pattern, link):
                #the link matched the regular expression, our download URL is found!
                return link
        #if nothing was found by now, returns nothing. automated search has failed
        return None

    def _download_file(self, url):
        """Initiates a download of url

        Args:
            url (str): The URL of the file to be downloaded

        Raises:
            ValueError: If url is empty or None, meaning the automated search found nothing
        """
        #no url means autosearch has failed
        if not url:
            raise ValueError("automated search did not find a download URL")
        #the page the download starts from does not need to be scraped
        self._stop_page_capture()
        self._accept_downloads()
        #QWebEngineView.load needs a QUrl, a plain string is rejected
        self.web_view.load(QtCore.QUrl(url))

    @QtCore.pyqtSlot("QWebEngineDownloadItem*")
    def _process_download(self, download_item):
        """Receives the profile's downloadRequested signal, records the download in the tool info and accepts it

        Args:
            download_item (QWebEngineDownloadItem): The download that was requested
        """
        self._tool_info["download url"] = download_item.url().toString()
        self._tool_info["mimetype"] = download_item.mimeType()
        #keep the file name suggested by the web engine, but save into our own download directory
        filename = Path(download_item.path()).name
        download_item.setPath(str(Path(self._download_directory) / filename))
        self._tool_info["filename"] = Path(download_item.path())
        download_item.downloadProgress.connect(lambda x, y: self.download_progress.emit(self._tool_name, x, y))
        download_item.stateChanged.connect(self._download_interrupted)
        download_item.finished.connect(self._check_finished)
        download_item.accept()

    @QtCore.pyqtSlot("QWebEngineDownloadItem::DownloadState")
    def _download_interrupted(self, state):
        """Receives the QWebEngineDownloadItem's stateChanged signal and, if the state is "interrupted", trigger a failure

        Args:
            state (QWebEngineDownloadItem.DownloadState): The new state of the download
        """
        if state != QtWebEngineWidgets.QWebEngineDownloadItem.DownloadInterrupted:
            return
        sender = self.sender()
        self._tool_info["error reason"] = sender.interruptReasonString()
        sender.cancel()
        partial_file = self._tool_info.pop("filename", None)
        if partial_file is not None:
            try:
                partial_file.unlink()
            except OSError:
                #removing the partial file is best effort; the failure must be reported either way
                pass
        self._download_errors.put(self._tool_info)
        self._shut_down(1)

    @QtCore.pyqtSlot()
    def _check_finished(self):
        """Receives the QWebEngineDownloadItem's finished signal, verifies success, unblocks the file and triggers a success
        """
        #get the downloaditem that sent this signal
        sender = self.sender()
        #if the download item is completed, unblock the file on windows, then close and exit this thread
        if sender.state() != QtWebEngineWidgets.QWebEngineDownloadItem.DownloadCompleted:
            return
        if platform.system() == 'Windows':
            self._unblock_file()
        #check if the total bytes in the download is known. -1 means not known.
        if sender.totalBytes() == -1:
            #if not known, signal that this download has completed, since the progress thusfar has been ignored
            self.download_progress.emit(self._tool_name, 1, 1)
        self._install_ready.put(self._tool_info)
        self._shut_down(0)

    def _unblock_file(self):
        """Removes the "downloaded from internet" Zone Identifier. Windows will prevent executables that have this set from executing.
        Contents of zip files inherit this from the .zip they come from
        """
        try:
            win32api.DeleteFile(str(self._tool_info["filename"])+r":Zone.Identifier")
        except win32api.error:
            #just ignore the error if the above Zone Identifier is not set
            pass
|
|
|
|
class DownloadWorker(QtCore.QRunnable):
    """Runnable that creates a Downloader and starts its automatic search."""

    def __init__(self, download_queue, install_ready, download_error, download_directory, wizard):
        """Stores everything the Downloader created in run() needs

        Args:
            download_queue: Passed to Downloader as its download queue
            install_ready: Passed to Downloader as its queue of completed downloads
            download_error: Passed to Downloader as its queue of failed downloads
            download_directory: Passed to Downloader as its download directory
            wizard: Object whose track_progress is connected to the Downloader's download_progress signal
        """
        super().__init__()
        self._download_queue = download_queue
        self._install_ready = install_ready
        #was never stored before, although run() reads it
        self._download_error = download_error
        self._download_directory = download_directory
        self._wizard = wizard

    def run(self):
        """Creates the Downloader, wires its progress signal to the wizard and begins the automatic search
        """
        #NOTE(review): Downloader is a QMainWindow and is created on whatever thread executes this
        #runnable; Qt expects widgets to live on the GUI thread - confirm how this worker is started
        downloader = Downloader(self._download_queue, self._install_ready, self._download_error, self._download_directory)
        downloader.download_progress.connect(self._wizard.track_progress)
        downloader.begin_auto_search()
|