|
|
@@ -0,0 +1,97 @@
|
|
|
+from collections import defaultdict
|
|
|
+from wtforms import Form, FieldList, FormField
|
|
|
+from pydantic import ValidationError, BaseModel
|
|
|
+
|
|
|
+
|
|
|
+class PydanticFieldList(FieldList):
|
|
|
+ _baked_instance = None
|
|
|
+
|
|
|
+ def populate_obj(self, obj, name):
|
|
|
+ attr = getattr(obj, name)
|
|
|
+ for n, v in enumerate(self._baked_instance):
|
|
|
+ self.entries[n].form._baked_instance = v
|
|
|
+ model = self.entries[n].form.get_model()
|
|
|
+ new = model(**v.dict())
|
|
|
+ attr.append(new)
|
|
|
+
|
|
|
+
|
|
|
+class PydanticForm(Form):
|
|
|
+ _errors = {}
|
|
|
+ _schema = None
|
|
|
+ _baked_instance = None
|
|
|
+
|
|
|
+ def __init__(self, *args, **kwargs):
|
|
|
+ super().__init__(*args, **kwargs)
|
|
|
+ self._errors = {}
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _schema_to_form_key(schema_key):
|
|
|
+ return schema_key
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def _add_field_error(form, field, value):
|
|
|
+ form.errors.update({field: value})
|
|
|
+ field = getattr(form, field)
|
|
|
+ if isinstance(field, FormField):
|
|
|
+ field.form._errors = value
|
|
|
+ elif isinstance(field, FieldList):
|
|
|
+ raise NotImplementedError()
|
|
|
+ else:
|
|
|
+ field.errors = value
|
|
|
+
|
|
|
+ def _update_field_errors(self, field_key, value):
|
|
|
+ if len(field_key) > 1:
|
|
|
+ path, latest = field_key[:-1], field_key[-1]
|
|
|
+ if len(path) == 1:
|
|
|
+ self._add_field_error(self, path[0], {latest: value})
|
|
|
+ sub_form = getattr(self, path[0]).form
|
|
|
+ self._add_field_error(sub_form, latest, value)
|
|
|
+ else:
|
|
|
+ self._add_field_error(self, field_key[0], value)
|
|
|
+
|
|
|
+ def _process_data_for_schema(self, data):
|
|
|
+ """Process data form for pydantic schema"""
|
|
|
+ return data
|
|
|
+
|
|
|
+ def _form_field_from_baked_field(self, baked_field_key):
|
|
|
+ """Get the form field to fill and the value from a field in the baked
|
|
|
+ instance"""
|
|
|
+ value = getattr(self._baked_instance, baked_field_key)
|
|
|
+ return getattr(self, baked_field_key), value
|
|
|
+
|
|
|
+ def validate(self):
|
|
|
+ """Validate the form, relying on Pydantic"""
|
|
|
+ try:
|
|
|
+ data = self._process_data_for_schema(self.data)
|
|
|
+ self._baked_instance = self._schema(**data)
|
|
|
+
|
|
|
+ # Fill the values of every sub-forms (if any)
|
|
|
+ for k, v in self._baked_instance.fields.items():
|
|
|
+ if issubclass(v.type_, BaseModel):
|
|
|
+ field_attr, value = self._form_field_from_baked_field(k)
|
|
|
+ if isinstance(field_attr, FieldList):
|
|
|
+ field_attr._baked_instance = value
|
|
|
+ else:
|
|
|
+ field_attr.form._baked_instance = value
|
|
|
+
|
|
|
+ return True
|
|
|
+ except ValidationError as e:
|
|
|
+ errors_dict = defaultdict(list)
|
|
|
+ for e in e.errors():
|
|
|
+ errors_dict[self._schema_to_form_key(e['loc'])].append(e)
|
|
|
+
|
|
|
+ for k, v in errors_dict.items():
|
|
|
+ self._update_field_errors(k, v)
|
|
|
+ return False
|
|
|
+
|
|
|
+ def populate_obj(self, obj):
|
|
|
+ """Populate `obj` with data from the form
|
|
|
+
|
|
|
+ :param obj: the object to populate """
|
|
|
+ for k, v in self._baked_instance.dict().items():
|
|
|
+ setattr(obj, k, v)
|
|
|
+
|
|
|
+ @property
|
|
|
+ def errors(self):
|
|
|
+ """Errors in the form, in the format of a dict"""
|
|
|
+ return self._errors
|