
I pieced this together from a number of different examples I found online.

The goal is to:

  1. Search the YouTube API
  2. Turn search results from multiple pages into a CSV file

Edit: here's a working example of the search loop, thanks to one of the answers provided. It now loops the maximum number of times (10) as intended; however, when executed, the problem now is the CSV file.

It seems that after `response` is retrieved, the program finishes, even though `results` and `writeCSV` are called afterwards.

Any further help would be greatly appreciated!

from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

DEVELOPER_KEY = "dev-key"
YOUTUBE_API_SERVICE_NAME = "youtube"
YOUTUBE_API_VERSION = "v3"

youtube = build(YOUTUBE_API_SERVICE_NAME, YOUTUBE_API_VERSION, developerKey=DEVELOPER_KEY)


# -------------Build YouTube Search------------#
def youtubeSearch(query, order="relevance"):
    # search 50 results per page
    request = youtube.search().list(
        q=query,
        type="video",
        order=order,
        part="id,snippet",
        maxResults="50",
        relevanceLanguage='en',
        videoDuration='long',
        fields='nextPageToken, items(id,snippet)'
    )

    title = []
    channelId = []
    channelTitle = []
    categoryId = []
    videoId = []
    viewCount = []
    likeCount = []
    dislikeCount = []
    commentCount = []
    favoriteCount = []
    tags = []

    while request:
        response = request.execute()
        for search_result in response.get("items", []):
            if search_result["id"]["kind"] == "youtube#video":

                # append title and video for each item
                title.append(search_result['snippet']['title'])
                videoId.append(search_result['id']['videoId'])

                # then collect stats on each video using its videoId
                stats = youtube.videos().list(
                    part='statistics,snippet',
                    id=search_result['id']['videoId']).execute()

                channelId.append(stats['items'][0]['snippet']['channelId'])
                channelTitle.append(stats['items'][0]['snippet']['channelTitle'])
                categoryId.append(stats['items'][0]['snippet']['categoryId'])
                favoriteCount.append(stats['items'][0]['statistics']['favoriteCount'])
                viewCount.append(stats['items'][0]['statistics']['viewCount'])

                # Not every video has likes/dislikes enabled, so they won't appear in the JSON response
                try:
                    likeCount.append(stats['items'][0]['statistics']['likeCount'])
                except KeyError:
                    # Good to be aware of channels that turn off their likes
                    print("Video titled {0}, on Channel {1} Likes Count is not available".format(
                        stats['items'][0]['snippet']['title'],
                        stats['items'][0]['snippet']['channelTitle']))
                    print(stats['items'][0]['statistics'].keys())
                    # Append "Not available" to keep the lists aligned
                    likeCount.append("Not available")

                try:
                    dislikeCount.append(stats['items'][0]['statistics']['dislikeCount'])
                except KeyError:
                    # Good to be aware of channels that turn off their dislikes
                    print("Video titled {0}, on Channel {1} Dislikes Count is not available".format(
                        stats['items'][0]['snippet']['title'],
                        stats['items'][0]['snippet']['channelTitle']))
                    print(stats['items'][0]['statistics'].keys())
                    dislikeCount.append("Not available")

                # Comments are commonly disabled, so check for the key rather than using try/except
                if 'commentCount' in stats['items'][0]['statistics'].keys():
                    commentCount.append(stats['items'][0]['statistics']['commentCount'])
                else:
                    commentCount.append(0)

                if 'tags' in stats['items'][0]['snippet'].keys():
                    tags.append(stats['items'][0]['snippet']['tags'])
                else:
                    # I'm not a fan of empty fields
                    tags.append("No Tags")
        request = youtube.search().list_next(
            request, response)
    # All pages consumed: store the lists of values in a dictionary
    youtube_dict = {'tags': tags, 'channelId': channelId, 'channelTitle': channelTitle,
                    'categoryId': categoryId, 'title': title, 'videoId': videoId,
                    'viewCount': viewCount, 'likeCount': likeCount, 'dislikeCount': dislikeCount,
                    'commentCount': commentCount, 'favoriteCount': favoriteCount}


    print("Search Completed...")
    print("Total results: {0} \nResults per page: {1}".format(request['pageInfo']['totalResults'],
                                                              request['pageInfo']['resultsPerPage']))
    print("Example output per item, snippet")
    print(request['items'][0]['snippet'].keys())
    # Assign first page of results (items) to item variable
    items = request['items']  # 50 "items"

    # Assign 1st results to title, channelId, datePublished then print
    title = items[0]['snippet']['title']
    channelId = items[0]['snippet']['channelId']
    datePublished = items[0]['snippet']['publishedAt']
    print("First result is: \n Title: {0} \n Channel ID: {1} \n Published on: {2}".format(title, channelId,
                                                                                          datePublished))
    return youtube_dict


# Input query
print("Please input your search query")
q = input()
# Run YouTube Search
results = youtubeSearch(q)
# Display result titles
print("Top 3 results are: \n {0}, ({1}), \n {2}, ({3}),\n {4}, ({5})".format(results['title'][0],
                                                                             results['channelTitle'][0],
                                                                             results['title'][1],
                                                                             results['channelTitle'][1],
                                                                             results['title'][2],
                                                                             results['channelTitle'][2]))

# -------------------------Save results------------------------------#
print("Input filename to store csv file")
file = "\\YouTube\\" + input() + ".csv"


def writeCSV(results, filename):
    import csv
    keys = sorted(results.keys())
    with open(filename, "w", newline="", encoding="utf-8") as output:
        writer = csv.writer(output, delimiter=",")
        writer.writerow(keys)
        writer.writerows(zip(*[results[key] for key in keys]))


writeCSV(results, file)
print("CSV file has been uploaded at: " + str(file))

2 Answers


Since you're using Google's APIs Client Library for Python, the pythonic way of implementing result-set pagination on the Search.list API endpoint looks like the one below:

request = youtube.search().list(
    q = 'A query',
    part = 'id,snippet',
    type = 'video',
    maxResults = 50,
    relevanceLanguage = 'en',
    videoDuration = 'long'
)

while request:
    response = request.execute()

    for item in response['items']:
        ...

    request = youtube.search().list_next(
        request, response)

It is this simple due to the way the Python client library is implemented: there's no need to explicitly handle the API response object's nextPageToken property or the API request parameter pageToken at all.
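
For contrast, a rough sketch of the manual equivalent (what list_next does for you under the hood), threading nextPageToken back in as pageToken:

token = None
while True:
    params = dict(q='A query', part='id,snippet', type='video', maxResults=50)
    if token:
        params['pageToken'] = token
    response = youtube.search().list(**params).execute()

    for item in response['items']:
        ...

    # nextPageToken is absent on the last page
    token = response.get('nextPageToken')
    if not token:
        break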

stvar
  • Thanks for your answer, I attempted to implement it (as seen in the edited code above) but am now getting problems with the `get` attribute: `for search_result in search_response.get('items', []): AttributeError: 'HttpRequest' object has no attribute 'get'` Any additional help would be greatly appreciated. I've tried moving `videof` outside of the while loop but I think the problem is calling `execute` in the while loop – Anton Kozlov Apr 27 '21 at 10:59
  • That's because you misnamed your variables; please refactor your code so that the result of `youtube.search().list(...)` is assigned to a variable whose name includes `request`, and the result of `... .execute()` is assigned to a variable whose name includes `response`. In my code above, I named my variables `request` and `response` on purpose: the former is only an *API request object* and the latter is the *API response object*. – stvar Apr 27 '21 at 11:11
  • Thank you again for your answer, I managed to get the loop to work but then the code finishes and doesn't call `storeResults` or `writeCSV`. I replaced the code in the original question with your suggestions and it does work, but I have exceeded my quota and now must wait before fixing the next steps. – Anton Kozlov Apr 27 '21 at 12:59
  • Why is it that your function `youtube_search` returns `search_response`? This variable stores an API **request** object; thus `search_response` is not an API **response** object, i.e. not an object that contains metadata associated with videos returned by the `Search.list` endpoint. Sorry, but you have to review your code in its entirety since things are really messed up. Obviously your code is based on [this one from Google](https://github.com/youtube/api-samples/blob/master/python/search.py), but, unfortunately, you're headed in the wrong direction. – stvar Apr 27 '21 at 13:23
  • This is what I've based most of it on https://github.com/SeyiAgboola/YouTube-Mining/blob/master/youtube_search.py – Anton Kozlov Apr 27 '21 at 13:25
  • Yes, I see. But my points above remain valid as stated. That script has its function `youtubeSearch` returning an actual (correct) API *response* object. Yours, `youtube_search`, does not. – stvar Apr 27 '21 at 13:29
  • I've tried to implement what you've suggested, refactoring etc. I've got rid of a lot of stuff and kept it as much as the example as I linked previously. If you have any other suggestions please feel free. – Anton Kozlov Apr 27 '21 at 14:32
  • Unfortunately, @Anton, you seem to have taken my recommendations quite lightly: (1) the function `youtubeSearch` returns `None`; (2) `videos = []` defines `videos` as a Python list, making the subsequent statement `videos[video_id] = video_item` invalid (throwing a `TypeError` exception) since `video_id` is a string; – stvar Apr 27 '21 at 15:34
  • (3) the `for` loop below `def storeResults(response):` falls outside that function due to improper code indentation, making `storeResults` useless (returning `None`); (4) your code is invalid due to `return youtube_dict` being outside of any function body, meaning that your script above does not compile; (5) ... and so on – stvar Apr 27 '21 at 15:34
  • apologies @stvar points (3) and (4) were formatting errors when I copied my code in. – Anton Kozlov Apr 27 '21 at 15:52
  • (2) my code here is based off one of your answers https://stackoverflow.com/questions/65443193/youtube-data-api-page-token-question-python (1) so in my function `youtubeSearch` I would replace `return request` with `return None`? I can't thank you enough for the help so far – Anton Kozlov Apr 27 '21 at 15:54
  • W.r.t. your answer to (2), my code has `video_data` initialized as `video_data = {}`, thus as a dictionary not as a list; then `video_data[video_id] = video_item` is valid. – stvar Apr 27 '21 at 16:43
  • You should refactor `youtubeSearch` by moving the entire body of `for search_result in response.get("items", [])` to this function (renaming variables properly). Then have `youtubeSearch` return `youtube_dict`; also get rid of `storeResults` altogether. – stvar Apr 27 '21 at 16:49
  • (2) my mistake again, apologies. I think I've fixed it in my last edit but I'm waiting for my quota to reset before I can test. I removed `storeResults` and refactored the code associated. – Anton Kozlov Apr 27 '21 at 18:38
  • One more thing: I recommend calling `Videos.list` for groups of IDs: that is, per each iteration of your `while request:` loop, make **only one call** to `Videos.list`; that's possible because this endpoint accepts its request parameter [`id`](https://developers.google.com/youtube/v3/docs/videos/list#id) as a comma-separated list of at most 50 video IDs. See [this answer of mine](https://stackoverflow.com/a/66070785/8327971) or [this one](https://stackoverflow.com/a/65743649/8327971) for such a kind of `Videos.list` call. – stvar Apr 27 '21 at 19:02
  • Thank you! I think I have only one call per iteration to `Videos.list` in `youtubeSearch` at the moment unless the `stats` variable from `stats = youtube.videos().list(` counts in `channelId.append(stats['items'][0]['snippet']['channelId'])` – Anton Kozlov Apr 27 '21 at 19:14
  • Per each iteration of `while request:`, your code above has `N` calls to `Videos.list`, where `N` is the number of items returned by `request.execute()` of that respective iteration. Counting globally, that means that you'll have a total of `T` calls to that endpoint, where `T` is the total number of items returned by all your paginated calls to `Search.list`. What I proposed was to reduce the total number of calls to `Videos.list` from `T` to `(T // 50) + (1 if T % 50 else 0)`. – stvar Apr 27 '21 at 19:37
  • For example, if `T` is `2001`, then instead of paying 2001 units of [quota cost](https://developers.google.com/youtube/v3/determine_quota_cost) for all calls of `Videos.list`, you'll pay only 41 units. – stvar Apr 27 '21 at 19:44
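
Picking up the batching suggestion from the comments above, here is a minimal sketch of one Videos.list call per batch of up to 50 IDs (fetch_stats is an illustrative helper name, not part of either script in this post):

def fetch_stats(youtube, video_ids):
    # Videos.list accepts up to 50 comma-separated IDs per call, so one
    # call per batch replaces one call per video
    items = []
    for start in range(0, len(video_ids), 50):
        batch = video_ids[start:start + 50]
        response = youtube.videos().list(
            part='statistics,snippet',
            id=','.join(batch)
        ).execute()
        items.extend(response['items'])
    return items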

I worked out a solution after a few different tests. I wasn't able to implement the pythonic solution suggested, but this worked for me.

import pandas as pd

from googleapiclient.discovery import build
from google_auth_oauthlib.flow import InstalledAppFlow

CLIENT_SECRETS_FILE = "client_secrets.json"
SCOPES = ['https://www.googleapis.com/auth/youtube.force-ssl']
API_SERVICE_NAME = 'youtube'
API_VERSION = 'v3'

def get_authenticated_service():
    flow = InstalledAppFlow.from_client_secrets_file(CLIENT_SECRETS_FILE, SCOPES)
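    # Note: run_console() only works with older google-auth-oauthlib
    # releases; it was removed in 1.0, where flow.run_local_server()
    # is the usual replacement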
    credentials = flow.run_console()
    return build(API_SERVICE_NAME, API_VERSION, credentials=credentials)

# Remove keyword arguments that are not set
def remove_empty_kwargs(**kwargs):
    good_kwargs = {}
    if kwargs is not None:
        for key, value in kwargs.items():
            if value:
                good_kwargs[key] = value
    return good_kwargs
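
Worth noting: remove_empty_kwargs drops any falsy value, which is what makes passing pageToken=None on the first loop iteration below safe. A quick illustration (the argument values here are made up):

kwargs = remove_empty_kwargs(q='gaming', pageToken=None, maxResults=50)
print(kwargs)  # {'q': 'gaming', 'maxResults': 50}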

client = get_authenticated_service()

def youtube_keyword(client, **kwargs):
    kwargs = remove_empty_kwargs(**kwargs)
    response = client.search().list(
        **kwargs
        ).execute()
    return response

def youtube_search(criteria, max_res):
    # create lists and empty dataframe
    titles = []
    videoIds = []
    channelIds = []
    resp_df = pd.DataFrame()

    # The page token must persist across iterations; resetting it inside
    # the loop would fetch the same first page over and over
    token = None
    while len(titles) < max_res:
        response = youtube_keyword(client,
                                   part='id,snippet',
                                   maxResults=50,
                                   q=criteria,
                                   videoCaption='closedCaption',
                                   type='video',
                                   videoDuration='long',
                                   pageToken=token)

        for item in response['items']:
            titles.append(item['snippet']['title'])
            # note: this stores the channel *title* under the channelId column
            channelIds.append(item['snippet']['channelTitle'])
            videoIds.append(item['id']['videoId'])

        # nextPageToken is absent on the last page, so use .get() and
        # stop once there are no more pages
        token = response.get('nextPageToken')
        if token is None:
            break

    resp_df['title'] = titles
    resp_df['channelId'] = channelIds
    resp_df['videoId'] = videoIds
    resp_df['subject'] = criteria

    return resp_df

Found_Videos = youtube_search('[search criteria]', 1000)
print(Found_Videos.shape)
print(Found_Videos.head())
Found_Videos.to_csv('Found_Videos.csv')