Python - 一連の値に基づいて複数のプロセスを使用する

okwaves2024-01-25  9

久しぶりに Python のマルチプロセッシングを使用しました。それによって恩恵を受ける可能性のあるタスクがあるか、その仮定が間違っています。自分のユースケースに対してのみプロキシを提供できます。

基本的に、リアルタイム入力シナリオをシミュレートしています。生の入力がストリートシーンのビデオである場合、入力はフレームごとに一意に識別され追跡された車両の状態になります。状態_0: v_id=0; x=0; y=1; vx=2.5; vy=-3.

これらの状態はフレームごとではなく、一度に 1 つずつ発生します。したがって、上記の例がフレーム 1 のものである場合、車両 0 がフレーム 2 で追跡され、新しい車両がピックアップされると、次の結果が得られます。

状態_1: v_id=0; x=2.5; y=-2; vx=3; vy=-2.5 状態_2: v_id=1; x=5.6; y=3; vx=-1; vy=0

フレーム情報はなく、状態が続くだけです。

Th状態は、v_id、[x、y、vx、vy] を返すカスタム反復子を持つ State オブジェクトによって表されます。

特定の車両に関する以前の情報を使用して、これらの状態について分析を実行したいと考えています。さらにプロキシ コードをいくつか示します。

self.analysis = {}

def estimate(self):

    for v_id, state in self.State:

        if v_id not in self.analysis.keys():
            self.analysis[v_id] = state

        state_estimate = self.do_something(self.analysis[v_id], state)
        self.analysis[vid].append(state_estimate)

私が持っているものは機能しますが、マルチプロセッシングを使用して改善できるかどうか知りたいです。私の考えは、すべての状態 v_id=0 をプロセス 1 に送信し、すべての状態 v_id=1 をプロセス 2 に送信できるかということでした。

do_something() 関数は非常に時間がかかるため、State イテレータが複数のプロセスに状態を渡した方が高速になるのではないかと考えました。

「先を読む」ことを自分に許可していないため、イテレータがボトルネックになると思います。あらゆる未来の状態へ。しかし、16 個のプロセスをセットアップし、現在の状態の v_id に基づいて sta を持つことはできますか?特定のプロセスによって実行される分析を推定しますか?ここではマルチプロセッシングが適切ですか?

これはほぼ完全に do_something が何であるかによって決まります。 CPU バウンドの場合は、マルチプロセッシングが役立つ可能性があります。主に新しい状態 (または通常は IO バウンド) を待機している場合、おそらくパフォーマンスはあまり改善されません。ただし、そのメソッドの詳細を含めるか、単にコードのプロファイリングやベンチマークを行う必要があります。

– ブネッカー

2020 年 9 月 3 日 20:09

簡単に説明すると、do_something は、過去の予測と現在の状態を使用して新しい予測を生成する機械学習予測子です。 CPU に依存していると思いますか?それは役に立ちますか?実際のコードを提供できないのが残念です。

– スパッド

2020 年 9 月 3 日 20:53

確実に知るためには試してみる必要があるようです。それは常にトレードオフです。新しいプロセスは無料ではないため、プロセスの作成とリソース使用量のバランスを取る必要があります。計算に有利になります。新しいデータが到着するまでにかかる時間よりも計算にかかる時間が大幅に長い場合は、並列化が役立つ可能性があります。ぜひ試してみることをお勧めします。コードをプロファイリングし、並列化して、再度プロファイリングします :)

– ブネッカー

2020 年 9 月 3 日 21:12

しかし、どうやってそれを行うのでしょうか?コードをプロファイリングしたので、do_something が時間の負担になることがわかりました。私はマルチプロセッシングについてあまり詳しくありません。このコードのどこにプロセスを作成すればよいでしょうか?適切なデータを提供するにはどうすればよいですか?

– Sプリン

2020 年 9 月 3 日 23:03

マルチプロセッシングのドキュメントを参照してください。 ID に基づいてワーカーにデータを送信するという基本的なアプローチは良さそうです (各 ID の可能性が等しい限り)。コードを書くことはできませんが、簡単に説明します。プロセスごとにキューを作成します。新しい状態をキューにプッシュします。ワーカーは、キュー上の新しいデータを待機する無限ループを実行し、処理を実行し、必要に応じて結果を送信または保存します。

– ブネッカー

2020 年 9 月 3 日 23:33

-------------------------------

これがうまくいかないことはわかっていますが、これには問題があります。

ワーカーがすべて同じ呼び出し可能オブジェクトを実行しているが、v_id と状態が異なる場合に、ワーカーにタスクをフィードする方法に苦労しています。

最初の編集: これはもう少し目標に近いと思います。プロセスごとにキューがあります。状態は、v_id に基づいてキューに追加されます。まだワーカーを特定中です。

2 番目の編集: 入力キューと出力キューの両方が必要でしょうか?プロセスを追跡する必要はまったくないのではないでしょうか?これはすべて例に基づいて説明しようとしています。

進行中のソリューション:

self.analysis = {}

self.queues = {}
self.n_workers = multiprocessing.cpu_count()
for worker in range(self.n_workers):
    q_i = multiprocessing.Queue()
    q_o = multiprocessing.Queue()
    multiprocessing.Process(target=self.worker, args=(q_i, q_o), name=str(worker)).start()
    self.queues[v_id] = {'input': q_i, 'output': q_o}

def worker(self, input_queue, output_queue):

    ??? Maybe something here takes the place of self.analysis ???

def loop_states(self):

    for v_id, state in self.State:

        if v_id not in self.analysis.keys():
            self.analysis[v_id] = state

        queue = self.queues[v_id % self.n_workers]['input']
        queue.put(self.estimate, (v_id, state))

def estimate(self, v_id, state):

    state_estimate = self.do_something(self.analysis[v_id], state)
    self.analysis[vid].append(state_estimate)

1

ここでは正しい道を進んでいます。ワーカー関数には、並列化する必要がある関数がすべて含まれている必要があります。これは事実上、入力キューから項目を受け取り、その項目に対して do_something を呼び出し、結果を出力キューに送り返す while ループである必要があります。

– ブネッカー

2020 年 9 月 4 日 16:28

総合生活情報サイト - OKWAVES
総合生活情報サイト - OKWAVES
生活総合情報サイトokwaves(オールアバウト)。その道のプロ(専門家)が、日常生活をより豊かに快適にするノウハウから業界の最新動向、読み物コラムまで、多彩なコンテンツを発信。