Распределить часть кода по нескольким узлам в spark/scala

У меня размер файла около 10 ГБ. Мне нужно извлечь данные и вставить их в несколько таблиц улья.

Я могу выполнить некоторые функции подготовки / отображения в одном узле.

Пример данных:

Dept : HR
Emp name is Andrew lives in Colorodo
DOB : 03/09/1958
Project name : Healthcare
DOJ : 06/04/2011
DOL : 09/21/2011
Project name : Retail
DOJ : 11/04/2011
DOL : 08/21/2013
Project name : Audit
DOJ : 09/11/2013
DOL : 09/01/2014
Project name : ContorlManagement
DOJ : 01/08/2015
DOL : 02/14/2016
Emp name is Alex lives in Texas
DOB : 03/09/1958
Project name : Healthcare
DOJ : 06/04/2011
DOL : 09/21/2011
Project name : ContorlManagement
DOJ : 01/08/2015
DOL : 02/14/2016
Emp name is Mathew lives in California
DOB : 03/09/1958
Project name : Healthcare
DOJ : 06/04/2011
DOL : 09/21/2011
Project name : Retail
DOJ : 11/04/2011
DOL : 08/21/2013
Project name : Audit
DOJ : 09/11/2013
DOL : 09/01/2014
Project name : ContorlManagement
DOJ : 01/08/2015
DOL : 02/14/2016

Dept : QC
Emp name is Nguyen lives in Nevada
DOB : 03/09/1958
Project name : Healthcare
DOJ : 06/04/2011
DOL : 09/21/2011
Project name : Retail
DOJ : 11/04/2011
DOL : 08/21/2013
Project name : Audit
DOJ : 09/11/2013
DOL : 09/01/2014
Project name : ContorlManagement
DOJ : 01/08/2015
DOL : 02/14/2016
Emp name is Cassey lives in Newyork
DOB : 03/09/1958
Project name : Healthcare
DOJ : 06/04/2011
DOL : 09/21/2011
Project name : ContorlManagement
DOJ : 01/08/2015
DOL : 02/14/2016
Emp name is Ronney lives in Alasca
DOB : 03/09/1958
Project name : Audit
DOJ : 09/11/2013
DOL : 09/01/2014
Project name : ContorlManagement
DOJ : 01/08/2015
DOL : 02/14/2016

Вот мой код:

val inp =  sc.textFile("F:\\Softwares\\Spark\\Programs for td\\input\\sample.txt").collect().mkString(" ").replaceAll("""[\r\n]+""", " ")
     val pt1 = """(?s)Dept : .*?(?=\bDept : |$)""".r
     val pt2 = """(?s)Emp name : .*?(?=\bEmp name : |$)""".r
     val dname = """Dept : (\S+).*?""".r
     val emprec = """Emp name : (\S+) lives in (\S+) DOB : (\S+).*?""".r
     val prrec = """Project name : (\S+) DOJ : (\S+) DOL : (\S+) """.r
     var recs = pt1.findAllIn(inp)
     var rec1 = recs

После этого rec1( Regex.MatchIterator) данные становятся такими, как показано ниже:

Dept : HR Emp name : Andrew lives in Colorodo DOB : 03/09/1958 Project name :Healthcare DOJ : 06/04/2011 DOL : 09/21/2011 Project name : Retail DOJ : 11/04/2011 DOL : 08/21/2013 Project name : Audit DOJ : 09/11/2013 DOL : 09/01/2014 Project name : ContorlManagement DOJ : 01/08/2015 DOL : 02/14/2016 Emp name : Alex lives in Texas DOB : 03/09/1958 Project name : Healthcare DOJ : 06/04/2011 DOL : 09/21/2011 Project name : ContorlManagement DOJ : 01/08/2015 DOL : 02/14/2016 Emp name : Mathew lives in California DOB : 03/09/1958 Project name : Healthcare DOJ : 06/04/2011 DOL : 09/21/2011 Project name : Retail DOJ : 11/04/2011 DOL : 08/21/2013 Project name : Audit DOJ : 09/11/2013 DOL : 09/01/2014 Project name : ContorlManagement DOJ : 01/08/2015 DOL : 02/14/2016  



Dept : QC Emp name : Nguyen lives in Nevada DOB : 03/09/1958 Project name : Healthcare DOJ : 06/04/2011 DOL : 09/21/2011 Project name : Retail DOJ : 11/04/2011 DOL : 08/21/2013 Project name : Audit DOJ : 09/11/2013 DOL : 09/01/2014 Project name : ContorlManagement DOJ : 01/08/2015 DOL : 02/14/2016 Emp name : Cassey lives in Newyork DOB : 03/09/1958 Project name : Healthcare DOJ : 06/04/2011 DOL : 09/21/2011 Project name : ContorlManagement DOJ : 01/08/2015 DOL : 02/14/2016 Emp name : Ronney lives in Alasca DOB : 03/09/1958 Project name : Audit DOJ : 09/11/2013 DOL : 09/01/2014 Project name : ContorlManagement DOJ : 01/08/2015 DOL : 02/14/2016

То есть, каждый отдел данных становится отдельной строкой. Иногда это может варьироваться от 300 до 400 тысяч строк. Вот следующая часть кода, которая форматирует его и преобразует их в фрейм данных. (Мне еще предстоит написать код для вставки этого Df в улей. Но я думаю, что могу написать это внизу этой страницы).

   var recfinal =  recs.map
      {
        e=>
 //  println(e)
      var dname(d) = e
   //    println(d)
      var rec2= pt2.findAllIn(e)

        var rec5 =rec2.map { k =>
    //   println(k)
                var emprec(ename,est,edob) = k
     //     println(ename,est,edob)
                var rec3 = prrec.findAllIn(k) 

               var rec4 =  rec3.map { j =>
                            var prrec(prjname,doj,dol) = j
                            var reclist = (d,ename,est,edob,prjname,doj,dol)
                             reclist
                           }
                .toList
                rec4
                  }
       .toList
       rec5

      }
    .flatMap(identity)
   .flatMap(identity)
  val rec9= recfinal.toSeq.toDF("dname","en","st","db","prj","dj","dl")
  rec9.show

1) Как я могу распространить вторую часть кода на несколько узлов исполнителей и в то же время Данные также должны быть разделены эквивалентно для выполнения. Как у меня есть кластер из 4 узлов, и каждый узел должен получить 100 тыс. Строк и выполнить только 2-ю часть кода. Но в конце все это должно быть в одном столе улья.

0 ответов

Другие вопросы по тегам