Twitter, несколько процессов и база данных

Я новичок в написании небольшого твиттерного инструмента для запланированных твитов и автоматических ретвитов в python/flask.

Я застрял с проблемами процессов, работающих в фоновом режиме.

Я хочу, чтобы запланированные твиты и ретвиты работали одновременно в фоновом режиме для данного пользователя.

Я хочу иметь возможность завершать эти фоновые процессы, запускающие ретвиты / запланированные твиты отдельно друг от друга.

Как бы вы изменили код ниже, чтобы достичь этого?

Если вы посмотрите на приведенный ниже код, он работает, но пользователь не может запускать запланированные твиты и ретвиты одновременно. Кроме того, если пользователь решает прекратить один из процессов, скажем, ретвиты другого процесса также завершаются (запланированные твиты) и наоборот.

Я думал о том, чтобы поместить идентификационные данные для данного процесса в базу данных и вызвать эти идентификационные данные из базы данных, когда есть необходимость прекратить их, вместо того, чтобы использовать сеанс cookie, но я не знаю, как реализовать эту идею в коде.

import ........

mysql = MySQL()
app = Flask(__name__)
app.secret_key = 'xxx'

app.config['MYSQL_DATABASE_USER'] = 'xxx'
app.config['MYSQL_DATABASE_PASSWORD'] = 'xxx'
app.config['MYSQL_DATABASE_DB'] = 'xxx'
app.config['MYSQL_DATABASE_HOST'] = '0.0.0.0'
mysql.init_app(app)

@app.route('/showSignin')
def showSignin():
   if session.get('user'):
       return redirect('/userHome')
   else:
       return render_template('signin.html')

@app.route('/showscheduletweets')

def showscheduletweets():

     if session.get('user'):
      return render_template('scheduletweets.html')
    else:
       return render_template('signin.html')

     @app.route('/validateLogin',methods=['POST'])
def validateLogin():
   try:
    _username = request.form['inputEmail']
    _password = request.form['inputPassword']

    # connect to mysql

    con = mysql.connect()
    cursor = con.cursor()
    cursor.callproc('sp_validateLogin',(_username,))
    data = cursor.fetchall()

    if len(data) > 0:
        if check_password_hash(str(data[0][3]),_password):
            session['user'] = data[0][0]
            consumerkey = data [0][4]
            consumersecret = data [0][5]
            accesstoken = data [0][6]
            tokensecret = data [0][7]
            twitter = Twython(consumerkey, consumersecret, accesstoken, tokensecret)
            twitter.update_status(status="xxx says hello.")
            return render_template('userHome.html')
        else:
            return render_template('error.html',error = 'Wrong Email address or Password.')
    else:
        return render_template('error.html',error = 'Wrong Email address or Password.')

except Exception as e:
    return render_template('error.html',error = str(e))
finally:
    cursor.close()
    con.close()

#schedule tweets

@app.route('/scheduletweets',methods=['POST'])

def scheduletweets():
    if session.get('user'):
    _username = request.form['inputEmail']
    con = mysql.connect()
    cursor = con.cursor()
    cursor.callproc('sp_GetTwitter', (_username,))
    data = cursor.fetchall()

    session['user'] = data[0][0]
    consumerkey = data [0][4]
    consumersecret = data [0][5]
    accesstoken = data [0][6]
    tokensecret = data [0][7]
    twitter = Twython(consumerkey, consumersecret, accesstoken, tokensecret)

    tweet1 = request.form['inputTweet1']
    tweet2 = request.form['inputTweet2']
    tweet3 = request.form['inputTweet3']
    tweet4 = request.form['inputTweet4']
    tweet5 = request.form['inputTweet5']
    tweet6 = request.form['inputTweet6']

    Hash1 = request.form['inputHash1']
    Hash2 = request.form['inputHash2']
    Hash3 = request.form['inputHash3']
    Hash4 = request.form['inputHash4']

    fruits = [Hash1, Hash2, Hash3, Hash4]



    list = [tweet1, tweet2, tweet3, tweet4, tweet5, tweet6]
    def workit():

     while True:
        try:
            if len(list) > 0:
                z = random.randint(1, len(fruits))
                a = random.sample(fruits, z)


                b=" ".join(str(x) for x in a)
                toTweet = list[random.randint(0,len(list))-1] + " " + b

                twitter.update_status(status=toTweet)
                time.sleep(10)


            else:
                twitter.update_status(status="Oh dear... I'm afraid I'm rather empty =(")
                break
        except TwythonError as e:
            print (e)


    if 'work_process' not in session:
     process = Process(target=workit)
     process.start()
     pid = process.pid
     parent_pid = psutil.Process(process.pid).parent().pid
     session['work_process'] = (parent_pid, pid)
    return redirect('/showscheduletweets')
     #retweets
     @app.route('/retweet',methods=['POST'])
def retweet():
   if session.get('user'):

    _username = request.form['inputEmail']
    con = mysql.connect()
    cursor = con.cursor()
    cursor.callproc('sp_GetTwitter', (_username,))
    data = cursor.fetchall()

    session['user'] = data[0][0]
    consumerkey = data [0][4]
    consumersecret = data [0][5]
    accesstoken = data [0][6]
    tokensecret = data [0][7]


    Retweet1 = request.form['inputRetweet1']
    Retweet2 = request.form['inputRetweet2']
    Retweet3 = request.form['inputRetweet3']
    Retweet4 = request.form['inputRetweet4']
    Exclude1 = request.form['inputExclude1']
    Exclude2 = request.form['inputExclude2']




    def work():
     twitter = Twython(consumerkey, consumersecret, accesstoken, tokensecret)
     naughty_words = [Exclude1, Exclude2]
     good_words = [Retweet1, Retweet2, Retweet3, Retweet4]
     filter = " OR ".join(good_words)
     blacklist = " -".join(naughty_words)
     keywords = filter +" -"+ blacklist
     print(keywords)
     while True:
        search_results = twitter.search(q=keywords, count=10)
        try:
            for tweet in search_results["statuses"]:
                try:
                    twitter.retweet(id = tweet["id_str"])
                    time.sleep(60)
                except TwythonError as e:
                                            print (e)
        except TwythonError as e:
                                    print (e)

    if 'work_process' not in session:
     process = Process(target=work)
     process.start()
     pid = process.pid
     parent_pid = psutil.Process(process.pid).parent().pid
     session['work_process'] = (parent_pid, pid)
    return redirect('/showretweet')

       #terminating scheduled tweets and retweets
      @app.route('/stoptweet', methods=['POST'])
  def stoptweet():
    if 'work_process' in session:
    parent_pid, pid = session['work_process']
    try:
        process = psutil.Process(pid)
        if process.parent().pid == parent_pid:
            process.terminate()
    except psutil.NoSuchProcess:
        pass
    session.pop('work_process')
    return render_template('index.html')
else:
    return render_template('index.html')

 if __name__ == '__main__':
  app.run(host=os.getenv('IP', '0.0.0.0'),port=int(os.getenv('PORT', xxx)))

1 ответ

Возможно, вы захотите использовать модуль сельдерея Python и переместить твит расписания и ретвит в качестве фоновой работы.

Для получения дополнительной информации см. Документ: http://flask.pocoo.org/docs/0.11/patterns/celery/

Вы будете украшать те функции, связанные с сельдереем, а не колбу.

Как пример:

В вашем скрипте:

import my_schedule_module

а затем в my_schedule_module.py:

from celery import Celery, Task
from celery.result import AsyncResult

from celery.task.base import periodic_task

import sqlite3 # Here I use sqlite, can be sql
import redis # Here I am using redis, you can use another db as well > check documentation

from datetime import timedelta # used to schedule your background jobs, see in configuration below


app_schedule = Celery('my_schedule_module')


'''
Celery Configuration
'''

# a mockup configuration of your background jobs, as example use retweet each 60s
app_schedule.conf.update(
    CELERY_ACCEPT_CONTENT = ['application/json'],
    CELERY_TASK_SERIALIZER='json',
    # CELERY_ACCEPT_CONTENT=['json'],  # Ignore other content
    CELERY_RESULT_SERIALIZER='json',
    # CELERY_TIMEZONE='Europe/Oslo',
    # CELERY_ENABLE_UTC=True,
    CELERYD_TASK_TIME_LIMIT = 600,
    CELERYD_TASK_SOFT_TIME_LIMIT = 600,
    CELERYD_MAX_TASKS_PER_CHILD = 1000,
    CELERYD_OPTS="--time-limit=600 --concurrency=4",
    BROKER_URL = 'redis://localhost:6379/0',
    CELERY_RESULT_BACKEND = 'redis://localhost',
    CELERYBEAT_SCHEDULE = {
        'add-every-60-seconds': {
        'task': 'my_schedule_module.retweet',
        'schedule': timedelta(seconds=60)
        },
    }
)

@app_schedule.task()
def retweet(tweet):
     # your tweet function

@app_schedule.task()
def scheduletweets():
     # your background job
     # pseudo code
     tweets = get_tweets()
     process_tweet_list = []
     for tweet in tweets:
          process_tweet_list.append( retweet.s(tweet) ) 
     job = group(process_tweet_list)  #group is celery.group, see documentation
     result = job.apply_async() # process job list async
     print 'result', result.ready(), result.successful()

Вы также можете использовать функции обратного вызова - например, вы можете обновить дату и время в вашей базе данных, когда ваш твит был ретвитнут.

В этом случае у вас будет такой синтаксис:

    result = my_schedule_module.retweet.apply_async( (tweet,) , link=my_schedule_module.callback_to_store_results_of_retweet.s())
Другие вопросы по тегам