Потоковый процессор Wso2: Произошла ошибка при обработке eventByteBufferQueue

У меня есть два узла аналитического сервера wso2-am (2.6.0), который является процессорами Wso2 Stream. Я вижу следующую ошибку на пассивном узле кластера. Активный узел в порядке, и я не вижу ошибок. Результат аналитики не влияет на пользователей, просматривающих данные в API Publisher или Store. однако есть ошибка в пассивном узле.

Посоветуйте пожалуйста, что вызывает следующую проблему..

2019-02-26 17: 06: 09,513] ОШИБКА {org.wso2.carbon.stream.processor.core.ha.tcp.EventSyncServer} - Произошла ошибка при обработке события eventByteBufferQueue null java.nio.BufferUnderflowException

0 ответов

Just meet the same issue, here is my problem and solution. 1) Using the WSO2 SP HA deployment. 2) When Event come in active node and according the source mapping of the streaming, some fields are NULL 3) Active Node would like sync this event to passive node 4) passive node pick up the event data from the 'eventByteBufferQueue' to meet the standby-take over mechanism 5) passive node cannot parse the data from active node and reports error exception.

the root cause is SP only support NULL String by default, when NULL with LONG, INTEGER.. the error occurred. but for me, Long fields have NULL is the normal case, you can change data type to string.

here is my solution: org.wso2.carbon.stream.processor.core_2.0.478.jar Add logic to support NULL BinaryMessageConverterUtil.java for sending event data from active node

public final class BinaryMessageConverterUtil {

public static int getSize(Object data) {
    if (data instanceof String) {
        return 4 + ((String) data).length();
    } else if (data instanceof Integer) {
        return 4;
    } else if (data instanceof Long) {
        return 8;
    } else if (data instanceof Float) {
        return 4;
    } else if (data instanceof Double) {
        return 8;
    } else if (data instanceof Boolean) {
        return 1;
    } else if (data == null) {
        return 0;
    }else {
        //TODO
        return 4;
    }
}

public static EventDataMetaInfo getEventMetaInfo(Object data) {
    int eventSize;
    Attribute.Type attributeType;
    if (data instanceof String) {
        attributeType = Attribute.Type.STRING;
        eventSize = 4 + ((String) data).length();
    } else if (data instanceof Integer) {
        attributeType = Attribute.Type.INT;
        eventSize = 4;
    } else if (data instanceof Long) {
        attributeType = Attribute.Type.LONG;
        eventSize = 8;
    } else if (data instanceof Float) {
        attributeType = Attribute.Type.FLOAT;
        eventSize = 4;
    } else if (data instanceof Double) {
        attributeType = Attribute.Type.DOUBLE;
        eventSize = 8;
    } else if (data instanceof Boolean) {
        attributeType = Attribute.Type.BOOL;
        eventSize = 1;
    } else if (data == null){
        attributeType = Attribute.Type.OBJECT;
        eventSize = 0; //'no content between the HA nodes for NULL fields'
    } else {
        //TODO
        attributeType = Attribute.Type.OBJECT;
        eventSize = 1;
    }
    return new EventDataMetaInfo(eventSize, attributeType);
}

public static void assignData(Object data, ByteBuffer eventDataBuffer) throws IOException {
    if (data instanceof String) {
        eventDataBuffer.putInt(((String) data).length());
        eventDataBuffer.put((((String) data).getBytes(Charset.defaultCharset())));
    } else if (data instanceof Integer) {
        eventDataBuffer.putInt((Integer) data);
    } else if (data instanceof Long) {
        eventDataBuffer.putLong((Long) data);
    } else if (data instanceof Float) {
        eventDataBuffer.putFloat((Float) data);
    } else if (data instanceof Double) {
        eventDataBuffer.putDouble((Double) data);
    } else if (data instanceof Boolean) {
        eventDataBuffer.put((byte) (((Boolean) data) ? 1 : 0));
    } else if (data == null){
        //put nothing into he Buffer
    } else {
        eventDataBuffer.putInt(0);
    }
}

public static String getString(ByteBuf byteBuf, int size) throws UnsupportedEncodingException {
    byte[] bytes = new byte[size];
    byteBuf.readBytes(bytes);
    return new String(bytes, Charset.defaultCharset());
}

public static String getString(ByteBuffer byteBuf, int size) throws UnsupportedEncodingException {
    byte[] bytes = new byte[size];
    byteBuf.get(bytes);
    return new String(bytes, Charset.defaultCharset());
}

}

SiddhiEventConverter.java for processing event data at passive node

static Object[] toObjectArray(ByteBuffer byteBuffer,
                              String[] attributeTypeOrder) throws UnsupportedEncodingException {
    if (attributeTypeOrder != null) {
        Object[] objects = new Object[attributeTypeOrder.length];
        for (int i = 0; i < attributeTypeOrder.length; i++) {
            switch (attributeTypeOrder[i]) {
                case "INT":
                    objects[i] = byteBuffer.getInt();
                    break;
                case "LONG":
                    objects[i] = byteBuffer.getLong();
                    break;
                case "STRING":
                    int stringSize = byteBuffer.getInt();
                    if (stringSize == 0) {
                        objects[i] = null;
                    } else {
                        objects[i] = BinaryMessageConverterUtil.getString(byteBuffer, stringSize);
                    }
                    break;
                case "DOUBLE":
                    objects[i] = byteBuffer.getDouble();
                    break;
                case "FLOAT":
                    objects[i] = byteBuffer.getFloat();
                    break;
                case "BOOL":
                    objects[i] = byteBuffer.get() == 1;
                    break;
                case "OBJECT":
                    //for NULL fields
                    objects[i] = null;
                    break;
                default:
                    // will not occur
            }
        }
        return objects;
    } else {
        return null;
    }
}
Другие вопросы по тегам