Add basic support for Nextory

This commit is contained in:
Joakim Holm 2023-06-12 22:36:54 +02:00
parent f91a32b0ab
commit 9e876a0cf6
6 changed files with 308 additions and 5 deletions

View File

@ -12,6 +12,7 @@ grawlix currently supports downloading from the following sources:
- [Flipp](https://flipp.dk)
- [Internet Archive](https://archive.org)
- [Manga Plus](https://mangaplus.shueisha.co.jp)
- [Nextory](https://nextory.com)
- [Royal Road](https://www.royalroad.com)
- [Saxo](https://saxo.com)
- [Webtoons](https://webtoons.com)

View File

@ -55,6 +55,16 @@ class ImageList:
"""
images: list[OnlineFile]
@dataclass(slots=True)
class EpubInParts:
    """
    Book delivered as several smaller epub files that have to be merged
    """
    # The partial epub files that together make up the book
    files: list[OnlineFile]
    # Table of contents: reference to a file inside the parts (optionally
    # followed by "#fragment") mapped to the title shown for that entry
    files_in_toc: dict[str, str]
@dataclass(slots=True)
class HtmlFile:
title: str
@ -67,6 +77,7 @@ class HtmlFiles:
htmlfiles: list[HtmlFile]
BookData = Union[
EpubInParts,
SingleFile,
ImageList,
HtmlFiles

View File

@ -1,4 +1,4 @@
from grawlix.book import Book, BookData, SingleFile, ImageList, OnlineFile, HtmlFiles
from grawlix.book import Book, BookData, SingleFile, ImageList, OnlineFile, HtmlFiles, EpubInParts
from grawlix.exceptions import GrawlixError, UnsupportedOutputFormat
from grawlix.logging import info
@ -58,9 +58,9 @@ def get_default_format(book: Book) -> OutputFormat:
bookdata = book.data
if isinstance(bookdata, SingleFile):
extension = bookdata.file.extension
if isinstance(bookdata, ImageList):
elif isinstance(bookdata, ImageList):
extension = "cbz"
if isinstance(bookdata, HtmlFiles):
elif isinstance(bookdata, HtmlFiles) or isinstance(bookdata, EpubInParts):
extension = "epub"
output_format = find_output_format(book, extension)
return output_format()

View File

@ -1,4 +1,4 @@
from grawlix.book import HtmlFiles, HtmlFile, OnlineFile, Book, SingleFile, Metadata
from grawlix.book import HtmlFiles, HtmlFile, OnlineFile, Book, SingleFile, Metadata, EpubInParts
from grawlix.exceptions import UnsupportedOutputFormat
from .output_format import OutputFormat, Update
@ -6,19 +6,25 @@ import asyncio
from bs4 import BeautifulSoup
import os
from ebooklib import epub
from zipfile import ZipFile
import rich
class Epub(OutputFormat):
extension = "epub"
input_types = [SingleFile, HtmlFiles]
input_types = [SingleFile, HtmlFiles, EpubInParts]
async def download(self, book: Book, location: str, update: Update) -> None:
    """
    Write book to location as an epub file

    The type of the book data decides which download helper is used.

    :param book: Book to download
    :param location: Path of the output file
    :param update: Progress callback handed on to the helper
    :raises UnsupportedOutputFormat: If the book data can't be stored as epub
    """
    data = book.data
    if isinstance(data, SingleFile):
        await self._download_single_file(book, location, update)
        return
    if isinstance(data, HtmlFiles):
        await self._download_html_files(data, book.metadata, location, update)
        return
    if isinstance(data, EpubInParts):
        await self._download_epub_in_parts(data, book.metadata, location, update)
        return
    raise UnsupportedOutputFormat
async def _download_html_files(self, html: HtmlFiles, metadata: Metadata, location: str, update: Update) -> None:
output = epub.EpubBook()
output.set_title(metadata.title)
@ -67,3 +73,59 @@ class Epub(OutputFormat):
output.add_item(epub.EpubNcx())
output.add_item(epub.EpubNav())
epub.write_epub(location, output)
async def _download_epub_in_parts(self, data: EpubInParts, metadata: Metadata, location: str, update: Update) -> None:
    """
    Download all parts of a split epub and merge them into a single epub

    Each part is downloaded to a temporary file next to `location` and read
    as a zip archive. A file name is only added once, even if several parts
    contain it, and the .opf and .ncx files of the parts are skipped.

    :param data: Parts of the book and its table of contents entries
    :param metadata: Metadata of the book; the title is written to the output
    :param location: Path the merged epub is written to
    :param update: Optional callback that receives the progress of each part
    """
    files = data.files
    # Books without parts would otherwise cause a division by zero
    progress = 1 / len(files) if files else 0
    temporary_file_location = f"{location}.tmp"
    added_files: set[str] = set()

    # Table of contents titles keyed by file name without "#fragment". The
    # first entry for a file name wins.
    toc_titles: dict[str, str] = {}
    for key, value in data.files_in_toc.items():
        toc_titles.setdefault(key.split("#")[0], value)

    def get_new_files(zipfile: ZipFile):
        """Returns files in zipfile not already added to file"""
        for filename in zipfile.namelist():
            if filename in added_files or filename.endswith((".opf", ".ncx")):
                continue
            yield filename

    output = epub.EpubBook()
    output.set_title(metadata.title)
    try:
        for file in files:
            # Every part reuses the same temporary file
            await self._download_and_write_file(file, temporary_file_location)
            with ZipFile(temporary_file_location, "r") as zipfile:
                for filepath in get_new_files(zipfile):
                    content = zipfile.read(filepath)
                    if filepath.endswith("html"):
                        filename = os.path.basename(filepath)
                        epub_file = epub.EpubHtml(
                            title = toc_titles.get(filename),
                            file_name = filepath,
                            content = content
                        )
                        output.add_item(epub_file)
                        output.spine.append(epub_file)
                        if filename in toc_titles:
                            output.toc.append(epub_file)
                    else:
                        epub_file = epub.EpubItem(
                            file_name = filepath,
                            content = content
                        )
                        output.add_item(epub_file)
                    added_files.add(filepath)
            if update:
                update(progress)
    finally:
        # Clean up even if a part fails. The file does not exist if no part
        # was downloaded.
        if os.path.exists(temporary_file_location):
            os.remove(temporary_file_location)
    output.add_item(epub.EpubNcx())
    output.add_item(epub.EpubNav())
    epub.write_epub(location, output)

View File

@ -5,6 +5,7 @@ from .ereolen import Ereolen
from .flipp import Flipp
from .internet_archive import InternetArchive
from .mangaplus import MangaPlus
from .nextory import Nextory
from .royal_road import RoyalRoad
from .saxo import Saxo
from .webtoons import Webtoons
@ -56,6 +57,7 @@ def get_source_classes() -> list[type[Source]]:
Flipp,
InternetArchive,
MangaPlus,
Nextory,
RoyalRoad,
Saxo,
Webtoons

227
grawlix/sources/nextory.py Normal file
View File

@ -0,0 +1,227 @@
from grawlix.book import Book, Metadata, OnlineFile, BookData, OnlineFile, SingleFile, EpubInParts, Result, Series
from grawlix.encryption import AESEncryption
from grawlix.exceptions import InvalidUrl
from .source import Source
from typing import Optional
import uuid
import rich
import base64
LOCALE = "en_GB"
class Nextory(Source):
    """
    Source for downloading books from Nextory

    Login with username and password is the only authentication method.
    """
    name: str = "Nextory"
    match = [
        r"https?://((www|catalog-\w\w).)?nextory.+"
    ]
    _authentication_methods = [ "login" ]


    @staticmethod
    def _create_device_id() -> str:
        """
        Create device id sent to the api

        The id is derived from a fixed name and is therefore the same on
        every run.
        """
        return str(uuid.uuid3(uuid.NAMESPACE_DNS, "audiobook-dl"))


    async def login(self, username: str, password: str, **kwargs) -> None:
        """
        Log in to Nextory and store the received tokens as client headers

        A session is created for the account, the profiles of the account are
        listed and the first profile is authorized.

        :param username: Identifier of the account
        :param password: Password of the account
        """
        # Set permanent headers
        device_id = self._create_device_id()
        self._client.headers.update(
            {
                "X-Application-Id": "200",
                "X-App-Version": "5.0.0",
                "X-Locale": LOCALE,
                "X-Model": "Personal Computer",
                "X-Device-Id": device_id,
                "locale": LOCALE,
                "device": device_id,
                "osinfo": "Android 13",
                "model": "Personal Computer",
                "version": "4.34.6",
                "appid": "200",
            }
        )
        # Login for account
        # The responses below contain login tokens and must not be printed
        session_response = await self._client.post(
            "https://api.nextory.com/user/v1/sessions",
            json = {
                "identifier": username,
                "password": password
            },
        )
        session_data = session_response.json()
        login_token = session_data["login_token"]
        country = session_data["country"]
        self._client.headers.update(
            {
                "token": login_token,
                "X-Login-Token": login_token,
                "X-Country-Code": country,
            }
        )
        # Login for user
        profiles_response = await self._client.get(
            "https://api.nextory.com/user/v1/me/profiles",
        )
        profiles_data = profiles_response.json()
        # The first profile of the account is always used
        profile = profiles_data["profiles"][0]
        login_key = profile["login_key"]
        authorize_response = await self._client.post(
            "https://api.nextory.com/user/v1/profile/authorize",
            json = {
                "login_key": login_key
            }
        )
        authorize_data = authorize_response.json()
        profile_token = authorize_data["profile_token"]
        self._client.headers.update({"X-Profile-Token": profile_token})


    @staticmethod
    def _find_epub_id(product_data) -> str:
        """
        Find id of book format of type epub for given book

        :param product_data: Product information with a "formats" list
        :return: Identifier of the first format of type epub
        :raises InvalidUrl: If the book has no epub format
        """
        for book_format in product_data["formats"]:
            if book_format["type"] == "epub":
                return book_format["identifier"]
        raise InvalidUrl


    @staticmethod
    def _extract_id_from_url(url: str) -> str:
        """
        Extract id of book from url. This id is not always the internal id for
        the book.

        :param url: Url to book information page
        :return: Id in url
        """
        return url.split("-")[-1].replace("/", "")


    async def download(self, url: str) -> Result:
        """
        Download series or book from url

        Urls containing "serier" are treated as series, all others as books.

        :param url: Url of series or book
        :return: Series or book data
        """
        url_id = self._extract_id_from_url(url)
        if "serier" in url:
            return await self._download_series(url_id)
        book_id = await self._get_book_id_from_url_id(url_id)
        return await self._download_book(book_id)


    async def download_book_from_id(self, book_id: str) -> Book:
        """
        Download book from its id on Nextory

        :param book_id: Id of book
        :return: Book data
        """
        return await self._download_book(book_id)


    async def _download_series(self, series_id: str) -> Series:
        """
        Download series from Nextory

        :param series_id: Id of series on Nextory
        :returns: Series data
        :raises InvalidUrl: If the series doesn't contain any books
        """
        # NOTE(review): only one page with up to 100 products is requested;
        # presumably longer series are truncated - confirm
        response = await self._client.get(
            f"https://api.nextory.com/discovery/v1/series/{series_id}/products",
            params = {
                "content_type": "book",
                "page": 0,
                "per": 100,
            }
        )
        series_data = response.json()
        products = series_data["products"]
        if not products:
            # The series name is read from the first product
            raise InvalidUrl
        return Series(
            title = products[0]["series"]["name"],
            book_ids = [book["id"] for book in products],
        )


    @staticmethod
    def _extract_series_name(product_info: dict) -> Optional[str]:
        """
        Extract name of series from product information

        :param product_info: Product information of book
        :return: Name of series or None if the book has no series information
        """
        if "series" not in product_info:
            return None
        return product_info["series"]["name"]


    async def _get_book_id_from_url_id(self, url_id: str) -> str:
        """
        Download book id from url id

        Extracting the book id from the response is not implemented yet, so
        this method currently always raises.

        :param url_id: Id of book from url
        :return: Book id
        :raises NotImplementedError: Always
        """
        response = await self._client.get(
            "https://api.nextory.se/api/app/product/7.5/bookinfo",
            params = { "id": url_id },
        )
        # Report the failure to the caller instead of terminating the process
        raise NotImplementedError(
            f"Converting url id to book id is not supported yet ({response.url})"
        )


    async def _download_book(self, book_id: str) -> Book:
        """
        Download book from Nextory

        :param book_id: Id of book on Nextory
        :return: Book data and metadata
        :raises InvalidUrl: If the book is not available as epub
        """
        response = await self._client.get(
            f"https://api.nextory.com/library/v1/products/{book_id}"
        )
        product_data = response.json()
        epub_id = self._find_epub_id(product_data)
        pages = await self._get_pages(epub_id)
        return Book(
            data = pages,
            metadata = Metadata(
                title = product_data["title"],
                authors = [author["name"] for author in product_data["authors"]],
                series = self._extract_series_name(product_data),
            )
        )


    @staticmethod
    def _fix_key(value: str) -> bytes:
        """
        Remove unused data and decode key

        The last character of the value is dropped before the rest is base64
        decoded.
        """
        return base64.b64decode(value[:-1])


    async def _get_pages(self, epub_id: str) -> BookData:
        """
        Download page information for book

        :param epub_id: Id of epub file
        :return: Page data
        """
        # Nextory books are for some reason split up into multiple epub files -
        # one for each chapter file. All of these files have to be decrypted
        # and combined afterwards. Many of the provided epub files contain the
        # same files and some of them contain the same file names but with
        # variation in the content and comments that describe what should have
        # been there if the book was whole from the start.
        response = await self._client.get(
            f"https://api.nextory.com/reader/books/{epub_id}/packages/epub"
        )
        epub_data = response.json()
        # All parts are encrypted with the same key and iv
        encryption = AESEncryption(
            key = self._fix_key(epub_data["crypt_key"]),
            iv = self._fix_key(epub_data["crypt_iv"])
        )
        files = [
            OnlineFile(
                url = part["spine_url"],
                extension = "epub",
                encryption = encryption
            )
            for part in epub_data["spines"]
        ]
        files_in_toc = {}
        for item in epub_data["toc"]["childrens"]: # Why is it "childrens"?
            files_in_toc[item["src"]] = item["name"]
        return EpubInParts(
            files,
            files_in_toc
        )