Files
Time-Series-Analysis/models/time_series.py
2025-08-01 04:33:03 -04:00

176 lines
8.0 KiB
Python
Executable File

import pandas as pd
from statsmodels.tsa.seasonal import seasonal_decompose
from statsmodels.tsa.holtwinters import ExponentialSmoothing
import pmdarima as pm
from prophet import Prophet
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.io as pio
import numpy as np
from sklearn.metrics import mean_absolute_error, mean_squared_error
from utils.file_handling import save_processed_file
from .plotting import create_acf_pacf_plots
# Seasonal cycle length (in observations) assumed by the decomposition and by
# the ARIMA / Exponential Smoothing models.
_SEASONAL_PERIOD = 12
_SUPPORTED_MODELS = ('ARIMA', 'Exponential Smoothing', 'Prophet')


def _forecast_error_metrics(actual, predicted):
    """Return MAE, MSE and RMSE of *predicted* against *actual* as plain floats."""
    mae = mean_absolute_error(actual, predicted)
    mse = mean_squared_error(actual, predicted)
    return {'MAE': float(mae), 'MSE': float(mse), 'RMSE': float(np.sqrt(mse))}


def _predict_with_model(model_type, train_data, horizon, freq, date_col, value_col):
    """Fit *model_type* on *train_data* and predict the *horizon* steps that follow it.

    Returns a ``(predictions, model_params)`` tuple where ``predictions`` is a
    1-D numpy array of length *horizon* and ``model_params`` is the
    human-readable model description reported to the caller.
    """
    if model_type == 'ARIMA':
        # auto_arima searches the (p, q, P, Q) grid and returns the best model
        # already fitted on train_data, so no second fit() call is needed.
        model = pm.auto_arima(train_data,
                              seasonal=True,
                              m=_SEASONAL_PERIOD,
                              start_p=0, start_q=0,
                              max_p=3, max_q=3,
                              start_P=0, start_Q=0,
                              max_P=2, max_Q=2,
                              d=1, D=1,
                              trace=False,
                              error_action='ignore',
                              suppress_warnings=True,
                              stepwise=True)
        predictions = model.predict(n_periods=horizon)
        model_params = f"{model.order}, Seasonal{model.seasonal_order}"
    elif model_type == 'Exponential Smoothing':
        model_fit = ExponentialSmoothing(train_data,
                                         trend='add',
                                         seasonal='add',
                                         seasonal_periods=_SEASONAL_PERIOD).fit()
        predictions = model_fit.forecast(horizon)
        model_params = "Additive Trend, Additive Seasonal"
    elif model_type == 'Prophet':
        # Prophet expects a frame with the columns 'ds' (dates) and 'y' (values).
        prophet_df = train_data.reset_index().rename(columns={date_col: 'ds', value_col: 'y'})
        model = Prophet(yearly_seasonality=True, weekly_seasonality=False, daily_seasonality=False)
        model.add_seasonality(name='monthly', period=30.5, fourier_order=5)
        model.fit(prophet_df)
        future = model.make_future_dataframe(periods=horizon, freq=freq)
        yhat = model.predict(future)['yhat'].to_numpy()
        # The future frame also contains the history rows; keep only the tail.
        # (An explicit start index avoids the "[-0:] returns everything" trap.)
        predictions = yhat[len(yhat) - horizon:]
        model_params = "Prophet"
    else:
        raise ValueError(
            f"Unsupported model_type: {model_type!r}. Expected one of: {', '.join(_SUPPORTED_MODELS)}")
    return np.asarray(predictions), model_params


def process_time_series(filepath, do_decomposition, do_forecasting, do_acf_pacf, train_percent, forecast_periods,
                        model_type):
    """Load a time series from a file and run the requested analyses on it.

    The file is read as CSV when its name ends in ``.csv`` (case-insensitive)
    and as Excel otherwise. The first column is taken as the date and the
    second column as the value.

    Args:
        filepath: Path of the input file.
        do_decomposition: When true, run an additive seasonal decomposition
            (period 12) and add Trend / Seasonality / Residuals columns to the
            processed data.
        do_forecasting: When true, fit ``model_type`` on the training split,
            score it on the test split and forecast ``forecast_periods`` steps
            past the end of the data.
        do_acf_pacf: When true, build the ACF/PACF plots.
        train_percent: Share of rows used for training; it is multiplied by
            the row count, so presumably a fraction in [0, 1] -- TODO confirm
            against the caller. The resulting split is clamped to the data
            length.
        forecast_periods: Number of future periods to forecast.
        model_type: One of 'ARIMA', 'Exponential Smoothing' or 'Prophet'.

    Returns:
        On success, a dict with the keys 'plot_html', 'forecast_html',
        'acf_pacf_html', 'summary', 'filename', 'model_params', 'train_size',
        'test_size', 'metrics', 'forecast_dates', 'forecast_values' and
        'model_type'. Entries for analyses that were not requested are None
        (or an empty list for the forecast dates/values).
        On any failure, ``{'error': <message>}``; exceptions are not propagated.
    """
    try:
        # Read file
        if str(filepath).lower().endswith('.csv'):
            df = pd.read_csv(filepath)
        else:
            df = pd.read_excel(filepath)

        if df.shape[1] < 2:
            raise ValueError("Input file must have at least two columns: a date column followed by a value column")

        date_col = df.columns[0]   # Assume first column is date
        value_col = df.columns[1]  # Assume second column is value
        df[date_col] = pd.to_datetime(df[date_col])
        df.set_index(date_col, inplace=True)
        series = df[value_col]

        # Defaults for every analysis that is not requested
        plot_html = None
        forecast_html = None
        acf_pacf_html = None
        summary = series.describe().to_dict()
        model_params = None
        train_size = None
        test_size = None
        metrics = None
        forecast_dates = []
        forecast_values = []

        # Copy that receives the decomposition columns and is saved at the end
        processed_df = df.copy()

        # Time series decomposition
        if do_decomposition:
            decomposition = seasonal_decompose(series, model='additive', period=_SEASONAL_PERIOD)
            components = (
                ('Original', series),
                ('Trend', decomposition.trend),
                ('Seasonality', decomposition.seasonal),
                ('Residuals', decomposition.resid),
            )
            fig = make_subplots(rows=4, cols=1,
                                subplot_titles=('Original Series', 'Trend', 'Seasonality', 'Residuals'))
            for row, (label, values) in enumerate(components, start=1):
                fig.add_trace(go.Scatter(x=df.index, y=values, name=label), row=row, col=1)
            fig.update_layout(height=800, showlegend=True)
            plot_html = pio.to_html(fig, full_html=False)
            processed_df['Trend'] = decomposition.trend
            processed_df['Seasonality'] = decomposition.seasonal
            processed_df['Residuals'] = decomposition.resid

        # Forecasting
        if do_forecasting:
            # Fail fast with clear messages before any expensive model fitting.
            if model_type not in _SUPPORTED_MODELS:
                raise ValueError(
                    f"Unsupported model_type: {model_type!r}. Expected one of: {', '.join(_SUPPORTED_MODELS)}")
            freq = df.index.inferred_freq
            if freq is None:
                raise ValueError(
                    "Could not infer a regular frequency from the date column; "
                    "forecasting requires evenly spaced dates")

            # Split data into train and test; clamp so test_size is never negative.
            train_size = min(max(int(len(df) * train_percent), 0), len(df))
            test_size = len(df) - train_size
            train_data = series.iloc[:train_size]
            test_data = series.iloc[train_size:]

            # Dates strictly after the last observation (the first generated
            # date is the last observation itself, hence the [1:]).
            forecast_dates = pd.date_range(start=df.index[-1], periods=forecast_periods + 1, freq=freq)[1:]

            # The model only knows the training slice, so it must predict
            # through the test span before it reaches the future dates: the
            # first test_size steps are scored against the test data and the
            # remaining forecast_periods steps line up with forecast_dates.
            predictions, model_params = _predict_with_model(
                model_type, train_data, test_size + forecast_periods, freq, date_col, value_col)
            forecast = predictions[test_size:]
            if test_size > 0:
                metrics = _forecast_error_metrics(test_data, predictions[:test_size])

            # Forecast plot
            forecast_fig = go.Figure()
            forecast_fig.add_trace(go.Scatter(x=df.index, y=series, name='Historical'))
            if test_size > 0:
                forecast_fig.add_trace(
                    go.Scatter(x=df.index[train_size:], y=test_data, name='Test Data', line=dict(color='green')))
            forecast_fig.add_trace(
                go.Scatter(x=forecast_dates, y=forecast, name=f'Forecast ({model_type})', line=dict(dash='dash')))
            forecast_fig.update_layout(title=f'Forecast ({model_type})', height=400)
            forecast_html = pio.to_html(forecast_fig, full_html=False)

            forecast_dates = forecast_dates.tolist()
            forecast_values = forecast.tolist()

        # ACF/PACF plots
        if do_acf_pacf:
            acf_pacf_html = create_acf_pacf_plots(series)

        # Save processed data
        filename = save_processed_file(processed_df, filepath)

        return {
            'plot_html': plot_html,
            'forecast_html': forecast_html,
            'acf_pacf_html': acf_pacf_html,
            'summary': summary,
            'filename': filename,
            'model_params': model_params,
            'train_size': train_size,
            'test_size': test_size,
            'metrics': metrics,
            'forecast_dates': forecast_dates,
            'forecast_values': forecast_values,
            'model_type': model_type,
        }
    except Exception as e:
        # Boundary handler: callers receive failures as an 'error' entry
        # instead of an exception.
        return {'error': str(e)}