ОШИБКА:SparkContext может использоваться только в драйвере, а не в коде, который он запускает на рабочих. Для получения дополнительной информации см. SPARK-5063

В настоящее время я работаю с ASN 1 Decoder. Я получу шестнадцатеричный десятичный код от производителя, и я буду собирать его у потребителя. Затем, после того как я буду преобразовывать шестнадцатеричный код в RDD, а затем передать шестнадцатеричное значение RDD другой функции с тем же классом Decode_Module, я буду использовать декодер python asn1 для декодирования шестнадцатеричных данных и их возврата и печати. Я не понимаю, что не так с моим кодом. Я уже установил зависимости парсера asn1 и на рабочих узлах. Любая ошибка в том, как я называю лямбда-выражение или что-то еще.

Моя ОШИБКА: Исключение: похоже, вы пытаетесь сослаться на SparkContext из широковещательной переменной, действия или преобразования. SparkContext может использоваться только в драйвере, а не в коде, который он запускает на рабочих. Для получения дополнительной информации см. SPARK-5063

ПОЖАЛУЙСТА, ПОМОГИТЕ МНЕ, СПАСИБО

Мой код:

class telco_cn:

 def __init__(self,sc):
    self.sc = sc
    print ('in init function')
    logging.info('eneterd into init function')

 def decode_module(self,msg):
        try:
            logging.info('Entered into generate module')
            ### Providing input for module we need to load
            load_module(config_values['load_module'])
            ### Providing Value for Type of Decoding
            ASN1.ASN1Obj.CODEC = config_values['PER_DECODER']
            ### Providing Input for Align/UnAlign
            PER.VARIANT = config_values['PER_ALIGNED']
            ### Providing Input for pdu load
            pdu = GLOBAL.TYPE[config_values['pdu_load']]
            ### Providing Hex value to buf
            buf = '{}'.format(msg).decode('hex')
            return val
        except Exception as e:
            logging.debug('error in decode_module function %s' %str(e))


 def consumer_input(self,sc,k_topic):
            logging.info('entered into consumer input');print(k_topic)
            consumer = KafkaConsumer(ip and other values given)
            consumer.subscribe(k_topic)
            for msg in consumer:
                print(msg.value);
                a = sc.parallelize([msg.value])
                d = a.map(lambda x: self.decode_module(x)).collect()
                print d

if __name__ == "__main__":
    logging.info('Entered into main')
    conf = SparkConf()
    conf.setAppName('telco_consumer')
    conf.setMaster('yarn-client')
    sc = SparkContext(conf=conf)
    sqlContext = HiveContext(sc)
    cn = telco_cn(sc)
    cn.consumer_input(sc,config_values['kafka_topic'])

3 ответа

Это потому что self.decode_module содержать экземпляр SparkContext.

Чтобы исправить свой код вы можете использовать @staticmethod:

class telco_cn:
    def __init__(self, sc):
        self.sc = sc

    @staticmethod
    def decode_module(msg):
        return msg

    def consumer_input(self, sc, k_topic):
        a = sc.parallelize(list('abcd'))
        d = a.map(lambda x: telco_cn.decode_module(x)).collect()
        print d


if __name__ == "__main__":
    conf = SparkConf()
    sc = SparkContext(conf=conf)
    cn = telco_cn(sc)
    cn.consumer_input(sc, '')

Для получения дополнительной информации:

http://spark.apache.org/docs/latest/programming-guide.html

Вы не можете ссылаться на метод экземпляра ( self.decode_module) внутри лямбда-выражения, поскольку объект экземпляра содержит ссылку на SparkContext.

Это происходит из-за того, что PySpark внутренне пытается обработать все, что он получает, чтобы отправить своим рабочим. Итак, когда вы говорите, что он должен выполняться self.decode_module()внутри узлов PySpark пытается обработать весь (собственный) объект (который содержит ссылку на контекст искры).

Чтобы исправить это, вам просто нужно удалить ссылку SparkContext из telco_cn class и используйте другой подход, например использование SparkContext перед вызовом экземпляра класса (как предлагает ответ Чжанга).

Со мной проблема была:

      text_df = "some text"
convertUDF = udf(lambda z: my_fynction(z), StringType())
cleaned_fun = text_df.withColumn('cleaned', udf(convertUDF, StringType())('text'))

Я давал udf() дважды. Только что сделал это:

      convertUDF = lambda z: my_fynction(z)
cleaned_fun = text_df.withColumn('cleaned', udf(convertUDF, StringType())('text'))

и решил ошибку

Другие вопросы по тегам