Как данные разделяются на файлы деталей в sqoop

Я сомневаюсь, как данные разбиваются на файлы деталей, если данные искажены. Если возможно, пожалуйста, помогите мне прояснить это.

Допустим это мое department стол с department_id в качестве первичного ключа.

mysql> select * from departments;
2 Fitness
3 Footwear
4 Apparel
5 Golf
6 Outdoors
7 Fan Shop

Если я использую sqoop import упоминая -m 1 в команде import я знаю, что будет создан только один файл детали со всеми записями в нем.

Теперь я запустил команду без указания каких-либо картографов. Таким образом, по умолчанию требуется 4 картографа, и он создал 4 файла частей в HDFS. Ниже показано, как записи распределяются по файлам деталей.

[cloudera@centsosdemo ~]$ hadoop fs -cat /user/cloudera/departments/part-m-00000
2,Fitness
3,Footwear
[cloudera@centsosdemo ~]$ hadoop fs -cat /user/cloudera/departments/part-m-00001
4,Apparel
[cloudera@centsosdemo ~]$ hadoop fs -cat /user/cloudera/departments/part-m-00002
5,Golf
[cloudera@centsosdemo ~]$ hadoop fs -cat /user/cloudera/departments/part-m-00003
6,Outdoors
7,Fan Shop

В соответствии с BoundingValsQuery, по умолчанию должны использоваться Min(департамент_ид)=2, Макс (отдел_ид)=8 и 4 сопоставления.

После расчета каждый картограф должен получить (8-2)/4=1.5 записей.

Здесь я не понимаю, как распространять данные. Я не мог понять, как две записи попали в часть-m-00000 и только одна в часть-m-00001, часть-m-00002 и снова две в часть-m-00003.

2 ответа

Sqoop находит минимальное и максимальное значение в столбце первичного ключа или в столбце разбиения по столбцам, а затем пытается разделить диапазон для заданного числа картографов.

Например, если у вас есть таблица с идентификатором столбца первичного ключа, минимальное значение которого равно 0, а максимальное значение равно 1000, и Sqoop был направлен на использование 4 задач, Sqoop будет запускать четыре процесса, каждый из которых выполняет операторы SQL в форме SELECT * FROM. sometable WHERE id >= lo AND id

Здесь min val =2 max=7, следовательно, sqoop будет запускать четыре процесса со следующими диапазонами ( 2-4), (4-5), (5-6),(6-7), что означает

  1. 2 и 3 вместе
  2. 4-я запись
  3. 5-я запись
  4. 6 и 7 в этом диапазоне

Если у вас есть возможность заглянуть в библиотеку. В этом есть последовательность шагов.

Работа Sqoop Чтение записей. через DBRecordReader

 org.apache.sqoop.mapreduce.db.DBRecordReader

Два метода будут работать здесь.

способ 1.

protected ResultSet executeQuery(String query) throws SQLException {
Integer fetchSize = dbConf.getFetchSize();
/*get fetchSize according to split which is calculated via getSplits() method of 
org.apache.sqoop.mapreduce.db.DBInputFormat.And no. of splits are calculated
via no. of (count from table/no. of mappers). */
 }

Сплит Расчет:-

org.apache.sqoop.mapreduce.db.DBInputFormat
 public List<InputSplit> getSplits(JobContext job) throws IOException {
 .......//here splits are calculated accroding to count of source table
 .......query.append("SELECT COUNT(*) FROM " + tableName);
}   

способ 2.

 protected String getSelectQuery() {
    if (dbConf.getInputQuery() == null) {
      query.append("SELECT ");

      for (int i = 0; i < fieldNames.length; i++) {
        query.append(fieldNames[i]);
        if (i != fieldNames.length -1) {
          query.append(", ");
        }
      }

      query.append(" FROM ").append(tableName);
      query.append(" AS ").append(tableName); 
      if (conditions != null && conditions.length() > 0) {
        query.append(" WHERE (").append(conditions).append(")");
      }

      String orderBy = dbConf.getInputOrderBy();
      if (orderBy != null && orderBy.length() > 0) {
        query.append(" ORDER BY ").append(orderBy);
      }
    } else {
      //PREBUILT QUERY
      query.append(dbConf.getInputQuery());
    }

    try {// main logic to decide division of records between mappers.
      query.append(" LIMIT ").append(split.getLength());
      query.append(" OFFSET ").append(split.getStart());
    } catch (IOException ex) {
      // Ignore, will not throw.
    }

    return query.toString();
  }

проверьте секцию кода в комментариях к основной логике....... Здесь записи делятся в соответствии с LIMIT и OFFSET. И эта логика реализована по-разному для каждой СУБД. просто искать org.apache.sqoop.mapreduce.db.OracleDBRecordReader у него немного другая реализация метода getSelectQuery().

Надеюсь, что это дает быстрое представление о том, как записи делятся на разных картографов.

Другие вопросы по тегам