
UnicodeEncodeError for Google App Engine Datastore to BigQuery process

  • I'm trying to follow along with this codelab, which shows how to take data from the Google App Engine Datastore, move it through Google Cloud Storage, and load it into BigQuery by setting up a MapReduce pipeline. As a test, I set up a Datastore entity and a process that collects tweets about a set of stock tickers. I believe I've followed everything as outlined in the example, but the shards that break up the data and load it into Cloud Storage are raising UnicodeEncodeErrors. Here's the log from testing the app on the dev app server:

    INFO     2012-12-18 20:41:07,645 dev_appserver.py:3103] "POST /mapreduce/worker_callback HTTP/1.1" 500 -
    WARNING  2012-12-18 20:41:07,648 taskqueue_stub.py:1981] Task appengine-mrshard-1582400592541472B07B9-0-0 failed to execute. This task will retry in 0.100 seconds
    ERROR    2012-12-18 20:41:09,453 webapp2.py:1552] 'ascii' codec can't encode character u'\u2019' in position 80: ordinal not in range(128)
    Traceback (most recent call last):
    File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 1535, in __call__
    rv = self.handle_exception(request, response, e)
    File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 1529, in __call__
    rv = self.router.dispatch(request, response)
    File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 1278, in default_dispatcher
    return route.handler_adapter(request, response)
    File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 1102, in __call__
    return handler.dispatch()
    File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 572, in dispatch
    return self.handle_exception(e, self.app.debug)
    File "C:\Program Files (x86)\Google\google_appengine\lib\webapp2\webapp2.py", line 570, in dispatch
    return method(*args, **kwargs)
    File "C:\Users\Tank\Documents\Aptana Studio 3 Workspace\jibdantest-bq\mapreduce\base_handler.py", line 65, in post
    self.handle()
    File "C:\Users\Tank\Documents\Aptana Studio 3 Workspace\jibdantest-bq\mapreduce\handlers.py", line 181, in handle
    entity, input_reader, ctx, tstate)
    File "C:\Users\Tank\Documents\Aptana Studio 3 Workspace\jibdantest-bq\mapreduce\handlers.py", line 298, in process_data
    output_writer.write(output, ctx)
    File "C:\Users\Tank\Documents\Aptana Studio 3 Workspace\jibdantest-bq\mapreduce\output_writers.py", line 659, in write
    ctx.get_pool("file_pool").append(self._filename, str(data))
    UnicodeEncodeError: 'ascii' codec can't encode character u'\u2019' in position 80: ordinal not in range(128)

     

    Here's the code:

    import json
    import webapp2
    import urllib2
    import time
    import calendar
    import datetime
    import httplib2
    
    from google.appengine.ext import db
    from google.appengine.api import taskqueue
    from google.appengine.ext import blobstore
    from google.appengine.ext.webapp.util import run_wsgi_app
    from google.appengine.ext.webapp import blobstore_handlers
    from google.appengine.ext.webapp import util
    from google.appengine.ext.webapp import template
    from google.appengine.api import urlfetch
    
    from mapreduce.lib import files
    from mapreduce import base_handler
    from mapreduce import mapreduce_pipeline
    from apiclient.discovery import build
    from oauth2client.appengine import AppAssertionCredentials
    
    SCOPE = 'https://www.googleapis.com/auth/bigquery'
    PROJECT_ID = 'project_id' # Your Project ID here
    BQ_DATASET_ID = 'datastore_data'
    GS_BUCKET = 'bucketname'
    ENTITY_KIND = 'main.streamdata'
    
    class streamdata(db.Model):
        querydate = db.DateTimeProperty(auto_now_add = True)
        ticker = db.StringProperty()
        created_at = db.StringProperty()
        tweet_id = db.StringProperty()
        text = db.TextProperty()
        source = db.StringProperty()
    
    class DatastoreMapperPipeline(base_handler.PipelineBase):
    
        def run(self, entity_type):
    
            output = yield mapreduce_pipeline.MapperPipeline(
              "Datastore Mapper %s" % entity_type,
              "main.datastore_map",
              "mapreduce.input_readers.DatastoreInputReader",
              output_writer_spec="mapreduce.output_writers.FileOutputWriter",
              params={
                  "input_reader":{
                      "entity_kind": entity_type,
                      },
                  "output_writer":{
                      "filesystem": "gs",
                      "gs_bucket_name": GS_BUCKET,
                      "output_sharding":"none",
                      }
                  },
                  shards=10)
    
            yield CloudStorageToBigQuery(output)
    
    class CloudStorageToBigQuery(base_handler.PipelineBase):
    
        def run(self, csv_output):
    
            credentials = AppAssertionCredentials(scope=SCOPE)
            http = credentials.authorize(httplib2.Http())
            bigquery_service = build("bigquery", "v2", http=http)
    
            jobs = bigquery_service.jobs()
            table_name = 'datastore_data_%s' % datetime.datetime.utcnow().strftime(
                '%m%d%Y_%H%M%S')
            files = [str(f.replace('/gs/', 'gs://')) for f in csv_output]
            result = jobs.insert(projectId=PROJECT_ID,
                                body=build_job_data(table_name,files))
    
            result.execute()
    
    def build_job_data(table_name, files):
      return {"projectId": PROJECT_ID,
              "configuration":{
                  "load": {
                      "sourceUris": files,
                      "schema":{
                          "fields":[
                              {
                                  "name":"querydate",
                                  "type":"INTEGER",
                              },
                              {
                                  "name":"ticker",
                                  "type":"STRING",
                              },
                              {
                                  "name":"created_at",
                                  "type":"STRING",
                              },
                              {
                                  "name":"tweet_id",
                                  "type":"STRING",
                              },
                              {
                                  "name":"text",
                                  "type":"STRING",
                              },
                              {
                                  "name":"source",
                                  "type":"STRING",
                              }
                              ]
                          },
                      "destinationTable":{
                          "projectId": PROJECT_ID,
                          "datasetId": BQ_DATASET_ID,
                          "tableId": table_name,
                          },
                      "maxBadRecords": 0,
                      }
                  }
              }
    
    def datastore_map(entity_type):
        data = db.to_dict(entity_type)
        resultlist = [timestamp_to_posix(data.get('querydate')),
                        data.get('ticker'),
                        data.get('created_at'),
                        data.get('tweet_id'),
                        data.get('text'),
                        data.get('source')]
        result = ','.join(['"%s"' % field for field in resultlist])
        yield("%s\n" % result)
    
    def timestamp_to_posix(timestamp):
        return int(time.mktime(timestamp.timetuple()))
    
    class DatastoretoBigQueryStart(webapp2.RequestHandler):
        def get(self):
            pipeline = DatastoreMapperPipeline(ENTITY_KIND)
            pipeline.start()
            path = pipeline.base_path + "/status?root=" + pipeline.pipeline_id
            self.redirect(path)
    
    class StreamHandler(webapp2.RequestHandler):
    
        def get(self):
    
            tickers = ['AAPL','GOOG', 'IBM', 'BAC', 'INTC',
                       'DELL', 'C', 'JPM', 'WFM', 'WMT', 
                       'AMZN', 'HOT', 'SPG', 'SWY', 'HTSI', 
                       'DUK', 'CEG', 'XOM', 'F', 'WFC', 
                       'CSCO', 'UAL', 'LUV', 'DAL', 'COST', 'YUM',
                       'TLT', 'HYG', 'JNK', 'LQD', 'MSFT',
                       'GE', 'LVS', 'MGM', 'TWX', 'DIS', 'CMCSA',
                       'TWC', 'ORCL', 'WPO', 'NYT', 'GM', 'JCP', 
                       'LNKD', 'OPEN', 'NFLX', 'SBUX', 'GMCR', 
                       'SPLS', 'BBY', 'BBBY', 'YHOO', 'MAR', 
                       'L', 'LOW', 'HD', 'HOV', 'TOL', 'NVR', 'RYL', 
                       'GIS', 'K', 'POST', 'KRFT', 'CHK', 'GGP', 
                       'RSE', 'RWT', 'AIG', 'CB', 'BRK.A', 'CAT']
    
            for i in set(tickers):
    
                url = 'http://search.twitter.com/search.json?q='
                resultcount = '&rpp=100'
                language = '&lang=en'
                encoding = '%40%24'
                tickerstring = url + encoding + i + resultcount + language
                tickurl = urllib2.Request(tickerstring)
                tweets = urllib2.urlopen(tickurl)
                code = tweets.getcode()
    
                if code == 200:
                    results = json.load(tweets, 'utf-8')
                    if "results" in results:
                        entries = results["results"]
                        for entry in entries:
                            tweet = streamdata()
                            created = entry['created_at']
                            tweetid = entry['id_str']
                            tweettxt = entry['text']
                            tweet.ticker = i
                            tweet.created_at = created
                            tweet.tweet_id = tweetid
                            tweet.text = tweettxt
                            tweet.source = "Twitter"
                            tweet.put()
    
    class MainHandler(webapp2.RequestHandler):
    
        def get(self):
            self.response.out.write('<a href="/start">Click here</a> to start the Datastore to BigQuery pipeline. ')
            self.response.out.write('<a href="/add_data">Click here</a> to start adding data to the datastore. ')
    
    
    app = webapp2.WSGIApplication([
                                   ('/', MainHandler),
                                   ('/start', DatastoretoBigQueryStart), 
                                   ('/add_data', StreamHandler)], 
                                  debug=True)

     

    Any insights anyone may have would be a big help.


      October 13, 2021 2:13 PM IST
  • You are converting Unicode data to a bytestring:

    ctx.get_pool("file_pool").append(self._filename, str(data))
    

     

    When you do that without specifying an encoding, Python falls back to the default, which is ASCII. You'll need to settle on a different encoding instead, one that can handle all Unicode codepoints your data contains.
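
    To see the failure in isolation, here's the same implicit conversion in a plain Python 2 session (the runtime the SDK and this codelab use); the sample string is just an illustration:

    >>> data = u'Apple\u2019s stock'   # unicode containing a right single quote (U+2019)
    >>> str(data)                      # implicit encode with the default 'ascii' codec
    Traceback (most recent call last):
      ...
    UnicodeEncodeError: 'ascii' codec can't encode character u'\u2019' in position 5: ordinal not in range(128)
    >>> data.encode('utf8')            # explicit encoding succeeds
    'Apple\xe2\x80\x99s stock'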

    For most text, UTF-8 is a good choice for that; if you have a lot of non-western text (Arabic, Asian, etc.) then UTF-16 might be more efficient. In either case, you'll have to explicitly encode:

    ctx.get_pool("file_pool").append(self._filename, data.encode('utf8'))
    

     

    When reading the data back from that file, use .decode('utf8') (the same codec you encoded with) to turn the bytes back into Unicode.
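
    For example, a minimal round trip (a plain local file here purely for illustration; the path is a placeholder):

    data = u'Apple\u2019s stock'
    with open('/tmp/example.csv', 'wb') as f:
        f.write(data.encode('utf8'))        # unicode -> UTF-8 bytes on disk
    with open('/tmp/example.csv', 'rb') as f:
        text = f.read().decode('utf8')      # UTF-8 bytes -> unicode again
    assert text == data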

    See the Python Unicode HOWTO for more information on Python and Unicode.
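
    If you'd rather not patch the mapreduce library's output_writers.py, one alternative (a sketch based on the code you posted, not something from the codelab) is to do the encoding inside your own datastore_map so the writer only ever receives byte strings:

    def datastore_map(entity_type):
        data = db.to_dict(entity_type)
        resultlist = [timestamp_to_posix(data.get('querydate')),
                      data.get('ticker'),
                      data.get('created_at'),
                      data.get('tweet_id'),
                      data.get('text'),
                      data.get('source')]
        # Coerce every field to unicode, then encode to UTF-8 bytes so the
        # writer's str(data) call never sees non-ASCII unicode text.
        encoded = [unicode(field).encode('utf-8') for field in resultlist]
        result = ','.join(['"%s"' % field for field in encoded])
        yield "%s\n" % result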

      November 1, 2021 2:35 PM IST
  • We'll use an App Engine Flex custom runtime based on the gcr.io/google_appengine/python image for the service that launches the Dataflow pipeline, since we need to install the gcloud SDK in the instance container(s). The example therefore includes a Dockerfile used to deploy the service. As its last command, the Dockerfile starts a Gunicorn server that serves a small Flask app script (main_df.py).

    The Python code for this service consists of the small Flask app script (main_df.py), which accesses a module (dfpipe) that does most of the heavy lifting in terms of defining and launching the example pipeline (in dfpipe/pipe.py).

    Setting the pipeline options
    As part of the process of launching a Dataflow pipeline, various options may be set. In order to make the dfpipe module available to the Dataflow workers, the pipeline options include a setup_file flag.

    pipeline_options = {
        'project': PROJECT,
        'staging_location': 'gs://' + BUCKET + '/staging',
        'runner': 'DataflowRunner',
        'setup_file': './setup.py',
        'job_name': PROJECT + '-twcount',
        'max_num_workers': 10,
        'temp_location': 'gs://' + BUCKET + '/temp'
    }
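
    As a rough sketch (not part of the original example) of how a dict like that is typically handed to the Beam SDK, assuming PROJECT and BUCKET are defined as above, with placeholder transforms standing in for the real pipeline in dfpipe/pipe.py:

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # Build PipelineOptions from the plain dict shown above.
    options = PipelineOptions.from_dictionary(pipeline_options)

    with beam.Pipeline(options=options) as p:
        # Placeholder transforms, just to show the wiring.
        (p
         | 'read' >> beam.io.ReadFromText('gs://' + BUCKET + '/input/*')
         | 'count' >> beam.combiners.Count.Globally()
         | 'write' >> beam.io.WriteToText('gs://' + BUCKET + '/output/counts'))
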
      November 2, 2021 2:37 PM IST
  • ctx.get_pool("file_pool").append(self._filename, str(data))
    

     

    If data contains Unicode characters, this will fail. Try

    ctx.get_pool("file_pool").append(self._filename, unicode(data))
    



      October 16, 2021 12:44 PM IST