11"""
2- title: WolframAlpha API
2+ title: YouTube Utility Tools
33author: MartianInGreen
44author_url: https://github.com/MartianInGreen/OpenWebUI-Tools
5- version: 0.1.0
6- requirements: pytube, youtube_transcript_api
5+ version: 0.2.0
6+ requirements: youtube_transcript_api, google-api-python-client, pydantic
7+
8+ This module provides tools for interacting with YouTube via Data API v3
9+ and fetching transcripts via the youtube_transcript_api.
10+
11+ Available tools:
12+ - Tools.transcript_download(video_id) : Download video metadata and full transcript.
13+ - Tools.search(query, max_results) : Search YouTube for videos matching a keyword.
14+
15+ All tools return only JSON-serializable types and support optional streaming
16+ via an `__event_emitter__` callback for incremental output. Search results
17+ are cached with LRU (maxsize=128) and enforce max_results bounds.
718"""
819
920import json
10- from pytubefix import YouTube #type: ignore
11- from youtube_transcript_api import YouTubeTranscriptApi #type: ignore
12- from pydantic import BaseModel , Field #type: ignore
13- from typing import Callable , Awaitable
21+ from youtube_transcript_api import YouTubeTranscriptApi # type: ignore
22+ from pydantic import BaseModel , Field # type: ignore
23+ from typing import Callable , Awaitable , List , Dict
24+ from googleapiclient .discovery import build # type: ignore
25+ from functools import lru_cache
1426
15- def youtube_func (video_id : str ):
16- #print(f"Getting video with id: {video_id}")
17- yt = YouTube (f'https://www.youtube.com/watch?v={ video_id } ' , use_po_token = True )
1827
19- try :
20- # Get avalilbe languages
21- languages_raw = YouTubeTranscriptApi .list_transcripts (video_id )
22- languages = []
# Schema for the payload produced by Tools.transcript_download: video
# metadata plus the transcript split into text segments. Every field is
# required (no default). Documentation is kept in comments rather than a
# class docstring so the generated Pydantic schema is unaffected.
class TranscriptDownloadResult(BaseModel):
    # Identification and descriptive metadata.
    video_id: str = Field(default=..., description="YouTube video ID")
    title: str = Field(default=..., description="Video title")
    channel: str = Field(default=..., description="Channel or author name")
    description: str = Field(default=..., description="Full video description")

    # Values are carried as strings, exactly as the model declares them.
    duration: str = Field(
        default=..., description="ISO8601 duration (e.g. 'PT5M33S')"
    )
    view_count: str = Field(default=..., description="Total view count as string")

    # One list entry per transcript segment.
    transcription: List[str] = Field(
        default=..., description="List of transcript text segments"
    )
38+
39+
# Schema for a single video entry in a search response. Every field is
# required (no default) and typed as a string. Documentation is kept in
# comments rather than a class docstring so the generated Pydantic schema
# is unaffected.
class SearchItem(BaseModel):
    # Fields describing the video itself.
    video_id: str = Field(default=..., description="Unique YouTube video ID")
    title: str = Field(default=..., description="Video title")
    channel: str = Field(default=..., description="Channel or uploader name")
    published_at: str = Field(
        default=..., description="ISO8601 publication timestamp"
    )
    description: str = Field(default=..., description="Snippet description")

    # Engagement counters and running time.
    view_count: str = Field(default=..., description="Total view count as string")
    like_count: str = Field(default=..., description="Total like count as string")
    comment_count: str = Field(
        default=..., description="Total comment count as string"
    )
    length: str = Field(default=..., description="ISO8601 duration of the video")
50+
51+
# Envelope schema for a search response: a single required list of
# SearchItem entries. Documented with comments rather than a class
# docstring so the generated Pydantic schema is unaffected.
class SearchResult(BaseModel):
    results: List[SearchItem] = Field(
        default=..., description="List of search results"
    )
2354
24- for lang in languages_raw :
25- languages .append (lang .language_code )
26- #print(f"Available languages: {languages}")
2755
28- # Get transcript (get english by default, if not available get first language)
29- try :
30- transcript = YouTubeTranscriptApi .get_transcript (video_id , languages = ['en' ])
31- except :
32- transcript = YouTubeTranscriptApi .get_transcript (video_id , languages = [languages [0 ]])
33-
34- text = []
35- for part in transcript :
36- text .append (part ['text' ])
37-
38- print (text )
39- except :
40- text = "Could not get video transcript :("
41-
42- try :
43- i : int = yt .watch_html .find ('"shortDescription":"' )
44- desc : str = '"'
45- i += 20 # excluding the `"shortDescription":"`
46- while True :
47- letter = yt .watch_html [i ]
48- desc += letter # letter can be added in any case
49- i += 1
50- if letter == '\\ ' :
51- desc += yt .watch_html [i ]
52- i += 1
53- elif letter == '"' :
54- break
55-
56- return {
57- 'title' : yt .title ,
58- 'channel' : yt .author ,
59- 'description' : desc ,
60- 'length' : str (yt .length ) + " s" ,
61- 'views' : yt .views ,
62- 'transcription' : text
63- }
64- except Exception as e :
65- print (str (e ))
66- return {"Something went wrong :(" }
67-
# Strong references to in-flight emitter tasks. The event loop itself only
# keeps weak references to tasks, so without this a scheduled emit could be
# garbage-collected before it runs.
_PENDING_EMITS = set()


def _dump_model(model) -> Dict:
    """Serialize a Pydantic model to a plain dict.

    Prefers ``model_dump()`` (Pydantic v2) and falls back to ``dict()``
    (Pydantic v1), so no deprecation warning is raised on v2.
    """
    dump = getattr(model, "model_dump", None)
    return dump() if callable(dump) else model.dict()


def _emit_event(emitter, payload: dict) -> None:
    """Call *emitter* with *payload* and make sure an awaitable result runs.

    The emitter is annotated as returning an awaitable; simply calling it
    would create a coroutine that is never executed. Plain synchronous
    callbacks are supported as well.
    """
    import asyncio
    import inspect

    outcome = emitter(payload)
    if not inspect.isawaitable(outcome):
        # Synchronous callback: the event has already been delivered.
        return

    async def _drain():
        await outcome

    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No event loop is running in this thread, so blocking is safe.
        # NOTE(review): assumes the emitter is not bound to another thread's
        # loop -- confirm against the host application.
        asyncio.run(_drain())
        return

    # Called from inside a running loop: a synchronous function cannot await,
    # so schedule the emit; it runs once control returns to the loop.
    task = loop.create_task(_drain())
    _PENDING_EMITS.add(task)
    task.add_done_callback(_PENDING_EMITS.discard)


def _fetch_transcript_segments(video_id: str) -> List[str]:
    """Return the transcript text segments of a video, or ``[]`` on failure.

    The default transcript request is tried first; if it fails, the first
    language reported by ``list_transcripts`` is requested instead. This is
    best-effort: any failure is logged and yields an empty list.
    """
    import logging

    try:
        segments = YouTubeTranscriptApi.get_transcript(video_id)
    except Exception:
        try:
            languages = [
                entry.language_code
                for entry in YouTubeTranscriptApi.list_transcripts(video_id)
            ]
            if not languages:
                return []
            segments = YouTubeTranscriptApi.get_transcript(
                video_id, languages=[languages[0]]
            )
        except Exception:
            logging.getLogger(__name__).warning(
                "Could not fetch a transcript for video %s", video_id, exc_info=True
            )
            return []
    return [segment.get("text", "") for segment in segments]


@lru_cache(maxsize=128)
def _cached_search(api_key: str, query: str, max_results: int) -> str:
    """Run a YouTube video search and return the entries as a JSON string.

    Cached on ``(api_key, query, max_results)``, so a changed API key never
    reuses results fetched with the old one. The value is an immutable JSON
    string so callers cannot mutate what the cache holds.
    """
    youtube = build("youtube", "v3", developerKey=api_key)
    search_resp = (
        youtube.search()
        .list(part="snippet", q=query, type="video", maxResults=max_results)
        .execute()
    )

    entries: List[Dict] = []
    for item in search_resp.get("items", []):
        video_id = item.get("id", {}).get("videoId")
        if not video_id:
            # Skip malformed items instead of failing the whole search.
            continue
        snippet = item.get("snippet", {})
        entries.append(
            {
                "video_id": video_id,
                "title": snippet.get("title", ""),
                "channel": snippet.get("channelTitle", ""),
                "published_at": snippet.get("publishedAt", ""),
                "description": snippet.get("description", ""),
            }
        )

    if entries:
        # One batched videos.list call supplies statistics and duration for
        # every hit; videos missing from the response keep the defaults.
        detail_resp = (
            youtube.videos()
            .list(
                part="statistics,contentDetails",
                id=",".join(entry["video_id"] for entry in entries),
            )
            .execute()
        )
        details = {item["id"]: item for item in detail_resp.get("items", [])}
        for entry in entries:
            detail = details.get(entry["video_id"], {})
            stats = detail.get("statistics", {})
            content = detail.get("contentDetails", {})
            entry.update(
                {
                    "view_count": stats.get("viewCount", "0"),
                    "like_count": stats.get("likeCount", "0"),
                    "comment_count": stats.get("commentCount", "0"),
                    "length": content.get("duration", ""),
                }
            )
    return json.dumps(entries)


class Tools:
    """Container for the YouTube tools: transcript download and video search."""

    class Valves(BaseModel):
        YOUTUBE_API_KEY: str = Field(
            "", description="YouTube Data API v3 key for all API calls"
        )

    def __init__(self):
        """
        Initialize the Tools container.

        - Set `valves.YOUTUBE_API_KEY` before calling any methods.
        - `self.citation = True` indicates support for citing results.
        """
        self.valves = self.Valves()
        self.citation = True

    def transcript_download(
        self,
        video_id: str,
        __event_emitter__: Callable[[dict], Awaitable[None]] = None,
    ) -> Dict:
        """
        Download metadata and full transcript for a given YouTube video.

        Both lookups are best-effort: if the transcript cannot be fetched the
        transcription is an empty list, and if the metadata request fails or
        returns no item the metadata fields keep empty defaults. When an
        event emitter is supplied, the result is also sent through it as a
        JSON-encoded "message" event.

        :param video_id: Video ID of the YouTube video
        :return: A dict matching the TranscriptDownloadResult schema
        """
        import logging

        # Defaults used when metadata is unavailable.
        data = {
            "video_id": video_id,
            "title": "",
            "channel": "",
            "description": "",
            "duration": "",
            "view_count": "0",
            "transcription": _fetch_transcript_segments(video_id),
        }

        try:
            youtube = build(
                "youtube", "v3", developerKey=self.valves.YOUTUBE_API_KEY
            )
            resp = (
                youtube.videos()
                .list(part="snippet,contentDetails,statistics", id=video_id)
                .execute()
            )
            items = resp.get("items", [])
            if items:
                item = items[0]
                snippet = item.get("snippet", {})
                content = item.get("contentDetails", {})
                stats = item.get("statistics", {})
                data.update(
                    {
                        "title": snippet.get("title", ""),
                        "channel": snippet.get("channelTitle", ""),
                        "description": snippet.get("description", ""),
                        "duration": content.get("duration", ""),
                        "view_count": stats.get("viewCount", "0"),
                    }
                )
        except Exception:
            # Keep the defaults, but leave a trace of why metadata is empty.
            logging.getLogger(__name__).warning(
                "Could not fetch metadata for video %s", video_id, exc_info=True
            )

        # Validate & serialize via Pydantic
        result = _dump_model(TranscriptDownloadResult(**data))
        if __event_emitter__:
            _emit_event(
                __event_emitter__,
                {"type": "message", "data": {"content": json.dumps(result)}},
            )
        return result

    def _search_logic(self, query: str, max_results: int) -> List[Dict]:
        """
        Validate ``max_results`` and return the (cached) search entries.

        Each call returns freshly decoded dicts, so callers may mutate the
        result without affecting cached data.

        :raises ValueError: if ``max_results`` is not between 1 and 50
        """
        if not 1 <= max_results <= 50:
            raise ValueError("max_results must be between 1 and 50")
        return json.loads(
            _cached_search(self.valves.YOUTUBE_API_KEY, query, max_results)
        )

    def search(
        self,
        query: str,
        max_results: int = 10,
        __event_emitter__: Callable[[dict], Awaitable[None]] = None,
    ) -> Dict:
        """
        Search YouTube for videos matching a keyword, with LRU caching
        and max_results validation.

        When an event emitter is supplied, each result item is also sent
        through it as a JSON-encoded "message" event. Raises ValueError when
        max_results is outside 1..50.

        :param query: Keyword(s) to search for
        :param max_results: Number of videos to return, between 1 and 50
        :return: A dict matching the SearchResult schema
        """
        entries = self._search_logic(query, max_results)
        # Validate & serialize via Pydantic
        items = [SearchItem(**entry) for entry in entries]
        result = _dump_model(SearchResult(results=items))
        if __event_emitter__:
            for item in result["results"]:
                _emit_event(
                    __event_emitter__,
                    {"type": "message", "data": {"content": json.dumps(item)}},
                )
        return result
0 commit comments