Test#

[4]:
%pip install polars==0.17.15 -q
%pip install boto_session_manager==1.5.3 -q
%pip install s3pathlib==2.0.1 -q

[notice] A new release of pip is available: 23.0.1 -> 23.2.1
[notice] To update, run: python3 -m pip install --upgrade pip
Note: you may need to restart the kernel to use updated packages.

[notice] A new release of pip is available: 23.0.1 -> 23.2.1
[notice] To update, run: python3 -m pip install --upgrade pip
Note: you may need to restart the kernel to use updated packages.

[notice] A new release of pip is available: 23.0.1 -> 23.2.1
[notice] To update, run: python3 -m pip install --upgrade pip
Note: you may need to restart the kernel to use updated packages.
[1]:
import typing as T
import os
import sys
from pathlib import Path

import boto3

from pyspark.sql import functions as F
from pyspark.sql import types as TP
from pyspark.sql import DataFrame
from pyspark.context import SparkContext

from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from awsglue.job import Job

spark_ctx = SparkContext.getOrCreate()
glue_ctx = GlueContext(spark_ctx)
spark_ses = glue_ctx.spark_session
Starting Spark application
IDYARN Application IDKindStateSpark UIDriver logUserCurrent session?
2NonepysparkidleNone
SparkSession available as 'spark'.
/home/glue_user/spark/python/pyspark/sql/context.py:112: FutureWarning: Deprecated in 3.0.0. Use SparkSession.builder.getOrCreate() instead.
  warnings.warn(
[2]:
import polars as pl
from boto_session_manager import BotoSesManager
from s3pathlib import S3Path, context
[4]:
df = spark_ses.createDataFrame(
    [
        ("a",),
        (None,),
    ],
    schema=TP.StructType([
        TP.StructField("id", TP.StringType(), True),
    ])
)
df.printSchema()
df.show()
root
 |-- id: string (nullable = true)

+----+
|  id|
+----+
|   a|
|null|
+----+
[10]:
from datetime import datetime


df = spark_ses.createDataFrame(
    [
        ("a",),
        (1,),
        (datetime.utcnow(),)
    ],
    schema=TP.StructType([
        TP.StructField("id", TP.StringType(), True),
    ])
)
df.printSchema()
df.show()

for row in df.collect():
    print(row["id"], type(row["id"]))

df.select("*").show()
root
 |-- id: string (nullable = true)

+--------------------+
|                  id|
+--------------------+
|                   a|
|                   1|
|java.util.Gregori...|
+--------------------+

a <class 'str'>
1 <class 'str'>
java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet=false,lenient=true,zone=sun.util.calendar.ZoneInfo[id="UTC",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=?,YEAR=2023,MONTH=7,WEEK_OF_YEAR=?,WEEK_OF_MONTH=?,DAY_OF_MONTH=19,DAY_OF_YEAR=?,DAY_OF_WEEK=?,DAY_OF_WEEK_IN_MONTH=?,AM_PM=0,HOUR=4,HOUR_OF_DAY=4,MINUTE=35,SECOND=57,MILLISECOND=484,ZONE_OFFSET=?,DST_OFFSET=?] <class 'str'>
+--------------------+
|                  id|
+--------------------+
|                   a|
|                   1|
|java.util.Gregori...|
+--------------------+
[19]:
df = spark_ses.createDataFrame(
    [
        (1, "alice"),
        (2, "bob"),
        (3, "cathy"),
        (4, "david"),
    ],
    schema=TP.StructType([
        TP.StructField("id", TP.LongType(), True),
        TP.StructField("name", TP.StringType(), True),
    ])
)
df.printSchema()
df.show()

df.createOrReplaceGlobalTempView("t")
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)

+---+-----+
| id| name|
+---+-----+
|  1|alice|
|  2|  bob|
|  3|cathy|
|  4|david|
+---+-----+
[20]:
import textwrap

sql = """
SELECT * FROM global_temp.t t LIMIT 2;
"""
sql = textwrap.dedent(sql)
spark_ses.sql(sql).show()
+---+-----+
| id| name|
+---+-----+
|  1|alice|
|  2|  bob|
+---+-----+
[24]:
sql = """
SELECT
    lpad(t.id, 4, '0') as id
FROM global_temp.t t;
"""
sql = textwrap.dedent(sql)
spark_ses.sql(sql).show()
+----+
|  id|
+----+
|0001|
|0002|
|0003|
|0004|
+----+
[ ]: