Spaces:
Runtime error
Runtime error
| %%writefile app.py | |
| from IPython.display import Javascript | |
| from IPython import display | |
| from google.colab import output | |
| from base64 import b64decode | |
| import datetime | |
| import whisper | |
| import openai | |
| import os | |
| import base64 | |
| from Crypto.Cipher import AES | |
| from streamlit_bokeh_events import streamlit_bokeh_events | |
| import streamlit as st | |
| from bokeh.models.widgets import Button | |
| from bokeh.models.widgets.buttons import Button | |
| from bokeh.models import CustomJS | |
| from streamlit_bokeh_events import streamlit_bokeh_events | |
| RECORD = """ | |
| const sleep = time => new Promise(resolve => setTimeout(resolve, time)) | |
| const b2text = blob => new Promise(resolve => { | |
| const reader = new FileReader() | |
| reader.onloadend = e => resolve(e.srcElement.result) | |
| reader.readAsDataURL(blob) | |
| }) | |
| var record = time => new Promise(async resolve => { | |
| stream = await navigator.mediaDevices.getUserMedia({ audio: true }) | |
| recorder = new MediaRecorder(stream) | |
| chunks = [] | |
| recorder.ondataavailable = e => chunks.push(e.data) | |
| recorder.start() | |
| await sleep(time) | |
| recorder.onstop = async ()=>{ | |
| blob = new Blob(chunks) | |
| text = await b2text(blob) | |
| resolve(text) | |
| } | |
| recorder.stop() | |
| }) | |
| """ | |
| openai.api_key = os.environ["API_KEY"] | |
| with open("encrypt.txt", "r") as encfile: | |
| encoder_txt = encfile.read() | |
| with open("decrypt.txt", "r") as decfile: | |
| decoder_txt = decfile.read() | |
| def openai_fun(myprompt): | |
| response_encoded = openai.Completion.create( | |
| engine="text-davinci-003", | |
| prompt = myprompt, | |
| max_tokens=1024, | |
| n=1, | |
| stop=None, | |
| temperature=0.5, | |
| ) | |
| return response_encoded | |
| def record(sec=5): | |
| display.display(Javascript(RECORD)) | |
| s = output.eval_js('record(%d)' % (sec*1000)) | |
| b = b64decode(s.split(',')[1]) | |
| ts = datetime.datetime.now() | |
| filename = ts.strftime("%Y_%m_%d_%H_%M_%S") | |
| with open(f'{filename}.wav','wb') as f: | |
| f.write(b) | |
| return f'{filename}.wav' # or webm ? | |
| model = whisper.load_model("base") | |
| transcribed = [] | |
| while True: | |
| user_choice = st.text_input("Do you want to record a new audio for transcription?[y/n]") | |
| if user_choice == 'y': | |
| st.write('Recording! (5 seconds)') | |
| record(5) | |
| folder_path = "/content" | |
| audio_files = [f for f in os.listdir(folder_path) if f.endswith(".wav")] | |
| audio_files.sort(key=lambda x: os.path.getmtime(os.path.join(folder_path, x)), reverse=True) | |
| last_audio_file_path = os.path.join(folder_path, audio_files[0]) | |
| st.write('Transcribing audio file: ',last_audio_file_path) | |
| # COMMENT IF NOT NEEDED: | |
| if os.path.exists(last_audio_file_path) and not last_audio_file_path in transcribed: | |
| audio = whisper.load_audio(last_audio_file_path) | |
| audio = whisper.pad_or_trim(audio) | |
| mel = whisper.log_mel_spectrogram(audio).to(model.device) | |
| options = whisper.DecodingOptions(language= 'en', fp16=False) | |
| result = whisper.decode(model, mel, options) | |
| if result.no_speech_prob < 0.5: | |
| mymsg = result.text | |
| st.write("Actual Message: ",mymsg) | |
| enc_prompt = encoder_txt + mymsg | |
| openai_fun(enc_prompt) | |
| if openai_fun(enc_prompt)['choices'][0]['text'] != "": | |
| # print(response_encoded['choices'][0]['text']) | |
| exec(openai_fun(enc_prompt)['choices'][0]['text']) | |
| encoded_msg = enc(mymsg) | |
| print("The encoded message: ", encoded_msg) | |
| decode_ = st.text_input("Do you wish to decode the message?[y/n]") | |
| if decode_ == "y": | |
| dec_prompt = decoder_txt + str(encoded_msg) | |
| response_decoded = openai.Completion.create( | |
| engine="text-davinci-003", | |
| prompt = dec_prompt, | |
| max_tokens=500, | |
| n=1, | |
| stop=None, | |
| temperature=0.5, | |
| ) | |
| if response_decoded['choices'][0]['text'] != "": | |
| print(response_decoded['choices'][0]['text']) | |
| exec(response_decoded['choices'][0]['text']) | |
| decoded_msg = dec(encoded_msg, key) | |
| print("The decoded message: ", decoded_msg) | |
| else: | |
| st.write('Retry! The message could') | |
| break # exit the loop | |
| elif user_choice == 'n': | |
| uc1 = input('Do you want to transcribe an existing audio?[y/n]') | |
| if uc1 == 'y': | |
| folder_path = "/content" | |
| audio_files = [f for f in os.listdir(folder_path) if f.endswith(".wav")] | |
| print('Audio files present: ',audio_files) | |
| audio_files.sort(key=lambda x: os.path.getmtime(os.path.join(folder_path, x)), reverse=True) | |
| last_audio_file_path = os.path.join(folder_path, audio_files[0]) | |
| print('Transcribing last audio file: ',last_audio_file_path) | |
| # COMMENT IF NOT NEEDED: | |
| if os.path.exists(last_audio_file_path) and not last_audio_file_path in transcribed: | |
| audio = whisper.load_audio(last_audio_file_path) | |
| audio = whisper.pad_or_trim(audio) | |
| mel = whisper.log_mel_spectrogram(audio).to(model.device) | |
| options = whisper.DecodingOptions(language= 'en', fp16=False) | |
| result = whisper.decode(model, mel, options) | |
| if result.no_speech_prob < 0.5: | |
| mymsg = result.text | |
| print("Actual Message: ",mymsg) | |
| enc_prompt = encoder_txt + result.text | |
| response_encoded = openai.Completion.create( | |
| engine="text-davinci-003", | |
| prompt = enc_prompt, | |
| max_tokens=1024, | |
| n=1, | |
| stop=None, | |
| temperature=0.5, | |
| ) | |
| if response_encoded['choices'][0]['text'] != "": | |
| # print(response_encoded['choices'][0]['text']) | |
| exec(response_encoded['choices'][0]['text']) | |
| encoded_msg = enc(mymsg) | |
| st.write("The encoded message: ", encoded_msg) | |
| else: | |
| st.write('Retry! The message could') | |
| # DELETE audio | |
| break # exit the loop | |
| elif uc1 == 'n': | |
| continue # continue the loop, prompting for input again | |
| else: | |
| st.write('Invalid input, please enter y or n') | |
| continue # continue the loop, prompting for input again | |
| else: | |
| st.write('Invalid input, please enter y or n') | |
| continue # continue the loop, prompting for input again | |