如何从 1 个 S3 文件读取和写入多个 json 对象到 dynamodb python 3.8

How do I read and write multiple json objects from 1 S3 file to dynamodb python 3.8

我能够从 S3 存储桶读取单个 json 记录并将其写入 dynamodb。但是,当我尝试从其中包含多个 json 对象的文件中读取和写入时,它给了我错误。请在下面找到代码和错误 - 请您帮助解决相同的问题 - Lambda 代码(读取 S3 文件并写入 dynamodb)

import json
import boto3

s3_client = boto3.client('s3')
dynamodb = boto3.resource('dynamodb')

def lambda_handler(event, context):
    # TODO implement
    bucket = event['Records'][0]['s3']['bucket']['name']
    json_file_name = event['Records'][0]['s3']['object']['key']
        
    print(bucket)
    print(json_file_name)
    json_object = s3_client.get_object(Bucket=bucket, Key=json_file_name)
    jsonFileReader = json_object['Body'].read()
    print(jsonFileReader)
    
    jsonFile = json.loads(jsonFileReader)
    
    print(jsonFile)
    print(type(jsonFile))
    
    jsonDict = {"test":item for item in jsonFile}
    print(type(jsonDict))
    print(jsonDict)
    
    table = dynamodb.Table('Twitter-data-stream')
    print(type(table))
    
    table.put_item(Item=jsonDict['test'])
    return 'Hello from Lambda!'

cloudwatch 出错 -

[ERROR] JSONDecodeError: Extra data: line 1 column 230 (char 229)
Traceback (most recent call last):
  File "/var/task/lambda_function.py", line 20, in lambda_handler
    jsonFilerec = json.loads(jsonFileReader)
  File "/var/lang/lib/python3.8/json/__init__.py", line 357, in loads
    return _default_decoder.decode(s)
  File "/var/lang/lib/python3.8/json/decoder.py", line 340, in decode
    raise JSONDecodeError("Extra data", s, end)

请在下面找到 S3 文件示例记录

b'[{"id": "1305857561179152385", "tweet": "If you like vintage coke machines and guys who look like Fred Flintstone you\'ll love the short we\'ve riffed: Coke R, "ts": "Tue Sep 15 13:14:38 +0000 2020"}][{"id": "1305858267067883521", "tweet": "Chinese unicorn Genki Forest plots own beverage hits  #China #Chinese #Brands #GoingGlobal\u2026 ", "ts": "Tue Sep 15 13:17:27 +0000 2020"}][{"id": "1305858731293507585", "tweet": "RT @CinemaCheezy: If you like vintage coke machines and guys who look like Fred Flintstone you\'ll love the short we\'ve riffed: Coke Refresh\u2026", "ts": "Tue Sep 15 13:19:17 +0000 2020"}]'

添加生成 Json 文件的生产者/输入代码

import boto3
import json
from datetime import datetime
import calendar
import random
import time
import sys
from tweepy.streaming import StreamListener
from tweepy import OAuthHandler
from tweepy import Stream
import preprocessor as p


#Variables that contains the user credentials to access Twitter API
consumer_key = '************'
consumer_secret ='******************'
access_token = '********************'
access_token_secret = '***************'

# Create tracklist with the words that will be searched for
tracklist = ['#coke']

awsRegionName='us-east-1'
awsAccessKey='************'
awsSecretKey='**********'

class TweetStreamListener(StreamListener):
    # on success
    def on_data(self, data):
        # decode json
        tweet = json.loads(data)
        print(type(tweet))
        #print(tweet)
        if "text" in tweet.keys():
            payload = {'id': str(tweet['id']),
                       'tweet': str(tweet['text'].encode('utf8', 'replace')),
                       #'tweet': str(tweet['text']),
                       'ts': str(tweet['created_at']),
                       },
                       
            try:
                print(tweet)
                #print(payload)
                
                               
                put_response = kinesis_client.put_record(
                    StreamName=stream_name,
                    Data=json.dumps(payload),
                    PartitionKey=str(['screen_name']))
                    #PartitionKey=str(tweet['user']['screen_name']))
            except (AttributeError, Exception) as e:
                print(e)
                pass
        return True

    # on failure
    def on_error(self, status):
        print("On_error status:", status)


stream_name = 'twitter-data-stream'  # fill the name of Kinesis data stream you created
#stream_name = 'demo-datastream' 

if __name__ == '__main__':
    # create kinesis client connection
    kinesis_client = boto3.client('kinesis',
                                  region_name=awsRegionName,
                                  aws_access_key_id=awsAccessKey,
                                  aws_secret_access_key=awsSecretKey)
    
    # create instance of the tweepy tweet stream listener
    listener = TweetStreamListener()
    # set twitter keys/tokens
    auth = OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(access_token, access_token_secret)
    # create instance of the tweepy stream
    stream = Stream(auth, listener)
    # search twitter for tags or keywords from cli parameters
    #query = sys.argv[1:]  # list of CLI arguments
    #query_fname = ' '.join(query)  # string
    stream.filter(track=tracklist)
    #tweets = api.search(tracklist, count=10, lang='en', exclude='retweets',tweet_mode = 'extended')
                         
    
    

                         
    

此致, 普里蒂

也许您的 json 不正确:[tweet_data][...][...][...] 不是有效的 json 对象。 你应该处理你的输入数据,得到这样的东西:[{tweet_data},{...},{...},{...},{...}]