Merge github.com:jo1gi/grawlix

Joakim Holm 2023-04-25 00:04:22 +02:00
commit c64228e58b
9 changed files with 205 additions and 12 deletions


@ -5,12 +5,12 @@ from .sources import load_source, Source
from .output import download_book
from . import arguments, logging
from typing import Tuple
from typing import Tuple, Optional
from rich.progress import Progress
from functools import partial
def get_login(source: Source, config: Config, options) -> Tuple[str, str]:
def get_login(source: Source, config: Config, options) -> Tuple[str, str, Optional[str]]:
"""
Get login credentials for source
@ -23,10 +23,11 @@ def get_login(source: Source, config: Config, options) -> Tuple[str, str]:
if source_name in config.sources:
username = config.sources[source_name].username or options.username
password = config.sources[source_name].password or options.password
library = config.sources[source_name].library or options.library
else:
username = options.username
password = options.password
library = options.library
return username, password
return username, password, library
def get_urls(options) -> list[str]:
@ -55,8 +56,8 @@ def authenticate(source: Source, config: Config, options):
"""
logging.info(f"Authenticating with source [magenta]{source.name}[/]")
if source.supports_login:
username, password = get_login(source, config, options)
source.login(username, password)
username, password, library = get_login(source, config, options)
source.login(username, password, library=library)
source.authenticated = True
else:
raise SourceNotAuthenticated
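
For orientation, a small sketch of how the new optional library credential flows through the code above. The config module path and the "ereolen" key are assumptions (file names are not shown in this view), and the credential values are placeholders.

from argparse import Namespace
from grawlix.config import Config, SourceConfig  # assumed module path

config = Config(sources={
    "ereolen": SourceConfig(
        username="0123456789",                  # placeholder
        password="1234",                        # placeholder
        library="Aarhus Kommunes Biblioteker",  # new optional field
    )
})
options = Namespace(username=None, password=None, library=None)

# Mirrors get_login() above: the per-source config wins, CLI options fill the
# gaps, and the result is a three-tuple that authenticate() forwards as
# source.login(username, password, library=library).
entry = config.sources["ereolen"]
username = entry.username or options.username
password = entry.password or options.password
library = entry.library or options.library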


@ -39,6 +39,11 @@ def parse_arguments() -> argparse.Namespace:
help = "Password for login",
dest = "password",
)
parser.add_argument(
'--library',
help = "Library for login",
dest = "library",
)
# Outputs
parser.add_argument(
'-o',


@ -10,6 +10,7 @@ class SourceConfig:
"""Stores configuration for source"""
username: Optional[str]
password: Optional[str]
library: Optional[str]
@dataclass(slots=True)
@ -37,5 +38,6 @@ def load_config() -> Config:
sources[key] = SourceConfig (
username = values.get("username"),
password = values.get("password"),
library = values.get("library"),
)
return Config(sources)


@ -9,11 +9,19 @@ class AESEncryption:
iv: bytes
@dataclass(slots=True)
class AESCTREncryption:
key: bytes
nonce: bytes
initial_value: bytes
@dataclass(slots=True)
class XOrEncryption:
key: bytes
Encryption = Union[
AESCTREncryption,
AESEncryption,
XOrEncryption
]
@ -26,6 +34,14 @@ def decrypt(data: bytes, encryption: Encryption) -> bytes:
:param encryption: Information about how to decrypt
:returns: Decrypted data
"""
if isinstance(encryption, AESCTREncryption):
cipher = AES.new(
key = encryption.key,
mode = AES.MODE_CTR,
nonce = encryption.nonce,
initial_value = encryption.initial_value
)
return cipher.decrypt(data)
if isinstance(encryption, AESEncryption):
cipher = AES.new(encryption.key, AES.MODE_CBC, encryption.iv)
return cipher.decrypt(data)
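
A minimal round-trip sketch of the new AES-CTR branch, assuming grawlix and pycryptodome are importable; the key and nonce here are illustrative zeros, whereas the eReolen source below derives the real key from the book metadata.

from Crypto.Cipher import AES
from grawlix.encryption import AESCTREncryption, decrypt

key = bytes(16)    # 128-bit key (placeholder)
nonce = bytes(8)   # 8-byte nonce; the 8-byte counter fills the rest of the block
encryption = AESCTREncryption(key=key, nonce=nonce, initial_value=bytes(8))

# Encrypt a sample payload with the same parameters, then decrypt() restores it.
ciphertext = AES.new(key=key, mode=AES.MODE_CTR, nonce=nonce,
                     initial_value=bytes(8)).encrypt(b"sample payload")
assert decrypt(ciphertext, encryption) == b"sample payload"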


@ -25,7 +25,7 @@ class OutputFormat:
"""
if not book.file.extension == self.extension:
raise UnsupportedOutputFormat
self._download_and_write_file(book.file, location)
self._download_and_write_file(book.file, location, update_func)
def dl_image_list(self, book: ImageList, location: str, update_func: Update) -> None:
@ -39,30 +39,38 @@ class OutputFormat:
raise UnsupportedOutputFormat
def _download_file(self, file: OnlineFile) -> bytes:
def _download_file(self, file: OnlineFile, update: Update = None) -> bytes:
"""
Download `grawlix.OnlineFile`
:param file: File to download
:param update: Optional callback invoked after each downloaded chunk with that chunk's fraction of the total file size
:returns: Content of downloaded file
"""
response = self._session.get(
request = self._session.get(
file.url,
headers = file.headers
headers = file.headers,
stream = True
)
content = response.content
total_filesize = int(request.headers["Content-length"])
content = b""
for chunk in request.iter_content(chunk_size=1024):
content += chunk
if update:
update(len(chunk)/total_filesize)
if file.encryption is not None:
content = decrypt(content, file.encryption)
return content
def _download_and_write_file(self, file: OnlineFile, location: str) -> None:
def _download_and_write_file(self, file: OnlineFile, location: str, update: Update = None) -> None:
"""
Download `grawlix.OnlineFile` and write its content to disk
:param file: File to download
:param location: Path to where the file is written
:param update: Optional callback invoked after each downloaded chunk with that chunk's fraction of the total file size
"""
content = self._download_file(file)
content = self._download_file(file, update)
with open(location, "wb") as f:
f.write(content)
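
A sketch of how a caller might consume the per-chunk progress that _download_file now reports; the OutputFormat instance and OnlineFile are assumed to come from grawlix's usual machinery, and rich is already imported in the first file of this commit.

from functools import partial
from rich.progress import Progress

def download_with_progress(output_format, online_file, location):
    with Progress() as progress:
        task = progress.add_task("Downloading", total=1.0)
        # update receives len(chunk) / total_filesize after each chunk,
        # so advancing by that fraction fills the bar exactly once.
        update = partial(progress.advance, task)
        output_format._download_and_write_file(online_file, location, update)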


@ -1,6 +1,7 @@
from grawlix.exceptions import NoSourceFound
from .source import Source
from .ereolen import Ereolen
from .flipp import Flipp
from .mangaplus import MangaPlus
from .saxo import Saxo
@ -49,6 +50,7 @@ def get_source_classes() -> list[type[Source]]:
:returns: A list of all available source types
"""
return [
Ereolen,
Flipp,
MangaPlus,
Saxo,

grawlix/sources/ereolen.py (new file, 129 lines)

@ -0,0 +1,129 @@
from grawlix.book import Result, Book, SingleFile, Metadata, OnlineFile
from grawlix.encryption import AESCTREncryption
from grawlix.exceptions import InvalidUrl, DataNotFound
from grawlix.utils import nearest_string
from .source import Source
from bs4 import BeautifulSoup
import json
import re
from Crypto.Cipher import AES
from base64 import b64decode
LOGIN_PAGE_URL = "https://ereolen.dk/adgangsplatformen/login?destination=/user"
KEY_ENCRYPTION_KEY = bytes([30, 193, 150, 69, 32, 247, 35, 95, 92, 255, 193, 159, 121, 40, 151, 179, 39, 159, 75, 110, 32, 205, 210, 58, 81, 55, 158, 33, 8, 149, 108, 74])
KEY_ENCRYPTION_IV = bytes([0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0])
class Ereolen(Source):
name: str = "eReolen"
match: list[str] = [
r"https://ereolen.dk/ting/object/\d+-.+/read",
r"https://ereolen.dk/ting/object/\d+-[^/]+/?$"
]
_authentication_methods = [ "login" ]
_login_credentials = [ "username", "password", "library" ]
def login(self, username: str, password: str, **kwargs) -> None:
library = kwargs["library"]
login_page = self._session.get(LOGIN_PAGE_URL).text
login_soup = BeautifulSoup(login_page, "lxml")
borchk_login_form = login_soup.find(id="borchk-login-form")
login_path = borchk_login_form.get("action")
library_attr_name = borchk_login_form.find("label").get("for")
libraries = self._extract_available_libraries(login_page)
if library not in libraries:
library = nearest_string(library, list(libraries.keys()))
self._session.post(
f"https://login.bib.dk{login_path}",
headers = { "Content-Type": "application/x-www-form-urlencoded" },
data = {
library_attr_name: library,
"agency": libraries[library],
"userId": username,
"pincode": password
}
)
def _extract_available_libraries(self, login_page: str) -> dict[str, str]:
"""
Extract list of available libraries from login page
:param login_page: Content of login page as string
:returns: Dictionary with name and id of each library
"""
match = re.search("libraries = ({.+})<", login_page)
if match is None:
raise DataNotFound
library_data = json.loads(match.group(1))
libraries: dict[str, str] = {}
for library in library_data["folk"]:
library_name = library["name"]
library_id = library["branchId"]
libraries[library_name] = library_id
return libraries
def download(self, url: str) -> Result:
book_id = self._get_book_id(url)
metadata = self._session.get(
f"https://bookstreaming.pubhub.dk/v1/order/metadata/{book_id}"
).json()
key = self._decrypt_key(metadata["key"])
return Book(
data = SingleFile(
OnlineFile(
url = f"https://bookstreaming.pubhub.dk/v1/order/file/{book_id}",
extension = "epub",
encryption = AESCTREncryption(
key,
nonce = bytes([0,0,0,0,0,0,0,0]),
initial_value = bytes([0,0,0,0,0,0,0,0])
)
)
),
metadata = Metadata(
title = metadata["title"],
authors = [ metadata["author"] ]
)
)
def _decrypt_key(self, key: str) -> bytes:
"""
Decrypt book encryption key
:param key: Base64 encoded and encrypted key
:returns: Decoded and decrypted key
"""
decoded_key = b64decode(key)
cipher = AES.new(KEY_ENCRYPTION_KEY, AES.MODE_CBC, KEY_ENCRYPTION_IV)
return cipher.decrypt(decoded_key)[:16]
def _get_book_id(self, url: str) -> str:
"""
Download and extract book_id
:param url: Url to book page
:returns: Book id
"""
if re.match(self.match[0], url):
return self._get_book_id_from_reader(url)
elif re.match(self.match[1], url):
return self._get_book_id_from_reader(f"{url}/read")
else:
raise InvalidUrl
def _get_book_id_from_reader(self, url: str) -> str:
"""
Download and extract book_id from reader page
:param url: Url to reader page
:returns: Book id
"""
page = self._session.get(url).text
soup = BeautifulSoup(page, "lxml")
return soup.find("div", id="pubhub-reader").get("order-id")
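
A hedged usage sketch of the new source; the credentials, library name, and book URL are placeholders, and a real download of course requires a valid library login.

from grawlix.sources.ereolen import Ereolen

source = Ereolen()
# The library name is matched against the names scraped from the login page;
# a close misspelling is corrected via nearest_string (see utils below).
source.login("0123456789", "1234", library="Aarhus Kommunes Biblioteker")
# Returns a Book wrapping a SingleFile epub whose OnlineFile carries
# AESCTREncryption (zero nonce/counter, key decrypted from the metadata).
book = source.download("https://ereolen.dk/ting/object/870970-basis:12345678/read")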


@ -13,6 +13,7 @@ class Source(Generic[T]):
name: str = "UNKNOWN"
match: list[str] = []
_authentication_methods: list[str] = []
_login_credentials = [ "username", "password" ]
authenticated = False
def __init__(self):


@ -1,6 +1,7 @@
from grawlix.exceptions import DataNotFound
from urllib.parse import urlparse, parse_qs
from functools import lru_cache
def get_arg_from_url(url: str, key: str) -> str:
parsed_url = urlparse(url)
@ -9,3 +10,31 @@ def get_arg_from_url(url: str, key: str) -> str:
return query[key][0]
except:
raise DataNotFound
@lru_cache
def levenstein_distance(a: str, b: str) -> int:
"""
Calculates the Levenshtein distance between `a` and `b`
https://en.wikipedia.org/wiki/Levenshtein_distance
"""
if len(a) == 0:
return len(b)
if len(b) == 0:
return len(a)
if a[0] == b[0]:
return levenstein_distance(a[1:], b[1:])
return 1 + min(
levenstein_distance(a, b[1:]), # Character is inserted
levenstein_distance(a[1:], b), # Character is deleted
levenstein_distance(a[1:], b[1:]) # Character is replaced
)
def nearest_string(input: str, list: list[str]) -> str:
"""
Finds the nearest string in `list` to `input` based on Levenshtein distance
"""
return sorted(list, key = lambda x: levenstein_distance(input, x))[0]
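
A quick illustration of the fuzzy matching that the eReolen login relies on; the library names are made-up examples, and "kitten"/"sitting" is the textbook distance-3 pair.

from grawlix.utils import nearest_string, levenstein_distance

libraries = ["Aarhus", "København", "Odense"]
assert levenstein_distance("kitten", "sitting") == 3
assert nearest_string("Kobenhavn", libraries) == "København"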