numpy 确定重要枢轴点后,在特定条件下绘制趋势线

qzwqbdag  于 2023-10-19  发布在  其他
关注(0)|答案(1)|浏览(119)

我试图得到一个点,它在一系列点中更高,也就是说,枢轴高点,然后在一系列枢轴高点中,我想找到一个显著的枢轴高点。为此,我试图创建一个范围,这不是预定义的,但计算的每一个去。通过膝点图计算,以确定最佳参数,该参数给出范围以上的点和范围以下的点。
这对于很多数据来说都很好。如果循环无法找到最佳参数,我将手动分配最佳高和最佳低数据。还有一个范围,我们可以检查参数值,并且较低的参数有一个条件,即它不能超过某个值。
这是足够的背景,并确保代码被理解得很好。
现在我想包括一个功能,绘制趋势线的情节包含重要的枢轴高,重要的枢轴低和收盘价。趋势线的特征应该是这样的,我能够连接价格图表上的向上趋势线与重要的枢轴低点。该线触及的枢轴低点越显著,趋势线就越强。类似的情况将是向下的趋势线和重要的枢轴低点。
我的代码当前绘制的内容类似于:

红色虚线和绿色虚线分别表示正在绘制的当前线。黑色和蓝色的连接线是我希望从我的代码。
我想,我不能正确地思考逻辑,一旦,我可以清楚地写出算法。
代码:

import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import argrelextrema

def calculate_pivot_points(data):
    pivot_points = []
    resistance_levels = []
    support_levels = []
    pivot_high_points = []
    pivot_low_points = []

    for i in range(len(data)):
        high = data.loc[i, 'high']
        low = data.loc[i, 'low']
        close = data.loc[i, 'close']

        # Calculate Pivot Point
        pivot_point = (high + low + close) / 3
        pivot_points.append(pivot_point)

        # Calculate Resistance Levels
        resistance1 = (2 * pivot_point) - low
        resistance2 = pivot_point + (high - low)
        resistance3 = high + 2 * (pivot_point - low)
        resistance_levels.append({'R1': resistance1, 'R2': resistance2, 'R3': resistance3})

        # Calculate Support Levels
        support1 = (2 * pivot_point) - high
        support2 = pivot_point - (high - low)
        support3 = low - 2 * (high - pivot_point)
        support_levels.append({'S1': support1, 'S2': support2, 'S3': support3})

        # Identify Pivot High Points using swing points
        if i > 0 and i < len(data) - 1:
            if high > data.loc[i-1, 'high'] and high > data.loc[i+1, 'high']:
                pivot_high_points.append({'index': i, 'value': high})

        # Identify Pivot Low Points using swing points
        if i > 0 and i < len(data) - 1:
            if low < data.loc[i-1, 'low'] and low < data.loc[i+1, 'low']:
                pivot_low_points.append({'index': i, 'value': low})

    return pivot_points, resistance_levels, support_levels, pivot_high_points, pivot_low_points

# Create a list to store all the data frames
data_frames = []

# Specify the folder path containing the CSV files
folder_path = "./data_frames"

# Iterate over each file in the folder
for filename in os.listdir(folder_path):
    if filename.endswith(".csv"):
        file_path = os.path.join(folder_path, filename)

        # Read the data from the CSV file
        data = pd.read_csv(file_path)

        # Add the data frame to the list
        data_frames.append(data)

        # Extract the file name without the extension
        file_name = os.path.splitext(filename)[0]

        # Calculate pivot points and other parameters
        pivot_points, resistance_levels, support_levels, pivot_high_points, pivot_low_points = calculate_pivot_points(data)

        # Extract closing prices
        closing_prices = data['close']

        # Define the range of parameter values to test
        parameter_range = range(1, 40)

        # Calculate scores for different parameter combinations
        parameter_scores = []
        for high_parameter in parameter_range:
            for low_parameter in parameter_range:
                if low_parameter <= 8:  # Add the condition here
                    # Determine significant pivot high points using swing points
                    significant_high_points = []
                    for point in pivot_high_points:
                        if point['index'] > 0 and point['index'] < len(data) - 1:
                            high_range = data.loc[point['index'] - high_parameter: point['index'] + low_parameter, 'high']
                            if point['value'] == high_range.max():
                                significant_high_points.append(point)

                    # Determine significant pivot low points using swing points
                    significant_low_points = []
                    for point in pivot_low_points:
                        if point['index'] > 0 and point['index'] < len(data) - 1:
                            low_range = data.loc[point['index'] - high_parameter: point['index'] + low_parameter, 'low']
                            if point['value'] == low_range.min():
                                significant_low_points.append(point)

                    # Calculate the score as the difference between high and low point counts
                    score = len(significant_high_points) - len(significant_low_points)
                    parameter_scores.append((high_parameter, low_parameter, score))

        # Convert the scores to a NumPy array for easier manipulation
        scores = np.array(parameter_scores)

        # Find the optimal parameter values using the knee point
        if len(scores) > 0:
            knee_index = argrelextrema(scores[:, 2], np.less)[0][-1]
            optimal_high_parameter, optimal_low_parameter, optimal_score = scores[knee_index]
        else:
            optimal_high_parameter = 16  # Manually assign the value
            optimal_low_parameter = 2  # Manually assign the value

        print("Optimal high parameter value:", optimal_high_parameter)
        print("Optimal low parameter value:", optimal_low_parameter)

        # Plot line chart for closing prices
        plt.plot(closing_prices, label='Closing Prices')

        # Calculate the trendlines for connecting the pivot high points
        trendlines_high = []
        trendline_points_high = []
        for i in range(0, len(significant_high_points) - 1):
            point1 = significant_high_points[i]
            point2 = significant_high_points[i+1]
            slope = (point2['value'] - point1['value']) / (point2['index'] - point1['index'])
            if slope > 0:
                if not trendline_points_high:
                    trendline_points_high.append(point1)
                trendline_points_high.append(point2)
            else:
                if len(trendline_points_high) > 1:
                    trendlines_high.append(trendline_points_high)
                trendline_points_high = []
        if len(trendline_points_high) > 1:
            trendlines_high.append(trendline_points_high)

        # Calculate the trendlines for connecting the pivot low points
        trendlines_low = []
        trendline_points_low = []
        for i in range(0, len(significant_low_points) - 1):
            point1 = significant_low_points[i]
            point2 = significant_low_points[i+1]
            slope = (point2['value'] - point1['value']) / (point2['index'] - point1['index'])
            if slope < 0:
                if not trendline_points_low:
                    trendline_points_low.append(point1)
                trendline_points_low.append(point2)
            else:
                if len(trendline_points_low) > 1:
                    trendlines_low.append(trendline_points_low)
                trendline_points_low = []
        if len(trendline_points_low) > 1:
            trendlines_low.append(trendline_points_low)

        # Plot the trendlines for positive slope
        for trendline_points_high in trendlines_high:
            x_values = [point['index'] for point in trendline_points_high]
            y_values = [point['value'] for point in trendline_points_high]
            plt.plot(x_values, y_values, color='red', linestyle='dashed')

        # Plot the significant pivot high points
        x_values = [point['index'] for point in significant_high_points]
        y_values = [point['value'] for point in significant_high_points]
        plt.scatter(x_values, y_values, color='red', label='Significant Pivot High Points')

        # Plot the trendlines for positive slope
        for trendline_points_low in trendlines_low:
            x_values = [point['index'] for point in trendline_points_low]
            y_values = [point['value'] for point in trendline_points_low]
            plt.plot(x_values, y_values, color='green', linestyle='dashed')

        # Plot the significant pivot low points
        x_values = [point['index'] for point in significant_low_points]
        y_values = [point['value'] for point in significant_low_points]
        plt.scatter(x_values, y_values, color='green', label='Significant Pivot Low Points')

        # Set chart title and labels
        plt.title(f'Closing Prices with Trendlines and Significant Pivot Points ({file_name})')
        plt.xlabel('Index')
        plt.ylabel('Closing Price')

        # Show the chart for the current data frame
        plt.legend()
        plt.show()

如果您想自己尝试代码,可以在此驱动器链接中找到数据:Link
PS:在目前的代码中,我只是检查这两个点是否在同一直线趋势线上。这在很长一段时间内都不会发生。所以我想的是,我们定义一个范围,如果第一个点和第n+1个点之间的斜率>或< 0,那么我们继续下两个点,即第n+1和第n+2个点。这里,如果两个斜率之间的差异,即n和n +1之间的斜率和n +1和n+2之间的斜率在一定范围内,那么我们可以将主要斜率变量移动到n和n+2之间的斜率,并类似地运行循环。这将是一个很好的开始,但现在我被编码部分卡住了。如果有人能帮我把它写出来,那将是非常有帮助的。

import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from scipy.signal import argrelextrema

def calculate_pivot_points(data):
    pivot_points = []
    resistance_levels = []
    support_levels = []
    pivot_high_points = []
    pivot_low_points = []

    for i in range(len(data)):
        high = data.loc[i, 'high']
        low = data.loc[i, 'low']
        close = data.loc[i, 'close']

        # Calculate Pivot Point
        pivot_point = (high + low + close) / 3
        pivot_points.append(pivot_point)

        # Calculate Resistance Levels
        resistance1 = (2 * pivot_point) - low
        resistance2 = pivot_point + (high - low)
        resistance3 = high + 2 * (pivot_point - low)
        resistance_levels.append({'R1': resistance1, 'R2': resistance2, 'R3': resistance3})

        # Calculate Support Levels
        support1 = (2 * pivot_point) - high
        support2 = pivot_point - (high - low)
        support3 = low - 2 * (high - pivot_point)
        support_levels.append({'S1': support1, 'S2': support2, 'S3': support3})

        # Identify Pivot High Points using swing points
        if i > 0 and i < len(data) - 1:
            if high > data.loc[i-1, 'high'] and high > data.loc[i+1, 'high']:
                pivot_high_points.append({'index': i, 'value': high})

        # Identify Pivot Low Points using swing points
        if i > 0 and i < len(data) - 1:
            if low < data.loc[i-1, 'low'] and low < data.loc[i+1, 'low']:
                pivot_low_points.append({'index': i, 'value': low})

    return pivot_points, resistance_levels, support_levels, pivot_high_points, pivot_low_points

# Create a list to store all the data frames
data_frames = []

# Specify the folder path containing the CSV files
folder_path = "./data_frames"

# Iterate over each file in the folder
for filename in os.listdir(folder_path):
    if filename.endswith(".csv"):
        file_path = os.path.join(folder_path, filename)

        # Read the data from the CSV file
        data = pd.read_csv(file_path)

        # Add the data frame to the list
        data_frames.append(data)

        # Extract the file name without the extension
        file_name = os.path.splitext(filename)[0]

        # Calculate pivot points and other parameters
        pivot_points, resistance_levels, support_levels, pivot_high_points, pivot_low_points = calculate_pivot_points(data)

        # Extract closing prices
        closing_prices = data['close']

        # Define the range of parameter values to test
        parameter_range = range(1, 40)

        # Calculate scores for different parameter combinations
        parameter_scores = []
        for high_parameter in parameter_range:
            for low_parameter in parameter_range:
                if low_parameter <= 8:  # Add the condition here
                    # Determine significant pivot high points using swing points
                    significant_high_points = []
                    for point in pivot_high_points:
                        if point['index'] > 0 and point['index'] < len(data) - 1:
                            high_range = data.loc[point['index'] - high_parameter: point['index'] + low_parameter, 'high']
                            if point['value'] == high_range.max():
                                significant_high_points.append(point)

                    # Determine significant pivot low points using swing points
                    significant_low_points = []
                    for point in pivot_low_points:
                        if point['index'] > 0 and point['index'] < len(data) - 1:
                            low_range = data.loc[point['index'] - high_parameter: point['index'] + low_parameter, 'low']
                            if point['value'] == low_range.min():
                                significant_low_points.append(point)

                    # Calculate the score as the difference between high and low point counts
                    score = len(significant_high_points) - len(significant_low_points)
                    parameter_scores.append((high_parameter, low_parameter, score))

        # Convert the scores to a NumPy array for easier manipulation
        scores = np.array(parameter_scores)

        # Find the optimal parameter values using the knee point
        if len(scores) > 0:
            knee_index = argrelextrema(scores[:, 2], np.less)[0][-1]
            optimal_high_parameter, optimal_low_parameter, optimal_score = scores[knee_index]
        else:
            optimal_high_parameter = 16  # Manually assign the value
            optimal_low_parameter = 2  # Manually assign the value

        print("Optimal high parameter value:", optimal_high_parameter)
        print("Optimal low parameter value:", optimal_low_parameter)

        # Plot line chart for closing prices
        plt.plot(closing_prices, label='Closing Prices')

        slope_range = 1  # Adjust this range as per your requirement

        # Calculate the trendlines for connecting the pivot high points
        trendlines_high = []
        trendline_points_high = []
        for i in range(0, len(significant_high_points) - 2):
            point1 = significant_high_points[i]
            point2 = significant_high_points[i+1]
            slope1 = (point2['value'] - point1['value']) / (point2['index'] - point1['index'])

            point3 = significant_high_points[i+1]
            point4 = significant_high_points[i+2]
            slope2 = (point4['value'] - point3['value']) / (point4['index'] - point3['index'])

            slope_difference = abs(slope2 - slope1)

            if slope1 < 0:
                if not trendline_points_high:
                    trendline_points_high.append(point1)
                if slope_difference <= slope_range:
                    trendline_points_high.append(point2)
                else:
                    if len(trendline_points_high) > 1:
                        trendlines_high.append(trendline_points_high)
                    trendline_points_high = [point2]  # Start a new trendline with point2

        if len(trendline_points_high) > 1:
            trendlines_high.append(trendline_points_high)

        # Calculate the trendlines for connecting the pivot low points
        trendlines_low = []
        trendline_points_low = []
        for i in range(0, len(significant_low_points) - 2):
            point1 = significant_low_points[i]
            point2 = significant_low_points[i+1]
            slope1 = (point2['value'] - point1['value']) / (point2['index'] - point1['index'])

            point3 = significant_low_points[i+1]
            point4 = significant_low_points[i+2]
            slope2 = (point4['value'] - point3['value']) / (point4['index'] - point3['index'])

            slope_difference = abs(slope2 - slope1)

            if slope1 > 0:
                if not trendline_points_low:
                    trendline_points_low.append(point1)
                if slope_difference <= slope_range:
                    trendline_points_low.append(point2)
                else:
                    if len(trendline_points_low) > 1:
                        trendlines_low.append(trendline_points_low)
                    trendline_points_low = [point2]  # Start a new trendline with point2

        if len(trendline_points_low) > 1:
            trendlines_low.append(trendline_points_low)

        # Plot the trendlines for positive slope
        for trendline_points_high in trendlines_high:
            x_values = [point['index'] for point in trendline_points_high]
            y_values = [point['value'] for point in trendline_points_high]
            plt.plot(x_values, y_values, color='red', linestyle='dashed')

        # Plot the significant pivot high points
        x_values = [point['index'] for point in significant_high_points]
        y_values = [point['value'] for point in significant_high_points]
        plt.scatter(x_values, y_values, color='red', label='Significant Pivot High Points')

        # Plot the trendlines for positive slope
        for trendline_points_low in trendlines_low:
            x_values = [point['index'] for point in trendline_points_low]
            y_values = [point['value'] for point in trendline_points_low]
            plt.plot(x_values, y_values, color='green', linestyle='dashed')

        # Plot the significant pivot low points
        x_values = [point['index'] for point in significant_low_points]
        y_values = [point['value'] for point in significant_low_points]
        plt.scatter(x_values, y_values, color='green', label='Significant Pivot Low Points')

        # Set chart title and labels
        plt.title(f'Closing Prices with Trendlines and Significant Pivot Points ({file_name})')
        plt.xlabel('Index')
        plt.ylabel('Closing Price')

        # Show the chart for the current data frame
        plt.legend()
        plt.show()

这是我的新方法,按照我刚才所说的逻辑,但仍然没有接近我们的愿望。

8tntrjer

8tntrjer1#

下面是一个使用KMeans聚类和线性回归技术绘制优化趋势线的示例

簇的数量是硬编码的(并且可以通过变量n_clusters轻松更改);在一个更复杂的版本中,基于数据本身的最佳集群数量将被达到(例如,这就是为什么mean_squared_error被包括在内,但我最终没有在这个演示中使用它们;使用该度量总是简单地选择可能的最大数目的聚类作为最佳-存在用于找到理想数目的聚类的更好方法,但是,通过小轮试验和错误运行手动将其参数化并不太困难或耗时)。

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.graph_objects as go

from sklearn.cluster import KMeans
from sklearn.linear_model import LinearRegression
from sklearn.metrics import mean_squared_error

def fit_line_to_cluster(df, cluster_label):
    reg = LinearRegression().fit(df[["index"]], df["value"])
    pred = reg.predict(df[["index"]])
    mse = mean_squared_error(df["value"], pred)
    return reg, mse

def calculate_pivot_points(data):
    pivot_points = []
    pivot_high_points = []
    pivot_low_points = []

    for i in range(len(data)):
        high = data.loc[i, "high"]
        low = data.loc[i, "low"]
        close = data.loc[i, "close"]

        # Calculate Pivot Point
        pivot_point = (high + low + close) / 3
        pivot_points.append(pivot_point)

        # Identify Pivot High Points using swing points
        if i > 0 and i < len(data) - 1:
            if (
                high > data.loc[i - 1, "high"]
                and high > data.loc[i + 1, "high"]
            ):
                pivot_high_points.append({"index": i, "value": high})

        # Identify Pivot Low Points using swing points
        if i > 0 and i < len(data) - 1:
            if low < data.loc[i - 1, "low"] and low < data.loc[i + 1, "low"]:
                pivot_low_points.append({"index": i, "value": low})

    return (
        pivot_points,
        pivot_high_points,
        pivot_low_points,
    )

def add_fitted_lines_to_plotly(fig, df, models, colors):
    for i, model in enumerate(models):
        x_vals = df[df["cluster"] == i]["index"].values
        y_vals = model.predict(x_vals.reshape(-1, 1))
        fig.add_trace(
            go.Scatter(
                x=x_vals, y=y_vals, mode="lines", line=dict(color=colors[i]),
            )
        )

### Data Analysis
df = pd.read_excel("~/Downloads/data3.xlsx")
df["time"] = pd.to_datetime(df["timestamp"])

pivot_points, pivot_high_points, pivot_low_points = calculate_pivot_points(df)

high_df = pd.DataFrame(pivot_high_points)
low_df = pd.DataFrame(pivot_low_points)

## Clustering
n_clusters = 20

optimal_high_models = []
optimal_low_models = []

# For high points
kmeans_high = KMeans(n_clusters=n_clusters, random_state=0).fit(
    high_df[["index", "value"]]
)
high_df["cluster"] = kmeans_high.labels_
for i in range(n_clusters):
    cluster_data = high_df[high_df["cluster"] == i]
    model, mse = fit_line_to_cluster(cluster_data, i)
    optimal_high_models.append(model)

# For low points
kmeans_low = KMeans(n_clusters=n_clusters, random_state=0).fit(
    low_df[["index", "value"]]
)
low_df["cluster"] = kmeans_low.labels_
for i in range(n_clusters):
    cluster_data = low_df[low_df["cluster"] == i]
    model, mse = fit_line_to_cluster(cluster_data, i)
    optimal_low_models.append(model)

closing_prices = df["close"].values

### Plotting
fig = go.Figure()

# Plot closing, high, and low points
fig.add_trace(
    go.Scatter(
        x=list(range(len(closing_prices))),
        y=closing_prices,
        mode="lines",
        name="Closing Prices",
        line=dict(color="blue"),
        opacity=0.75,
    )
)

fig.add_trace(
    go.Scatter(
        x=high_df["index"],
        y=high_df["value"],
        mode="markers",
        name="High Points",
        marker=dict(color="red"),
        opacity=0.75,
        marker_size=5,
    )
)
fig.add_trace(
    go.Scatter(
        x=low_df["index"],
        y=low_df["value"],
        mode="markers",
        name="Low Points",
        marker=dict(color="green"),
        opacity=0.75,
        marker_size=5,
    )
)

# Add optimal trendlines for high and low clusters
add_fitted_lines_to_plotly(
    fig, high_df, optimal_high_models, ["magenta"] * n_clusters
)
add_fitted_lines_to_plotly(
    fig, low_df, optimal_low_models, ["cyan"] * n_clusters
)

# Set chart title and labels
fig.update_layout(
    title="High and Low Pivot Points with Clustered Trendlines",
    xaxis_title="Time",
    yaxis_title="Price",
)

fig.show()

结果是:x1c 0d1x使用Plotly,这允许放大数据,例如:

相关问题