cvat/tests/python/rest_api/_test_base.py

568 lines
22 KiB
Python
Raw Permalink Normal View History

2025-09-16 01:19:40 +00:00
import io
import math
from contextlib import closing
from functools import partial
from typing import Generator, Optional, Sequence
import numpy as np
import pytest
from cvat_sdk.api_client import models
from PIL import Image
from pytest_cases import fixture, fixture_ref, parametrize
import shared.utils.s3 as s3
from rest_api.utils import calc_end_frame, create_task, unique
from shared.tasks.enums import SourceDataType
from shared.tasks.interface import ITaskSpec
from shared.tasks.types import ImagesTaskSpec, VideoTaskSpec
from shared.tasks.utils import parse_frame_step
from shared.utils.config import make_api_client
from shared.utils.helpers import generate_image_files, generate_video_file
class TestTasksBase:
_USERNAME = "admin1"
def _image_task_fxt_base(
self,
request: pytest.FixtureRequest,
*,
frame_count: Optional[int] = 10,
image_files: Optional[Sequence[io.BytesIO]] = None,
start_frame: Optional[int] = None,
stop_frame: Optional[int] = None,
step: Optional[int] = None,
segment_size: Optional[int] = None,
server_files: Optional[Sequence[str]] = None,
cloud_storage_id: Optional[int] = None,
job_replication: Optional[int] = None,
**data_kwargs,
) -> Generator[tuple[ImagesTaskSpec, int], None, None]:
task_params = {
"name": f"{request.node.name}[{request.fixturename}]",
"labels": [{"name": "a"}],
**({"segment_size": segment_size} if segment_size else {}),
**({"consensus_replicas": job_replication} if job_replication else {}),
}
if server_files is not None:
assert (
image_files is not None
), "'server_files' must be used together with 'image_files'"
else:
assert bool(image_files) ^ bool(
frame_count
), "Expected only one of 'image_files' and 'frame_count'"
if not image_files:
image_files = generate_image_files(frame_count)
images_data = [f.getvalue() for f in image_files]
resulting_task_size = len(
range(start_frame or 0, (stop_frame or len(images_data) - 1) + 1, step or 1)
)
data_params = {
"image_quality": 70,
"sorting_method": "natural",
"chunk_size": max(1, (segment_size or resulting_task_size) // 2),
**(
{
"server_files": server_files,
"cloud_storage_id": cloud_storage_id,
}
if server_files
else {"client_files": image_files}
),
}
data_params.update(data_kwargs)
if start_frame is not None:
data_params["start_frame"] = start_frame
if stop_frame is not None:
data_params["stop_frame"] = stop_frame
if step is not None:
data_params["frame_filter"] = f"step={step}"
def get_frame(i: int) -> bytes:
return images_data[i]
task_id, _ = create_task(self._USERNAME, spec=task_params, data=data_params)
yield ImagesTaskSpec(
models.TaskWriteRequest._from_openapi_data(**task_params),
models.DataRequest._from_openapi_data(**data_params),
get_frame=get_frame,
size=resulting_task_size,
), task_id
@pytest.fixture(scope="class")
def fxt_uploaded_images_task(
self, request: pytest.FixtureRequest
) -> Generator[tuple[ITaskSpec, int], None, None]:
yield from self._image_task_fxt_base(request=request)
@pytest.fixture(scope="class")
def fxt_uploaded_images_task_with_segments(
self, request: pytest.FixtureRequest
) -> Generator[tuple[ITaskSpec, int], None, None]:
yield from self._image_task_fxt_base(request=request, segment_size=4)
@fixture(scope="class")
@parametrize("step", [2, 5])
@parametrize("stop_frame", [15, 26])
@parametrize("start_frame", [3, 7])
def fxt_uploaded_images_task_with_segments_start_stop_step(
self, request: pytest.FixtureRequest, start_frame: int, stop_frame: Optional[int], step: int
) -> Generator[tuple[ITaskSpec, int], None, None]:
yield from self._image_task_fxt_base(
request=request,
frame_count=30,
segment_size=4,
start_frame=start_frame,
stop_frame=stop_frame,
step=step,
)
@pytest.fixture(scope="class")
def fxt_uploaded_images_task_with_segments_and_consensus(
self, request: pytest.FixtureRequest
) -> Generator[tuple[ITaskSpec, int], None, None]:
yield from self._image_task_fxt_base(request=request, segment_size=4, job_replication=2)
def _image_task_with_honeypots_and_segments_base(
self,
request: pytest.FixtureRequest,
*,
start_frame: Optional[int] = None,
step: Optional[int] = None,
random_seed: int = 42,
image_files: Optional[Sequence[io.BytesIO]] = None,
server_files: Optional[Sequence[str]] = None,
cloud_storage_id: Optional[int] = None,
**kwargs,
) -> Generator[tuple[ITaskSpec, int], None, None]:
validation_params = models.DataRequestValidationParams._from_openapi_data(
mode="gt_pool",
frame_selection_method="random_uniform",
random_seed=random_seed,
frame_count=5,
frames_per_job_count=2,
)
used_frames_count = 15
total_frame_count = (start_frame or 0) + used_frames_count * (step or 1)
base_segment_size = 4
regular_frame_count = used_frames_count - validation_params.frame_count
final_segment_size = base_segment_size + validation_params.frames_per_job_count
final_task_size = (
regular_frame_count
+ validation_params.frames_per_job_count
* math.ceil(regular_frame_count / base_segment_size)
+ validation_params.frame_count
)
if image_files:
if len(image_files) != total_frame_count:
raise ValueError(
f"If provided, image_files must contain {total_frame_count} images"
)
else:
image_files = generate_image_files(total_frame_count)
with closing(
self._image_task_fxt_base(
request=request,
frame_count=None,
image_files=image_files,
segment_size=base_segment_size,
sorting_method="random",
start_frame=start_frame,
step=step,
validation_params=validation_params,
server_files=server_files,
cloud_storage_id=cloud_storage_id,
**kwargs,
)
) as task_gen:
for task_spec, task_id in task_gen:
# Get the actual frame order after the task is created
with make_api_client(self._USERNAME) as api_client:
(task_meta, _) = api_client.tasks_api.retrieve_data_meta(task_id)
frame_map = [
next(i for i, f in enumerate(image_files) if f.name == frame_info.name)
for frame_info in task_meta.frames
]
_get_frame = task_spec._get_frame
task_spec._get_frame = lambda i: _get_frame(frame_map[i])
task_spec.size = final_task_size
task_spec._params.segment_size = final_segment_size
# These parameters are not applicable to the resulting task,
# they are only effective during task creation
if start_frame or step:
task_spec._data_params.start_frame = 0
task_spec._data_params.stop_frame = task_spec.size
task_spec._data_params.frame_filter = ""
yield task_spec, task_id
@fixture(scope="class")
def fxt_uploaded_images_task_with_honeypots_and_segments(
self, request: pytest.FixtureRequest
) -> Generator[tuple[ITaskSpec, int], None, None]:
yield from self._image_task_with_honeypots_and_segments_base(request)
@fixture(scope="class")
@parametrize("start_frame, step", [(2, 3)])
def fxt_uploaded_images_task_with_honeypots_and_segments_start_step(
self, request: pytest.FixtureRequest, start_frame: Optional[int], step: Optional[int]
) -> Generator[tuple[ITaskSpec, int], None, None]:
yield from self._image_task_with_honeypots_and_segments_base(
request, start_frame=start_frame, step=step
)
def _images_task_with_honeypots_and_changed_real_frames_base(
self, request: pytest.FixtureRequest, **kwargs
):
with closing(
self._image_task_with_honeypots_and_segments_base(
request, start_frame=2, step=3, **kwargs
)
) as gen_iter:
task_spec, task_id = next(gen_iter)
with make_api_client(self._USERNAME) as api_client:
validation_layout, _ = api_client.tasks_api.retrieve_validation_layout(task_id)
validation_frames = validation_layout.validation_frames
new_honeypot_real_frames = [
validation_frames[(validation_frames.index(f) + 1) % len(validation_frames)]
for f in validation_layout.honeypot_real_frames
]
api_client.tasks_api.partial_update_validation_layout(
task_id,
patched_task_validation_layout_write_request=(
models.PatchedTaskValidationLayoutWriteRequest(
frame_selection_method="manual",
honeypot_real_frames=new_honeypot_real_frames,
)
),
)
# Get the new frame order
frame_map = dict(zip(validation_layout.honeypot_frames, new_honeypot_real_frames))
_get_frame = task_spec._get_frame
task_spec._get_frame = lambda i: _get_frame(frame_map.get(i, i))
yield task_spec, task_id
@fixture(scope="class")
@parametrize("random_seed", [1, 2, 5])
def fxt_uploaded_images_task_with_honeypots_and_changed_real_frames(
self, request: pytest.FixtureRequest, random_seed: int
) -> Generator[tuple[ITaskSpec, int], None, None]:
yield from self._images_task_with_honeypots_and_changed_real_frames_base(
request, random_seed=random_seed
)
@fixture(scope="class")
@parametrize(
"cloud_storage_id",
[pytest.param(2, marks=[pytest.mark.with_external_services, pytest.mark.timeout(60)])],
)
def fxt_cloud_images_task_with_honeypots_and_changed_real_frames(
self, request: pytest.FixtureRequest, cloud_storages, cloud_storage_id: int
) -> Generator[tuple[ITaskSpec, int], None, None]:
cloud_storage = cloud_storages[cloud_storage_id]
s3_client = s3.make_client(bucket=cloud_storage["resource"])
image_files = generate_image_files(47)
for image in image_files:
image.name = f"test/{image.name}"
image.seek(0)
s3_client.create_file(data=image, filename=image.name)
request.addfinalizer(partial(s3_client.remove_file, filename=image.name))
server_files = [f.name for f in image_files]
for image in image_files:
image.seek(0)
yield from self._images_task_with_honeypots_and_changed_real_frames_base(
request,
image_files=image_files,
server_files=server_files,
cloud_storage_id=cloud_storage_id,
# FIXME: random sorting with frame filter and cloud images (and, optionally, honeypots)
# doesn't work with static cache
# https://github.com/cvat-ai/cvat/issues/9021
use_cache=True,
)
def _uploaded_images_task_with_gt_and_segments_base(
self,
request: pytest.FixtureRequest,
*,
start_frame: Optional[int] = None,
step: Optional[int] = None,
frame_selection_method: str = "random_uniform",
job_replication: Optional[int] = None,
) -> Generator[tuple[ITaskSpec, int], None, None]:
used_frames_count = 16
total_frame_count = (start_frame or 0) + used_frames_count * (step or 1)
segment_size = 5
image_files = generate_image_files(total_frame_count)
validation_params_kwargs = {"frame_selection_method": frame_selection_method}
if "random" in frame_selection_method:
validation_params_kwargs["random_seed"] = 42
if frame_selection_method == "random_uniform":
validation_frames_count = 10
validation_params_kwargs["frame_count"] = validation_frames_count
elif frame_selection_method == "random_per_job":
frames_per_job_count = 3
validation_params_kwargs["frames_per_job_count"] = frames_per_job_count
validation_frames_count = used_frames_count // segment_size + min(
used_frames_count % segment_size, frames_per_job_count
)
elif frame_selection_method == "manual":
validation_frames_count = 10
valid_frame_ids = range(
(start_frame or 0), (start_frame or 0) + used_frames_count * (step or 1), step or 1
)
rng = np.random.Generator(np.random.MT19937(seed=42))
validation_params_kwargs["frames"] = rng.choice(
[f.name for i, f in enumerate(image_files) if i in valid_frame_ids],
validation_frames_count,
replace=False,
).tolist()
else:
raise NotImplementedError
validation_params = models.DataRequestValidationParams._from_openapi_data(
mode="gt",
**validation_params_kwargs,
)
yield from self._image_task_fxt_base(
request=request,
frame_count=None,
image_files=image_files,
segment_size=segment_size,
sorting_method="natural",
start_frame=start_frame,
step=step,
validation_params=validation_params,
job_replication=job_replication,
)
@pytest.fixture(scope="class")
def fxt_uploaded_images_task_with_gt_and_segments_and_consensus(
self, request: pytest.FixtureRequest
) -> Generator[tuple[ITaskSpec, int], None, None]:
yield from self._uploaded_images_task_with_gt_and_segments_base(
request=request, job_replication=2
)
@fixture(scope="class")
@parametrize("start_frame, step", [(2, 3)])
@parametrize("frame_selection_method", ["random_uniform", "random_per_job", "manual"])
def fxt_uploaded_images_task_with_gt_and_segments_start_step(
self,
request: pytest.FixtureRequest,
start_frame: Optional[int],
step: Optional[int],
frame_selection_method: str,
) -> Generator[tuple[ITaskSpec, int], None, None]:
yield from self._uploaded_images_task_with_gt_and_segments_base(
request,
start_frame=start_frame,
step=step,
frame_selection_method=frame_selection_method,
)
def _uploaded_video_task_fxt_base(
self,
request: pytest.FixtureRequest,
*,
frame_count: int = 10,
segment_size: Optional[int] = None,
start_frame: Optional[int] = None,
stop_frame: Optional[int] = None,
step: Optional[int] = None,
) -> Generator[tuple[VideoTaskSpec, int], None, None]:
task_params = {
"name": f"{request.node.name}[{request.fixturename}]",
"labels": [{"name": "a"}],
}
if segment_size:
task_params["segment_size"] = segment_size
resulting_task_size = len(
range(start_frame or 0, (stop_frame or frame_count - 1) + 1, step or 1)
)
video_file = generate_video_file(frame_count)
video_data = video_file.getvalue()
data_params = {
"image_quality": 70,
"client_files": [video_file],
"chunk_size": max(1, (segment_size or resulting_task_size) // 2),
}
if start_frame is not None:
data_params["start_frame"] = start_frame
if stop_frame is not None:
data_params["stop_frame"] = stop_frame
if step is not None:
data_params["frame_filter"] = f"step={step}"
def get_video_file() -> io.BytesIO:
return io.BytesIO(video_data)
task_id, _ = create_task(self._USERNAME, spec=task_params, data=data_params)
yield VideoTaskSpec(
models.TaskWriteRequest._from_openapi_data(**task_params),
models.DataRequest._from_openapi_data(**data_params),
get_video_file=get_video_file,
size=resulting_task_size,
), task_id
@pytest.fixture(scope="class")
def fxt_uploaded_video_task(
self,
request: pytest.FixtureRequest,
) -> Generator[tuple[ITaskSpec, int], None, None]:
yield from self._uploaded_video_task_fxt_base(request=request)
@pytest.fixture(scope="class")
def fxt_uploaded_video_task_with_segments(
self, request: pytest.FixtureRequest
) -> Generator[tuple[ITaskSpec, int], None, None]:
yield from self._uploaded_video_task_fxt_base(request=request, segment_size=4)
@fixture(scope="class")
@parametrize("step", [2, 5])
@parametrize("stop_frame", [15, 26])
@parametrize("start_frame", [3, 7])
def fxt_uploaded_video_task_with_segments_start_stop_step(
self, request: pytest.FixtureRequest, start_frame: int, stop_frame: Optional[int], step: int
) -> Generator[tuple[ITaskSpec, int], None, None]:
yield from self._uploaded_video_task_fxt_base(
request=request,
frame_count=30,
segment_size=4,
start_frame=start_frame,
stop_frame=stop_frame,
step=step,
)
def _compute_annotation_segment_params(self, task_spec: ITaskSpec) -> list[tuple[int, int]]:
segment_params = []
frame_step = task_spec.frame_step
segment_size = getattr(task_spec, "segment_size", 0) or task_spec.size * frame_step
start_frame = getattr(task_spec, "start_frame", 0)
stop_frame = getattr(task_spec, "stop_frame", None) or (
start_frame + (task_spec.size - 1) * frame_step
)
end_frame = calc_end_frame(start_frame, stop_frame, frame_step)
validation_params = getattr(task_spec, "validation_params", None)
if validation_params and validation_params.mode.value == "gt_pool":
end_frame = min(
end_frame, (task_spec.size - validation_params.frame_count) * frame_step
)
segment_size = min(segment_size, end_frame - 1)
overlap = min(
(
getattr(task_spec, "overlap", None) or 0
if task_spec.source_data_type == SourceDataType.images
else 5
),
segment_size // 2,
)
segment_start = start_frame
while segment_start < end_frame:
if start_frame < segment_start:
segment_start -= overlap * frame_step
segment_end = segment_start + frame_step * segment_size
segment_params.append((segment_start, min(segment_end, end_frame) - frame_step))
segment_start = segment_end
return segment_params
@staticmethod
def _compare_images(
expected: Image.Image, actual: Image.Image, *, must_be_identical: bool = True
):
expected_pixels = np.array(expected)
chunk_frame_pixels = np.array(actual)
assert expected_pixels.shape == chunk_frame_pixels.shape
if not must_be_identical:
# video chunks can have slightly changed colors, due to codec specifics
# compressed images can also be distorted
assert np.allclose(chunk_frame_pixels, expected_pixels, atol=2)
else:
assert np.array_equal(chunk_frame_pixels, expected_pixels)
def _get_job_abs_frame_set(self, job_meta: models.DataMetaRead) -> Sequence[int]:
if job_meta.included_frames:
return job_meta.included_frames
else:
return range(
job_meta.start_frame,
job_meta.stop_frame + 1,
parse_frame_step(job_meta.frame_filter),
)
_tasks_with_honeypots_cases = [
fixture_ref("fxt_uploaded_images_task_with_honeypots_and_segments"),
fixture_ref("fxt_uploaded_images_task_with_honeypots_and_segments_start_step"),
fixture_ref("fxt_uploaded_images_task_with_honeypots_and_changed_real_frames"),
fixture_ref("fxt_cloud_images_task_with_honeypots_and_changed_real_frames"),
]
_tasks_with_simple_gt_job_cases = [
fixture_ref("fxt_uploaded_images_task_with_gt_and_segments_start_step"),
fixture_ref("fxt_uploaded_images_task_with_gt_and_segments_and_consensus"),
]
_tasks_with_consensus_cases = [
fixture_ref("fxt_uploaded_images_task_with_segments_and_consensus"),
fixture_ref("fxt_uploaded_images_task_with_gt_and_segments_and_consensus"),
]
# Keep in mind that these fixtures are generated eagerly
# (before each depending test or group of tests),
# e.g. a failing task creation in one the fixtures will fail all the depending tests cases.
_all_task_cases = unique(
[
fixture_ref("fxt_uploaded_images_task"),
fixture_ref("fxt_uploaded_images_task_with_segments"),
fixture_ref("fxt_uploaded_images_task_with_segments_start_stop_step"),
fixture_ref("fxt_uploaded_video_task"),
fixture_ref("fxt_uploaded_video_task_with_segments"),
fixture_ref("fxt_uploaded_video_task_with_segments_start_stop_step"),
]
+ _tasks_with_honeypots_cases
+ _tasks_with_simple_gt_job_cases
+ _tasks_with_consensus_cases,
key=lambda fxt_ref: fxt_ref.fixture,
)