Merge with master

Joakim Holm 2023-04-29 19:56:50 +02:00
commit f257898681
22 changed files with 651 additions and 127 deletions

README.md (new file, 44 lines)
View File

@ -0,0 +1,44 @@
# grawlix
![GitHub release](https://img.shields.io/github/v/release/jo1gi/grawlix)
![GitHub top language](https://img.shields.io/github/languages/top/jo1gi/grawlix)
![License](https://img.shields.io/github/license/jo1gi/grawlix)
[![Donate using Ko-Fi](https://img.shields.io/badge/donate-kofi-00b9fe?logo=ko-fi&logoColor=00b9fe)](https://ko-fi.com/jo1gi)
CLI ebook downloader
## Supported services
grawlix currently supports downloading from the following sources:
- [eReolen](https://ereolen.dk)
- [Flipp](https://flipp.dk)
- [Manga Plus](https://mangaplus.shueisha.co.jp)
- [Royal Road](https://www.royalroad.com)
- [Saxo](https://saxo.com)
- [Webtoons](https://webtoons.com)
## Installation
```shell
git clone https://github.com/jo1gi/grawlix.git
cd grawlix
python3 setup.py install
```
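The project also ships a `pyproject.toml`, so a regular pip install of the checkout should work as well:
```shell
pip install .
```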
## Authentication
Some sources require authentication, which can be done either with CLI arguments
or a config file.
**CLI example**
```shell
grawlix --username "user@example.com" --password "SuperSecretPassword" <url>
```
**Config file example**
```toml
[sources.name]
username = "user@example.com"
password = "SuperSecretPassword"
```
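For example, an entry for the eReolen source, which also takes a `library` value, might look like this (the section name is assumed to be the source's name in lowercase):
```toml
[sources.ereolen]
username = "user@example.com"
password = "SuperSecretPassword"
library = "My Local Library"
```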
## Download books
To download a book, run:
```shell
grawlix [options] <book url>
```
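The output location can be changed with `--output`, which takes a template path. The defaults in the code are `{title}.{ext}` for single books and `{series}/{title}.{ext}` for series, so a custom template might look like:
```shell
grawlix --output "downloads/{series}/{title}.{ext}" <book url>
```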

grawlix/__main__.py
View File

@ -9,9 +9,10 @@ from typing import Tuple, Optional
from rich.progress import Progress
from functools import partial
import os
import asyncio
def get_login(source: Source, config: Config, options) -> Tuple[str, str]:
def get_login(source: Source, config: Config, options) -> Tuple[str, str, Optional[str]]:
"""
Get login credentials for source
@ -24,10 +25,12 @@ def get_login(source: Source, config: Config, options) -> Tuple[str, str]:
if source_name in config.sources:
username = config.sources[source_name].username or options.username
password = config.sources[source_name].password or options.password
library = config.sources[source_name].library or options.library
else:
username = options.username
password = options.password
return username, password
password = options.password
return username, password, library
def get_urls(options) -> list[str]:
@ -60,7 +63,7 @@ def get_cookie_file(options) -> Optional[str]:
return None
def authenticate(source: Source, config: Config, options):
async def authenticate(source: Source, config: Config, options):
"""
Authenticate with source
@ -70,8 +73,8 @@ def authenticate(source: Source, config: Config, options):
"""
logging.info(f"Authenticating with source [magenta]{source.name}[/]")
if source.supports_login:
username, password = get_login(source, config, options)
source.login(username, password)
username, password, library = get_login(source, config, options)
await source.login(username, password, library=library)
source.authenticated = True
if source.supports_cookies:
cookie_file = get_cookie_file(options)
@ -81,29 +84,29 @@ def authenticate(source: Source, config: Config, options):
raise SourceNotAuthenticated
def main() -> None:
async def main() -> None:
args = arguments.parse_arguments()
config = load_config()
urls = get_urls(args)
for url in urls:
source: Source = load_source(url)
if not source.authenticated and source.requires_authentication:
authenticate(source, config, args)
result = source.download(url)
await authenticate(source, config, args)
result = await source.download(url)
if isinstance(result, Book):
with logging.progress(result.metadata.title, source.name) as progress:
template = args.output or "{title}.{ext}"
download_with_progress(result, progress, template)
template: str = args.output or "{title}.{ext}"
await download_with_progress(result, progress, template)
elif isinstance(result, Series):
with logging.progress(result.title, source.name, len(result.book_ids)) as progress:
for book_id in result.book_ids:
book = source.download_book_from_id(book_id)
template = args.output or "{series}/{title}.{ext}"
download_with_progress(book, progress, template)
book: Book = await source.download_book_from_id(book_id)
template: str = args.output or "{series}/{title}.{ext}"
await download_with_progress(book, progress, template)
logging.info("")
def download_with_progress(book: Book, progress: Progress, template: str):
async def download_with_progress(book: Book, progress: Progress, template: str):
"""
Download book with progress bar in the CLI
@ -113,9 +116,14 @@ def download_with_progress(book: Book, progress: Progress, template: str):
"""
task = logging.add_book(progress, book)
update_function = partial(progress.advance, task)
download_book(book, update_function, template)
await download_book(book, update_function, template)
progress.advance(task, 1)
def run() -> None:
"""Start main function"""
asyncio.run(main())
if __name__ == "__main__":
main()
run()

grawlix/arguments.py
View File

@ -2,7 +2,7 @@ from grawlix import __version__
import argparse
def parse_arguments():
def parse_arguments() -> argparse.Namespace:
# Help
parser = argparse.ArgumentParser(
prog = "grawlix",
@ -39,6 +39,11 @@ def parse_arguments():
help = "Password for login",
dest = "password",
)
parser.add_argument(
'--library',
help = "Library for login",
dest = "library",
)
parser.add_argument(
'-c',
'--cookies',

grawlix/book.py
View File

@ -46,9 +46,21 @@ class ImageList:
"""
images: list[OnlineFile]
@dataclass(slots=True)
class HtmlFile:
title: str
file: OnlineFile
selector: Optional[dict[str, str]]
@dataclass(slots=True)
class HtmlFiles:
cover: OnlineFile
htmlfiles: list[HtmlFile]
BookData = Union[
SingleFile,
ImageList
ImageList,
HtmlFiles
]
@dataclass(slots=True)
@ -56,6 +68,7 @@ class Book:
"""Stores information about a book"""
metadata: Metadata
data: BookData
overwrite: bool = False
T = TypeVar("T")

grawlix/config.py
View File

@ -10,6 +10,7 @@ class SourceConfig:
"""Stores configuration for source"""
username: Optional[str]
password: Optional[str]
library: Optional[str]
@dataclass(slots=True)
@ -37,5 +38,6 @@ def load_config() -> Config:
sources[key] = SourceConfig (
username = values.get("username"),
password = values.get("password"),
library = values.get("library"),
)
return Config(sources)

grawlix/encryption.py
View File

@ -9,11 +9,19 @@ class AESEncryption:
iv: bytes
@dataclass(slots=True)
class AESCTREncryption:
key: bytes
nonce: bytes
initial_value: bytes
@dataclass(slots=True)
class XOrEncryption:
key: bytes
Encryption = Union[
AESCTREncryption,
AESEncryption,
XOrEncryption
]
@ -26,6 +34,14 @@ def decrypt(data: bytes, encryption: Encryption) -> bytes:
:param encryption: Information about how to decrypt
:returns: Decrypted data
"""
if isinstance(encryption, AESCTREncryption):
cipher = AES.new(
key = encryption.key,
mode = AES.MODE_CTR,
nonce = encryption.nonce,
initial_value = encryption.initial_value
)
return cipher.decrypt(data)
if isinstance(encryption, AESEncryption):
cipher = AES.new(encryption.key, AES.MODE_CBC, encryption.iv)
return cipher.decrypt(data)
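
A minimal sketch of the new AES-CTR branch in use, assuming a 16-byte key and the 8-byte nonce and initial value the eReolen source passes:

```python
from Crypto.Cipher import AES

from grawlix.encryption import AESCTREncryption, decrypt

key = bytes(16)            # hypothetical key
nonce = bytes(8)
initial_value = bytes(8)

# Encrypt a payload the same way a source would serve it
cipher = AES.new(key, AES.MODE_CTR, nonce=nonce, initial_value=initial_value)
ciphertext = cipher.encrypt(b"chapter contents")

# decrypt() rebuilds an identical counter cipher and reverses it
encryption = AESCTREncryption(key=key, nonce=nonce, initial_value=initial_value)
assert decrypt(ciphertext, encryption) == b"chapter contents"
```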

View File

@ -1,16 +1,16 @@
from grawlix.book import Book, BookData, SingleFile, ImageList, OnlineFile
from grawlix.book import Book, BookData, SingleFile, ImageList, OnlineFile, HtmlFiles
from grawlix.exceptions import GrawlixError
from grawlix.logging import info
from .output_format import OutputFormat
from .epub import Epub
from .cbz import Cbz
from .epub import Epub
from typing import Callable
from pathlib import Path
import os
def download_book(book: Book, update_func: Callable, template: str) -> None:
async def download_book(book: Book, update_func: Callable, template: str) -> None:
"""
Download and write book to disk
@ -18,18 +18,21 @@ def download_book(book: Book, update_func: Callable, template: str) -> None:
"""
output_format = get_default_format(book.data)
location = format_output_location(book, output_format, template)
if os.path.exists(location):
if not book.overwrite and os.path.exists(location):
info("Skipping - File already exists")
return
parent = Path(location).parent
if not parent.exists():
os.makedirs(parent)
if isinstance(book.data, SingleFile):
output_format.dl_single_file(book.data, location, update_func)
await output_format.dl_single_file(book, location, update_func)
elif isinstance(book.data, ImageList):
output_format.dl_image_list(book.data, location, update_func)
await output_format.dl_image_list(book, location, update_func)
elif isinstance(book.data, HtmlFiles):
await output_format.dl_html_files(book, location, update_func)
else:
raise NotImplementedError
await output_format.close()
def format_output_location(book: Book, output_format: OutputFormat, template: str) -> str:
@ -57,6 +60,8 @@ def get_default_format(bookdata: BookData) -> OutputFormat:
return output_format_from_str(bookdata.file.extension)
if isinstance(bookdata, ImageList):
return Cbz()
if isinstance(bookdata, HtmlFiles):
return Epub()
raise GrawlixError

View File

@ -1,18 +1,28 @@
from .output_format import OutputFormat, Update
from grawlix.book import ImageList
from .output_format import OutputFormat, Update, Book
from grawlix.book import ImageList, OnlineFile
from grawlix.exceptions import UnsupportedOutputFormat
import zipfile
from zipfile import ZipFile
import asyncio
class Cbz(OutputFormat):
"""Comic book zip file"""
extension: str = "cbz"
def dl_image_list(self, book: ImageList, location: str, update: Update) -> None:
image_count = len(book.images)
with zipfile.ZipFile(location, mode="w") as zip:
for n, file in enumerate(book.images):
content = self._download_file(file)
zip.writestr(f"Image {n}.{file.extension}", content)
async def dl_image_list(self, book: Book, location: str, update: Update) -> None:
if not isinstance(book.data, ImageList):
raise UnsupportedOutputFormat
images = book.data.images
image_count = len(images)
with ZipFile(location, mode="w") as zip:
async def download_page(index: int, file: OnlineFile):
content = await self._download_file(file)
zip.writestr(f"Image {index}.{file.extension}", content)
if update:
update(1/image_count)
tasks = [
asyncio.create_task(download_page(index, file))
for index, file in enumerate(images)
]
await asyncio.wait(tasks)

View File

@ -1,5 +1,63 @@
from grawlix.book import Book, SingleFile
from .output_format import OutputFormat
from grawlix.book import HtmlFiles, HtmlFile, OnlineFile, Book
from grawlix.exceptions import UnsupportedOutputFormat
from .output_format import OutputFormat, Update
import asyncio
from bs4 import BeautifulSoup
import os
from ebooklib import epub
class Epub(OutputFormat):
extension = "epub"
async def dl_html_files(self, book: Book, location: str, update: Update) -> None:
if not isinstance(book.data, HtmlFiles):
raise UnsupportedOutputFormat
html = book.data
output = epub.EpubBook()
output.set_title(book.metadata.title)
for author in book.metadata.authors:
output.add_author(author)
file_count = len(html.htmlfiles) + 1 # Html files + cover
async def download_cover(cover_file: OnlineFile):
            cover_filename = f"cover.{cover_file.extension}"
            epub_cover = epub.EpubCover(file_name = cover_filename)
            epub_cover.content = await self._download_file(cover_file)
output.add_item(epub_cover)
epub_cover_page = epub.EpubCoverHtml(image_name = cover_filename)
if update:
update(1/file_count)
return epub_cover_page
async def download_file(index: int, file: HtmlFile):
response = await self._client.get(file.file.url, follow_redirects=True)
soup = BeautifulSoup(response.text, "lxml")
selected_element = soup.find(attrs=file.selector)
epub_file = epub.EpubHtml(
title = file.title,
file_name = f"part {index}.html",
content = str(selected_element)
)
if update:
update(1/file_count)
return epub_file
# Download files
tasks = [
download_file(index, file)
for index, file in enumerate(html.htmlfiles)
]
epub_files = await asyncio.gather(download_cover(html.cover), *tasks)
# Add files to epub
for epub_file in epub_files:
output.add_item(epub_file)
output.spine.append(epub_file)
output.toc.append(epub_file)
# Complete book
output.add_item(epub.EpubNcx())
output.add_item(epub.EpubNav())
epub.write_epub(location, output)
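
For reference, `dl_html_files` can be driven end to end roughly like this (a sketch only: the import path for `Epub` is assumed, the URLs are placeholders, and `Metadata` is assumed to accept just a title and authors):

```python
import asyncio

from grawlix.book import Book, HtmlFile, HtmlFiles, Metadata, OnlineFile
from grawlix.output.epub import Epub  # import path assumed

async def demo() -> None:
    book = Book(
        metadata = Metadata(title = "Example", authors = ["Someone"]),
        data = HtmlFiles(
            cover = OnlineFile(url = "https://example.com/cover.jpg", extension = "jpg"),
            htmlfiles = [
                HtmlFile(
                    title = "Chapter 1",
                    file = OnlineFile(url = "https://example.com/chapter-1", extension = "html"),
                    selector = { "class": "chapter-content" },
                ),
            ],
        ),
    )
    output_format = Epub()
    await output_format.dl_html_files(book, "example.epub", None)
    await output_format.close()

asyncio.run(demo())
```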

View File

@ -1,8 +1,8 @@
from grawlix.book import Book, SingleFile, OnlineFile, ImageList
from grawlix.book import Book, SingleFile, OnlineFile, ImageList, HtmlFiles
from grawlix.exceptions import UnsupportedOutputFormat
from grawlix.encryption import decrypt
import requests
import httpx
from typing import Callable, Optional
Update = Optional[Callable[[float], None]]
@ -11,11 +11,16 @@ class OutputFormat:
# Extension for output files
extension: str = ""
def __init__(self):
self._session = requests.Session()
def __init__(self) -> None:
self._client = httpx.AsyncClient()
def dl_single_file(self, book: SingleFile, location: str, update_func: Update) -> None:
async def close(self) -> None:
"""Cleanup"""
await self._client.aclose()
async def dl_single_file(self, book: Book, location: str, update_func: Update) -> None:
"""
Download and write a `grawlix.SingleFile` to disk
@ -23,12 +28,14 @@ class OutputFormat:
:param location: Path to where the file is written
:raises UnsupportedOutputFormat: If datatype is not supported by format
"""
if not book.file.extension == self.extension:
if not isinstance(book.data, SingleFile):
raise UnsupportedOutputFormat
self._download_and_write_file(book.file, location)
if not book.data.file.extension == self.extension:
raise UnsupportedOutputFormat
await self._download_and_write_file(book.data.file, location, update_func)
def dl_image_list(self, book: ImageList, location: str, update_func: Update) -> None:
async def dl_image_list(self, book: Book, location: str, update_func: Update) -> None:
"""
Download and write a `grawlix.ImageList` to disk
@ -39,30 +46,45 @@ class OutputFormat:
raise UnsupportedOutputFormat
def _download_file(self, file: OnlineFile) -> bytes:
async def dl_html_files(self, book: Book, location: str, update_func: Update) -> None:
"""
Download and write a `grawlix.HtmlFiles` to disk
:param book: Book to download
:param location: Path to where the file is written
:raises UnsupportedOutputFormat: If datatype is not supported by format
"""
raise UnsupportedOutputFormat
async def _download_file(self, file: OnlineFile, update: Update = None) -> bytes:
"""
Download `grawlix.OnlineFile`
:param file: File to download
:param update: Update function that is called with a percentage every time a chunk is downloaded
:returns: Content of downloaded file
"""
response = self._session.get(
file.url,
headers = file.headers
)
content = response.content
if file.encryption is not None:
content = decrypt(content, file.encryption)
content = b""
async with self._client.stream("GET", file.url, headers = file.headers, follow_redirects=True) as request:
total_filesize = int(request.headers["Content-length"])
async for chunk in request.aiter_bytes():
content += chunk
if update:
update(len(chunk)/total_filesize)
if file.encryption is not None:
content = decrypt(content, file.encryption)
return content
def _download_and_write_file(self, file: OnlineFile, location: str) -> None:
async def _download_and_write_file(self, file: OnlineFile, location: str, update: Update = None) -> None:
"""
Download `grawlix.OnlineFile` and write its content to disk
:param file: File to download
:param location: Path to where the file is written
:param update: Update function that is called with a percentage every time a chunk is downloaded
"""
content = self._download_file(file)
content = await self._download_file(file, update)
with open(location, "wb") as f:
f.write(content)

grawlix/sources/__init__.py
View File

@ -1,8 +1,10 @@
from grawlix.exceptions import NoSourceFound
from .source import Source
from .ereolen import Ereolen
from .flipp import Flipp
from .mangaplus import MangaPlus
from .royal_road import RoyalRoad
from .saxo import Saxo
from .webtoons import Webtoons
@ -49,8 +51,10 @@ def get_source_classes() -> list[type[Source]]:
:returns: A list of all available source types
"""
return [
Ereolen,
Flipp,
MangaPlus,
RoyalRoad,
Saxo,
Webtoons
]

grawlix/sources/ereolen.py (new file, 131 lines)
View File

@ -0,0 +1,131 @@
from grawlix.book import Result, Book, SingleFile, Metadata, OnlineFile
from grawlix.encryption import AESCTREncryption
from grawlix.exceptions import InvalidUrl, DataNotFound
from grawlix.utils import nearest_string
from .source import Source
from bs4 import BeautifulSoup
import json
import re
from Crypto.Cipher import AES
from base64 import b64decode
LOGIN_PAGE_URL = "https://ereolen.dk/adgangsplatformen/login?destination=/user"
KEY_ENCRYPTION_KEY = bytes([30, 193, 150, 69, 32, 247, 35, 95, 92, 255, 193, 159, 121, 40, 151, 179, 39, 159, 75, 110, 32, 205, 210, 58, 81, 55, 158, 33, 8, 149, 108, 74])
KEY_ENCRYPTION_IV = bytes([0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0])
class Ereolen(Source):
name: str = "eReolen"
match: list[str] = [
r"https://ereolen.dk/ting/object/\d+-.+/read",
r"https://ereolen.dk/ting/object/\d+-[^/]+/?$"
]
_authentication_methods = [ "login" ]
_login_credentials = [ "username", "password", "library" ]
async def login(self, username: str, password: str, **kwargs) -> None:
library = kwargs["library"]
login_page = await self._client.get(LOGIN_PAGE_URL, follow_redirects=True)
login_soup = BeautifulSoup(login_page.text, "lxml")
borchk_login_form = login_soup.find(id="borchk-login-form")
login_path = borchk_login_form.get("action")
library_attr_name = borchk_login_form.find("label").get("for")
libraries = self._extract_available_libraries(login_page.text)
if not library in libraries:
library = nearest_string(library, list(libraries.keys()))
await self._client.post(
f"https://login.bib.dk{login_path}",
headers = { "Content-Type": "application/x-www-form-urlencoded" },
data = {
library_attr_name: library,
"agency": libraries[library],
"userId": username,
"pincode": password
},
follow_redirects = True
)
def _extract_available_libraries(self, login_page: str) -> dict[str, str]:
"""
Extract list of available libraries from login page
:param login_page: Content of login page as string
:returns: Dictionary with name and id of each library
"""
match = re.search("libraries = ({.+})<", login_page)
if match is None:
raise DataNotFound
library_data = json.loads(match.group(1))
libraries: dict[str, str] = {}
for library in library_data["folk"]:
library_name = library["name"]
library_id = library["branchId"]
libraries[library_name] = library_id
return libraries
async def download(self, url: str) -> Result:
book_id: str = await self._get_book_id(url)
metadata_response = await self._client.get(
f"https://bookstreaming.pubhub.dk/v1/order/metadata/{book_id}",
)
metadata = metadata_response.json()
key = self._decrypt_key(metadata["key"])
return Book(
data = SingleFile(
OnlineFile(
url = f"https://bookstreaming.pubhub.dk/v1/order/file/{book_id}",
extension = "epub",
encryption = AESCTREncryption(
key,
nonce = bytes([0,0,0,0,0,0,0,0]),
initial_value = bytes([0,0,0,0,0,0,0,0])
)
)
),
metadata = Metadata(
title = metadata["title"],
authors = [ metadata["author"] ]
)
)
def _decrypt_key(self, key: str) -> bytes:
"""
Decrypt book encryption key
:param key: Base64 encoded and encrypted key
:returns: Decoded and decrypted key
"""
decoded_key = b64decode(key)
cipher = AES.new(KEY_ENCRYPTION_KEY, AES.MODE_CBC, KEY_ENCRYPTION_IV)
return cipher.decrypt(decoded_key)[:16]
async def _get_book_id(self, url: str) -> str:
"""
Download and extract book_id
:param url: Url to book page
:returns: Book id
"""
if re.match(self.match[0], url):
return await self._get_book_id_from_reader(url)
if re.match(self.match[1], url):
return await self._get_book_id_from_reader(f"{url}/read")
else:
raise InvalidUrl
async def _get_book_id_from_reader(self, url: str) -> str:
"""
Download and extract book_id from reader page
:param url: Url to reader page
:returns: Book id
"""
page = await self._client.get(url)
soup = BeautifulSoup(page.text, "lxml")
return soup.find("div", id="pubhub-reader").get("order-id")

grawlix/sources/flipp.py
View File

@ -18,22 +18,22 @@ class Flipp(Source):
_authentication_methods: list[str] = []
_login_cache: Optional[dict] = None
def download(self, url: str) -> Result:
async def download(self, url: str) -> Result:
if re.match(self.match[0], url):
eid = self._get_eid(url)
publication_id = self._get_series_id(eid)
return self._download_book(eid, publication_id)
publication_id = await self._get_series_id(eid)
return await self._download_book(eid, publication_id)
elif re.match(self.match[1], url):
return self._download_series(url)
return await self._download_series(url)
raise InvalidUrl
def download_book_from_id(self, book_id: Tuple[str, str]) -> Book:
async def download_book_from_id(self, book_id: Tuple[str, str]) -> Book:
series_id, issue_id = book_id
return self._download_book(issue_id, series_id)
return await self._download_book(issue_id, series_id)
def _download_series(self, url: str) -> Series:
async def _download_series(self, url: str) -> Series:
"""
Download series with book ids from Flipp
@ -41,7 +41,7 @@ class Flipp(Source):
:returns: Series object
"""
series_id = url.split("/")[-1]
login_info = self._download_login_info()
login_info = await self._download_login_info()
series_metadata = self._extract_series_data(login_info, series_id)
issues = []
for issue in series_metadata["issues"]:
@ -53,7 +53,7 @@ class Flipp(Source):
)
def _download_login_info(self) -> dict:
async def _download_login_info(self) -> dict:
"""
Download login info from Flipp
Will use cache if available
@ -62,7 +62,7 @@ class Flipp(Source):
"""
if self._login_cache:
return self._login_cache
login_info = self._session.post(
login_cache = await self._client.post(
"https://flippapi.egmontservice.com/api/signin",
headers = {
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:111.0) Gecko/20100101 Firefox/111.0"
@ -77,9 +77,9 @@ class Flipp(Source):
"uuid": "",
"os": ""
}
).json()
self.login_cache = login_info
return login_info
)
self._login_cache = login_cache.json()
return login_cache.json()
def _extract_series_data(self, response: dict, series_id: str) -> dict:
@ -96,7 +96,7 @@ class Flipp(Source):
raise DataNotFound
def _download_book(self, issue_id: str, series_id: str) -> Book:
async def _download_book(self, issue_id: str, series_id: str) -> Book:
"""
Download book from Flipp
@ -104,8 +104,8 @@ class Flipp(Source):
:param series_id: Series identifier
:returns: Book metadata
"""
pages = self._get_pages(issue_id, series_id)
metadata = self._get_metadata(issue_id, series_id)
pages = await self._get_pages(issue_id, series_id)
metadata = await self._get_metadata(issue_id, series_id)
return Book(
data = ImageList(pages),
metadata = Metadata(
@ -116,7 +116,7 @@ class Flipp(Source):
)
def _get_metadata(self, issue_id: str, series_id: str) -> dict:
async def _get_metadata(self, issue_id: str, series_id: str) -> dict:
"""
Download and extract issue data
@ -124,7 +124,7 @@ class Flipp(Source):
:param series_id: Series id
:returns: Issue metadata
"""
login_info = self._download_login_info()
login_info = await self._download_login_info()
series_metadata = self._extract_series_data(login_info, series_id)
for issue in series_metadata["issues"]:
if issue["customIssueCode"] == issue_id:
@ -136,14 +136,14 @@ class Flipp(Source):
return get_arg_from_url(url, "edid")
def _get_series_id(self, issue_id: str) -> str:
async def _get_series_id(self, issue_id: str) -> str:
"""
Download series id from issue id
:param issue_id: Issue id
:returns: Series id
"""
response = self._session.get(f"{BASEURL}/production/default.aspx?pubname=&edid={issue_id}")
response = await self._client.get(f"{BASEURL}/production/default.aspx?pubname=&edid={issue_id}")
# TODO Make faster
search = re.search(r'publicationguid = "([^"]+)', response.text)
if search is None:
@ -151,7 +151,7 @@ class Flipp(Source):
return search.group(1)
def _get_pages(self, issue_id: str, series_id: str) -> list[OnlineFile]:
async def _get_pages(self, issue_id: str, series_id: str) -> list[OnlineFile]:
"""
Download page metadata for book
@ -159,7 +159,7 @@ class Flipp(Source):
:param series_id: Series id
:return: Page image links
"""
response = self._session.get(
response = await self._client.get(
f"{BASEURL}/get_page_groups_from_eid.aspx?pubid={series_id}&eid={issue_id}",
)
result = []

grawlix/sources/mangaplus.py
View File

@ -17,28 +17,28 @@ class MangaPlus(Source):
_authentication_methods: list[str] = []
def download(self, url: str) -> Result:
async def download(self, url: str) -> Result:
if re.match(self.match[0], url):
issue_id = url.split('/')[-1]
return self._download_issue(issue_id)
return await self._download_issue(issue_id)
if re.match(self.match[1], url):
series_id = url.split("/")[-1]
return self._download_series(series_id)
return await self._download_series(series_id)
raise InvalidUrl
def download_book_from_id(self, book_id: str) -> Book:
return self._download_issue(book_id)
async def download_book_from_id(self, book_id: str) -> Book:
return await self._download_issue(book_id)
def _download_series(self, series_id: str) -> Series:
async def _download_series(self, series_id: str) -> Series:
"""
Download series from Manga Plus
:param series_id: Identifier for series
:returns: Series data
"""
content = self._session.get(
response = await self._client.get(
f"https://jumpg-api.tokyo-cdn.com/api/title_detailV2",
params = {
"title_id": series_id,
@ -48,8 +48,8 @@ class MangaPlus(Source):
"app_ver": "40",
"secret": "2afb69fbb05f57a1856cf75e1c4b6ee6"
},
).content
data, _ = blackboxprotobuf.protobuf_to_json(content)
)
data, _ = blackboxprotobuf.protobuf_to_json(response.content)
parsed = json.loads(data)
title = parsed["1"]["8"]["1"]["2"]
issues = []
@ -70,7 +70,7 @@ class MangaPlus(Source):
book_ids = issues
)
def _download_issue(self, issue_id: str) -> Book:
async def _download_issue(self, issue_id: str) -> Book:
"""
Download issue from Manga Plus
@ -78,10 +78,10 @@ class MangaPlus(Source):
:returns: Issue metadata
"""
url = f"https://jumpg-webapi.tokyo-cdn.com/api/manga_viewer?chapter_id={issue_id}&split=yes&img_quality=super_high"
content = self._session.get(url).content
response, _ = blackboxprotobuf.protobuf_to_json(content)
response = await self._client.get(url)
content, _ = blackboxprotobuf.protobuf_to_json(response.content)
images = []
parsed = json.loads(response)
parsed = json.loads(content)
for image in parsed["1"]["10"]["1"]:
if "1" in image:
images.append(

grawlix/sources/marvel.py (new file, 114 lines)
View File

@ -0,0 +1,114 @@
from grawlix.book import Book, Metadata, ImageList, OnlineFile, Series, Result
from grawlix.exceptions import InvalidUrl
from .source import Source
# Personal Marvel API key
API_KEY = "83ac0da31d3f6801f2c73c7e07ad76e8"
class Marvel(Source[str]):
name: str = "Marvel"
match = [
r"https://www.marvel.com/comics/series/\d+/.+"
]
_authentication_methods: list[str] = [ "cookies" ]
async def download(self, url: str) -> Result[str]:
match_index = self.get_match_index(url)
if match_index == 0:
return await self._download_series(url)
raise InvalidUrl
async def _download_series(self, url: str) -> Series[str]:
"""
Download series
:param url: Url of series
:returns: Series data
"""
series_id = url.split("/")[-2]
issue_ids = await self._download_issue_ids(series_id)
metadata = await self._download_series_metadata(series_id)
return Series(
title = metadata["data"]["results"][0]["title"],
book_ids = issue_ids
)
async def _download_issue_ids(self, series_id: str) -> list[str]:
"""
Download issue ids from series
:param series_id: Id of comic series on marvel.com
:returns: List of comic ids for marvel comics
"""
        response = await self._client.get(
            f"https://api.marvel.com/browse/comics?byType=comic_series&isDigital=1&limit=10000&byId={series_id}",
        )
        issue_ids = [issue["digital_id"] for issue in response.json()["data"]["results"]]
return issue_ids
async def _download_series_metadata(self, series_id: str) -> dict:
"""
Download series metadata
:param series_id: Id of comic series on marvel.com
:returns: Dictionary with metadata
"""
response = await self._client.get(
f"https://gateway.marvel.com:443/v1/public/series/{series_id}?apikey={API_KEY}",
headers = {
"Referer": "https://developer.marvel.com/"
}
)
return response.json()
async def download_book_from_id(self, issue_id: str) -> Book:
return Book(
metadata = await self._download_issue_metadata(issue_id),
data = await self._download_issue_pages(issue_id)
)
async def _download_issue_metadata(self, issue_id: str) -> Metadata:
"""
Download and parse metadata for issue
:param issue_id: Identifier for issue
:returns: Issue metadata
"""
response = await self._client.get(
f"https://bifrost.marvel.com/v1/catalog/digital-comics/metadata/{issue_id}"
)
issue_meta = response.json()["data"]["results"][0]["issue_meta"]
return Metadata(
title = issue_meta["title"],
series = issue_meta["series_title"],
publisher = "Marvel",
authors = [c["full_name"] for c in issue_meta["creators"]["extended_list"]]
)
async def _download_issue_pages(self, issue_id: str) -> ImageList:
"""
Download list of page links for issue
:param issue_id: Identifier for issue
:returns: List of links to comic pages
"""
response = await self._client.get(
f"https://bifrost.marvel.com/v1/catalog/digital-comics/web/assets/{issue_id}"
)
images = []
for page in response.json()["data"]["results"][0]["pages"]:
images.append(
OnlineFile(
url = page["assets"]["source"],
extension = "jpg"
)
)
return ImageList(images)

grawlix/sources/royal_road.py
View File

@ -0,0 +1,45 @@
from .source import Source
from grawlix.book import Book, HtmlFile, HtmlFiles, OnlineFile, Metadata
from bs4 import BeautifulSoup
class RoyalRoad(Source):
name: str = "Royal Road"
match = [
r"https://www.royalroad.com/fiction/\d+/[^/]+"
]
_authentication_methods: list[str] = []
async def download(self, url: str) -> Book:
response = await self._client.get(url)
soup = BeautifulSoup(response.text, "lxml")
chapters = []
for chapter in soup.find_all("tr", class_="chapter-row"):
chapters.append(
HtmlFile(
title = chapter.find("a").text.strip(),
file = OnlineFile(
url = f"https://royalroad.com{chapter.get('data-url')}",
extension = "html"
),
selector = { "class": "chapter-content" }
)
)
return Book(
data = HtmlFiles(
cover = OnlineFile(
url = soup.find("div", class_="cover-art-container") \
.find("img") \
.get("src") \
.replace("full", "large"),
extension = "jpg"
),
htmlfiles = chapters
),
metadata = Metadata(
title = soup.find("meta", attrs={"name":"twitter:title"}).get("content"),
authors = [ soup.find("meta", attrs={"name":"twitter:creator"}).get("content") ]
),
overwrite = True
)

grawlix/sources/saxo.py
View File

@ -13,8 +13,8 @@ class Saxo(Source):
_authentication_methods = [ "login" ]
user_id: str
def login(self, username: str, password: str, **kwargs) -> None:
response = self._session.post(
async def login(self, username: str, password: str, **kwargs) -> None:
response = await self._client.post(
"https://auth-read.saxo.com/auth/token",
data = {
"username": username,
@ -27,7 +27,7 @@ class Saxo(Source):
)
json = response.json()
bearer_token = json["access_token"]
self._session.headers = {
self._client.headers = {
"Appauthorization": f"bearer {bearer_token}",
"App-Os": "android",
"App-Version": "6.2.4"
@ -35,16 +35,16 @@ class Saxo(Source):
self.user_id = json["id"]
def download(self, url: str) -> Book:
async def download(self, url: str) -> Book:
isbn = self._extract_isbn_from_url(url)
book_id = self._get_book_id(isbn)
metadata = self._get_book_metadata(book_id)
book_id = await self._get_book_id(isbn)
metadata = await self._get_book_metadata(book_id)
ebook_id = metadata["id"] # Id of ebook file
return Book(
metadata = self._extract_metadata(metadata),
data = SingleFile(
OnlineFile(
url = self._get_book_file_link(ebook_id),
url = await self._get_book_file_link(ebook_id),
extension = "epub",
# Encryption keys extracted from app
encryption = AESEncryption(
@ -56,33 +56,33 @@ class Saxo(Source):
)
def _get_book_id(self, isbn: str) -> str:
async def _get_book_id(self, isbn: str) -> str:
"""
Download internal book id of book from isbn
:param isbn: Isbn of book
:returns: Saxo internal book id
"""
response = self._session.get(
response = await self._client.get(
f"https://api-read.saxo.com/api/v2/search/user/{self.user_id}/premium/books/{isbn}"
)
return response.json()["items"][0]["bookId"]
def _get_book_metadata(self, book_id: str) -> dict:
async def _get_book_metadata(self, book_id: str) -> dict:
"""
Download metadata of book
:param book_id: Id of book
:returns: Metadata of book
"""
response = self._session.get(
response = await self._client.get(
f"https://api-read.saxo.com/api/v2/book/{book_id}/user/{self.user_id}/details"
)
return response.json()["ebooks"][0]
def _get_book_file_link(self, ebook_id: str) -> str:
async def _get_book_file_link(self, ebook_id: str) -> str:
"""
Download link to epub file
@ -90,12 +90,13 @@ class Saxo(Source):
:returns: Link to ebook file
:raises ThrottleError: If there have been too many downloads
"""
response = self._session.get(
response = await self._client.get(
f"https://api-read.saxo.com/api/v1/book/{ebook_id}/content/encryptedstream/"
).json()
if not "link" in response:
)
json = response.json()
if not "link" in json:
raise ThrottleError
return response["link"]
return json["link"]
@staticmethod

grawlix/sources/source.py
View File

@ -2,8 +2,9 @@ from grawlix.book import Book, Series, Result
from typing import Generic, TypeVar, Tuple, Optional
from http.cookiejar import MozillaCookieJar
import requests
import re
from typing import Generic, TypeVar, Tuple
import httpx
T = TypeVar("T")
@ -15,10 +16,11 @@ class Source(Generic[T]):
name: str = "UNKNOWN"
match: list[str] = []
_authentication_methods: list[str] = []
_login_credentials = [ "username", "password" ]
authenticated = False
def __init__(self):
self._session = requests.Session()
self._client = httpx.AsyncClient()
@property
@ -33,7 +35,7 @@ class Source(Generic[T]):
return "login" in self._authentication_methods
def login(self, username: str, password: str, **kwargs: str):
async def login(self, username: str, password: str, **kwargs: str):
"""
Login to source
@ -58,11 +60,11 @@ class Source(Generic[T]):
if self.supports_cookies:
cookie_jar = MozillaCookieJar()
cookie_jar.load(cookie_file, ignore_expires=True)
self._session.cookies.update(cookie_jar)
self._client.cookies.update(cookie_jar)
self.authenticated = True
def download(self, url: str) -> Result[T]:
async def download(self, url: str) -> Result[T]:
"""
Download book metadata from source
@ -72,7 +74,7 @@ class Source(Generic[T]):
raise NotImplementedError
def download_book_from_id(self, book_id: T) -> Book:
async def download_book_from_id(self, book_id: T) -> Book:
"""
Download book from id

grawlix/sources/webtoons.py
View File

@ -19,19 +19,19 @@ class Webtoons(Source[str]):
]
_authentication_methods: list[str] = []
def download(self, url: str) -> Result[str]:
async def download(self, url: str) -> Result[str]:
if re.match(self.match[0], url):
return self._download_episode(url)
return await self._download_episode(url)
if re.match(self.match[1], url):
return self._download_series(url)
return await self._download_series(url)
raise InvalidUrl
def download_book_from_id(self, book_id: str) -> Book:
return self._download_episode(book_id)
async def download_book_from_id(self, book_id: str) -> Book:
return await self._download_episode(book_id)
def _download_series(self, url: str) -> Series[str]:
async def _download_series(self, url: str) -> Series[str]:
"""
Download a series of webtoons
@ -39,7 +39,7 @@ class Webtoons(Source[str]):
:returns: Webtoons series data
"""
parsed_url = urlparse(url)
page = self._session.get(
response = await self._client.get(
f"https://m.webtoons.com{parsed_url.path}",
params = parsed_url.query,
headers = {
@ -49,9 +49,10 @@ class Webtoons(Source[str]):
"needGDPR": "FALSE",
"needCCPA": "FALSE",
"needCOPPA": "FALSE"
}
).text
soup = BeautifulSoup(page, "lxml")
},
follow_redirects = True,
)
soup = BeautifulSoup(response.text, "lxml")
title = soup.find("meta", property="og:title").get("content")
episodes = []
for episode in soup.find_all("li", class_="_episodeItem"):
@ -63,15 +64,15 @@ class Webtoons(Source[str]):
)
def _download_episode(self, url: str) -> Book:
async def _download_episode(self, url: str) -> Book:
"""
Download single webtoon episode
:param url: Url of episode
:returns: Episode
"""
page = self._session.get(url).text
soup = BeautifulSoup(page, "lxml")
response = await self._client.get(url, follow_redirects = True)
soup = BeautifulSoup(response.text, "lxml")
title = soup.find("h1", class_="subj_episode").get("title")
series = soup.find("div", class_="subj_info").find("a").get("title")
images = []

grawlix/utils.py
View File

@ -1,6 +1,7 @@
from grawlix.exceptions import DataNotFound
from urllib.parse import urlparse, parse_qs
from functools import lru_cache
def get_arg_from_url(url: str, key: str) -> str:
parsed_url = urlparse(url)
@ -9,3 +10,31 @@ def get_arg_from_url(url: str, key: str) -> str:
return query[key][0]
except:
raise DataNotFound
@lru_cache
def levenstein_distance(a: str, b: str) -> int:
"""
Calculates the Levenshtein distance between `a` and `b`
https://en.wikipedia.org/wiki/Levenshtein_distance
"""
if len(a) == 0:
return len(b)
if len(b) == 0:
return len(a)
if a[0] == b[0]:
return levenstein_distance(a[1:], b[1:])
return 1 + min(
levenstein_distance(a, b[1:]), # Character is inserted
levenstein_distance(a[1:], b), # Character is deleted
levenstein_distance(a[1:], b[1:]) # Character is replaced
)
def nearest_string(input: str, list: list[str]) -> str:
"""
Finds the nearest string in `list` to `input` based on Levenshtein distance
"""
return sorted(list, key = lambda x: levenstein_distance(input, x))[0]
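
Both helpers can be sanity-checked directly; `nearest_string` is what the new eReolen source uses to match the configured library name against the login form's list:

```python
from grawlix.utils import levenstein_distance, nearest_string

assert levenstein_distance("kitten", "sitting") == 3  # classic textbook pair
assert nearest_string("Copenhagn", ["Copenhagen", "Aarhus"]) == "Copenhagen"
```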

pyproject.toml
View File

@ -14,12 +14,13 @@ dependencies = [
"appdirs",
"beautifulsoup4",
"blackboxprotobuf",
"EbookLib",
"httpx",
"importlib-resources",
"lxml",
"pycryptodome",
"requests",
"rich",
"tomli"
"tomli",
]
dynamic = ["version"]
@ -28,7 +29,7 @@ dynamic = ["version"]
"Bugtracker" = "https://github.com/jo1gi/ebook-dl/issues"
[project.scripts]
audiobook-dl = "grawlix.__main__:main"
audiobook-dl = "grawlix.__main__:run"
[build-system]

shell.nix
View File

@ -20,6 +20,18 @@ let
doCheck = false;
};
ebooklib = python3Packages.buildPythonPackage rec {
pname = "EbookLib";
version = "0.18";
src = python3Packages.fetchPypi {
inherit pname version;
sha256 = "sha256-OFYmQ6e8lNm/VumTC0kn5Ok7XR0JF/aXpkVNtaHBpTM=";
};
propagatedBuildInputs = with python3Packages; [
six
lxml
];
};
in
mkShell {
buildInputs = [
@ -27,10 +39,11 @@ mkShell {
appdirs
beautifulsoup4
blackboxprotobuf
ebooklib
httpx
importlib-resources
lxml
pycryptodome
requests
rich
tomli