![]() |
|
[سؤال] مشكلة في مؤشر بايثون - نسخة قابلة للطباعة +- منتدى فيجوال بيسك لكل العرب | منتدى المبرمجين العرب (http://vb4arb.com/vb) +-- قسم : قسم لغة السي شارب C#.NET (http://vb4arb.com/vb/forumdisplay.php?fid=175) +--- قسم : قسم اسئلة C#.NET (http://vb4arb.com/vb/forumdisplay.php?fid=176) +--- الموضوع : [سؤال] مشكلة في مؤشر بايثون (/showthread.php?tid=53022) |
مشكلة في مؤشر بايثون - reem.564 - 08-05-25 مشكلتي مع مؤشر التداول هذا قمت بتصميمه باستخدام لغة بايثون والهدف منه التنبؤ بسعر الاغلاق القادم اذا كان رح يكون اكبر او اصغر من سعر الاغلاق السابق. والمؤشر يحتوي على نموذج ذكاء صناعي اقوم بتدريبه على بيانات سوقية ويعطي نتائج مثالية اثناء التدريب والاختبار والاختبار الحي على ملف ثاني. لكن عند التنفيذ في الواقع يطعي نتائج ضعيفة. علما ان عندما اقوم باضافة البيانات بشكل يدوي بيانات سوقية لم يتدرب عليها ابدا يعطيني نتائج مثالية. عليما لا يوجد مشكلة في تسرب البيانات ابدا. وهذا الكود import os, time, random import pandas as pd import numpy as np import torch import torch.nn as nn import torch.optim as optim from sklearn.preprocessing import StandardScaler from sklearn.metrics import accuracy_score from torch.utils.data import DataLoader, TensorDataset # تثبيت العشوائية random.seed(42) np.random.seed(42) torch.manual_seed(42) torch.cuda.manual_seed_all(42) torch.backends.cudnn.deterministic = True torch.backends.cudnn.benchmark = False # تحميل المسارات sd = os.path.dirname(os.path.abspath(__file__)) train_path = os.path.join(sd, '1.csv') test_path = next((os.path.join(sd, f) for f in os.listdir(sd) if f.endswith('.csv') and f != '1.csv'), None) model_path = os.path.join(sd, 'best_model.pth') if not os.path.isfile(train_path): raise FileNotFoundError("ملف 1.csv غير موجود") if not test_path: raise FileNotFoundError("لا يوجد ملف اختبار") # التحميل والمعالجة df_train = pd.read_csv(train_path) def preprocess(df): num_cols = ['open','high','low','close','PTI'] # الاحتفاظ فقط بالأعمدة الوهمية المفتوحة والمرتفعة والمنخفضة dummy_cols = [ 'open_dummy1', 'high_dummy1', 'low_dummy1' ] for col in dummy_cols: if col not in df.columns: df[col] = 0.0 all_cols = num_cols + dummy_cols df[all_cols] = df[all_cols].apply(pd.to_numeric, errors='coerce') df.dropna(subset=all_cols, inplace=True) df['time'] = pd.to_datetime(df['time']) df.sort_values('time', inplace=True, ignore_index=True) df['volatility'] = df['high'] - df['low'] df['range_move'] = df['close'] - df['open'] df['breakout_signal'] = [0 if i<3 else 1 if df['close'][i] > df['high'][i-3:i].max() else -1 if df['close'][i] < df['low'][i-3:i].min() else 0 for i in range(len(df))] df['next_open'], df['next_close'] = df['open'].shift(-1), df['close'].shift(-1) df['label'] = df.apply(lambda r: 1 if pd.notna(r['next_open']) and r['next_close'] > r['next_open'] else 0, axis=1) df.dropna(subset=['label'], inplace=True) return df df_train = preprocess(df_train) # تحضير البيانات features = ['open','high','low','close','PTI', 'open_dummy1','high_dummy1','low_dummy1', 'volatility','range_move','breakout_signal'] scaler = StandardScaler() X_train = scaler.fit_transform(df_train[features]) y_train = df_train['label'].values X_train_t = torch.tensor(X_train, dtype=torch.float32) y_train_t = torch.tensor(y_train, dtype=torch.float32).unsqueeze(1) train_loader = DataLoader(TensorDataset(X_train_t, y_train_t), batch_size=32, shuffle=True) # النموذج class TransformerModel(nn.Module): def __init__(self, input_dim, heads=1, layers=4, hidden=128): super().__init__() enc = nn.TransformerEncoderLayer(d_model=input_dim, nhead=heads, dim_feedforward=hidden, batch_first=True) self.encoder = nn.TransformerEncoder(enc, num_layers=layers) self.fc_out = nn.Linear(input_dim, 1) self.drop = nn.Dropout(0.5) def forward(self, x): x = self.encoder(x.unsqueeze(1)).mean(dim=1) return self.fc_out(self.drop(x)) model = TransformerModel(input_dim=X_train.shape[1]) criterion = nn.BCEWithLogitsLoss() optimizer = optim.Adam(model.parameters(), lr=0.001) # تدريب أو تحميل if not os.path.isfile(model_path): best_acc = 0 for epoch in range(1, 101): model.train() for xb, yb in train_loader: optimizer.zero_grad() loss = criterion(model(xb), yb) loss.backward(); optimizer.step() model.eval() test_df_temp = preprocess(pd.read_csv(test_path)) X_test_temp = scaler.transform(test_df_temp[features]) y_test_temp = test_df_temp['label'].values X_test_t_temp = torch.tensor(X_test_temp, dtype=torch.float32) y_test_t_temp = torch.tensor(y_test_temp, dtype=torch.float32).unsqueeze(1) with torch.no_grad(): preds = (torch.sigmoid(model(X_test_t_temp)) >= 0.5).int().numpy().flatten() acc = accuracy_score(y_test_temp, preds) print(f"Epoch {epoch}: Accuracy = {acc:.4f}") if acc > best_acc: best_acc = acc torch.save(model.state_dict(), model_path) if best_acc >= 0.90: break else: model.load_state_dict(torch.load(model_path)) # حلقة مراقبة الملف واختبار مستمر last_mtime = 0 print("بدأ المراقبة، عدّل الملف لمشاهدة التحديثات...\n") while True: try: new_mtime = os.path.getmtime(test_path) if new_mtime != last_mtime: last_mtime = new_mtime df_test = preprocess(pd.read_csv(test_path)) if df_test.empty: print("لا توجد بيانات صالحة بعد في ملف الاختبار...\n") time.sleep(2) continue X_test = scaler.transform(df_test[features]) y_test = df_test['label'].values X_test_t = torch.tensor(X_test, dtype=torch.float32) with torch.no_grad(): preds = (torch.sigmoid(model(X_test_t)) >= 0.5).int().numpy().flatten() pnl = [(r['next_close'] - r['next_open']) if p == 1 else (r['next_open'] - r['next_close']) for (_, r), p in zip(df_test.iterrows(), preds) if pd.notna(r['next_open']) and pd.notna(r['next_close'])] pnl = np.array(pnl) wins, losses = np.sum(pnl > 0), np.sum(pnl < 0) prev_row, prev_pred = None, None for (_, row), pred in zip(df_test.iterrows(), preds): if prev_row is not None and pd.notna(prev_row['next_open']) and pd.notna(prev_row['next_close']): expected = 1 if prev_row['next_close'] > prev_row['next_open'] else 0 res = "Win" if expected == prev_pred else "Loss" print(f"Verification Open: {prev_row['next_open']}, Close: {prev_row['next_close']}\nResult: {res}\nخيط\n") print(f"Last Open: {row['next_open']}, Close: {row['next_close']}\nPrediction: {'Up' if pred == 1 else 'Down'}\nخيط\n") prev_row, prev_pred = row, pred if pd.isna(prev_row['next_open']) or pd.isna(prev_row['next_close']): print("في انتظار بيانات التحقق اليدوية لإظهار النتيجة...\n") print(f"Wins: {wins}, Losses: {losses}\n") time.sleep(2) except KeyboardInterrupt: print("\nتم إيقاف المراقبة يدويًا.") break |