Мне нужна настройка в Label Studio, чтобы администратор мог назначать проект пользователям. Как мне это реализовать?
Мне нужна функция назначения, которая отображалась бы в Label Studio следующим образом: администратор может назначать проект пользователям. С помощью флажков (чекбоксов) администратор выбирает пользователей, которым нужно назначить конкретный проект.
Это модели проекта:
class ProjectOnboardingSteps(models.Model):
    """Catalogue of onboarding steps.

    Each row stores a two-letter ``code`` (one of ``STEPS_CHOICES``), a title,
    a description and an ``order`` value; rows are sorted by ``order`` by default.
    """

    # Stored values for the ``code`` column.
    DATA_UPLOAD = "DU"
    CONF_SETTINGS = "CF"
    PUBLISH = "PB"
    INVITE_EXPERTS = "IE"

    # (stored code, human-readable label) pairs offered for ``code``.
    STEPS_CHOICES = (
        (DATA_UPLOAD, "Import your data"),
        (CONF_SETTINGS, "Configure settings"),
        (PUBLISH, "Publish project"),
        (INVITE_EXPERTS, "Invite collaborators"),
    )

    code = models.CharField(
        max_length=2,
        choices=STEPS_CHOICES,
        null=True,
    )
    title = models.CharField(
        _('title'),
        max_length=1000,
        null=False,
    )
    description = models.TextField(
        _('description'),
        null=False,
    )
    order = models.IntegerField(default=0)

    created_at = models.DateTimeField(
        _('created at'),
        auto_now_add=True,
    )
    updated_at = models.DateTimeField(
        _('updated at'),
        auto_now=True,
    )

    class Meta:
        ordering = ['order']
class ProjectOnboarding(models.Model):
    """Completion state of one onboarding step for one project."""

    step = models.ForeignKey(
        ProjectOnboardingSteps,
        on_delete=models.CASCADE,
        related_name="po_through",
    )
    project = models.ForeignKey(
        Project,
        on_delete=models.CASCADE,
    )
    finished = models.BooleanField(default=False)

    created_at = models.DateTimeField(
        _('created at'),
        auto_now_add=True,
    )
    updated_at = models.DateTimeField(
        _('updated at'),
        auto_now=True,
    )

    def save(self, *args, **kwargs):
        """Save the row, then mark the project as onboarded once 4 steps are finished.

        All positional and keyword arguments are forwarded to ``Model.save``.
        """
        super().save(*args, **kwargs)

        finished_steps = ProjectOnboarding.objects.filter(
            project=self.project,
            finished=True,
        ).count()

        # 4 is hard-coded; presumably it mirrors the four entries of
        # ProjectOnboardingSteps.STEPS_CHOICES — confirm before adding steps.
        if finished_steps != 4:
            return

        project = self.project
        project.skip_onboarding = True
        project.save(recalc=False)
class LabelStreamHistory(models.Model):
    """JSON history record kept once per (user, project) pair."""

    user = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        on_delete=models.CASCADE,
        related_name='histories',
        help_text='User ID',
    )  # noqa
    project = models.ForeignKey(
        Project,
        on_delete=models.CASCADE,
        related_name='histories',
        help_text='Project ID',
    )
    # Defaults to an empty list; the element schema is not visible in this file.
    data = models.JSONField(default=list)

    class Meta:
        # At most one history row per user and project.
        constraints = [
            models.UniqueConstraint(
                fields=['user', 'project'],
                name='unique_history',
            ),
        ]
class ProjectMember(models.Model):
    """Link between a user and a project, with an ``enabled`` switch.

    Reachable as ``user.project_memberships`` and ``project.members``.
    NOTE(review): no uniqueness constraint on (user, project) is declared here,
    so duplicate links are possible at the database level — confirm whether
    that is intended.
    """

    user = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        on_delete=models.CASCADE,
        related_name='project_memberships',
        help_text='User ID',
    )  # noqa
    project = models.ForeignKey(
        Project,
        on_delete=models.CASCADE,
        related_name='members',
        help_text='Project ID',
    )
    enabled = models.BooleanField(
        default=True,
        help_text='Project member is enabled',
    )

    created_at = models.DateTimeField(
        _('created at'),
        auto_now_add=True,
    )
    updated_at = models.DateTimeField(
        _('updated at'),
        auto_now=True,
    )
class ProjectSummary(models.Model):
    """Running per-project counters of data columns, annotation types and labels.

    The JSON fields are maintained incrementally by the ``update_*`` and
    ``remove_*`` methods; each of them saves only the columns it touched.

    NOTE(review): this class was reconstructed from a paste that had lost its
    indentation. Statement nesting was inferred from the code order — confirm
    against the original file (see ``reset`` in particular).
    """

    project = AutoOneToOneField(Project, primary_key=True, on_delete=models.CASCADE, related_name='summary')
    created_at = models.DateTimeField(_('created at'), auto_now_add=True, help_text='Creation time')

    # { col1: task_count_with_col1, col2: task_count_with_col2 }
    all_data_columns = JSONField(
        _('all data columns'), null=True, default=dict, help_text='All data columns found in imported tasks'
    )
    # [col1, col2]
    common_data_columns = JSONField(
        _('common data columns'), null=True, default=list, help_text='Common data columns found across imported tasks'
    )
    # { (from_name, to_name, type): annotation_count }
    created_annotations = JSONField(
        _('created annotations'),
        null=True,
        default=dict,
        help_text='Unique annotation types identified by tuple (from_name, to_name, type)',
    )  # noqa
    # { from_name: {label1: task_count_with_label1, label2: task_count_with_label2} }
    created_labels = JSONField(_('created labels'), null=True, default=dict, help_text='Unique labels')
    created_labels_drafts = JSONField(
        _('created labels in drafts'), null=True, default=dict, help_text='Unique drafts labels'
    )

    def has_permission(self, user):
        """Delegate the permission check for *user* to the owning project."""
        user.project = self.project  # link for activity log
        return self.project.has_permission(user)

    def reset(self, tasks_data_based=True):
        """Clear the stored counters and save the whole row.

        Annotation and label counters are always cleared; the data-column
        counters are cleared only when *tasks_data_based* is true.

        NOTE(review): which assignments belong under the ``if`` was inferred
        from the parameter name, because the paste had no indentation — confirm.
        """
        if tasks_data_based:
            self.all_data_columns = {}
            self.common_data_columns = []
        self.created_annotations = {}
        self.created_labels = {}
        self.created_labels_drafts = {}
        self.save()

    def update_data_columns(self, tasks):
        """Count the data columns of newly added *tasks*.

        ``all_data_columns`` gets +1 per task for every column the task has.
        ``common_data_columns`` becomes the sorted intersection of the stored
        common columns with the columns shared by every task in *tasks*.
        An empty *tasks* leaves ``common_data_columns`` untouched.
        """
        # None means "no task seen yet". An empty set is a legitimate
        # intersection and must not be re-seeded from the next task.
        batch_common = None
        all_data_columns = dict(self.all_data_columns or {})  # field is nullable

        for task in tasks:
            try:
                task_data = get_attr_or_item(task, 'data')
            except KeyError:
                # No 'data' entry: the item itself is treated as the data mapping.
                task_data = task
            task_data_keys = task_data.keys()
            for column in task_data_keys:
                all_data_columns[column] = all_data_columns.get(column, 0) + 1
            if batch_common is None:
                batch_common = set(task_data_keys)
            else:
                batch_common &= set(task_data_keys)

        self.all_data_columns = all_data_columns
        if batch_common is not None:
            # NOTE(review): an empty stored list is still treated as "not
            # initialised", so a project whose tasks genuinely share no column
            # is re-seeded from this batch. Kept as in the original.
            if not self.common_data_columns:
                self.common_data_columns = sorted(batch_common)
            else:
                self.common_data_columns = sorted(set(self.common_data_columns) & batch_common)

        logger.debug('summary.all_data_columns = %s', self.all_data_columns)
        logger.debug('summary.common_data_columns = %s', self.common_data_columns)
        self.save(update_fields=['all_data_columns', 'common_data_columns'])

    def remove_data_columns(self, tasks):
        """Subtract the data columns of removed *tasks* from the counters.

        A column whose counter reaches zero is dropped from
        ``all_data_columns`` and from ``common_data_columns``.
        """
        all_data_columns = dict(self.all_data_columns or {})  # field is nullable
        keys_to_remove = []

        for task in tasks:
            task_data = get_attr_or_item(task, 'data')
            for key in task_data.keys():
                if key in all_data_columns:
                    all_data_columns[key] -= 1
                    # <= 0 also clears a counter that was already out of sync.
                    if all_data_columns[key] <= 0:
                        keys_to_remove.append(key)
                        all_data_columns.pop(key)
        self.all_data_columns = all_data_columns

        if keys_to_remove:
            removed = set(keys_to_remove)
            self.common_data_columns = [
                column for column in (self.common_data_columns or []) if column not in removed
            ]

        logger.debug('summary.all_data_columns = %s', self.all_data_columns)
        logger.debug('summary.common_data_columns = %s', self.common_data_columns)
        self.save(update_fields=['all_data_columns', 'common_data_columns'])

    def _get_annotation_key(self, result):
        """Return the counter key for one annotation *result*, or ``None``.

        ``None`` is returned for results without a type, for ``relation`` and
        ``pairwise`` results, and for results lacking ``from_name`` or
        ``to_name`` (the latter case is logged as an error).
        """
        result_type = result.get('type', None)
        if result_type in ('relation', 'pairwise', None):
            return None
        if 'from_name' not in result or 'to_name' not in result:
            logger.error(
                'Unexpected annotation.result format: "from_name" or "to_name" not found',
                extra={'sentry_skip': True},
            )
            return None
        return get_annotation_tuple(result['from_name'], result['to_name'], result_type or '')

    def _get_labels(self, result):
        """Return the labels carried by *result* as a list of strings.

        The labels are read from ``result['value'][result['type']]``. An empty
        list is returned when that entry is missing, empty, not a list, or when
        the type is ``text``. A missing ``'value'`` also yields an empty list.
        """
        result_type = result.get('type')
        # DEV-1990 Workaround for Video labels as there are no labels in VideoRectangle tag
        if result_type == 'videorectangle':
            result_type = 'labels'

        result_value = (result.get('value') or {}).get(result_type)
        if not result_value or not isinstance(result_value, list) or result_type == 'text':
            # Non-list values are not labels. TextArea list values (texts) are not labels too.
            return []

        # Labels are stored in list
        labels = []
        for label in result_value:
            if result_type == 'taxonomy' and isinstance(label, list):
                # A taxonomy entry is itself a list; every element is counted.
                labels.extend(str(item) for item in label)
            else:
                labels.append(str(label))
        return labels

    def _add_result_labels(self, labels, result):
        """Increment, in *labels*, the counter of every label found in *result*.

        The entry for ``result['from_name']`` is created on first use, even
        when the result carries no labels.
        """
        # Look the name up in the working copy, not in the stored field, so
        # counts gathered earlier in the same call are not thrown away.
        counters = labels.setdefault(result['from_name'], {})
        for label in self._get_labels(result):
            counters[label] = counters.get(label, 0) + 1

    def _drop_result_labels(self, labels, result):
        """Decrement, in *labels*, the counter of every label found in *result*.

        Counters that reach zero are removed, and so is the ``from_name``
        entry once it has no counters left. Unknown names are ignored.
        """
        from_name = result.get('from_name', None)
        if from_name not in labels:
            return
        counters = labels[from_name]
        for label in self._get_labels(result):
            if label in counters:
                counters[label] -= 1
                if counters[label] <= 0:
                    counters.pop(label)
        if not counters:
            labels.pop(from_name)

    def update_created_annotations_and_labels(self, annotations):
        """Add the results of new *annotations* to the annotation and label counters."""
        created_annotations = dict(self.created_annotations or {})  # field is nullable
        labels = dict(self.created_labels or {})

        for annotation in annotations:
            results = get_attr_or_item(annotation, 'result') or []
            if not isinstance(results, list):
                continue
            for result in results:
                # aggregate annotation types
                key = self._get_annotation_key(result)
                if not key:
                    continue
                created_annotations[key] = created_annotations.get(key, 0) + 1
                # aggregate labels
                self._add_result_labels(labels, result)

        logger.debug('summary.created_annotations = %s', created_annotations)
        logger.debug('summary.created_labels = %s', labels)
        self.created_annotations = created_annotations
        self.created_labels = labels
        self.save(update_fields=['created_annotations', 'created_labels'])

    def remove_created_annotations_and_labels(self, annotations):
        """Subtract the results of removed *annotations* from the counters."""
        created_annotations = dict(self.created_annotations or {})  # field is nullable
        labels = dict(self.created_labels or {})

        for annotation in annotations:
            results = get_attr_or_item(annotation, 'result') or []
            if not isinstance(results, list):
                continue
            for result in results:
                # reduce annotation counters
                key = self._get_annotation_key(result)
                if key in created_annotations:
                    created_annotations[key] -= 1
                    if created_annotations[key] <= 0:
                        created_annotations.pop(key)
                # reduce labels counters
                self._drop_result_labels(labels, result)

        logger.debug('summary.created_annotations = %s', created_annotations)
        logger.debug('summary.created_labels = %s', labels)
        self.created_annotations = created_annotations
        self.created_labels = labels
        self.save(update_fields=['created_annotations', 'created_labels'])

    def update_created_labels_drafts(self, drafts):
        """Add the labels of new *drafts* to ``created_labels_drafts``.

        Results without a ``from_name`` are skipped.
        """
        labels = dict(self.created_labels_drafts or {})  # field is nullable

        for draft in drafts:
            results = get_attr_or_item(draft, 'result') or []
            if not isinstance(results, list):
                continue
            for result in results:
                if 'from_name' not in result:
                    continue
                # aggregate labels
                self._add_result_labels(labels, result)

        logger.debug('summary.created_labels_drafts = %s', labels)
        self.created_labels_drafts = labels
        self.save(update_fields=['created_labels_drafts'])

    def remove_created_drafts_and_labels(self, drafts):
        """Subtract the labels of removed *drafts* from ``created_labels_drafts``."""
        labels = dict(self.created_labels_drafts or {})  # field is nullable

        for draft in drafts:
            results = get_attr_or_item(draft, 'result') or []
            if not isinstance(results, list):
                continue
            for result in results:
                # reduce labels counters
                self._drop_result_labels(labels, result)

        logger.debug('summary.created_labels_drafts = %s', labels)
        self.created_labels_drafts = labels
        self.save(update_fields=['created_labels_drafts'])
class ProjectImport(models.Model):
    """Record of one import job: its status, inputs, counters and errors."""

    class Status(models.TextChoices):
        """Lifecycle states stored in ``status``."""

        CREATED = 'created', _('Created')
        IN_PROGRESS = 'in_progress', _('In progress')
        FAILED = 'failed', _('Failed')
        COMPLETED = 'completed', _('Completed')

    project = models.ForeignKey(
        'projects.Project',
        null=True,
        related_name='imports',
        on_delete=models.CASCADE,
    )
    preannotated_from_fields = models.JSONField(null=True, blank=True)
    commit_to_project = models.BooleanField(default=False)
    return_task_ids = models.BooleanField(default=False)
    status = models.CharField(
        max_length=64,
        choices=Status.choices,
        default=Status.CREATED,
    )
    url = models.CharField(max_length=2048, null=True, blank=True)
    traceback = models.TextField(null=True, blank=True)
    error = models.TextField(null=True, blank=True)

    created_at = models.DateTimeField(
        _('created at'),
        null=True,
        auto_now_add=True,
        help_text='Creation time',
    )
    # NOTE(review): uses auto_now_add, so this value is set on creation only;
    # auto_now may have been intended for an "updated" timestamp — confirm.
    updated_at = models.DateTimeField(
        _('updated at'),
        null=True,
        auto_now_add=True,
        help_text='Updated time',
    )
    finished_at = models.DateTimeField(
        _('finished at'),
        help_text='Complete or fail time',
        null=True,
        default=None,
    )

    task_count = models.IntegerField(default=0)
    annotation_count = models.IntegerField(default=0)
    prediction_count = models.IntegerField(default=0)
    duration = models.IntegerField(default=0)

    file_upload_ids = models.JSONField(default=list)
    could_be_tasks_list = models.BooleanField(default=False)
    found_formats = models.JSONField(default=list)
    data_columns = models.JSONField(default=list)
    tasks = models.JSONField(blank=True, null=True)
    task_ids = models.JSONField(default=list)

    def has_permission(self, user):
        """Delegate the permission check for *user* to the related project."""
        owning_project = self.project
        return owning_project.has_permission(user)
class ProjectReimport(models.Model):
    """Record of one re-import job: its status, counters and errors."""

    class Status(models.TextChoices):
        """Lifecycle states stored in ``status``."""

        CREATED = 'created', _('Created')
        IN_PROGRESS = 'in_progress', _('In progress')
        FAILED = 'failed', _('Failed')
        COMPLETED = 'completed', _('Completed')

    project = models.ForeignKey(
        'projects.Project',
        null=True,
        related_name='reimports',
        on_delete=models.CASCADE,
    )
    status = models.CharField(
        max_length=64,
        choices=Status.choices,
        default=Status.CREATED,
    )
    error = models.TextField(null=True, blank=True)

    task_count = models.IntegerField(default=0)
    annotation_count = models.IntegerField(default=0)
    prediction_count = models.IntegerField(default=0)
    duration = models.IntegerField(default=0)

    file_upload_ids = models.JSONField(default=list)
    files_as_tasks_list = models.BooleanField(default=False)
    found_formats = models.JSONField(default=list)
    data_columns = models.JSONField(default=list)
    traceback = models.TextField(null=True, blank=True)

    def has_permission(self, user):
        """Delegate the permission check for *user* to the related project."""
        owning_project = self.project
        return owning_project.has_permission(user)
Это модель пользователя:
class UserManager(BaseUserManager):
    """Manager that creates users keyed by e-mail address."""

    use_in_migrations = True

    def _create_user(self, email, password, **extra_fields):
        """Create and save a user with the given email and password.

        The e-mail is passed through ``normalize_email`` and the password is
        set with ``set_password``. Raises ``ValueError`` when *email* is empty.
        """
        if not email:
            raise ValueError('Must specify an email address')

        new_user = self.model(email=self.normalize_email(email), **extra_fields)
        new_user.set_password(password)
        new_user.save(using=self._db)
        return new_user

    def create_user(self, email, password=None, **extra_fields):
        """Create a user; ``is_staff`` and ``is_superuser`` default to False."""
        for flag in ('is_staff', 'is_superuser'):
            extra_fields.setdefault(flag, False)
        return self._create_user(email, password, **extra_fields)

    def create_superuser(self, email, password, **extra_fields):
        """Create a superuser; ``is_staff`` and ``is_superuser`` default to True.

        Raises ``ValueError`` when either flag was passed as anything but True.
        """
        for flag in ('is_staff', 'is_superuser'):
            extra_fields.setdefault(flag, True)

        staff_flag = extra_fields.get('is_staff')
        superuser_flag = extra_fields.get('is_superuser')
        if staff_flag is not True:
            raise ValueError('Superuser must have is_staff=True.')
        if superuser_flag is not True:
            raise ValueError('Superuser must have is_superuser=True.')

        return self._create_user(email, password, **extra_fields)
class UserLastActivityMixin(models.Model):
    """Abstract model mixin that adds a ``last_activity`` timestamp."""

    last_activity = models.DateTimeField(
        _('last activity'),
        default=timezone.now,
        editable=False,
    )

    class Meta:
        abstract = True

    def update_last_activity(self):
        """Set ``last_activity`` to the current time and save only that field."""
        self.last_activity = timezone.now()
        self.save(update_fields=["last_activity"])
# Result of load_func() applied to settings.USER_MIXIN (presumably a dotted
# path to a mixin class — confirm); it is the first base class of User.
UserMixin = load_func(settings.USER_MIXIN)
class User(UserMixin, AbstractBaseUser, PermissionsMixin, UserLastActivityMixin):
    """Application user stored in the ``htx_user`` table.

    Users log in with their e-mail address (``USERNAME_FIELD = 'email'``);
    ``username`` is an ordinary, non-unique field and no extra fields are
    required (``REQUIRED_FIELDS`` is empty).
    """

    username = models.CharField(_('username'), max_length=256)
    email = models.EmailField(_('email address'), unique=True, blank=True)
    first_name = models.CharField(_('first name'), max_length=256, blank=True)
    last_name = models.CharField(_('last name'), max_length=256, blank=True)
    phone = models.CharField(_('phone'), max_length=256, blank=True)
    avatar = models.ImageField(upload_to=hash_upload, blank=True)

    is_staff = models.BooleanField(
        _('staff status'),
        default=False,
        help_text=_('Designates whether the user can log into this admin site.'),
    )
    is_active = models.BooleanField(
        _('active'),
        default=True,
        help_text=_('Designates whether to treat this user as active. '
                    'Unselect this instead of deleting accounts.'),
    )

    date_joined = models.DateTimeField(_('date joined'), default=timezone.now)
    activity_at = models.DateTimeField(_('last annotation activity'), auto_now=True)

    active_organization = models.ForeignKey(
        'organizations.Organization',
        null=True,
        on_delete=models.SET_NULL,
        related_name='active_users'
    )
    allow_newsletters = models.BooleanField(
        _('allow newsletters'),
        null=True,
        default=None,
        help_text=_('Allow sending newsletters to user')
    )

    objects = UserManager()

    EMAIL_FIELD = 'email'
    USERNAME_FIELD = 'email'
    REQUIRED_FIELDS = ()

    class Meta:
        db_table = 'htx_user'
        verbose_name = _('user')
        verbose_name_plural = _('users')
        indexes = [
            models.Index(fields=['username']),
            models.Index(fields=['email']),
            models.Index(fields=['first_name']),
            models.Index(fields=['last_name']),
            models.Index(fields=['date_joined']),
        ]

    @property
    def avatar_url(self):
        """Return the avatar URL, or ``None`` when the user has no avatar.

        ``settings.HOSTNAME`` is prepended unless
        ``settings.CLOUD_FILE_STORAGE_ENABLED`` is set.
        """
        if not self.avatar:
            return None
        if settings.CLOUD_FILE_STORAGE_ENABLED:
            return self.avatar.url
        return settings.HOSTNAME + self.avatar.url

    def is_organization_admin(self, org_pk):
        """Always return ``True``; *org_pk* is ignored by this implementation."""
        return True

    def active_organization_annotations(self):
        """Return this user's annotations in projects of the active organization."""
        return self.annotations.filter(project__organization=self.active_organization)

    def active_organization_contributed_project_number(self):
        """Return how many distinct projects those annotations belong to."""
        annotations = self.active_organization_annotations()
        return annotations.values_list('project').distinct().count()

    @property
    def own_organization(self):
        """Return the organization created by this user.

        Uses ``objects.get``, so it raises when there is no such organization
        or more than one.
        """
        return Organization.objects.get(created_by=self)

    @property
    def has_organization(self):
        """Return whether at least one organization was created by this user."""
        return Organization.objects.filter(created_by=self).exists()

    def clean(self):
        """Run the inherited ``clean`` and normalize the e-mail address."""
        super().clean()
        self.email = self.__class__.objects.normalize_email(self.email)

    def name_or_email(self):
        """Return the full name, falling back to the e-mail when it is empty."""
        return self.get_full_name() or self.email

    def get_full_name(self):
        """
        Return the first_name and the last_name for a given user with a space in between.
        """
        return f'{self.first_name} {self.last_name}'.strip()

    def get_short_name(self):
        """Return the short name for the user."""
        return self.first_name

    def reset_token(self):
        """Delete any existing token of this user and return a freshly created one."""
        # delete() on an empty queryset is a no-op, so no exists() check is needed.
        Token.objects.filter(user=self).delete()
        return Token.objects.create(user=self)

    def get_initials(self):
        """Return up to two characters identifying the user.

        The first letters of the first and last name are used when available;
        with neither name set, the first two characters of the e-mail are used.
        """
        first, last = self.first_name, self.last_name
        if not first and not last:
            return self.email[0:2]
        return (first[0:1] if first else '') + (last[0:1] if last else '')
@receiver(post_save, sender=User)
def init_user(sender, instance=None, created=False, **kwargs):
    """``post_save`` handler for ``User``: give each newly created user a token.

    Saves of already existing users (``created`` false) are ignored.
    """
    if not created:
        return
    # create token for user
    Token.objects.create(user=instance)
Я попытался изменить поле created_by проекта с администратора на пользователя: я думал, что так смогу показать проект этому пользователю, но не знаю, как правильно реализовать эту идею. Кроме того, как администратор я хочу назначать один проект сразу нескольким пользователям.