import atexit
import Levenshtein
import os
import sys
sys.path.append(os.path.join(os.path.dirname(__file__), os.path.pardir))
import streamlit as st
from streamlit_chat import message
from query_methods import query, avail_query_methods
import pickle
conversations_file = "conversations.pkl"
def load_conversations():
try:
with open(conversations_file, "rb") as f:
return pickle.load(f)
except FileNotFoundError:
return []
except EOFError:
return []
def save_conversations(conversations, current_conversation):
updated = False
for idx, conversation in enumerate(conversations):
if conversation == current_conversation:
conversations[idx] = current_conversation
updated = True
break
if not updated:
conversations.append(current_conversation)
temp_conversations_file = "temp_" + conversations_file
with open(temp_conversations_file, "wb") as f:
pickle.dump(conversations, f)
os.replace(temp_conversations_file, conversations_file)
def delete_conversation(conversations, current_conversation):
for idx, conversation in enumerate(conversations):
conversations[idx] = current_conversation
break
conversations.remove(current_conversation)
temp_conversations_file = "temp_" + conversations_file
with open(temp_conversations_file, "wb") as f:
pickle.dump(conversations, f)
os.replace(temp_conversations_file, conversations_file)
def exit_handler():
print("Exiting, saving data...")
# Perform cleanup operations here, like saving data or closing open files.
save_conversations(st.session_state.conversations, st.session_state.current_conversation)
# Register the exit_handler function to be called when the program is closing.
atexit.register(exit_handler)
st.header("Chat Placeholder")
if 'conversations' not in st.session_state:
st.session_state['conversations'] = load_conversations()
if 'input_text' not in st.session_state:
st.session_state['input_text'] = ''
if 'selected_conversation' not in st.session_state:
st.session_state['selected_conversation'] = None
if 'input_field_key' not in st.session_state:
st.session_state['input_field_key'] = 0
if 'query_method' not in st.session_state:
st.session_state['query_method'] = query
if 'search_query' not in st.session_state:
st.session_state['search_query'] = ''
# Initialize new conversation
if 'current_conversation' not in st.session_state or st.session_state['current_conversation'] is None:
st.session_state['current_conversation'] = {'user_inputs': [], 'generated_responses': []}
input_placeholder = st.empty()
user_input = input_placeholder.text_input(
'You:', value=st.session_state['input_text'], key=f'input_text_-1'#{st.session_state["input_field_key"]}
)
submit_button = st.button("Submit")
if (user_input and user_input != st.session_state['input_text']) or submit_button:
output = query(user_input, st.session_state['query_method'])
escaped_output = output.encode('utf-8').decode('unicode-escape')
st.session_state['current_conversation']['user_inputs'].append(user_input)
st.session_state.current_conversation['generated_responses'].append(escaped_output)
save_conversations(st.session_state.conversations, st.session_state.current_conversation)
st.session_state['input_text'] = ''
st.session_state['input_field_key'] += 1 # Increment key value for new widget
user_input = input_placeholder.text_input(
'You:', value=st.session_state['input_text'], key=f'input_text_{st.session_state["input_field_key"]}'
) # Clear the input field
# Add a button to create a new conversation
if st.sidebar.button("New Conversation"):
st.session_state['selected_conversation'] = None
st.session_state['current_conversation'] = {'user_inputs': [], 'generated_responses': []}
st.session_state['input_field_key'] += 1 # Increment key value for new widget
st.session_state['query_method'] = st.sidebar.selectbox("Select API:", options=avail_query_methods, index=0)
# Proxy
st.session_state['proxy'] = st.sidebar.text_input("Proxy: ")
# Searchbar
search_query = st.sidebar.text_input("Search Conversations:", value=st.session_state.get('search_query', ''), key='search')
if search_query:
filtered_conversations = []
indices = []
for idx, conversation in enumerate(st.session_state.conversations):
if search_query in conversation['user_inputs'][0]:
filtered_conversations.append(conversation)
indices.append(idx)
filtered_conversations = list(zip(indices, filtered_conversations))
conversations = sorted(filtered_conversations, key=lambda x: Levenshtein.distance(search_query, x[1]['user_inputs'][0]))
sidebar_header = f"Search Results ({len(conversations)})"
else:
conversations = st.session_state.conversations
sidebar_header = "Conversation History"
# Sidebar
st.sidebar.header(sidebar_header)
sidebar_col1, sidebar_col2 = st.sidebar.columns([5,1])
for idx, conversation in enumerate(conversations):
if sidebar_col1.button(f"Conversation {idx + 1}: {conversation['user_inputs'][0]}", key=f"sidebar_btn_{idx}"):
st.session_state['selected_conversation'] = idx
st.session_state['current_conversation'] = conversation
if sidebar_col2.button('🗑️', key=f"sidebar_btn_delete_{idx}"):
if st.session_state['selected_conversation'] == idx:
st.session_state['selected_conversation'] = None
st.session_state['current_conversation'] = {'user_inputs': [], 'generated_responses': []}
delete_conversation(conversations, conversation)
st.experimental_rerun()
if st.session_state['selected_conversation'] is not None:
conversation_to_display = conversations[st.session_state['selected_conversation']]
else:
conversation_to_display = st.session_state.current_conversation
if conversation_to_display['generated_responses']:
for i in range(len(conversation_to_display['generated_responses']) - 1, -1, -1):
message(conversation_to_display["generated_responses"][i], key=f"display_generated_{i}")
message(conversation_to_display['user_inputs'][i], is_user=True, key=f"display_user_{i}")