منتدى فيجوال بيسك لكل العرب | منتدى المبرمجين العرب

نسخة كاملة : مشكلة في مؤشر بايثون
أنت حالياً تتصفح نسخة خفيفة من المنتدى . مشاهدة نسخة كاملة مع جميع الأشكال الجمالية .
مشكلتي مع مؤشر التداول هذا قمت بتصميمه باستخدام لغة بايثون والهدف منه التنبؤ بسعر الاغلاق القادم اذا كان رح يكون اكبر او اصغر من سعر الاغلاق السابق. والمؤشر يحتوي على نموذج ذكاء صناعي اقوم بتدريبه على بيانات سوقية ويعطي نتائج مثالية اثناء التدريب والاختبار والاختبار الحي على ملف ثاني. لكن عند التنفيذ في الواقع يطعي نتائج ضعيفة. علما ان عندما اقوم باضافة البيانات بشكل يدوي بيانات سوقية لم يتدرب عليها ابدا يعطيني نتائج مثالية. عليما لا يوجد مشكلة في تسرب البيانات ابدا. وهذا الكود import os, time, random
import pandas as pd
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import accuracy_score
from torch.utils.data import DataLoader, TensorDataset

# تثبيت العشوائية
random.seed(42)
np.random.seed(42)
torch.manual_seed(42)
torch.cuda.manual_seed_all(42)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

# تحميل المسارات
sd = os.path.dirname(os.path.abspath(__file__))
train_path = os.path.join(sd, '1.csv')
test_path = next((os.path.join(sd, f) for f in os.listdir(sd) if f.endswith('.csv') and f != '1.csv'), None)
model_path = os.path.join(sd, 'best_model.pth')
if not os.path.isfile(train_path): raise FileNotFoundError("ملف 1.csv غير موجود")
if not test_path: raise FileNotFoundError("لا يوجد ملف اختبار")

# التحميل والمعالجة
df_train = pd.read_csv(train_path)
def preprocess(df):
    num_cols = ['open','high','low','close','PTI']
    # الاحتفاظ فقط بالأعمدة الوهمية المفتوحة والمرتفعة والمنخفضة
    dummy_cols = [
        'open_dummy1', 'high_dummy1', 'low_dummy1'
    ]
    for col in dummy_cols:
        if col not in df.columns:
            df[col] = 0.0

    all_cols = num_cols + dummy_cols
    df[all_cols] = df[all_cols].apply(pd.to_numeric, errors='coerce')
    df.dropna(subset=all_cols, inplace=True)
    df['time'] = pd.to_datetime(df['time'])
    df.sort_values('time', inplace=True, ignore_index=True)
    df['volatility'] = df['high'] - df['low']
    df['range_move'] = df['close'] - df['open']
    df['breakout_signal'] = [0 if i<3 else 1 if df['close'][i] > df['high'][i-3:i].max()
                             else -1 if df['close'][i] < df['low'][i-3:i].min() else 0 for i in range(len(df))]
    df['next_open'], df['next_close'] = df['open'].shift(-1), df['close'].shift(-1)
    df['label'] = df.apply(lambda r: 1 if pd.notna(r['next_open']) and r['next_close'] > r['next_open'] else 0, axis=1)
    df.dropna(subset=['label'], inplace=True)
    return df

df_train = preprocess(df_train)

# تحضير البيانات
features = ['open','high','low','close','PTI',
            'open_dummy1','high_dummy1','low_dummy1',
            'volatility','range_move','breakout_signal']

scaler = StandardScaler()
X_train = scaler.fit_transform(df_train[features])
y_train = df_train['label'].values
X_train_t = torch.tensor(X_train, dtype=torch.float32)
y_train_t = torch.tensor(y_train, dtype=torch.float32).unsqueeze(1)
train_loader = DataLoader(TensorDataset(X_train_t, y_train_t), batch_size=32, shuffle=True)

# النموذج
class TransformerModel(nn.Module):
    def __init__(self, input_dim, heads=1, layers=4, hidden=128):
        super().__init__()
        enc = nn.TransformerEncoderLayer(d_model=input_dim, nhead=heads, dim_feedforward=hidden, batch_first=True)
        self.encoder = nn.TransformerEncoder(enc, num_layers=layers)
        self.fc_out = nn.Linear(input_dim, 1)
        self.drop = nn.Dropout(0.5)
    def forward(self, x):
        x = self.encoder(x.unsqueeze(1)).mean(dim=1)
        return self.fc_out(self.drop(x))

model = TransformerModel(input_dim=X_train.shape[1])
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# تدريب أو تحميل
if not os.path.isfile(model_path):
    best_acc = 0
    for epoch in range(1, 101):
        model.train()
        for xb, yb in train_loader:
            optimizer.zero_grad()
            loss = criterion(model(xb), yb)
            loss.backward(); optimizer.step()
        model.eval()
        test_df_temp = preprocess(pd.read_csv(test_path))
        X_test_temp = scaler.transform(test_df_temp[features])
        y_test_temp = test_df_temp['label'].values
        X_test_t_temp = torch.tensor(X_test_temp, dtype=torch.float32)
        y_test_t_temp = torch.tensor(y_test_temp, dtype=torch.float32).unsqueeze(1)
        with torch.no_grad():
            preds = (torch.sigmoid(model(X_test_t_temp)) >= 0.5).int().numpy().flatten()
            acc = accuracy_score(y_test_temp, preds)
            print(f"Epoch {epoch}: Accuracy = {acc:.4f}")
        if acc > best_acc:
            best_acc = acc
            torch.save(model.state_dict(), model_path)
        if best_acc >= 0.90: break
else:
    model.load_state_dict(torch.load(model_path))

# حلقة مراقبة الملف واختبار مستمر
last_mtime = 0
print("بدأ المراقبة، عدّل الملف لمشاهدة التحديثات...\n")
while True:
    try:
        new_mtime = os.path.getmtime(test_path)
        if new_mtime != last_mtime:
            last_mtime = new_mtime
            df_test = preprocess(pd.read_csv(test_path))
            if df_test.empty: 
                print("لا توجد بيانات صالحة بعد في ملف الاختبار...\n")
                time.sleep(2)
                continue
            X_test = scaler.transform(df_test[features])
            y_test = df_test['label'].values
            X_test_t = torch.tensor(X_test, dtype=torch.float32)
            with torch.no_grad():
                preds = (torch.sigmoid(model(X_test_t)) >= 0.5).int().numpy().flatten()

            pnl = [(r['next_close'] - r['next_open']) if p == 1 else (r['next_open'] - r['next_close'])
                   for (_, r), p in zip(df_test.iterrows(), preds) if pd.notna(r['next_open']) and pd.notna(r['next_close'])]
            pnl = np.array(pnl)
            wins, losses = np.sum(pnl > 0), np.sum(pnl < 0)

            prev_row, prev_pred = None, None
            for (_, row), pred in zip(df_test.iterrows(), preds):
                if prev_row is not None and pd.notna(prev_row['next_open']) and pd.notna(prev_row['next_close']):
                    expected = 1 if prev_row['next_close'] > prev_row['next_open'] else 0
                    res = "Win" if expected == prev_pred else "Loss"
                    print(f"Verification Open: {prev_row['next_open']}, Close: {prev_row['next_close']}\nResult: {res}\nخيط\n")
                print(f"Last Open: {row['next_open']}, Close: {row['next_close']}\nPrediction: {'Up' if pred == 1 else 'Down'}\nخيط\n")
                prev_row, prev_pred = row, pred

            if pd.isna(prev_row['next_open']) or pd.isna(prev_row['next_close']):
                print("في انتظار بيانات التحقق اليدوية لإظهار النتيجة...\n")
            print(f"Wins: {wins}, Losses: {losses}\n")
        time.sleep(2)
    except KeyboardInterrupt:
        print("\nتم إيقاف المراقبة يدويًا.")
        break