Django bulk_create с игнорированием строк, которые вызывают IntegrityError?

Я использую bulk_create для загрузки тысяч или строк в базу данных postgresql. К сожалению, некоторые строки вызывают IntegrityError и останавливают процесс bulk_create. Мне было интересно, есть ли способ сказать Django игнорировать такие строки и сохранить как можно большую часть пакета?

7 ответов

Решение

(Примечание: я не использую Django, поэтому могут быть более подходящие ответы для конкретной структуры)

Django не может сделать это, просто игнорируя INSERT сбои, потому что PostgreSQL прерывает всю транзакцию при первой ошибке.

Джанго понадобится один из этих подходов:

  1. INSERT каждая строка в отдельной транзакции и игнорирование ошибок (очень медленно);
  2. Создать SAVEPOINT перед каждой вставкой (могут возникнуть проблемы с масштабированием);
  3. Используйте процедуру или запрос для вставки, только если строка еще не существует (сложная и медленная); или же
  4. Массовая вставка или (лучше) COPY данные в TEMPORARY таблицу, затем объедините ее с основной таблицей на стороне сервера.

Подобный upsert подход (3) кажется хорошей идеей, но upsert и insert-if-not-Существует на удивление сложно.

Лично я бы взял (4): я бы массово вставил в новую отдельную таблицу, вероятно UNLOGGED или же TEMPORARY тогда я бы запустил несколько SQL вручную:

LOCK TABLE realtable IN EXCLUSIVE MODE;

INSERT INTO realtable 
SELECT * FROM temptable WHERE NOT EXISTS (
    SELECT 1 FROM realtable WHERE temptable.id = realtable.id
);

LOCK TABLE ... IN EXCLUSIVE MODE предотвращает конфликт при одновременной вставке, которая создает строку, с конфликтом со вставкой, выполненной вышеприведенным оператором, и с ошибкой Это не мешает одновременному SELECT только SELECT ... FOR UPDATE, INSERT, UPDATE а также DELETE Таким образом, чтение из таблицы продолжается как обычно.

Если вы не можете позволить себе слишком долго блокировать одновременную запись, вы можете вместо этого использовать записываемый CTE для копирования диапазонов строк из temptable в realtable, повторяя каждый блок, если это не удалось.

Теперь это возможно на Django 2.2

Django 2.2 добавляет новый ignore_conflicts вариант к bulk_create Метод, из документации:

В базах данных, которые его поддерживают (все, кроме PostgreSQL < 9.5 и Oracle), установка для параметра ignore_conflicts значения True указывает базе данных игнорировать сбой при вставке любых строк, которые не соответствуют ограничениям, таким как повторяющиеся уникальные значения. Включение этого параметра отключает установку первичного ключа для каждого экземпляра модели (если база данных обычно его поддерживает).

Пример:

Entry.objects.bulk_create([
    Entry(headline='This is a test'),
    Entry(headline='This is only a test'),
], ignore_conflicts=True)

Один быстрый и грязный обходной путь для этого, который не включает ручной SQL и временные таблицы, - это просто попытаться выполнить массовую вставку данных. Если это не удается, вернитесь к последовательной вставке.

objs = [(Event), (Event), (Event)...]

try:
    Event.objects.bulk_create(objs)

except IntegrityError:
    for obj in objs:
        try:
            obj.save()
        except IntegrityError:
            continue

Если у вас много и много ошибок, это может быть не так эффективно (вы будете тратить больше времени на последовательную вставку, чем на массовую), но я работаю с набором данных с большим количеством элементов с небольшим количеством дубликатов, поэтому это решает большинство моих проблем. проблемы.

Или 5. Разделяй и властвуй

Я не проверял и не тестировал это полностью, но он работает довольно хорошо для меня. YMMV, в частности, в зависимости от того, сколько ошибок вы ожидаете получить в массовой операции.

def psql_copy(records):
    count = len(records)
    if count < 1:
        return True
    try:
        pg.copy_bin_values(records)
        return True
    except IntegrityError:
        if count == 1:
            # found culprit!
            msg = "Integrity error copying record:\n%r"
            logger.error(msg % records[0], exc_info=True)
            return False
    finally:
        connection.commit()

    # There was an integrity error but we had more than one record.
    # Divide and conquer.
    mid = count / 2
    return psql_copy(records[:mid]) and psql_copy(records[mid:])
    # or just return False

Поздний ответ для проектов до Django 2.2:

Недавно я столкнулся с этой ситуацией, и я нашел выход с массивом списка второстепенных для проверки уникальности.

В моем случае модель имеет эту уникальную совместную проверку, а массовое создание выдает исключение Integrity Error, потому что в массиве массового создания есть повторяющиеся данные.

Поэтому я решил создать контрольный список помимо списка массового создания объектов. Вот пример кода; Уникальные ключи - это владелец и бренд, и в этом примере владелец - это экземпляр объекта пользователя, а бренд - экземпляр строки:

create_list = []
create_list_check = []
for brand in brands:
    if (owner.id, brand) not in create_list_check:
        create_list_check.append((owner.id, brand))
        create_list.append(ProductBrand(owner=owner, name=brand))

if create_list:
    ProductBrand.objects.bulk_create(create_list)

Даже в Django 1.11 нет способа сделать это. Я нашел лучший вариант, чем использование Raw SQL. Это используя djnago-query-builder. У него есть метод upsert

from querybuilder.query import Query
q = Query().from_table(YourModel)
# replace with your real objects
rows = [YourModel() for i in range(10)] 
q.upsert(rows, ['unique_fld1', 'unique_fld2'], ['fld1_to_update', 'fld2_to_update'])

Примечание: библиотека поддерживает только postgreSQL

Вот суть, которую я использую для массовой вставки, которая поддерживает игнорирование IntegrityErrors и возвращает вставленные записи.

это работа для меня,
я использую эту функцию в потоке.
мой файл csv содержит 120907 строк.

      def products_create():
    full_path = os.path.join(settings.MEDIA_ROOT,'productcsv')
    filename = os.listdir(full_path)[0]
    logger.debug(filename)
    logger.debug(len(Product.objects.all()))
    if len(Product.objects.all()) > 0:
        logger.debug("Products Data Erasing")
        Product.objects.all().delete()
        logger.debug("Products Erasing Done")

    csvfile = os.path.join(full_path,filename)
    csv_df = pd.read_csv(csvfile,sep=',')
    csv_df['HSN Code'] = csv_df['HSN Code'].fillna(0)
    row_iter = csv_df.iterrows()
    logger.debug(row_iter)

    logger.debug("New Products Creating")

    for index, row in row_iter:
        Product.objects.create(part_number = row[0],
                               part_description = row[1],
                               mrp = row[2],
                               hsn_code = row[3],
                               gst = row[4],
                               )

    # products_list = [
    #           Product(
    #               part_number = row[0] ,
    #               part_description = row[1],
    #               mrp = row[2],
    #               hsn_code = row[3],
    #               gst = row[4],
    #           )
    #           for index, row in row_iter

    #       ]
    # logger.debug(products_list)

    # Product.objects.bulk_create(products_list)
    logger.debug("Products uploading done")```

Другие вопросы по тегам