Amazon Kinesis Video Streams から動画を受信して画像処理をおこなう (Python)

GetMediaForFragmentList, GetClip または GetImages API を使い、動画や画像を取得して処理する方法です。

概要

関連

HLS から受信して処理 (OpenCV+ffmpeg)

Amazon Kinesis Video Streams から HLS URL を取得し、VideoCapture に指定することで、HLS から受信することができます。

  • LIVE(最新映像)/LIVE_REPLAY(指定日時以降の過去映像)/ON_DEMAND(指定区間の過去映像)のいずれかを指定して再生可能。
  • HLS の仕組み上、セグメント長(1s~10s)ごとに映像が更新されるため、元映像のセグメント長に応じて遅延が発生する。
  • フレームごとの日時などのメタデータが取得できない。(と思われる)
import os

import boto3
import cv2

OUTPUT_DIR = "output"


def get_data_endpoint(stream_name: str, api_name: str) -> str:
    """
    call GetDataEndpoint API
    https://docs.aws.amazon.com/kinesisvideostreams/latest/dg/API_GetDataEndpoint.html
    """
    kinesis_video = boto3.client("kinesisvideo")
    res = kinesis_video.get_data_endpoint(StreamName=stream_name, APIName=api_name)
    return res["DataEndpoint"]


def get_hls_streaming_session_url(stream_name: str) -> str:
    """
    call GetHLSStreamingSessionURL API
    https://docs.aws.amazon.com/kinesisvideostreams/latest/dg/API_reader_GetHLSStreamingSessionURL.html
    """
    res = boto3.client(
        "kinesis-video-archived-media", endpoint_url=get_data_endpoint(stream_name, "GET_HLS_STREAMING_SESSION_URL")
    ).get_hls_streaming_session_url(
        StreamName=stream_name,
        PlaybackMode="LIVE"
    )
    return res["HLSStreamingSessionURL"]


def process(stream_name: str):
    hls_url = get_hls_streaming_session_url(stream_name)
    cap = cv2.VideoCapture(hls_url)
    for i in range(300):
        ret, frame = cap.read()
        if frame is None:
            break
        file_path = os.path.join(OUTPUT_DIR, f"{i:04d}.png")
        cv2.imwrite(file_path, frame)
        print(f"Saved: {file_path}")
    cap.release()

指定期間のセグメントデータを取得して処理

ListFragments + GetMediaFromFragmentList API でセグメント単位で受信し、一旦ファイルに保存してから各フレームを取得して処理します。

  • 指定区間の過去映像のみ取得可能

OpenCV+ffmpeg の場合

import operator
import os
from datetime import datetime
from tempfile import NamedTemporaryFile

import boto3
import cv2
from botocore.response import StreamingBody

OUTPUT_DIR = "output"
READ_FRAGMENTS = 1  # いちどに読み込むフラグメントの数


def get_data_endpoint(stream_name: str, api_name: str) -> str:
    """
    call GetDataEndpoint API
    https://docs.aws.amazon.com/kinesisvideostreams/latest/dg/API_GetDataEndpoint.html
    """
    kinesis_video = boto3.client("kinesisvideo")
    res = kinesis_video.get_data_endpoint(StreamName=stream_name, APIName=api_name)
    return res["DataEndpoint"]


def list_fragments(stream_name: str, start_time: datetime, end_time: datetime) -> list[dict]:
    """
    call ListFragments API
    https://docs.aws.amazon.com/kinesisvideostreams/latest/dg/API_reader_ListFragments.html
    """
    res = boto3.client(
        "kinesis-video-archived-media", endpoint_url=get_data_endpoint(stream_name, "LIST_FRAGMENTS")
    ).list_fragments(
        StreamName=stream_name,
        FragmentSelector={
            "FragmentSelectorType": "PRODUCER_TIMESTAMP",
            "TimestampRange": {
                "StartTimestamp": start_time,
                "EndTimestamp": end_time
            }
        }
    )
    return res["Fragments"]


def get_media_for_fragment_list(stream_name, fragment_numbers: list[str]) -> StreamingBody:
    """
    call GetMediaForFragmentList API
    https://docs.aws.amazon.com/kinesisvideostreams/latest/dg/API_reader_GetMediaForFragmentList.html
    """
    res = boto3.client(
        "kinesis-video-archived-media", endpoint_url=get_data_endpoint(stream_name, "GET_MEDIA_FOR_FRAGMENT_LIST")
    ).get_media_for_fragment_list(
        StreamName=stream_name,
        Fragments=fragment_numbers
    )
    return res["Payload"]


def process_media(start_dt: datetime, media: StreamingBody):
    print(f"--> {start_dt}")

    with NamedTemporaryFile(suffix=".mkv") as f:
        f.write(media.read())
        cap = cv2.VideoCapture(f.name)
        i = 0
        while True:
            ret, frame = cap.read()
            if frame is None:
                break
            frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
            file_path = os.path.join(OUTPUT_DIR, f"{start_dt}.{i:02d}.png")
            cv2.imwrite(file_path, frame)
            print(f"Saved: {file_path}")
            i += 1
        cap.release()


def chunks(lst: list, n: int) -> list:
    for i in range(0, len(lst), n):
        yield lst[i:i + n]


def process(stream_name: str, start_dt: datetime, end_dt: datetime):
    all_fragments = list_fragments(stream_name, start_dt, end_dt)
    sorted_fragments = sorted(all_fragments, key=operator.itemgetter("ProducerTimestamp"))
    for fragments in chunks(sorted_fragments, READ_FRAGMENTS):
        fragment_numbers = list(map(lambda x: x["FragmentNumber"], fragments))
        media = get_media_for_fragment_list(stream_name, fragment_numbers)
        process_media(fragments[0]["ProducerTimestamp"], media)

imageio+ffmpeg の場合

def process_media(start_dt: datetime, media: StreamingBody):
    print(f"--> {start_dt}")

    with NamedTemporaryFile(suffix=".mkv") as f:
        f.write(media.read())
        reader = imageio.get_reader(f.name, "ffmpeg")  # class imageio.plugins.ffmpeg.FfmpegFormat.Reader
        metadata = reader.get_meta_data()
        print(json.dumps(metadata))
        for i, frame in enumerate(reader):
            frame = cv2.cvtColor(frame, cv2.COLOR_RGB2BGR)
            file_path = os.path.join(OUTPUT_DIR, f"{start_dt}.{i:04d}.png")
            cv2.imwrite(file_path, frame)
            print(f"Saved: {file_path}")
        reader.close()

画像を取得

GetImages API を使用して、指定期間の画像を JPEG/PNG 形式で取得できます。

import base64
from datetime import datetime

import boto3


def get_data_endpoint(stream_name: str, api_name: str) -> str:
    """
    call GetDataEndpoint API
    https://docs.aws.amazon.com/kinesisvideostreams/latest/dg/API_GetDataEndpoint.html
    """
    kinesis_video = boto3.client("kinesisvideo")
    res = kinesis_video.get_data_endpoint(StreamName=stream_name, APIName=api_name)
    return res["DataEndpoint"]


def get_images(stream_name: str, start_datetime: datetime, end_datetime: datetime, interval: int):
    """
    call GetImages API
    https://docs.aws.amazon.com/kinesisvideostreams/latest/dg/API_reader_GetImages.html
    """
    res = boto3.client(
        "kinesis-video-archived-media", endpoint_url=get_data_endpoint(stream_name, "GET_IMAGES")
    ).get_images(
        StreamName=stream_name,
        StartTimestamp=start_datetime,
        EndTimestamp=end_datetime,
        SamplingInterval=interval,
        ImageSelectorType="PRODUCER_TIMESTAMP",
        Format="PNG",
    )
    return res


def process(stream_name: str, start_datetime: datetime, end_datetime: datetime, interval: int):
    res = get_images(stream_name, start_datetime, end_datetime, interval)
    for image in res["Images"]:
        dt: datetime = image["TimeStamp"]
        if "ImageContent" in image:
            with open("{}.png".format(dt.strftime("%Y%m%d%H%M%S")), "wb") as f:
                f.write(base64.b64decode(image["ImageContent"]))