# Source: check-one-bullish-not-break/one_bullish_not_break.py
# Snapshot: 2025-05-03 23:09:55 +08:00 (452 lines, 19 KiB, Python)
# NOTE: the original web-viewer chrome ("Raw Blame History", the
# ambiguous-Unicode warning, etc.) has been folded into this comment
# so the file remains valid Python.
import baostock as bs
import pandas as pd
import os
import mplfinance as mpf
import sys
import time
import json
from scipy.stats import pearsonr
import multiprocessing
BULLISH_K_INCREASE_LEN = 5.0
NOT_BREAK_K_COUNT = 6
BUY_WINDOW_LEN = 5
LEAST_PROFIT_RATE = 0.1
MAX_KEEP_DAYS = 10
DRAW_BUY_POINT_K_LINE = True
DRAW_TO_FILE = True
LOCAL_CACHE_PATH = "stocks-2025-04-14"
RESULT_OUTPUT_PATH = "results_tmp"
INDECATOR_CODE_LIST = ["sh.{:06}".format(i) for i in range(0, 999)] + ["sz.{:06}".format(i) for i in range(399001, 399999)]
if os.name == 'nt': # Windows 系统
import msvcrt
def getch():
return msvcrt.getch()
else: # Linux/macOS 系统
import tty
import termios
def getch():
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
tty.setraw(sys.stdin.fileno())
ch = sys.stdin.read(1)
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
return ch
def all_stock_count(date, outfile="stock_list.csv"):
bs.login()
stock_rs = bs.query_all_stock(date)
df = stock_rs.get_data()
bs.logout()
print(f"股票总数:{len(df)}")
df.to_csv(outfile, encoding="utf8", index=False)
def download_data(date, outfile="test_result.csv", codes=None, freq="d"):
bs.login()
data_df = pd.DataFrame()
# 获取指定日期的指数、股票数据
if codes is None:
stock_rs = bs.query_all_stock(date)
stock_df = stock_rs.get_data()
codes = stock_df["code"]
for code in codes:
print("Downloading :" + code)
k_rs = bs.query_history_k_data_plus(code, "date,code,open,high,low,close", date, date, frequency=freq)
k_df = pd.DataFrame(k_rs.data, columns=k_rs.fields)
data_df = pd.concat([data_df, k_df], ignore_index=True)
bs.logout()
data_df.to_csv(outfile, encoding="gbk", index=False)
print(data_df)
def get_trade_day():
lg = bs.login()
print('login respond error_code:'+lg.error_code)
print('login respond error_msg:'+lg.error_msg)
#### 获取交易日信息 ####
rs = bs.query_trade_dates(start_date="2017-01-01", end_date="2017-06-30")
print('query_trade_dates respond error_code:'+rs.error_code)
print('query_trade_dates respond error_msg:'+rs.error_msg)
#### 打印结果集 ####
data_list = []
while (rs.error_code == '0') & rs.next():
# 获取一条记录,将记录合并在一起
data_list.append(rs.get_row_data())
result = pd.DataFrame(data_list, columns=rs.fields)
#### 结果集输出到csv文件 ####
result.to_csv("D:\\trade_datas.csv", encoding="gbk", index=False)
print(result)
#### 登出系统 ####
bs.logout()
def select_bullish_not_break(k_data, bullish_increase=6.0, period_lens=6):
ret = []
r, c = k_data.shape
for idx, stock in k_data.iterrows():
if idx + period_lens >= r:
continue
# 1. 找到启动大阳线
is_trade = stock["tradestatus"]
if is_trade == 0:
continue
bullish_open = float(stock["open"])
bullish_close = float(stock["close"])
increase = float(stock["pctChg"])
if increase < bullish_increase:
continue
# 2. 其后n天k线存在回踩大阳线顶且不跌破大阳线
fall_down = False
fall_break = False
for i in range(idx+1, idx+period_lens+1):
low = float(k_data.at[i, "low"])
open = float(k_data.at[i, "open"])
close = float(k_data.at[i, "close"])
if low < bullish_close or open < bullish_close or close < bullish_close:
fall_down = True
if low < bullish_open:
fall_break = True
if not fall_down or fall_break:
continue
# 3. 其后n天振幅均小于大阳线
increase_too_large = False
for i in range(idx+1, idx+period_lens+1):
rate = k_data.at[i, "pctChg"]
if rate > increase:
increase_too_large = True
break
if increase_too_large:
continue
ret.append((idx, stock["date"]))
return ret
def check_profit(k_data:pd.DataFrame, buy_k_start:int, buy_k_end:int, least_profit_rate=0.2, max_keep_days=20, bullish_k:int=-1):
ret = {}
bullish_open = 0
k_end = k_data.shape[0]
if bullish_k > 0:
bullish_open = float(k_data.at[bullish_k, "open"])
if buy_k_start >= k_end:
return {}
if buy_k_end > k_end:
buy_k_end = k_end
for i in range(buy_k_start, buy_k_end):
buy_open = float(k_data.at[i, "open"])
buy_close = float(k_data.at[i, "close"])
if buy_close < bullish_open:
return {}
sell_points = []
range_end = i+max_keep_days
if range_end > k_end:
range_end = k_end
for j in range(i+1, range_end):
current_open = float(k_data.at[j, "open"])
current_close = float(k_data.at[j, "close"])
profit = float(current_close - buy_close) / buy_close
if profit < least_profit_rate:
continue
sell_points.append((j, profit, j - i))
if len(sell_points) > 0:
ret[i] = sell_points
return ret
def pull_stock_data(start_day="2022-03-31", end_day=None):
if end_day is None:
end_day = time.strftime("%Y-%m-%d", time.localtime())
bs.login()
start_stocks = bs.query_all_stock(start_day)
end_stocks = bs.query_all_stock(end_day)
start_data = start_stocks.get_data()
end_data = end_stocks.get_data()
# start_data.to_csv("1.csv", encoding="utf8", index=False)
# end_data.to_csv("2.csv", encoding="utf8", index=False)
exist_data = pd.merge(start_data, end_data, how="inner", on=["code"])
exist_data.to_csv("exist_stock.csv", encoding="utf8", index=False)
if not os.path.exists(LOCAL_CACHE_PATH):
os.mkdir(LOCAL_CACHE_PATH)
for _, stock in exist_data.iterrows():
code = stock["code"]
name = stock["code_name_y"]
name = name.replace("*", "x")
file_name = str(code) + "_" + str(name) + ".csv"
print(file_name)
file_path = os.path.join(LOCAL_CACHE_PATH, file_name)
k_rs = bs.query_history_k_data_plus(code, "date,open,high,low,close,volume,turn,tradestatus,peTTM,isST,preclose,pctChg", start_day, end_day, frequency="d")
k_df = pd.DataFrame(k_rs.data, columns=k_rs.fields)
k_df.to_csv(file_path, encoding="utf8", index=False)
bs.logout()
def list_cached_stocks(base_path=LOCAL_CACHE_PATH):
ret = {}
names = os.listdir(base_path)
for name in names:
path = os.path.join(base_path, name)
if os.path.isfile(path):
name = name[:len(name)-4]
stock = name.split("_")
if stock[0] in INDECATOR_CODE_LIST:
continue
ret[stock[0]] = stock[1]
return ret
def get_cached_stock(stock, base_path=LOCAL_CACHE_PATH):
names = os.listdir(base_path)
file_name = None
for name in names:
if stock in name:
file_name = name
break
if file_name is None:
print("can't find cached stock:" + stock)
return pd.DataFrame()
return pd.read_csv(os.path.join(base_path, file_name))
def draw_k_lines(k_data:pd.DataFrame, indecator_k:pd.Series, specified={}, save_to=None):
k_data.loc[:,'date'] = pd.to_datetime(k_data['date'])
k_data.set_index('date', inplace=True)
mc = mpf.make_marketcolors(
up='r', # 上涨蜡烛颜色(绿色)
down='g', # 下跌蜡烛颜色(红色)
edge='inherit', # 蜡烛边框颜色inherit 表示继承上涨或下跌颜色
wick='inherit', # 影线颜色inherit 表示继承上涨或下跌颜色
volume='inherit' # 成交量颜色inherit 表示继承上涨或下跌颜色
)
if save_to is not None:
out_dir, _ = os.path.split(save_to)
if not os.path.exists(out_dir):
os.makedirs(out_dir)
# 指数曲线
curv_ap = mpf.make_addplot(indecator_k, color='b', linestyle='-', secondary_y=True)
# 创建自定义样式
s = mpf.make_mpf_style(marketcolors=mc)
markers = ['' if i not in specified else '^' for i in range(len(k_data))]
marker_colors = ['black' if i not in specified else specified[i] for i in range(len(k_data))]
ap = mpf.make_addplot(k_data['close'], type='scatter', marker=markers, color=marker_colors)
if save_to is None:
mpf.plot(k_data, type='candle', volume=True, style=s, figsize=(16, 8), xrotation=45, addplot=[ap, curv_ap], tight_layout=True)
else:
mpf.plot(k_data, type='candle', volume=True, style=s, figsize=(16, 8), xrotation=45, addplot=[ap, curv_ap], tight_layout=True, savefig=save_to)
def select_opportunity(code, save_to_dir=None, draw_opportunity=False, draw_to_file=False, indicator_k=None, draw_no_buy_point=False):
ok_cnt = 0
if save_to_dir is not None:
serial_path = os.path.join(save_to_dir, code+".json")
if os.path.exists(serial_path):
exist_data = []
with open(serial_path, "r") as file:
print("already processed stock:{0}".format(code))
exist_data = json.load(file)
ok_cnt = 0
for d in exist_data:
if len(d["trade"]) > 0:
ok_cnt = ok_cnt + 1
return len(exist_data), ok_cnt
if save_to_dir is not None:
if not os.path.exists(save_to_dir):
os.makedirs(save_to_dir)
day_k_data = get_cached_stock(code)
# print(day_k_data.loc[690:710])
out_json = []
candidates = select_bullish_not_break(day_k_data, BULLISH_K_INCREASE_LEN, NOT_BREAK_K_COUNT)
print("stock:{0} total found {1} results".format(code, len(candidates)))
for idx, date in candidates:
serial_data = {}
profits = check_profit(day_k_data, idx+NOT_BREAK_K_COUNT, idx+NOT_BREAK_K_COUNT+BUY_WINDOW_LEN, LEAST_PROFIT_RATE, MAX_KEEP_DAYS, idx)
correlation = 0
p_value = 0
if indicator_k is not None:
start_k_idx = idx
end_k_idx = idx+NOT_BREAK_K_COUNT+BUY_WINDOW_LEN + MAX_KEEP_DAYS
if end_k_idx >= day_k_data.shape[0]:
end_k_idx = day_k_data.shape[0]
correlation, p_value = pearsonr(day_k_data["close"][start_k_idx:end_k_idx], indicator_k["close"][start_k_idx:end_k_idx])
print("皮尔逊相关系数: {0}, p 值: {1}".format(correlation, p_value))
print(" bullish {0} has {1} buy points:".format(date, len(profits)))
serial_data["bullish"] = date
serial_data["pearson_correlation"] = correlation
serial_data["pearson_p"] = p_value
serial_data["trade"] = []
if len(profits) > 0:
ok_cnt = ok_cnt + 1
for k, v in profits.items():
buyday = str(day_k_data.at[k, "date"])
buy_data = {"buyday":buyday, "sells":[]}
print(" buy date:{0} has {1} sell points:".format(day_k_data.at[k, "date"], len(v)))
for sell in v:
day = sell[0]
profit_rate = sell[1]
sell_data = {"sellday":str(day_k_data.at[day, "date"]), "keep":sell[2], "profit":profit_rate}
buy_data["sells"].append(sell_data)
print(" sell point:{0} get profit:{1}".format(day_k_data.at[day, "date"], profit_rate))
serial_data["trade"].append(buy_data)
print("-------------------------------------------------------------------------------")
if len(serial_data) > 0:
out_json.append(serial_data)
if draw_opportunity:
if len(profits) > 0 or draw_no_buy_point:
kdata = day_k_data.loc[idx:idx+NOT_BREAK_K_COUNT+BUY_WINDOW_LEN+MAX_KEEP_DAYS]
indicator = indicator_k.loc[idx:idx+NOT_BREAK_K_COUNT+BUY_WINDOW_LEN+MAX_KEEP_DAYS]
colors = ["purple", "yellow", "pink", "black", "white", "green", "orange", "blue", "gray"]
group = 0
color_group = {}
for b in profits.keys():
color_group[b-idx] = colors[group]
for v in profits[b]:
color_group[v[0]-idx] = colors[group]
group = group + 1
out_pic = None
if draw_to_file:
out_pic = os.path.join(save_to_dir, code)
if len(profits) == 0:
out_pic = os.path.join(out_pic, "no_buy_point")
else:
out_pic = os.path.join(out_pic, "buy_point")
out_pic = os.path.join(out_pic, str(date)+".jpg")
draw_k_lines(kdata, indicator["close"], color_group, out_pic)
if save_to_dir and len(out_json)>0:
with open(serial_path, "w") as file:
json.dump(out_json, file, indent=2)
return len(candidates), ok_cnt
def worker(stock_code, indecator):
try:
return select_opportunity(stock_code, None, False, DRAW_TO_FILE, indecator, True)
except Exception as e:
print(f"Error processing {stock_code}: {e}")
return (0, 0)
# if __name__ == '__main__':
# if not os.path.exists(LOCAL_CACHE_PATH):
# pull_stock_data("2022-03-31", "2025-04-11")
# all_stocks = list_cached_stocks()
# sh_indecator = get_cached_stock("sh.000001")
# sz_indecator = get_cached_stock("sz.399001")
# no_buy_point_stock_count = 0
# has_buy_point_stock_count = 0
# no_bullish_stock_count = 0
# for code, name in all_stocks.items():
# ok = 0
# bullish = 0
# if str(code).startswith("sh"):
# bullish, ok = select_opportunity(code, RESULT_OUTPUT_PATH, DRAW_BUY_POINT_K_LINE, DRAW_TO_FILE, sh_indecator, True)
# elif str(code).startswith("sz"):
# bullish, ok = select_opportunity(code, RESULT_OUTPUT_PATH, DRAW_BUY_POINT_K_LINE, DRAW_TO_FILE, sz_indecator, True)
# else:
# bullish, ok = select_opportunity(code, RESULT_OUTPUT_PATH, DRAW_BUY_POINT_K_LINE, DRAW_TO_FILE, None, True)
# if bullish > 0:
# if ok > 0:
# has_buy_point_stock_count = has_buy_point_stock_count + 1
# else:
# no_buy_point_stock_count = no_buy_point_stock_count + 1
# else:
# no_bullish_stock_count = no_bullish_stock_count + 1
# print("total {0} stocks, {1} without bullish, {3} has buy point, {4} no buy point".format(
# len(all_stocks), no_bullish_stock_count, has_buy_point_stock_count, no_buy_point_stock_count))
def check_result(code, date):
stock = get_cached_stock(code)
idx = stock[stock["date"] == date].index.tolist()[0]
result = check_profit(stock, idx+NOT_BREAK_K_COUNT, idx+NOT_BREAK_K_COUNT+BUY_WINDOW_LEN, LEAST_PROFIT_RATE, MAX_KEEP_DAYS, idx)
print("result:{0}".format(result))
def cal_profite(all_stocks, sh_indecator, sz_indecator, pool=None):
no_buy_point_stock_count = 0
has_buy_point_stock_count = 0
no_bullish_stock_count = 0
work_param = []
for stock in all_stocks:
work_param.append((stock, sh_indecator if str(stock).startswith("sh") else sz_indecator))
if pool is None:
pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())
results = pool.starmap(worker, work_param)
pool.close()
pool.join()
total_bullish = 0
total_buy_point = 0
for bullish, ok in results:
total_bullish += bullish
total_buy_point += ok
if bullish > 0:
if ok > 0:
has_buy_point_stock_count = has_buy_point_stock_count + 1
else:
no_buy_point_stock_count = no_buy_point_stock_count + 1
else:
no_bullish_stock_count = no_bullish_stock_count + 1
print("total {0} stocks, {1} without bullish, {2} has buy point, {3} no buy point".format(
len(all_stocks), no_bullish_stock_count, has_buy_point_stock_count, no_buy_point_stock_count))
print(" total bullish:{0}, total buy point:{1}, rate:{2}".format(
total_bullish, total_buy_point, float(total_buy_point)/float(total_bullish)))
summary = {
"bullish_len": BULLISH_K_INCREASE_LEN,
"not_break_len": NOT_BREAK_K_COUNT,
"max_keep_days": MAX_KEEP_DAYS,
"buy_window_len": BUY_WINDOW_LEN,
"least_profit_rate": LEAST_PROFIT_RATE,
"total_bullish": total_bullish,
"total_buy_point": total_buy_point,
"win_rate": float(total_buy_point)/float(total_bullish)
}
with open("summary.json", "r+") as file:
try:
old = json.load(file)
if len(old) == 0:
old = []
except:
old = []
old.append(summary)
file.seek(0)
json.dump(old, file, indent=2)
if __name__ == '__main__':
# list_cached_stocks()
# check_result("sz.000002", "2022-11-11")
# exit()
if not os.path.exists(LOCAL_CACHE_PATH):
pull_stock_data("2022-03-31", "2025-04-14")
all_stocks = list_cached_stocks()
sh_indecator = get_cached_stock("sh.000001")
sz_indecator = get_cached_stock("sz.399001")
for bullish_k_len in range(1, 10):
for not_break_k_count in range(2, 6):
for max_keep_days in range(1, 20):
for buy_window_len in range(2, 10):
for least_profit_rate in [0.01, 0.05, 0.1, 0.15, 0.2]:
old = []
with open("summary.json", "r+") as file:
try:
old = json.load(file)
except:
pass
inold = False
for exist in old:
if bullish_k_len == exist["bullish_len"] and not_break_k_count == exist["not_break_len"] and max_keep_days == exist["max_keep_days"] and buy_window_len == exist["buy_window_len"] and least_profit_rate == exist["least_profit_rate"]:
print("already processed this param")
inold = True
break
if inold:
continue
BULLISH_K_INCREASE_LEN = bullish_k_len
NOT_BREAK_K_COUNT = not_break_k_count
MAX_KEEP_DAYS = max_keep_days
BUY_WINDOW_LEN = buy_window_len
LEAST_PROFIT_RATE = least_profit_rate
print("BULLISH_K_INCREASE_LEN:{0}, NOT_BREAK_K_COUNT:{1}, MAX_KEEP_DAYS:{2}, BUY_WINDOW_LEN:{3}, LEAST_PROFIT_RATE:{4}".format(
BULLISH_K_INCREASE_LEN, NOT_BREAK_K_COUNT, MAX_KEEP_DAYS, BUY_WINDOW_LEN, LEAST_PROFIT_RATE))
cal_profite(all_stocks, None, None)