Files
Time-Series-Analysis/app.py
2025-07-30 16:12:03 +03:00

285 lines
11 KiB
Python

from flask import Flask, request, render_template, send_file, session
import pandas as pd
import io
import os
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.graphics.tsaplots import plot_acf, plot_pacf
import pmdarima as pm
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.io as pio
from werkzeug.utils import secure_filename
import matplotlib
matplotlib.use('Agg') # Use non-interactive backend
import matplotlib.pyplot as plt
import io
import base64
import numpy as np
# Application object and static configuration.
app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = 'Uploads'
app.config['ALLOWED_EXTENSIONS'] = {'csv', 'xls', 'xlsx'}
# The session cookie is signed with this key and carries the path of the
# uploaded file, so a guessable key would let a client forge that path.
# Set SECRET_KEY in the environment to keep sessions valid across restarts
# and across multiple worker processes; otherwise a random per-process key
# is generated and existing sessions expire whenever the process restarts.
app.secret_key = os.environ.get('SECRET_KEY') or os.urandom(32)
# Ensure upload folder exists
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
def allowed_file(filename):
    """Tell whether *filename* ends in one of the configured extensions.

    Only the text after the last dot is considered, compared
    case-insensitively against ``app.config['ALLOWED_EXTENSIONS']``.
    A name without any dot is rejected.
    """
    _stem, dot, extension = filename.rpartition('.')
    return bool(dot) and extension.lower() in app.config['ALLOWED_EXTENSIONS']
def create_acf_pacf_plots(data, lags=40):
    """Render ACF and PACF plots for *data* as an embeddable HTML fragment.

    The plots are drawn with matplotlib/statsmodels, saved as a PNG, and
    embedded as a background image in an otherwise empty Plotly figure.

    Args:
        data: The series to analyse (anything with a length that
            ``plot_acf``/``plot_pacf`` accept).
        lags: Maximum number of lags to draw. It is reduced automatically
            for short series. Defaults to 40.

    Returns:
        An HTML snippet (not a full document) produced by ``pio.to_html``.

    Raises:
        ValueError: If the series is too short to draw at least one lag.
    """
    # statsmodels refuses partial autocorrelations beyond half the sample
    # size, so a fixed lag count fails on short series. Stay strictly below
    # that limit.
    max_lags = len(data) // 2 - 1
    if max_lags < 1:
        raise ValueError('At least 4 observations are required to draw ACF/PACF plots')
    n_lags = min(lags, max_lags)

    # Create ACF and PACF plots using matplotlib
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 8))
    try:
        plot_acf(data, ax=ax1, lags=n_lags)
        ax1.set_title('Autocorrelation Function')
        plot_pacf(data, ax=ax2, lags=n_lags)
        ax2.set_title('Partial Autocorrelation Function')
        # Render to an in-memory PNG
        buf = io.BytesIO()
        fig.savefig(buf, format='png')
    finally:
        # Always release the figure, even if plotting fails; pyplot keeps
        # every open figure alive otherwise.
        plt.close(fig)
    img_str = base64.b64encode(buf.getvalue()).decode('utf-8')

    # Create Plotly figure with the PNG stretched across the plotting area
    fig_plotly = go.Figure()
    fig_plotly.add_layout_image(
        dict(
            source=f'data:image/png;base64,{img_str}',
            x=0,
            y=1,
            xref="paper",
            yref="paper",
            sizex=1,
            sizey=1,
            sizing="stretch",
            opacity=1,
            layer="below"
        )
    )
    fig_plotly.update_layout(
        height=600,
        showlegend=False,
        xaxis=dict(visible=False),
        yaxis=dict(visible=False)
    )
    return pio.to_html(fig_plotly, full_html=False)
def process_time_series(filepath, do_decomposition, do_forecasting, do_acf_pacf, train_percent, forecast_periods,
                        seasonal_period=12):
    """Run the requested analyses on an uploaded time-series file.

    The first column of the file is parsed as dates and becomes the index;
    the second column is treated as the series to analyse.

    Args:
        filepath: Path to the uploaded file. Names ending in ``.csv``
            (any case) are read with ``pandas.read_csv``; everything else
            with ``pandas.read_excel``.
        do_decomposition: Build an additive seasonal decomposition plot and
            append Trend/Seasonality/Residuals columns to the saved data.
        do_forecasting: Fit a seasonal auto-ARIMA on the training split and
            plot its forecast.
        do_acf_pacf: Build the ACF/PACF plots.
        train_percent: Fraction (0-1) of rows, taken from the start of the
            series, used for training.
        forecast_periods: Number of periods to predict after the end of the
            training split.
        seasonal_period: Season length used for both the decomposition and
            the ARIMA search. Defaults to 12.

    Returns:
        On success, a dict with the keys ``plot_html``, ``forecast_html``,
        ``acf_pacf_html``, ``summary``, ``filename``, ``arima_params``,
        ``seasonal_params``, ``train_size`` and ``test_size`` (entries for
        analyses that were not requested are ``None``). On any failure, a
        dict with the single key ``error`` holding the message.
    """
    try:
        # Read file. Extension matching is case-insensitive so that it
        # agrees with the upload filter, which lowercases the extension.
        if filepath.lower().endswith('.csv'):
            df = pd.read_csv(filepath)
        else:
            df = pd.read_excel(filepath)

        if df.shape[1] < 2:
            raise ValueError('The file must contain a date column followed by a value column')
        date_col = df.columns[0]  # Assume first column is date
        value_col = df.columns[1]  # Assume second column is value
        df[date_col] = pd.to_datetime(df[date_col])
        df.set_index(date_col, inplace=True)
        series = df[value_col]

        # Initialize variables
        plot_html = None
        forecast_html = None
        acf_pacf_html = None
        summary = series.describe().to_dict()
        arima_params = None
        seasonal_params = None
        train_size = None
        test_size = None
        processed_df = df.copy()

        # Time series decomposition
        if do_decomposition:
            decomposition = seasonal_decompose(series, model='additive', period=seasonal_period)
            fig = make_subplots(rows=4, cols=1,
                                subplot_titles=('Original Series', 'Trend', 'Seasonality', 'Residuals'))
            components = (
                ('Original', series),
                ('Trend', decomposition.trend),
                ('Seasonality', decomposition.seasonal),
                ('Residuals', decomposition.resid),
            )
            for row, (label, values) in enumerate(components, start=1):
                fig.add_trace(go.Scatter(x=df.index, y=values, name=label), row=row, col=1)
            fig.update_layout(height=800, showlegend=True)
            plot_html = pio.to_html(fig, full_html=False)
            processed_df['Trend'] = decomposition.trend
            processed_df['Seasonality'] = decomposition.seasonal
            processed_df['Residuals'] = decomposition.resid

        # Forecasting
        if do_forecasting:
            if forecast_periods < 1:
                raise ValueError('Forecast periods must be at least 1')
            # Split data into train and test
            train_size = int(len(df) * train_percent)
            test_size = len(df) - train_size
            if train_size < 1:
                raise ValueError('Training split is empty; increase the train percentage')
            train_data = series.iloc[:train_size]
            test_data = series.iloc[train_size:]

            freq = df.index.inferred_freq
            if freq is None:
                raise ValueError('Could not infer a regular frequency from the date column')

            # auto_arima returns the best model already fitted on train_data,
            # so no separate fit call is needed.
            model = pm.auto_arima(train_data,
                                  seasonal=True,
                                  m=seasonal_period,
                                  start_p=0, start_q=0,
                                  max_p=3, max_q=3,
                                  start_P=0, start_Q=0,
                                  max_P=2, max_Q=2,
                                  d=1, D=1,
                                  trace=False,
                                  error_action='ignore',
                                  suppress_warnings=True,
                                  stepwise=True)
            forecast = model.predict(n_periods=forecast_periods)
            arima_params = model.order
            seasonal_params = model.seasonal_order

            # The model only saw the training split, so its predictions are
            # for the periods right after the last training timestamp (they
            # overlap the test data when a test split exists). The first
            # generated date is the anchor itself and is dropped.
            forecast_dates = pd.date_range(start=train_data.index[-1], periods=forecast_periods + 1,
                                           freq=freq)[1:]
            forecast_fig = go.Figure()
            forecast_fig.add_trace(go.Scatter(x=df.index, y=series, name='Historical'))
            if test_size > 0:
                forecast_fig.add_trace(
                    go.Scatter(x=df.index[train_size:], y=test_data, name='Test Data', line=dict(color='green')))
            forecast_fig.add_trace(go.Scatter(x=forecast_dates, y=forecast, name='Forecast', line=dict(dash='dash')))
            forecast_fig.update_layout(title=f'Forecast (ARIMA{arima_params}, Seasonal{seasonal_params})', height=400)
            forecast_html = pio.to_html(forecast_fig, full_html=False)

        # ACF/PACF plots
        if do_acf_pacf:
            acf_pacf_html = create_acf_pacf_plots(series)

        # Save processed data. The output is always CSV, so give it a .csv
        # extension even when the upload was an Excel workbook.
        stem = os.path.splitext(os.path.basename(filepath))[0]
        processed_name = f'processed_{stem}.csv'
        processed_df.to_csv(os.path.join(app.config['UPLOAD_FOLDER'], processed_name))

        return {
            'plot_html': plot_html,
            'forecast_html': forecast_html,
            'acf_pacf_html': acf_pacf_html,
            'summary': summary,
            'filename': processed_name,
            'arima_params': arima_params,
            'seasonal_params': seasonal_params,
            'train_size': train_size,
            'test_size': test_size
        }
    except Exception as e:
        # Boundary handler: the views look for the 'error' key and show the
        # message to the user instead of returning a 500.
        return {'error': str(e)}
@app.route('/')
def index():
    """Render the landing template for the site root."""
    landing_template = 'index.html'
    return render_template(landing_template)
@app.route('/upload', methods=['POST'])
def upload_file():
    """Accept an uploaded data file, run the selected analyses and show the results.

    Form fields read: ``file`` (the upload), the checkboxes
    ``decomposition``, ``forecasting`` and ``acf_pacf``, and the numeric
    fields ``train_percent``, ``test_percent`` and ``forecast_periods``.

    Returns:
        The rendered ``results.html`` on success, or ``index.html`` with an
        ``error`` message when the upload or any parameter is invalid.
    """
    if 'file' not in request.files:
        return render_template('index.html', error='No file part')
    file = request.files['file']
    if file.filename == '':
        return render_template('index.html', error='No selected file')
    if not allowed_file(file.filename):
        # Every path must return a response; falling through would make
        # Flask fail with a 500.
        return render_template('index.html', error='Unsupported file type. Please upload a CSV, XLS or XLSX file.')
    filename = secure_filename(file.filename)
    if not allowed_file(filename):
        # Sanitising can strip a name down to something without a usable
        # extension.
        return render_template('index.html', error='Invalid file name')

    # Get user selections
    do_decomposition = 'decomposition' in request.form
    do_forecasting = 'forecasting' in request.form
    do_acf_pacf = 'acf_pacf' in request.form
    try:
        train_percent = float(request.form.get('train_percent', 80)) / 100
        test_percent = float(request.form.get('test_percent', 20)) / 100
        forecast_periods = int(request.form.get('forecast_periods', 12))
    except ValueError:
        return render_template('index.html', error='Train/test percentages and forecast periods must be numbers')

    # Validate train/test percentages
    if abs(train_percent + test_percent - 1.0) > 0.01:  # Allow small float precision errors
        return render_template('index.html', error='Train and test percentages must sum to 100%')
    if do_forecasting and not (0 < train_percent <= 1 and forecast_periods >= 1):
        return render_template(
            'index.html',
            error='Train percentage must be above 0 and at most 100, and forecast periods at least 1')

    filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
    file.save(filepath)
    # Remember the upload and the selected analyses so /reforecast can reuse them
    session['filepath'] = filepath
    session['do_decomposition'] = do_decomposition
    session['do_forecasting'] = do_forecasting
    session['do_acf_pacf'] = do_acf_pacf

    result = process_time_series(filepath, do_decomposition, do_forecasting, do_acf_pacf, train_percent,
                                 forecast_periods=forecast_periods)
    if 'error' in result:
        return render_template('index.html', error=result['error'])
    return render_template('results.html',
                           do_decomposition=do_decomposition,
                           do_forecasting=do_forecasting,
                           do_acf_pacf=do_acf_pacf,
                           train_percent=train_percent * 100,
                           test_percent=test_percent * 100,
                           forecast_periods=forecast_periods,
                           **result)
@app.route('/reforecast', methods=['POST'])
def reforecast():
    """Re-run the forecast on the previously uploaded file with new parameters.

    The file path and the decomposition/ACF selections come from the
    session; ``train_percent``, ``test_percent`` and ``forecast_periods``
    come from the submitted form. Forecasting is always enabled here.

    Returns:
        The rendered ``results.html`` on success, or ``index.html`` with an
        ``error`` message when the session, file or parameters are invalid.
    """
    filepath = session.get('filepath')
    if not filepath or not os.path.exists(filepath):
        return render_template('index.html', error='Session expired or file not found. Please upload the file again.')
    # The path comes from a client-held cookie; only accept files that sit
    # directly inside the upload folder.
    upload_root = os.path.abspath(app.config['UPLOAD_FOLDER'])
    if os.path.dirname(os.path.abspath(filepath)) != upload_root:
        return render_template('index.html', error='Session expired or file not found. Please upload the file again.')

    # Get user selections from reforecast form
    try:
        train_percent = float(request.form.get('train_percent', 80)) / 100
        test_percent = float(request.form.get('test_percent', 20)) / 100
        forecast_periods = int(request.form.get('forecast_periods', 12))
    except ValueError:
        return render_template('index.html', error='Train/test percentages and forecast periods must be numbers')

    # Validate train/test percentages
    if abs(train_percent + test_percent - 1.0) > 0.01:  # Allow small float precision errors
        return render_template('index.html', error='Train and test percentages must sum to 100%')
    if not (0 < train_percent <= 1 and forecast_periods >= 1):
        return render_template(
            'index.html',
            error='Train percentage must be above 0 and at most 100, and forecast periods at least 1')

    # Get original selections from session or defaults
    do_decomposition = session.get('do_decomposition', False)
    do_forecasting = True  # Since this is a reforecast
    do_acf_pacf = session.get('do_acf_pacf', False)

    result = process_time_series(filepath, do_decomposition, do_forecasting, do_acf_pacf, train_percent,
                                 forecast_periods)
    if 'error' in result:
        return render_template('index.html', error=result['error'])

    # Update session with new parameters
    session['train_percent'] = train_percent
    session['test_percent'] = test_percent
    session['forecast_periods'] = forecast_periods
    return render_template('results.html',
                           do_decomposition=do_decomposition,
                           do_forecasting=do_forecasting,
                           do_acf_pacf=do_acf_pacf,
                           train_percent=train_percent * 100,
                           test_percent=test_percent * 100,
                           forecast_periods=forecast_periods,
                           **result)
@app.route('/download/<filename>')
def download_file(filename):
    """Send a file from the upload folder as an attachment.

    Args:
        filename: Name of a file inside ``app.config['UPLOAD_FOLDER']``.

    Returns:
        The file as a download, or ``index.html`` with an error message and
        status 404 when no such file exists.
    """
    # The name comes straight from the URL: reduce it to a plain file name
    # so it cannot point outside the upload folder.
    safe_name = secure_filename(filename)
    # Files are saved relative to the working directory, whereas Flask
    # resolves a relative send_file path against the application root, so
    # pass an absolute path to keep both sides looking at the same place.
    filepath = os.path.abspath(os.path.join(app.config['UPLOAD_FOLDER'], safe_name))
    if not safe_name or not os.path.isfile(filepath):
        return render_template('index.html', error='File not found'), 404
    return send_file(filepath, as_attachment=True)
if __name__ == '__main__':
    # The interactive debugger lets anyone who can reach the server run
    # arbitrary code, so it is opt-in: set FLASK_DEBUG=1 for local development.
    app.run(debug=os.environ.get('FLASK_DEBUG', '').lower() in ('1', 'true', 'yes'))