0

This is my first ever "problem solving" in Python, and the last time I used Python was 2.7 with the old asyncio loop behaviour. Now the latest Python won't work because this feature no longer exists. This project is a program to win Kahoots, and the code below is the library for it. It won't work.

import base64
import json
import os
import re
import time

try:
    import aiocometd
    from py_mini_racer import py_mini_racer
    import requests
except ModuleNotFoundError:
    # Without these packages the names above stay undefined and every later
    # use fails with a confusing NameError, so either install and re-import
    # them, or surface the original import error right here.
    if "y" not in input("Install dependencies? [y/N] > ").lower():
        raise
    import subprocess
    import sys

    # sys.executable makes pip install into the interpreter that is running
    # this module; check_call raises if the installation fails.
    subprocess.check_call([sys.executable, '-m', 'pip', 'install', '-r', 'requirements.txt'])
    import aiocometd
    from py_mini_racer import py_mini_racer
    import requests

import asyncio
from difflib import SequenceMatcher

# Question types whose correct choice Kahoot.parseAnswers extracts; questions
# of any other type are recorded as 'NOT A QUESTION'.
allowedTypes = ['quiz', 'multiple_select_quiz']
# Choice index sent by Kahoot._play when no answers are available.
DEFAULT_ANSWER = 1


class Kahoot:
    def __init__(self, pin=None, nickname=None, quizName=None, quizID=None, maxCount=None, DEBUG=None):
        """Set up a bot for one Kahoot game or quiz search.

        :param pin: game pin; checkPin requires it to be a string.
        :param nickname: name sent with the login message.
        :param quizName: search term used by findAnswers when no quizID is set.
        :param quizID: quiz UUID; when set, findAnswers fetches that quiz directly.
        :param maxCount: search result limit; falsy values fall back to 50.
        :param DEBUG: truthy to print URLs and raw payloads.
        """
        self.pin = pin
        self.nickname = nickname
        self.quizName = quizName
        self.quizID = quizID
        self.client = requests.session()
        self.captchaToken = "KAHOOT_TOKEN_eyJ2ZXJzaW9uIjoiIn0="
        # This will work until Kahoot updates their code version (https://repl.it/repls/WholeCrimsonFlashdrives)
        self.authToken = None
        self.answers = None
        self.colors = {0: "RED", 1: "BLUE", 2: "YELLOW", 3: "GREEN"}
        self.maxCount = maxCount if maxCount else 50
        self.lookup = None
        self.loadCodes()
        self.sessionID = None
        self.sessionToken = None
        self.DEBUG = DEBUG
        # Assigned by _play once the CometD connection is open.
        self.socket = None
        # asyncio.get_event_loop() no longer creates a loop implicitly on
        # current Python versions (deprecated, then an error when no loop is
        # set), so this instance owns an explicitly created loop instead.
        self.loop = asyncio.new_event_loop()

    def error(self, err):
        """Raise KahootError carrying *err* as its message."""
        failure = KahootError(err)
        raise failure

    def gracefulExit(self):
        exit()

    def authenticate(self, email, password):
        """Log in to create.kahoot.it and store the returned access token.

        :param email: account e-mail, sent as 'username'.
        :param password: account password.
        :raises KahootError: on HTTP 401 or any status other than 200.
        """
        credentials = {'username': email, 'password': password, 'grant_type': 'password'}
        request_headers = {'Content-Type': 'application/json', "x-kahoot-login-gate": "enabled"}
        response = self.client.post('https://create.kahoot.it/rest/authenticate',
                                    json=credentials, headers=request_headers)
        status = response.status_code
        if status == 401:
            self.error("Invalid Email or Password.")
        elif status != 200:
            self.error(f"Login error {response.status_code}")
        else:
            print('AUTHENTICATED')
            self.authToken = response.json()["access_token"]

    def startGame(self):
        """Run the _play coroutine to completion on this instance's event loop."""
        run = self.loop.run_until_complete
        run(self._play())

    def search(self):
        """Run the _search coroutine to completion on this instance's event loop."""
        run = self.loop.run_until_complete
        run(self._search())

    async def _search(self):
        """Store the result of findAnswers (called in search-only mode) in self.answers."""
        found = await self.findAnswers(searchOnly=1)
        self.answers = found

    async def _play(self):
        """Join the game, then react to controller messages until it ends.

        Connects to the CometD endpoint built from the pin and session id,
        subscribes to the controller/player/status channels and publishes the
        login message. When a quiz id is configured the answers are fetched up
        front; otherwise they are looked up when START_QUIZ arrives. Every
        START_QUESTION is answered with the stored index for that question, or
        with DEFAULT_ANSWER when no usable answer is known. RESET_CONTROLLER
        and GAME_OVER end the program through gracefulExit.

        :raises KahootError: when a received message contains 'error'.
        """
        url = f'wss://play.kahoot.it/cometd/{self.pin}/{self.sessionID}'
        async with aiocometd.Client(url, ssl=True) as client:
            self.socket = client
            await client.subscribe("/service/controller")
            await client.subscribe("/service/player")
            await client.subscribe("/service/status")
            await client.publish('/service/controller',
                                 {"host": "kahoot.it", "gameid": self.pin, "captchaToken": self.captchaToken,
                                  "name": self.nickname, "type": "login"})
            # Stays 0: the TIME_UP handling that would shift it is disabled.
            offset = 0
            tFADone = False
            if self.quizID:
                self.answers = await self.findAnswers()
                if self.answers:
                    print('ANSWERS RECEIVED')
            async for rawMessage in client:
                message = rawMessage['data']
                if 'error' in message:
                    self.error(message['description'])
                if 'id' not in message:
                    continue
                data = json.loads(message['content'])
                # Unknown ids map to '' so they fall through every branch below.
                kind = self.lookup.get(message['id'], '')
                if kind == 'TWO_FACTOR_AUTH_CORRECT':
                    tFADone = True
                if kind == 'RESET_TWO_FACTOR_AUTH':
                    # The reset repeats; only prompt until the code was accepted.
                    if not tFADone:
                        await self.submit2FA()
                else:
                    print(kind.replace('_', ' '))
                if kind == 'START_QUIZ':
                    if self.DEBUG:
                        print(data)
                    quizAnswers = data['quizQuestionAnswers']
                    if not self.answers:
                        self.answers = await self.findAnswers(accepted_answers=quizAnswers)
                        if self.answers:
                            print('ANSWERS RECEIVED')
                elif kind == 'START_QUESTION':
                    print('------', data['questionIndex'] + 1, '------')
                    position = data['questionIndex'] + offset
                    entry = None
                    if self.answers and 0 <= position < len(self.answers):
                        entry = self.answers[position]
                    # parseAnswers records non-quiz questions without an
                    # 'index' key; those, and positions past the end of the
                    # list, use the fallback instead of raising.
                    if entry is not None and entry.get('index') is not None:
                        correct = int(entry['index'])
                        print(f'SELECTED {self.colors.get(correct, correct)}')
                    else:
                        correct = DEFAULT_ANSWER
                        print('SELECTED FALLBACK')
                    await self.sendAnswer(int(correct))
                elif kind == 'TIME_UP':
                    # Intentionally ignored.
                    pass
                elif kind == 'RESET_CONTROLLER':
                    print("RESET_CONTROLLER")
                    self.gracefulExit()
                elif kind == 'GAME_OVER':
                    print("Game over, if you didn't win the winner has a better bot!")
                    self.gracefulExit()

    def convert(self, cols):
        for num, color in self.colors.items():
            cols = cols.replace(color[0].lower(), str(num))
        return cols

    async def submit2FA(self):
        """Prompt for the 2FA colour sequence and publish it on the controller channel."""
        typed = input("2fa (e.g. rbyg, yrgb) > ")
        sequence = self.convert(typed.lower())
        payload = {
            "content": json.dumps({"sequence": sequence}),
            "gameid": self.pin,
            "host": "kahoot.it",
            "type": "message",
            "id": 50,
        }
        await self.socket.publish("/service/controller", payload)

    async def sendAnswer(self, choice):
        """Publish *choice* as the answer to the current question.

        :param choice: value sent in the 'choice' field of the message content.
        """
        device = {"userAgent": "kbot", "screen": {"width": 1920, "height": 1080}}
        content = json.dumps({"choice": choice, "meta": {"lag": 0, "device": device}})
        payload = {
            "content": content,
            "gameid": self.pin,
            "host": "kahoot.it",
            "type": "message",
            "id": 45,
        }
        await self.socket.publish("/service/controller", payload)

    async def getQuiz(self, url, accepted_answers=None, actualAnswers=None, searchOnly=None):
        if self.DEBUG:
            print(url)
        if self.authToken:
            resp = self.client.get(url, headers={'Authorization': f'Bearer {self.authToken}'})
        else:
            resp = self.client.get(url)
        if resp.status_code == 400:
            self.error("Invalid UUID.")
        if resp.status_code != 200:
            self.error("Something went wrong finding answers.")
        if accepted_answers and actualAnswers:
            if actualAnswers == len(accepted_answers):
                isCorrectQuiz = True
                for q_index, question in enumerate(resp.json()['questions']):
                    if len(question['choices']) != accepted_answers[q_index]:
                        isCorrectQuiz = False
                        break
                if isCorrectQuiz:
                    print("QUIZ FOUND")
                    return resp.json()
                else:
                    print("Wrong question types")
            else:
                print("Wrong num of accepted answers")
        else:
            print("Here you go:" if searchOnly else "No accepted answers")
            return resp.json()

    async def findAnswers(self, accepted_answers=None, searchOnly=None):
        """Locate a quiz and parse its answers.

        With a quiz id the quiz is fetched directly and its parsed answers are
        returned. With only a quiz name, the search endpoint is queried and
        each hit is tried in turn: in search-only mode the user is asked about
        every hit and accepted ones are parsed (nothing is returned); otherwise
        the first hit that getQuiz accepts is parsed and returned.

        :param accepted_answers: per-question choice counts passed to getQuiz.
        :param searchOnly: truthy for the interactive search mode.
        :raises KahootError: when the search request fails or the loop ends
            without a quiz.
        """
        if self.quizID:
            direct_url = f'https://create.kahoot.it/rest/kahoots/{self.quizID}'
            fetched = await self.getQuiz(url=direct_url, accepted_answers=accepted_answers)
            return self.parseAnswers(fetched, self.DEBUG)
        if not self.quizName:
            return None

        query = {'query': self.quizName, 'cursor': 0, 'limit': self.maxCount, 'topics': '', 'grades': '',
                 'orderBy': 'relevance', 'searchCluster': 1, 'includeExtendedCounters': False}
        if self.DEBUG:
            print(self.authToken)
        request_args = {'params': query}
        if self.authToken:
            request_args['headers'] = {'Authorization': f'Bearer {self.authToken}'}
        resp = self.client.get('https://create.kahoot.it/rest/kahoots/', **request_args)
        if resp.status_code != 200:
            self.error("Something went wrong searching quizzes.")
        candidates = resp.json()['entities']
        print(f'{len(candidates)} matching quizzes found')

        quiz = None
        for candidate in candidates:
            card = candidate['card']
            if searchOnly:
                reply = input(f"Check '{card['title']}'? [y/N] ").lower()
                if re.match(r'y(es)?', reply) is None:
                    continue
            else:
                print(f"Checking {card['title']}...", end=" ")
            quiz_url = f'https://create.kahoot.it/rest/kahoots/{card["uuid"]}'
            quiz = await self.getQuiz(url=quiz_url, accepted_answers=accepted_answers,
                                      actualAnswers=card['number_of_questions'], searchOnly=searchOnly)
            if searchOnly:
                self.parseAnswers(quiz, self.DEBUG)
            elif quiz:
                return self.parseAnswers(quiz, self.DEBUG)
        if not quiz:
            self.error("No quiz found. (private?)")

    @staticmethod
    def parseAnswers(quiz, debug=None):
        answers = []
        if debug:
            print(quiz)
        for question in quiz['questions']:
            foundAnswer = False
            if question['type'] not in allowedTypes:
                answers.append({'question': 'NOT A QUESTION'})
                continue
            for i, choice in enumerate(question['choices']):
                if choice['correct'] and not foundAnswer:
                    foundAnswer = True
                    answers.append({'question': question['question'], 'index': i, 'answer': choice['answer']})
        Kahoot.printAnswers(quiz, answers)
        return answers

    @staticmethod
    def printAnswers(quiz, answers):
        # print("If the questions are randomized, go to " + url + "to get the answers yourself.")
        print(f"Title: {quiz['title']}")
        print(f"Creator: {quiz['creator_username']}")
        print(f"Desc: {quiz['description']}")
        for q in answers:
            print(f"{q['question']}\n\t{q['answer']}")

    @staticmethod
    def _remove_emojis(text):
        # https://stackoverflow.com/questions/33404752/removing-emojis-from-a-string-in-python/33417311
        emoji_pattern = re.compile("["
                                   u"\U0001F600-\U0001F64F"  # emoticons
                                   u"\U0001F300-\U0001F5FF"  # symbols & pictographs
                                   u"\U0001F680-\U0001F6FF"  # transport & map symbols
                                   u"\U0001F1E0-\U0001F1FF"  # flags (iOS)
                                   "]+", flags=re.UNICODE)
        return emoji_pattern.sub(r'', text)

    @staticmethod
    def _similar(a, b):
        return SequenceMatcher(None, a, b).ratio()

    def checkPin(self):
        assert type(self.pin) == str
        currentTime = int(time.time())
        url = f"https://play.kahoot.it/reserve/session/{self.pin}/?{currentTime}"
        resp = self.client.get(url)
        if resp.status_code != 200:
            self.error(f"Pin {self.pin} does not exist.")
        self.sessionToken = resp.headers['x-kahoot-session-token']
        self.sessionID = self.solveChallenge(resp.json()["challenge"])

    def solveChallenge(self, text):
        """Evaluate the challenge script and turn its result into the session id.

        The script is stripped of tabs and non-ASCII characters, split on
        '{', '}' and ';', reassembled from fixed fragment positions, evaluated
        in a MiniRacer engine, and the result is passed to _shiftBits.

        :param text: challenge JavaScript as received from the server.
        """
        # NOTE(review): this evaluates server-supplied JavaScript; it runs
        # inside the embedded JS engine, not in Python.
        cleaned = text.replace('\t', '').encode('ascii', 'ignore').decode('utf-8')
        parts = re.split("[{};]", cleaned)
        replaceFunction = "return message.replace(/./g, function(char, position) {"
        # Rebuilt Javascript so engine can solve it
        script = (parts[1] + "{"
                  + parts[2] + ";"
                  + replaceFunction
                  + parts[7] + ";})};"
                  + parts[0])

        engine = py_mini_racer.MiniRacer()
        return self._shiftBits(engine.eval(script))

    def _shiftBits(self, solution):
        decodedToken = base64.b64decode(self.sessionToken).decode('utf-8', 'strict')
        solChars = [ord(s) for s in solution]
        sessChars = [ord(s) for s in decodedToken]
        return "".join([chr(sessChars[i] ^ solChars[i % len(solChars)]) for i in range(len(sessChars))])

    def loadCodes(self):
        self.lookup = {
            1: "GET_READY",
            2: "START_QUESTION",
            3: "GAME_OVER",
            4: "TIME_UP",
            5: "PLAY_AGAIN",
            6: "ANSWER_SELECTED",
            7: "ANSWER_RESPONSE",
            8: "REVEAL_ANSWER",
            9: "START_QUIZ",
            10: "RESET_CONTROLLER",
            11: "SUBMIT_FEEDBACK",
            12: "FEEDBACK",
            13: "REVEAL_RANKING",
            14: "USERNAME_ACCEPTED",
            15: "USERNAME_REJECTED",
            16: "REQUEST_RECOVERY_DATA_FROM_PLAYER",
            17: "SEND_RECOVERY_DATA_TO_CONTROLLER",
            18: "JOIN_TEAM_MEMBERS",
            19: "JOIN_TEAM_MEMBERS_RESPONSE",
            20: "START_TEAM_TALK",
            21: "SKIP_TEAM_TALK",
            31: "IFRAME_CONTROLLER_EVENT",
            32: "SERVER_IFRAME_EVENT",
            40: "STORY_BLOCK_GET_READY",
            41: "REACTION_SELECTED",
            42: "REACTION_RESPONSE",
            43: "GAME_BLOCK_START",
            44: "GAME_BLOCK_END",
            45: "GAME_BLOCK_ANSWER",
            50: "SUBMIT_TWO_FACTOR",
            51: "TWO_FACTOR_AUTH_INCORRECT",
            52: "TWO_FACTOR_AUTH_CORRECT",
            53: "RESET_TWO_FACTOR_AUTH"
        }


class KahootError(Exception):
    """Error raised by Kahoot.error for failed requests and server-reported problems."""

Daniel_J
  • 1
  • 3

0 Answers0