Files
check-one-bullish-not-break/one_bullish_not_break.py

452 lines
19 KiB
Python
Raw Permalink Normal View History

2025-05-03 23:09:55 +08:00
import baostock as bs
import pandas as pd
import os
import mplfinance as mpf
import sys
import time
import json
from scipy.stats import pearsonr
import multiprocessing
# Minimum pct change (%) for a candle to qualify as the "launch" bullish candle.
BULLISH_K_INCREASE_LEN = 5.0
# Number of follow-up candles that must pull back without breaking the bullish open.
NOT_BREAK_K_COUNT = 6
# Width (in trading days) of the buy window that opens after the no-break period.
BUY_WINDOW_LEN = 5
# Minimum profit ratio for a day to count as a sell point.
LEAST_PROFIT_RATE = 0.1
# Maximum holding period (days) while waiting for the profit target.
MAX_KEEP_DAYS = 10
# Whether to plot K-line charts for found buy points.
DRAW_BUY_POINT_K_LINE = True
# Whether charts are saved to files instead of shown interactively.
DRAW_TO_FILE = True
# Directory holding the per-stock cached CSV files.
LOCAL_CACHE_PATH = "stocks-2025-04-14"
# Directory for per-stock JSON result files.
RESULT_OUTPUT_PATH = "results_tmp"
# Market-index codes to skip when scanning the cache.
# NOTE(review): "INDECATOR" is a typo for "INDICATOR"; kept to avoid breaking references.
INDECATOR_CODE_LIST = ["sh.{:06}".format(i) for i in range(0, 999)] + ["sz.{:06}".format(i) for i in range(399001, 399999)]
# Cross-platform single-keypress reader: msvcrt on Windows, raw tty mode on POSIX.
if os.name == 'nt':  # Windows
    import msvcrt

    def getch():
        """Read one keypress (bytes) without waiting for Enter."""
        return msvcrt.getch()
else:  # Linux / macOS
    import tty
    import termios

    def getch():
        """Read one character from stdin in raw mode, always restoring tty state."""
        fd = sys.stdin.fileno()
        saved_state = termios.tcgetattr(fd)
        try:
            tty.setraw(fd)
            return sys.stdin.read(1)
        finally:
            termios.tcsetattr(fd, termios.TCSADRAIN, saved_state)
def all_stock_count(date, outfile="stock_list.csv"):
    """Fetch the full security listing for *date*, print its size, dump to *outfile* (UTF-8 CSV)."""
    bs.login()
    listing = bs.query_all_stock(date).get_data()
    bs.logout()
    print(f"股票总数:{len(listing)}")
    listing.to_csv(outfile, encoding="utf8", index=False)
def download_data(date, outfile="test_result.csv", codes=None, freq="d"):
    """Download OHLC data for *codes* (or every listed security) on *date* into *outfile* (GBK CSV)."""
    bs.login()
    # When no explicit code list is given, fall back to the full listing for the day.
    if codes is None:
        codes = bs.query_all_stock(date).get_data()["code"]
    frames = [pd.DataFrame()]
    for code in codes:
        print("Downloading :" + code)
        k_rs = bs.query_history_k_data_plus(code, "date,code,open,high,low,close", date, date, frequency=freq)
        frames.append(pd.DataFrame(k_rs.data, columns=k_rs.fields))
    # Single concat instead of repeated per-code concatenation.
    data_df = pd.concat(frames, ignore_index=True)
    bs.logout()
    data_df.to_csv(outfile, encoding="gbk", index=False)
    print(data_df)
def get_trade_day():
    """Query the 2017-H1 trading calendar and write it to D:\\trade_datas.csv (GBK)."""
    lg = bs.login()
    print('login respond error_code:'+lg.error_code)
    print('login respond error_msg:'+lg.error_msg)
    # Fetch the trade-date flags for the first half of 2017.
    rs = bs.query_trade_dates(start_date="2017-01-01", end_date="2017-06-30")
    print('query_trade_dates respond error_code:'+rs.error_code)
    print('query_trade_dates respond error_msg:'+rs.error_msg)
    # Drain the cursor one record at a time.
    # NOTE: the eager `&` (not `and`) preserves the original's unconditional rs.next() call.
    rows = []
    while (rs.error_code == '0') & rs.next():
        rows.append(rs.get_row_data())
    result = pd.DataFrame(rows, columns=rs.fields)
    result.to_csv("D:\\trade_datas.csv", encoding="gbk", index=False)
    print(result)
    bs.logout()
def select_bullish_not_break(k_data, bullish_increase=6.0, period_lens=6):
    """Find "launch bullish candle" setups that pull back but are never broken.

    A match is a traded day whose pct change is at least *bullish_increase* and
    whose next *period_lens* candles (1) dip below the bullish close at least
    once, (2) never trade below the bullish open, and (3) never out-gain the
    bullish candle itself.

    Returns a list of (row_index, date) tuples, one per matching candle.
    """
    ret = []
    rows, _ = k_data.shape
    for idx, stock in k_data.iterrows():
        # Need a full follow-up window after the candidate candle.
        if idx + period_lens >= rows:
            continue
        # 1. Candidate must be a traded day with a big enough gain.
        if stock["tradestatus"] == 0:
            continue
        bullish_open = float(stock["open"])
        bullish_close = float(stock["close"])
        increase = float(stock["pctChg"])
        if increase < bullish_increase:
            continue
        # 2. The follow-up candles must pull back to the bullish close at least
        #    once without ever breaking below the bullish open.
        fall_down = False
        fall_break = False
        for i in range(idx + 1, idx + period_lens + 1):
            low = float(k_data.at[i, "low"])
            day_open = float(k_data.at[i, "open"])  # renamed: don't shadow builtin open()
            close = float(k_data.at[i, "close"])
            if low < bullish_close or day_open < bullish_close or close < bullish_close:
                fall_down = True
            if low < bullish_open:
                fall_break = True
        if not fall_down or fall_break:
            continue
        # 3. None of the follow-up candles may out-gain the bullish candle.
        #    Fix: cast to float — raw baostock frames hold pctChg as strings,
        #    and str > float raises TypeError.
        increase_too_large = False
        for i in range(idx + 1, idx + period_lens + 1):
            if float(k_data.at[i, "pctChg"]) > increase:
                increase_too_large = True
                break
        if increase_too_large:
            continue
        ret.append((idx, stock["date"]))
    return ret
def check_profit(k_data: pd.DataFrame, buy_k_start: int, buy_k_end: int, least_profit_rate=0.2, max_keep_days=20, bullish_k: int = -1):
    """For each buy day in [buy_k_start, buy_k_end), collect profitable sell points.

    A sell point is any of the next *max_keep_days* - 1 candles whose close is at
    least *least_profit_rate* above the buy-day close.  If *bullish_k* is a
    positive row index, any buy day closing below that candle's open aborts the
    whole scan (returns {}).

    Returns {buy_index: [(sell_index, profit_ratio, days_held), ...]}.
    """
    ret = {}
    k_end = k_data.shape[0]
    # Only a strictly positive index activates the bullish-open floor.
    bullish_open = float(k_data.at[bullish_k, "open"]) if bullish_k > 0 else 0
    if buy_k_start >= k_end:
        return {}
    buy_k_end = min(buy_k_end, k_end)
    for i in range(buy_k_start, buy_k_end):
        buy_close = float(k_data.at[i, "close"])
        # The whole setup is invalid once price closes back under the bullish open.
        if buy_close < bullish_open:
            return {}
        sell_points = []
        range_end = min(i + max_keep_days, k_end)
        for j in range(i + 1, range_end):
            # Fix: removed dead reads of the "open" column (buy_open/current_open
            # were assigned but never used).
            profit = (float(k_data.at[j, "close"]) - buy_close) / buy_close
            if profit >= least_profit_rate:
                sell_points.append((j, profit, j - i))
        if sell_points:
            ret[i] = sell_points
    return ret
def pull_stock_data(start_day="2022-03-31", end_day=None):
    """Cache daily K-line CSVs for every stock listed on both *start_day* and *end_day*."""
    if end_day is None:
        end_day = time.strftime("%Y-%m-%d", time.localtime())
    bs.login()
    start_data = bs.query_all_stock(start_day).get_data()
    end_data = bs.query_all_stock(end_day).get_data()
    # Keep only codes present at both dates, i.e. continuously listed.
    exist_data = pd.merge(start_data, end_data, how="inner", on=["code"])
    exist_data.to_csv("exist_stock.csv", encoding="utf8", index=False)
    if not os.path.exists(LOCAL_CACHE_PATH):
        os.mkdir(LOCAL_CACHE_PATH)
    for _, stock in exist_data.iterrows():
        code = stock["code"]
        # "*" (ST marker) is not a legal filename character on Windows.
        safe_name = stock["code_name_y"].replace("*", "x")
        file_name = str(code) + "_" + str(safe_name) + ".csv"
        print(file_name)
        k_rs = bs.query_history_k_data_plus(code, "date,open,high,low,close,volume,turn,tradestatus,peTTM,isST,preclose,pctChg", start_day, end_day, frequency="d")
        frame = pd.DataFrame(k_rs.data, columns=k_rs.fields)
        frame.to_csv(os.path.join(LOCAL_CACHE_PATH, file_name), encoding="utf8", index=False)
    bs.logout()
def list_cached_stocks(base_path=LOCAL_CACHE_PATH):
    """Return {code: name} for every cached stock CSV, skipping market-index codes."""
    ret = {}
    for entry in os.listdir(base_path):
        if not os.path.isfile(os.path.join(base_path, entry)):
            continue
        # File names look like "<code>_<name>.csv"; drop the 4-char extension.
        parts = entry[:len(entry) - 4].split("_")
        if parts[0] in INDECATOR_CODE_LIST:
            continue
        ret[parts[0]] = parts[1]
    return ret
def get_cached_stock(stock, base_path=LOCAL_CACHE_PATH):
    """Load the first cached CSV whose file name contains *stock*; empty frame if none."""
    matched = next((name for name in os.listdir(base_path) if stock in name), None)
    if matched is None:
        print("can't find cached stock:" + stock)
        return pd.DataFrame()
    return pd.read_csv(os.path.join(base_path, matched))
def draw_k_lines(k_data: pd.DataFrame, indecator_k: pd.Series, specified={}, save_to=None):
    """Plot a candle chart for *k_data* with an index curve and highlighted markers.

    *specified* maps relative row position -> marker colour for buy/sell highlights.
    If *save_to* is given the chart is written to that path; otherwise it is shown.
    NOTE(review): the mutable default `specified={}` is kept for interface
    compatibility; it is only read here, never mutated.
    """
    k_data.loc[:, 'date'] = pd.to_datetime(k_data['date'])
    k_data.set_index('date', inplace=True)
    mc = mpf.make_marketcolors(
        up='r',            # rising candles red (CN market convention)
        down='g',          # falling candles green
        edge='inherit',    # candle edge follows up/down colour
        wick='inherit',    # wick follows up/down colour
        volume='inherit'   # volume bars follow up/down colour
    )
    if save_to is not None:
        out_dir, _ = os.path.split(save_to)
        # Fix: only create the directory when the path actually has one —
        # os.makedirs("") raises FileNotFoundError for a bare file name.
        if out_dir and not os.path.exists(out_dir):
            os.makedirs(out_dir)
    # Market-index curve on a secondary y-axis.
    curv_ap = mpf.make_addplot(indecator_k, color='b', linestyle='-', secondary_y=True)
    s = mpf.make_mpf_style(marketcolors=mc)
    # '^' markers coloured per *specified* on highlighted rows; invisible elsewhere.
    markers = ['' if i not in specified else '^' for i in range(len(k_data))]
    marker_colors = ['black' if i not in specified else specified[i] for i in range(len(k_data))]
    ap = mpf.make_addplot(k_data['close'], type='scatter', marker=markers, color=marker_colors)
    if save_to is None:
        mpf.plot(k_data, type='candle', volume=True, style=s, figsize=(16, 8), xrotation=45, addplot=[ap, curv_ap], tight_layout=True)
    else:
        mpf.plot(k_data, type='candle', volume=True, style=s, figsize=(16, 8), xrotation=45, addplot=[ap, curv_ap], tight_layout=True, savefig=save_to)
def select_opportunity(code, save_to_dir=None, draw_opportunity=False, draw_to_file=False, indicator_k=None, draw_no_buy_point=False):
    """Scan one cached stock for bullish-not-break setups and score buy/sell points.

    Results are serialised to <save_to_dir>/<code>.json; if that file already
    exists the stock is treated as processed and the cached counts are returned.
    Reads the module-level tuning constants (BULLISH_K_INCREASE_LEN etc.).
    Returns (bullish_candidate_count, count_with_at_least_one_buy_point).
    """
    ok_cnt = 0
    # Fast path: reuse previously serialised results when available.
    if save_to_dir is not None:
        serial_path = os.path.join(save_to_dir, code+".json")
        if os.path.exists(serial_path):
            exist_data = []
            with open(serial_path, "r") as file:
                print("already processed stock:{0}".format(code))
                exist_data = json.load(file)
            ok_cnt = 0
            for d in exist_data:
                if len(d["trade"]) > 0:
                    ok_cnt = ok_cnt + 1
            return len(exist_data), ok_cnt
    if save_to_dir is not None:
        if not os.path.exists(save_to_dir):
            os.makedirs(save_to_dir)
    day_k_data = get_cached_stock(code)
    out_json = []
    # Candidate launch candles, governed by the module-level tuning constants.
    candidates = select_bullish_not_break(day_k_data, BULLISH_K_INCREASE_LEN, NOT_BREAK_K_COUNT)
    print("stock:{0} total found {1} results".format(code, len(candidates)))
    for idx, date in candidates:
        serial_data = {}
        # The buy window opens right after the no-break period ends.
        profits = check_profit(day_k_data, idx+NOT_BREAK_K_COUNT, idx+NOT_BREAK_K_COUNT+BUY_WINDOW_LEN, LEAST_PROFIT_RATE, MAX_KEEP_DAYS, idx)
        correlation = 0
        p_value = 0
        if indicator_k is not None:
            # Pearson correlation of this stock's closes against the market
            # index over the setup + buy-window + holding span.
            start_k_idx = idx
            end_k_idx = idx+NOT_BREAK_K_COUNT+BUY_WINDOW_LEN + MAX_KEEP_DAYS
            if end_k_idx >= day_k_data.shape[0]:
                end_k_idx = day_k_data.shape[0]
            correlation, p_value = pearsonr(day_k_data["close"][start_k_idx:end_k_idx], indicator_k["close"][start_k_idx:end_k_idx])
            print("皮尔逊相关系数: {0}, p 值: {1}".format(correlation, p_value))
        print(" bullish {0} has {1} buy points:".format(date, len(profits)))
        serial_data["bullish"] = date
        serial_data["pearson_correlation"] = correlation
        serial_data["pearson_p"] = p_value
        serial_data["trade"] = []
        if len(profits) > 0:
            ok_cnt = ok_cnt + 1
        # Serialise each buy day with its list of profitable sell points.
        for k, v in profits.items():
            buyday = str(day_k_data.at[k, "date"])
            buy_data = {"buyday":buyday, "sells":[]}
            print(" buy date:{0} has {1} sell points:".format(day_k_data.at[k, "date"], len(v)))
            for sell in v:
                day = sell[0]
                profit_rate = sell[1]
                sell_data = {"sellday":str(day_k_data.at[day, "date"]), "keep":sell[2], "profit":profit_rate}
                buy_data["sells"].append(sell_data)
                print(" sell point:{0} get profit:{1}".format(day_k_data.at[day, "date"], profit_rate))
            serial_data["trade"].append(buy_data)
        print("-------------------------------------------------------------------------------")
        if len(serial_data) > 0:
            out_json.append(serial_data)
        if draw_opportunity:
            if len(profits) > 0 or draw_no_buy_point:
                # Slice stock and index frames to the window of interest.
                kdata = day_k_data.loc[idx:idx+NOT_BREAK_K_COUNT+BUY_WINDOW_LEN+MAX_KEEP_DAYS]
                indicator = indicator_k.loc[idx:idx+NOT_BREAK_K_COUNT+BUY_WINDOW_LEN+MAX_KEEP_DAYS]
                colors = ["purple", "yellow", "pink", "black", "white", "green", "orange", "blue", "gray"]
                group = 0
                color_group = {}
                # One colour per buy point; its sell points share that colour.
                # NOTE(review): more than 9 buy points would exceed `colors` —
                # presumably never happens with these parameters; verify.
                for b in profits.keys():
                    color_group[b-idx] = colors[group]
                    for v in profits[b]:
                        color_group[v[0]-idx] = colors[group]
                    group = group + 1
                out_pic = None
                if draw_to_file:
                    # Layout: <save_to_dir>/<code>/(buy_point|no_buy_point)/<date>.jpg
                    out_pic = os.path.join(save_to_dir, code)
                    if len(profits) == 0:
                        out_pic = os.path.join(out_pic, "no_buy_point")
                    else:
                        out_pic = os.path.join(out_pic, "buy_point")
                    out_pic = os.path.join(out_pic, str(date)+".jpg")
                draw_k_lines(kdata, indicator["close"], color_group, out_pic)
    if save_to_dir and len(out_json)>0:
        with open(serial_path, "w") as file:
            json.dump(out_json, file, indent=2)
    return len(candidates), ok_cnt
def worker(stock_code, indecator):
    """Multiprocessing entry point: scan one stock; never raises (returns (0, 0) on error)."""
    try:
        return select_opportunity(stock_code, None, False, DRAW_TO_FILE, indecator, True)
    except Exception as exc:
        print(f"Error processing {stock_code}: {exc}")
        return (0, 0)
# if __name__ == '__main__':
# if not os.path.exists(LOCAL_CACHE_PATH):
# pull_stock_data("2022-03-31", "2025-04-11")
# all_stocks = list_cached_stocks()
# sh_indecator = get_cached_stock("sh.000001")
# sz_indecator = get_cached_stock("sz.399001")
# no_buy_point_stock_count = 0
# has_buy_point_stock_count = 0
# no_bullish_stock_count = 0
# for code, name in all_stocks.items():
# ok = 0
# bullish = 0
# if str(code).startswith("sh"):
# bullish, ok = select_opportunity(code, RESULT_OUTPUT_PATH, DRAW_BUY_POINT_K_LINE, DRAW_TO_FILE, sh_indecator, True)
# elif str(code).startswith("sz"):
# bullish, ok = select_opportunity(code, RESULT_OUTPUT_PATH, DRAW_BUY_POINT_K_LINE, DRAW_TO_FILE, sz_indecator, True)
# else:
# bullish, ok = select_opportunity(code, RESULT_OUTPUT_PATH, DRAW_BUY_POINT_K_LINE, DRAW_TO_FILE, None, True)
# if bullish > 0:
# if ok > 0:
# has_buy_point_stock_count = has_buy_point_stock_count + 1
# else:
# no_buy_point_stock_count = no_buy_point_stock_count + 1
# else:
# no_bullish_stock_count = no_bullish_stock_count + 1
# print("total {0} stocks, {1} without bullish, {3} has buy point, {4} no buy point".format(
# len(all_stocks), no_bullish_stock_count, has_buy_point_stock_count, no_buy_point_stock_count))
def check_result(code, date):
    """Re-run the profit check for one stock's bullish candle at *date* and print it."""
    stock = get_cached_stock(code)
    idx = stock[stock["date"] == date].index.tolist()[0]
    buy_start = idx + NOT_BREAK_K_COUNT
    result = check_profit(stock, buy_start, buy_start + BUY_WINDOW_LEN, LEAST_PROFIT_RATE, MAX_KEEP_DAYS, idx)
    print("result:{0}".format(result))
def cal_profite(all_stocks, sh_indecator, sz_indecator, pool=None):
    """Scan *all_stocks* in parallel and append an aggregate summary to summary.json.

    *sh_indecator*/*sz_indecator* are the index K-line frames handed to workers
    according to each code's exchange prefix.  A *pool* may be supplied for
    reuse; otherwise one is created.  The pool is closed and joined here.
    """
    no_buy_point_stock_count = 0
    has_buy_point_stock_count = 0
    no_bullish_stock_count = 0
    # Pair each stock with the index frame of its exchange.
    work_param = [(stock, sh_indecator if str(stock).startswith("sh") else sz_indecator)
                  for stock in all_stocks]
    if pool is None:
        pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
    results = pool.starmap(worker, work_param)
    pool.close()
    pool.join()
    total_bullish = 0
    total_buy_point = 0
    for bullish, ok in results:
        total_bullish += bullish
        total_buy_point += ok
        if bullish > 0:
            if ok > 0:
                has_buy_point_stock_count = has_buy_point_stock_count + 1
            else:
                no_buy_point_stock_count = no_buy_point_stock_count + 1
        else:
            no_bullish_stock_count = no_bullish_stock_count + 1
    # Fix: guard against division by zero when no bullish candle was found at all.
    win_rate = float(total_buy_point) / float(total_bullish) if total_bullish else 0.0
    print("total {0} stocks, {1} without bullish, {2} has buy point, {3} no buy point".format(
        len(all_stocks), no_bullish_stock_count, has_buy_point_stock_count, no_buy_point_stock_count))
    print(" total bullish:{0}, total buy point:{1}, rate:{2}".format(
        total_bullish, total_buy_point, win_rate))
    summary = {
        "bullish_len": BULLISH_K_INCREASE_LEN,
        "not_break_len": NOT_BREAK_K_COUNT,
        "max_keep_days": MAX_KEEP_DAYS,
        "buy_window_len": BUY_WINDOW_LEN,
        "least_profit_rate": LEAST_PROFIT_RATE,
        "total_bullish": total_bullish,
        "total_buy_point": total_buy_point,
        "win_rate": win_rate
    }
    # Fix: "r+" raises FileNotFoundError on the first run; "a+" creates the file.
    with open("summary.json", "a+") as file:
        file.seek(0)
        try:
            old = json.load(file)
            if not isinstance(old, list):
                old = []
        except (json.JSONDecodeError, ValueError):
            old = []
        old.append(summary)
        file.seek(0)
        # Fix: truncate before rewriting — otherwise stale trailing bytes would
        # corrupt the file whenever the new JSON is shorter than the old.
        file.truncate()
        json.dump(old, file, indent=2)
if __name__ == '__main__':
    # Build the local CSV cache on first run.
    if not os.path.exists(LOCAL_CACHE_PATH):
        pull_stock_data("2022-03-31", "2025-04-14")
    all_stocks = list_cached_stocks()
    sh_indecator = get_cached_stock("sh.000001")
    sz_indecator = get_cached_stock("sz.399001")
    # Grid-search the strategy parameters; each run appends to summary.json.
    for bullish_k_len in range(1, 10):
        for not_break_k_count in range(2, 6):
            for max_keep_days in range(1, 20):
                for buy_window_len in range(2, 10):
                    for least_profit_rate in [0.01, 0.05, 0.1, 0.15, 0.2]:
                        # Skip parameter combinations already recorded.
                        # Fix: tolerate a missing summary.json on the first run —
                        # "r+" raised FileNotFoundError here.
                        old = []
                        if os.path.exists("summary.json"):
                            with open("summary.json", "r") as file:
                                try:
                                    old = json.load(file)
                                except (json.JSONDecodeError, ValueError):
                                    old = []
                        inold = False
                        for exist in old:
                            if (bullish_k_len == exist["bullish_len"]
                                    and not_break_k_count == exist["not_break_len"]
                                    and max_keep_days == exist["max_keep_days"]
                                    and buy_window_len == exist["buy_window_len"]
                                    and least_profit_rate == exist["least_profit_rate"]):
                                print("already processed this param")
                                inold = True
                                break
                        if inold:
                            continue
                        # Rebind the module-level knobs read by select_opportunity.
                        # NOTE(review): with the Windows "spawn" start method these
                        # rebinds do not propagate to pool workers — confirm this
                        # is run on a fork-based platform.
                        BULLISH_K_INCREASE_LEN = bullish_k_len
                        NOT_BREAK_K_COUNT = not_break_k_count
                        MAX_KEEP_DAYS = max_keep_days
                        BUY_WINDOW_LEN = buy_window_len
                        LEAST_PROFIT_RATE = least_profit_rate
                        print("BULLISH_K_INCREASE_LEN:{0}, NOT_BREAK_K_COUNT:{1}, MAX_KEEP_DAYS:{2}, BUY_WINDOW_LEN:{3}, LEAST_PROFIT_RATE:{4}".format(
                            BULLISH_K_INCREASE_LEN, NOT_BREAK_K_COUNT, MAX_KEEP_DAYS, BUY_WINDOW_LEN, LEAST_PROFIT_RATE))
                        # Fix: pass the loaded index frames instead of (None, None) —
                        # with None every worker's pearsonr call raised and the run
                        # silently produced (0, 0) for every stock.
                        cal_profite(all_stocks, sh_indecator, sz_indecator)