Таблица слияния RhinoETL с результатами веб-сервиса

У меня есть таблица в SQL Server, которая заполняется из веб-службы. Я хочу, чтобы он обновлялся по расписанию. Я хотел бы иметь что-то похожее на операцию SQL слияния.

То есть я определяю мой источник (веб-сервис) и цель (таблица SQL), и я определяю, как обрабатывать пропущенные данные из пропавших без вести целей и совпадений.

Давайте рассмотрим сценарий, в котором у меня есть только два поля в таблице Description и Deleted, а веб-служба предоставляет только Description.

  • Если описание присутствует как в таблице, так и в веб-сервисе, то я только что обновил (или нет).

  • Если описание присутствует в веб-сервисе, но отсутствует в таблице, я хочу вставить его

  • Если описание больше не присутствует на веб-сервере, я хочу, чтобы оно было помечено как Удалено = true

Что у меня сейчас есть:

public class WebServiceResults: AbstractOperation
{
    public WebServiceResults()
    {
        var WebService = new WebService();
        WSResults = WebService.GetResults();
    }
    IEnumerable<WSResult> WSResults  { get; set; }
    public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
    {
        foreach(var obj in WSResults)
            yield return Row.FromObject(obj);
    }

}

class SQLTableResults : AbstractOperation
{
    public SQLTableResults()
    {
        SQLResults = data.MyTable.Select(x=>new {x.Description,x.Deletet});
    }
    Data data = new Data();
    IEnumerable<SQLResult> SQLResults  { get; set; }
    public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
    {
        foreach (var obj in SQLResults)
            yield return Row.FromObject(obj);
        }
    }
    public override void Dispose()
    {
        data.Dispose();
        base.Dispose();
    }
}

class JoinTables : JoinOperation
{

    protected override Row MergeRows(Row leftRow, Row rightRow)
    {
        Row row = leftRow.Clone();
        row["Description2"] = rightRow["Description"];
        return row;
    }

    protected override void SetupJoinConditions()
    {
        FullOuterJoin
            .Left("Description")
            .Right("Description");
    }
}

class MergeTables : AbstractOperation
{
    Data data = new Data();
    public MergeTables()
    { }

    public override IEnumerable<Row> Execute(IEnumerable<Row> rows)
    {
        foreach (var obj in rows)
        {
            if (String.IsNullOrEmpty((string)obj["Description2"]))
            {
                //Code for not matched at target
                yield return Row.FromObject(obj);
            }
            if (String.IsNullOrEmpty((string)obj["Description"]))
            {
                //Code for not matched at source
                yield return Row.FromObject(obj);
            }

            {
                //Code for matched
                yield return Row.FromObject(obj);
            }
        }
    }
    public override void Dispose()
    {
        data.Dispose();
        base.Dispose();
    }
}

protected override void Initialize()
{
    Register(
           new JoinTables()
           .Left(new SQLTableResults())
           .Right(new WebServiceResults())
       );
    Register(new MergeTables());
    foreach (var error in GetAllErrors())
        Console.Write(error.Message);
}

Это путь? Я хотел бы представить что-то более ступенчатый процесс, как

Register(new NotMatchedAtSourceOperation());
Register(new NotMatchedAtTargetOperation());
Register(new MatchedOperation());

но, насколько я понимаю, каждый регистр возвращает свои строки следующему, поэтому, если я отфильтрую несоответствующие, то два других ничего не сделают.

Должен ли я создать новый процесс для каждого случая?

Кстати, я ищу документацию по RhinoEtl. Знаете ли вы какие-либо ссылки? Любые учебники?

1 ответ

Решение

Определите действие слияния в одной операции, используя полное внешнее соединение. Вы можете увидеть пример здесь. Это не совсем то, что вам нужно, поэтому я постараюсь адаптировать его к вашей ситуации ниже:

protected override Row MergeRows(Row wsRow, Row dbRow) {

    Row row;

    // if the db row doesn't exist, then the ws row is new, and it should be inserted
    if (dbRow["id"] == null) {
        row = wsRow.Clone();
        row["action"] = "Insert";
        row["deleted"] = false;
        return row;
    }

    // if the ws row doesn't exist, it should be marked as deleted in the database (if not already)
    if (wsRow["id"] == null) {
        row = dbRow.Clone();
        row["deleted"] = true;
        row["action"] = dbRow["deleted"].Equals(true) ? "None" : "Update";
        return row;
    }

    // ws and db descriptions match, but check and make sure it's not marked as deleted in database
    row = wsRow.Clone();
    row["deleted"] = false;
    row["action"] = dbRow["deleted"].Equals(true) ? "Update" : "None";
    return row;

}

protected override void SetupJoinConditions() {
    FullOuterJoin.Left("description").Right("description");
}

После выполнения этой операции каждая строка будет иметь действие "Вставить", "Обновить" или "Нет". На основе этого действия вы можете составить операторы вставки и обновления для выполнения SqlBatchOperation.

Другие вопросы по тегам