# Test
[4]:
# Pin exact versions so this Glue notebook environment is reproducible.
# %pip (rather than !pip) targets the active kernel's environment.
%pip install polars==0.17.15 -q
%pip install boto_session_manager==1.5.3 -q
%pip install s3pathlib==2.0.1 -q
[notice] A new release of pip is available: 23.0.1 -> 23.2.1
[notice] To update, run: python3 -m pip install --upgrade pip
Note: you may need to restart the kernel to use updated packages.
[notice] A new release of pip is available: 23.0.1 -> 23.2.1
[notice] To update, run: python3 -m pip install --upgrade pip
Note: you may need to restart the kernel to use updated packages.
[notice] A new release of pip is available: 23.0.1 -> 23.2.1
[notice] To update, run: python3 -m pip install --upgrade pip
Note: you may need to restart the kernel to use updated packages.
[1]:
import typing as T
import os
import sys
from pathlib import Path
import boto3
from pyspark.sql import functions as F
from pyspark.sql import types as TP
from pyspark.sql import DataFrame
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from awsglue.job import Job
# Attach to the already-running SparkContext if the Glue interactive session
# started one (getOrCreate avoids a "context already exists" error), then
# wrap it in a GlueContext and pull out the Glue-configured SparkSession.
spark_ctx = SparkContext.getOrCreate()
glue_ctx = GlueContext(spark_ctx)
spark_ses = glue_ctx.spark_session
Starting Spark application
ID | YARN Application ID | Kind | State | Spark UI | Driver log | User | Current session? |
---|---|---|---|---|---|---|---|
2 | None | pyspark | idle | None | ✔ |
SparkSession available as 'spark'.
/home/glue_user/spark/python/pyspark/sql/context.py:112: FutureWarning: Deprecated in 3.0.0. Use SparkSession.builder.getOrCreate() instead.
warnings.warn(
[2]:
import polars as pl
from boto_session_manager import BotoSesManager
from s3pathlib import S3Path, context
[4]:
# Demonstrate that a nullable StringType column accepts Python None,
# which Spark renders as SQL NULL.
string_schema = TP.StructType([
    TP.StructField("id", TP.StringType(), True),
])
sample_rows = [
    ("a",),
    (None,),
]
df = spark_ses.createDataFrame(sample_rows, schema=string_schema)
df.printSchema()
df.show()
root
|-- id: string (nullable = true)
+----+
| id|
+----+
| a|
|null|
+----+
[10]:
from datetime import datetime
df = spark_ses.createDataFrame(
[
("a",),
(1,),
(datetime.utcnow(),)
],
schema=TP.StructType([
TP.StructField("id", TP.StringType(), True),
])
)
df.printSchema()
df.show()
for row in df.collect():
print(row["id"], type(row["id"]))
df.select("*").show()
root
|-- id: string (nullable = true)
+--------------------+
| id|
+--------------------+
| a|
| 1|
|java.util.Gregori...|
+--------------------+
a <class 'str'>
1 <class 'str'>
java.util.GregorianCalendar[time=?,areFieldsSet=false,areAllFieldsSet=false,lenient=true,zone=sun.util.calendar.ZoneInfo[id="UTC",offset=0,dstSavings=0,useDaylight=false,transitions=0,lastRule=null],firstDayOfWeek=1,minimalDaysInFirstWeek=1,ERA=?,YEAR=2023,MONTH=7,WEEK_OF_YEAR=?,WEEK_OF_MONTH=?,DAY_OF_MONTH=19,DAY_OF_YEAR=?,DAY_OF_WEEK=?,DAY_OF_WEEK_IN_MONTH=?,AM_PM=0,HOUR=4,HOUR_OF_DAY=4,MINUTE=35,SECOND=57,MILLISECOND=484,ZONE_OFFSET=?,DST_OFFSET=?] <class 'str'>
+--------------------+
| id|
+--------------------+
| a|
| 1|
|java.util.Gregori...|
+--------------------+
[19]:
# Build a small id/name DataFrame and expose it to Spark SQL as a
# global temp view named "t" (queried via the global_temp database).
people_schema = TP.StructType([
    TP.StructField("id", TP.LongType(), True),
    TP.StructField("name", TP.StringType(), True),
])
people_rows = [
    (1, "alice"),
    (2, "bob"),
    (3, "cathy"),
    (4, "david"),
]
df = spark_ses.createDataFrame(people_rows, schema=people_schema)
df.printSchema()
df.show()
df.createOrReplaceGlobalTempView("t")
root
|-- id: long (nullable = true)
|-- name: string (nullable = true)
+---+-----+
| id| name|
+---+-----+
| 1|alice|
| 2| bob|
| 3|cathy|
| 4|david|
+---+-----+
[20]:
import textwrap

# Query the global temp view; note the mandatory "global_temp." prefix.
query = """
SELECT * FROM global_temp.t t LIMIT 2;
"""
spark_ses.sql(textwrap.dedent(query)).show()
+---+-----+
| id| name|
+---+-----+
| 1|alice|
| 2| bob|
+---+-----+
[24]:
# lpad(): left-pad each id to width 4 with '0' characters (e.g. 1 -> "0001").
query = """
SELECT
    lpad(t.id, 4, '0') as id
FROM global_temp.t t;
"""
spark_ses.sql(textwrap.dedent(query)).show()
+----+
| id|
+----+
|0001|
|0002|
|0003|
|0004|
+----+
[ ]: