グラム
import boto3
import cv2
import numpy as np
import matplotlib.pyplot as plt
from io import BytesIO
from botocore.exceptions import NoCredentialsError
import tempfile
import os
# AWSの認証情報を設定
aws_access_key_id = 'YOUR_AWS_ACCESS_KEY_ID'
aws_secret_access_key = 'YOUR_AWS_SECRET_ACCESS_KEY'
region_name = 'YOUR_AWS_REGION'
# S3クライアントを作成
s3 = boto3.client(
's3',
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
region_name=region_name
)
# S3バケット名と教師データフォルダを指定
teacher_bucket_name = 'YOUR_TEACHER_BUCKET_NAME'
teacher_folder = 'path/to/teacher_folder/' # 教師データのフォルダパス
video_bucket_name = 'YOUR_VIDEO_BUCKET_NAME'
video_key = 'path/to/your/video/file.mp4'
# アップロード先のS3バケットとフォルダを指定
output_bucket_name = 'YOUR_OUTPUT_BUCKET_NAME'
output_folder = 'path/to/output/folder/'
# 動画をS3からダウンロードして一時ファイルに保存
try:
s3_object = s3.get_object(Bucket=video_bucket_name, Key=video_key)
print(f"Retrieved {video_key} from S3")
except NoCredentialsError:
print("Credentials not available")
except Exception as e:
print(f"Error occurred: {e}")
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
temp_file.write(s3_object['Body'].read())
temp_video_path = temp_file.name
# 特徴量の抽出とマッチング関数
def detect_and_compute_features(image):
orb = cv2.ORB_create()
keypoints, descriptors = orb.detectAndCompute(image, None)
return keypoints, descriptors
# 教師データフォルダ内の全ファイルを取得
teacher_keys = []
try:
response = s3.list_objects_v2(Bucket=teacher_bucket_name, Prefix=teacher_folder)
if 'Contents' in response:
for obj in response['Contents']:
if obj['Key'].lower().endswith(('.jpg', '.jpeg', '.png')):
teacher_keys.append(obj['Key'])
print(f"Found {len(teacher_keys)} teacher images in {teacher_folder}")
except NoCredentialsError:
print("Credentials not available")
except Exception as e:
print(f"Error occurred: {e}")
# 各教師データごとに処理
for teacher_index, teacher_key in enumerate(teacher_keys):
# 教師データをS3からダウンロードして読み込む
try:
s3_object = s3.get_object(Bucket=teacher_bucket_name, Key=teacher_key)
print(f"Retrieved {teacher_key} from S3")
image_stream = BytesIO(s3_object['Body'].read())
teacher_image = cv2.imdecode(np.frombuffer(image_stream.read(), np.uint8), cv2.IMREAD_COLOR)
except NoCredentialsError:
print("Credentials not available")
continue
except Exception as e:
print(f"Error occurred while retrieving {teacher_key}: {e}")
continue
# 教師データの特徴量を抽出
_, teacher_descriptors = detect_and_compute_features(teacher_image)
# 動画をビデオキャプチャとして読み込む
video_stream = cv2.VideoCapture(temp_video_path)
if not video_stream.isOpened():
raise RuntimeError("Error: Could not open video stream")
# フレームごとの特徴量を抽出して一致度を計算
frame_count = 0
similarity_scores = []
best_match = -1
best_frame = None
best_frame_number = -1
while video_stream.isOpened():
ret, frame = video_stream.read()
if ret:
_, frame_descriptors = detect_and_compute_features(frame)
if frame_descriptors is not None:
bf = cv2.BFMatcher(cv2.NORM_HAMMING, crossCheck=True)
matches = bf.match(frame_descriptors, teacher_descriptors)
total_matches = len(matches)
similarity_scores.append(total_matches)
if total_matches > best_match:
best_match = total_matches
best_frame = frame.copy()
best_frame_number = frame_count
else:
similarity_scores.append(0)
frame_count += 1
else:
break
# 動画ストリームをリリース
video_stream.release()
# グラフ化
plt.figure()
plt.plot(similarity_scores)
plt.title(f'Similarity Scores for Teacher Image {teacher_index + 1}')
plt.xlabel('Frame Number')
plt.ylabel('Number of Matches')
plt.show()
if best_frame is not None:
# 一時ファイルに保存
with tempfile.NamedTemporaryFile(suffix='.jpg', delete=False) as temp_frame_file:
cv2.imwrite(temp_frame_file.name, best_frame)
temp_frame_path = temp_frame_file.name
# S3にアップロード
output_key = f"{output_folder}best_match_teacher_image_{teacher_index + 1}_frame_{best_frame_number}.jpg"
s3.upload_file(
Filename=temp_frame_path,
Bucket=output_bucket_name,
Key=output_key
)
os.remove(temp_frame_path)
print(f"Best match for Teacher Image {teacher_index + 1} is at frame {best_frame_number} and saved to {output_key}")
# 一時ファイルを削除
os.remove(temp_video_path)
print("Processing complete.")