我怎样才能改进这个脚本,让它更像 pythonic?

How can I improve this script to make it more pythonic?

我对 Python 编程还很陌生,到目前为止,我一直在对以前的开发人员制作的代码进行逆向工程,或者自己拼凑一些函数。

脚本本身有效;简而言之,它旨在解析 CSV 并 (a) 创建和/或更新在 CSV 中找到的联系人,以及 (b) 将联系人正确分配给他们的关联公司。全部使用 HubSpot API. To achieve this i've also imported requests and csvmapper.

我有以下问题:

  1. 如何改进此脚本以使其更符合 Python 风格?
  2. 在远程服务器上制作此脚本的最佳方法是什么 运行, 请记住 Requests 和 CSVMapper 可能不是 安装在该服务器上,我很可能不会 安装它们的权限 - "package" 最好的方法是什么 脚本,或者将 Requests 和 CSVMapper 上传到服务器?

非常感谢任何建议。

#!/usr/bin/env python
# -*- coding: utf-8 -*-

from __future__ import print_function
import sys, os.path, requests, json, csv, csvmapper, glob, shutil
from time import sleep
major, minor, micro, release_level, serial =  sys.version_info

# Client Portal ID
portal = "XXXXXX"

# Client API Key

hapikey = "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"

# This attempts to find any file in the directory that starts with "note" and ends with ".CSV"
# Server Version
# findCSV = glob.glob('/home/accountName/public_html/clientFolder/contact*.CSV')

# Local Testing Version
findCSV = glob.glob('contact*.CSV')

for i in findCSV:

    theCSV = i

    csvfileexists = os.path.isfile(theCSV)

    # Prints a confirmation if file exists, prints instructions if it doesn't.

    if csvfileexists:
        print ("\nThe \"{csvPath}\" file was found ({csvSize} bytes); proceeding with sync ...\n".format(csvSize=os.path.getsize(theCSV), csvPath=os.path.basename(theCSV)))
    else:
        print ("File not found; check the file name to make sure it is in the same directory as this script. Exiting ...")
        sys.exit()

    # Begin the CSVmapper mapping... This creates a virtual "header" row - the CSV therefore does not need a header row.

    mapper = csvmapper.DictMapper([
      [
        {'name':'account'}, #"Org. Code"
        {'name':'id'}, #"Hubspot Ref"
        {'name':'company'}, #"Company Name"
        {'name':'firstname'}, #"Contact First Name"
        {'name':'lastname'}, #"Contact Last Name"
        {'name':'job_title'}, #"Job Title"
        {'name':'address'}, #"Address"
        {'name':'city'}, #"City"
        {'name':'phone'}, #"Phone"
        {'name':'email'}, #"Email"
        {'name':'date_added'} #"Last Update"
      ]
    ])

    # Parse the CSV using the mapper
    parser = csvmapper.CSVParser(os.path.basename(theCSV), mapper)

    # Build the parsed object
    obj = parser.buildObject()

    def contactCompanyUpdate():

        # Open the CSV, use commas as delimiters, store it in a list called "data", then find the length of that list.
        with open(os.path.basename(theCSV),"r") as f:
            reader = csv.reader(f, delimiter = ",", quotechar="\"")
            data = list(reader)

            # For every row in the CSV ...
            for row in range(0, len(data)):
                # Set up the JSON payload ...

                payload = {
                            "properties": [
                                {
                                    "name": "account",
                                    "value": obj[row].account
                                },
                                {
                                    "name": "id",
                                    "value": obj[row].id
                                },
                                {
                                    "name": "company",
                                    "value": obj[row].company
                                },
                                {
                                    "property": "firstname",
                                    "value": obj[row].firstname
                                },
                                {
                                    "property": "lastname",
                                    "value": obj[row].lastname
                                },
                                {
                                    "property": "job_title",
                                    "value": obj[row].job_title
                                },
                                {
                                    "property": "address",
                                    "value": obj[row].address
                                },
                                {
                                    "property": "city",
                                    "value": obj[row].city
                                },
                                {
                                    "property": "phone",
                                    "value": obj[row].phone
                                },
                                {
                                    "property": "email",
                                    "value": obj[row].email
                                },
                                {
                                    "property": "date_added",
                                    "value": obj[row].date_added
                                }
                            ]
                        }

                nameQuery = "{first} {last}".format(first=obj[row].firstname, last=obj[row].lastname)

                # Get a list of all contacts for a certain company.
                contactCheck = "https://api.hubapi.com/contacts/v1/search/query?q={query}&hapikey={hapikey}".format(hapikey=hapikey, query=nameQuery)

                # Convert the payload to JSON and assign it to a variable called "data"
                data = json.dumps(payload)

                # Defined the headers content-type as 'application/json'
                headers = {'content-type': 'application/json'}

                contactExistCheck = requests.get(contactCheck, headers=headers)

                for i in contactExistCheck.json()[u'contacts']:

                    # ... Get the canonical VIDs
                    canonicalVid = i[u'canonical-vid']

                    if canonicalVid:
                        print ("{theContact} exists! Their VID is \"{vid}\"".format(theContact=obj[row].firstname, vid=canonicalVid))
                        print ("Attempting to update their company...")
                        contactCompanyUpdate = "https://api.hubapi.com/companies/v2/companies/{companyID}/contacts/{vid}?hapikey={hapikey}".format(hapikey=hapikey, vid=canonicalVid, companyID=obj[row].id)
                        doTheUpdate = requests.put(contactCompanyUpdate, headers=headers)
                        if doTheUpdate.status_code == 200:
                            print ("Attempt Successful! {theContact}'s has an updated company.\n".format(theContact=obj[row].firstname))
                            break
                        else:
                            print ("Attempt Failed. Status Code: {status}. Company or Contact not found.\n".format(status=doTheUpdate.status_code))

    def createOrUpdateClient():

        # Open the CSV, use commas as delimiters, store it in a list called "data", then find the length of that list.
        with open(os.path.basename(theCSV),"r") as f:
            reader = csv.reader(f, delimiter = ",", quotechar="\"")
            data = list(reader)

            # For every row in the CSV ...
            for row in range(0, len(data)):
                # Set up the JSON payload ...

                payloadTest = {
                            "properties": [
                                {
                                    "property": "email",
                                    "value": obj[row].email
                                },
                                {
                                    "property": "firstname",
                                    "value": obj[row].firstname
                                },
                                {
                                    "property": "lastname",
                                    "value": obj[row].lastname
                                },
                                {
                                    "property": "website",
                                    "value": None
                                },
                                {
                                    "property": "company",
                                    "value": obj[row].company
                                },
                                {
                                    "property": "phone",
                                    "value": obj[row].phone
                                },
                                {
                                    "property": "address",
                                    "value": obj[row].address
                                },
                                {
                                    "property": "city",
                                    "value": obj[row].city
                                },
                                {
                                    "property": "state",
                                    "value": None
                                },
                                {
                                    "property": "zip",
                                    "value": None
                                }
                            ]
                        }

                # Convert the payload to JSON and assign it to a variable called "data"
                dataTest = json.dumps(payloadTest)

                # Defined the headers content-type as 'application/json'
                headers = {'content-type': 'application/json'}

                #print ("{theContact} does not exist!".format(theContact=obj[row].firstname))
                print ("Attempting to add {theContact} as a contact...".format(theContact=obj[row].firstname))
                createOrUpdateURL = 'http://api.hubapi.com/contacts/v1/contact/createOrUpdate/email/{email}/?hapikey={hapikey}'.format(email=obj[row].email,hapikey=hapikey)

                r = requests.post(createOrUpdateURL, data=dataTest, headers=headers)

                if r.status_code == 409:
                    print ("This contact already exists.\n")
                elif (r.status_code == 200) or (r.status_code == 202):
                    print ("Success! {firstName} {lastName} has been added.\n".format(firstName=obj[row].firstname,lastName=obj[row].lastname, response=r.status_code))
                elif r.status_code == 204:
                    print ("Success! {firstName} {lastName} has been updated.\n".format(firstName=obj[row].firstname,lastName=obj[row].lastname, response=r.status_code))
                elif r.status_code == 400:
                    print ("Bad request. You might get this response if you pass an invalid email address, if a property in your request doesn't exist, or if you pass an invalid property value.\n")
                else:
                    print ("Contact Marko for assistance.\n")

    if __name__ == "__main__":
        # Run the Create or Update function
        createOrUpdateClient()

        # Give the previous function 5 seconds to take effect.
        sleep(5.0)

        # Run the Company Update function
        contactCompanyUpdate()
        print("Sync complete.")

        print("Moving \"{something}\" to the archive folder...".format(something=theCSV))

        # Cron version
        #shutil.move( i, "/home/accountName/public_html/clientFolder/archive/" + os.path.basename(i))

        # Local version
        movePath = "archive/{thefile}".format(thefile=theCSV)
        shutil.move( i, movePath )

        print("Move successful! Exiting...\n")

sys.exit()

我将从上到下。第一条规则是,按照 PEP 8. 中的内容进行操作 这不是最终的风格指南,但它肯定是 Python 编码人员的参考基准,而且这一点更为重要,尤其是当您刚开始时。第二条规则是,使其可维护。几年后,当其他新孩子出现时,她应该很容易弄清楚你在做什么。有时这意味着做事要走很长的路,以减少错误。有时这意味着以捷径做事,以减少错误。 :-)

#!/usr/bin/env python
# -*- coding: utf-8 -*-

两件事:根据 PEP 8,你的编码是正确的。

Conventions for writing good documentation strings (a.k.a. "docstrings") are immortalized in PEP 257.

你有一个可以做某事的程序。但是你没有记录什么。

from __future__ import print_function
import sys, os.path, requests, json, csv, csvmapper, glob, shutil
from time import sleep
major, minor, micro, release_level, serial =  sys.version_info

根据 PEP 8:每行放置一个 import module 语句。

Per Austin:让您的段落有单独的主题。你在一些版本信息的旁边有一些导入。插入一个空行。另外,用数据做点什么!或者你不需要它就在这里,是吗?

# Client Portal ID
portal = "XXXXXX"

# Client API Key

hapikey = "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"

您以多种方式掩盖了这些。 WTF 是 hapikey?我想你的意思是 Hubspot_API_keyportal 是做什么的?

一个忠告:一件事情越"global",它就应该越"formal"。如果你有一个for循环,可以调用其中一个变量i。如果您有一段数据在整个函数中使用,请将其称为 objportal。但是如果你有一个全局使用的数据,或者是一个 class 变量,把它放在领带和夹克上,这样每个人都能认出来:把它设为 Hubspot_api_key 而不是 client_api_key.如果有多个 API,甚至可能 Hubspot_client_api_key。对 portal.

做同样的事情
# This attempts to find any file in the directory that starts with "note" and ends with ".CSV"
# Server Version
# findCSV = glob.glob('/home/accountName/public_html/clientFolder/contact*.CSV')

评论没多久就变成了谎言。如果它们不正确,请将其删除。

# Local Testing Version
findCSV = glob.glob('contact*.CSV')

您应该为此创建一个函数。只需创建一个名为 "get_csv_files" 或其他名称的简单函数,然后将其设为 return 文件名列表。这将您与 glob 分离,这意味着您可以使测试代码成为数据驱动的(将文件名列表传递给函数,或将单个文件传递给函数,而不是要求它搜索它们)。此外,这些 glob 模式正是配置文件或全局变量中的那种东西,或者作为命令行参数传递的东西。

for i in findCSV:

我敢打赌一直用大写字母输入 CSV 是一件很痛苦的事情。 findCSV 是什么意思?阅读该行,并找出应该调用该变量的内容。也许 csv_files?还是new_contact_files?证明存在事物集合的事物。

    theCSV = i
    csvfileexists = os.path.isfile(theCSV)

现在 i 做什么?你在 BiiiiiiG 循环中有这个漂亮的小变量名。这是一个错误,因为如果您不能在一页上看到一个变量的整个范围,它可能需要更长的名称。但是后来您为它创建了一个别名。 itheCSV 指的是同一个东西。而且...我没有看到您再次使用 i。所以也许你的循环变量应该是theCSV。或者它应该是 the_csv 以便于打字。或者只是 csvname

    # Prints a confirmation if file exists, prints instructions if it doesn't.

这似乎有点多余。如果您使用 glob 来获取文件名,它们几乎会存在。 (如果他们不这样做,那是因为它们在您调用 glob 和您尝试打开它们的时间之间被删除了。这是可能的,但很少见。只是 continue 或引发异常,具体取决于。 )

    if csvfileexists:
        print ("\nThe \"{csvPath}\" file was found ({csvSize} bytes); proceeding with sync ...\n".format(csvSize=os.path.getsize(theCSV), csvPath=os.path.basename(theCSV)))

在此代码中,您使用 csvfileexists 的值。但那是你唯一使用它的地方。在这种情况下,您可以将对 os.path.isfile() 的调用移动到 if 语句中并删除变量。

    else:
        print ("File not found; check the file name to make sure it is in the same directory as this script. Exiting ...")
        sys.exit()

请注意,在这种情况下,当出现 实际问题时, 您没有打印文件名?这有多大帮助?

此外,还记得您在远程服务器上的部分吗?您应该考虑使用 Python 的 logging module 以有用的方式记录这些消息。

# Begin the CSVmapper mapping... This creates a virtual "header" row - the CSV therefore does not need a header row.

mapper = csvmapper.DictMapper([
  [
    {'name':'account'}, #"Org. Code"
    {'name':'id'}, #"Hubspot Ref"
    {'name':'company'}, #"Company Name"
    {'name':'firstname'}, #"Contact First Name"
    {'name':'lastname'}, #"Contact Last Name"
    {'name':'job_title'}, #"Job Title"
    {'name':'address'}, #"Address"
    {'name':'city'}, #"City"
    {'name':'phone'}, #"Phone"
    {'name':'email'}, #"Email"
    {'name':'date_added'} #"Last Update"
  ]
])

您正在创建一个包含大量数据的 object。这将是一个功能的好地方。定义一个 make_csvmapper() 函数来为您完成所有这一切,并将其移出线外。

另请注意,标准 csv module 具有您正在使用的大部分功能。我不认为你真的需要 csvmapper.

# Parse the CSV using the mapper
parser = csvmapper.CSVParser(os.path.basename(theCSV), mapper)

# Build the parsed object
obj = parser.buildObject()

这是另一个函数的机会。也许你可以 return obj 而不是制作 csv 映射器?

def contactCompanyUpdate():

此时,事情变得可疑了。您缩进了这些函数定义,但我认为您不需要它们。这是一个 Whosebug 问题,还是您的代码真的像这样?

    # Open the CSV, use commas as delimiters, store it in a list called "data", then find the length of that list.

    with open(os.path.basename(theCSV),"r") as f:

不对,好像真的是这个样子。因为当你真的不需要时,你在这个函数中使用了 theCSV 。请考虑使用形式函数参数,而不是仅仅抓取 outer-scope objects。另外,为什么要在 csv 文件上使用 basename?你用glob获取的,不是已经有了你要的路径吗?

        reader = csv.reader(f, delimiter = ",", quotechar="\"")
        data = list(reader)

        # For every row in the CSV ...
        for row in range(0, len(data)):

在这里你强迫data成为一个l从 reader 获得的 st 行,然后开始迭代它们。直接遍历 reader,例如:for row in reader: 但是等等! 你实际上是在遍历你已经打开的 CSV 文件,在你的 obj 变量。只需选择一个,然后对其进行迭代。您不需要为此打开文件两次。

            # Set up the JSON payload ...
            payload = {
                        "properties": [
                            {
                                "name": "account",
                                "value": obj[row].account
                            },
                            {
                                "name": "id",
                                "value": obj[row].id
                            },
                            {
                                "name": "company",
                                "value": obj[row].company
                            },
                            {
                                "property": "firstname",
                                "value": obj[row].firstname
                            },
                            {
                                "property": "lastname",
                                "value": obj[row].lastname
                            },
                            {
                                "property": "job_title",
                                "value": obj[row].job_title
                            },
                            {
                                "property": "address",
                                "value": obj[row].address
                            },
                            {
                                "property": "city",
                                "value": obj[row].city
                            },
                            {
                                "property": "phone",
                                "value": obj[row].phone
                            },
                            {
                                "property": "email",
                                "value": obj[row].email
                            },
                            {
                                "property": "date_added",
                                "value": obj[row].date_added
                            }
                        ]
                    }

好的,那是一段很长的代码,但没有做太多事情。至少,将那些内部 dicts 每个收紧到一行。但更好的是,编写一个函数以您想要的格式创建字典。您可以使用 getattr 按名称从 obj 中提取数据。

            nameQuery = "{first} {last}".format(first=obj[row].firstname, last=obj[row].lastname)

            # Get a list of all contacts for a certain company.
            contactCheck = "https://api.hubapi.com/contacts/v1/search/query?q={query}&hapikey={hapikey}".format(hapikey=hapikey, query=nameQuery)
            # Convert the payload to JSON and assign it to a variable called "data"
            data = json.dumps(payload)

            # Defined the headers content-type as 'application/json'
            headers = {'content-type': 'application/json'}

            contactExistCheck = requests.get(contactCheck, headers=headers)

在这里,您将 API 的详细信息编码到您的代码中。考虑将它们拉出到函数中。 (这样,您可以稍后回来构建它们的模块,在您的下一个程序中达到 re-use。)另外,请注意实际上没有告诉您任何信息的注释。并随意将其作为一个段落组合在一起,因为它们都在为同一件事服务 - 拨打 API 电话。

            for i in contactExistCheck.json()[u'contacts']:

                # ... Get the canonical VIDs
                canonicalVid = i[u'canonical-vid']

                if canonicalVid:
                    print ("{theContact} exists! Their VID is \"{vid}\"".format(theContact=obj[row].firstname, vid=canonicalVid))
                    print ("Attempting to update their company...")
                    contactCompanyUpdate = "https://api.hubapi.com/companies/v2/companies/{companyID}/contacts/{vid}?hapikey={hapikey}".format(hapikey=hapikey, vid=canonicalVid, companyID=obj[row].id)
                    doTheUpdate = requests.put(contactCompanyUpdate, headers=headers)
                    if doTheUpdate.status_code == 200:
                        print ("Attempt Successful! {theContact}'s has an updated company.\n".format(theContact=obj[row].firstname))
                        break
                    else:
                        print ("Attempt Failed. Status Code: {status}. Company or Contact not found.\n".format(status=doTheUpdate.status_code))

我不确定这最后一点是否应该是一个例外。 "Attempt Failed" 是正常行为,还是表示有问题?

无论如何,请查看您正在使用的API。我敢打赌,有更多信息可用于小故障。 (主要故障是互联网中断或服务器离线。)例如,他们可能会在 return JSON 中提供 "errors" 或 "error" 字段。这些应该与您的失败消息一起记录或打印。

def createOrUpdateClient():

大多数情况下,此功能与上一个功能存在相同的问题。

            else:
                print ("Contact Marko for assistance.\n")

这里除外。 永远不要把你的名字放在这样的地方。或者 10 年后您仍然会接到有关此代码的电话。输入您的部门名称 ("IT Operations") 或支持号码。需要知道的人已经知道了。而不需要知道的人可以通知已经知道的人。

if __name__ == "__main__":
    # Run the Create or Update function
    createOrUpdateClient()

    # Give the previous function 5 seconds to take effect.
    sleep(5.0)

    # Run the Company Update function
    contactCompanyUpdate()
    print("Sync complete.")

    print("Moving \"{something}\" to the archive folder...".format(something=theCSV))

    # Cron version
    #shutil.move( i, "/home/accountName/public_html/clientFolder/archive/" + os.path.basename(i))

    # Local version
    movePath = "archive/{thefile}".format(thefile=theCSV)
    shutil.move( i, movePath )

    print("Move successful! Exiting...\n")

这很尴尬。您可能会考虑采用一些命令行参数并使用它们来确定您的行为。

sys.exit()

并且不要这样做。永远不要在模块范围内放置 exit(),因为这意味着您不可能导入此代码。也许有人想导入它来解析文档字符串。或者他们可能想借用您编写的一些 API 函数。太糟糕了! sys.exit() 意味着总是必须说 "Oh, sorry, I'll have to do that for you." 把它放在实际 __name__ == "__main__" 代码的底部。或者,由于您实际上并未传递值,因此只需将其完全删除。