Создание постоянного подключения к API потокового твиттера с помощью R

В настоящее время я использую пакет streamR в R для потоковой передачи твитов из потока фильтров в Twitter. У меня есть рукопожатие объекта ROAuth, который я использую для этого. Мой кусок кода выглядит так:

# load the Twitter auth object
load("twitter_oAuth3.RData")
load("keywords3.RData")

streamTweet = function(){
  require(streamR)
  require(ROAuth)

  stack = filterStream(file.name="",track=keywords,timeout=500,oauth=twitter_oAuth)
  return(stack)
}

Я хотел создать приложение в реальном времени, которое включает в себя сброс этих твитов в тему activeMQ. Мой код для этого:

require(Rjms)

# Set logger properties
url = "tcp://localhost:61616"
type = "T"
name = "TwitterStream"

# initialize logger
topicWriter = initialize.logger(url,type,name)

topicWrite = function(input){
#   print("writing to topic")
  to.logger(topicWriter,input,asString=TRUE,propertyName='StreamerID',propertyValue='1')
  return()
}

logToTopic = function(streamedStack){ 
#   print("inside stack-writer")
  stacklength = length(streamedStack)
  print(c("Length: ",stacklength))
  for(i in 1:stacklength){
    print(c("calling for: ",i))
    topicWrite(streamedStack[i])
    }
  return()
}

Теперь моя проблема заключается в том, что тайм-аут filterStream() необходимо. Я заглянул под капот и нашел этот вызов, который делает функция:

url <- "https://stream.twitter.com/1.1/statuses/filter.json"
    output <- tryCatch(oauth$OAuthRequest(URL = url, params = params, 
                                          method = "POST", customHeader = NULL, 
                                          writefunction = topicWrite, cainfo = system.file("CurlSSL", 
                                                                                             "cacert.pem", package = "RCurl")), error = function(e) e)

Я попытался удалить компонент тайм-аута, но он не работает. Есть ли способ, которым я могу поддерживать поток навсегда (до тех пор, пока я его не убью), который выводит каждый твит, когда он входит в тему?

PS Я знаю о реализации Java, которая делает вызов API twitter4j. Я, однако, понятия не имею, как это сделать в R.

1 ответ

Документация для streamR Пакет упоминает, что опция по умолчанию для опции тайм-аута в filterStream() равен 0, что будет держать соединение открытым постоянно.

Я цитирую:

"числовой, максимальный промежуток времени (в секундах) соединения с потоком. Соединение будет автоматически закрыто по истечении этого периода. Например, установка тайм-аута на 10800 будет держать соединение открытым в течение 3 часов. Значение по умолчанию - 0, которое сохранит соединение открыто постоянно."

Надеюсь это поможет.

Другие вопросы по тегам