Обратные вызовы данных CFSocket

В приложении для iPhone я создаю объект CFSocket из существующего собственного сокета UDP и настраиваю обратный вызов данных всякий раз, когда сокет получает некоторые данные. Затем я добавляю это в мой основной цикл программы:

    //Set socket descriptor field
    cbData.s = udpSocket.getSocketDescriptor();

    CFSocketContext udpSocketContext;
    memset(&udpSocketContext, 0, sizeof(udpSocketContext));
    udpSocketContext.info = &cbData;

    cbData.socketRef = CFSocketCreateWithNative(NULL, cbData.s, kCFSocketDataCallBack, &getSocketDataCallBack, &udpSocketContext);
    cbData.runLoopSourceRef = CFSocketCreateRunLoopSource( NULL, cbData.socketRef, 0);
    CFRunLoopAddSource(CFRunLoopGetMain(), cbData.runLoopSourceRef, kCFRunLoopCommonModes);

Я отправляю 1024-байтовые дейтаграммы через WiFi из отдельного приложения Mac-сервера каждые 5 мс и получаю их на свой iPhone в своей процедуре getSocketDataCallBack.

Я ожидаю, что getSocketDataCallBack будет вызываться каждые 5 мс (чтобы соответствовать периоду дейтаграмм, отправляемых с Mac), что происходит в большинстве случаев. НО, звонки часто задерживаются на 10 или 100 мс. После этого я получаю быструю последовательность обратных вызовов (доли мс), чтобы извлечь несколько дейтаграмм, которые накопились за эту задержку.

Поскольку iOS, очевидно, хранит отложенные датаграммы,

  • есть ли способ получить все задержанные дейтаграммы из системы сразу, вместо того, чтобы getSocketDataCallBack вызывался снова и снова в быстрой последовательности?

    [Я делаю запрос, сколько байтов доступно в ala обратного вызова:

           CFDataRef dataRef = (CFDataRef)data;
           numBytesReceived = CFDataGetLength(dataRef);
    

    но numBytesReceived всегда указывается как 1024.]

  • Альтернативно, есть ли способ улучшить / уменьшить изменчивость синхронизации обратного вызова сокета другими способами?

1 ответ

Решение

Я использую обратный вызов сокета для межпроцессного взаимодействия (на самом деле, межпотоковое взаимодействие) с сокетом UNIX. То, как мы используем сокет, идентично TCP/UDP.

Код ниже написан на c/obj-c и использует posix-поток. Перевести его на Swift/NSThread не должно быть сложно.

Обратите внимание, что приведенная ниже программа работает как сторона сервера, что означает, что программа создает сокет, к которому подключаются клиенты. Как только клиент подключится к сокету, система автоматически примет соединение и выделит другой файловый дескриптор для чтения / записи. Обратный вызов сокета отражает эту двухэтапную операцию. Сначала мы создаем сокет, затем добавляем его в качестве источника цикла выполнения, чтобы система могла перезвонить, когда клиент попытался подключиться. Система принимает, затем выделяет и сообщает обратному вызову файловый дескриптор для чтения / записи с клиентом. Затем мы создаем другой источник цикла выполнения из fd для чтения / записи и добавляем в цикл выполнения. Этот второй обратный вызов вызывается, когда данные rx/tx готовы.

ОСНОВНАЯ РЕЗЬБА:

Основной поток создает сокет UNIX и рабочий поток. Сокет fd передается как аргумент рабочего потока.

#import <stdio.h>
#import <string.h>
#import <stdlib.h>
#import <unistd.h>
#import <pthread.h>
#import <sys/socket.h>
#import <sys/un.h>
#import <sys/stat.h>
#import <sys/types.h>
#import <UIKit/UIKit.h>
#import <Foundation/Foundation.h>
int setup(const char *ipcNode) {
    int sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
    if (sockfd == -1) {
        return -1;
    }
    struct sockaddr_un sa = {0};
    sa.sun_len = sizeof(sa);
    sa.sun_family = AF_UNIX;
    strcpy(sa.sun_path, ipcNode);
    remove(sa.sun_path);
    if (bind(sockfd, (struct sockaddr*)&sa, sizeof(struct sockaddr_un)) == -1) {
        close(sockfd);
        return -1;
    }
    // start up worker thread
    pthread_attr_t at;
    pthread_attr_init(&at);
    pthread_attr_setdetachstate(&at, PTHREAD_CREATE_DETACHED);
    pthread_t th;
    pthread_create(&th, &at, workerThread, (void *)(long)(sockfd));
    return 1;
}

РАБОЧАЯ РЕЗЬБА:

Программа работает как сервер. Таким образом, он ожидает подключения клиента (через connect()). После подключения система автоматически вызывает accept() и выделяет чтение / запись для обмена данными с клиентом. Этот fd передается подпрограмме accept-call back socketDataCallback(). Затем мы создаем еще один обратный вызов clientDataCallback() с fd для чтения / записи.

// worker thread
//
void *workerThread(void *tprm) {
    int sockfd = (int)tprm;
    int retval = listen(sockfd, 1);  // mark as "server" side. here, accept only 1 connection request at a time
    if (retval != 0) {
        return NULL;
    }
    // create CFSocket and register it as data source.
    CFSocketRef socket = CFSocketCreateWithNative(kCFAllocatorDefault, sockfd, kCFSocketAcceptCallBack, socketDataCallback, nil);
    // don't close native fd on CFSocketInvalidate
    CFSocketSetSocketFlags(socket, CFSocketGetSocketFlags(socket) & ~kCFSocketCloseOnInvalidate);
    // create run loop source
    CFRunLoopSourceRef socketRunLoop = CFSocketCreateRunLoopSource(kCFAllocatorDefault, socket, 0);
    // add to run loop
    CFRunLoopAddSource(CFRunLoopGetCurrent(), socketRunLoop, kCFRunLoopCommonModes);
    CFRelease(socketRunLoop);
    CFRelease(socket);
    CFRunLoopRun();
    // not return here untill run loop stops
    close(sockfd);
    return NULL;
}

// socket connection w/ client side. create another data source and add to run-loop
//
void socketDataCallback(CFSocketRef s, CFSocketCallBackType callbackType, CFDataRef address, const void *data, void *info) {
    CFSocketContext socketContext;
    memset(&socketContext, 0, sizeof(CFSocketContext));
    int clientfd = *((int *)data);   // get file descriptor (fd)
    socketContext.info = (void *)((long)clientfd);  // set fd at info of socketContext
    // create CFSocket for tx/rx w/ connected client
    CFSocketRef socket = CFSocketCreateWithNative(kCFAllocatorDefault, clientfd, kCFSocketReadCallBack | kCFSocketWriteCallBack, clientDataCallback, &socketContext);
    CFSocketDisableCallBacks(socket, kCFSocketWriteCallBack);
    CFRunLoopSourceRef socketRunLoop = CFSocketCreateRunLoopSource(kCFAllocatorDefault, socket, 0);
    CFRunLoopAddSource(CFRunLoopGetCurrent(), socketRunLoop, kCFRunLoopCommonModes);
    CFRelease(socket);
    CFRelease(socketRunLoop);
}

// data to/from client
//
void clientDataCallback(CFSocketRef s, CFSocketCallBackType callbackType, CFDataRef address, const void *data, void *info) {
    if (callbackType & kCFSocketWriteCallBack) {
        // your own tx data prcess here
        // txDataCallback(s, callbackType, address, data, info);
    }
    if (!(callbackType & kCFSocketReadCallBack))  return;
    // extract fd
    int fd = (int)((long)info);
    // read data, and do some work
    uint8_t rxdata[1024];
    size_t nr = read(fd, rxdata, 1024);
    if (!nr) {
        // socket closed
        handleSocketClosed(s);
        return;
    }
    // your own rx process here
}

// socket closed
//
void handleSocketClosed(CFSocketRef s) {
    // any clean up process here, then
    CFSocketInvalidate(s);
    // stop run loop if necessary
    // CFRunLoopStop(CFRunLoopGetCurrent());
}

Если вы работаете на стороне клиента, все становится немного проще. Вы получаете чтение / запись с помощью вызова connect (). Затем вы создаете CFSockeRef и добавляете в цикл выполнения с помощью fd.

Надеюсь это поможет.

РЕДАКТИРОВАТЬ: Как ждать с POSIX выберите (). Ожидать с POSIX select() в рабочем потоке проще, чем обратный вызов сокета. Если вы на стороне клиента, то:

int sockfd = socket(...);
bind(sockfd, ...)
connect(sockfd, ...);
while (1) {
    int nfds = sockfd+1;
    fd_set rfds;
    FD_ZERO(&rfds);
    FD_SET(sockfd, &rfds);
    int retval = select(nfds, &rfds, NULL, NULL, NULL);
    if (retval == -1)  break;
    if (retval > 0) {
        uint8_t rxdata[1024];
        size_t nr = read(sockfd, rxdata, 1024);
        if (!nr) {
            // socket closed.
            break;
        }
        // do your rx process here
    }
}

Запустите приведенный выше код в вашем рабочем потоке.

Другие вопросы по тегам