V-REPのLine TracerをPythonで制御する
急に3Dシミュレーションやりたい欲が出てきたのでV-REPというのを使ってみた。 V-REPを選択した理由は次の通り。
- Pythonで制御できる
- いろんなリアルロボットをシミュレートできる
- 以下の記事があったのでハードルが低かった
この記事に沿って進めていくだけで、以下の動画のようにPythonでLine Tracerを制御できるようになる。 しかし、途中からLine Trace機能が効かなくなって、Line Tracerが落下するという結末が待っていた。
そこで、その原因を究明するために、ソースコードを少しだけちゃんと読んでみた。
ソースコード解析
前半はV-REPをPythonから制御するための前処理的なことなので飛ばして、まずは扱うUIとObjectの取得から。 次の処理では、メソッドの第2引数から分かるように、順にセンサーディスプレイ、左・中央・右センサー、左・右車輪と、1つのUIと5つのObjectを取得している。
# Get obejects res, display = vrep.simxGetUIHandle(clientID, "sensorDisplay", vrep.simx_opmode_blocking) res, leftSensor = vrep.simxGetObjectHandle(clientID, "LeftSensor", vrep.simx_opmode_blocking) res, middleSensor = vrep.simxGetObjectHandle(clientID, "MiddleSensor", vrep.simx_opmode_blocking) res, rightSensor = vrep.simxGetObjectHandle(clientID, "RightSensor", vrep.simx_opmode_blocking) res, leftJointDynamic = vrep.simxGetObjectHandle(clientID, "DynamicLeftJoint" , vrep.simx_opmode_blocking) # Left wheel res, rightJointDynamic = vrep.simxGetObjectHandle(clientID, "DynamicRightJoint", vrep.simx_opmode_blocking) # Right wheel
最初の"sensorDisplay"
とは、下図の左上にあるUI。
Left sensorが黒く、それ以外が黄色になっている。
これはLine Tracerの前方に付いている3つのセンサーを見ると分かる。
左センサーが黒いラインを捉えているため、UIにそれが反映されている。
以下がその設定メソッドsetLeds()
。
elHandle
に上記で取得した"sensorDisplay"
のObject display
を第1引数として渡す。
第2~4引数のleft
, middle
, right
は、TrueかFalseで、Falseがラインを捉えていることを示す。
最初にFalseで初期化して、Trueなら設定し直すという感じかな、たぶん。
setLeds()という名前にしているのは、sensorをLEDとしてUIで表現しているからだと思う。
# Update the sensor display. def setLeds(elHandle, left, middle, right): # Initialize display at first. vrep.simxSetUIButtonProperty(clientID, elHandle, 8, vrep.sim_buttonproperty_staydown, vrep.simx_opmode_oneshot) vrep.simxSetUIButtonProperty(clientID, elHandle, 16, vrep.sim_buttonproperty_staydown, vrep.simx_opmode_oneshot) vrep.simxSetUIButtonProperty(clientID, elHandle, 24, vrep.sim_buttonproperty_staydown, vrep.simx_opmode_oneshot) # Set LEDs of the display if sensors don't catch the line. if left: vrep.simxSetUIButtonProperty(clientID, elHandle, 8, vrep.sim_buttonproperty_staydown+vrep.sim_buttonproperty_isdown, vrep.simx_opmode_oneshot) if middle: vrep.simxSetUIButtonProperty(clientID, elHandle, 16, vrep.sim_buttonproperty_staydown+vrep.sim_buttonproperty_isdown, vrep.simx_opmode_oneshot) if right: vrep.simxSetUIButtonProperty(clientID, elHandle, 24, vrep.sim_buttonproperty_staydown+vrep.sim_buttonproperty_isdown, vrep.simx_opmode_oneshot)
続いて、本処理に入る前にセンサー情報を格納するlistを初期化。
# Initialize sensors sensorReading = [False, False, False] # Left, Middle, Right sensorReading[0] = (vrep.simxReadVisionSensor(clientID, leftSensor, vrep.simx_opmode_streaming) == 1) sensorReading[1] = (vrep.simxReadVisionSensor(clientID, middleSensor, vrep.simx_opmode_streaming) == 1) sensorReading[2] = (vrep.simxReadVisionSensor(clientID, rightSensor, vrep.simx_opmode_streaming) == 1)
そして本処理だが、ここがLine Tracerが落下した原因の一つ。
while
の行に50秒以内であれば制御する書いてある。
確かに観測してるとゲートを通る直前ぐらいで50秒が過ぎている。
このときにPythonの処理が終わって、その時の左右の車輪の速度が同じでかつ前進しているから、そのままLine Tracerが落下すると思われる。
なので、とりあえずは50を∞に近い数字すれば半永久的にLine Traceしてくれるはず。
なお後に続くのは、センサー情報を取得して上記のsetLeds()でディスプレイにしている処理。
while time.time() - startTime < 50: # Set near ∞ if you want!!!!!!!!!! # Try to retrieve the streamed data. returnCode, data = vrep.simxGetIntegerParameter(clientID,vrep.sim_intparam_mouse_x,vrep.simx_opmode_buffer) # Read the sensors sensorReading[0] = (vrep.simxReadVisionSensor(clientID, leftSensor, vrep.simx_opmode_buffer)[1]) sensorReading[1] = (vrep.simxReadVisionSensor(clientID, middleSensor, vrep.simx_opmode_buffer)[1]) sensorReading[2] = (vrep.simxReadVisionSensor(clientID, rightSensor, vrep.simx_opmode_buffer)[1]) # Update the sensor display setLeds(display, sensorReading[0], sensorReading[1], sensorReading[2])
最後はLine Tracerの制御。 まずはまっすぐ進むように左右の車輪を同じ速度に設定。 その後、左センサーがラインを捉えていたら左に少し曲がるように設定。そのまた逆も然り。 そして、その設定コマンドをLine Tracerに送る。
# Decide about both left and right velocities. s = 1.0 linearVelocityLeft = nominalLinearVelocity*s linearVelocityRight = nominalLinearVelocity*s # Turn left a little if the left sensor catches line(False), and vice versa. if not sensorReading[0]: linearVelocityLeft = linearVelocityLeft*0.3 if not sensorReading[2]: linearVelocityRight = linearVelocityRight*0.3 # Update both left and right velocities. vrep.simxSetJointTargetVelocity(clientID, leftJointDynamic, linearVelocityLeft/(s*wheelRadius), vrep.simx_opmode_oneshot) vrep.simxSetJointTargetVelocity(clientID, rightJointDynamic, linearVelocityRight/(s*wheelRadius), vrep.simx_opmode_oneshot) time.sleep(0.005)
上でwhileの処理時間を∞に近い数字すればいいと言ったが、そんなことしなくてもシンプルに処理が終わったらLine Tracerを止めればいい、という考えもある。そのためには、与える速度を左右の車輪共に0.0
にして、かつ第4引数をvrep.simx_opmode_blocking
にすればOK。ちなみに、vrep.simx_opmode_oneshot
は前のコマンドと同じものを返すみたい?なので、速度を0.0
にしても、そのまま落下する結末を免れることができなかった。詳しくは参考文献を見て下さい。
# Stop the line tracer. vrep.simxSetJointTargetVelocity(clientID, leftJointDynamic, 0.0, vrep.simx_opmode_blocking) vrep.simxSetJointTargetVelocity(clientID, rightJointDynamic, 0.0, vrep.simx_opmode_blocking)
以上。
全ソースコード
# -*- coding: utf-8 -*- try: import vrep except: print ('--------------------------------------------------------------') print ('"vrep.py" could not be imported. This means very probably that') print ('either "vrep.py" or the remoteApi library could not be found.') print ('Make sure both are in the same folder as this file,') print ('or appropriately adjust the file "vrep.py"') print ('--------------------------------------------------------------') print ('') import time import sys import ctypes print ('Program started') vrep.simxFinish(-1) clientID = vrep.simxStart('127.0.0.1', 19999, True, True, 5000, 5) if clientID != -1: print ('Connected to remote API server') else: print ('Failed connecting to remote API server') sys.exit('Program Ended') nominalLinearVelocity = 0.3 wheelRadius = 0.027 interWheelDistance = 0.119 res, objs = vrep.simxGetObjects(clientID, vrep.sim_handle_all, vrep.simx_opmode_blocking) if res == vrep.simx_return_ok: print('Number of objects in the scene: ', len(objs)) else: print('Remote API function call returned with error code: ',res) time.sleep(2) startTime = time.time() vrep.simxGetIntegerParameter(clientID, vrep.sim_intparam_mouse_x, vrep.simx_opmode_streaming) # Get obejects. res, display = vrep.simxGetUIHandle(clientID, "sensorDisplay", vrep.simx_opmode_blocking) res, leftSensor = vrep.simxGetObjectHandle(clientID, "LeftSensor", vrep.simx_opmode_blocking) res, middleSensor = vrep.simxGetObjectHandle(clientID, "MiddleSensor", vrep.simx_opmode_blocking) res, rightSensor = vrep.simxGetObjectHandle(clientID, "RightSensor", vrep.simx_opmode_blocking) res, leftJointDynamic = vrep.simxGetObjectHandle(clientID, "DynamicLeftJoint" , vrep.simx_opmode_blocking) # Left wheel res, rightJointDynamic = vrep.simxGetObjectHandle(clientID, "DynamicRightJoint", vrep.simx_opmode_blocking) # Right wheel if res != vrep.simx_return_ok: print('Failed to get sensor Handler') vrep.simxFinish(clientID) sys.exit('Program ended') # Update the sensor display. def setLeds(elHandle, left, middle, right): # Initialize display at first. vrep.simxSetUIButtonProperty(clientID, elHandle, 8, vrep.sim_buttonproperty_staydown, vrep.simx_opmode_oneshot) vrep.simxSetUIButtonProperty(clientID, elHandle, 16, vrep.sim_buttonproperty_staydown, vrep.simx_opmode_oneshot) vrep.simxSetUIButtonProperty(clientID, elHandle, 24, vrep.sim_buttonproperty_staydown, vrep.simx_opmode_oneshot) # Set LEDs of the display if sensors don't catch the line. if left: vrep.simxSetUIButtonProperty(clientID, elHandle, 8, vrep.sim_buttonproperty_staydown+vrep.sim_buttonproperty_isdown, vrep.simx_opmode_oneshot) if middle: vrep.simxSetUIButtonProperty(clientID, elHandle, 16, vrep.sim_buttonproperty_staydown+vrep.sim_buttonproperty_isdown, vrep.simx_opmode_oneshot) if right: vrep.simxSetUIButtonProperty(clientID, elHandle, 24, vrep.sim_buttonproperty_staydown+vrep.sim_buttonproperty_isdown, vrep.simx_opmode_oneshot) # Initialize sensors sensorReading = [False, False, False] # Left, Middle, Right sensorReading[0] = (vrep.simxReadVisionSensor(clientID, leftSensor, vrep.simx_opmode_streaming) == 1) sensorReading[1] = (vrep.simxReadVisionSensor(clientID, middleSensor, vrep.simx_opmode_streaming) == 1) sensorReading[2] = (vrep.simxReadVisionSensor(clientID, rightSensor, vrep.simx_opmode_streaming) == 1) while time.time() - startTime < 50: # Try to retrieve the streamed data. returnCode, data = vrep.simxGetIntegerParameter(clientID,vrep.sim_intparam_mouse_x,vrep.simx_opmode_buffer) # Read the sensors sensorReading[0] = (vrep.simxReadVisionSensor(clientID, leftSensor, vrep.simx_opmode_buffer)[1]) sensorReading[1] = (vrep.simxReadVisionSensor(clientID, middleSensor, vrep.simx_opmode_buffer)[1]) sensorReading[2] = (vrep.simxReadVisionSensor(clientID, rightSensor, vrep.simx_opmode_buffer)[1]) # Update the sensor display setLeds(display, sensorReading[0], sensorReading[1], sensorReading[2]) # Decide about both left and right velocities. s = 1.0 linearVelocityLeft = nominalLinearVelocity*s linearVelocityRight = nominalLinearVelocity*s # Trun left a little if the left sensor catches line(False), and vice versa. if not sensorReading[0]: linearVelocityLeft = linearVelocityLeft*0.3 if not sensorReading[2]: linearVelocityRight = linearVelocityRight*0.3 # Update both left and right velocities. vrep.simxSetJointTargetVelocity(clientID, leftJointDynamic, linearVelocityLeft/(s*wheelRadius), vrep.simx_opmode_oneshot) vrep.simxSetJointTargetVelocity(clientID, rightJointDynamic, linearVelocityRight/(s*wheelRadius), vrep.simx_opmode_oneshot) time.sleep(0.005) # Stop the line tracer. vrep.simxSetJointTargetVelocity(clientID, leftJointDynamic, 0.0, vrep.simx_opmode_blocking) vrep.simxSetJointTargetVelocity(clientID, rightJointDynamic, 0.0, vrep.simx_opmode_blocking)
参考文献
複数画像をKerasのVGG16で特徴抽出してk-means++でクラスタリング
VGG16, VGG19, ResNet50, InceptionV3など、 ImageNetで学習済みのモデルがKerasで使える。 物体認識だけでなく特徴抽出にも使えるので、 複数画像をVGG16で特徴抽出して、これをk-means++でクラスタリングしてみた。 なお複数画像は、ハワイで撮影したフラダンスの動画をフレーム分割して用意した。 以下に説明するコードは、ここ に置いておく。
ディレクトリー構成
ディレクトリー構成は以下の通り。
src/image_clustering.py
が実行ファイルとなる。
data/video/
にクラスタリングしたい動画を置く。
data/images/targrt/
に動画をフレーム分割した画像が保存される。
data/images/clustered/
に分類済みの画像が保存される。
なお、動画でなくても、クラスタリングしたい画像を data/images/targrt/
に置くこともできる。
なお、ここ
には src/image_clustering.py
しかないので、
data/video/
と data/images/
フォルダーを追加して使ってください。
image_clustering ├─data │ ├─images │ │ ├─clustered │ │ └─target │ └─video └─src └─image_clustering.py
初期設定
グローバル変数は固定。
__init__()
の引数に、動画の場合は video_file
にファイル名を指定する。
画像の場合は、input_video
をFalseに変更する。
DATA_DIR = '../data/' VIDEOS_DIR = '../data/video/' # The place to put the video TARGET_IMAGES_DIR = '../data/images/target/' # The place to put the images which you want to execute clustering CLUSTERED_IMAGES_DIR = '../data/images/clustered/' # The place to put the images which are clustered IMAGE_LABEL_FILE ='image_label.csv' # Image name and its label class Image_Clustering: def __init__(self, n_clusters=50, video_file='IMG_2140.MOV', image_file_temp='img_%s.png', input_video=True): self.n_clusters = n_clusters # The number of cluster self.video_file = video_file # Input video file name self.image_file_temp = image_file_temp # Image file name template self.input_video = input_video # If input data is a video
メイン関数
以下の順で処理を行う。1.は動画の場合のみ。
def main(self): if self.input_video == True: self.video_2_frames() self.label_images() self.classify_images()
動画のフレーム分割
OpenCVでフレーム分割して、data/images/targrt/
に保存する。
def video_2_frames(self): print('Video to frames...') cap = cv2.VideoCapture(VIDEOS_DIR+self.video_file) # Remove and make a directory. if os.path.exists(TARGET_IMAGES_DIR): shutil.rmtree(TARGET_IMAGES_DIR) # Delete an entire directory tree if not os.path.exists(TARGET_IMAGES_DIR): os.makedirs(TARGET_IMAGES_DIR) # Make a directory i = 0 while(cap.isOpened()): flag, frame = cap.read() # Capture frame-by-frame if flag == False: break # A frame is not left cv2.imwrite(TARGET_IMAGES_DIR+self.image_file_temp % str(i).zfill(6), frame) # Save a frame i += 1 print('Save', TARGET_IMAGES_DIR+self.image_file_temp % str(i).zfill(6)) cap.release() # When everything done, release the capture print('')
実行すると、次々と画像が保存されていく。 フラダンスの動画は17秒だが、フレーム数は530枚となっている。
> python .\image_clustering.py
Video to frames...
Save ../data/images/target/img_000001.png
Save ../data/images/target/img_000002.png
Save ../data/images/target/img_000003.png
...
Save ../data/images/target/img_000530.png
画像のラベリング(特徴抽出とクラスタリング)
モデルにはVGG16を使用。 include_top=False
とすることで、特徴抽出が可能になる。
動画を分割したフレームはpngで保存しているが、画像はjpgも対象。
jpegとかも対象にしたい場合は、この辺を書き換えてください。
後述する__feature_extraction()
にVGG16のモデルと画像を渡して特徴を取得する。
全画像の特徴をk-means++でクラスタリングを行い、ラベルを取得して画像とセットにして、pandasでCSVに保存する。
def label_images(self): print('Label images...') # Load a model model = VGG16(weights='imagenet', include_top=False) # Get images images = [f for f in os.listdir(TARGET_IMAGES_DIR) if f[-4:] in ['.png', '.jpg']] assert(len(images)>0) X = [] pb = ProgressBar(max_value=len(images)) for i in range(len(images)): # Extract image features feat = self.__feature_extraction(model, TARGET_IMAGES_DIR+images[i]) X.append(feat) pb.update(i) # Update progressbar # Clutering images by k-means++ X = np.array(X) kmeans = KMeans(n_clusters=self.n_clusters, random_state=0).fit(X) print('') print('labels:') print(kmeans.labels_) print('') # Merge images and labels df = pd.DataFrame({'image': images, 'label': kmeans.labels_}) df.to_csv(DATA_DIR+IMAGE_LABEL_FILE, index=False)
__feature_extraction()
の中身は、
Applications - Keras Documentation
に書かれてある通り。
リサイズして、4次元データにして、zero-centeringしてから特徴を取得。
ただし、特徴は (1, 7, 7, 512)
の4次元で返ってくるので、
flatten()
で (25088,)
の1次元に変換する。
def __feature_extraction(self, model, img_path): img = image.load_img(img_path, target_size=(224, 224)) # resize x = image.img_to_array(img) x = np.expand_dims(x, axis=0) # add a dimention of samples x = preprocess_input(x) # RGB 2 BGR and zero-centering by mean pixel based on the position of channels feat = model.predict(x) # Get image features feat = feat.flatten() # Convert 3-dimentional matrix to (1, n) array return feat
実行した結果、画像530枚でも、VGG16による特徴抽出は24秒で終わった。なお、GPUはGeForce 1080 x1。 k-means++で50クラスタに分類しているが、だいたい連続するフレームが同じラベルになっていることが分かる。 しかし、ラベル7などフレームが飛んでいても同じラベルになっているものもある。
Label images... 99% (527 of 530) |################################## | Elapsed Time: 0:00:24 ETA: 0:00:00 labels: [28 28 28 28 28 28 28 28 28 3 3 3 3 3 3 3 3 3 3 3 3 3 3 3 16 16 16 16 16 16 16 16 16 16 16 16 16 16 16 16 16 26 26 26 26 26 26 26 26 26 26 26 26 26 26 26 26 26 26 11 11 11 11 11 11 11 11 11 32 32 32 32 32 32 32 32 32 32 32 27 27 27 27 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 9 37 37 37 37 37 37 37 37 37 37 37 37 37 37 21 21 21 21 21 21 21 21 21 21 21 21 21 21 21 21 21 29 29 29 29 29 29 29 29 29 29 29 29 29 1 1 1 1 1 42 42 42 42 42 42 45 45 45 45 45 45 45 45 45 45 45 4 4 4 4 4 4 4 4 4 4 4 4 4 4 4 4 4 33 33 33 33 33 33 33 33 33 33 33 33 33 33 33 35 35 35 35 35 35 35 35 35 35 35 35 19 19 19 19 19 19 41 41 41 41 41 41 41 49 49 49 49 49 49 49 13 13 13 13 13 13 44 44 44 44 44 44 44 44 44 44 44 5 5 17 17 17 17 17 17 17 17 17 17 13 13 5 5 34 34 34 34 34 34 34 34 34 34 5 5 5 5 5 5 25 25 25 25 25 25 25 25 25 15 15 15 15 15 15 31 31 31 31 31 48 48 48 48 48 48 40 40 40 40 40 40 40 20 20 20 20 20 20 20 39 39 39 39 39 39 39 39 2 2 2 2 2 2 2 2 2 2 2 2 2 2 18 18 18 18 18 18 18 18 18 18 18 18 18 46 46 46 46 46 46 46 46 46 47 47 47 47 47 47 47 47 47 22 22 22 22 22 22 6 6 6 6 6 6 6 6 6 6 43 43 43 43 43 43 43 43 43 43 30 30 30 30 30 30 30 30 30 30 7 7 7 7 7 7 7 7 7 7 7 7 7 7 7 38 38 38 38 38 38 38 38 38 38 38 38 38 38 38 7 7 7 10 10 10 10 10 10 10 10 10 24 24 24 24 24 24 24 24 24 14 14 0 0 0 0 0 0 0 0 14 14 14 14 14 14 14 14 14 14 14 14 14 14 14 14 14 14 14 36 36 36 36 36 36 36 36 36 36 36 36 12 12 12 12 12 12 12 12 8 8 8 8 8 8 8 8 23 23 23 23 23 23 23 23 23]
CSVに保存したデータは以下の通り。
image,label img_000000.png,28 img_000001.png,28 img_000002.png,28 ... img_000009.png,3 img_000010.png,3 img_000011.png,3 ... img_000024.png,16 img_000025.png,16 img_000026.png,16 ...
画像の分類(ラベルごとにディレクトリーにまとめる)
CSVファイルを読み込んで、ラベルの数だけ data/images/clustered/
にラベル名のディレクトリーを作成し、
data/images/target/
から画像をラベルごとにコピペする。
def classify_images(self): print('Classify images...') # Get labels and images df = pd.read_csv(DATA_DIR+IMAGE_LABEL_FILE) labels = list(set(df['label'].values)) # Delete images which were clustered before if os.path.exists(CLUSTERED_IMAGES_DIR): shutil.rmtree(CLUSTERED_IMAGES_DIR) for label in labels: print('Copy and paste label %s images.' % label) # Make directories named each label new_dir = CLUSTERED_IMAGES_DIR + str(label) + '/' if not os.path.exists(new_dir): os.makedirs(new_dir) # Copy images to the directories clustered_images = df[df['label']==label]['image'].values for ci in clustered_images: src = TARGET_IMAGES_DIR + ci dst = CLUSTERED_IMAGES_DIR + str(label) + '/' + ci shutil.copyfile(src, dst)
以下が実行ログ。
Classify images... Copy and paste label 0 images. Copy and paste label 1 images. Copy and paste label 2 images. ... Copy and paste label 49 images.
分類結果
先ほどのラベル7の画像を見てみると、左手を頭より上に、右手を胸の高さに上げている画像がまとめられている。 画像のindexは427から443に富んでいるが、どちらも同じポーズをしている。
またラベル45の画像を見てみると、両手を胸の高さに上げて、海の方を向いている画像がまとめられている。
感想
今回、動画からフレーム分割した画像ということもあって、特徴がほとんど変わらない画像が対象となってはいるが、 上手くクラスタリングできている。今後はなるべく似ていない画像で試してみようと思う。 あとできれば、k-means++を使わずにCNNに閉じた方法があれば試してみたい。
参考文献
listやarrayからPandas DataFrameを作成
カラム名をkey、listやarrayをvalueにして辞書を作成して、DataFrameに突っ込むとできる。
import numpy as np import pandas as pd def make_df_from_list_and_array(): col1 = [0,1,2,3,4] col2 = np.array([5,6,7,8,9]) df = pd.DataFrame({'col_list':col1, 'col_array':col2}) print(df)
コードはここ。
>>> import test >>> test.make_df_from_list_and_array() col_array col_list 0 5 0 1 6 1 2 7 2 3 8 3 4 9 4
Pythonでディレクトリー内から特定の拡張子のファイルを取得
listdirを使ってディレクトリー内のファイルを取得し、リスト内包表記を使って条件付きでファイルを読み込む。 if文を変えれば拡張子に限定されない。 例えば、接頭語を指定するとか。
import os def get_target_files(dir_path='./image_dir/'): files = [f for f in os.listdir(dir_path) if f[-4:]=='.png'] return files
コードは一応、ここに置いておきます。
>>> import test >>> test.get_target_files() ['img_000000.png', 'img_000001.png', 'img_000002.png', ...]
Python-OpenCVで動画をフレーム分割して画像として保存
動画をフレームに分割して画像として指定したディレクトリーに保存する操作をまとめた。 動画を読み込んで、フレームごとに存在を確認して、確認できれば画像として保存する、という手順。 引数は適宜変えてください。
import os import shutil import cv2 def video_2_frames(video_file='./IMG_2140.MOV', image_dir='./image_dir/', image_file='img_%s.png'): # Delete the entire directory tree if it exists. if os.path.exists(image_dir): shutil.rmtree(image_dir) # Make the directory if it doesn't exist. if not os.path.exists(image_dir): os.makedirs(image_dir) # Video to frames i = 0 cap = cv2.VideoCapture(video_file) while(cap.isOpened()): flag, frame = cap.read() # Capture frame-by-frame if flag == False: # Is a frame left? break cv2.imwrite(image_dir+image_file % str(i).zfill(6), frame) # Save a frame print('Save', image_dir+image_file % str(i).zfill(6)) i += 1 cap.release() # When everything done, release the capture
githubに置いたコードを 以下のように実行すると、指定したディレクトリーに動画をフレーム分割した画像が保存される。
>>> import test >>> test.video_2_frames() Save ./image_dir/img_000000.png Save ./image_dir/img_000001.png Save ./image_dir/img_000002.png ...
参考文献
Pythonでディレクトリーの削除と作成
処理中、一時的にディレクトリーを作ることが多いので、以下に操作をまとめた。 引数にディレクトリーパスを渡して、ある時は削除して、ない時は作るという仕様。
# coding: utf-8 import os import shutil def delete_and_make_directory(dir_path='./image_dir/'): # Delete the entire directory tree if it exists. if os.path.exists(dir_path): shutil.rmtree(dir_path) # Make the directory if it doesn't exist. if not os.path.exists(dir_path): os.makedirs(dir_path)
githubに置いたコードを 以下のように実行すると、test.pyと同じ階層に新しいディレクトリ-が作成される。
>>> import test >>> test.delete_and_make_directory()
参考文献
TensorFlowからKerasに乗り換えてみた
モデル設計などの際に、TensorFlowのコードが長くなるので自分でラッパーを書いていたのだが、 ざっとKerasを調べてみたら、ラッパーが必要ないくらいシンプルに書けるし、 前処理などモデル設計以外のツールも充実しているようだったので、 KerasでCIFAR10のモデルを訓練するコードを書いてみた。 なおKerasについては、KerasでCIFAR-10の一般物体認識 - 人工知能に関する断創録 が非常に分かりやすかった。そのため以下に示すコードは、こちらの記事をベースに色々とカスタマイズしている。 なおKeras v1.2.2, tensorflow-gpu v1.0.0, opencv-python v3.2.0を使用しており、 全コードはここで公開している。
パラメーター
IDG_PARAMはData Augmentationのパラメーター。
今回は、ZCA Whiteningと、ランダムで上下左右の移動と左右反転を行う。
これだけのData Augmentationを、独自に実装することなく、パラメーター操作だけで色々と試せるのがKerasの良いところ。
Kerasによるデータ拡張 - 人工知能に関する断創録
に各パラメーターを可視化した様子が掲載されている。
モデルなどの訓練結果はDIRに保存する。最後にFILEが付いてるパラメーターがそれらのファイル。
N_CLASS = 10 N_EPOCH = 50 # 100 BATCH_SIZE = 128 INPUT_DIM = (32, 32, 3) DATA_AUGMENTATION = True IDG_PARAM = {'featurewise_center': False, 'samplewise_center': False, 'featurewise_std_normalization': False, 'samplewise_std_normalization': False, 'zca_whitening': True, # False 'rotation_range': 0., 'width_shift_range': 0.1, # 0., 'height_shift_range': 0.1, # 0., 'shear_range': 0., 'zoom_range': 0., 'channel_shift_range': 0., 'fill_mode': 'nearest', 'cval': 0., 'horizontal_flip': True, 'vertical_flip': False, 'rescale': None, 'preprocessing_function': None } DIR = '../result/' MODEL_FILE = 'model.json' WEIGHT_FILE = 'weights.h5' HISTORY_DATA_FILE = 'history.csv' HISTORY_IMAGE_FILE = 'history.jpg' PARAM_EVAL_FILE = 'param_eval.csv'
メイン
今回実装したのは、データ取得、モデル設計、モデル訓練、結果保存、モデル試験の5つ。 これらをメソッドに分けて、すべてTestクラスのmainメソッドから呼び出す。
class Test: def __init__(self): pass def main(self): # Training start = time.clock() data = self.get_data() # データ取得 model = self.design_model(data[0]) # モデル設計 result = self.train_model(data, model) # モデル訓練 self.save(result) # 結果保存 print('Training Time: %s min' % round((time.clock()-start)/60., 1)) print('') # Test self.test_model(data) # モデル試験
データ取得
cifar10.load_data()
を実行するだけで、CIFAR10を訓練/試験、データ/ラベルに分割して取得可能。
ただし、normalizeとonehot化は自分で行う必要があるが、後者もまたnp_utils.to_categorical()
を実行するだけでOK。
def get_data(self): # Load CIFAR-10 (X_train, y_train), (X_test, y_test) = cifar10.load_data() self.__draw_sample_images(X_train, y_train) # Normalize data X_train = X_train.astype('float32') X_test = X_test.astype('float32') X_train /= 255.0 X_test /= 255.0 # Onehot label Y_train = np_utils.to_categorical(y_train, N_CLASS) Y_test = np_utils.to_categorical(y_test, N_CLASS) print('X_train.shape:', X_train.shape, 'Y_train.shape:', Y_train.shape) print('X_test.shape:', X_test.shape, 'Y_test.shape:', Y_test.shape) return X_train, Y_train, X_test, Y_test
なお、self.__draw_sample_images(X_train, y_train)
でCIFAR10の一部を可視化している。
モデル設計
VGGっぽいモデルを設計。
Convolution2D
ではborder_mode='same'
にしてゼロパディング。
個人的にKerasの気に入っているところはActivation('relu')
の部分。
活性化関数を文字列で指定できるところがいい。
TensorFlowでは活性化関数別にメソッドが用意されてたので、独自にラッパーを書いてた。
このモデルは、model.summay()
により標準出力できる。
def design_model(self, X_train): # Initialize model = Sequential() # (Conv -> Relu) * 2 -> Pool -> Dropout model.add(Convolution2D(32, 3, 3, border_mode='same', input_shape=X_train.shape[1:])) model.add(Activation('relu')) model.add(Convolution2D(32, 3, 3, border_mode='same')) model.add(Activation('relu')) model.add(MaxPooling2D(pool_size=(2, 2))) model.add(Dropout(0.25)) # (Conv -> Relu) * 2 -> Pool -> Dropout model.add(Convolution2D(64, 3, 3, border_mode='same')) model.add(Activation('relu')) model.add(Convolution2D(64, 3, 3, border_mode='same')) model.add(Activation('relu')) model.add(MaxPooling2D(pool_size=(2, 2))) model.add(Dropout(0.25)) # Flatten model.add(Flatten()) # 6*6*64 # FC -> Relu -> Dropout model.add(Dense(512)) model.add(Activation('relu')) model.add(Dropout(0.5)) # FC -> Softmax model.add(Dense(N_CLASS)) model.add(Activation('softmax')) model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy']) # Output model summary model.summary() return model
以下がモデルの標準出力の様子。
____________________________________________________________________________________________________ Layer (type) Output Shape Param # Connected to ==================================================================================================== convolution2d_1 (Convolution2D) (None, 32, 32, 32) 896 convolution2d_input_1[0][0] ____________________________________________________________________________________________________ activation_1 (Activation) (None, 32, 32, 32) 0 convolution2d_1[0][0] ____________________________________________________________________________________________________ convolution2d_2 (Convolution2D) (None, 32, 32, 32) 9248 activation_1[0][0] ____________________________________________________________________________________________________ activation_2 (Activation) (None, 32, 32, 32) 0 convolution2d_2[0][0] ____________________________________________________________________________________________________ maxpooling2d_1 (MaxPooling2D) (None, 16, 16, 32) 0 activation_2[0][0] ____________________________________________________________________________________________________ dropout_1 (Dropout) (None, 16, 16, 32) 0 maxpooling2d_1[0][0] ____________________________________________________________________________________________________ convolution2d_3 (Convolution2D) (None, 16, 16, 64) 18496 dropout_1[0][0] ____________________________________________________________________________________________________ activation_3 (Activation) (None, 16, 16, 64) 0 convolution2d_3[0][0] ____________________________________________________________________________________________________ convolution2d_4 (Convolution2D) (None, 16, 16, 64) 36928 activation_3[0][0] ____________________________________________________________________________________________________ activation_4 (Activation) (None, 16, 16, 64) 0 convolution2d_4[0][0] ____________________________________________________________________________________________________ maxpooling2d_2 (MaxPooling2D) (None, 8, 8, 64) 0 activation_4[0][0] ____________________________________________________________________________________________________ dropout_2 (Dropout) (None, 8, 8, 64) 0 maxpooling2d_2[0][0] ____________________________________________________________________________________________________ flatten_1 (Flatten) (None, 4096) 0 dropout_2[0][0] ____________________________________________________________________________________________________ dense_1 (Dense) (None, 512) 2097664 flatten_1[0][0] ____________________________________________________________________________________________________ activation_5 (Activation) (None, 512) 0 dense_1[0][0] ____________________________________________________________________________________________________ dropout_3 (Dropout) (None, 512) 0 activation_5[0][0] ____________________________________________________________________________________________________ dense_2 (Dense) (None, 10) 5130 dropout_3[0][0] ____________________________________________________________________________________________________ activation_6 (Activation) (None, 10) 0 dense_2[0][0] ==================================================================================================== Total params: 2,168,362 Trainable params: 2,168,362 Non-trainable params: 0 ____________________________________________________________________________________________________
モデル訓練
Data augmentationを行うか否かで場合分け。 行う場合は、上記で設定したIDG_PARAMがセットされる。 ZCA Whiteningは訓練/試験共に実施される。
def train_model(self, data, model): X_train, Y_train, X_test, Y_test = data if not DATA_AUGMENTATION: print('Not using data augmentation') # Train the model history = model.fit(X_train, Y_train, batch_size=BATCH_SIZE, nb_epoch=N_EPOCH, verbose=1, validation_data=(X_test, Y_test), shuffle=True) else: print('Using real-time data augmentation') # Make a generator for training data train_datagen = ImageDataGenerator(featurewise_center=IDG_PARAM['featurewise_center'], samplewise_center=IDG_PARAM['samplewise_center'], featurewise_std_normalization=IDG_PARAM['featurewise_std_normalization'], samplewise_std_normalization=IDG_PARAM['samplewise_std_normalization'], zca_whitening=IDG_PARAM['zca_whitening'], rotation_range=IDG_PARAM['rotation_range'], width_shift_range=IDG_PARAM['width_shift_range'], height_shift_range=IDG_PARAM['height_shift_range'], shear_range=IDG_PARAM['shear_range'], zoom_range=IDG_PARAM['zoom_range'], channel_shift_range=IDG_PARAM['channel_shift_range'], fill_mode=IDG_PARAM['fill_mode'], cval=IDG_PARAM['cval'], horizontal_flip=IDG_PARAM['horizontal_flip'], vertical_flip=IDG_PARAM['vertical_flip'], rescale=IDG_PARAM['rescale'], preprocessing_function=IDG_PARAM['preprocessing_function']) train_datagen.fit(X_train) train_generator = train_datagen.flow(X_train, Y_train, batch_size=BATCH_SIZE) # Make a generator for test data test_datagen = ImageDataGenerator(zca_whitening=IDG_PARAM['zca_whitening']) test_datagen.fit(X_test) test_generator = test_datagen.flow(X_test, Y_test) # Train the model history = model.fit_generator(train_generator, samples_per_epoch=X_train.shape[0], nb_epoch=N_EPOCH, validation_data=test_generator, nb_val_samples=X_test.shape[0]) # Evaluate the model if not DATA_AUGMENTATION: loss, acc = model.evaluate(X_test, Y_test, verbose=0) else: loss, acc = model.evaluate_generator(test_generator, val_samples=X_test.shape[0]) print('Test loss: %s, Test acc: %s' % (loss, acc)) result = {'model': model, 'history': history, 'loss': loss, 'acc': acc} return result
なお訓練中は、以下のように学習状況を標準出力してくれる。 プログレスバーに加えて、訓練/検証別にエラー率と精度が出力されるので、 適切に学習できているかが分かる。
Using real-time data augmentation Epoch 1/50 50000/50000 [==============================] - 106s - loss: 1.6846 - acc: 0.3863 - val_loss: 1.1716 - val_acc: 0.5766 Epoch 2/50 50000/50000 [==============================] - 108s - loss: 1.1772 - acc: 0.5828 - val_loss: 0.9773 - val_acc: 0.6528 Epoch 3/50 50000/50000 [==============================] - 107s - loss: 1.0237 - acc: 0.6394 - val_loss: 0.8878 - val_acc: 0.6886 ... Epoch 48/50 50000/50000 [==============================] - 107s - loss: 0.4872 - acc: 0.8319 - val_loss: 0.4522 - val_acc: 0.8453 Epoch 49/50 50000/50000 [==============================] - 105s - loss: 0.4835 - acc: 0.8315 - val_loss: 0.5183 - val_acc: 0.8318 Epoch 50/50 50000/50000 [==============================] - 104s - loss: 0.4835 - acc: 0.8322 - val_loss: 0.4671 - val_acc: 0.8403 Test loss: 0.456553607655, Test acc: 0.8476
結果保存
モデル、重み、訓練/検証別のエラー率と精度、Data augmentationなどパラメーターと試験結果を保存する。 最後のはパラメーターによって試験結果がどのように変化するかを調査するためのログとなる。
def save(self, result): """ Save model, weight, history, parameter and evaluation """ model = result['model'] history = result['history'] loss = result['loss'] acc = result['acc'] # Model model_json = model.to_json() # Weight with open(os.path.join(DIR, MODEL_FILE), 'w') as json_file: json_file.write(model_json) model.save_weights(os.path.join(DIR, WEIGHT_FILE)) # History self.__save_history(history) self.__plot_history(history) # Param and evaluation dic = IDG_PARAM dic.update({'n_epoch': N_EPOCH, 'batch_size': BATCH_SIZE, 'loss': loss, 'acc': acc}) if os.path.exists(DIR+PARAM_EVAL_FILE): df = pd.read_csv(DIR+PARAM_EVAL_FILE) df = pd.concat([df, pd.DataFrame([dic])]) else: df = pd.DataFrame([dic]) df.to_csv(DIR+PARAM_EVAL_FILE, index=False)
以下は保存したhistory.jpg
。
モデル試験
上記で保存したモデルとその重みを読み込んで、テストデータにかける。
def test_model(self, data): X_train, Y_train, X_test, Y_test = data model_file = os.path.join(DIR, MODEL_FILE) weight_file = os.path.join(DIR, WEIGHT_FILE) with open(model_file, 'r') as fp: model = model_from_json(fp.read()) model.load_weights(weight_file) model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy']) if not DATA_AUGMENTATION: loss, acc = model.evaluate(X_test, Y_test, verbose=0) else: # Make a generator for test data test_datagen = ImageDataGenerator(zca_whitening=True) test_datagen.fit(X_test) test_generator = test_datagen.flow(X_test, Y_test) loss, acc = model.evaluate_generator(test_generator, val_samples=X_test.shape[0]) print('Test loss: %s, Test acc: %s' % (loss, acc)) print('')
結果はモデル訓練の時とほぼ一致している。
Test loss: 0.460079106236, Test acc: 0.8441
最後に
Kerasのツールの便利さが身に染みた。 他のKerasで実装されたコードを読んでると、 最も精度が高かった重みを上書きしない設定など、 他にも色々な便利ツールがあるっぽいので、 必要な処理を自前で書く前にKerasから探すのもいいと思う。