java.io.NotSerializableException: com.impetus.kundera.persistence.EntityManagerImpl

Я хочу использовать Кундера в качестве дао-слоя для Apache Storm, чтобы хранить данные в базе данных Кассандры. Моя топология создается без проблем, и я могу получать сообщения, но когда я пытался сохранить данные в базе данных, я получаю сообщение:

26240 [main-EventThread] INFO  o.a.s.s.o.a.c.f.s.ConnectionStateManager - State change: CONNECTED
26275 [main] INFO  o.a.s.d.supervisor - Starting supervisor with id 1db18608-a2df-47e0-8ae6-cc634c90f81d at host 10.0.0.4
26308 [main] ERROR o.a.s.s.o.a.z.s.NIOServerCnxnFactory - Thread Thread[main,5,main] died
java.lang.IllegalStateException: Bolt 'bolt1' contains a non-serializable field of type com.impetus.kundera.persistence.EntityManagerImpl, which was instantiated prior to topology creation. com.impetus.kundera.persistence.EntityManagerImpl should be instantiated within the prepare method of 'bolt1 at the earliest.
        at org.apache.storm.topology.TopologyBuilder.createTopology(TopologyBuilder.java:127) ~[storm-core-1.0.0.jar:1.0.0]
        at topology.ConnectorTopology.main(ConnectorTopology.java:52) ~[Zorro-0.0.1-SNAPSHOT-jar-with-dependencies.jar:?]
Caused by: java.lang.RuntimeException: java.io.NotSerializableException: com.impetus.kundera.persistence.EntityManagerImpl
        at org.apache.storm.utils.Utils.javaSerialize(Utils.java:167) ~[storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.topology.TopologyBuilder.createTopology(TopologyBuilder.java:122) ~[storm-core-1.0.0.jar:1.0.0]
        ... 1 more
Caused by: java.io.NotSerializableException: com.impetus.kundera.persistence.EntityManagerImpl
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) ~[?:1.7.0_99]
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) ~[?:1.7.0_99]
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) ~[?:1.7.0_99]
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) ~[?:1.7.0_99]
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) ~[?:1.7.0_99]
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) ~[?:1.7.0_99]
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) ~[?:1.7.0_99]
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) ~[?:1.7.0_99]
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) ~[?:1.7.0_99]
        at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) ~[?:1.7.0_99]
        at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) ~[?:1.7.0_99]
        at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) ~[?:1.7.0_99]
        at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) ~[?:1.7.0_99]
        at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) ~[?:1.7.0_99]
        at org.apache.storm.utils.Utils.javaSerialize(Utils.java:163) ~[storm-core-1.0.0.jar:1.0.0]
        at org.apache.storm.topology.TopologyBuilder.createTopology(TopologyBuilder.java:122) ~[storm-core-1.0.0.jar:1.0.0]
        ... 1 more

в топологическом классе у меня есть:

brokerSpout = new BrokerSpout(rabbitMQAMQP);
            builder.setSpout("spout1", brokerSpout);
printerbolt = new PrinterBolt();
            builder.setBolt("bolt1", printerbolt).shuffleGrouping("spout1");

в моем классе дао (который реализует сериализуемый интерфейс)

private EntityManager em;
    @Transient
    private EntityManagerFactory emf;
    public SensorDAOImpl() {
        // TODO Auto-generated constructor stub

         emf = Persistence.createEntityManagerFactory("cassandra_pu");
         em = emf.createEntityManager();
    }




    @Override
    public void insert(Object entity)
    {

        em.persist(entity);

    }

мой код болта

private OutputCollector collector;
    public PrinterBolt() {
        this.index=0;
    EntityManagerFactory emf = Persistence.createEntityManagerFactory("cassandra_pu");
    EntityManager em= emf.createEntityManager();
        database= new DatabaseController(em);
        // TODO Auto-generated constructor stub
    }

    /* (non-Javadoc)
     * @see org.apache.storm.task.IBolt#execute(org.apache.storm.tuple.Tuple)
     */
    public void prepare(@SuppressWarnings("rawtypes") Map config, TopologyContext context,
            OutputCollector collector) {
        this.collector = collector;
    }
    public void execute(Tuple tuple) {
        //tuple.getBinaryByField(arg0)
        String message = tuple.getStringByField("message");
        System.out.println("Receive ["+index+"]"+message);
        database.saveEntitie(tuple.getBinaryByField("message"));
        index++;

    }

Итак, как я могу решить эту проблему, (я не могу коснуться класса com.impetus.kundera.persistence.EntityManagerImpl)

1 ответ

Решение

Создайте поле типа com.impetus.kundera.persistence.EntityManagerImpl в методе подготовки вашего болта.

public void prepare(@SuppressWarnings("rawtypes") Map config, TopologyContext context,
            OutputCollector collector) {
     emf = Persistence.createEntityManagerFactory("cassandra_pu");
     em = emf.createEntityManager();
     database = new DatabaseController(em);
     this.collector = collector;
    }
Другие вопросы по тегам