Spaces:
Running
Running
| from dataclasses import dataclass | |
| from datetime import time as dt_time | |
| from datetime import timedelta | |
| import pandas as pd | |
| from config import STORE_DIR, DATA_DIR, divider_time, RE_PODCAST_SRT_FILE | |
@dataclass
class SplitedText:
    """One time-bounded chunk of an episode transcript.

    The class must be a dataclass: ``make_episode`` instantiates it with
    keyword arguments, which only works with the generated ``__init__``.
    """

    part: int  # sequence number of the chunk within its episode, starting at 1
    start: int  # chunk start offset, in whole seconds
    end: int  # chunk end offset, in whole seconds
    text: str  # subtitle lines of the chunk, joined with "\n"
@dataclass
class Episode:
    """A podcast episode together with its transcript chunks.

    The class must be a dataclass: ``make_episode`` instantiates it with
    keyword arguments, which only works with the generated ``__init__``.
    """

    id_: int  # numeric episode id
    title: str | None  # episode title; None when no title is available
    texts: list[SplitedText]  # transcript chunks, in the order they were cut
def str_to_timedelta(s: str) -> timedelta:
    """Convert an ``HH:MM:SS[,mmm]`` timestamp into a whole-second timedelta.

    Args:
        s: Timestamp text as found on an SRT timing line, e.g.
            ``"00:01:02,500"``. A period is accepted as the fractional
            separator as well.

    Returns:
        The offset as a ``timedelta`` built from hours, minutes and seconds
        only; any fractional part is dropped.

    Raises:
        ValueError: If ``s`` is not a time ``time.fromisoformat`` can parse
            (this includes hour values of 24 and above).
    """
    # SRT separates milliseconds with a comma, which older Python versions
    # of ``time.fromisoformat`` reject. A period is accepted everywhere, so
    # normalise before parsing.
    t = dt_time.fromisoformat(s.replace(",", "."))
    return timedelta(hours=t.hour, minutes=t.minute, seconds=t.second)
def _iter_srt_cues(lines):
    """Yield ``(start, end, text_lines)`` for each cue found in *lines*.

    *lines* is any iterable of raw SRT lines. ``start`` and ``end`` are the
    timedeltas parsed from the cue's ``-->`` line; ``text_lines`` is the list
    of stripped, non-empty lines that follow it.
    """
    start = end = None
    text_lines = []
    for raw_line in lines:
        line = raw_line.strip()
        if "-->" in line:
            # A new timing line closes the cue collected so far.
            if start is not None:
                yield start, end, text_lines
            first_str, second_str = line.split("-->")
            start = str_to_timedelta(first_str.strip())
            end = str_to_timedelta(second_str.strip())
            text_lines = []
        elif start is not None and line and not line.isdigit():
            # Blank lines and digit-only lines (the cue counters) are skipped.
            # NOTE: a subtitle line made only of digits is skipped too.
            # Anything before the first timing line is not cue text.
            text_lines.append(line)
    if start is not None:
        yield start, end, text_lines


def _append_chunk(episode, start, end, lines):
    """Append one chunk to *episode*, numbering parts consecutively from 1."""
    episode.texts.append(
        SplitedText(
            part=len(episode.texts) + 1,
            start=int(start.total_seconds()),
            end=int(end.total_seconds()),
            text="\n".join(lines),
        )
    )


def make_episode(id_: int, title: str | None, srt_filename: str) -> Episode:
    """Read an SRT file and cut its subtitles into time-bounded chunks.

    Cues are accumulated in file order. A chunk is closed as soon as the span
    from its first cue's start to the latest cue's end exceeds
    ``divider_time`` (imported from ``config``; presumably a ``timedelta`` --
    it is compared against one). The cue that crosses the limit belongs to
    the chunk it closes, and whatever is left at end of file is emitted as a
    final, shorter chunk. The number of chunks is printed before returning.

    Args:
        id_: Episode id stored on the returned ``Episode``.
        title: Episode title, or ``None`` when unknown.
        srt_filename: Path of the SRT file; anything ``open()`` accepts.

    Returns:
        An ``Episode`` whose ``texts`` holds the chunks in order, with
        ``start``/``end`` expressed in whole seconds.

    Raises:
        OSError: If the file cannot be opened.
        ValueError: If a timing line cannot be split or parsed.
    """
    episode = Episode(id_=id_, title=title, texts=[])

    # ``None`` (not falsiness) marks "no chunk open": timedelta(0) is falsy,
    # and a cue starting at 00:00:00 is a perfectly valid chunk start.
    chunk_start = None
    chunk_end = None
    chunk_lines = []

    # "utf-8-sig" reads plain UTF-8 and also strips a leading BOM, which
    # would otherwise stick to the first line of the file.
    with open(srt_filename, encoding="utf-8-sig") as f:
        for cue_start, cue_end, cue_lines in _iter_srt_cues(f):
            if chunk_start is None:
                chunk_start = cue_start
            chunk_end = cue_end
            chunk_lines.extend(cue_lines)
            # Only close a chunk that actually carries text; an over-long but
            # empty span keeps growing until text arrives.
            if chunk_lines and abs(chunk_end - chunk_start) > divider_time:
                _append_chunk(episode, chunk_start, chunk_end, chunk_lines)
                chunk_start = None
                chunk_lines = []

    # Flush the tail that never reached divider_time so it is not lost.
    if chunk_start is not None and chunk_lines:
        _append_chunk(episode, chunk_start, chunk_end, chunk_lines)

    print(len(episode.texts))
    return episode
def make_df(episode: Episode) -> pd.DataFrame:
    """Flatten an episode's chunks into a DataFrame with one row per chunk.

    The columns are ``id``, ``part``, ``start``, ``end_`` and ``text``; the
    episode id is repeated on every row.
    """
    column_names = ["id", "part", "start", "end_", "text"]
    rows = [
        [episode.id_, chunk.part, chunk.start, chunk.end, chunk.text]
        for chunk in episode.texts
    ]
    return pd.DataFrame(rows, columns=column_names)
def get_srt_files():
    """List the ``*.srt`` files in ``DATA_DIR`` that look like episodes.

    A file counts when ``RE_PODCAST_SRT_FILE`` matches its name; the first
    capture group of the match is converted to ``int`` and used as the id.

    Returns:
        A list of ``{"id": <int>, "srt": <file name>}`` dicts, in the order
        ``glob`` produced the files.
    """
    found = []
    for srt_path in DATA_DIR.glob("*.srt"):
        match = RE_PODCAST_SRT_FILE.search(srt_path.name)
        if match is None:
            continue
        found.append({"id": int(match.group(1)), "srt": srt_path.name})
    return found
def main():
    """Convert every discovered SRT file into one parquet file per episode.

    Files are processed in ascending id order. Each episode is written to
    ``STORE_DIR`` as ``podcast-<id>.parquet``; the number of files and each
    id are printed along the way.
    """
    lst = get_srt_files()
    lst.sort(key=lambda entry: entry["id"])
    # The name ``lst`` is part of the printed text ("len(lst)=...").
    print(f"{len(lst)=}")
    for entry in lst:
        episode_id = entry["id"]
        print(episode_id)
        srt_path = DATA_DIR / entry["srt"]
        episode = make_episode(episode_id, entry.get("title"), srt_path)
        frame = make_df(episode)
        frame.to_parquet(STORE_DIR / f"podcast-{episode_id}.parquet")
# Run the conversion only when this file is executed as a script, so that
# importing the module has no side effects.
if __name__ == "__main__":
    main()