Как данные разделяются на файлы деталей в sqoop
Я сомневаюсь, как данные разбиваются на файлы деталей, если данные искажены. Если возможно, пожалуйста, помогите мне прояснить это.
Допустим это мое department
стол с department_id
в качестве первичного ключа.
mysql> select * from departments;
2 Fitness
3 Footwear
4 Apparel
5 Golf
6 Outdoors
7 Fan Shop
Если я использую sqoop import
упоминая -m 1
в команде import я знаю, что будет создан только один файл детали со всеми записями в нем.
Теперь я запустил команду без указания каких-либо картографов. Таким образом, по умолчанию требуется 4 картографа, и он создал 4 файла частей в HDFS. Ниже показано, как записи распределяются по файлам деталей.
[cloudera@centsosdemo ~]$ hadoop fs -cat /user/cloudera/departments/part-m-00000
2,Fitness
3,Footwear
[cloudera@centsosdemo ~]$ hadoop fs -cat /user/cloudera/departments/part-m-00001
4,Apparel
[cloudera@centsosdemo ~]$ hadoop fs -cat /user/cloudera/departments/part-m-00002
5,Golf
[cloudera@centsosdemo ~]$ hadoop fs -cat /user/cloudera/departments/part-m-00003
6,Outdoors
7,Fan Shop
В соответствии с BoundingValsQuery, по умолчанию должны использоваться Min(департамент_ид)=2, Макс (отдел_ид)=8 и 4 сопоставления.
После расчета каждый картограф должен получить (8-2)/4=1.5 записей.
Здесь я не понимаю, как распространять данные. Я не мог понять, как две записи попали в часть-m-00000 и только одна в часть-m-00001, часть-m-00002 и снова две в часть-m-00003.
2 ответа
Sqoop находит минимальное и максимальное значение в столбце первичного ключа или в столбце разбиения по столбцам, а затем пытается разделить диапазон для заданного числа картографов.
Например, если у вас есть таблица с идентификатором столбца первичного ключа, минимальное значение которого равно 0, а максимальное значение равно 1000, и Sqoop был направлен на использование 4 задач, Sqoop будет запускать четыре процесса, каждый из которых выполняет операторы SQL в форме SELECT * FROM. sometable WHERE id >= lo AND id Здесь min val =2 max=7, следовательно, sqoop будет запускать четыре процесса со следующими диапазонами ( 2-4), (4-5), (5-6),(6-7), что означает
Если у вас есть возможность заглянуть в библиотеку. В этом есть последовательность шагов.
Работа Sqoop Чтение записей. через DBRecordReader
org.apache.sqoop.mapreduce.db.DBRecordReader
Два метода будут работать здесь.
способ 1.
protected ResultSet executeQuery(String query) throws SQLException {
Integer fetchSize = dbConf.getFetchSize();
/*get fetchSize according to split which is calculated via getSplits() method of
org.apache.sqoop.mapreduce.db.DBInputFormat.And no. of splits are calculated
via no. of (count from table/no. of mappers). */
}
Сплит Расчет:-
org.apache.sqoop.mapreduce.db.DBInputFormat
public List<InputSplit> getSplits(JobContext job) throws IOException {
.......//here splits are calculated accroding to count of source table
.......query.append("SELECT COUNT(*) FROM " + tableName);
}
способ 2.
protected String getSelectQuery() {
if (dbConf.getInputQuery() == null) {
query.append("SELECT ");
for (int i = 0; i < fieldNames.length; i++) {
query.append(fieldNames[i]);
if (i != fieldNames.length -1) {
query.append(", ");
}
}
query.append(" FROM ").append(tableName);
query.append(" AS ").append(tableName);
if (conditions != null && conditions.length() > 0) {
query.append(" WHERE (").append(conditions).append(")");
}
String orderBy = dbConf.getInputOrderBy();
if (orderBy != null && orderBy.length() > 0) {
query.append(" ORDER BY ").append(orderBy);
}
} else {
//PREBUILT QUERY
query.append(dbConf.getInputQuery());
}
try {// main logic to decide division of records between mappers.
query.append(" LIMIT ").append(split.getLength());
query.append(" OFFSET ").append(split.getStart());
} catch (IOException ex) {
// Ignore, will not throw.
}
return query.toString();
}
проверьте секцию кода в комментариях к основной логике....... Здесь записи делятся в соответствии с LIMIT и OFFSET. И эта логика реализована по-разному для каждой СУБД. просто искать org.apache.sqoop.mapreduce.db.OracleDBRecordReader
у него немного другая реализация метода getSelectQuery().
Надеюсь, что это дает быстрое представление о том, как записи делятся на разных картографов.