Импорт данных отчета Salesforce с использованием Python
Я новичок в SFDC. У меня есть отчет, уже созданный пользователем. Я хотел бы использовать Python для выгрузки данных отчета в файл CSV / Excel. Я вижу, есть пара пакетов Python для этого. Но мой код выдает ошибку
from simple_salesforce import Salesforce
sf = Salesforce(instance_url='https://cs1.salesforce.com', session_id='')
sf = Salesforce(password='xxxxxx', username='xxxxx', organizationId='xxxxx')
Могу ли я иметь основные шаги по настройке API и пример кода
4 ответа
Это сработало для меня:
import requests
import csv
from simple_salesforce import Salesforce
import pandas as pd
sf = Salesforce(username=your_username, password=your_password, security_token = your_token)
login_data = {'username': your_username, 'password': your_password_plus_your_token}
with requests.session() as s:
d = s.get("https://your_instance.salesforce.com/{}?export=1&enc=UTF-8&xf=csv".format(reportid), headers=sf.headers, cookies={'sid': sf.session_id})
d.content
будет содержать строку значений, разделенных запятыми, которые вы можете прочитать с помощью csv
модуль.
Я беру данные в pandas
отсюда название функции и import pandas
, Я удалил остальную часть функции, где он помещает данные в DataFrame
, но если вы заинтересованы в том, как это сделать, дайте мне знать.
В случае, если это полезно, я хотел бы написать шаги, которые я использовал, чтобы ответить на этот вопрос сейчас (август-2018), основываясь на комментарии Оболя. Для справки я следовал инструкциям README по адресу https://github.com/cghall/force-retrieve/blob/master/README.md для пакета salesforce_reporting.
Чтобы подключиться к Salesforce:
from salesforce_reporting import Connection, ReportParser
sf = Connection(username='your_username',password='your_password',security_token='your_token')
Затем, чтобы получить отчет, который я хотел, в DataFrame Pandas:
report = sf.get_report(your_reports_id)
parser = salesforce_reporting.ReportParser(report)
report = parser.records_dict()
report = pd.DataFrame(report)
Если бы вы были так склонны, вы могли бы также упростить четыре строки выше в одну, например так:
report = pd.DataFrame(salesforce_reporting.ReportParser(sf.get_report(your_reports_id)).records_dict())
Одно отличие, с которым я столкнулся с README, заключается в том, что sf.get_report('report_id', includeDetails=True)
сгенерировал ошибку с указанием get_report() got an unexpected keyword argument 'includeDetails'
, Казалось бы, просто удалив его, код работает нормально.
report
теперь можно экспортировать через report.to_csv('report.csv',index=False)
или манипулировать напрямую.
РЕДАКТИРОВАТЬ: parser.records()
изменился на parser.records_dict()
, поскольку это позволяет DataFrame иметь уже перечисленные столбцы, а не численно их индексировать.
Приведенный ниже код довольно длинный и может быть только для нашего случая использования, но основная идея заключается в следующем:
Узнайте длину интервала дат и дополнительную необходимую фильтрацию, чтобы никогда не выйти за предел "больше 2'000". В моем случае у меня может быть недельный фильтр диапазона дат, но мне нужно будет применить некоторые дополнительные фильтры.
Затем запустите его так:
report_id = '00O4…'
sf = SalesforceReport(user, pass, token, report_id)
it = sf.iterate_over_dates_and_filters(datetime.date(2020,2,1),
'Invoice__c.InvoiceDate__c', 'Opportunity.CustomField__c',
[('a', 'startswith'), ('b', 'startswith'), …])
for row in it:
# do something with the dict
Итератор работает каждую неделю (если вам нужны ежедневные или ежемесячные итераторы, вам нужно будет изменить код, но изменение должно быть минимальным) с 2020-02-01 и применяет фильтр CustomField__c.startswith('a'), затем CustomField__c.startswith('b'), … и действует как генератор, так что вам не нужно возиться с циклическим циклом фильтра самостоятельно.
Итератор выдает исключение, если есть запрос, который возвращает более 2000 строк, просто чтобы убедиться, что данные не являются неполными.
Одно предупреждение: SF имеет ограничение не более 500 запросов в час. Скажем, если у вас есть год с 52 неделями и 10 дополнительными фильтрами, вы уже исчерпали этот лимит.
Вот класс (полагается на simple_salesforce
)
import simple_salesforce
import json
import datetime
"""
helper class to iterate over salesforce report data
and manouvering around the 2000 max limit
"""
class SalesforceReport(simple_salesforce.Salesforce):
def __init__(self, username, password, security_token, report_id):
super(SalesforceReport, self).__init__(username=username, password=password, security_token=security_token)
self.report_id = report_id
self._fetch_describe()
def _fetch_describe(self):
url = f'{self.base_url}analytics/reports/{self.report_id}/describe'
result = self._call_salesforce('GET', url)
self.filters = dict(result.json()['reportMetadata'])
def apply_report_filter(self, column, operator, value, replace=True):
"""
adds/replaces filter, example:
apply_report_filter('Opportunity.InsertionId__c', 'startsWith', 'hbob').
For date filters use apply_standard_date_filter.
column: needs to correspond to a column in your report, AND the report
needs to have this filter configured (so in the UI the filter
can be applied)
operator: equals, notEqual, lessThan, greaterThan, lessOrEqual,
greaterOrEqual, contains, notContain, startsWith, includes
see https://sforce.co/2Tb5SrS for up to date list
value: value as a string
replace: if set to True, then if there's already a restriction on column
this restriction will be replaced, otherwise it's added additionally
"""
filters = self.filters['reportFilters']
if replace:
filters = [f for f in filters if not f['column'] == column]
filters.append(dict(
column=column,
isRunPageEditable=True,
operator=operator,
value=value))
self.filters['reportFilters'] = filters
def apply_standard_date_filter(self, column, startDate, endDate):
"""
replace date filter. The date filter needs to be available as a filter in the
UI already
Example: apply_standard_date_filter('Invoice__c.InvoiceDate__c', d_from, d_to)
column: needs to correspond to a column in your report
startDate, endDate: instance of datetime.date
"""
self.filters['standardDateFilter'] = dict(
column=column,
durationValue='CUSTOM',
startDate=startDate.strftime('%Y-%m-%d'),
endDate=endDate.strftime('%Y-%m-%d')
)
def query_report(self):
"""
return generator which yields one report row as dict at a time
"""
url = self.base_url + f"analytics/reports/query"
result = self._call_salesforce('POST', url, data=json.dumps(dict(reportMetadata=self.filters)))
r = result.json()
columns = r['reportMetadata']['detailColumns']
if not r['allData']:
raise Exception('got more than 2000 rows! Quitting as data would be incomplete')
for row in r['factMap']['T!T']['rows']:
values = []
for c in row['dataCells']:
t = type(c['value'])
if t == str or t == type(None) or t == int:
values.append(c['value'])
elif t == dict and 'amount' in c['value']:
values.append(c['value']['amount'])
else:
print(f"don't know how to handle {c}")
values.append(c['value'])
yield dict(zip(columns, values))
def iterate_over_dates_and_filters(self, startDate, date_column, filter_column, filter_tuples):
"""
return generator which iterates over every week and applies the filters
each for column
"""
date_runner = startDate
while True:
print(date_runner)
self.apply_standard_date_filter(date_column, date_runner, date_runner + datetime.timedelta(days=6))
for val, op in filter_tuples:
print(val)
self.apply_report_filter(filter_column, op, val)
for row in self.query_report():
yield row
date_runner += datetime.timedelta(days=7)
if date_runner > datetime.date.today():
break
Для тех, кто просто пытается загрузить отчет в DataFrame, вот как вы это делаете (я добавил несколько примечаний и ссылок для пояснений):
import pandas as pd
import csv
import requests
from io import StringIO
from simple_salesforce import Salesforce
# Input Salesforce credentials:
sf = Salesforce(
username='johndoe@mail.com',
password='<password>',
security_token='<security_token>') # See below for help with finding token
# Basic report URL structure:
orgParams = 'https://<INSERT_YOUR_COMPANY_NAME_HERE>.my.salesforce.com/' # you can see this in your Salesforce URL
exportParams = '?isdtp=p1&export=1&enc=UTF-8&xf=csv'
# Downloading the report:
reportId = 'reportId' # You find this in the URL of the report in question between "Report/" and "/view"
reportUrl = orgParams + reportId + exportParams
reportReq = requests.get(reportUrl, headers=sf.headers, cookies={'sid': sf.session_id})
reportData = reportReq.content.decode('utf-8')
reportDf = pd.read_csv(StringIO(reportData))
Вы можете получить свой токен, следуя инструкциям внизу этой страницы .