import json
import logging
from dataclasses import dataclass
from typing import Any, cast
from pydicom import config, Sequence, Dataset, DataElement
from pydicom.multival import MultiValue
from pydicom.tag import BaseTag, Tag
from pydicom.valuerep import validate_value, VR
from dicom_validator.spec_reader.condition import (
ConditionType,
ConditionOperator,
)
from dicom_validator.validator.dicom_info import DicomInfo
from dicom_validator.validator.error_handler import (
ValidationResultHandler,
default_error_handler,
)
from dicom_validator.validator.validation_result import (
ValidationResult,
Status,
TagErrors,
ErrorCode,
TagError,
TagType,
ErrorScope,
DicomTag,
)
class DatasetStackItem:
    """One level of the dataset stack used while descending into sequences.

    An item holds either the root dataset, or the dataset of a sequence item
    together with the tag of the sequence that contains it and the tags of
    all parent sequences.
    """

    def __init__(
        self,
        dataset: Dataset,
        tag: BaseTag | None = None,
        stack: list[BaseTag] | None = None,
        in_func_group: bool = False,
    ) -> None:
        self.dataset = dataset
        self.tag = tag
        # A flag handed down from the parent level is kept; otherwise the item
        # is flagged if it is itself an item of one of the two sequences
        # (5200,9229) / (5200,9230).
        self.in_func_group = in_func_group or tag in (0x5200_9229, 0x5200_9230)

        # The stored path is the parent path extended by the own sequence tag;
        # without a tag the given path is used unchanged.
        if tag is None:
            self.stack = stack
        elif stack is None:
            self.stack = [tag]
        else:
            self.stack = stack + [tag]

        # Every non-private tag of the dataset starts out as "unexpected".
        self.unexpected_tags = set()
        for element in dataset:
            element_tag = Tag(element.tag)
            if element_tag.is_private:
                continue
            self.unexpected_tags.add(DicomTag(element_tag, self.stack))
@dataclass
class FunctionalGroupInfo:
    """State kept while validating functional group macros.

    Unlike other checks, a functional group macro can only be judged after
    both the Shared and the PerFrame functional groups have been looked at,
    so the result of the shared group is remembered here.
    """

    # module name -> errors found while validating the shared group
    shared_results: dict
    # names of the macro modules that have already been validated
    checked_modules: set

    def clear(self) -> None:
        """Forget all remembered results and checked module names."""
        self.shared_results.clear()
        self.checked_modules.clear()

    def combined(
        self, module_name: str, seq_tag: DicomTag, per_frame: TagErrors
    ) -> TagErrors:
        """Merge the shared and per-frame errors of one macro module.

        Note that both `per_frame` and the remembered shared errors of the
        module are modified in place while merging.

        Parameters
        ----------
        module_name : str
            The name of the validated macro module.
        seq_tag : DicomTag
            The tag ID of the top-level sequence tag in the macro.
        per_frame : TagErrors
            The errors from validation of the module in the per-frame group.

        Returns
        -------
        The errors that remain after combining both groups.
        """
        shared = self.shared_results.get(module_name, {})
        if not (shared or per_frame):
            # no error in either group: the module is present in both the
            # shared and the per-frame group, which is an error by itself
            return {
                seq_tag: TagError(
                    code=ErrorCode.TagNotAllowed, scope=ErrorScope.BothFuncGroups
                )
            }

        merged = {}
        for shared_tag, shared_error in shared.items():
            # the same attribute in the other group differs only by the
            # functional group sequence at the start of the parent path
            twin = DicomTag(shared_tag.tag, [0x5200_9230] + shared_tag.parents[1:])
            if per_frame.get(twin) == shared_error:
                # reported in both groups, so the error is real
                merged[shared_tag] = shared_error
                del per_frame[twin]
                continue
            if shared_error.code != ErrorCode.TagMissing:
                # other errors (unexpected tag, missing value) always remain
                merged[shared_tag] = shared_error
                continue
            # a tag missing only in the shared group: keep the per-frame
            # errors located below that tag and drop the "missing" error
            below = [
                candidate
                for candidate in per_frame
                if candidate.parents and shared_tag.tag in candidate.parents[1:]
            ]
            for candidate in below:
                merged[candidate] = per_frame.pop(candidate)

        for frame_tag, frame_error in per_frame.items():
            if frame_error.code != ErrorCode.TagMissing:
                merged[frame_tag] = frame_error
                continue
            # mirror image of the check above, looking into the shared errors
            below = [
                candidate
                for candidate in shared
                if candidate.parents and frame_tag.tag in candidate.parents[1:]
            ]
            for candidate in below:
                merged[candidate] = shared.pop(candidate)
        return merged
class InvalidParameterError(Exception):
    """Exception type declared by this module.

    NOTE(review): not raised by the code shown in this file; presumably it
    signals invalid parameters to callers - confirm against its users.
    """
class IODValidator:
    """Performs the actual IOD validation of a single DICOM dataset."""

    def __init__(
        self,
        dataset: Dataset,
        dicom_info: DicomInfo,
        *,
        log_level: int = logging.INFO,
        suppress_vr_warnings: bool = False,
        error_handler: ValidationResultHandler | None = None,
        file_path: str = "",
    ) -> None:
        """Create an IODValidator instance.

        Parameters
        ----------
        dataset : Dataset
            The dataset to be validated.
        dicom_info : DicomInfo
            The DICOM information as extracted from the standard.
        log_level : int
            The log level of the logger, if using the default error handler.
        suppress_vr_warnings : bool
            If True, skip the VR validation of DICOM tags.
        error_handler : ValidationResultHandler
            Handles errors found during validation.
            Defaults to a handler that logs all errors to the console.
        file_path : str
            Passed on to the `ValidationResult` that collects the errors
            (presumably the path of the validated file).
        """
        self._dataset = dataset
        self._dataset_stack = [DatasetStackItem(self._dataset)]
        self._dicom_info = dicom_info
        self._func_group_info = FunctionalGroupInfo({}, set())
        self._suppress_vr_warnings = suppress_vr_warnings
        self.result = ValidationResult(file_path=file_path)
        if error_handler is not None:
            self.handler = error_handler
        else:
            self.handler = default_error_handler(dicom_info, log_level)

    def validate(self) -> ValidationResult:
        """Validates current dataset.

        All errors are contained in the `ValidationResult` object after
        execution. By default, e.g. if no other handler has been set,
        all errors are logged to the console.

        Returns
        -------
        The (reset and refilled) `result` attribute of the validator.
        """
        self.result.reset()
        self.result.sop_class_uid = self._dataset.get("SOPClassUID")
        if not self.result.sop_class_uid:
            self.result.status = Status.MissingSOPClassUID
            self.result.errors = 1
        elif self.result.sop_class_uid not in self._dicom_info.iods:
            self.result.status = Status.UnknownSOPClassUID
            self.result.errors = 1
        else:
            self._validate_sop_class()
        self.handler.handle_validation_result(self.result)
        return self.result

    def _validate_sop_class(self) -> None:
        """Validate the dataset against the current SOP class.

        All found errors are recorded in the `result` attribute.
        """
        self.handler.handle_validation_start(self.result)
        iod_info = self._dicom_info.iods[self.result.sop_class_uid]
        maybe_existing_modules = self._get_maybe_existing_modules(iod_info["modules"])
        for module_name, module in iod_info["modules"].items():
            # the root stack item carries the name of the module being checked
            self._dataset_stack[-1].tag = module_name
            errors = self._validate_module(
                module, module_name, maybe_existing_modules, iod_info["group_macros"]
            )
            if errors:
                self.result.add_tag_errors(module_name, errors)
                self.result.status = Status.Failed
        if self._dataset_stack[-1].unexpected_tags:
            self.result.add_tag_errors("General", self._unexpected_tag_errors())

    def _validate_module(
        self,
        module: dict[str, dict | str],
        module_name: str,
        maybe_existing_modules: dict[str, set[DicomTag]],
        group_macros: dict[str, dict[str, dict]] | None = None,
    ) -> TagErrors:
        """Validate the given module.

        Parameters
        ----------
        module : dict[str, dict]
            Contains the module reference chapter ("ref"), the usage ("use"),
            and optionally the usage condition as a dictionary
            (see `Condition`).
        module_name : str
            The module name as listed in the standard.
        maybe_existing_modules : dict[str, set[DicomTag]]
            Module references with contained tags that may be present
            in the dataset. Due to the fact that the same tag may belong to
            different modules, the presence of the module is only guessed
            at this point, and some of them may not actually be present.
        group_macros : dict[str, dict], optional
            The modules allowed in functional group sequences, if the given
            module contains them, otherwise an empty dictionary.
            The keys are the module names, the values the module dicts
            as described for `module`.
            None if `module` itself is a module in a functional group.

        Returns
        -------
        The dictionary of found errors.
        """
        usage = module["use"]
        module_info = self._get_module_info(cast(str, module["ref"]), group_macros)
        condition: dict[str, dict] | None = None
        if "cond" in module:
            condition = cast(dict[str, dict], module["cond"])

        is_shared = False
        is_per_frame = False
        if group_macros is None:
            if module_name in self._func_group_info.checked_modules:
                # check only one per-frame item
                return {}
            is_shared = self._in_shared_group
            if not is_shared:
                is_per_frame = self._in_per_frame_group

        allowed = True
        scope = ErrorScope.General
        if condition and "F" in condition["type"] and is_shared:
            required, allowed = False, False
            scope = ErrorScope.SharedFuncGroup
        elif condition and "S" in condition["type"] and is_per_frame:
            required, allowed = False, False
            scope = ErrorScope.PerFrameFuncGroup
        elif usage[0] == "M":
            required = True
        elif usage[0] == ConditionType.UserDefined:
            required = False
        elif condition:
            required, allowed = self._object_is_required_or_allowed(condition)
        else:
            required = False

        if required:
            # Always validate required modules.
            # If the module is missing from the dataset the validation
            # should report it as an error.
            result = self._validate_attributes(module_info, False)
            if group_macros is not None:
                return result
            # for functional groups, we need to check both shared and
            # per-frame groups to get a result; a required module should be
            # in only one of these
            if is_shared:
                # just save the result to check together with per-frame groups
                self._func_group_info.shared_results[module_name] = result
                return {}
            if is_per_frame:
                shared_result = self._func_group_info.shared_results.get(module_name)
                if shared_result is not None:
                    seq_tag = self._tag_id(list(module_info)[0])
                    return self._func_group_info.combined(module_name, seq_tag, result)
            return result

        if module["ref"] not in maybe_existing_modules:
            # The module is not present at all in the dataset.
            # No validation is needed.
            return {}

        # At this point the module is __not required__ but it
        # __may be existing__ in the dataset.
        # Just "maybe" because multiple modules may have overlapping
        # attributes. So, let's see if it exists "strongly" enough to be
        # considered for further checks.
        if maybe_existing_modules and not self._does_module_strongly_exist(
            cast(str, module["ref"]), maybe_existing_modules
        ):
            return {}

        if not allowed:
            # no special case for functional groups here
            errors = {}
            for tag_id_string in module_info:
                tag_id = self._tag_id(tag_id_string)
                if tag_id.tag in self._dataset_stack[-1].dataset:
                    errors[tag_id] = TagError(code=ErrorCode.TagNotAllowed, scope=scope)
                    self._dataset_stack[-1].unexpected_tags.discard(tag_id)
            return errors

        return self._validate_attributes(module_info, False)

    @property
    def _in_per_frame_group(self) -> bool:
        """True if the current stack level is an item of sequence (5200,9230)."""
        return self._dataset_stack[-1].tag == 0x5200_9230

    @property
    def _in_shared_group(self) -> bool:
        """True if the current stack level is an item of sequence (5200,9229)."""
        return self._dataset_stack[-1].tag == 0x5200_9229

    def _validate_attributes(
        self, attributes: dict, report_unexpected_tags: bool
    ) -> TagErrors:
        """Validate the given attributes according to their type.

        Parameters
        ----------
        attributes : dict
            The attributes of a single module to be validated.
        report_unexpected_tags : bool
            If True, tags that are not expected are reported and placed into
            the returned dictionary.

        Returns
        -------
        The dictionary of found errors.

        Raises
        ------
        RuntimeError
            If an attribute that is defined with items is not a sequence
            in the dataset.
        """
        errors = TagErrors()
        for tag_id_string, attribute in attributes.items():
            if tag_id_string == "modules":
                self._validate_func_group_modules(attribute)
                continue

            tag_id = self._tag_id(tag_id_string)
            tag_error = self._validate_attribute(tag_id, attribute)
            if tag_error is not None:
                errors[tag_id] = tag_error
            self._dataset_stack[-1].unexpected_tags.discard(tag_id)

            if "items" not in attribute:
                continue
            data_elem = self._dataset_stack[-1].dataset.get_item(tag_id.tag)
            if data_elem is None:
                continue
            if data_elem.VR != "SQ":
                raise RuntimeError(f"Not a sequence: {data_elem}")

            for sq_item_dataset in data_elem.value:
                self._dataset_stack.append(
                    DatasetStackItem(
                        sq_item_dataset,
                        tag_id.tag,
                        self._dataset_stack[-1].stack,
                        self._dataset_stack[-1].in_func_group,
                    )
                )
                # the item attributes are only created at this point,
                # where we have descended into the related sequence item level
                item_attribute = attribute["items"]
                if "group_macros" in item_attribute:
                    group_macros = item_attribute["group_macros"]
                    items = item_attribute["items"]
                else:
                    group_macros = None
                    items = item_attribute
                item_attributes = self._expanded_module_info(
                    items,
                    group_macros,
                    expand_items=True,
                )
                errors.update(self._validate_attributes(item_attributes, True))
                self._dataset_stack.pop()

        if report_unexpected_tags:
            errors.update(self._unexpected_tag_errors())
        return errors

    def _validate_func_group_modules(
        self, modules: dict[str, dict[str, dict | str]]
    ) -> None:
        """Validate the macro modules of the current functional group item.

        Found errors are added to `result` under the module name. When called
        for the shared group, the remembered functional group state is
        cleared first.
        """
        if self._in_shared_group:
            self._func_group_info.clear()
        maybe_existing_modules = self._get_maybe_existing_modules(modules)
        for module_name, module in modules.items():
            errors = self._validate_module(module, module_name, maybe_existing_modules)
            if errors:
                self.result.add_tag_errors(module_name, errors)

    def _validate_attribute(self, tag_id: DicomTag, attribute: dict) -> TagError | None:
        """Validate a single DICOM attribute according to its type.

        Parameters
        ----------
        tag_id : DicomTag
            The tag ID of the attribute.
        attribute : dict
            Contains the attribute type ("type"), and the optional condition
            ("cond") for the presence of the attribute (see `Condition`).
            May also contain allowed values ("enums").

        Returns
        -------
        The error found for the attribute, or None if there is no error.
        """
        attribute_type = attribute["type"]
        # ignore image data and larger tags for now - we don't read them
        if tag_id.tag >= 0x7FE00010:
            return None

        has_tag = tag_id.tag in self._dataset_stack[-1].dataset
        error = TagError(attribute_type, context={})
        value_required = attribute_type in ("1", "1C")
        if attribute_type in ("1", "2"):
            tag_required, tag_allowed = True, True
        elif "cond" in attribute:
            error.context = error.context or {}
            error.context["cond"] = attribute["cond"]
            tag_required, tag_allowed = self._object_is_required_or_allowed(
                error.context["cond"]
            )
        else:
            tag_required, tag_allowed = False, True

        if not has_tag and tag_required:
            error.code = ErrorCode.TagMissing
        elif has_tag and not tag_allowed:
            error.code = ErrorCode.TagNotAllowed
        elif has_tag:
            value = self._dataset_stack[-1].dataset[tag_id.tag].value
            vr = self._dataset_stack[-1].dataset[tag_id.tag].VR
            if value_required:
                if value is None or (isinstance(value, (Sequence, str)) and not value):
                    error.code = ErrorCode.TagEmpty
            if value is not None and (not isinstance(value, str) or value):
                if not isinstance(value, (MultiValue, list)):
                    value = [value]
                for i, v in enumerate(value):
                    if "enums" in attribute:
                        for enums in attribute["enums"]:
                            # if an index is there, we only check the value
                            # for the correct index; otherwise there will
                            # only be one entry
                            if "index" in enums and int(enums["index"]) != i + 1:
                                continue
                            # check an existing condition
                            if cond := enums.get("cond"):
                                if not self._object_is_required_or_allowed(cond)[0]:
                                    continue
                            if v not in enums["val"]:
                                error.code = ErrorCode.EnumValueNotAllowed
                                error.context = error.context or {}
                                error.context.update(
                                    {"value": v, "allowed": enums["val"]}
                                )
                    if not self._suppress_vr_warnings and not error.is_error():
                        vv = str(v) if vr in ("DS", "IS") else v
                        try:
                            validate_value(vr, vv, config.RAISE)
                        except ValueError:
                            error.code = ErrorCode.InvalidValue
                            error.context = error.context or {}
                            error.context.update({"value": vv, "VR": vr})

        if error.is_error():
            return error
        return None

    def _object_is_required_or_allowed(
        self, condition: dict[str, Any]
    ) -> tuple[bool, bool]:
        """Checks if an attribute is required or allowed in the current
        dataset, depending on the given condition.

        Parameters
        ----------
        condition : dict
            The condition or serialized condition defining if the object
            shall or may be present.

        Returns
        -------
        tuple(bool, bool)
            The first attribute is `True` if the attribute is required,
            the second if it is allowed. Valid combinations are:
            True, True: the attribute is required
            False, True: the attribute is allowed but not required
            False, False: the attribute is not allowed.
        """
        condition_type = condition["type"]
        if ConditionType(condition_type).user_defined:
            return False, True
        if self._composite_object_matches_condition(condition):
            if condition_type == ConditionType.NotAllowedOrUserDefined:
                return False, False
            return True, True
        allowed = condition_type == ConditionType.MandatoryOrUserDefined or (
            condition_type == ConditionType.MandatoryOrConditional
            and self._composite_object_matches_condition(condition["other_cond"])
        )
        return False, allowed

    def _composite_object_matches_condition(self, condition: dict[str, Any]) -> bool:
        """Checks if an attribute matches the given composite condition.

        Parameters
        ----------
        condition : dict
            The condition dictionary. May combine other conditions under
            the keys "and" or "or".

        Returns
        -------
        bool
            `True` if the attribute matches the condition.
        """
        if "and" in condition:
            return all(
                self._composite_object_matches_condition(cond)
                for cond in condition["and"]
            )
        if "or" in condition:
            return any(
                self._composite_object_matches_condition(cond)
                for cond in condition["or"]
            )
        return self._matches_condition(condition)

    def _matches_condition(self, condition: dict[str, Any]) -> bool:
        """Checks if an attribute matches the given condition.

        Parameters
        ----------
        condition : dict
            The condition dict, containing the tag ("tag"), the operator
            ("op") and, for value checks, the value index ("index") and
            the compared values ("values").

        Returns
        -------
        bool
            `True` if the attribute matches the condition in the dataset.
        """
        tag_id = self._tag_id(condition["tag"])
        operator = ConditionOperator(condition["op"])
        if operator == ConditionOperator.Present:
            return self._tag_exists(tag_id)
        if operator == ConditionOperator.Absent:
            return not self._tag_exists(tag_id)

        data_element = self._lookup_tag(tag_id)
        if not data_element:
            return False

        value = data_element.value
        tag_value = None
        index = condition["index"]
        if index > 0:
            if index <= data_element.VM:
                # A single-valued element holds the value itself instead of
                # a list of values. Indexing it would pick the first character
                # of a string, or fail for a number, so it is used directly.
                is_scalar = data_element.VM == 1 and not isinstance(
                    value, (MultiValue, list, Sequence)
                )
                tag_value = value if is_scalar else value[index - 1]
        elif data_element.VM > 1:
            tag_value = value[0]
        else:
            tag_value = value

        if tag_value is None:
            return False
        if operator == ConditionOperator.NotEmpty:
            # same notion of "empty" as used for the value check in
            # `_validate_attribute`: empty strings and sequences have no value
            return not (isinstance(tag_value, (Sequence, str)) and not tag_value)
        return self._tag_matches(tag_value, operator, condition["values"])

    def _get_maybe_existing_modules(
        self, modules: dict[str, dict]
    ) -> dict[str, set[DicomTag]]:
        """Get all the modules that have at least one attribute present
        in the current dataset.

        We consider these as maybe-existing (or maybe-present) in the dataset.
        Only maybe, because an attribute may belong to two different modules,
        and we cannot be sure which of those two modules should be considered
        as "existing/present" in the dataset.

        Returns
        -------
        A dictionary, where the key is the module ref and the value is
        the set of tags of that module present in the dataset.
        """
        maybe_existing_modules = {}
        for module in modules.values():
            module_info = self._get_module_info(module["ref"])
            existing_tags = self._get_existing_tags_of_module(module_info)
            if existing_tags:
                maybe_existing_modules[module["ref"]] = existing_tags
        return maybe_existing_modules

    @staticmethod
    def _does_module_strongly_exist(
        a_module_ref: str, maybe_existing_modules: dict[str, set[DicomTag]]
    ) -> bool:
        """Check if a maybe-existing module is strongly-existing.

        A module is strongly-existing if, compared to each of the other
        maybe-existing modules, it has at least one existing tag that is
        not present in that other module.
        """
        a_tags = maybe_existing_modules[a_module_ref]
        return all(
            # a subset of another module has no tags of its own
            not a_tags <= b_tags
            for b_ref, b_tags in maybe_existing_modules.items()
            if b_ref != a_module_ref
        )

    def _get_existing_tags_of_module(
        self, module_info: dict[str, dict | str]
    ) -> set[DicomTag]:
        """Return the tags of the module that exist in the current dataset."""
        existing_tag_ids = set()
        for tag_id_string in module_info:
            tag_id = self._tag_id(tag_id_string)
            if tag_id.tag in self._dataset_stack[-1].dataset:
                existing_tag_ids.add(tag_id)
        return existing_tag_ids

    def _lookup_tag_in_func_group(
        self, tag: DicomTag, seq_tag: int
    ) -> DataElement | None:
        """Lookup a tag in the functional group if it wasn't directly found
        at the current level (e.g. the current sequence).

        Only the first item of the functional group sequence `seq_tag` in
        the root dataset, and the first item of each sequence in it,
        are searched.
        """
        dataset = self._dataset_stack[0].dataset
        if seq_tag not in dataset:
            return None
        if not len(dataset[seq_tag].value):
            return None
        seq_item: Dataset = dataset[seq_tag].value[0]
        # the tag may be a top-level sequence
        if tag.tag in seq_item:
            return seq_item[tag.tag]
        # otherwise, only check top-level tags in all sequences
        # as only these are referenced in conditions
        for elem_tag in seq_item.keys():
            # access via indexing to ensure we get a fully decoded DataElement
            # (pydicom decodes deferred RawDataElements inside
            # Dataset.__getitem__)
            seq = seq_item[elem_tag]
            if seq.VR != VR.SQ or not len(seq.value):
                continue
            item: Dataset = seq.value[0]
            if tag.tag in item:
                return item[tag.tag]
        return None

    def _lookup_tag_in_func_groups(self, tag: DicomTag) -> DataElement | None:
        """Lookup a tag in the functional groups if it wasn't directly found
        at the current level (e.g. the current sequence).

        The functional group given by the first parent of the tag is searched
        first, then the other one.
        """
        if tag.parents is None:
            return None
        if element := self._lookup_tag_in_func_group(tag, tag.parents[0]):
            return element
        other_group_tag = 0x5200_9230 if tag.parents[0] == 0x5200_9229 else 0x5200_9229
        return self._lookup_tag_in_func_group(tag, other_group_tag)

    def _lookup_tag(self, tag: DicomTag) -> DataElement | None:
        """Return the element for the tag, or None if it is not found.

        The dataset stack is searched from the current level up to the root;
        inside a functional group, the functional groups are searched
        as a fallback.
        """
        for stack_item in reversed(self._dataset_stack):
            if tag.tag in stack_item.dataset:
                return stack_item.dataset[tag.tag]
        if self._dataset_stack[-1].in_func_group:
            return self._lookup_tag_in_func_groups(tag)
        return None

    def _tag_exists(self, tag_id: DicomTag) -> bool:
        """Return True if `_lookup_tag` finds an element for the tag."""
        return self._lookup_tag(tag_id) is not None

    def _tag_id(self, tag_id_string: str) -> DicomTag:
        """Convert a tag string of the form "(gggg,eeee)" into a `DicomTag`
        whose parents are the sequence tags of the current dataset stack."""
        group, element = tag_id_string[1:-1].split(",")
        # workaround for repeating tags -> special handling needed
        if group.endswith("xx"):
            group = group[:2] + "00"
        parents = [(d.tag or 0) for d in self._dataset_stack[1:]] or None
        return DicomTag((int(group, 16) << 16) + int(element, 16), parents)

    def _tag_matches(
        self, tag_value: Any, operator: ConditionOperator, values: list
    ) -> bool:
        """Compare a tag value with the condition values using the operator.

        The condition values are first converted to the type of the tag
        value; if that is not possible, the condition does not match.
        """
        try:
            values = [type(tag_value)(value) for value in values]
        except (ValueError, TypeError):
            # the values are of the wrong type - ignore them
            # (a failed conversion may raise either exception,
            # e.g. int("x") -> ValueError, int(None) -> TypeError)
            return False
        if operator == ConditionOperator.EqualsValue:
            return tag_value in values
        if operator == ConditionOperator.NotEqualsValue:
            return tag_value not in values
        if operator == ConditionOperator.GreaterValue:
            return tag_value > values[0]
        if operator == ConditionOperator.LessValue:
            return tag_value < values[0]
        if operator == ConditionOperator.EqualsTag:
            return tag_value in values
        return False

    def _get_module_info(
        self,
        module_ref: str,
        group_macros: dict[str, dict[str, dict]] | None = None,
    ) -> dict[str, dict | str]:
        """Return the expanded description of the module with the given ref."""
        return self._expanded_module_info(
            self._dicom_info.modules[module_ref], group_macros
        )

    def _expanded_module_info(
        self,
        module_info: dict[str, dict],
        group_macros: dict[str, dict[str, dict]] | None,
        expand_items: bool = False,
    ) -> dict[str, dict | str]:
        """Return a copy of the module description with includes resolved.

        Parameters
        ----------
        module_info : dict
            The module (or sequence item) description.
        group_macros : dict, optional
            The functional group macros; stored under the key "modules"
            where the description includes the "FuncGroup" reference.
        expand_items : bool
            If False, an "items" entry is not expanded, but saved together
            with `group_macros` to be expanded later.
        """
        expanded_mod_info: dict[str, dict | str] = {}
        for k, v in module_info.items():
            if k == "include":
                for info in module_info["include"]:
                    ref = info["ref"]
                    if ref == "FuncGroup":
                        if group_macros is None:
                            continue
                        expanded_mod_info["modules"] = group_macros
                    else:
                        if "cond" in info:
                            if not self._object_is_required_or_allowed(info["cond"])[0]:
                                continue
                        expanded_mod_info.update(
                            self._get_module_info(ref, group_macros)
                        )
            elif not expand_items and k == "items":
                # we shall not create the item attributes at this point,
                # because they may have conditions that refer to items inside
                # the sequence, and we are currently at a higher dataset level
                # instead, we save the information needed to create them lazily
                expanded_mod_info[k] = {"items": v, "group_macros": group_macros}
            elif isinstance(v, dict):
                expanded_mod_info[k] = self._expanded_module_info(v, group_macros)
            else:
                expanded_mod_info[k] = v
        return expanded_mod_info

    def _unexpected_tag_errors(self) -> dict[DicomTag, TagError]:
        """Return an "unexpected tag" error for each tag of the current
        stack level that is still marked as unexpected."""
        return {
            tag_id: TagError(TagType.Undefined, ErrorCode.TagUnexpected)
            for tag_id in self._dataset_stack[-1].unexpected_tags
        }

    # For debugging
    @staticmethod
    def _dump_dict_as_json(name, d):
        """Print the dictionary as indented JSON, labelled with the name."""
        print("{")
        print(f'"{name}": ')
        print(json.dumps(d, indent=2))
        print("}")