VideoStream

exception CameraFailedToStart(cmd, backend, append='')

Error when camera initialization fails.

args
with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

exception CameraConnectionLost

Error raised when the camera connection is lost after it has been established.

args
with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

class BaseVideoStream(cmd, backend)
start()

Start reading frames from the video stream.

Returns

self

Raises

CameraFailedToStart if video stream fails to open
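Because start() raises when the stream fails to open, an app may want to retry a few times before giving up. The following is a minimal sketch, not part of the library: start_with_retry, make_stream, and retry_on are hypothetical names, and the exception class to retry on is passed in by the caller rather than imported here.

```python
import time


def start_with_retry(make_stream, retry_on, attempts=3, delay_s=0.5,
                     sleep=time.sleep):
    """Call make_stream().start() until it succeeds or attempts run out.

    make_stream: zero-argument callable returning an unstarted stream.
    retry_on: exception class (or tuple of classes) that triggers a retry.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            # start() is documented to return self, so this is the started stream.
            return make_stream().start()
        except retry_on as error:
            last_error = error
            if attempt < attempts:
                sleep(delay_s)
    # Every attempt failed: re-raise the most recent error.
    raise last_error
```

In an app this might be called as start_with_retry(edgeiq.WebcamVideoStream, edgeiq.CameraFailedToStart), assuming the exception is importable from the top-level edgeiq package.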

property fps

The FPS of the video stream.

Raises

RuntimeError if FPS cannot be queried

Return type

float
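Since the fps property can raise RuntimeError, code that paces itself on the frame rate may want a fallback. A minimal sketch, assuming a default of 30 FPS is acceptable for the app; fps_or_default and frame_interval_s are hypothetical helper names:

```python
def fps_or_default(stream, default=30.0):
    """Return stream.fps as a float, or default if the query raises RuntimeError."""
    try:
        return float(stream.fps)
    except RuntimeError:
        return float(default)


def frame_interval_s(stream, default_fps=30.0):
    """Seconds between frames at the stream's reported (or fallback) FPS."""
    fps = fps_or_default(stream, default_fps)
    if fps <= 0:
        raise ValueError("FPS must be positive")
    return 1.0 / fps
```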

read()

Read the most recent frame from the camera.

This function blocks until a new frame is available.

Return type

ndarray

Returns

numpy array – The frame that was read from the camera

read_non_blocking()

Read the most recent frame from the camera.

This function returns None if no new frame is available.

Return type

Optional[ndarray]

Returns

The frame that was read from the camera, or None
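The non-blocking read suits loops that have other work to do between frames. A minimal polling sketch, assuming only that the stream object exposes read_non_blocking() as documented; poll_frames and its parameters are hypothetical:

```python
import time


def poll_frames(stream, max_frames, idle_sleep_s=0.01, timeout_s=5.0,
                clock=time.monotonic, sleep=time.sleep):
    """Collect up to max_frames frames using read_non_blocking().

    Stops early once timeout_s seconds have elapsed.
    """
    frames = []
    deadline = clock() + timeout_s
    while len(frames) < max_frames and clock() < deadline:
        frame = stream.read_non_blocking()
        if frame is None:
            # No new frame yet: do other work here, or yield the CPU briefly.
            sleep(idle_sleep_s)
            continue
        frames.append(frame)
    return frames
```

The check is written as `frame is None` rather than `not frame`, because the truth value of a multi-element numpy array is ambiguous and raises ValueError.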

stop()

Stop and clean up the camera connection.

class WebcamVideoStream(cam=0)

Capture video frames from a webcam or CSI camera attached to your device.

WebcamVideoStream can be instantiated as a context manager:

with edgeiq.WebcamVideoStream() as video_stream:
    ...

To use WebcamVideoStream directly, use the start() and stop() functions:

video_stream = edgeiq.WebcamVideoStream().start()
...
video_stream.stop()
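When start() and stop() are called directly, an exception raised between the two calls would skip stop(). One way to guard against that is a try/finally wrapper. This is a sketch under the assumption that the stream only needs stop() for cleanup; running is a hypothetical helper, not a library function:

```python
from contextlib import contextmanager


@contextmanager
def running(stream):
    """Start stream, yield it, and always call stop() on exit."""
    started = stream.start()  # start() is documented to return self
    try:
        yield started
    finally:
        # Runs on normal exit and when the body raises.
        started.stop()
```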

Typical usage:

with edgeiq.WebcamVideoStream() as video_stream:
    while True:
        frame = video_stream.read()

Parameters

cam (int) – The integer identifier of the camera.

property fps

The FPS of the video stream.

Raises

RuntimeError if FPS cannot be queried

Return type

float

read()

Read the most recent frame from the camera.

This function blocks until a new frame is available.

Return type

ndarray

Returns

numpy array – The frame that was read from the camera

read_non_blocking()

Read the most recent frame from the camera.

This function returns None if no new frame is available.

Return type

Optional[ndarray]

Returns

The frame that was read from the camera, or None

start()

Start reading frames from the video stream.

Returns

self

Raises

CameraFailedToStart if video stream fails to open

stop()

Stop and clean up the camera connection.

class GStreamerVideoStream(cam=0, display_width=640, display_height=480, framerate=30)

Capture video frames using the GStreamer plugin.

This can be useful for capturing videos from CSI cameras when V4L is not supported.

GStreamerVideoStream can be instantiated as a context manager:

with edgeiq.GStreamerVideoStream() as video_stream:
    ...

To use GStreamerVideoStream directly, use the start() and stop() functions:

video_stream = edgeiq.GStreamerVideoStream().start()
...
video_stream.stop()

Typical usage:

with edgeiq.GStreamerVideoStream() as video_stream:
    while True:
        frame = video_stream.read()

Parameters
  • cam (int) – The integer identifier of the camera.

  • display_width (int) – The output image width in pixels.

  • display_height (int) – The output image height in pixels.

  • framerate (int) – The recording frame rate.

property fps

The FPS of the video stream.

Raises

RuntimeError if FPS cannot be queried

Return type

float

read()

Read the most recent frame from the camera.

This function blocks until a new frame is available.

Return type

ndarray

Returns

numpy array – The frame that was read from the camera

read_non_blocking()

Read the most recent frame from the camera.

This function returns None if no new frame is available.

Return type

Optional[ndarray]

Returns

The frame that was read from the camera, or None

start()

Start reading frames from the video stream.

Returns

self

Raises

CameraFailedToStart if video stream fails to open

stop()

Stop and clean up the camera connection.

class GStreamerCustomVideoStream(cmd)

Capture video frames using a custom GStreamer pipeline.

GStreamerCustomVideoStream can be instantiated as a context manager:

with edgeiq.GStreamerCustomVideoStream(cmd) as video_stream:
    ...

To use GStreamerCustomVideoStream directly, use the start() and stop() functions:

video_stream = edgeiq.GStreamerCustomVideoStream(cmd).start()
...
video_stream.stop()

Typical usage:

# Receive an H264-encoded RTP stream
cmd = ' ! '.join([
        'udpsrc port=5001',
        'application/x-rtp,encoding-name=H264,payload=96',
        'rtph264depay',
        'h264parse',
        'queue',
        'avdec_h264',
        'videoconvert',
        'appsink'
        ])
with edgeiq.GStreamerCustomVideoStream(cmd) as video_stream:
    while True:
        frame = video_stream.read()

Parameters

cmd (str) – The custom GStreamer pipeline.
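The cmd string is a sequence of GStreamer elements joined by ' ! ', as in the typical usage above. A small sketch of building that string from parts; build_pipeline and rtp_h264_pipeline are hypothetical helpers, and the element list is copied from the example rather than checked against any particular GStreamer installation:

```python
def build_pipeline(elements):
    """Join GStreamer elements into one pipeline string separated by ' ! '."""
    cleaned = [element.strip() for element in elements
               if element and element.strip()]
    if not cleaned:
        raise ValueError("pipeline needs at least one element")
    return " ! ".join(cleaned)


def rtp_h264_pipeline(port=5001, payload=96):
    """H264-over-RTP receive pipeline with the port and payload as parameters."""
    return build_pipeline([
        f"udpsrc port={port}",
        f"application/x-rtp,encoding-name=H264,payload={payload}",
        "rtph264depay",
        "h264parse",
        "queue",
        "avdec_h264",
        "videoconvert",
        "appsink",
    ])
```

The returned string can then be passed as cmd to GStreamerCustomVideoStream.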

property fps

The FPS of the video stream.

Raises

RuntimeError if FPS cannot be queried

Return type

float

read()

Read the most recent frame from the camera.

This function blocks until a new frame is available.

Return type

ndarray

Returns

numpy array – The frame that was read from the camera

read_non_blocking()

Read the most recent frame from the camera.

This function returns None if no new frame is available.

Return type

Optional[ndarray]

Returns

The frame that was read from the camera, or None

start()

Start reading frames from the video stream.

Returns

self

Raises

CameraFailedToStart if video stream fails to open

stop()

Stop and clean up the camera connection.

class TestVideoStream(fps=30, display_width=640, display_height=480)

Create a test video source with a static colorblock feed. This class is useful for validating an end-to-end app when a live feed is not available. Internally, it uses a specific instance of GStreamerCustomVideoStream.

TestVideoStream can be instantiated as a context manager:

with edgeiq.TestVideoStream() as video_stream:
    ...

To use TestVideoStream directly, use the start() and stop() functions:

video_stream = edgeiq.TestVideoStream().start()
...
video_stream.stop()

Typical usage:

with edgeiq.TestVideoStream() as video_stream:
    while True:
        frame = video_stream.read()

Parameters
  • fps (int) – The video frames per second of the generated video stream.

  • display_width (int) – The width of the app’s video frame in pixels.

  • display_height (int) – The height of the app’s video frame in pixels.
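For validating an app end to end, it can help to push a fixed number of frames through the processing step and inspect the results. A minimal sketch, assuming the app's per-frame logic can be wrapped in a single callable; run_for_frames and process_frame are hypothetical names:

```python
def run_for_frames(stream, process_frame, frame_count):
    """Feed frame_count frames from stream.read() through process_frame.

    Returns the list of values produced by process_frame, in order.
    """
    if frame_count < 0:
        raise ValueError("frame_count must be non-negative")
    results = []
    for _ in range(frame_count):
        # read() blocks until a frame is available.
        results.append(process_frame(stream.read()))
    return results
```

Any started stream with a read() method works here, so the same function can later be pointed at a live camera stream.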

property fps

The FPS of the video stream.

Raises

RuntimeError if FPS cannot be queried

Return type

float

read()

Read the most recent frame from the camera.

This function blocks until a new frame is available.

Return type

ndarray

Returns

numpy array – The frame that was read from the camera

read_non_blocking()

Read the most recent frame from the camera.

This function returns None if no new frame is available.

Return type

Optional[ndarray]

Returns

The frame that was read from the camera, or None

start()

Start reading frames from the video stream.

Returns

self

Raises

CameraFailedToStart if video stream fails to open

stop()

Stop and clean up the camera connection.

class FrameRotation(value)

Amount of rotation, in degrees, applied to each frame.

ROTATE_NONE = 0
ROTATE_90 = 90
ROTATE_180 = 180

class JetsonCameraMode(value)

Sensor mode applied to the CSI camera, which determines the input width, height, and frame rate. Each member name has four parts: the Sony sensor model (IMX219 or IMX477), the input width and height, the frame rate, and the camera sensor mode number.

IMX219_3264x2468_21_0 = 0
IMX219_3264x1848_28_1 = 1
IMX219_1920x1080_30_2 = 2
IMX219_1640x1232_30_3 = 3
IMX477_4032x3040_30_0 = 4
IMX477_1920x1080_60_1 = 5
IMX477_2560x1440_40_3 = 7
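The naming scheme described above can be decoded mechanically. The following sketch parses a member name into its fields; SensorMode and parse_mode_name are hypothetical and work on the name string only, without importing the library:

```python
import re
from typing import NamedTuple


class SensorMode(NamedTuple):
    sensor: str
    width: int
    height: int
    framerate: int
    mode: int


# Sensor model, width x height, frame rate, sensor mode number.
_MODE_NAME = re.compile(r"^(IMX\d+)_(\d+)x(\d+)_(\d+)_(\d+)$")


def parse_mode_name(name):
    """Split a name such as 'IMX219_1920x1080_30_2' into its fields."""
    match = _MODE_NAME.match(name)
    if match is None:
        raise ValueError(f"unrecognized mode name: {name!r}")
    sensor, width, height, framerate, mode = match.groups()
    return SensorMode(sensor, int(width), int(height), int(framerate), int(mode))
```

If JetsonCameraMode is a standard Python Enum (an assumption), the name of a member would be available as its .name attribute and could be passed to parse_mode_name.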
class JetsonVideoStream(cam=0, rotation=<FrameRotation.ROTATE_NONE: 0>, camera_mode=<JetsonCameraMode.IMX219_1920x1080_30_2: 2>, display_width=640, display_height=480)

Capture video frames from a CSI ribbon camera on NVIDIA Jetson.

JetsonVideoStream can be instantiated as a context manager:

with edgeiq.JetsonVideoStream() as video_stream:
    ...

To use JetsonVideoStream directly, use the start() and stop() functions:

video_stream = edgeiq.JetsonVideoStream().start()
...
video_stream.stop()

Typical usage:

with edgeiq.JetsonVideoStream() as video_stream:
    while True:
        frame = video_stream.read()

Parameters
  • cam (int) – The integer identifier of the camera.

  • rotation (FrameRotation) – The rotation applied to each frame.

  • camera_mode (JetsonCameraMode) – The sensor mode for the CSI camera.

  • display_width (int) – The output image width in pixels.

  • display_height (int) – The output image height in pixels.

property fps

The FPS of the video stream.

Raises

RuntimeError if FPS cannot be queried

Return type

float

read()

Read the most recent frame from the camera.

This function blocks until a new frame is available.

Return type

ndarray

Returns

numpy array – The frame that was read from the camera

read_non_blocking()

Read the most recent frame from the camera.

This function returns None if no new frame is available.

Return type

Optional[ndarray]

Returns

The frame that was read from the camera, or None

start()

Start reading frames from the video stream.

Returns

self

Raises

CameraFailedToStart if video stream fails to open

stop()

Stop and clean up the camera connection.

class IPVideoStream(url)

Capture video frames from an IP stream.

IPVideoStream can be instantiated as a context manager:

with edgeiq.IPVideoStream('tcp://0.0.0.0:3333') as video_stream:
    ...

To use IPVideoStream directly, use the start() and stop() functions:

video_stream = edgeiq.IPVideoStream('tcp://0.0.0.0:3333').start()
...
video_stream.stop()

Typical usage:

with edgeiq.IPVideoStream('tcp://0.0.0.0:3333') as video_stream:
    while True:
        frame = video_stream.read()

Parameters

url (str) – The URL of the IP stream.
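A malformed URL is easier to diagnose before the stream is opened. The sketch below checks a URL of the form used in the examples with the standard library; describe_stream_url is a hypothetical helper, and which schemes IPVideoStream actually accepts is not stated here, so the check only verifies that a scheme and host are present:

```python
from urllib.parse import urlparse


def describe_stream_url(url):
    """Return (scheme, host, port) for a stream URL, or raise ValueError."""
    parts = urlparse(url)
    if not parts.scheme or not parts.hostname:
        raise ValueError(f"expected a URL like 'tcp://host:port', got {url!r}")
    # parts.port is None when the URL does not specify a port.
    return parts.scheme, parts.hostname, parts.port
```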

property fps

The FPS of the video stream.

Raises

RuntimeError if FPS cannot be queried

Return type

float

read()

Read the most recent frame from the camera.

This function blocks until a new frame is available.

Return type

ndarray

Returns

numpy array – The frame that was read from the camera

read_non_blocking()

Read the most recent frame from the camera.

This function returns None if no new frame is available.

Return type

Optional[ndarray]

Returns

The frame that was read from the camera, or None

start()

Start reading frames from the video stream.

Returns

self

Raises

CameraFailedToStart if video stream fails to open

stop()

Stop and clean up the camera connection.

exception NoMoreFrames

Error raised when attempting to read a frame but no frames remain.

args
with_traceback()

Exception.with_traceback(tb) – set self.__traceback__ to tb and return self.

class FileVideoStream(path, queue_size=128, play_realtime=False, fps=None)

Stream a video file for analysis.

FileVideoStream can be instantiated as a context manager:

with edgeiq.FileVideoStream('/path/to/video.mp4') as video_stream:
    ...

To use FileVideoStream directly, use the start() and stop() functions:

video_stream = edgeiq.FileVideoStream('/path/to/video.mp4').start()
...
video_stream.stop()

Typical usage:

with edgeiq.FileVideoStream('/path/to/video.mp4') as video_stream:
    while video_stream.more():
        frame = video_stream.read()

Parameters
  • path (str) – The path of the video file location.

  • queue_size (int) – The size of the buffer queue.

  • play_realtime (bool) – If True, play the video in simulated real time, where frames are dropped as they become outdated.

  • fps (Optional[int]) – A frame rate that overrides the video file’s frame rate.

property fps

The FPS of the loaded video file, or the override FPS if one was given.

Return type

Optional[int]

start()

Start reading frames from the video file.

When called a second time, the stream stops and restarts.

Returns

self

Raises

RuntimeError when file doesn’t exist or video stream can’t be opened.

read()

Read the next frame from the file.

This function blocks until a frame becomes available.

Return type

ndarray

Returns

numpy array – The next frame

Raises

NoMoreFrames if no frames are left to read. RuntimeError if called before start().

more()

Check if there are more frames in the video file.

When play_realtime is True, this value won’t be accurate. It is recommended to catch NoMoreFrames on read() instead.

Return type

bool

Returns

True if there are more frames to read in the video file.

Raises

RuntimeError if called before start().
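Given the recommendation to catch NoMoreFrames on read() rather than rely on more() when play_realtime is True, a read loop can be sketched as follows. read_all is a hypothetical helper, and the end-of-stream exception class is passed in by the caller rather than imported here:

```python
def read_all(stream, end_of_stream, handle_frame):
    """Call handle_frame on each frame until read() raises end_of_stream.

    end_of_stream: exception class signalling that no frames remain.
    Returns the number of frames handled.
    """
    count = 0
    while True:
        try:
            frame = stream.read()
        except end_of_stream:
            # The file is exhausted; report how many frames were processed.
            return count
        handle_frame(frame)
        count += 1
```

With the library this might be called as read_all(video_stream, edgeiq.NoMoreFrames, process), assuming NoMoreFrames is importable from the top-level edgeiq package and process is the app's own function.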

stop()

Stop the FileVideoStream.