summaryrefslogtreecommitdiffstats
path: root/gui/streamlit_chat_app.py
blob: 68011229f7651239491292dbea578c62e304f12a (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
import atexit
import os
import sys

sys.path.append(os.path.join(os.path.dirname(__file__), os.path.pardir))

import streamlit as st
from streamlit_chat import message
from query_methods import query, avail_query_methods
import pickle

conversations_file = "conversations.pkl"


def load_conversations():
    """Read the pickled conversation history from ``conversations_file``.

    Returns:
        The object stored in the file (``save_conversations`` writes a list
        of conversations), or an empty list when the file does not exist or
        ends before a complete pickle could be read (e.g. it is empty).
    """
    # NOTE(review): pickle.load can execute arbitrary code embedded in the
    # file, so this is only safe while the file is written by this app alone.
    try:
        with open(conversations_file, "rb") as stream:
            restored = pickle.load(stream)
    except (FileNotFoundError, EOFError):
        # Nothing usable on disk yet: start with an empty history.
        return []
    return restored


def save_conversations(conversations, current_conversation):
    """Merge *current_conversation* into *conversations* and persist the list.

    The list is mutated in place: an entry equal to *current_conversation* is
    replaced by it, otherwise the conversation is appended. A conversation
    that is ``None`` or a dict whose ``'user_inputs'`` is empty is not added,
    because the sidebar labels every stored conversation by its first input.
    The list is then pickled to a temporary file which atomically replaces
    ``conversations_file``, so a failed write never truncates the real file.

    Args:
        conversations: List of stored conversations (mutated in place).
        current_conversation: Conversation to insert or refresh.

    Raises:
        OSError: If the temporary file cannot be written or moved into place.
        pickle.PicklingError: If the conversations cannot be pickled.
    """
    is_blank = current_conversation is None or (
        isinstance(current_conversation, dict)
        and not current_conversation.get('user_inputs', True)
    )
    if not is_blank:
        try:
            position = conversations.index(current_conversation)
        except ValueError:
            conversations.append(current_conversation)
        else:
            # Keep the caller's object in the list so later mutations of it
            # are reflected in the stored history.
            conversations[position] = current_conversation

    # Build the temp name from the file name only, so the temp file lands in
    # the same directory as the target even if the path has a directory part.
    directory, filename = os.path.split(conversations_file)
    temp_conversations_file = os.path.join(directory, "temp_" + filename)
    try:
        with open(temp_conversations_file, "wb") as f:
            pickle.dump(conversations, f)
        os.replace(temp_conversations_file, conversations_file)
    except Exception:
        # Do not leave a partial temp file behind; the original error is
        # what the caller needs to see, so a failed cleanup is ignored.
        try:
            os.remove(temp_conversations_file)
        except OSError:
            pass
        raise


def exit_handler():
    """Save the session's conversations when the interpreter shuts down.

    The handler is registered before the script initialises its session-state
    keys, so it checks that both keys exist and skips the save otherwise
    instead of raising during shutdown.
    """
    print("Exiting, saving data...")
    state = st.session_state
    if 'conversations' not in state or 'current_conversation' not in state:
        # Nothing was initialised for this session, so there is nothing to save.
        return
    save_conversations(state.conversations, state.current_conversation)


# Register the exit_handler function to be called when the program is closing.
# NOTE(review): if Streamlit re-executes this script on every interaction, the
# handler is registered once per run -- confirm whether that is acceptable.
atexit.register(exit_handler)

st.header("Chat Placeholder")

# Session-state keys paired with zero-argument factories for their initial
# values. Factories are used so load_conversations() only reads the file when
# the 'conversations' key is actually missing.
# NOTE(review): the initial 'query_method' is the query function itself; it is
# overwritten by the sidebar selectbox value further down the script.
_initial_state = (
    ('conversations', load_conversations),
    ('input_text', lambda: ''),
    ('selected_conversation', lambda: None),
    ('input_field_key', lambda: 0),
    ('query_method', lambda: query),
)
for _state_key, _make_default in _initial_state:
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _make_default()

# Start a fresh conversation when none exists yet or it has been reset to None.
_has_conversation = (
    'current_conversation' in st.session_state
    and st.session_state['current_conversation'] is not None
)
if not _has_conversation:
    st.session_state['current_conversation'] = {'user_inputs': [], 'generated_responses': []}

def _unescape_response(text):
    r"""Turn literal backslash escapes (``\n``, ``\uXXXX``, ...) into characters.

    Non-ASCII characters already present in *text* are preserved: characters
    outside Latin-1 are first written as backslash escapes and every byte is
    then decoded by the ``unicode-escape`` codec. If *text* contains a
    malformed escape sequence it is returned unchanged rather than raising.
    """
    try:
        return text.encode('latin-1', 'backslashreplace').decode('unicode-escape')
    except UnicodeDecodeError:
        return text


# The placeholder lets the input widget be re-created after a turn is handled.
input_placeholder = st.empty()
# The widget key embeds the number of inputs so far, so every turn gets its
# own text field.
user_input = input_placeholder.text_input(
    'You:', key=f'input_text_{len(st.session_state["current_conversation"]["user_inputs"])}'
)
# The button is still rendered; its value is no longer consulted below.
submit_button = st.button("Submit")

# Require non-empty text: a Submit click on an empty box must not send an
# empty prompt or record an empty turn.
if user_input:
    output = query(user_input, st.session_state['query_method'])
    escaped_output = _unescape_response(output)

    st.session_state.current_conversation['user_inputs'].append(user_input)
    st.session_state.current_conversation['generated_responses'].append(escaped_output)
    save_conversations(st.session_state.conversations, st.session_state.current_conversation)
    user_input = input_placeholder.text_input(
        'You:', value='', key=f'input_text_{len(st.session_state["current_conversation"]["user_inputs"])}'
    )  # Clear the input field

# Sidebar button that discards the current selection and starts an empty
# conversation.
start_new_conversation = st.sidebar.button("New Conversation")
if start_new_conversation:
    st.session_state['selected_conversation'] = None
    st.session_state['current_conversation'] = {'user_inputs': [], 'generated_responses': []}
    st.session_state['input_field_key'] = st.session_state['input_field_key'] + 1

# Store the query method chosen in the sidebar for subsequent query() calls.
chosen_query_method = st.sidebar.selectbox("Select API:", options=avail_query_methods, index=0)
st.session_state['query_method'] = chosen_query_method

# Sidebar: one button per stored conversation, labelled with its first input.
st.sidebar.header("Conversation History")

for i, conversation in enumerate(st.session_state.conversations):
    # A conversation stored before its first turn has no inputs; use a
    # placeholder label instead of raising IndexError.
    first_input = conversation['user_inputs'][0] if conversation['user_inputs'] else "(empty)"
    if st.sidebar.button(f"Conversation {i + 1}: {first_input}", key=f"sidebar_btn_{i}"):
        # Selecting an entry makes it both the displayed and the active conversation.
        st.session_state['selected_conversation'] = i
        st.session_state['current_conversation'] = st.session_state.conversations[i]

# Show the conversation picked in the sidebar, or the active one when nothing
# is selected.
selected_index = st.session_state['selected_conversation']
if selected_index is None:
    conversation_to_display = st.session_state.current_conversation
else:
    conversation_to_display = st.session_state.conversations[selected_index]

# Render turns newest first: each response followed by the input that
# produced it.
responses = conversation_to_display['generated_responses']
if responses:
    for turn in reversed(range(len(responses))):
        message(conversation_to_display["generated_responses"][turn], key=f"display_generated_{turn}")
        message(conversation_to_display['user_inputs'][turn], is_user=True, key=f"display_user_{turn}")