Joakim Holm 2023-06-27 20:34:07 +02:00
commit 961789d316
17 changed files with 715 additions and 36 deletions

1
.gitignore vendored

@ -36,3 +36,4 @@ MANIFEST
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
cookies.txt


@ -9,14 +9,24 @@ CLI ebook downloader
## Supported services
grawlix currently supports downloading from the following sources:
- [eReolen](https://ereolen.dk)
- [fanfiction.net](https://www.fanfiction.net)
- [Flipp](https://flipp.dk)
- [Internet Archive](https://archive.org)
- [Manga Plus](https://mangaplus.shueisha.co.jp)
- [Marvel Unlimited](https://marvel.com)
- [Nextory](https://nextory.com)
- [Royal Road](https://www.royalroad.com)
- [Saxo](https://saxo.com)
- [Webtoons](https://webtoons.com)
## Installation
### From pypi (recommended)
```shell
pip install grawlix
```
### From repo (unstable)
```shell
git clone https://github.com/jo1gi/grawlix.git
cd grawlix
@ -24,6 +34,10 @@ python3 setup.py install
```
## Authentication
Authentication can be done either with a login (username and password) or with cookies.
Not all sources support both methods.
### Login
Some sources require authentication, which can be done either with CLI arguments
or a config file.
@ -38,6 +52,15 @@ grawlix --username "user@example.com" --password "SuperSecretPassword" <url>
username = "user@example.com"
password = "SuperSecretPassword"
```
The config file should be placed at `~/.config/grawlix/grawlix.toml`
### Cookies
Some sources can be authenticated with Netscape cookie files. I use
[this extension](https://github.com/rotemdan/ExportCookies) to export my
cookies from my browser.
Cookies can be placed in the current directory as `cookies.txt` or passed with the
`--cookies` argument.
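For example, with an exported cookie file saved as `cookies.txt`:
```shell
grawlix --cookies cookies.txt <url>
```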
## Download books
To download a book, run:


@ -8,6 +8,7 @@ from . import arguments, logging
from typing import Tuple, Optional
from rich.progress import Progress
from functools import partial
import os
import asyncio
import traceback
@ -49,6 +50,20 @@ def get_urls(options) -> list[str]:
return urls
def get_cookie_file(options) -> Optional[str]:
"""
Get path to cookie file
:param options: Cli arguments
:returns: Path to cookie file
"""
if options.cookie_file is not None and os.path.exists(options.cookie_file):
return options.cookie_file
if os.path.exists("./cookies.txt"):
return "./cookies.txt"
return None
async def authenticate(source: Source, config: Config, options):
"""
Authenticate with source
@ -62,6 +77,12 @@ async def authenticate(source: Source, config: Config, options):
username, password, library = get_login(source, config, options)
await source.login(username, password, library=library)
source.authenticated = True
elif source.supports_cookies:
cookie_file = get_cookie_file(options)
if cookie_file:
source.load_cookies(cookie_file)
else:
raise SourceNotAuthenticated
else:
raise SourceNotAuthenticated


@ -44,6 +44,12 @@ def parse_arguments() -> argparse.Namespace:
help = "Library for login",
dest = "library",
)
parser.add_argument(
'-c',
'--cookies',
help = "Path to netscape cookie file",
dest = "cookie_file"
)
# Outputs
parser.add_argument(
'-o',
@ -55,6 +61,7 @@ def parse_arguments() -> argparse.Namespace:
parser.add_argument(
'--debug',
help = "Enable debug messages",
dest = "debug"
dest = "debug",
action="store_true",
)
return parser.parse_args()


@ -1,6 +1,7 @@
from grawlix import Encryption
from dataclasses import dataclass, field
from typing import Optional, Union, TypeVar, Generic
from typing import Optional, Union, TypeVar, Generic, Any
from datetime import date
@dataclass(slots=True)
class Metadata:
@ -11,6 +12,8 @@ class Metadata:
language: Optional[str] = None
publisher: Optional[str] = None
identifier: Optional[str] = None
description: Optional[str] = None
release_date: Optional[date] = None
def as_dict(self) -> dict:
return {
@ -20,6 +23,8 @@ class Metadata:
"identifier": self.identifier or "UNKNOWN",
"language": self.language or "UNKNOWN",
"authors": "; ".join(self.authors),
"description": self.description or "UNKNOWN",
"relase_date": self.release_date.isoformat() if self.release_date else "UNKNOWN",
}
@ -30,6 +35,7 @@ class OnlineFile:
extension: str
encryption: Optional[Encryption] = None
headers: Optional[dict[str, str]] = None
cookies: Optional[Any] = None # TODO Change type
@dataclass(slots=True)
class OfflineFile:
@ -55,6 +61,16 @@ class ImageList:
"""
images: list[OnlineFile]
@dataclass(slots=True)
class EpubInParts:
"""
Epub split up into smaller epubs
"""
files: list[OnlineFile]
files_in_toc: dict[str, str]
@dataclass(slots=True)
class HtmlFile:
title: str
@ -63,10 +79,11 @@ class HtmlFile:
@dataclass(slots=True)
class HtmlFiles:
cover: OnlineFile
htmlfiles: list[HtmlFile]
cover: Optional[OnlineFile] = None
BookData = Union[
EpubInParts,
SingleFile,
ImageList,
HtmlFiles


@ -1,4 +1,4 @@
from grawlix.book import Book, BookData, SingleFile, ImageList, OnlineFile, HtmlFiles
from grawlix.book import Book, BookData, SingleFile, ImageList, OnlineFile, HtmlFiles, EpubInParts
from grawlix.exceptions import GrawlixError, UnsupportedOutputFormat
from grawlix.logging import info
@ -58,9 +58,9 @@ def get_default_format(book: Book) -> OutputFormat:
bookdata = book.data
if isinstance(bookdata, SingleFile):
extension = bookdata.file.extension
if isinstance(bookdata, ImageList):
elif isinstance(bookdata, ImageList):
extension = "cbz"
if isinstance(bookdata, HtmlFiles):
elif isinstance(bookdata, HtmlFiles) or isinstance(bookdata, EpubInParts):
extension = "epub"
output_format = find_output_format(book, extension)
return output_format()


@ -1,6 +1,7 @@
from .output_format import OutputFormat, Update, Book
from grawlix.book import ImageList, OnlineFile
from grawlix.exceptions import UnsupportedOutputFormat
from .metadata.comicinfo import to_comic_info
from zipfile import ZipFile
import asyncio
@ -29,3 +30,4 @@ class Cbz(OutputFormat):
for index, file in enumerate(images)
]
await asyncio.wait(tasks)
zip.writestr("ComicInfo.xml", to_comic_info(book.metadata))


@ -1,4 +1,4 @@
from grawlix.book import HtmlFiles, HtmlFile, OnlineFile, Book, SingleFile, Metadata
from grawlix.book import HtmlFiles, HtmlFile, OnlineFile, Book, SingleFile, Metadata, EpubInParts
from grawlix.exceptions import UnsupportedOutputFormat
from .output_format import OutputFormat, Update
@ -6,19 +6,25 @@ import asyncio
from bs4 import BeautifulSoup
import os
from ebooklib import epub
from zipfile import ZipFile
import rich
class Epub(OutputFormat):
extension = "epub"
input_types = [SingleFile, HtmlFiles]
input_types = [SingleFile, HtmlFiles, EpubInParts]
async def download(self, book: Book, location: str, update: Update) -> None:
if isinstance(book.data, SingleFile):
await self._download_single_file(book, location, update)
elif isinstance(book.data, HtmlFiles):
await self._download_html_files(book.data, book.metadata, location, update)
elif isinstance(book.data, EpubInParts):
await self._download_epub_in_parts(book.data, book.metadata, location, update)
else:
raise UnsupportedOutputFormat
async def _download_html_files(self, html: HtmlFiles, metadata: Metadata, location: str, update: Update) -> None:
output = epub.EpubBook()
output.set_title(metadata.title)
@ -27,9 +33,9 @@ class Epub(OutputFormat):
file_count = len(html.htmlfiles) + 1 # Html files + cover
async def download_cover(cover_file: OnlineFile):
cover_filename = f"cover.{html.cover.extension}"
cover_filename = f"cover.{cover_file.extension}"
epub_cover = epub.EpubCover(file_name = cover_filename)
epub_cover.content = await self._download_file(html.cover)
epub_cover.content = await self._download_file(cover_file)
output.add_item(epub_cover)
epub_cover_page = epub.EpubCoverHtml(image_name = cover_filename)
if update:
@ -38,7 +44,12 @@ class Epub(OutputFormat):
async def download_file(index: int, file: HtmlFile):
response = await self._client.get(file.file.url, follow_redirects=True)
response = await self._client.get(
file.file.url,
headers = file.file.headers,
cookies = file.file.cookies,
follow_redirects=True
)
soup = BeautifulSoup(response.text, "lxml")
selected_element = soup.find(attrs=file.selector)
epub_file = epub.EpubHtml(
@ -55,7 +66,9 @@ class Epub(OutputFormat):
download_file(index, file)
for index, file in enumerate(html.htmlfiles)
]
epub_files = await asyncio.gather(download_cover(html.cover), *tasks)
if html.cover:
tasks.append(download_cover(html.cover))
epub_files = await asyncio.gather(*tasks)
# Add files to epub
for epub_file in epub_files:
@ -67,3 +80,59 @@ class Epub(OutputFormat):
output.add_item(epub.EpubNcx())
output.add_item(epub.EpubNav())
epub.write_epub(location, output)
async def _download_epub_in_parts(self, data: EpubInParts, metadata: Metadata, location: str, update: Update) -> None:
files = data.files
file_count = len(files)
progress = 1/(file_count)
temporary_file_location = f"{location}.tmp"
added_files: set[str] = set()
def get_new_files(zipfile: ZipFile):
"""Returns files in zipfile not already added to file"""
for filename in zipfile.namelist():
if filename in added_files or filename.endswith(".opf") or filename.endswith(".ncx"):
continue
yield filename
output = epub.EpubBook()
for file in files:
await self._download_and_write_file(file, temporary_file_location)
with ZipFile(temporary_file_location, "r") as zipfile:
for filepath in get_new_files(zipfile):
content = zipfile.read(filepath)
if filepath.endswith("html"):
filename = os.path.basename(filepath)
is_in_toc = False
title = None
for key, value in data.files_in_toc.items():
toc_filename = key.split("#")[0]
if filename == toc_filename:
title = value
is_in_toc = True
break
epub_file = epub.EpubHtml(
title = title,
file_name = filepath,
content = content
)
output.add_item(epub_file)
output.spine.append(epub_file)
if is_in_toc:
output.toc.append(epub_file)
else:
epub_file = epub.EpubItem(
file_name = filepath,
content = content
)
output.add_item(epub_file)
added_files.add(filepath)
if update:
update(progress)
os.remove(temporary_file_location)
output.add_item(epub.EpubNcx())
output.add_item(epub.EpubNav())
epub.write_epub(location, output)


@ -0,0 +1,5 @@
from grawlix.book import Metadata
from typing import Callable, Any
MetadataOutput = Callable[[Metadata], Any]


@ -0,0 +1,37 @@
from grawlix.book import Metadata
import xml.etree.ElementTree as ET
from typing import Optional
def add_value(element: ET.Element, name: str, value: Optional[str]) -> None:
"""
Add new tag to element
:param element: Element to add tag to
:param name: Name of new tag
:param value: Contents of new tag
"""
if value:
subelement = ET.SubElement(element, name)
subelement.text = str(value)
def to_comic_info(metadata: Metadata) -> str:
"""
Output as ComicRack metadata format (ComicInfo)
Documentation: https://anansi-project.github.io/docs/comicinfo
:param metadata: Book metadata
:returns: ComicInfo xml as a string
"""
root = ET.Element("ComicInfo")
add_value(root, "Title", metadata.title)
add_value(root, "Series", metadata.series)
add_value(root, "Summary", metadata.description)
add_value(root, "Publisher", metadata.publisher)
add_value(root, "Year", getattr(metadata.release_date, "year", None))
add_value(root, "Month", getattr(metadata.release_date, "month", None))
add_value(root, "Day", getattr(metadata.release_date, "day", None))
add_value(root, "Format", "Web")
return ET.tostring(root).decode("utf8")


@ -59,7 +59,7 @@ class OutputFormat:
:returns: Content of downloaded file
"""
content = b""
async with self._client.stream("GET", file.url, headers = file.headers, follow_redirects=True) as request:
async with self._client.stream("GET", file.url, headers = file.headers, cookies = file.cookies, follow_redirects=True) as request:
total_filesize = int(request.headers["Content-length"])
async for chunk in request.aiter_bytes():
content += chunk


@ -2,9 +2,12 @@ from grawlix.exceptions import InvalidUrl
from .source import Source
from .ereolen import Ereolen
from .fanfictionnet import FanfictionNet
from .flipp import Flipp
from .internet_archive import InternetArchive
from .mangaplus import MangaPlus
from .marvel import Marvel
from .nextory import Nextory
from .royal_road import RoyalRoad
from .saxo import Saxo
from .webtoons import Webtoons
@ -53,9 +56,12 @@ def get_source_classes() -> list[type[Source]]:
"""
return [
Ereolen,
FanfictionNet,
Flipp,
InternetArchive,
MangaPlus,
Marvel,
Nextory,
RoyalRoad,
Saxo,
Webtoons


@ -0,0 +1,55 @@
from .source import Source
from grawlix.book import Book, HtmlFile, HtmlFiles, OnlineFile, Metadata
from bs4 import BeautifulSoup
USER_AGENT = "Mozilla/5.0 (Windows NT 10.0; rv:113.0) Gecko/20100101 Firefox/113.0"
class FanfictionNet(Source):
name: str = "fanfiction.net"
match = [
r"https://www.fanfiction.net/s/\d+/\d+.*"
]
_authentication_methods: list[str] = [ "cookies" ]
async def download(self, url: str) -> Book:
book_id = self._extract_id(url)
response = await self._client.get(
f"https://www.fanfiction.net/s/{book_id}/1",
headers = {
"User-Agent": USER_AGENT
}
)
soup = BeautifulSoup(response.text, "lxml")
chapters = []
for index, chapter in enumerate(soup.find(id="chap_select").find_all("option")):
chapters.append(
HtmlFile(
title = chapter.text,
file = OnlineFile(
url = f"https://www.fanfiction.net/s/{book_id}/{index+1}",
extension = "html",
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; rv:113.0) Gecko/20100101 Firefox/113.0",
},
cookies = self._client.cookies
),
selector = { "id": "storytext" }
)
)
return Book(
data = HtmlFiles(htmlfiles = chapters),
metadata = Metadata(
title = soup.find("b", class_="xcontrast_txt").text,
)
)
@staticmethod
def _extract_id(url: str) -> str:
"""
Extracts book id from url
:param url: Url of book
:returns: Id of book
"""
return url.split("/")[4]


@ -1,6 +1,7 @@
from .source import Source
from grawlix.book import Book, Metadata, ImageList, OnlineFile, Series, Result
from grawlix.exceptions import InvalidUrl, DataNotFound
from grawlix.logging import debug
from grawlix.utils import get_arg_from_url
import re
@ -9,31 +10,41 @@ from typing import Tuple, Optional
BASEURL = "https://reader.flipp.dk/html5/reader"
LANGUAGE_CODE_MAPPING = {
"dk": "da-DK",
"no": "nb-NO",
"se": "sv-SE"
}
class Flipp(Source):
name: str = "Flipp"
match = [
r"https?://reader.flipp.dk/html5/reader/production/default.aspx\?pubname=&edid=([^/]+)",
r"https?://magasiner.flipp.dk/flipp/web-app/#/publications/.+"
r"https?://reader.flipp.(dk|no|se)/html5/reader/production/default.aspx\?pubname=&edid=([^/]+)",
r"https?://(magasiner|blader).flipp.(dk|no|se)/flipp/web-app/#/publications/.+"
]
_authentication_methods: list[str] = []
_login_cache: Optional[dict] = None
_login_cache: dict = {}
async def download(self, url: str) -> Result:
domain_extension = self.get_domain_extension(url)
if re.match(self.match[0], url):
eid = self._get_eid(url)
publication_id = await self._get_series_id(eid)
return await self._download_book(eid, publication_id)
issue_id = self._extract_issue_id(url)
series_id = await self._get_series_id(issue_id)
debug(f"{series_id=}")
return await self._download_book(issue_id, series_id, domain_extension)
elif re.match(self.match[1], url):
return await self._download_series(url)
return await self._download_series(url, domain_extension)
raise InvalidUrl
async def download_book_from_id(self, book_id: Tuple[str, str]) -> Book:
series_id, issue_id = book_id
return await self._download_book(issue_id, series_id)
async def download_book_from_id(self, book_id: Tuple[str, str, str]) -> Book:
series_id, issue_id, language_code = book_id
return await self._download_book(issue_id, series_id, language_code)
async def _download_series(self, url: str) -> Series:
async def _download_series(self, url: str, language_code: str) -> Series:
"""
Download series with book ids from Flipp
@ -41,27 +52,27 @@ class Flipp(Source):
:returns: Series object
"""
series_id = url.split("/")[-1]
login_info = await self._download_login_info()
login_info = await self._download_login_info(language_code)
series_metadata = self._extract_series_data(login_info, series_id)
issues = []
for issue in series_metadata["issues"]:
issue_id = issue["customIssueCode"]
issues.append((series_id, issue_id))
issues.append((series_id, issue_id, language_code))
return Series(
title = series_metadata["name"],
book_ids = issues
)
async def _download_login_info(self) -> dict:
async def _download_login_info(self, language_code: str) -> dict:
"""
Download login info from Flipp
Will use cache if available
:returns: Login info
"""
if self._login_cache:
return self._login_cache
if language_code in self._login_cache:
return self._login_cache[language_code]
login_cache = await self._client.post(
"https://flippapi.egmontservice.com/api/signin",
headers = {
@ -71,14 +82,14 @@ class Flipp(Source):
"email": "",
"password": "",
"token": "",
"languageCulture": "da-DK",
"languageCulture": LANGUAGE_CODE_MAPPING[language_code],
"appId": "",
"appVersion": "",
"uuid": "",
"os": ""
}
)
self._login_cache = login_cache.json()
self._login_cache[language_code] = login_cache.json()
return login_cache.json()
@ -96,7 +107,7 @@ class Flipp(Source):
raise DataNotFound
async def _download_book(self, issue_id: str, series_id: str) -> Book:
async def _download_book(self, issue_id: str, series_id: str, language_code: str) -> Book:
"""
Download book from Flipp
@ -105,7 +116,7 @@ class Flipp(Source):
:returns: Book metadata
"""
pages = await self._get_pages(issue_id, series_id)
metadata = await self._get_metadata(issue_id, series_id)
metadata = await self._get_metadata(issue_id, series_id, language_code)
return Book(
data = ImageList(pages),
metadata = Metadata(
@ -116,7 +127,7 @@ class Flipp(Source):
)
async def _get_metadata(self, issue_id: str, series_id: str) -> dict:
async def _get_metadata(self, issue_id: str, series_id: str, language_code: str) -> dict:
"""
Download and extract issue data
@ -124,7 +135,7 @@ class Flipp(Source):
:param series_id: Series id
:returns: Issue metadata
"""
login_info = await self._download_login_info()
login_info = await self._download_login_info(language_code)
series_metadata = self._extract_series_data(login_info, series_id)
for issue in series_metadata["issues"]:
if issue["customIssueCode"] == issue_id:
@ -132,7 +143,28 @@ class Flipp(Source):
return issue
raise DataNotFound
def _get_eid(self, url: str) -> str:
@staticmethod
def get_domain_extension(url: str) -> str:
"""
Extract domain extension from url
:param url: Url to parse
:returns: Domain extension of url
"""
parsed_url = urlparse(url)
extension = parsed_url.netloc.split(".")[-1]
return extension
@staticmethod
def _extract_issue_id(url: str) -> str:
"""
Extract issue id (edid) from url
:param url: Url to extract data from
:returns: Issue id in url
"""
return get_arg_from_url(url, "edid")

142
grawlix/sources/marvel.py Normal file

@ -0,0 +1,142 @@
from grawlix.book import Book, Metadata, ImageList, OnlineFile, Series, Result
from grawlix.exceptions import InvalidUrl, DataNotFound
from grawlix import logging
from .source import Source
import re
from datetime import date
# Personal Marvel API key
API_KEY = "83ac0da31d3f6801f2c73c7e07ad76e8"
class Marvel(Source[str]):
name: str = "Marvel"
match = [
r"https://www.marvel.com/comics/issue/\d+/.+",
r"https://read.marvel.com/#/book/\d+",
r"https://www.marvel.com/comics/series/\d+/.+"
]
_authentication_methods: list[str] = [ "cookies" ]
async def download(self, url: str) -> Result[str]:
match_index = self.get_match_index(url)
if match_index == 0:
issue_id = await self._get_issue_id(url)
return await self.download_book_from_id(issue_id)
if match_index == 1:
issue_id = url.split("/")[-1]
return await self.download_book_from_id(issue_id)
if match_index == 2:
return await self._download_series(url)
raise InvalidUrl
async def _download_series(self, url: str) -> Series[str]:
"""
Download series
:param url: Url of series
:returns: Series data
"""
series_id = url.split("/")[-2]
issue_ids = await self._download_issue_ids(series_id)
metadata = await self._download_series_metadata(series_id)
return Series(
title = metadata["data"]["results"][0]["title"],
book_ids = issue_ids
)
async def _download_issue_ids(self, series_id: str) -> list[str]:
"""
Download issue ids from series
:param series_id: Id of comic series on marvel.com
:returns: List of comic ids for marvel comics
"""
response = await self._client.get(
f"https://api.marvel.com/browse/comics?byType=comic_series&isDigital=1&limit=10000&byId={series_id}",
)
issue_ids = [issue["digital_id"] for issue in response.json()["data"]["results"]]
return issue_ids
async def _download_series_metadata(self, series_id: str) -> dict:
"""
Download series metadata
:param series_id: Id of comic series on marvel.com
:returns: Dictionary with metadata
"""
response = await self._client.get(
f"https://gateway.marvel.com:443/v1/public/series/{series_id}?apikey={API_KEY}",
headers = {
"Referer": "https://developer.marvel.com/"
}
)
return response.json()
async def _get_issue_id(self, url: str) -> str:
"""
Download issue id from url
:param url: Url to issue info page
:return: Issue id
"""
response = await self._client.get(url)
search = re.search(r"digital_comic_id: \"(\d+)\"", response.text)
if not search:
raise DataNotFound
return search.group(1)
async def download_book_from_id(self, issue_id: str) -> Book:
return Book(
metadata = await self._download_issue_metadata(issue_id),
data = await self._download_issue_pages(issue_id)
)
async def _download_issue_metadata(self, issue_id: str) -> Metadata:
"""
Download and parse metadata for issue
:param issue_id: Identifier for issue
:returns: Issue metadata
"""
response = await self._client.get(
f"https://bifrost.marvel.com/v1/catalog/digital-comics/metadata/{issue_id}"
)
issue_meta = response.json()["data"]["results"][0]["issue_meta"]
return Metadata(
title = issue_meta["title"],
series = issue_meta.get("series_title"),
description = issue_meta.get("description"),
publisher = "Marvel",
release_date = date.fromisoformat(issue_meta.get("release_date_digital")),
authors = [c["full_name"] for c in issue_meta["creators"]["extended_list"]] if "extended_list" in issue_meta["creators"] else []
)
async def _download_issue_pages(self, issue_id: str) -> ImageList:
"""
Download list of page links for issue
:param issue_id: Identifier for issue
:returns: List of links to comic pages
"""
response = await self._client.get(
f"https://bifrost.marvel.com/v1/catalog/digital-comics/web/assets/{issue_id}"
)
images = []
for page in response.json()["data"]["results"][0]["pages"]:
images.append(
OnlineFile(
url = page["assets"]["source"],
extension = "jpg"
)
)
return ImageList(images)

227
grawlix/sources/nextory.py Normal file

@ -0,0 +1,227 @@
from grawlix.book import Book, Metadata, OnlineFile, BookData, SingleFile, EpubInParts, Result, Series
from grawlix.encryption import AESEncryption
from grawlix.exceptions import InvalidUrl
from .source import Source
from typing import Optional
import uuid
import rich
import base64
LOCALE = "en_GB"
class Nextory(Source):
name: str = "Nextory"
match = [
r"https?://((www|catalog-\w\w).)?nextory.+"
]
_authentication_methods = [ "login" ]
@staticmethod
def _create_device_id() -> str:
"""Create unique device id"""
return str(uuid.uuid3(uuid.NAMESPACE_DNS, "audiobook-dl"))
async def login(self, username: str, password: str, **kwargs) -> None:
# Set permanent headers
device_id = self._create_device_id()
self._client.headers.update(
{
"X-Application-Id": "200",
"X-App-Version": "5.0.0",
"X-Locale": LOCALE,
"X-Model": "Personal Computer",
"X-Device-Id": device_id,
"locale": LOCALE,
"device": device_id,
"osinfo": "Android 13",
"model": "Personal Computer",
"version": "4.34.6",
"appid": "200",
}
)
# Login for account
session_response = await self._client.post(
"https://api.nextory.com/user/v1/sessions",
json = {
"identifier": username,
"password": password
},
)
session_response = session_response.json()
rich.print(session_response)
login_token = session_response["login_token"]
country = session_response["country"]
self._client.headers.update(
{
"token": login_token,
"X-Login-Token": login_token,
"X-Country-Code": country,
}
)
# Login for user
profiles_response = await self._client.get(
"https://api.nextory.com/user/v1/me/profiles",
)
profiles_response = profiles_response.json()
rich.print(profiles_response)
profile = profiles_response["profiles"][0]
login_key = profile["login_key"]
authorize_response = await self._client.post(
"https://api.nextory.com/user/v1/profile/authorize",
json = {
"login_key": login_key
}
)
authorize_response = authorize_response.json()
rich.print(authorize_response)
profile_token = authorize_response["profile_token"]
self._client.headers.update({"X-Profile-Token": profile_token})
self._client.headers.update({"X-Profile-Token": profile_token})
@staticmethod
def _find_epub_id(product_data) -> str:
"""Find id of book format of type epub for given book"""
for format in product_data["formats"]:
if format["type"] == "epub":
return format["identifier"]
raise InvalidUrl
@staticmethod
def _extract_id_from_url(url: str) -> str:
"""
Extract id of book from url. This id is not always the internal id for
the book.
:param url: Url to book information page
:return: Id in url
"""
return url.split("-")[-1].replace("/", "")
async def download(self, url: str) -> Result:
url_id = self._extract_id_from_url(url)
if "serier" in url:
return await self._download_series(url_id)
else:
book_id = await self._get_book_id_from_url_id(url_id)
return await self._download_book(book_id)
async def download_book_from_id(self, book_id: str) -> Book:
return await self._download_book(book_id)
async def _download_series(self, series_id: str) -> Series:
"""
Download series from Nextory
:param series_id: Id of series on Nextory
:returns: Series data
"""
response = await self._client.get(
f"https://api.nextory.com/discovery/v1/series/{series_id}/products",
params = {
"content_type": "book",
"page": 0,
"per": 100,
}
)
series_data = response.json()
book_ids = []
for book in series_data["products"]:
book_id = book["id"]
book_ids.append(book_id)
return Series(
title = series_data["products"][0]["series"]["name"],
book_ids = book_ids,
)
@staticmethod
def _extract_series_name(product_info: dict) -> Optional[str]:
if not "series" in product_info:
return None
return product_info["series"]["name"]
async def _get_book_id_from_url_id(self, url_id: str) -> str:
"""
Download book id from url id
:param url_id: Id of book from url
:return: Book id
"""
response = await self._client.get(
f"https://api.nextory.se/api/app/product/7.5/bookinfo",
params = { "id": url_id },
)
rich.print(response.url)
rich.print(response.content)
exit()
async def _download_book(self, book_id: str) -> Book:
product_data = await self._client.get(
f"https://api.nextory.com/library/v1/products/{book_id}"
)
product_data = product_data.json()
epub_id = self._find_epub_id(product_data)
pages = await self._get_pages(epub_id)
return Book(
data = pages,
metadata = Metadata(
title = product_data["title"],
authors = [author["name"] for author in product_data["authors"]],
series = self._extract_series_name(product_data),
)
)
@staticmethod
def _fix_key(value: str) -> bytes:
"""Remove unused data and decode key"""
return base64.b64decode(value[:-1])
async def _get_pages(self, epub_id: str) -> BookData:
"""
Download page information for book
:param epub_id: Id of epub file
:return: Page data
"""
# Nextory books are for some reason split up into multiple epub files -
# one for each chapter file. All of these files have to be decrypted and
# combined afterwards. Many of the provided epub files contain the same
# files, and some contain files with the same names but different content,
# with comments describing what should have been there if the book was
# whole from the start.
response = await self._client.get(
f"https://api.nextory.com/reader/books/{epub_id}/packages/epub"
)
epub_data = response.json()
encryption = AESEncryption(
key = self._fix_key(epub_data["crypt_key"]),
iv = self._fix_key(epub_data["crypt_iv"])
)
files = []
for part in epub_data["spines"]:
files.append(
OnlineFile(
url = part["spine_url"],
extension = "epub",
encryption = encryption
)
)
files_in_toc = {}
for item in epub_data["toc"]["childrens"]: # Why is it "childrens"?
files_in_toc[item["src"]] = item["name"]
return EpubInParts(
files,
files_in_toc
)


@ -1,5 +1,8 @@
from grawlix.book import Book, Series, Result
from typing import Generic, TypeVar, Tuple, Optional
from http.cookiejar import MozillaCookieJar
import re
from typing import Generic, TypeVar, Tuple
import httpx
@ -42,6 +45,25 @@ class Source(Generic[T]):
raise NotImplementedError
@property
def supports_cookies(self) -> bool:
"""Does the source support authentication with cookie file"""
return "cookies" in self._authentication_methods
def load_cookies(self, cookie_file: str):
"""
Authenticate with source with netscape cookie file
:param cookie_file: Path to netscape cookie file
"""
if self.supports_cookies:
cookie_jar = MozillaCookieJar()
cookie_jar.load(cookie_file, ignore_expires=True)
self._client.cookies.update(cookie_jar)
self.authenticated = True
async def download(self, url: str) -> Result[T]:
"""
Download book metadata from source
@ -60,3 +82,16 @@ class Source(Generic[T]):
:returns: Downloaded book metadata
"""
raise NotImplementedError
def get_match_index(self, url: str) -> Optional[int]:
"""
Find the first regex in `self.match` that matches url
:param url: Url to match
:returns: Index of regex
"""
for index, match in enumerate(self.match):
if re.match(match, url):
return index
return None