| | import os |
| | import numpy as np |
| | import pandas as pd |
| | import torch |
| | import torch.nn as nn |
| | from torch.utils.data import Dataset, DataLoader |
| | from sklearn.preprocessing import MinMaxScaler |
| | from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score |
| | import matplotlib.pyplot as plt |
| | from itertools import product |
| | import json |
| |
|
| | |
| | |
| | |
class StockDataset(Dataset):
    """Sliding-window dataset over a 1-D (scaled) price series.

    Each sample pairs a window of ``seq_length`` consecutive values (the
    model input) with the single value immediately following the window
    (the forecasting target).
    """

    def __init__(self, series, seq_length):
        self.series = series
        self.seq_length = seq_length

    def __len__(self):
        # One sample per window position that still has a target after it.
        return len(self.series) - self.seq_length

    def __getitem__(self, idx):
        window_end = idx + self.seq_length
        window = self.series[idx:window_end]
        target = self.series[window_end]
        # Prepend a channel axis: Conv1d expects (channels, length) per sample.
        features = np.expand_dims(window, axis=0)
        return (torch.tensor(features, dtype=torch.float32),
                torch.tensor(target, dtype=torch.float32))
|
| | |
| | |
| | |
class TemporalBlock(nn.Module):
    """Residual block of two causal dilated 1-D convolutions.

    Each convolution pads symmetrically by (kernel_size - 1) * dilation and
    the forward pass slices the output back to the input length, discarding
    the trailing timesteps — this makes the convolutions causal (output at
    time t never sees inputs after t).
    """

    def __init__(self, in_channels, out_channels, kernel_size, stride, dilation, dropout=0.2):
        super().__init__()
        pad = (kernel_size - 1) * dilation
        self.conv1 = nn.Conv1d(in_channels, out_channels, kernel_size,
                               stride=stride, padding=pad, dilation=dilation)
        self.relu1 = nn.ReLU()
        self.dropout1 = nn.Dropout(dropout)
        self.conv2 = nn.Conv1d(out_channels, out_channels, kernel_size,
                               stride=stride, padding=pad, dilation=dilation)
        self.relu2 = nn.ReLU()
        self.dropout2 = nn.Dropout(dropout)
        # 1x1 projection so the residual shortcut matches the output channels.
        self.downsample = None if in_channels == out_channels else nn.Conv1d(in_channels, out_channels, 1)
        self.relu = nn.ReLU()

    def forward(self, x):
        length = x.size(2)
        # Slice to the original length right after each conv: with symmetric
        # padding this drops exactly the "future" positions (chomp).
        h = self.dropout1(self.relu1(self.conv1(x)[:, :, :length]))
        h = self.dropout2(self.relu2(self.conv2(h)[:, :, :length]))
        shortcut = x if self.downsample is None else self.downsample(x)
        return self.relu(h + shortcut)
|
class TCN(nn.Module):
    """Temporal Convolutional Network: stacked TemporalBlocks + linear head.

    Dilation doubles at every level (1, 2, 4, ...), so the receptive field
    grows exponentially with depth. Only the last timestep's features feed
    the linear output layer (one-step-ahead forecast).
    """

    def __init__(self, input_size, output_size, num_channels, kernel_size=3, dropout=0.2):
        super().__init__()
        blocks = []
        for level, width in enumerate(num_channels):
            prev_width = input_size if level == 0 else num_channels[level - 1]
            blocks.append(
                TemporalBlock(prev_width, width, kernel_size,
                              stride=1, dilation=2 ** level, dropout=dropout)
            )
        self.network = nn.Sequential(*blocks)
        self.linear = nn.Linear(num_channels[-1], output_size)

    def forward(self, x):
        features = self.network(x)
        # Keep only the final timestep as the summary of the sequence.
        last_step = features[:, :, -1]
        return self.linear(last_step)
| |
|
| | |
| | |
| | |
class StockPriceForecaster:
    """End-to-end train/evaluate pipeline for TCN stock-price forecasting.

    Loads a CSV with a 'Close' column, min-max scales it, builds sliding
    windows, trains a TCN, and reports MAE/RMSE/MAPE/R2 in original price
    units on a chronologically held-out test segment.
    """

    def __init__(self, dataset_path, seq_length=30, batch_size=32, lr=0.001, epochs=20,
                 kernel_size=3, num_channels=None, dropout=0.2, test_split=0.2):
        self.dataset_path = dataset_path
        self.seq_length = seq_length
        self.batch_size = batch_size
        self.lr = lr
        self.epochs = epochs
        self.kernel_size = kernel_size
        # None sentinel instead of a mutable list default: a shared default
        # list could be mutated by one instance and leak into the next.
        self.num_channels = [32, 64, 64] if num_channels is None else list(num_channels)
        self.dropout = dropout
        self.test_split = test_split
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.scaler = MinMaxScaler()

    def load_data(self):
        """Load and preprocess stock price data.

        Returns:
            (train_loader, test_loader): DataLoaders over sliding windows.

        Raises:
            FileNotFoundError: if the dataset file does not exist.
            ValueError: if the CSV has no 'Close' column.
        """
        if not os.path.exists(self.dataset_path):
            raise FileNotFoundError(f"Dataset file not found at: {self.dataset_path}")
        df = pd.read_csv(self.dataset_path)
        if "Close" not in df.columns:
            raise ValueError("CSV file must contain a 'Close' column")
        prices = df["Close"].values.reshape(-1, 1)
        prices_scaled = self.scaler.fit_transform(prices).flatten()
        dataset = StockDataset(prices_scaled, self.seq_length)
        train_size = int(len(dataset) * (1 - self.test_split))
        # Chronological split (earliest windows train, latest test): a
        # random_split would scatter future windows into the training set
        # and leak look-ahead information for a time-series task.
        train_dataset = torch.utils.data.Subset(dataset, range(train_size))
        test_dataset = torch.utils.data.Subset(dataset, range(train_size, len(dataset)))
        train_loader = DataLoader(train_dataset, batch_size=self.batch_size, shuffle=True)
        test_loader = DataLoader(test_dataset, batch_size=self.batch_size, shuffle=False)
        return train_loader, test_loader

    def train(self, model, train_loader):
        """Train the TCN model with Adam + MSE loss; returns the trained model."""
        criterion = nn.MSELoss()
        optimizer = torch.optim.Adam(model.parameters(), lr=self.lr)
        model.train()
        for epoch in range(self.epochs):
            epoch_loss = 0.0
            for x, y in train_loader:
                x, y = x.to(self.device), y.to(self.device)
                optimizer.zero_grad()
                output = model(x)
                # view(-1) (not squeeze) keeps a 1-D shape even when the
                # final batch has a single sample, so the loss sees matching
                # shapes instead of a 0-d tensor broadcast against (1,).
                loss = criterion(output.view(-1), y)
                loss.backward()
                optimizer.step()
                epoch_loss += loss.item()
            print(f"Epoch [{epoch+1}/{self.epochs}], Loss: {epoch_loss/len(train_loader):.6f}")
        return model

    def evaluate(self, model, test_loader):
        """Evaluate the model on the test set.

        Returns:
            (mae, rmse, mape, r2, actuals, predictions) with actuals and
            predictions inverse-transformed back to price units.
        """
        model.eval()
        predictions, actuals = [], []
        with torch.no_grad():
            for x, y in test_loader:
                x, y = x.to(self.device), y.to(self.device)
                output = model(x)
                # view(-1) avoids the 0-d array that squeeze() produces for
                # a size-1 batch — list.extend cannot iterate a 0-d array.
                predictions.extend(output.view(-1).cpu().numpy())
                actuals.extend(y.view(-1).cpu().numpy())
        predictions = self.scaler.inverse_transform(np.array(predictions).reshape(-1, 1)).flatten()
        actuals = self.scaler.inverse_transform(np.array(actuals).reshape(-1, 1)).flatten()
        mae = mean_absolute_error(actuals, predictions)
        # np.sqrt(mse) instead of squared=False: the `squared` keyword was
        # deprecated and then removed from sklearn's mean_squared_error.
        rmse = float(np.sqrt(mean_squared_error(actuals, predictions)))
        # Epsilon guards against division by zero on zero-priced rows.
        mape = np.mean(np.abs((actuals - predictions) / (actuals + 1e-10))) * 100
        r2 = r2_score(actuals, predictions)
        return mae, rmse, mape, r2, actuals, predictions

    def run(self):
        """Build, train, and evaluate a fresh TCN; returns (model, metrics)."""
        train_loader, test_loader = self.load_data()
        model = TCN(input_size=1, output_size=1,
                    num_channels=self.num_channels,
                    kernel_size=self.kernel_size,
                    dropout=self.dropout).to(self.device)
        trained_model = self.train(model, train_loader)
        return trained_model, self.evaluate(trained_model, test_loader)
|
| | |
| | |
| | |
def save_model_for_huggingface(model, scaler, config, save_dir="tcn_stock_model"):
    """Persist model weights, architecture config, and the fitted scaler.

    Writes three artifacts into ``save_dir`` (created if missing):
    ``pytorch_model.bin`` (state dict), ``config.json`` (the hyperparameters
    needed to rebuild the TCN), and ``scaler.pkl`` (pickled scaler).
    """
    import pickle

    os.makedirs(save_dir, exist_ok=True)

    # Weights only — the architecture is reconstructed from config.json.
    torch.save(model.state_dict(), os.path.join(save_dir, "pytorch_model.bin"))

    model_config = {
        "input_size": 1,
        "output_size": 1,
        "num_channels": config["num_channels"],
        "kernel_size": config["kernel_size"],
        "dropout": config["dropout"],
        "seq_length": config["seq_length"],
    }
    with open(os.path.join(save_dir, "config.json"), "w") as f:
        json.dump(model_config, f, indent=4)

    # The scaler must travel with the model so predictions can be mapped
    # back to real price units at inference time.
    with open(os.path.join(save_dir, "scaler.pkl"), "wb") as f:
        pickle.dump(scaler, f)

    print(f"Model saved to {save_dir}")
|
| | |
| | |
| | |
if __name__ == "__main__":
    dataset_path = "/work/GOOGL.csv"

    # Hyperparameter grid for the search below.
    seq_lengths = [20, 50]
    batch_sizes = [16, 32]
    learning_rates = [0.001, 0.0005]
    kernel_sizes = [3, 5]
    num_channels_list = [[32, 64, 128], [64, 128, 256]]
    dropouts = [0.1, 0.2]

    results = []
    best_result = None
    best_metrics = float('inf')
    best_model = None
    best_config = None
    # Track the best run's scaler explicitly: the previous code saved
    # `forecaster.scaler`, i.e. the LAST grid iteration's scaler (possibly
    # unfitted if that run failed), not the one fitted by the best run.
    best_scaler = None

    for seq, batch, lr, kernel, channels, dropout in product(
        seq_lengths, batch_sizes, learning_rates, kernel_sizes, num_channels_list, dropouts
    ):
        print(f"\nRunning: seq={seq}, batch={batch}, lr={lr}, kernel={kernel}, channels={channels}, dropout={dropout}")
        try:
            forecaster = StockPriceForecaster(
                dataset_path=dataset_path,
                seq_length=seq,
                batch_size=batch,
                lr=lr,
                epochs=20,
                kernel_size=kernel,
                num_channels=channels,
                dropout=dropout,
                test_split=0.2
            )
            model, (mae, rmse, mape, r2, actuals, predictions) = forecaster.run()
            results.append({
                "seq_length": seq,
                "batch_size": batch,
                "lr": lr,
                "kernel_size": kernel,
                "num_channels": str(channels),
                "dropout": dropout,
                "MAE": mae,
                "RMSE": rmse,
                "MAPE": mape,
                "R2": r2
            })
            # RMSE is the model-selection criterion.
            if rmse < best_metrics:
                best_metrics = rmse
                best_result = (actuals, predictions, seq, batch, lr, kernel, channels, dropout)
                best_model = model
                best_scaler = forecaster.scaler
                best_config = {
                    "seq_length": seq,
                    "batch_size": batch,
                    "lr": lr,
                    "kernel_size": kernel,
                    "num_channels": channels,
                    "dropout": dropout
                }
        except Exception as e:
            # Best-effort grid search: log the failing config and move on.
            print(f"Error with config seq={seq}, batch={batch}, lr={lr}, kernel={kernel}, channels={channels}, dropout={dropout}: {e}")
            continue

    df_results = pd.DataFrame(results)
    df_results.to_csv("tcn_experiments_results.csv", index=False)
    print("\nAll experiments done! Results saved to 'tcn_experiments_results.csv'")

    print("\nMetrics Table:")
    pd.set_option('display.max_columns', None)
    pd.set_option('display.width', 1000)
    pd.set_option('display.float_format', '{:.6f}'.format)
    print(df_results)

    # Persist the best model together with ITS fitted scaler.
    if best_model is not None:
        save_model_for_huggingface(best_model, best_scaler, best_config)
        print(f"\nBest model saved with RMSE: {best_metrics:.6f}")
        print("\nBest configuration:")
        print(pd.Series(best_config))

    if best_result is not None:
        actuals, predictions, seq, batch, lr, kernel, channels, dropout = best_result
        plt.figure(figsize=(12, 6))
        plt.plot(actuals, label="Actual Prices")
        plt.plot(predictions, label="Predicted Prices")
        plt.title(f"Best Model: seq={seq}, batch={batch}, lr={lr}, kernel={kernel}, channels={channels}, dropout={dropout}")
        plt.xlabel("Time Step")
        plt.ylabel("Price")
        plt.legend()
        plt.grid(True)
        plt.show()
    else:
        print("No successful experiments to plot.")